diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend_api.py | 135 | ||||
| -rw-r--r-- | src/basealt_bot_frontend/__init__.py | 2 | ||||
| -rw-r--r-- | src/bot.py | 166 | ||||
| -rw-r--r-- | src/botlogging.py | 24 | ||||
| -rw-r--r-- | src/config.example.py | 3 | ||||
| -rw-r--r-- | src/keyboards.py | 123 | ||||
| -rw-r--r-- | src/states.py | 568 | ||||
| -rw-r--r-- | src/utils.py | 13 |
8 files changed, 1032 insertions, 2 deletions
diff --git a/src/backend_api.py b/src/backend_api.py new file mode 100644 index 0000000..94841f9 --- /dev/null +++ b/src/backend_api.py @@ -0,0 +1,135 @@ +import requests +import urllib +from config import BACKEND_URL +import json +import datetime as dt + +from typing import Tuple, Dict + +from botlogging import logger + + +def make_request(method: str, url: str, **kwargs) -> Tuple[int, Dict]: + try: + response = requests.request(method, url, **kwargs) + answer = response.json() + + except Exception as e: + logger.debug(f"Got exception while making request: {e}") + return 500, {} + + logger.debug( + f"Got response from backend: " + f"Status={response.status_code}; " + f"Text={response.text[:200]}..." + ) + + return response.status_code, answer + + +def post_request(url: str, **kwargs): + return make_request("post", url, **kwargs) + + +def put_request(url: str, **kwargs): + return make_request("put", url, **kwargs) + + +def get_request(url: str, **kwargs): + return make_request("get", url, **kwargs) + + +def patch_request(url: str, **kwargs): + return make_request("patch", url, **kwargs) + + +def register_user(tg_id: int, username: str, fullname: str) -> Tuple[int, Dict]: + logger.debug(f"Trying to register user with id={tg_id}; username={username}") + return post_request(f"{BACKEND_URL}/profiles/", data={ + "tg_id": tg_id, + "username": username, + "fullname": fullname + }) + + +def get_profiles(): + logger.debug(f"Trying to retrieve all profiles") + return get_request(f"{BACKEND_URL}/profiles/") + + +def get_tasks(): + logger.debug(f"Trying to retrieve all tasks") + return get_request(f"{BACKEND_URL}/tasks/") + + +def get_published_tasks(): + logger.debug(f"Trying to retrieve all published tasks") + return get_request(f"{BACKEND_URL}/api/tasks/published/") + + +def get_task(title: str) -> Tuple[int, Dict]: + logger.debug(f"Trying to retrieve task with title={title}") + return get_request(f"{BACKEND_URL}/api/get_task/" + urllib.parse.quote(title)) + + +def save_state(last_state: int, tg_id: int, user_data: dict) -> Tuple[int, Dict]: + user_data_dumped = json.dumps(user_data) + logger.debug(f"Trying to save state for user with id={tg_id}; state={last_state}; user_data={user_data_dumped}") + + return put_request(f"{BACKEND_URL}/api/state/update/{tg_id}/", data={ + "last_state": last_state, + "tg_id": tg_id, + "user_data": user_data_dumped + }) + + +def get_state(tg_id: int) -> Tuple[int, dict]: + logger.debug(f"Trying to get state for user with id={tg_id}") + return get_request(f"{BACKEND_URL}/api/state/get/{tg_id}/") + + +def create_attempt(tg_id: int, task_title: str, answer: str): + return post_request(f"{BACKEND_URL}/attempts/", data={ + "profile": json.dumps({"tg_id": tg_id}), + "task": json.dumps({"title": task_title}), + "answer": answer + }) + + +def get_attempts(tg_id: int=None, task_title: str=None): + data = {} + if tg_id is not None: + data["tg_id"] = tg_id + if task_title is not None: + data["task_title"] = task_title + + return get_request(f"{BACKEND_URL}/api/attempts/", data=data) + + +def get_profile(tg_id: int): + return get_request(f"{BACKEND_URL}/api/profile/get/{tg_id}") + + +def publish_task(title: str): + url_title = urllib.parse.quote(title) + code, resp = get_task(title) + + if code != 200: + return code, {} + + if resp["first_published"] is None: + return patch_request(f"{BACKEND_URL}/api/tasks/{url_title}/update/", data={ + "first_published": dt.datetime.now(), + "is_public": True + }) + else: + return patch_request(f"{BACKEND_URL}/api/tasks/{url_title}/update/", data={ + "is_public": True + }) + + +def hide_task(title: str): + url_title = urllib.parse.quote(title) + return patch_request(f"{BACKEND_URL}/api/tasks/{url_title}/update/", data={ + "is_public": False + }) diff --git a/src/basealt_bot_frontend/__init__.py b/src/basealt_bot_frontend/__init__.py deleted file mode 100644 index 4268070..0000000 --- a/src/basealt_bot_frontend/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -def hello() -> str: - return "Hello from basealt-bot-frontend!" diff --git a/src/bot.py b/src/bot.py new file mode 100644 index 0000000..37e0ac8 --- /dev/null +++ b/src/bot.py @@ -0,0 +1,166 @@ +from telegram.ext import Updater, CommandHandler, ConversationHandler, MessageHandler, Filters, CallbackQueryHandler +from telegram import ReplyKeyboardMarkup, ReplyKeyboardRemove, InlineKeyboardMarkup, InlineKeyboardButton + +# Typing +from telegram import Update, User, Bot + +from os import getenv +import json +from config import TG_TOKEN, REQUEST_KWARGS + +import backend_api +from keyboards import ( + MenuKeyboard, TasksKeyboard, TaskChosenKeyboard, ContinueKeyboard, + AnsweringKeyboard, AdminKeyboard, BackToMenuKeyboard +) +from utils import * +from states import States, AdminStates + +from botlogging import logger + + +def start(bot: Bot, update: Update, user_data: dict): + user_data.update({"chosen_task": None}) + + update.message.reply_text( + "Квиз Базальт СПО на День работодателя КНиИТ СГУ", + reply_markup=ReplyKeyboardRemove() + ) + + user: User = update.message.from_user + if user.username is None: + return States.wait_for_username(bot, update, user_data) + + else: + logger.debug(backend_api.register_user(user.id, user.username, user.full_name)) + return States.main_menu(bot, update, user_data) + + +def username_check(bot: Bot, update: Update, user_data): + user: User = update.message.from_user + if user.username is None: + return States.wait_for_username(bot, update, user_data) + + else: + logger.debug(backend_api.register_user(user.id, user.username, user.full_name)) + return States.main_menu(bot, update, user_data) + + +def resume_bot(bot: Bot, update: Update, user_data): + status_code, state = backend_api.get_state(update.message.from_user.id) + + user_data.update(json.loads(state["user_data"])) + last_state = state["last_state"] + + for handler in conversation_handler.states[last_state]: + if handler.filters.filter(update): + return handler.callback(bot, update, user_data) + + return last_state + + +def stop(bot, update): + update.message.reply_text('Пока!', reply_markup=ReplyKeyboardRemove()) + update.message.reply_text('Для того, чтобы начать работу с ботом заново напишите /start') + return ConversationHandler.END + + +def error(bot, update, error): + logger.warning('Update "%s" caused error "%s"', update, error) + + +def main(): + is_prod = getenv('production', '') + + if is_prod: + print('PRODUCTION ENVIRONMENT') + updater = Updater(TG_TOKEN) + else: + print('DEVELOPMENT ENVIRONMENT') + updater = Updater(TG_TOKEN, request_kwargs=REQUEST_KWARGS) + + dp = updater.dispatcher + dp.add_error_handler(error) + dp.add_handler(conversation_handler) + + updater.start_polling() + updater.idle() + + +conversation_handler = ConversationHandler( + entry_points=[ + CommandHandler('start', start, pass_user_data=True), + MessageHandler(Filters.text, resume_bot, pass_user_data=True), + ], + + states={ + WAIT_FOR_USERNAME: [MessageHandler(Filters.text, username_check, pass_user_data=True)], + + MAIN_MENU: [ + MessageHandler(Filters.regex(MenuKeyboard.CHOOSE_TASK), States.choose_task, pass_user_data=True), + MessageHandler(Filters.regex(MenuKeyboard.TOP_10), States.top_10, pass_user_data=True), + MessageHandler(Filters.regex(MenuKeyboard.RULES), States.rules, pass_user_data=True), + CommandHandler('admin', AdminStates.admin_panel, pass_user_data=True) + ], + TASK_CHOOSING: [ + MessageHandler(Filters.regex(TasksKeyboard.CANCEL), States.main_menu, pass_user_data=True), + MessageHandler(Filters.text, States.show_task, pass_user_data=True), + ], + TASK_SHOWN: [ + MessageHandler(Filters.regex(TaskChosenKeyboard.CANCEL), States.choose_task, pass_user_data=True), + MessageHandler(Filters.text, States.accept_answer, pass_user_data=True), + # MessageHandler(Filters.regex(TaskChosenKeyboard.TYPE_ANSWER), States.type_answer, pass_user_data=True), + ], + ANSWERING: [ + MessageHandler(Filters.regex(AnsweringKeyboard.CANCEL), States.show_task, pass_user_data=True), + MessageHandler(Filters.text, States.accept_answer, pass_user_data=True), + ], + ANSWER_RIGHT: [MessageHandler(Filters.text, States.main_menu, pass_user_data=True)], + ANSWER_WRONG: [MessageHandler(Filters.text, States.show_task, pass_user_data=True)], + + ASKING_QUESTION: [ + MessageHandler(Filters.regex(BackToMenuKeyboard.CANCEL), States.main_menu, pass_user_data=True), + MessageHandler(Filters.text, States.ask_question, pass_user_data=True), + ], + + # ADMIN PANEL + + ADMIN_MENU: [ + MessageHandler(Filters.regex(AdminKeyboard.CANCEL), States.main_menu, pass_user_data=True), + MessageHandler(Filters.regex(AdminKeyboard.PUBLISH_TASK), AdminStates.choose_task_publish, pass_user_data=True), + MessageHandler(Filters.regex(AdminKeyboard.HIDE_TASK), AdminStates.choose_task_hide, pass_user_data=True), + MessageHandler(Filters.regex(AdminKeyboard.ANNOUNCE), AdminStates.wait_for_announcement, pass_user_data=True), + MessageHandler(Filters.regex(AdminKeyboard.MESSAGE_PLAYER), AdminStates.wait_for_message, pass_user_data=True), + ], + + ADMIN_WAIT_FOR_ANNOUNCEMENT: [ + MessageHandler(Filters.regex(BackToMenuKeyboard.CANCEL), AdminStates.admin_panel, pass_user_data=True), + MessageHandler(Filters.text, AdminStates.announce_message, pass_user_data=True), + ], + ADMIN_WAIT_FOR_MESSAGE: [ + MessageHandler(Filters.regex(BackToMenuKeyboard.CANCEL), AdminStates.admin_panel, pass_user_data=True), + MessageHandler(Filters.text, AdminStates.message_plr, pass_user_data=True), + ], + + ADMIN_TASK_CHOOSE_HIDE: [ + MessageHandler(Filters.regex(TasksKeyboard.CANCEL), AdminStates.admin_panel, pass_user_data=True), + MessageHandler(Filters.text, AdminStates.hide_task, pass_user_data=True), + ], + + ADMIN_TASK_CHOOSE_PUBLISH: [ + MessageHandler(Filters.regex(TasksKeyboard.CANCEL), AdminStates.admin_panel, pass_user_data=True), + MessageHandler(Filters.text, AdminStates.publish_task, pass_user_data=True), + ], + + ADMIN_TASK_PUBLISHED: [MessageHandler(Filters.text, AdminStates.admin_panel, pass_user_data=True)], + ADMIN_ACCESS_DENIED: [MessageHandler(Filters.text, States.main_menu, pass_user_data=True)], + }, + + fallbacks=[ + CommandHandler('stop', stop), + MessageHandler(Filters.regex(MenuKeyboard.HELP), States.prompt_question, pass_user_data=True), + ] +) + +if __name__ == '__main__': + main() diff --git a/src/botlogging.py b/src/botlogging.py new file mode 100644 index 0000000..0a08a8d --- /dev/null +++ b/src/botlogging.py @@ -0,0 +1,24 @@ +import logging +from pathlib import Path + +Path("logs").mkdir(exist_ok=True) + +fh = logging.FileHandler("logs/log.log") +fh.setLevel(logging.DEBUG) + +ch = logging.StreamHandler() +ch.setLevel(logging.DEBUG) + +log_format = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") +fh.setFormatter(log_format) +ch.setFormatter(log_format) + +logging.basicConfig( + level=logging.DEBUG, handlers=[fh, ch], + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +logger.addHandler(fh) +logger.addHandler(ch) diff --git a/src/config.example.py b/src/config.example.py new file mode 100644 index 0000000..ba0e5bc --- /dev/null +++ b/src/config.example.py @@ -0,0 +1,3 @@ +TG_TOKEN = "" +REQUEST_KWARGS = {} +BACKEND_URL = "http://localhost:8000" diff --git a/src/keyboards.py b/src/keyboards.py new file mode 100644 index 0000000..aa8d761 --- /dev/null +++ b/src/keyboards.py @@ -0,0 +1,123 @@ +import backend_api +from abc import ABC + + +class Keyboard(ABC): + @classmethod + def get_keyboard(cls, telegram_id=None): + pass + + +class MenuKeyboard(Keyboard): + CHOOSE_TASK = "Выбрать задание📚" + TOP_10 = "hidden Топ-10📊" + RULES = "Правилаℹ️" + ADMIN = "/admin" + HELP = "Связаться с Базальт СПО🐧" + + @classmethod + def get_keyboard(cls, telegram_id=None, solved=False): + if telegram_id is not None: + status_code, data = backend_api.get_profile(telegram_id) + if status_code == 200 and data["is_admin"]: + return [ + [cls.ADMIN], + [cls.CHOOSE_TASK], + [cls.TOP_10, cls.RULES], + [cls.HELP], + ] + + if solved: + return [ + [cls.RULES], + [cls.HELP], + ] + else: + return [ + [cls.CHOOSE_TASK], + [cls.RULES], + [cls.HELP], + ] + + +class BackToMenuKeyboard(Keyboard): + CANCEL = "Вернуться в меню↩️" + + @classmethod + def get_keyboard(cls, telegram_id=None): + return [[cls.CANCEL]] + + +class TasksKeyboard(Keyboard): + CANCEL = "Вернуться в меню↩️" + + @classmethod + def get_keyboard(cls, telegram_id=None): + status, tasks = backend_api.get_published_tasks() + titles_keyboard = [[cls.CANCEL]] + if status == 200: + titles = [] + for task in sorted(tasks, key=lambda t: t.get("title")): + titles.append([task.get("title")]) + titles_keyboard.extend(titles) + + return titles_keyboard + + +class PublishTasksKeyboard(Keyboard): + CANCEL = "Вернуться в меню↩️" + + @classmethod + def get_keyboard(cls, telegram_id=None): + status, tasks = backend_api.get_tasks() + titles_keyboard = [[cls.CANCEL]] + if status == 200: + titles_keyboard.extend([task.get("title")] for task in tasks) + + return titles_keyboard + + +class TaskChosenKeyboard(Keyboard): + TYPE_ANSWER = "Ввести ответ✏️" + CANCEL = "Назад↩️" + + @classmethod + def get_keyboard(cls, telegram_id=None): + return [ + # [cls.TYPE_ANSWER], + [cls.CANCEL], + ] + + +class ContinueKeyboard(Keyboard): + CONTINUE = "Продолжить➡️" + + @classmethod + def get_keyboard(cls, telegram_id=None): + return [[cls.CONTINUE]] + + +class AnsweringKeyboard(Keyboard): + CANCEL = "Отмена↩️" + + @classmethod + def get_keyboard(cls, telegram_id=None): + return [[cls.CANCEL]] + + +class AdminKeyboard(Keyboard): + CANCEL = "Вернуться в меню↩️" + PUBLISH_TASK = "Опубликовать задачу" + HIDE_TASK = "Скрыть задачу" + ANNOUNCE = "Сделать объявление" + MESSAGE_PLAYER = "Написать сообщение от имени бота" + + @classmethod + def get_keyboard(cls, telegram_id=None): + return [ + [cls.CANCEL], + [cls.PUBLISH_TASK], + [cls.HIDE_TASK], + [cls.ANNOUNCE], + [cls.MESSAGE_PLAYER], + ] diff --git a/src/states.py b/src/states.py new file mode 100644 index 0000000..afb00cb --- /dev/null +++ b/src/states.py @@ -0,0 +1,568 @@ +from telegram import ReplyKeyboardMarkup, ReplyKeyboardRemove + +from keyboards import ( + MenuKeyboard, TasksKeyboard, TaskChosenKeyboard, ContinueKeyboard, + AnsweringKeyboard, AdminKeyboard, PublishTasksKeyboard, BackToMenuKeyboard +) +from utils import * +import backend_api +from time import sleep +import datetime as dt +from decimal import Decimal + +# Typing +from telegram import Update, User, Bot +from typing import List + +from botlogging import logger + + +def save_state(func): + def wrapper(bot: Bot, update: Update, user_data: dict, *args, **kwargs): + last_state = func(bot, update, user_data, *args, **kwargs) + backend_api.save_state(last_state, update.message.from_user.id, user_data) + return last_state + + return wrapper + + +def calc_score(t, base_score=1000): + base_score = Decimal(base_score) + min_score = base_score * Decimal(0.1) + max_t = Decimal(720) + k = (base_score - min_score) / (max_t * max_t) + + if t >= max_t: + return round(min_score, 2) + + return round(min(k * (t - max_t) * (t - max_t) + min_score, base_score), 2) + + +class States: + @staticmethod + @save_state + def prompt_question(bot: Bot, update: Update, user_data: dict): + update.message.reply_text( + "Введите свой вопрос, он будет перенаправлен представителям компании. " + "Вам ответят как только появится возможность, поэтому следите за сообщениями от бота.", + reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard()) + ) + + return ASKING_QUESTION + + @staticmethod + @save_state + def ask_question(bot: Bot, update: Update, user_data: dict): + question = update.message.text + + code, profiles = backend_api.get_profiles() + if code != 200: + pass + + user_id = update.message.from_user.id + username = update.message.from_user.full_name + + for profile in profiles: + if profile["is_admin"]: + bot.send_message( + profile["tg_id"], f"*ВОПРОС ОТ ПОЛЬЗОВАТЕЛЯ {username} ({user_id})*:\n\n{question}", + parse_mode="Markdown" + ) + + update.message.reply_text( + "Ваш вопрос отправлен на рассмотрение, ожидайте ответ.", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + return States.main_menu(bot, update, user_data) + + + @staticmethod + @save_state + def wait_for_username(bot: Bot, update: Update, user_data: dict): + update.message.reply_text( + "По правилам квиза ты не можешь участвовать, если у тебя не указано " + "имя пользователя, поэтому укажи его и возвращайся как только это сделаешь!", + reply_markup=ReplyKeyboardMarkup([['Я указал имя пользователя']]) + ) + return WAIT_FOR_USERNAME + + @staticmethod + @save_state + def main_menu(bot: Bot, update: Update, user_data: dict): + user_data["chosen_task"] = None + + status_code, response = backend_api.get_attempts( + tg_id=update.message.from_user.id, + task_title=user_data["chosen_task"] + ) + + menu_text = [ + + ] + solved = False + + full_score = Decimal(0.0) + if status_code == 200: + if len(response) == 5: + solved = True + if len(response) != 0: + menu_text.append("Твои решенные задачи:") + + for attempt in response: + try: + ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ") + except ValueError: + ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%SZ") + + try: + fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%S.%fZ") + except ValueError: + fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%SZ") + + t = Decimal((ts - fp).total_seconds()) / Decimal(60) + # plr_score = calc_score(t, attempt["task"]["base_score"]) + plr_score = int(attempt["task"]["base_score"]) + full_score += plr_score + + menu_text.append( + f"_{attempt['task']['title']}_ " + f"({plr_score})" + ) + else: + menu_text.append("У тебя еще нет решенных задач") + else: + menu_text.append( + "К сожалению, не удалось получить данные о твоих попытках =(\n" + "Попробуй обратиться к боту чуть позже." + ) + + menu_text.append(f"\n*Итоговый счет*: {full_score}") + + update.message.reply_text("\n".join(menu_text), parse_mode="Markdown") + + if solved: + update.message.reply_text( + "*У тебя решены все задачи! Подойди к нам, чтобы получить приз.*", + parse_mode="Markdown" + ) + + update.message.reply_text( + "Выбери следующее действие...", + reply_markup=ReplyKeyboardMarkup(MenuKeyboard.get_keyboard(update.message.from_user.id, solved)) + ) + + return MAIN_MENU + + @staticmethod + @save_state + def choose_task(bot: Bot, update: Update, user_data: dict): + user_data["chosen_task"] = None + + status, published = backend_api.get_published_tasks() + if len(published) == 0: + update.message.reply_text( + "Пока что не опубликовано ни одной задачи =(", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + # okay this is epic (pile of shit) + return ANSWER_RIGHT + else: + update.message.reply_text( + "Какую задачу ты хочешь сдать?", + reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) + ) + return TASK_CHOOSING + + @staticmethod + @save_state + def top_10(bot: Bot, update: Update, user_data: dict): + code, attempts = backend_api.get_attempts() + + top = {} + + for attempt in attempts: + if not attempt["solved"] or attempt["profile"]["is_hidden"]: + continue + + if attempt["profile"]["tg_id"] not in top: + top[attempt["profile"]["tg_id"]] = { + "fullname": attempt["profile"]["fullname"], + "username": attempt["profile"]["username"], + "score": Decimal(0.0) + } + + try: + ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ") + except ValueError: + ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%SZ") + + try: + fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%S.%fZ") + except ValueError: + fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%SZ") + + t = Decimal((ts - fp).total_seconds()) / Decimal(60) + plr_score = calc_score(t, attempt["task"]["base_score"]) + top[attempt["profile"]["tg_id"]]["score"] += plr_score + + try: + top_list = [] + for tg_id, stats in top.items(): + top_list.append(( + stats['score'], tg_id, + f"{stats['fullname']} (@{stats['username']}) -- {stats['score']}pts" + )) + + top_list.sort(key=lambda p: p[0], reverse=True) + + top_10_ids = [] + places = {tg_id: place for place, (score, tg_id, text) in enumerate(top_list, 1)} + + top = ["Топ-10:"] + for place, (score, tg_id, text) in enumerate(top_list[:10], 1): + top.append(str(place).rjust(2, " ") + ". " + text) + top_10_ids.append(tg_id) + + if update.message.from_user.id not in top_10_ids: + if len(top_10_ids) == 10 and places[update.message.from_user.id] > 11: + top.append("...") + this_place = places[update.message.from_user.id] + top.append(str(this_place).rjust(2, " ") + ". " + top_list[this_place - 1][2]) + + except Exception as e: + print(e) + + update.message.reply_text("\n".join(top)) + return MAIN_MENU + + @staticmethod + @save_state + def rules(bot: Bot, update: Update, user_data: dict): + r = """Вот несколько правил, которые следует соблюдать: +1) Всего в квизе 5 задач - после решения который можно получить призы. +2) Пожалуйста, постарайся решить каждую задачу самостоятельно и никому не подсказывать! +3) Если есть какие-то вопросы к нам, либо проблемы с задачами, нажми "Связаться с Базальт СПО" и напиши свой вопрос, мы ответим тебе в ближайшее время! +4) Если возникнут проблемы с интерфейсом, попробуй написать /stop, а затем /start, чтобы общение с ботом перезапустилось. Прогресс решённых задач никуда не пропадёт.""" + # with open("rules.jpg", "rb") as f: + # try: + # print(update.message.reply_photo(f, timeout=5)) + # except Exception as e: + # print(e) + update.message.reply_text(r) + + return MAIN_MENU + + @staticmethod + @save_state + def show_task(bot: Bot, update: Update, user_data: dict): + if "chosen_task" in user_data and user_data["chosen_task"] is not None: + task_title = user_data["chosen_task"] + else: + task_title = update.message.text + user_data["chosen_task"] = task_title + + status_code, response = backend_api.get_attempts( + tg_id=update.message.from_user.id, + task_title=user_data["chosen_task"] + ) + if status_code != 200: + user_data["chosen_task"] = None + + update.message.reply_text( + "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!", + reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) + ) + + return TASK_CHOOSING + + if len(response) != 0: + user_data["chosen_task"] = None + + update.message.reply_text( + "Ты уже решил эту задачу! Выбери другую.", + reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) + ) + + return TASK_CHOOSING + + status_code, tasks_response = backend_api.get_published_tasks() + if status_code != 200: + update.message.reply_text( + "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + return MAIN_MENU + + titles = {task.get("title"): task for task in tasks_response} + + if task_title not in titles.keys(): + update.message.reply_text( + "Такой задачи не найдено, попробуй ввести другое название!", + reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) + ) + + return TASK_CHOOSING + + task = titles[task_title] + + message = '\n'.join([ + f"*{task['title']}*", + f"{task['statement']}", + ]) + keyboard = TaskChosenKeyboard.get_keyboard() + + update.message.reply_text( + message, parse_mode="Markdown", + reply_markup=ReplyKeyboardMarkup(keyboard) + ) + + update.message.reply_text( + "Вводи свой ответ и я его проверю, " + "или нажми кнопку Назад, чтобы выбрать другую задачу" + ) + + return TASK_SHOWN + + @staticmethod + @save_state + def type_answer(bot: Bot, update: Update, user_data: dict): + status_code, response = backend_api.get_attempts( + tg_id=update.message.from_user.id, + task_title=user_data["chosen_task"] + ) + if len(response) != 0: + update.message.reply_text( + "Ты уже решил эту задачу! Выбери другую.", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + return TASK_CHOOSING + + else: + update.message.reply_text( + "Вводи свой ответ, я его проверю.", + reply_markup=ReplyKeyboardMarkup(AnsweringKeyboard.get_keyboard()) + ) + return ANSWERING + + @staticmethod + @save_state + def accept_answer(bot: Bot, update: Update, user_data: dict): + answer = update.message.text + status_code, task = backend_api.get_task(user_data["chosen_task"]) + if status_code == 200: + backend_api.create_attempt(update.message.from_user.id, user_data["chosen_task"], answer.lower()) + + if answer.lower() == task["answer"].lower(): + update.message.reply_text( + "Ты ввел правильный ответ! Возвращаемся в главное меню." + ) + return States.main_menu(bot, update, user_data) + else: + update.message.reply_text( + "К сожалению, твой ответ неверный =( Попробуй ввести другой ответ.", + reply_markup=ReplyKeyboardMarkup(TaskChosenKeyboard.get_keyboard()) + ) + + # return ANSWER_WRONG + return TASK_SHOWN + + else: + update.message.reply_text( + "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + return ANSWER_RIGHT + + +class AdminStates: + @staticmethod + @save_state + def admin_panel(bot: Bot, update: Update, user_data: dict): + status_code, data = backend_api.get_profile(update.message.from_user.id) + if status_code != 200: + update.message.reply_text( + "Не удалось аутентифицировать пользователя. Доступ запрещен.", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + return ADMIN_ACCESS_DENIED + + if not data["is_admin"]: + update.message.reply_text( + "Вы не являетесь администратором. Доступ запрещен.", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + return ADMIN_ACCESS_DENIED + + update.message.reply_text( + "Выберите действие", + reply_markup=ReplyKeyboardMarkup(AdminKeyboard.get_keyboard()) + ) + + return ADMIN_MENU + + @staticmethod + @save_state + def choose_task_hide(bot: Bot, update: Update, user_data: dict): + update.message.reply_text( + "Выберите задачу, которую хотите скрыть", + reply_markup=ReplyKeyboardMarkup(PublishTasksKeyboard.get_keyboard()) + ) + + return ADMIN_TASK_CHOOSE_HIDE + + @staticmethod + @save_state + def choose_task_publish(bot: Bot, update: Update, user_data: dict): + update.message.reply_text( + "Выберите задачу, которую хотите опубликовать", + reply_markup=ReplyKeyboardMarkup(PublishTasksKeyboard.get_keyboard()) + ) + + return ADMIN_TASK_CHOOSE_PUBLISH + + @staticmethod + @save_state + def hide_task(bot: Bot, update: Update, user_data: dict): + title = update.message.text + status, data = backend_api.hide_task(title) + if status != 200: + update.message.reply_text( + "Не удалось скрыть задачу.", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + else: + update.message.reply_text( + "Задача была скрыта.", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + return ADMIN_TASK_PUBLISHED + + @staticmethod + @save_state + def publish_task(bot: Bot, update: Update, user_data: dict): + title = update.message.text + status, data = backend_api.publish_task(title) + if status != 200: + update.message.reply_text( + "Не удалось опубликовать задачу.", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + else: + update.message.reply_text( + "Задача была опубликована.", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + return ADMIN_TASK_PUBLISHED + + @staticmethod + @save_state + def wait_for_announcement(bot: Bot, update: Update, user_data: dict): + update.message.reply_text( + "Введите объявление (Отправка сообщений может занять несколько секунд)", + reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard()) + ) + + return ADMIN_WAIT_FOR_ANNOUNCEMENT + + @staticmethod + @save_state + def wait_for_message(bot: Bot, update: Update, user_data: dict): + update.message.reply_text( + "Введите сообщение (Отправка сообщений может занять несколько секунд)", + reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard()) + ) + + return ADMIN_WAIT_FOR_MESSAGE + + @staticmethod + @save_state + def announce_message(bot: Bot, update: Update, user_data: dict): + status, profiles = backend_api.get_profiles() + if status != 200: + pass + else: + ids = [] + for profile in profiles: + ids.append(int(profile["tg_id"])) + + return AdminStates.send_to_ids(ids, "ОБЪЯВЛЕНИЕ ОТ МОДЕРАТОРОВ:", update.message.text, bot, update) + + @staticmethod + @save_state + def message_plr(bot: Bot, update: Update, user_data: dict): + text = update.message.text + _ = text.split(maxsplit=1) + + if len(_) != 2: + update.message.reply_text( + "Ошибка в сообщении, попробуйте еще раз", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + return ADMIN_TASK_PUBLISHED + + ids_text, msg = _ + ids_split = filter(lambda s: len(s) > 0 and s.isnumeric(), ids_text.split(',')) + ids = list(map(int, ids_split)) + + if len(ids) == 0: + update.message.reply_text( + "Все id пользователей неверно введены", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + return ADMIN_TASK_PUBLISHED + + else: + return AdminStates.send_to_ids(ids, "СООБЩЕНИЕ ОТ МОДЕРАТОРОВ:", msg, bot, update) + + @staticmethod + def send_to_ids(ids: List[int], prefix: str, message: str, bot: Bot, update: Update): + errors = [] + for tg_id in ids: + try: + bot.send_message(tg_id, f"*{prefix}*\n\n{message}", parse_mode="Markdown") + print(tg_id) + except Exception as e: + logger.debug(f"Got exception while announcing message: {e}") + errors.append((tg_id, str(e))) + sleep(0.3) + + sleep(0.05) + + if len(errors) == 0: + update.message.reply_text( + "Сообщение успешно отправлено всем пользователям", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()), + ) + + # OKAY + return ADMIN_TASK_PUBLISHED + + else: + error_msg = [] + user_ids = [] + + for err in errors: + user_ids.append(str(err[0])) + error_msg.append(f"{err[0]}. Reason: {err[1]}") + + msg = "\n".join(error_msg) + user_ids = ",".join(user_ids) + + update.message.reply_text( + "Во время отправки сообщений возникли следующие ошибки:\n" + f"{msg}\n\n{user_ids}", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + # OKAY + return ADMIN_TASK_PUBLISHED diff --git a/src/utils.py b/src/utils.py new file mode 100644 index 0000000..933f0e8 --- /dev/null +++ b/src/utils.py @@ -0,0 +1,13 @@ +( + WAIT_FOR_USERNAME, + MAIN_MENU, TOP_10, RULES, + TASK_CHOOSING, TASK_SHOWN, ANSWERING, + ANSWER_RIGHT, ANSWER_WRONG, + ASKING_QUESTION, + + ADMIN_MENU, ADMIN_TASK_CHOOSE_PUBLISH, ADMIN_TASK_CHOOSE_HIDE, + ADMIN_WAIT_FOR_ANNOUNCEMENT, ADMIN_WAIT_FOR_MESSAGE, + ADMIN_TASK_PUBLISHED, + ADMIN_ACCESS_DENIED, + *_ +) = range(100) |