summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend_api.py135
-rw-r--r--src/basealt_bot_frontend/__init__.py2
-rw-r--r--src/bot.py166
-rw-r--r--src/botlogging.py24
-rw-r--r--src/config.example.py3
-rw-r--r--src/keyboards.py123
-rw-r--r--src/states.py568
-rw-r--r--src/utils.py13
8 files changed, 1032 insertions, 2 deletions
diff --git a/src/backend_api.py b/src/backend_api.py
new file mode 100644
index 0000000..94841f9
--- /dev/null
+++ b/src/backend_api.py
@@ -0,0 +1,135 @@
+import requests
+import urllib
+from config import BACKEND_URL
+import json
+import datetime as dt
+
+from typing import Tuple, Dict
+
+from botlogging import logger
+
+
+def make_request(method: str, url: str, **kwargs) -> Tuple[int, Dict]:
+ try:
+ response = requests.request(method, url, **kwargs)
+ answer = response.json()
+
+ except Exception as e:
+ logger.debug(f"Got exception while making request: {e}")
+ return 500, {}
+
+ logger.debug(
+ f"Got response from backend: "
+ f"Status={response.status_code}; "
+ f"Text={response.text[:200]}..."
+ )
+
+ return response.status_code, answer
+
+
+def post_request(url: str, **kwargs):
+ return make_request("post", url, **kwargs)
+
+
+def put_request(url: str, **kwargs):
+ return make_request("put", url, **kwargs)
+
+
+def get_request(url: str, **kwargs):
+ return make_request("get", url, **kwargs)
+
+
+def patch_request(url: str, **kwargs):
+ return make_request("patch", url, **kwargs)
+
+
+def register_user(tg_id: int, username: str, fullname: str) -> Tuple[int, Dict]:
+ logger.debug(f"Trying to register user with id={tg_id}; username={username}")
+ return post_request(f"{BACKEND_URL}/profiles/", data={
+ "tg_id": tg_id,
+ "username": username,
+ "fullname": fullname
+ })
+
+
+def get_profiles():
+ logger.debug(f"Trying to retrieve all profiles")
+ return get_request(f"{BACKEND_URL}/profiles/")
+
+
+def get_tasks():
+ logger.debug(f"Trying to retrieve all tasks")
+ return get_request(f"{BACKEND_URL}/tasks/")
+
+
+def get_published_tasks():
+ logger.debug(f"Trying to retrieve all published tasks")
+ return get_request(f"{BACKEND_URL}/api/tasks/published/")
+
+
+def get_task(title: str) -> Tuple[int, Dict]:
+ logger.debug(f"Trying to retrieve task with title={title}")
+ return get_request(f"{BACKEND_URL}/api/get_task/" + urllib.parse.quote(title))
+
+
+def save_state(last_state: int, tg_id: int, user_data: dict) -> Tuple[int, Dict]:
+ user_data_dumped = json.dumps(user_data)
+ logger.debug(f"Trying to save state for user with id={tg_id}; state={last_state}; user_data={user_data_dumped}")
+
+ return put_request(f"{BACKEND_URL}/api/state/update/{tg_id}/", data={
+ "last_state": last_state,
+ "tg_id": tg_id,
+ "user_data": user_data_dumped
+ })
+
+
+def get_state(tg_id: int) -> Tuple[int, dict]:
+ logger.debug(f"Trying to get state for user with id={tg_id}")
+ return get_request(f"{BACKEND_URL}/api/state/get/{tg_id}/")
+
+
+def create_attempt(tg_id: int, task_title: str, answer: str):
+ return post_request(f"{BACKEND_URL}/attempts/", data={
+ "profile": json.dumps({"tg_id": tg_id}),
+ "task": json.dumps({"title": task_title}),
+ "answer": answer
+ })
+
+
+def get_attempts(tg_id: int=None, task_title: str=None):
+ data = {}
+ if tg_id is not None:
+ data["tg_id"] = tg_id
+ if task_title is not None:
+ data["task_title"] = task_title
+
+ return get_request(f"{BACKEND_URL}/api/attempts/", data=data)
+
+
+def get_profile(tg_id: int):
+ return get_request(f"{BACKEND_URL}/api/profile/get/{tg_id}")
+
+
+def publish_task(title: str):
+ url_title = urllib.parse.quote(title)
+ code, resp = get_task(title)
+
+ if code != 200:
+ return code, {}
+
+ if resp["first_published"] is None:
+ return patch_request(f"{BACKEND_URL}/api/tasks/{url_title}/update/", data={
+ "first_published": dt.datetime.now(),
+ "is_public": True
+ })
+ else:
+ return patch_request(f"{BACKEND_URL}/api/tasks/{url_title}/update/", data={
+ "is_public": True
+ })
+
+
+def hide_task(title: str):
+ url_title = urllib.parse.quote(title)
+ return patch_request(f"{BACKEND_URL}/api/tasks/{url_title}/update/", data={
+ "is_public": False
+ })
diff --git a/src/basealt_bot_frontend/__init__.py b/src/basealt_bot_frontend/__init__.py
deleted file mode 100644
index 4268070..0000000
--- a/src/basealt_bot_frontend/__init__.py
+++ /dev/null
@@ -1,2 +0,0 @@
-def hello() -> str:
- return "Hello from basealt-bot-frontend!"
diff --git a/src/bot.py b/src/bot.py
new file mode 100644
index 0000000..37e0ac8
--- /dev/null
+++ b/src/bot.py
@@ -0,0 +1,166 @@
+from telegram.ext import Updater, CommandHandler, ConversationHandler, MessageHandler, Filters, CallbackQueryHandler
+from telegram import ReplyKeyboardMarkup, ReplyKeyboardRemove, InlineKeyboardMarkup, InlineKeyboardButton
+
+# Typing
+from telegram import Update, User, Bot
+
+from os import getenv
+import json
+from config import TG_TOKEN, REQUEST_KWARGS
+
+import backend_api
+from keyboards import (
+ MenuKeyboard, TasksKeyboard, TaskChosenKeyboard, ContinueKeyboard,
+ AnsweringKeyboard, AdminKeyboard, BackToMenuKeyboard
+)
+from utils import *
+from states import States, AdminStates
+
+from botlogging import logger
+
+
+def start(bot: Bot, update: Update, user_data: dict):
+ user_data.update({"chosen_task": None})
+
+ update.message.reply_text(
+ "Квиз Базальт СПО на День работодателя КНиИТ СГУ",
+ reply_markup=ReplyKeyboardRemove()
+ )
+
+ user: User = update.message.from_user
+ if user.username is None:
+ return States.wait_for_username(bot, update, user_data)
+
+ else:
+ logger.debug(backend_api.register_user(user.id, user.username, user.full_name))
+ return States.main_menu(bot, update, user_data)
+
+
+def username_check(bot: Bot, update: Update, user_data):
+ user: User = update.message.from_user
+ if user.username is None:
+ return States.wait_for_username(bot, update, user_data)
+
+ else:
+ logger.debug(backend_api.register_user(user.id, user.username, user.full_name))
+ return States.main_menu(bot, update, user_data)
+
+
+def resume_bot(bot: Bot, update: Update, user_data):
+ status_code, state = backend_api.get_state(update.message.from_user.id)
+
+ user_data.update(json.loads(state["user_data"]))
+ last_state = state["last_state"]
+
+ for handler in conversation_handler.states[last_state]:
+ if handler.filters.filter(update):
+ return handler.callback(bot, update, user_data)
+
+ return last_state
+
+
+def stop(bot, update):
+ update.message.reply_text('Пока!', reply_markup=ReplyKeyboardRemove())
+ update.message.reply_text('Для того, чтобы начать работу с ботом заново напишите /start')
+ return ConversationHandler.END
+
+
+def error(bot, update, error):
+ logger.warning('Update "%s" caused error "%s"', update, error)
+
+
+def main():
+ is_prod = getenv('production', '')
+
+ if is_prod:
+ print('PRODUCTION ENVIRONMENT')
+ updater = Updater(TG_TOKEN)
+ else:
+ print('DEVELOPMENT ENVIRONMENT')
+ updater = Updater(TG_TOKEN, request_kwargs=REQUEST_KWARGS)
+
+ dp = updater.dispatcher
+ dp.add_error_handler(error)
+ dp.add_handler(conversation_handler)
+
+ updater.start_polling()
+ updater.idle()
+
+
+conversation_handler = ConversationHandler(
+ entry_points=[
+ CommandHandler('start', start, pass_user_data=True),
+ MessageHandler(Filters.text, resume_bot, pass_user_data=True),
+ ],
+
+ states={
+ WAIT_FOR_USERNAME: [MessageHandler(Filters.text, username_check, pass_user_data=True)],
+
+ MAIN_MENU: [
+ MessageHandler(Filters.regex(MenuKeyboard.CHOOSE_TASK), States.choose_task, pass_user_data=True),
+ MessageHandler(Filters.regex(MenuKeyboard.TOP_10), States.top_10, pass_user_data=True),
+ MessageHandler(Filters.regex(MenuKeyboard.RULES), States.rules, pass_user_data=True),
+ CommandHandler('admin', AdminStates.admin_panel, pass_user_data=True)
+ ],
+ TASK_CHOOSING: [
+ MessageHandler(Filters.regex(TasksKeyboard.CANCEL), States.main_menu, pass_user_data=True),
+ MessageHandler(Filters.text, States.show_task, pass_user_data=True),
+ ],
+ TASK_SHOWN: [
+ MessageHandler(Filters.regex(TaskChosenKeyboard.CANCEL), States.choose_task, pass_user_data=True),
+ MessageHandler(Filters.text, States.accept_answer, pass_user_data=True),
+ # MessageHandler(Filters.regex(TaskChosenKeyboard.TYPE_ANSWER), States.type_answer, pass_user_data=True),
+ ],
+ ANSWERING: [
+ MessageHandler(Filters.regex(AnsweringKeyboard.CANCEL), States.show_task, pass_user_data=True),
+ MessageHandler(Filters.text, States.accept_answer, pass_user_data=True),
+ ],
+ ANSWER_RIGHT: [MessageHandler(Filters.text, States.main_menu, pass_user_data=True)],
+ ANSWER_WRONG: [MessageHandler(Filters.text, States.show_task, pass_user_data=True)],
+
+ ASKING_QUESTION: [
+ MessageHandler(Filters.regex(BackToMenuKeyboard.CANCEL), States.main_menu, pass_user_data=True),
+ MessageHandler(Filters.text, States.ask_question, pass_user_data=True),
+ ],
+
+ # ADMIN PANEL
+
+ ADMIN_MENU: [
+ MessageHandler(Filters.regex(AdminKeyboard.CANCEL), States.main_menu, pass_user_data=True),
+ MessageHandler(Filters.regex(AdminKeyboard.PUBLISH_TASK), AdminStates.choose_task_publish, pass_user_data=True),
+ MessageHandler(Filters.regex(AdminKeyboard.HIDE_TASK), AdminStates.choose_task_hide, pass_user_data=True),
+ MessageHandler(Filters.regex(AdminKeyboard.ANNOUNCE), AdminStates.wait_for_announcement, pass_user_data=True),
+ MessageHandler(Filters.regex(AdminKeyboard.MESSAGE_PLAYER), AdminStates.wait_for_message, pass_user_data=True),
+ ],
+
+ ADMIN_WAIT_FOR_ANNOUNCEMENT: [
+ MessageHandler(Filters.regex(BackToMenuKeyboard.CANCEL), AdminStates.admin_panel, pass_user_data=True),
+ MessageHandler(Filters.text, AdminStates.announce_message, pass_user_data=True),
+ ],
+ ADMIN_WAIT_FOR_MESSAGE: [
+ MessageHandler(Filters.regex(BackToMenuKeyboard.CANCEL), AdminStates.admin_panel, pass_user_data=True),
+ MessageHandler(Filters.text, AdminStates.message_plr, pass_user_data=True),
+ ],
+
+ ADMIN_TASK_CHOOSE_HIDE: [
+ MessageHandler(Filters.regex(TasksKeyboard.CANCEL), AdminStates.admin_panel, pass_user_data=True),
+ MessageHandler(Filters.text, AdminStates.hide_task, pass_user_data=True),
+ ],
+
+ ADMIN_TASK_CHOOSE_PUBLISH: [
+ MessageHandler(Filters.regex(TasksKeyboard.CANCEL), AdminStates.admin_panel, pass_user_data=True),
+ MessageHandler(Filters.text, AdminStates.publish_task, pass_user_data=True),
+ ],
+
+ ADMIN_TASK_PUBLISHED: [MessageHandler(Filters.text, AdminStates.admin_panel, pass_user_data=True)],
+ ADMIN_ACCESS_DENIED: [MessageHandler(Filters.text, States.main_menu, pass_user_data=True)],
+ },
+
+ fallbacks=[
+ CommandHandler('stop', stop),
+ MessageHandler(Filters.regex(MenuKeyboard.HELP), States.prompt_question, pass_user_data=True),
+ ]
+)
+
+if __name__ == '__main__':
+ main()
diff --git a/src/botlogging.py b/src/botlogging.py
new file mode 100644
index 0000000..0a08a8d
--- /dev/null
+++ b/src/botlogging.py
@@ -0,0 +1,24 @@
+import logging
+from pathlib import Path
+
+Path("logs").mkdir(exist_ok=True)
+
+fh = logging.FileHandler("logs/log.log")
+fh.setLevel(logging.DEBUG)
+
+ch = logging.StreamHandler()
+ch.setLevel(logging.DEBUG)
+
+log_format = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+fh.setFormatter(log_format)
+ch.setFormatter(log_format)
+
+logging.basicConfig(
+ level=logging.DEBUG, handlers=[fh, ch],
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+)
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+logger.addHandler(fh)
+logger.addHandler(ch)
diff --git a/src/config.example.py b/src/config.example.py
new file mode 100644
index 0000000..ba0e5bc
--- /dev/null
+++ b/src/config.example.py
@@ -0,0 +1,3 @@
+TG_TOKEN = ""
+REQUEST_KWARGS = {}
+BACKEND_URL = "http://localhost:8000"
diff --git a/src/keyboards.py b/src/keyboards.py
new file mode 100644
index 0000000..aa8d761
--- /dev/null
+++ b/src/keyboards.py
@@ -0,0 +1,123 @@
+import backend_api
+from abc import ABC
+
+
+class Keyboard(ABC):
+ @classmethod
+ def get_keyboard(cls, telegram_id=None):
+ pass
+
+
+class MenuKeyboard(Keyboard):
+ CHOOSE_TASK = "Выбрать задание📚"
+ TOP_10 = "hidden Топ-10📊"
+ RULES = "Правилаℹ️"
+ ADMIN = "/admin"
+ HELP = "Связаться с Базальт СПО🐧"
+
+ @classmethod
+ def get_keyboard(cls, telegram_id=None, solved=False):
+ if telegram_id is not None:
+ status_code, data = backend_api.get_profile(telegram_id)
+ if status_code == 200 and data["is_admin"]:
+ return [
+ [cls.ADMIN],
+ [cls.CHOOSE_TASK],
+ [cls.TOP_10, cls.RULES],
+ [cls.HELP],
+ ]
+
+ if solved:
+ return [
+ [cls.RULES],
+ [cls.HELP],
+ ]
+ else:
+ return [
+ [cls.CHOOSE_TASK],
+ [cls.RULES],
+ [cls.HELP],
+ ]
+
+
+class BackToMenuKeyboard(Keyboard):
+ CANCEL = "Вернуться в меню↩️"
+
+ @classmethod
+ def get_keyboard(cls, telegram_id=None):
+ return [[cls.CANCEL]]
+
+
+class TasksKeyboard(Keyboard):
+ CANCEL = "Вернуться в меню↩️"
+
+ @classmethod
+ def get_keyboard(cls, telegram_id=None):
+ status, tasks = backend_api.get_published_tasks()
+ titles_keyboard = [[cls.CANCEL]]
+ if status == 200:
+ titles = []
+ for task in sorted(tasks, key=lambda t: t.get("title")):
+ titles.append([task.get("title")])
+ titles_keyboard.extend(titles)
+
+ return titles_keyboard
+
+
+class PublishTasksKeyboard(Keyboard):
+ CANCEL = "Вернуться в меню↩️"
+
+ @classmethod
+ def get_keyboard(cls, telegram_id=None):
+ status, tasks = backend_api.get_tasks()
+ titles_keyboard = [[cls.CANCEL]]
+ if status == 200:
+ titles_keyboard.extend([task.get("title")] for task in tasks)
+
+ return titles_keyboard
+
+
+class TaskChosenKeyboard(Keyboard):
+ TYPE_ANSWER = "Ввести ответ✏️"
+ CANCEL = "Назад↩️"
+
+ @classmethod
+ def get_keyboard(cls, telegram_id=None):
+ return [
+ # [cls.TYPE_ANSWER],
+ [cls.CANCEL],
+ ]
+
+
+class ContinueKeyboard(Keyboard):
+ CONTINUE = "Продолжить➡️"
+
+ @classmethod
+ def get_keyboard(cls, telegram_id=None):
+ return [[cls.CONTINUE]]
+
+
+class AnsweringKeyboard(Keyboard):
+ CANCEL = "Отмена↩️"
+
+ @classmethod
+ def get_keyboard(cls, telegram_id=None):
+ return [[cls.CANCEL]]
+
+
+class AdminKeyboard(Keyboard):
+ CANCEL = "Вернуться в меню↩️"
+ PUBLISH_TASK = "Опубликовать задачу"
+ HIDE_TASK = "Скрыть задачу"
+ ANNOUNCE = "Сделать объявление"
+ MESSAGE_PLAYER = "Написать сообщение от имени бота"
+
+ @classmethod
+ def get_keyboard(cls, telegram_id=None):
+ return [
+ [cls.CANCEL],
+ [cls.PUBLISH_TASK],
+ [cls.HIDE_TASK],
+ [cls.ANNOUNCE],
+ [cls.MESSAGE_PLAYER],
+ ]
diff --git a/src/states.py b/src/states.py
new file mode 100644
index 0000000..afb00cb
--- /dev/null
+++ b/src/states.py
@@ -0,0 +1,568 @@
+from telegram import ReplyKeyboardMarkup, ReplyKeyboardRemove
+
+from keyboards import (
+ MenuKeyboard, TasksKeyboard, TaskChosenKeyboard, ContinueKeyboard,
+ AnsweringKeyboard, AdminKeyboard, PublishTasksKeyboard, BackToMenuKeyboard
+)
+from utils import *
+import backend_api
+from time import sleep
+import datetime as dt
+from decimal import Decimal
+
+# Typing
+from telegram import Update, User, Bot
+from typing import List
+
+from botlogging import logger
+
+
+def save_state(func):
+ def wrapper(bot: Bot, update: Update, user_data: dict, *args, **kwargs):
+ last_state = func(bot, update, user_data, *args, **kwargs)
+ backend_api.save_state(last_state, update.message.from_user.id, user_data)
+ return last_state
+
+ return wrapper
+
+
+def calc_score(t, base_score=1000):
+ base_score = Decimal(base_score)
+ min_score = base_score * Decimal(0.1)
+ max_t = Decimal(720)
+ k = (base_score - min_score) / (max_t * max_t)
+
+ if t >= max_t:
+ return round(min_score, 2)
+
+ return round(min(k * (t - max_t) * (t - max_t) + min_score, base_score), 2)
+
+
+class States:
+ @staticmethod
+ @save_state
+ def prompt_question(bot: Bot, update: Update, user_data: dict):
+ update.message.reply_text(
+ "Введите свой вопрос, он будет перенаправлен представителям компании. "
+ "Вам ответят как только появится возможность, поэтому следите за сообщениями от бота.",
+ reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard())
+ )
+
+ return ASKING_QUESTION
+
+ @staticmethod
+ @save_state
+ def ask_question(bot: Bot, update: Update, user_data: dict):
+ question = update.message.text
+
+ code, profiles = backend_api.get_profiles()
+ if code != 200:
+ pass
+
+ user_id = update.message.from_user.id
+ username = update.message.from_user.full_name
+
+ for profile in profiles:
+ if profile["is_admin"]:
+ bot.send_message(
+ profile["tg_id"], f"*ВОПРОС ОТ ПОЛЬЗОВАТЕЛЯ {username} ({user_id})*:\n\n{question}",
+ parse_mode="Markdown"
+ )
+
+ update.message.reply_text(
+ "Ваш вопрос отправлен на рассмотрение, ожидайте ответ.",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ return States.main_menu(bot, update, user_data)
+
+
+ @staticmethod
+ @save_state
+ def wait_for_username(bot: Bot, update: Update, user_data: dict):
+ update.message.reply_text(
+ "По правилам квиза ты не можешь участвовать, если у тебя не указано "
+ "имя пользователя, поэтому укажи его и возвращайся как только это сделаешь!",
+ reply_markup=ReplyKeyboardMarkup([['Я указал имя пользователя']])
+ )
+ return WAIT_FOR_USERNAME
+
+ @staticmethod
+ @save_state
+ def main_menu(bot: Bot, update: Update, user_data: dict):
+ user_data["chosen_task"] = None
+
+ status_code, response = backend_api.get_attempts(
+ tg_id=update.message.from_user.id,
+ task_title=user_data["chosen_task"]
+ )
+
+ menu_text = [
+
+ ]
+ solved = False
+
+ full_score = Decimal(0.0)
+ if status_code == 200:
+ if len(response) == 5:
+ solved = True
+ if len(response) != 0:
+ menu_text.append("Твои решенные задачи:")
+
+ for attempt in response:
+ try:
+ ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ")
+ except ValueError:
+ ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%SZ")
+
+ try:
+ fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%S.%fZ")
+ except ValueError:
+ fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%SZ")
+
+ t = Decimal((ts - fp).total_seconds()) / Decimal(60)
+ # plr_score = calc_score(t, attempt["task"]["base_score"])
+ plr_score = int(attempt["task"]["base_score"])
+ full_score += plr_score
+
+ menu_text.append(
+ f"_{attempt['task']['title']}_ "
+ f"({plr_score})"
+ )
+ else:
+ menu_text.append("У тебя еще нет решенных задач")
+ else:
+ menu_text.append(
+ "К сожалению, не удалось получить данные о твоих попытках =(\n"
+ "Попробуй обратиться к боту чуть позже."
+ )
+
+ menu_text.append(f"\n*Итоговый счет*: {full_score}")
+
+ update.message.reply_text("\n".join(menu_text), parse_mode="Markdown")
+
+ if solved:
+ update.message.reply_text(
+ "*У тебя решены все задачи! Подойди к нам, чтобы получить приз.*",
+ parse_mode="Markdown"
+ )
+
+ update.message.reply_text(
+ "Выбери следующее действие...",
+ reply_markup=ReplyKeyboardMarkup(MenuKeyboard.get_keyboard(update.message.from_user.id, solved))
+ )
+
+ return MAIN_MENU
+
+ @staticmethod
+ @save_state
+ def choose_task(bot: Bot, update: Update, user_data: dict):
+ user_data["chosen_task"] = None
+
+ status, published = backend_api.get_published_tasks()
+ if len(published) == 0:
+ update.message.reply_text(
+ "Пока что не опубликовано ни одной задачи =(",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ # okay this is epic (pile of shit)
+ return ANSWER_RIGHT
+ else:
+ update.message.reply_text(
+ "Какую задачу ты хочешь сдать?",
+ reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard())
+ )
+ return TASK_CHOOSING
+
+ @staticmethod
+ @save_state
+ def top_10(bot: Bot, update: Update, user_data: dict):
+ code, attempts = backend_api.get_attempts()
+
+ top = {}
+
+ for attempt in attempts:
+ if not attempt["solved"] or attempt["profile"]["is_hidden"]:
+ continue
+
+ if attempt["profile"]["tg_id"] not in top:
+ top[attempt["profile"]["tg_id"]] = {
+ "fullname": attempt["profile"]["fullname"],
+ "username": attempt["profile"]["username"],
+ "score": Decimal(0.0)
+ }
+
+ try:
+ ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ")
+ except ValueError:
+ ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%SZ")
+
+ try:
+ fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%S.%fZ")
+ except ValueError:
+ fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%SZ")
+
+ t = Decimal((ts - fp).total_seconds()) / Decimal(60)
+ plr_score = calc_score(t, attempt["task"]["base_score"])
+ top[attempt["profile"]["tg_id"]]["score"] += plr_score
+
+ try:
+ top_list = []
+ for tg_id, stats in top.items():
+ top_list.append((
+ stats['score'], tg_id,
+ f"{stats['fullname']} (@{stats['username']}) -- {stats['score']}pts"
+ ))
+
+ top_list.sort(key=lambda p: p[0], reverse=True)
+
+ top_10_ids = []
+ places = {tg_id: place for place, (score, tg_id, text) in enumerate(top_list, 1)}
+
+ top = ["Топ-10:"]
+ for place, (score, tg_id, text) in enumerate(top_list[:10], 1):
+ top.append(str(place).rjust(2, " ") + ". " + text)
+ top_10_ids.append(tg_id)
+
+ if update.message.from_user.id not in top_10_ids:
+ if len(top_10_ids) == 10 and places[update.message.from_user.id] > 11:
+ top.append("...")
+ this_place = places[update.message.from_user.id]
+ top.append(str(this_place).rjust(2, " ") + ". " + top_list[this_place - 1][2])
+
+ except Exception as e:
+ print(e)
+
+ update.message.reply_text("\n".join(top))
+ return MAIN_MENU
+
+ @staticmethod
+ @save_state
+ def rules(bot: Bot, update: Update, user_data: dict):
+ r = """Вот несколько правил, которые следует соблюдать:
+1) Всего в квизе 5 задач - после решения который можно получить призы.
+2) Пожалуйста, постарайся решить каждую задачу самостоятельно и никому не подсказывать!
+3) Если есть какие-то вопросы к нам, либо проблемы с задачами, нажми "Связаться с Базальт СПО" и напиши свой вопрос, мы ответим тебе в ближайшее время!
+4) Если возникнут проблемы с интерфейсом, попробуй написать /stop, а затем /start, чтобы общение с ботом перезапустилось. Прогресс решённых задач никуда не пропадёт."""
+ # with open("rules.jpg", "rb") as f:
+ # try:
+ # print(update.message.reply_photo(f, timeout=5))
+ # except Exception as e:
+ # print(e)
+ update.message.reply_text(r)
+
+ return MAIN_MENU
+
+ @staticmethod
+ @save_state
+ def show_task(bot: Bot, update: Update, user_data: dict):
+ if "chosen_task" in user_data and user_data["chosen_task"] is not None:
+ task_title = user_data["chosen_task"]
+ else:
+ task_title = update.message.text
+ user_data["chosen_task"] = task_title
+
+ status_code, response = backend_api.get_attempts(
+ tg_id=update.message.from_user.id,
+ task_title=user_data["chosen_task"]
+ )
+ if status_code != 200:
+ user_data["chosen_task"] = None
+
+ update.message.reply_text(
+ "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!",
+ reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard())
+ )
+
+ return TASK_CHOOSING
+
+ if len(response) != 0:
+ user_data["chosen_task"] = None
+
+ update.message.reply_text(
+ "Ты уже решил эту задачу! Выбери другую.",
+ reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard())
+ )
+
+ return TASK_CHOOSING
+
+ status_code, tasks_response = backend_api.get_published_tasks()
+ if status_code != 200:
+ update.message.reply_text(
+ "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ return MAIN_MENU
+
+ titles = {task.get("title"): task for task in tasks_response}
+
+ if task_title not in titles.keys():
+ update.message.reply_text(
+ "Такой задачи не найдено, попробуй ввести другое название!",
+ reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard())
+ )
+
+ return TASK_CHOOSING
+
+ task = titles[task_title]
+
+ message = '\n'.join([
+ f"*{task['title']}*",
+ f"{task['statement']}",
+ ])
+ keyboard = TaskChosenKeyboard.get_keyboard()
+
+ update.message.reply_text(
+ message, parse_mode="Markdown",
+ reply_markup=ReplyKeyboardMarkup(keyboard)
+ )
+
+ update.message.reply_text(
+ "Вводи свой ответ и я его проверю, "
+ "или нажми кнопку Назад, чтобы выбрать другую задачу"
+ )
+
+ return TASK_SHOWN
+
+ @staticmethod
+ @save_state
+ def type_answer(bot: Bot, update: Update, user_data: dict):
+ status_code, response = backend_api.get_attempts(
+ tg_id=update.message.from_user.id,
+ task_title=user_data["chosen_task"]
+ )
+ if len(response) != 0:
+ update.message.reply_text(
+ "Ты уже решил эту задачу! Выбери другую.",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ return TASK_CHOOSING
+
+ else:
+ update.message.reply_text(
+ "Вводи свой ответ, я его проверю.",
+ reply_markup=ReplyKeyboardMarkup(AnsweringKeyboard.get_keyboard())
+ )
+ return ANSWERING
+
+ @staticmethod
+ @save_state
+ def accept_answer(bot: Bot, update: Update, user_data: dict):
+ answer = update.message.text
+ status_code, task = backend_api.get_task(user_data["chosen_task"])
+ if status_code == 200:
+ backend_api.create_attempt(update.message.from_user.id, user_data["chosen_task"], answer.lower())
+
+ if answer.lower() == task["answer"].lower():
+ update.message.reply_text(
+ "Ты ввел правильный ответ! Возвращаемся в главное меню."
+ )
+ return States.main_menu(bot, update, user_data)
+ else:
+ update.message.reply_text(
+ "К сожалению, твой ответ неверный =( Попробуй ввести другой ответ.",
+ reply_markup=ReplyKeyboardMarkup(TaskChosenKeyboard.get_keyboard())
+ )
+
+ # return ANSWER_WRONG
+ return TASK_SHOWN
+
+ else:
+ update.message.reply_text(
+ "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ return ANSWER_RIGHT
+
+
+class AdminStates:
+ @staticmethod
+ @save_state
+ def admin_panel(bot: Bot, update: Update, user_data: dict):
+ status_code, data = backend_api.get_profile(update.message.from_user.id)
+ if status_code != 200:
+ update.message.reply_text(
+ "Не удалось аутентифицировать пользователя. Доступ запрещен.",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ return ADMIN_ACCESS_DENIED
+
+ if not data["is_admin"]:
+ update.message.reply_text(
+ "Вы не являетесь администратором. Доступ запрещен.",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ return ADMIN_ACCESS_DENIED
+
+ update.message.reply_text(
+ "Выберите действие",
+ reply_markup=ReplyKeyboardMarkup(AdminKeyboard.get_keyboard())
+ )
+
+ return ADMIN_MENU
+
+ @staticmethod
+ @save_state
+ def choose_task_hide(bot: Bot, update: Update, user_data: dict):
+ update.message.reply_text(
+ "Выберите задачу, которую хотите скрыть",
+ reply_markup=ReplyKeyboardMarkup(PublishTasksKeyboard.get_keyboard())
+ )
+
+ return ADMIN_TASK_CHOOSE_HIDE
+
+ @staticmethod
+ @save_state
+ def choose_task_publish(bot: Bot, update: Update, user_data: dict):
+ update.message.reply_text(
+ "Выберите задачу, которую хотите опубликовать",
+ reply_markup=ReplyKeyboardMarkup(PublishTasksKeyboard.get_keyboard())
+ )
+
+ return ADMIN_TASK_CHOOSE_PUBLISH
+
+ @staticmethod
+ @save_state
+ def hide_task(bot: Bot, update: Update, user_data: dict):
+ title = update.message.text
+ status, data = backend_api.hide_task(title)
+ if status != 200:
+ update.message.reply_text(
+ "Не удалось скрыть задачу.",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+ else:
+ update.message.reply_text(
+ "Задача была скрыта.",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ return ADMIN_TASK_PUBLISHED
+
+ @staticmethod
+ @save_state
+ def publish_task(bot: Bot, update: Update, user_data: dict):
+ title = update.message.text
+ status, data = backend_api.publish_task(title)
+ if status != 200:
+ update.message.reply_text(
+ "Не удалось опубликовать задачу.",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+ else:
+ update.message.reply_text(
+ "Задача была опубликована.",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ return ADMIN_TASK_PUBLISHED
+
+ @staticmethod
+ @save_state
+ def wait_for_announcement(bot: Bot, update: Update, user_data: dict):
+ update.message.reply_text(
+ "Введите объявление (Отправка сообщений может занять несколько секунд)",
+ reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard())
+ )
+
+ return ADMIN_WAIT_FOR_ANNOUNCEMENT
+
+ @staticmethod
+ @save_state
+ def wait_for_message(bot: Bot, update: Update, user_data: dict):
+ update.message.reply_text(
+ "Введите сообщение (Отправка сообщений может занять несколько секунд)",
+ reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard())
+ )
+
+ return ADMIN_WAIT_FOR_MESSAGE
+
+ @staticmethod
+ @save_state
+ def announce_message(bot: Bot, update: Update, user_data: dict):
+ status, profiles = backend_api.get_profiles()
+ if status != 200:
+ pass
+ else:
+ ids = []
+ for profile in profiles:
+ ids.append(int(profile["tg_id"]))
+
+ return AdminStates.send_to_ids(ids, "ОБЪЯВЛЕНИЕ ОТ МОДЕРАТОРОВ:", update.message.text, bot, update)
+
+ @staticmethod
+ @save_state
+ def message_plr(bot: Bot, update: Update, user_data: dict):
+ text = update.message.text
+ _ = text.split(maxsplit=1)
+
+ if len(_) != 2:
+ update.message.reply_text(
+ "Ошибка в сообщении, попробуйте еще раз",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+ return ADMIN_TASK_PUBLISHED
+
+ ids_text, msg = _
+ ids_split = filter(lambda s: len(s) > 0 and s.isnumeric(), ids_text.split(','))
+ ids = list(map(int, ids_split))
+
+ if len(ids) == 0:
+ update.message.reply_text(
+ "Все id пользователей неверно введены",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ return ADMIN_TASK_PUBLISHED
+
+ else:
+ return AdminStates.send_to_ids(ids, "СООБЩЕНИЕ ОТ МОДЕРАТОРОВ:", msg, bot, update)
+
+ @staticmethod
+ def send_to_ids(ids: List[int], prefix: str, message: str, bot: Bot, update: Update):
+ errors = []
+ for tg_id in ids:
+ try:
+ bot.send_message(tg_id, f"*{prefix}*\n\n{message}", parse_mode="Markdown")
+ print(tg_id)
+ except Exception as e:
+ logger.debug(f"Got exception while announcing message: {e}")
+ errors.append((tg_id, str(e)))
+ sleep(0.3)
+
+ sleep(0.05)
+
+ if len(errors) == 0:
+ update.message.reply_text(
+ "Сообщение успешно отправлено всем пользователям",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()),
+ )
+
+ # OKAY
+ return ADMIN_TASK_PUBLISHED
+
+ else:
+ error_msg = []
+ user_ids = []
+
+ for err in errors:
+ user_ids.append(str(err[0]))
+ error_msg.append(f"{err[0]}. Reason: {err[1]}")
+
+ msg = "\n".join(error_msg)
+ user_ids = ",".join(user_ids)
+
+ update.message.reply_text(
+ "Во время отправки сообщений возникли следующие ошибки:\n"
+ f"{msg}\n\n{user_ids}",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ # OKAY
+ return ADMIN_TASK_PUBLISHED
diff --git a/src/utils.py b/src/utils.py
new file mode 100644
index 0000000..933f0e8
--- /dev/null
+++ b/src/utils.py
@@ -0,0 +1,13 @@
+(
+ WAIT_FOR_USERNAME,
+ MAIN_MENU, TOP_10, RULES,
+ TASK_CHOOSING, TASK_SHOWN, ANSWERING,
+ ANSWER_RIGHT, ANSWER_WRONG,
+ ASKING_QUESTION,
+
+ ADMIN_MENU, ADMIN_TASK_CHOOSE_PUBLISH, ADMIN_TASK_CHOOSE_HIDE,
+ ADMIN_WAIT_FOR_ANNOUNCEMENT, ADMIN_WAIT_FOR_MESSAGE,
+ ADMIN_TASK_PUBLISHED,
+ ADMIN_ACCESS_DENIED,
+ *_
+) = range(100)