From dd177e3d2c4579bf435c1f583a1ceab3fb438791 Mon Sep 17 00:00:00 2001 From: Andrew Guschin Date: Tue, 15 Oct 2024 20:08:19 +0400 Subject: move project files to src --- .dockerignore | 3 +- .gitignore | 1 - README.md | 5 + backend_api.py | 135 --------- bot.py | 166 ---------- botlogging.py | 22 -- config.example.py | 3 - keyboards.py | 123 -------- pyproject.toml | 4 + src/backend_api.py | 135 +++++++++ src/basealt_bot_frontend/__init__.py | 2 - src/bot.py | 166 ++++++++++ src/botlogging.py | 24 ++ src/config.example.py | 3 + src/keyboards.py | 123 ++++++++ src/states.py | 568 +++++++++++++++++++++++++++++++++++ src/utils.py | 13 + states.py | 568 ----------------------------------- utils.py | 13 - 19 files changed, 1042 insertions(+), 1035 deletions(-) create mode 100644 README.md delete mode 100644 backend_api.py delete mode 100644 bot.py delete mode 100644 botlogging.py delete mode 100644 config.example.py delete mode 100644 keyboards.py create mode 100644 src/backend_api.py delete mode 100644 src/basealt_bot_frontend/__init__.py create mode 100644 src/bot.py create mode 100644 src/botlogging.py create mode 100644 src/config.example.py create mode 100644 src/keyboards.py create mode 100644 src/states.py create mode 100644 src/utils.py delete mode 100644 states.py delete mode 100644 utils.py diff --git a/.dockerignore b/.dockerignore index 5237f0f..1d17dae 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,2 +1 @@ -venv -db.sqlite3 \ No newline at end of file +.venv diff --git a/.gitignore b/.gitignore index 14e0a5f..e8fbe0e 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,3 @@ logs __pycache__ config.py -venv diff --git a/README.md b/README.md new file mode 100644 index 0000000..78c8e64 --- /dev/null +++ b/README.md @@ -0,0 +1,5 @@ +# basealt-bot-frontend + +Чтобы запустить данного бота, соответствующий бэкенд уже должен быть поднят. +После этого, можно скопировать файл `src/config.example.py` в файл +`src/config.py` и вписать в него нужные значения токена бота и URL бэкенда. diff --git a/backend_api.py b/backend_api.py deleted file mode 100644 index 94841f9..0000000 --- a/backend_api.py +++ /dev/null @@ -1,135 +0,0 @@ -import requests -import urllib -from config import BACKEND_URL -import json -import datetime as dt - -from typing import Tuple, Dict - -from botlogging import logger - - -def make_request(method: str, url: str, **kwargs) -> Tuple[int, Dict]: - try: - response = requests.request(method, url, **kwargs) - answer = response.json() - - except Exception as e: - logger.debug(f"Got exception while making request: {e}") - return 500, {} - - logger.debug( - f"Got response from backend: " - f"Status={response.status_code}; " - f"Text={response.text[:200]}..." - ) - - return response.status_code, answer - - -def post_request(url: str, **kwargs): - return make_request("post", url, **kwargs) - - -def put_request(url: str, **kwargs): - return make_request("put", url, **kwargs) - - -def get_request(url: str, **kwargs): - return make_request("get", url, **kwargs) - - -def patch_request(url: str, **kwargs): - return make_request("patch", url, **kwargs) - - -def register_user(tg_id: int, username: str, fullname: str) -> Tuple[int, Dict]: - logger.debug(f"Trying to register user with id={tg_id}; username={username}") - return post_request(f"{BACKEND_URL}/profiles/", data={ - "tg_id": tg_id, - "username": username, - "fullname": fullname - }) - - -def get_profiles(): - logger.debug(f"Trying to retrieve all profiles") - return get_request(f"{BACKEND_URL}/profiles/") - - -def get_tasks(): - logger.debug(f"Trying to retrieve all tasks") - return get_request(f"{BACKEND_URL}/tasks/") - - -def get_published_tasks(): - logger.debug(f"Trying to retrieve all published tasks") - return get_request(f"{BACKEND_URL}/api/tasks/published/") - - -def get_task(title: str) -> Tuple[int, Dict]: - logger.debug(f"Trying to retrieve task with title={title}") - return get_request(f"{BACKEND_URL}/api/get_task/" + urllib.parse.quote(title)) - - -def save_state(last_state: int, tg_id: int, user_data: dict) -> Tuple[int, Dict]: - user_data_dumped = json.dumps(user_data) - logger.debug(f"Trying to save state for user with id={tg_id}; state={last_state}; user_data={user_data_dumped}") - - return put_request(f"{BACKEND_URL}/api/state/update/{tg_id}/", data={ - "last_state": last_state, - "tg_id": tg_id, - "user_data": user_data_dumped - }) - - -def get_state(tg_id: int) -> Tuple[int, dict]: - logger.debug(f"Trying to get state for user with id={tg_id}") - return get_request(f"{BACKEND_URL}/api/state/get/{tg_id}/") - - -def create_attempt(tg_id: int, task_title: str, answer: str): - return post_request(f"{BACKEND_URL}/attempts/", data={ - "profile": json.dumps({"tg_id": tg_id}), - "task": json.dumps({"title": task_title}), - "answer": answer - }) - - -def get_attempts(tg_id: int=None, task_title: str=None): - data = {} - if tg_id is not None: - data["tg_id"] = tg_id - if task_title is not None: - data["task_title"] = task_title - - return get_request(f"{BACKEND_URL}/api/attempts/", data=data) - - -def get_profile(tg_id: int): - return get_request(f"{BACKEND_URL}/api/profile/get/{tg_id}") - - -def publish_task(title: str): - url_title = urllib.parse.quote(title) - code, resp = get_task(title) - - if code != 200: - return code, {} - - if resp["first_published"] is None: - return patch_request(f"{BACKEND_URL}/api/tasks/{url_title}/update/", data={ - "first_published": dt.datetime.now(), - "is_public": True - }) - else: - return patch_request(f"{BACKEND_URL}/api/tasks/{url_title}/update/", data={ - "is_public": True - }) - - -def hide_task(title: str): - url_title = urllib.parse.quote(title) - return patch_request(f"{BACKEND_URL}/api/tasks/{url_title}/update/", data={ - "is_public": False - }) diff --git a/bot.py b/bot.py deleted file mode 100644 index 37e0ac8..0000000 --- a/bot.py +++ /dev/null @@ -1,166 +0,0 @@ -from telegram.ext import Updater, CommandHandler, ConversationHandler, MessageHandler, Filters, CallbackQueryHandler -from telegram import ReplyKeyboardMarkup, ReplyKeyboardRemove, InlineKeyboardMarkup, InlineKeyboardButton - -# Typing -from telegram import Update, User, Bot - -from os import getenv -import json -from config import TG_TOKEN, REQUEST_KWARGS - -import backend_api -from keyboards import ( - MenuKeyboard, TasksKeyboard, TaskChosenKeyboard, ContinueKeyboard, - AnsweringKeyboard, AdminKeyboard, BackToMenuKeyboard -) -from utils import * -from states import States, AdminStates - -from botlogging import logger - - -def start(bot: Bot, update: Update, user_data: dict): - user_data.update({"chosen_task": None}) - - update.message.reply_text( - "Квиз Базальт СПО на День работодателя КНиИТ СГУ", - reply_markup=ReplyKeyboardRemove() - ) - - user: User = update.message.from_user - if user.username is None: - return States.wait_for_username(bot, update, user_data) - - else: - logger.debug(backend_api.register_user(user.id, user.username, user.full_name)) - return States.main_menu(bot, update, user_data) - - -def username_check(bot: Bot, update: Update, user_data): - user: User = update.message.from_user - if user.username is None: - return States.wait_for_username(bot, update, user_data) - - else: - logger.debug(backend_api.register_user(user.id, user.username, user.full_name)) - return States.main_menu(bot, update, user_data) - - -def resume_bot(bot: Bot, update: Update, user_data): - status_code, state = backend_api.get_state(update.message.from_user.id) - - user_data.update(json.loads(state["user_data"])) - last_state = state["last_state"] - - for handler in conversation_handler.states[last_state]: - if handler.filters.filter(update): - return handler.callback(bot, update, user_data) - - return last_state - - -def stop(bot, update): - update.message.reply_text('Пока!', reply_markup=ReplyKeyboardRemove()) - update.message.reply_text('Для того, чтобы начать работу с ботом заново напишите /start') - return ConversationHandler.END - - -def error(bot, update, error): - logger.warning('Update "%s" caused error "%s"', update, error) - - -def main(): - is_prod = getenv('production', '') - - if is_prod: - print('PRODUCTION ENVIRONMENT') - updater = Updater(TG_TOKEN) - else: - print('DEVELOPMENT ENVIRONMENT') - updater = Updater(TG_TOKEN, request_kwargs=REQUEST_KWARGS) - - dp = updater.dispatcher - dp.add_error_handler(error) - dp.add_handler(conversation_handler) - - updater.start_polling() - updater.idle() - - -conversation_handler = ConversationHandler( - entry_points=[ - CommandHandler('start', start, pass_user_data=True), - MessageHandler(Filters.text, resume_bot, pass_user_data=True), - ], - - states={ - WAIT_FOR_USERNAME: [MessageHandler(Filters.text, username_check, pass_user_data=True)], - - MAIN_MENU: [ - MessageHandler(Filters.regex(MenuKeyboard.CHOOSE_TASK), States.choose_task, pass_user_data=True), - MessageHandler(Filters.regex(MenuKeyboard.TOP_10), States.top_10, pass_user_data=True), - MessageHandler(Filters.regex(MenuKeyboard.RULES), States.rules, pass_user_data=True), - CommandHandler('admin', AdminStates.admin_panel, pass_user_data=True) - ], - TASK_CHOOSING: [ - MessageHandler(Filters.regex(TasksKeyboard.CANCEL), States.main_menu, pass_user_data=True), - MessageHandler(Filters.text, States.show_task, pass_user_data=True), - ], - TASK_SHOWN: [ - MessageHandler(Filters.regex(TaskChosenKeyboard.CANCEL), States.choose_task, pass_user_data=True), - MessageHandler(Filters.text, States.accept_answer, pass_user_data=True), - # MessageHandler(Filters.regex(TaskChosenKeyboard.TYPE_ANSWER), States.type_answer, pass_user_data=True), - ], - ANSWERING: [ - MessageHandler(Filters.regex(AnsweringKeyboard.CANCEL), States.show_task, pass_user_data=True), - MessageHandler(Filters.text, States.accept_answer, pass_user_data=True), - ], - ANSWER_RIGHT: [MessageHandler(Filters.text, States.main_menu, pass_user_data=True)], - ANSWER_WRONG: [MessageHandler(Filters.text, States.show_task, pass_user_data=True)], - - ASKING_QUESTION: [ - MessageHandler(Filters.regex(BackToMenuKeyboard.CANCEL), States.main_menu, pass_user_data=True), - MessageHandler(Filters.text, States.ask_question, pass_user_data=True), - ], - - # ADMIN PANEL - - ADMIN_MENU: [ - MessageHandler(Filters.regex(AdminKeyboard.CANCEL), States.main_menu, pass_user_data=True), - MessageHandler(Filters.regex(AdminKeyboard.PUBLISH_TASK), AdminStates.choose_task_publish, pass_user_data=True), - MessageHandler(Filters.regex(AdminKeyboard.HIDE_TASK), AdminStates.choose_task_hide, pass_user_data=True), - MessageHandler(Filters.regex(AdminKeyboard.ANNOUNCE), AdminStates.wait_for_announcement, pass_user_data=True), - MessageHandler(Filters.regex(AdminKeyboard.MESSAGE_PLAYER), AdminStates.wait_for_message, pass_user_data=True), - ], - - ADMIN_WAIT_FOR_ANNOUNCEMENT: [ - MessageHandler(Filters.regex(BackToMenuKeyboard.CANCEL), AdminStates.admin_panel, pass_user_data=True), - MessageHandler(Filters.text, AdminStates.announce_message, pass_user_data=True), - ], - ADMIN_WAIT_FOR_MESSAGE: [ - MessageHandler(Filters.regex(BackToMenuKeyboard.CANCEL), AdminStates.admin_panel, pass_user_data=True), - MessageHandler(Filters.text, AdminStates.message_plr, pass_user_data=True), - ], - - ADMIN_TASK_CHOOSE_HIDE: [ - MessageHandler(Filters.regex(TasksKeyboard.CANCEL), AdminStates.admin_panel, pass_user_data=True), - MessageHandler(Filters.text, AdminStates.hide_task, pass_user_data=True), - ], - - ADMIN_TASK_CHOOSE_PUBLISH: [ - MessageHandler(Filters.regex(TasksKeyboard.CANCEL), AdminStates.admin_panel, pass_user_data=True), - MessageHandler(Filters.text, AdminStates.publish_task, pass_user_data=True), - ], - - ADMIN_TASK_PUBLISHED: [MessageHandler(Filters.text, AdminStates.admin_panel, pass_user_data=True)], - ADMIN_ACCESS_DENIED: [MessageHandler(Filters.text, States.main_menu, pass_user_data=True)], - }, - - fallbacks=[ - CommandHandler('stop', stop), - MessageHandler(Filters.regex(MenuKeyboard.HELP), States.prompt_question, pass_user_data=True), - ] -) - -if __name__ == '__main__': - main() diff --git a/botlogging.py b/botlogging.py deleted file mode 100644 index 6dc9d37..0000000 --- a/botlogging.py +++ /dev/null @@ -1,22 +0,0 @@ -import logging - - -fh = logging.FileHandler("logs/log.log") -fh.setLevel(logging.DEBUG) - -ch = logging.StreamHandler() -ch.setLevel(logging.DEBUG) - -log_format = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") -fh.setFormatter(log_format) -ch.setFormatter(log_format) - -logging.basicConfig( - level=logging.DEBUG, handlers=[fh, ch], - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' -) - -logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) -logger.addHandler(fh) -logger.addHandler(ch) \ No newline at end of file diff --git a/config.example.py b/config.example.py deleted file mode 100644 index ba0e5bc..0000000 --- a/config.example.py +++ /dev/null @@ -1,3 +0,0 @@ -TG_TOKEN = "" -REQUEST_KWARGS = {} -BACKEND_URL = "http://localhost:8000" diff --git a/keyboards.py b/keyboards.py deleted file mode 100644 index aa8d761..0000000 --- a/keyboards.py +++ /dev/null @@ -1,123 +0,0 @@ -import backend_api -from abc import ABC - - -class Keyboard(ABC): - @classmethod - def get_keyboard(cls, telegram_id=None): - pass - - -class MenuKeyboard(Keyboard): - CHOOSE_TASK = "Выбрать задание📚" - TOP_10 = "hidden Топ-10📊" - RULES = "Правилаℹ️" - ADMIN = "/admin" - HELP = "Связаться с Базальт СПО🐧" - - @classmethod - def get_keyboard(cls, telegram_id=None, solved=False): - if telegram_id is not None: - status_code, data = backend_api.get_profile(telegram_id) - if status_code == 200 and data["is_admin"]: - return [ - [cls.ADMIN], - [cls.CHOOSE_TASK], - [cls.TOP_10, cls.RULES], - [cls.HELP], - ] - - if solved: - return [ - [cls.RULES], - [cls.HELP], - ] - else: - return [ - [cls.CHOOSE_TASK], - [cls.RULES], - [cls.HELP], - ] - - -class BackToMenuKeyboard(Keyboard): - CANCEL = "Вернуться в меню↩️" - - @classmethod - def get_keyboard(cls, telegram_id=None): - return [[cls.CANCEL]] - - -class TasksKeyboard(Keyboard): - CANCEL = "Вернуться в меню↩️" - - @classmethod - def get_keyboard(cls, telegram_id=None): - status, tasks = backend_api.get_published_tasks() - titles_keyboard = [[cls.CANCEL]] - if status == 200: - titles = [] - for task in sorted(tasks, key=lambda t: t.get("title")): - titles.append([task.get("title")]) - titles_keyboard.extend(titles) - - return titles_keyboard - - -class PublishTasksKeyboard(Keyboard): - CANCEL = "Вернуться в меню↩️" - - @classmethod - def get_keyboard(cls, telegram_id=None): - status, tasks = backend_api.get_tasks() - titles_keyboard = [[cls.CANCEL]] - if status == 200: - titles_keyboard.extend([task.get("title")] for task in tasks) - - return titles_keyboard - - -class TaskChosenKeyboard(Keyboard): - TYPE_ANSWER = "Ввести ответ✏️" - CANCEL = "Назад↩️" - - @classmethod - def get_keyboard(cls, telegram_id=None): - return [ - # [cls.TYPE_ANSWER], - [cls.CANCEL], - ] - - -class ContinueKeyboard(Keyboard): - CONTINUE = "Продолжить➡️" - - @classmethod - def get_keyboard(cls, telegram_id=None): - return [[cls.CONTINUE]] - - -class AnsweringKeyboard(Keyboard): - CANCEL = "Отмена↩️" - - @classmethod - def get_keyboard(cls, telegram_id=None): - return [[cls.CANCEL]] - - -class AdminKeyboard(Keyboard): - CANCEL = "Вернуться в меню↩️" - PUBLISH_TASK = "Опубликовать задачу" - HIDE_TASK = "Скрыть задачу" - ANNOUNCE = "Сделать объявление" - MESSAGE_PLAYER = "Написать сообщение от имени бота" - - @classmethod - def get_keyboard(cls, telegram_id=None): - return [ - [cls.CANCEL], - [cls.PUBLISH_TASK], - [cls.HIDE_TASK], - [cls.ANNOUNCE], - [cls.MESSAGE_PLAYER], - ] diff --git a/pyproject.toml b/pyproject.toml index e9088fd..28125de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,8 +9,12 @@ dependencies = [ "python-telegram-bot==12.3.0", "requests==2.22.0", ] +readme = "README.md" requires-python = ">= 3.8" +[project.scripts] +startbot = "bot:main" + [build-system] requires = ["hatchling"] build-backend = "hatchling.build" diff --git a/src/backend_api.py b/src/backend_api.py new file mode 100644 index 0000000..94841f9 --- /dev/null +++ b/src/backend_api.py @@ -0,0 +1,135 @@ +import requests +import urllib +from config import BACKEND_URL +import json +import datetime as dt + +from typing import Tuple, Dict + +from botlogging import logger + + +def make_request(method: str, url: str, **kwargs) -> Tuple[int, Dict]: + try: + response = requests.request(method, url, **kwargs) + answer = response.json() + + except Exception as e: + logger.debug(f"Got exception while making request: {e}") + return 500, {} + + logger.debug( + f"Got response from backend: " + f"Status={response.status_code}; " + f"Text={response.text[:200]}..." + ) + + return response.status_code, answer + + +def post_request(url: str, **kwargs): + return make_request("post", url, **kwargs) + + +def put_request(url: str, **kwargs): + return make_request("put", url, **kwargs) + + +def get_request(url: str, **kwargs): + return make_request("get", url, **kwargs) + + +def patch_request(url: str, **kwargs): + return make_request("patch", url, **kwargs) + + +def register_user(tg_id: int, username: str, fullname: str) -> Tuple[int, Dict]: + logger.debug(f"Trying to register user with id={tg_id}; username={username}") + return post_request(f"{BACKEND_URL}/profiles/", data={ + "tg_id": tg_id, + "username": username, + "fullname": fullname + }) + + +def get_profiles(): + logger.debug(f"Trying to retrieve all profiles") + return get_request(f"{BACKEND_URL}/profiles/") + + +def get_tasks(): + logger.debug(f"Trying to retrieve all tasks") + return get_request(f"{BACKEND_URL}/tasks/") + + +def get_published_tasks(): + logger.debug(f"Trying to retrieve all published tasks") + return get_request(f"{BACKEND_URL}/api/tasks/published/") + + +def get_task(title: str) -> Tuple[int, Dict]: + logger.debug(f"Trying to retrieve task with title={title}") + return get_request(f"{BACKEND_URL}/api/get_task/" + urllib.parse.quote(title)) + + +def save_state(last_state: int, tg_id: int, user_data: dict) -> Tuple[int, Dict]: + user_data_dumped = json.dumps(user_data) + logger.debug(f"Trying to save state for user with id={tg_id}; state={last_state}; user_data={user_data_dumped}") + + return put_request(f"{BACKEND_URL}/api/state/update/{tg_id}/", data={ + "last_state": last_state, + "tg_id": tg_id, + "user_data": user_data_dumped + }) + + +def get_state(tg_id: int) -> Tuple[int, dict]: + logger.debug(f"Trying to get state for user with id={tg_id}") + return get_request(f"{BACKEND_URL}/api/state/get/{tg_id}/") + + +def create_attempt(tg_id: int, task_title: str, answer: str): + return post_request(f"{BACKEND_URL}/attempts/", data={ + "profile": json.dumps({"tg_id": tg_id}), + "task": json.dumps({"title": task_title}), + "answer": answer + }) + + +def get_attempts(tg_id: int=None, task_title: str=None): + data = {} + if tg_id is not None: + data["tg_id"] = tg_id + if task_title is not None: + data["task_title"] = task_title + + return get_request(f"{BACKEND_URL}/api/attempts/", data=data) + + +def get_profile(tg_id: int): + return get_request(f"{BACKEND_URL}/api/profile/get/{tg_id}") + + +def publish_task(title: str): + url_title = urllib.parse.quote(title) + code, resp = get_task(title) + + if code != 200: + return code, {} + + if resp["first_published"] is None: + return patch_request(f"{BACKEND_URL}/api/tasks/{url_title}/update/", data={ + "first_published": dt.datetime.now(), + "is_public": True + }) + else: + return patch_request(f"{BACKEND_URL}/api/tasks/{url_title}/update/", data={ + "is_public": True + }) + + +def hide_task(title: str): + url_title = urllib.parse.quote(title) + return patch_request(f"{BACKEND_URL}/api/tasks/{url_title}/update/", data={ + "is_public": False + }) diff --git a/src/basealt_bot_frontend/__init__.py b/src/basealt_bot_frontend/__init__.py deleted file mode 100644 index 4268070..0000000 --- a/src/basealt_bot_frontend/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -def hello() -> str: - return "Hello from basealt-bot-frontend!" diff --git a/src/bot.py b/src/bot.py new file mode 100644 index 0000000..37e0ac8 --- /dev/null +++ b/src/bot.py @@ -0,0 +1,166 @@ +from telegram.ext import Updater, CommandHandler, ConversationHandler, MessageHandler, Filters, CallbackQueryHandler +from telegram import ReplyKeyboardMarkup, ReplyKeyboardRemove, InlineKeyboardMarkup, InlineKeyboardButton + +# Typing +from telegram import Update, User, Bot + +from os import getenv +import json +from config import TG_TOKEN, REQUEST_KWARGS + +import backend_api +from keyboards import ( + MenuKeyboard, TasksKeyboard, TaskChosenKeyboard, ContinueKeyboard, + AnsweringKeyboard, AdminKeyboard, BackToMenuKeyboard +) +from utils import * +from states import States, AdminStates + +from botlogging import logger + + +def start(bot: Bot, update: Update, user_data: dict): + user_data.update({"chosen_task": None}) + + update.message.reply_text( + "Квиз Базальт СПО на День работодателя КНиИТ СГУ", + reply_markup=ReplyKeyboardRemove() + ) + + user: User = update.message.from_user + if user.username is None: + return States.wait_for_username(bot, update, user_data) + + else: + logger.debug(backend_api.register_user(user.id, user.username, user.full_name)) + return States.main_menu(bot, update, user_data) + + +def username_check(bot: Bot, update: Update, user_data): + user: User = update.message.from_user + if user.username is None: + return States.wait_for_username(bot, update, user_data) + + else: + logger.debug(backend_api.register_user(user.id, user.username, user.full_name)) + return States.main_menu(bot, update, user_data) + + +def resume_bot(bot: Bot, update: Update, user_data): + status_code, state = backend_api.get_state(update.message.from_user.id) + + user_data.update(json.loads(state["user_data"])) + last_state = state["last_state"] + + for handler in conversation_handler.states[last_state]: + if handler.filters.filter(update): + return handler.callback(bot, update, user_data) + + return last_state + + +def stop(bot, update): + update.message.reply_text('Пока!', reply_markup=ReplyKeyboardRemove()) + update.message.reply_text('Для того, чтобы начать работу с ботом заново напишите /start') + return ConversationHandler.END + + +def error(bot, update, error): + logger.warning('Update "%s" caused error "%s"', update, error) + + +def main(): + is_prod = getenv('production', '') + + if is_prod: + print('PRODUCTION ENVIRONMENT') + updater = Updater(TG_TOKEN) + else: + print('DEVELOPMENT ENVIRONMENT') + updater = Updater(TG_TOKEN, request_kwargs=REQUEST_KWARGS) + + dp = updater.dispatcher + dp.add_error_handler(error) + dp.add_handler(conversation_handler) + + updater.start_polling() + updater.idle() + + +conversation_handler = ConversationHandler( + entry_points=[ + CommandHandler('start', start, pass_user_data=True), + MessageHandler(Filters.text, resume_bot, pass_user_data=True), + ], + + states={ + WAIT_FOR_USERNAME: [MessageHandler(Filters.text, username_check, pass_user_data=True)], + + MAIN_MENU: [ + MessageHandler(Filters.regex(MenuKeyboard.CHOOSE_TASK), States.choose_task, pass_user_data=True), + MessageHandler(Filters.regex(MenuKeyboard.TOP_10), States.top_10, pass_user_data=True), + MessageHandler(Filters.regex(MenuKeyboard.RULES), States.rules, pass_user_data=True), + CommandHandler('admin', AdminStates.admin_panel, pass_user_data=True) + ], + TASK_CHOOSING: [ + MessageHandler(Filters.regex(TasksKeyboard.CANCEL), States.main_menu, pass_user_data=True), + MessageHandler(Filters.text, States.show_task, pass_user_data=True), + ], + TASK_SHOWN: [ + MessageHandler(Filters.regex(TaskChosenKeyboard.CANCEL), States.choose_task, pass_user_data=True), + MessageHandler(Filters.text, States.accept_answer, pass_user_data=True), + # MessageHandler(Filters.regex(TaskChosenKeyboard.TYPE_ANSWER), States.type_answer, pass_user_data=True), + ], + ANSWERING: [ + MessageHandler(Filters.regex(AnsweringKeyboard.CANCEL), States.show_task, pass_user_data=True), + MessageHandler(Filters.text, States.accept_answer, pass_user_data=True), + ], + ANSWER_RIGHT: [MessageHandler(Filters.text, States.main_menu, pass_user_data=True)], + ANSWER_WRONG: [MessageHandler(Filters.text, States.show_task, pass_user_data=True)], + + ASKING_QUESTION: [ + MessageHandler(Filters.regex(BackToMenuKeyboard.CANCEL), States.main_menu, pass_user_data=True), + MessageHandler(Filters.text, States.ask_question, pass_user_data=True), + ], + + # ADMIN PANEL + + ADMIN_MENU: [ + MessageHandler(Filters.regex(AdminKeyboard.CANCEL), States.main_menu, pass_user_data=True), + MessageHandler(Filters.regex(AdminKeyboard.PUBLISH_TASK), AdminStates.choose_task_publish, pass_user_data=True), + MessageHandler(Filters.regex(AdminKeyboard.HIDE_TASK), AdminStates.choose_task_hide, pass_user_data=True), + MessageHandler(Filters.regex(AdminKeyboard.ANNOUNCE), AdminStates.wait_for_announcement, pass_user_data=True), + MessageHandler(Filters.regex(AdminKeyboard.MESSAGE_PLAYER), AdminStates.wait_for_message, pass_user_data=True), + ], + + ADMIN_WAIT_FOR_ANNOUNCEMENT: [ + MessageHandler(Filters.regex(BackToMenuKeyboard.CANCEL), AdminStates.admin_panel, pass_user_data=True), + MessageHandler(Filters.text, AdminStates.announce_message, pass_user_data=True), + ], + ADMIN_WAIT_FOR_MESSAGE: [ + MessageHandler(Filters.regex(BackToMenuKeyboard.CANCEL), AdminStates.admin_panel, pass_user_data=True), + MessageHandler(Filters.text, AdminStates.message_plr, pass_user_data=True), + ], + + ADMIN_TASK_CHOOSE_HIDE: [ + MessageHandler(Filters.regex(TasksKeyboard.CANCEL), AdminStates.admin_panel, pass_user_data=True), + MessageHandler(Filters.text, AdminStates.hide_task, pass_user_data=True), + ], + + ADMIN_TASK_CHOOSE_PUBLISH: [ + MessageHandler(Filters.regex(TasksKeyboard.CANCEL), AdminStates.admin_panel, pass_user_data=True), + MessageHandler(Filters.text, AdminStates.publish_task, pass_user_data=True), + ], + + ADMIN_TASK_PUBLISHED: [MessageHandler(Filters.text, AdminStates.admin_panel, pass_user_data=True)], + ADMIN_ACCESS_DENIED: [MessageHandler(Filters.text, States.main_menu, pass_user_data=True)], + }, + + fallbacks=[ + CommandHandler('stop', stop), + MessageHandler(Filters.regex(MenuKeyboard.HELP), States.prompt_question, pass_user_data=True), + ] +) + +if __name__ == '__main__': + main() diff --git a/src/botlogging.py b/src/botlogging.py new file mode 100644 index 0000000..0a08a8d --- /dev/null +++ b/src/botlogging.py @@ -0,0 +1,24 @@ +import logging +from pathlib import Path + +Path("logs").mkdir(exist_ok=True) + +fh = logging.FileHandler("logs/log.log") +fh.setLevel(logging.DEBUG) + +ch = logging.StreamHandler() +ch.setLevel(logging.DEBUG) + +log_format = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") +fh.setFormatter(log_format) +ch.setFormatter(log_format) + +logging.basicConfig( + level=logging.DEBUG, handlers=[fh, ch], + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +logger.addHandler(fh) +logger.addHandler(ch) diff --git a/src/config.example.py b/src/config.example.py new file mode 100644 index 0000000..ba0e5bc --- /dev/null +++ b/src/config.example.py @@ -0,0 +1,3 @@ +TG_TOKEN = "" +REQUEST_KWARGS = {} +BACKEND_URL = "http://localhost:8000" diff --git a/src/keyboards.py b/src/keyboards.py new file mode 100644 index 0000000..aa8d761 --- /dev/null +++ b/src/keyboards.py @@ -0,0 +1,123 @@ +import backend_api +from abc import ABC + + +class Keyboard(ABC): + @classmethod + def get_keyboard(cls, telegram_id=None): + pass + + +class MenuKeyboard(Keyboard): + CHOOSE_TASK = "Выбрать задание📚" + TOP_10 = "hidden Топ-10📊" + RULES = "Правилаℹ️" + ADMIN = "/admin" + HELP = "Связаться с Базальт СПО🐧" + + @classmethod + def get_keyboard(cls, telegram_id=None, solved=False): + if telegram_id is not None: + status_code, data = backend_api.get_profile(telegram_id) + if status_code == 200 and data["is_admin"]: + return [ + [cls.ADMIN], + [cls.CHOOSE_TASK], + [cls.TOP_10, cls.RULES], + [cls.HELP], + ] + + if solved: + return [ + [cls.RULES], + [cls.HELP], + ] + else: + return [ + [cls.CHOOSE_TASK], + [cls.RULES], + [cls.HELP], + ] + + +class BackToMenuKeyboard(Keyboard): + CANCEL = "Вернуться в меню↩️" + + @classmethod + def get_keyboard(cls, telegram_id=None): + return [[cls.CANCEL]] + + +class TasksKeyboard(Keyboard): + CANCEL = "Вернуться в меню↩️" + + @classmethod + def get_keyboard(cls, telegram_id=None): + status, tasks = backend_api.get_published_tasks() + titles_keyboard = [[cls.CANCEL]] + if status == 200: + titles = [] + for task in sorted(tasks, key=lambda t: t.get("title")): + titles.append([task.get("title")]) + titles_keyboard.extend(titles) + + return titles_keyboard + + +class PublishTasksKeyboard(Keyboard): + CANCEL = "Вернуться в меню↩️" + + @classmethod + def get_keyboard(cls, telegram_id=None): + status, tasks = backend_api.get_tasks() + titles_keyboard = [[cls.CANCEL]] + if status == 200: + titles_keyboard.extend([task.get("title")] for task in tasks) + + return titles_keyboard + + +class TaskChosenKeyboard(Keyboard): + TYPE_ANSWER = "Ввести ответ✏️" + CANCEL = "Назад↩️" + + @classmethod + def get_keyboard(cls, telegram_id=None): + return [ + # [cls.TYPE_ANSWER], + [cls.CANCEL], + ] + + +class ContinueKeyboard(Keyboard): + CONTINUE = "Продолжить➡️" + + @classmethod + def get_keyboard(cls, telegram_id=None): + return [[cls.CONTINUE]] + + +class AnsweringKeyboard(Keyboard): + CANCEL = "Отмена↩️" + + @classmethod + def get_keyboard(cls, telegram_id=None): + return [[cls.CANCEL]] + + +class AdminKeyboard(Keyboard): + CANCEL = "Вернуться в меню↩️" + PUBLISH_TASK = "Опубликовать задачу" + HIDE_TASK = "Скрыть задачу" + ANNOUNCE = "Сделать объявление" + MESSAGE_PLAYER = "Написать сообщение от имени бота" + + @classmethod + def get_keyboard(cls, telegram_id=None): + return [ + [cls.CANCEL], + [cls.PUBLISH_TASK], + [cls.HIDE_TASK], + [cls.ANNOUNCE], + [cls.MESSAGE_PLAYER], + ] diff --git a/src/states.py b/src/states.py new file mode 100644 index 0000000..afb00cb --- /dev/null +++ b/src/states.py @@ -0,0 +1,568 @@ +from telegram import ReplyKeyboardMarkup, ReplyKeyboardRemove + +from keyboards import ( + MenuKeyboard, TasksKeyboard, TaskChosenKeyboard, ContinueKeyboard, + AnsweringKeyboard, AdminKeyboard, PublishTasksKeyboard, BackToMenuKeyboard +) +from utils import * +import backend_api +from time import sleep +import datetime as dt +from decimal import Decimal + +# Typing +from telegram import Update, User, Bot +from typing import List + +from botlogging import logger + + +def save_state(func): + def wrapper(bot: Bot, update: Update, user_data: dict, *args, **kwargs): + last_state = func(bot, update, user_data, *args, **kwargs) + backend_api.save_state(last_state, update.message.from_user.id, user_data) + return last_state + + return wrapper + + +def calc_score(t, base_score=1000): + base_score = Decimal(base_score) + min_score = base_score * Decimal(0.1) + max_t = Decimal(720) + k = (base_score - min_score) / (max_t * max_t) + + if t >= max_t: + return round(min_score, 2) + + return round(min(k * (t - max_t) * (t - max_t) + min_score, base_score), 2) + + +class States: + @staticmethod + @save_state + def prompt_question(bot: Bot, update: Update, user_data: dict): + update.message.reply_text( + "Введите свой вопрос, он будет перенаправлен представителям компании. " + "Вам ответят как только появится возможность, поэтому следите за сообщениями от бота.", + reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard()) + ) + + return ASKING_QUESTION + + @staticmethod + @save_state + def ask_question(bot: Bot, update: Update, user_data: dict): + question = update.message.text + + code, profiles = backend_api.get_profiles() + if code != 200: + pass + + user_id = update.message.from_user.id + username = update.message.from_user.full_name + + for profile in profiles: + if profile["is_admin"]: + bot.send_message( + profile["tg_id"], f"*ВОПРОС ОТ ПОЛЬЗОВАТЕЛЯ {username} ({user_id})*:\n\n{question}", + parse_mode="Markdown" + ) + + update.message.reply_text( + "Ваш вопрос отправлен на рассмотрение, ожидайте ответ.", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + return States.main_menu(bot, update, user_data) + + + @staticmethod + @save_state + def wait_for_username(bot: Bot, update: Update, user_data: dict): + update.message.reply_text( + "По правилам квиза ты не можешь участвовать, если у тебя не указано " + "имя пользователя, поэтому укажи его и возвращайся как только это сделаешь!", + reply_markup=ReplyKeyboardMarkup([['Я указал имя пользователя']]) + ) + return WAIT_FOR_USERNAME + + @staticmethod + @save_state + def main_menu(bot: Bot, update: Update, user_data: dict): + user_data["chosen_task"] = None + + status_code, response = backend_api.get_attempts( + tg_id=update.message.from_user.id, + task_title=user_data["chosen_task"] + ) + + menu_text = [ + + ] + solved = False + + full_score = Decimal(0.0) + if status_code == 200: + if len(response) == 5: + solved = True + if len(response) != 0: + menu_text.append("Твои решенные задачи:") + + for attempt in response: + try: + ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ") + except ValueError: + ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%SZ") + + try: + fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%S.%fZ") + except ValueError: + fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%SZ") + + t = Decimal((ts - fp).total_seconds()) / Decimal(60) + # plr_score = calc_score(t, attempt["task"]["base_score"]) + plr_score = int(attempt["task"]["base_score"]) + full_score += plr_score + + menu_text.append( + f"_{attempt['task']['title']}_ " + f"({plr_score})" + ) + else: + menu_text.append("У тебя еще нет решенных задач") + else: + menu_text.append( + "К сожалению, не удалось получить данные о твоих попытках =(\n" + "Попробуй обратиться к боту чуть позже." + ) + + menu_text.append(f"\n*Итоговый счет*: {full_score}") + + update.message.reply_text("\n".join(menu_text), parse_mode="Markdown") + + if solved: + update.message.reply_text( + "*У тебя решены все задачи! Подойди к нам, чтобы получить приз.*", + parse_mode="Markdown" + ) + + update.message.reply_text( + "Выбери следующее действие...", + reply_markup=ReplyKeyboardMarkup(MenuKeyboard.get_keyboard(update.message.from_user.id, solved)) + ) + + return MAIN_MENU + + @staticmethod + @save_state + def choose_task(bot: Bot, update: Update, user_data: dict): + user_data["chosen_task"] = None + + status, published = backend_api.get_published_tasks() + if len(published) == 0: + update.message.reply_text( + "Пока что не опубликовано ни одной задачи =(", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + # okay this is epic (pile of shit) + return ANSWER_RIGHT + else: + update.message.reply_text( + "Какую задачу ты хочешь сдать?", + reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) + ) + return TASK_CHOOSING + + @staticmethod + @save_state + def top_10(bot: Bot, update: Update, user_data: dict): + code, attempts = backend_api.get_attempts() + + top = {} + + for attempt in attempts: + if not attempt["solved"] or attempt["profile"]["is_hidden"]: + continue + + if attempt["profile"]["tg_id"] not in top: + top[attempt["profile"]["tg_id"]] = { + "fullname": attempt["profile"]["fullname"], + "username": attempt["profile"]["username"], + "score": Decimal(0.0) + } + + try: + ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ") + except ValueError: + ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%SZ") + + try: + fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%S.%fZ") + except ValueError: + fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%SZ") + + t = Decimal((ts - fp).total_seconds()) / Decimal(60) + plr_score = calc_score(t, attempt["task"]["base_score"]) + top[attempt["profile"]["tg_id"]]["score"] += plr_score + + try: + top_list = [] + for tg_id, stats in top.items(): + top_list.append(( + stats['score'], tg_id, + f"{stats['fullname']} (@{stats['username']}) -- {stats['score']}pts" + )) + + top_list.sort(key=lambda p: p[0], reverse=True) + + top_10_ids = [] + places = {tg_id: place for place, (score, tg_id, text) in enumerate(top_list, 1)} + + top = ["Топ-10:"] + for place, (score, tg_id, text) in enumerate(top_list[:10], 1): + top.append(str(place).rjust(2, " ") + ". " + text) + top_10_ids.append(tg_id) + + if update.message.from_user.id not in top_10_ids: + if len(top_10_ids) == 10 and places[update.message.from_user.id] > 11: + top.append("...") + this_place = places[update.message.from_user.id] + top.append(str(this_place).rjust(2, " ") + ". " + top_list[this_place - 1][2]) + + except Exception as e: + print(e) + + update.message.reply_text("\n".join(top)) + return MAIN_MENU + + @staticmethod + @save_state + def rules(bot: Bot, update: Update, user_data: dict): + r = """Вот несколько правил, которые следует соблюдать: +1) Всего в квизе 5 задач - после решения который можно получить призы. +2) Пожалуйста, постарайся решить каждую задачу самостоятельно и никому не подсказывать! +3) Если есть какие-то вопросы к нам, либо проблемы с задачами, нажми "Связаться с Базальт СПО" и напиши свой вопрос, мы ответим тебе в ближайшее время! +4) Если возникнут проблемы с интерфейсом, попробуй написать /stop, а затем /start, чтобы общение с ботом перезапустилось. Прогресс решённых задач никуда не пропадёт.""" + # with open("rules.jpg", "rb") as f: + # try: + # print(update.message.reply_photo(f, timeout=5)) + # except Exception as e: + # print(e) + update.message.reply_text(r) + + return MAIN_MENU + + @staticmethod + @save_state + def show_task(bot: Bot, update: Update, user_data: dict): + if "chosen_task" in user_data and user_data["chosen_task"] is not None: + task_title = user_data["chosen_task"] + else: + task_title = update.message.text + user_data["chosen_task"] = task_title + + status_code, response = backend_api.get_attempts( + tg_id=update.message.from_user.id, + task_title=user_data["chosen_task"] + ) + if status_code != 200: + user_data["chosen_task"] = None + + update.message.reply_text( + "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!", + reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) + ) + + return TASK_CHOOSING + + if len(response) != 0: + user_data["chosen_task"] = None + + update.message.reply_text( + "Ты уже решил эту задачу! Выбери другую.", + reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) + ) + + return TASK_CHOOSING + + status_code, tasks_response = backend_api.get_published_tasks() + if status_code != 200: + update.message.reply_text( + "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + return MAIN_MENU + + titles = {task.get("title"): task for task in tasks_response} + + if task_title not in titles.keys(): + update.message.reply_text( + "Такой задачи не найдено, попробуй ввести другое название!", + reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) + ) + + return TASK_CHOOSING + + task = titles[task_title] + + message = '\n'.join([ + f"*{task['title']}*", + f"{task['statement']}", + ]) + keyboard = TaskChosenKeyboard.get_keyboard() + + update.message.reply_text( + message, parse_mode="Markdown", + reply_markup=ReplyKeyboardMarkup(keyboard) + ) + + update.message.reply_text( + "Вводи свой ответ и я его проверю, " + "или нажми кнопку Назад, чтобы выбрать другую задачу" + ) + + return TASK_SHOWN + + @staticmethod + @save_state + def type_answer(bot: Bot, update: Update, user_data: dict): + status_code, response = backend_api.get_attempts( + tg_id=update.message.from_user.id, + task_title=user_data["chosen_task"] + ) + if len(response) != 0: + update.message.reply_text( + "Ты уже решил эту задачу! Выбери другую.", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + return TASK_CHOOSING + + else: + update.message.reply_text( + "Вводи свой ответ, я его проверю.", + reply_markup=ReplyKeyboardMarkup(AnsweringKeyboard.get_keyboard()) + ) + return ANSWERING + + @staticmethod + @save_state + def accept_answer(bot: Bot, update: Update, user_data: dict): + answer = update.message.text + status_code, task = backend_api.get_task(user_data["chosen_task"]) + if status_code == 200: + backend_api.create_attempt(update.message.from_user.id, user_data["chosen_task"], answer.lower()) + + if answer.lower() == task["answer"].lower(): + update.message.reply_text( + "Ты ввел правильный ответ! Возвращаемся в главное меню." + ) + return States.main_menu(bot, update, user_data) + else: + update.message.reply_text( + "К сожалению, твой ответ неверный =( Попробуй ввести другой ответ.", + reply_markup=ReplyKeyboardMarkup(TaskChosenKeyboard.get_keyboard()) + ) + + # return ANSWER_WRONG + return TASK_SHOWN + + else: + update.message.reply_text( + "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + return ANSWER_RIGHT + + +class AdminStates: + @staticmethod + @save_state + def admin_panel(bot: Bot, update: Update, user_data: dict): + status_code, data = backend_api.get_profile(update.message.from_user.id) + if status_code != 200: + update.message.reply_text( + "Не удалось аутентифицировать пользователя. Доступ запрещен.", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + return ADMIN_ACCESS_DENIED + + if not data["is_admin"]: + update.message.reply_text( + "Вы не являетесь администратором. Доступ запрещен.", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + return ADMIN_ACCESS_DENIED + + update.message.reply_text( + "Выберите действие", + reply_markup=ReplyKeyboardMarkup(AdminKeyboard.get_keyboard()) + ) + + return ADMIN_MENU + + @staticmethod + @save_state + def choose_task_hide(bot: Bot, update: Update, user_data: dict): + update.message.reply_text( + "Выберите задачу, которую хотите скрыть", + reply_markup=ReplyKeyboardMarkup(PublishTasksKeyboard.get_keyboard()) + ) + + return ADMIN_TASK_CHOOSE_HIDE + + @staticmethod + @save_state + def choose_task_publish(bot: Bot, update: Update, user_data: dict): + update.message.reply_text( + "Выберите задачу, которую хотите опубликовать", + reply_markup=ReplyKeyboardMarkup(PublishTasksKeyboard.get_keyboard()) + ) + + return ADMIN_TASK_CHOOSE_PUBLISH + + @staticmethod + @save_state + def hide_task(bot: Bot, update: Update, user_data: dict): + title = update.message.text + status, data = backend_api.hide_task(title) + if status != 200: + update.message.reply_text( + "Не удалось скрыть задачу.", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + else: + update.message.reply_text( + "Задача была скрыта.", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + return ADMIN_TASK_PUBLISHED + + @staticmethod + @save_state + def publish_task(bot: Bot, update: Update, user_data: dict): + title = update.message.text + status, data = backend_api.publish_task(title) + if status != 200: + update.message.reply_text( + "Не удалось опубликовать задачу.", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + else: + update.message.reply_text( + "Задача была опубликована.", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + return ADMIN_TASK_PUBLISHED + + @staticmethod + @save_state + def wait_for_announcement(bot: Bot, update: Update, user_data: dict): + update.message.reply_text( + "Введите объявление (Отправка сообщений может занять несколько секунд)", + reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard()) + ) + + return ADMIN_WAIT_FOR_ANNOUNCEMENT + + @staticmethod + @save_state + def wait_for_message(bot: Bot, update: Update, user_data: dict): + update.message.reply_text( + "Введите сообщение (Отправка сообщений может занять несколько секунд)", + reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard()) + ) + + return ADMIN_WAIT_FOR_MESSAGE + + @staticmethod + @save_state + def announce_message(bot: Bot, update: Update, user_data: dict): + status, profiles = backend_api.get_profiles() + if status != 200: + pass + else: + ids = [] + for profile in profiles: + ids.append(int(profile["tg_id"])) + + return AdminStates.send_to_ids(ids, "ОБЪЯВЛЕНИЕ ОТ МОДЕРАТОРОВ:", update.message.text, bot, update) + + @staticmethod + @save_state + def message_plr(bot: Bot, update: Update, user_data: dict): + text = update.message.text + _ = text.split(maxsplit=1) + + if len(_) != 2: + update.message.reply_text( + "Ошибка в сообщении, попробуйте еще раз", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + return ADMIN_TASK_PUBLISHED + + ids_text, msg = _ + ids_split = filter(lambda s: len(s) > 0 and s.isnumeric(), ids_text.split(',')) + ids = list(map(int, ids_split)) + + if len(ids) == 0: + update.message.reply_text( + "Все id пользователей неверно введены", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + return ADMIN_TASK_PUBLISHED + + else: + return AdminStates.send_to_ids(ids, "СООБЩЕНИЕ ОТ МОДЕРАТОРОВ:", msg, bot, update) + + @staticmethod + def send_to_ids(ids: List[int], prefix: str, message: str, bot: Bot, update: Update): + errors = [] + for tg_id in ids: + try: + bot.send_message(tg_id, f"*{prefix}*\n\n{message}", parse_mode="Markdown") + print(tg_id) + except Exception as e: + logger.debug(f"Got exception while announcing message: {e}") + errors.append((tg_id, str(e))) + sleep(0.3) + + sleep(0.05) + + if len(errors) == 0: + update.message.reply_text( + "Сообщение успешно отправлено всем пользователям", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()), + ) + + # OKAY + return ADMIN_TASK_PUBLISHED + + else: + error_msg = [] + user_ids = [] + + for err in errors: + user_ids.append(str(err[0])) + error_msg.append(f"{err[0]}. Reason: {err[1]}") + + msg = "\n".join(error_msg) + user_ids = ",".join(user_ids) + + update.message.reply_text( + "Во время отправки сообщений возникли следующие ошибки:\n" + f"{msg}\n\n{user_ids}", + reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) + ) + + # OKAY + return ADMIN_TASK_PUBLISHED diff --git a/src/utils.py b/src/utils.py new file mode 100644 index 0000000..933f0e8 --- /dev/null +++ b/src/utils.py @@ -0,0 +1,13 @@ +( + WAIT_FOR_USERNAME, + MAIN_MENU, TOP_10, RULES, + TASK_CHOOSING, TASK_SHOWN, ANSWERING, + ANSWER_RIGHT, ANSWER_WRONG, + ASKING_QUESTION, + + ADMIN_MENU, ADMIN_TASK_CHOOSE_PUBLISH, ADMIN_TASK_CHOOSE_HIDE, + ADMIN_WAIT_FOR_ANNOUNCEMENT, ADMIN_WAIT_FOR_MESSAGE, + ADMIN_TASK_PUBLISHED, + ADMIN_ACCESS_DENIED, + *_ +) = range(100) diff --git a/states.py b/states.py deleted file mode 100644 index afb00cb..0000000 --- a/states.py +++ /dev/null @@ -1,568 +0,0 @@ -from telegram import ReplyKeyboardMarkup, ReplyKeyboardRemove - -from keyboards import ( - MenuKeyboard, TasksKeyboard, TaskChosenKeyboard, ContinueKeyboard, - AnsweringKeyboard, AdminKeyboard, PublishTasksKeyboard, BackToMenuKeyboard -) -from utils import * -import backend_api -from time import sleep -import datetime as dt -from decimal import Decimal - -# Typing -from telegram import Update, User, Bot -from typing import List - -from botlogging import logger - - -def save_state(func): - def wrapper(bot: Bot, update: Update, user_data: dict, *args, **kwargs): - last_state = func(bot, update, user_data, *args, **kwargs) - backend_api.save_state(last_state, update.message.from_user.id, user_data) - return last_state - - return wrapper - - -def calc_score(t, base_score=1000): - base_score = Decimal(base_score) - min_score = base_score * Decimal(0.1) - max_t = Decimal(720) - k = (base_score - min_score) / (max_t * max_t) - - if t >= max_t: - return round(min_score, 2) - - return round(min(k * (t - max_t) * (t - max_t) + min_score, base_score), 2) - - -class States: - @staticmethod - @save_state - def prompt_question(bot: Bot, update: Update, user_data: dict): - update.message.reply_text( - "Введите свой вопрос, он будет перенаправлен представителям компании. " - "Вам ответят как только появится возможность, поэтому следите за сообщениями от бота.", - reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard()) - ) - - return ASKING_QUESTION - - @staticmethod - @save_state - def ask_question(bot: Bot, update: Update, user_data: dict): - question = update.message.text - - code, profiles = backend_api.get_profiles() - if code != 200: - pass - - user_id = update.message.from_user.id - username = update.message.from_user.full_name - - for profile in profiles: - if profile["is_admin"]: - bot.send_message( - profile["tg_id"], f"*ВОПРОС ОТ ПОЛЬЗОВАТЕЛЯ {username} ({user_id})*:\n\n{question}", - parse_mode="Markdown" - ) - - update.message.reply_text( - "Ваш вопрос отправлен на рассмотрение, ожидайте ответ.", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - return States.main_menu(bot, update, user_data) - - - @staticmethod - @save_state - def wait_for_username(bot: Bot, update: Update, user_data: dict): - update.message.reply_text( - "По правилам квиза ты не можешь участвовать, если у тебя не указано " - "имя пользователя, поэтому укажи его и возвращайся как только это сделаешь!", - reply_markup=ReplyKeyboardMarkup([['Я указал имя пользователя']]) - ) - return WAIT_FOR_USERNAME - - @staticmethod - @save_state - def main_menu(bot: Bot, update: Update, user_data: dict): - user_data["chosen_task"] = None - - status_code, response = backend_api.get_attempts( - tg_id=update.message.from_user.id, - task_title=user_data["chosen_task"] - ) - - menu_text = [ - - ] - solved = False - - full_score = Decimal(0.0) - if status_code == 200: - if len(response) == 5: - solved = True - if len(response) != 0: - menu_text.append("Твои решенные задачи:") - - for attempt in response: - try: - ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ") - except ValueError: - ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%SZ") - - try: - fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%S.%fZ") - except ValueError: - fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%SZ") - - t = Decimal((ts - fp).total_seconds()) / Decimal(60) - # plr_score = calc_score(t, attempt["task"]["base_score"]) - plr_score = int(attempt["task"]["base_score"]) - full_score += plr_score - - menu_text.append( - f"_{attempt['task']['title']}_ " - f"({plr_score})" - ) - else: - menu_text.append("У тебя еще нет решенных задач") - else: - menu_text.append( - "К сожалению, не удалось получить данные о твоих попытках =(\n" - "Попробуй обратиться к боту чуть позже." - ) - - menu_text.append(f"\n*Итоговый счет*: {full_score}") - - update.message.reply_text("\n".join(menu_text), parse_mode="Markdown") - - if solved: - update.message.reply_text( - "*У тебя решены все задачи! Подойди к нам, чтобы получить приз.*", - parse_mode="Markdown" - ) - - update.message.reply_text( - "Выбери следующее действие...", - reply_markup=ReplyKeyboardMarkup(MenuKeyboard.get_keyboard(update.message.from_user.id, solved)) - ) - - return MAIN_MENU - - @staticmethod - @save_state - def choose_task(bot: Bot, update: Update, user_data: dict): - user_data["chosen_task"] = None - - status, published = backend_api.get_published_tasks() - if len(published) == 0: - update.message.reply_text( - "Пока что не опубликовано ни одной задачи =(", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - # okay this is epic (pile of shit) - return ANSWER_RIGHT - else: - update.message.reply_text( - "Какую задачу ты хочешь сдать?", - reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) - ) - return TASK_CHOOSING - - @staticmethod - @save_state - def top_10(bot: Bot, update: Update, user_data: dict): - code, attempts = backend_api.get_attempts() - - top = {} - - for attempt in attempts: - if not attempt["solved"] or attempt["profile"]["is_hidden"]: - continue - - if attempt["profile"]["tg_id"] not in top: - top[attempt["profile"]["tg_id"]] = { - "fullname": attempt["profile"]["fullname"], - "username": attempt["profile"]["username"], - "score": Decimal(0.0) - } - - try: - ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ") - except ValueError: - ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%SZ") - - try: - fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%S.%fZ") - except ValueError: - fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%SZ") - - t = Decimal((ts - fp).total_seconds()) / Decimal(60) - plr_score = calc_score(t, attempt["task"]["base_score"]) - top[attempt["profile"]["tg_id"]]["score"] += plr_score - - try: - top_list = [] - for tg_id, stats in top.items(): - top_list.append(( - stats['score'], tg_id, - f"{stats['fullname']} (@{stats['username']}) -- {stats['score']}pts" - )) - - top_list.sort(key=lambda p: p[0], reverse=True) - - top_10_ids = [] - places = {tg_id: place for place, (score, tg_id, text) in enumerate(top_list, 1)} - - top = ["Топ-10:"] - for place, (score, tg_id, text) in enumerate(top_list[:10], 1): - top.append(str(place).rjust(2, " ") + ". " + text) - top_10_ids.append(tg_id) - - if update.message.from_user.id not in top_10_ids: - if len(top_10_ids) == 10 and places[update.message.from_user.id] > 11: - top.append("...") - this_place = places[update.message.from_user.id] - top.append(str(this_place).rjust(2, " ") + ". " + top_list[this_place - 1][2]) - - except Exception as e: - print(e) - - update.message.reply_text("\n".join(top)) - return MAIN_MENU - - @staticmethod - @save_state - def rules(bot: Bot, update: Update, user_data: dict): - r = """Вот несколько правил, которые следует соблюдать: -1) Всего в квизе 5 задач - после решения который можно получить призы. -2) Пожалуйста, постарайся решить каждую задачу самостоятельно и никому не подсказывать! -3) Если есть какие-то вопросы к нам, либо проблемы с задачами, нажми "Связаться с Базальт СПО" и напиши свой вопрос, мы ответим тебе в ближайшее время! -4) Если возникнут проблемы с интерфейсом, попробуй написать /stop, а затем /start, чтобы общение с ботом перезапустилось. Прогресс решённых задач никуда не пропадёт.""" - # with open("rules.jpg", "rb") as f: - # try: - # print(update.message.reply_photo(f, timeout=5)) - # except Exception as e: - # print(e) - update.message.reply_text(r) - - return MAIN_MENU - - @staticmethod - @save_state - def show_task(bot: Bot, update: Update, user_data: dict): - if "chosen_task" in user_data and user_data["chosen_task"] is not None: - task_title = user_data["chosen_task"] - else: - task_title = update.message.text - user_data["chosen_task"] = task_title - - status_code, response = backend_api.get_attempts( - tg_id=update.message.from_user.id, - task_title=user_data["chosen_task"] - ) - if status_code != 200: - user_data["chosen_task"] = None - - update.message.reply_text( - "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!", - reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) - ) - - return TASK_CHOOSING - - if len(response) != 0: - user_data["chosen_task"] = None - - update.message.reply_text( - "Ты уже решил эту задачу! Выбери другую.", - reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) - ) - - return TASK_CHOOSING - - status_code, tasks_response = backend_api.get_published_tasks() - if status_code != 200: - update.message.reply_text( - "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - return MAIN_MENU - - titles = {task.get("title"): task for task in tasks_response} - - if task_title not in titles.keys(): - update.message.reply_text( - "Такой задачи не найдено, попробуй ввести другое название!", - reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) - ) - - return TASK_CHOOSING - - task = titles[task_title] - - message = '\n'.join([ - f"*{task['title']}*", - f"{task['statement']}", - ]) - keyboard = TaskChosenKeyboard.get_keyboard() - - update.message.reply_text( - message, parse_mode="Markdown", - reply_markup=ReplyKeyboardMarkup(keyboard) - ) - - update.message.reply_text( - "Вводи свой ответ и я его проверю, " - "или нажми кнопку Назад, чтобы выбрать другую задачу" - ) - - return TASK_SHOWN - - @staticmethod - @save_state - def type_answer(bot: Bot, update: Update, user_data: dict): - status_code, response = backend_api.get_attempts( - tg_id=update.message.from_user.id, - task_title=user_data["chosen_task"] - ) - if len(response) != 0: - update.message.reply_text( - "Ты уже решил эту задачу! Выбери другую.", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - return TASK_CHOOSING - - else: - update.message.reply_text( - "Вводи свой ответ, я его проверю.", - reply_markup=ReplyKeyboardMarkup(AnsweringKeyboard.get_keyboard()) - ) - return ANSWERING - - @staticmethod - @save_state - def accept_answer(bot: Bot, update: Update, user_data: dict): - answer = update.message.text - status_code, task = backend_api.get_task(user_data["chosen_task"]) - if status_code == 200: - backend_api.create_attempt(update.message.from_user.id, user_data["chosen_task"], answer.lower()) - - if answer.lower() == task["answer"].lower(): - update.message.reply_text( - "Ты ввел правильный ответ! Возвращаемся в главное меню." - ) - return States.main_menu(bot, update, user_data) - else: - update.message.reply_text( - "К сожалению, твой ответ неверный =( Попробуй ввести другой ответ.", - reply_markup=ReplyKeyboardMarkup(TaskChosenKeyboard.get_keyboard()) - ) - - # return ANSWER_WRONG - return TASK_SHOWN - - else: - update.message.reply_text( - "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - return ANSWER_RIGHT - - -class AdminStates: - @staticmethod - @save_state - def admin_panel(bot: Bot, update: Update, user_data: dict): - status_code, data = backend_api.get_profile(update.message.from_user.id) - if status_code != 200: - update.message.reply_text( - "Не удалось аутентифицировать пользователя. Доступ запрещен.", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - return ADMIN_ACCESS_DENIED - - if not data["is_admin"]: - update.message.reply_text( - "Вы не являетесь администратором. Доступ запрещен.", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - return ADMIN_ACCESS_DENIED - - update.message.reply_text( - "Выберите действие", - reply_markup=ReplyKeyboardMarkup(AdminKeyboard.get_keyboard()) - ) - - return ADMIN_MENU - - @staticmethod - @save_state - def choose_task_hide(bot: Bot, update: Update, user_data: dict): - update.message.reply_text( - "Выберите задачу, которую хотите скрыть", - reply_markup=ReplyKeyboardMarkup(PublishTasksKeyboard.get_keyboard()) - ) - - return ADMIN_TASK_CHOOSE_HIDE - - @staticmethod - @save_state - def choose_task_publish(bot: Bot, update: Update, user_data: dict): - update.message.reply_text( - "Выберите задачу, которую хотите опубликовать", - reply_markup=ReplyKeyboardMarkup(PublishTasksKeyboard.get_keyboard()) - ) - - return ADMIN_TASK_CHOOSE_PUBLISH - - @staticmethod - @save_state - def hide_task(bot: Bot, update: Update, user_data: dict): - title = update.message.text - status, data = backend_api.hide_task(title) - if status != 200: - update.message.reply_text( - "Не удалось скрыть задачу.", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - else: - update.message.reply_text( - "Задача была скрыта.", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - return ADMIN_TASK_PUBLISHED - - @staticmethod - @save_state - def publish_task(bot: Bot, update: Update, user_data: dict): - title = update.message.text - status, data = backend_api.publish_task(title) - if status != 200: - update.message.reply_text( - "Не удалось опубликовать задачу.", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - else: - update.message.reply_text( - "Задача была опубликована.", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - return ADMIN_TASK_PUBLISHED - - @staticmethod - @save_state - def wait_for_announcement(bot: Bot, update: Update, user_data: dict): - update.message.reply_text( - "Введите объявление (Отправка сообщений может занять несколько секунд)", - reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard()) - ) - - return ADMIN_WAIT_FOR_ANNOUNCEMENT - - @staticmethod - @save_state - def wait_for_message(bot: Bot, update: Update, user_data: dict): - update.message.reply_text( - "Введите сообщение (Отправка сообщений может занять несколько секунд)", - reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard()) - ) - - return ADMIN_WAIT_FOR_MESSAGE - - @staticmethod - @save_state - def announce_message(bot: Bot, update: Update, user_data: dict): - status, profiles = backend_api.get_profiles() - if status != 200: - pass - else: - ids = [] - for profile in profiles: - ids.append(int(profile["tg_id"])) - - return AdminStates.send_to_ids(ids, "ОБЪЯВЛЕНИЕ ОТ МОДЕРАТОРОВ:", update.message.text, bot, update) - - @staticmethod - @save_state - def message_plr(bot: Bot, update: Update, user_data: dict): - text = update.message.text - _ = text.split(maxsplit=1) - - if len(_) != 2: - update.message.reply_text( - "Ошибка в сообщении, попробуйте еще раз", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - return ADMIN_TASK_PUBLISHED - - ids_text, msg = _ - ids_split = filter(lambda s: len(s) > 0 and s.isnumeric(), ids_text.split(',')) - ids = list(map(int, ids_split)) - - if len(ids) == 0: - update.message.reply_text( - "Все id пользователей неверно введены", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - return ADMIN_TASK_PUBLISHED - - else: - return AdminStates.send_to_ids(ids, "СООБЩЕНИЕ ОТ МОДЕРАТОРОВ:", msg, bot, update) - - @staticmethod - def send_to_ids(ids: List[int], prefix: str, message: str, bot: Bot, update: Update): - errors = [] - for tg_id in ids: - try: - bot.send_message(tg_id, f"*{prefix}*\n\n{message}", parse_mode="Markdown") - print(tg_id) - except Exception as e: - logger.debug(f"Got exception while announcing message: {e}") - errors.append((tg_id, str(e))) - sleep(0.3) - - sleep(0.05) - - if len(errors) == 0: - update.message.reply_text( - "Сообщение успешно отправлено всем пользователям", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()), - ) - - # OKAY - return ADMIN_TASK_PUBLISHED - - else: - error_msg = [] - user_ids = [] - - for err in errors: - user_ids.append(str(err[0])) - error_msg.append(f"{err[0]}. Reason: {err[1]}") - - msg = "\n".join(error_msg) - user_ids = ",".join(user_ids) - - update.message.reply_text( - "Во время отправки сообщений возникли следующие ошибки:\n" - f"{msg}\n\n{user_ids}", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - # OKAY - return ADMIN_TASK_PUBLISHED diff --git a/utils.py b/utils.py deleted file mode 100644 index 933f0e8..0000000 --- a/utils.py +++ /dev/null @@ -1,13 +0,0 @@ -( - WAIT_FOR_USERNAME, - MAIN_MENU, TOP_10, RULES, - TASK_CHOOSING, TASK_SHOWN, ANSWERING, - ANSWER_RIGHT, ANSWER_WRONG, - ASKING_QUESTION, - - ADMIN_MENU, ADMIN_TASK_CHOOSE_PUBLISH, ADMIN_TASK_CHOOSE_HIDE, - ADMIN_WAIT_FOR_ANNOUNCEMENT, ADMIN_WAIT_FOR_MESSAGE, - ADMIN_TASK_PUBLISHED, - ADMIN_ACCESS_DENIED, - *_ -) = range(100) -- cgit v1.2.3