diff options
| author | Andrew Guschin <guschin@altlinux.org> | 2024-10-15 20:08:19 +0400 |
|---|---|---|
| committer | Andrew Guschin <guschin@altlinux.org> | 2024-10-15 20:08:19 +0400 |
| commit | dd177e3d2c4579bf435c1f583a1ceab3fb438791 (patch) | |
| tree | c721d654b78d504e88449d2b51f78ceb2eea2c3c /states.py | |
| parent | 3562ed767dbddfbcac321c7006962e1283eb63af (diff) | |
move project files to src
Diffstat (limited to 'states.py')
| -rw-r--r-- | states.py | 568 |
1 files changed, 0 insertions, 568 deletions
diff --git a/states.py b/states.py deleted file mode 100644 index afb00cb..0000000 --- a/states.py +++ /dev/null @@ -1,568 +0,0 @@ -from telegram import ReplyKeyboardMarkup, ReplyKeyboardRemove - -from keyboards import ( - MenuKeyboard, TasksKeyboard, TaskChosenKeyboard, ContinueKeyboard, - AnsweringKeyboard, AdminKeyboard, PublishTasksKeyboard, BackToMenuKeyboard -) -from utils import * -import backend_api -from time import sleep -import datetime as dt -from decimal import Decimal - -# Typing -from telegram import Update, User, Bot -from typing import List - -from botlogging import logger - - -def save_state(func): - def wrapper(bot: Bot, update: Update, user_data: dict, *args, **kwargs): - last_state = func(bot, update, user_data, *args, **kwargs) - backend_api.save_state(last_state, update.message.from_user.id, user_data) - return last_state - - return wrapper - - -def calc_score(t, base_score=1000): - base_score = Decimal(base_score) - min_score = base_score * Decimal(0.1) - max_t = Decimal(720) - k = (base_score - min_score) / (max_t * max_t) - - if t >= max_t: - return round(min_score, 2) - - return round(min(k * (t - max_t) * (t - max_t) + min_score, base_score), 2) - - -class States: - @staticmethod - @save_state - def prompt_question(bot: Bot, update: Update, user_data: dict): - update.message.reply_text( - "Введите свой вопрос, он будет перенаправлен представителям компании. " - "Вам ответят как только появится возможность, поэтому следите за сообщениями от бота.", - reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard()) - ) - - return ASKING_QUESTION - - @staticmethod - @save_state - def ask_question(bot: Bot, update: Update, user_data: dict): - question = update.message.text - - code, profiles = backend_api.get_profiles() - if code != 200: - pass - - user_id = update.message.from_user.id - username = update.message.from_user.full_name - - for profile in profiles: - if profile["is_admin"]: - bot.send_message( - profile["tg_id"], f"*ВОПРОС ОТ ПОЛЬЗОВАТЕЛЯ {username} ({user_id})*:\n\n{question}", - parse_mode="Markdown" - ) - - update.message.reply_text( - "Ваш вопрос отправлен на рассмотрение, ожидайте ответ.", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - return States.main_menu(bot, update, user_data) - - - @staticmethod - @save_state - def wait_for_username(bot: Bot, update: Update, user_data: dict): - update.message.reply_text( - "По правилам квиза ты не можешь участвовать, если у тебя не указано " - "имя пользователя, поэтому укажи его и возвращайся как только это сделаешь!", - reply_markup=ReplyKeyboardMarkup([['Я указал имя пользователя']]) - ) - return WAIT_FOR_USERNAME - - @staticmethod - @save_state - def main_menu(bot: Bot, update: Update, user_data: dict): - user_data["chosen_task"] = None - - status_code, response = backend_api.get_attempts( - tg_id=update.message.from_user.id, - task_title=user_data["chosen_task"] - ) - - menu_text = [ - - ] - solved = False - - full_score = Decimal(0.0) - if status_code == 200: - if len(response) == 5: - solved = True - if len(response) != 0: - menu_text.append("Твои решенные задачи:") - - for attempt in response: - try: - ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ") - except ValueError: - ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%SZ") - - try: - fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%S.%fZ") - except ValueError: - fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%SZ") - - t = Decimal((ts - fp).total_seconds()) / Decimal(60) - # plr_score = calc_score(t, attempt["task"]["base_score"]) - plr_score = int(attempt["task"]["base_score"]) - full_score += plr_score - - menu_text.append( - f"_{attempt['task']['title']}_ " - f"({plr_score})" - ) - else: - menu_text.append("У тебя еще нет решенных задач") - else: - menu_text.append( - "К сожалению, не удалось получить данные о твоих попытках =(\n" - "Попробуй обратиться к боту чуть позже." - ) - - menu_text.append(f"\n*Итоговый счет*: {full_score}") - - update.message.reply_text("\n".join(menu_text), parse_mode="Markdown") - - if solved: - update.message.reply_text( - "*У тебя решены все задачи! Подойди к нам, чтобы получить приз.*", - parse_mode="Markdown" - ) - - update.message.reply_text( - "Выбери следующее действие...", - reply_markup=ReplyKeyboardMarkup(MenuKeyboard.get_keyboard(update.message.from_user.id, solved)) - ) - - return MAIN_MENU - - @staticmethod - @save_state - def choose_task(bot: Bot, update: Update, user_data: dict): - user_data["chosen_task"] = None - - status, published = backend_api.get_published_tasks() - if len(published) == 0: - update.message.reply_text( - "Пока что не опубликовано ни одной задачи =(", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - # okay this is epic (pile of shit) - return ANSWER_RIGHT - else: - update.message.reply_text( - "Какую задачу ты хочешь сдать?", - reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) - ) - return TASK_CHOOSING - - @staticmethod - @save_state - def top_10(bot: Bot, update: Update, user_data: dict): - code, attempts = backend_api.get_attempts() - - top = {} - - for attempt in attempts: - if not attempt["solved"] or attempt["profile"]["is_hidden"]: - continue - - if attempt["profile"]["tg_id"] not in top: - top[attempt["profile"]["tg_id"]] = { - "fullname": attempt["profile"]["fullname"], - "username": attempt["profile"]["username"], - "score": Decimal(0.0) - } - - try: - ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ") - except ValueError: - ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%SZ") - - try: - fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%S.%fZ") - except ValueError: - fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%SZ") - - t = Decimal((ts - fp).total_seconds()) / Decimal(60) - plr_score = calc_score(t, attempt["task"]["base_score"]) - top[attempt["profile"]["tg_id"]]["score"] += plr_score - - try: - top_list = [] - for tg_id, stats in top.items(): - top_list.append(( - stats['score'], tg_id, - f"{stats['fullname']} (@{stats['username']}) -- {stats['score']}pts" - )) - - top_list.sort(key=lambda p: p[0], reverse=True) - - top_10_ids = [] - places = {tg_id: place for place, (score, tg_id, text) in enumerate(top_list, 1)} - - top = ["Топ-10:"] - for place, (score, tg_id, text) in enumerate(top_list[:10], 1): - top.append(str(place).rjust(2, " ") + ". " + text) - top_10_ids.append(tg_id) - - if update.message.from_user.id not in top_10_ids: - if len(top_10_ids) == 10 and places[update.message.from_user.id] > 11: - top.append("...") - this_place = places[update.message.from_user.id] - top.append(str(this_place).rjust(2, " ") + ". " + top_list[this_place - 1][2]) - - except Exception as e: - print(e) - - update.message.reply_text("\n".join(top)) - return MAIN_MENU - - @staticmethod - @save_state - def rules(bot: Bot, update: Update, user_data: dict): - r = """Вот несколько правил, которые следует соблюдать: -1) Всего в квизе 5 задач - после решения который можно получить призы. -2) Пожалуйста, постарайся решить каждую задачу самостоятельно и никому не подсказывать! -3) Если есть какие-то вопросы к нам, либо проблемы с задачами, нажми "Связаться с Базальт СПО" и напиши свой вопрос, мы ответим тебе в ближайшее время! -4) Если возникнут проблемы с интерфейсом, попробуй написать /stop, а затем /start, чтобы общение с ботом перезапустилось. Прогресс решённых задач никуда не пропадёт.""" - # with open("rules.jpg", "rb") as f: - # try: - # print(update.message.reply_photo(f, timeout=5)) - # except Exception as e: - # print(e) - update.message.reply_text(r) - - return MAIN_MENU - - @staticmethod - @save_state - def show_task(bot: Bot, update: Update, user_data: dict): - if "chosen_task" in user_data and user_data["chosen_task"] is not None: - task_title = user_data["chosen_task"] - else: - task_title = update.message.text - user_data["chosen_task"] = task_title - - status_code, response = backend_api.get_attempts( - tg_id=update.message.from_user.id, - task_title=user_data["chosen_task"] - ) - if status_code != 200: - user_data["chosen_task"] = None - - update.message.reply_text( - "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!", - reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) - ) - - return TASK_CHOOSING - - if len(response) != 0: - user_data["chosen_task"] = None - - update.message.reply_text( - "Ты уже решил эту задачу! Выбери другую.", - reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) - ) - - return TASK_CHOOSING - - status_code, tasks_response = backend_api.get_published_tasks() - if status_code != 200: - update.message.reply_text( - "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - return MAIN_MENU - - titles = {task.get("title"): task for task in tasks_response} - - if task_title not in titles.keys(): - update.message.reply_text( - "Такой задачи не найдено, попробуй ввести другое название!", - reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) - ) - - return TASK_CHOOSING - - task = titles[task_title] - - message = '\n'.join([ - f"*{task['title']}*", - f"{task['statement']}", - ]) - keyboard = TaskChosenKeyboard.get_keyboard() - - update.message.reply_text( - message, parse_mode="Markdown", - reply_markup=ReplyKeyboardMarkup(keyboard) - ) - - update.message.reply_text( - "Вводи свой ответ и я его проверю, " - "или нажми кнопку Назад, чтобы выбрать другую задачу" - ) - - return TASK_SHOWN - - @staticmethod - @save_state - def type_answer(bot: Bot, update: Update, user_data: dict): - status_code, response = backend_api.get_attempts( - tg_id=update.message.from_user.id, - task_title=user_data["chosen_task"] - ) - if len(response) != 0: - update.message.reply_text( - "Ты уже решил эту задачу! Выбери другую.", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - return TASK_CHOOSING - - else: - update.message.reply_text( - "Вводи свой ответ, я его проверю.", - reply_markup=ReplyKeyboardMarkup(AnsweringKeyboard.get_keyboard()) - ) - return ANSWERING - - @staticmethod - @save_state - def accept_answer(bot: Bot, update: Update, user_data: dict): - answer = update.message.text - status_code, task = backend_api.get_task(user_data["chosen_task"]) - if status_code == 200: - backend_api.create_attempt(update.message.from_user.id, user_data["chosen_task"], answer.lower()) - - if answer.lower() == task["answer"].lower(): - update.message.reply_text( - "Ты ввел правильный ответ! Возвращаемся в главное меню." - ) - return States.main_menu(bot, update, user_data) - else: - update.message.reply_text( - "К сожалению, твой ответ неверный =( Попробуй ввести другой ответ.", - reply_markup=ReplyKeyboardMarkup(TaskChosenKeyboard.get_keyboard()) - ) - - # return ANSWER_WRONG - return TASK_SHOWN - - else: - update.message.reply_text( - "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - return ANSWER_RIGHT - - -class AdminStates: - @staticmethod - @save_state - def admin_panel(bot: Bot, update: Update, user_data: dict): - status_code, data = backend_api.get_profile(update.message.from_user.id) - if status_code != 200: - update.message.reply_text( - "Не удалось аутентифицировать пользователя. Доступ запрещен.", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - return ADMIN_ACCESS_DENIED - - if not data["is_admin"]: - update.message.reply_text( - "Вы не являетесь администратором. Доступ запрещен.", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - return ADMIN_ACCESS_DENIED - - update.message.reply_text( - "Выберите действие", - reply_markup=ReplyKeyboardMarkup(AdminKeyboard.get_keyboard()) - ) - - return ADMIN_MENU - - @staticmethod - @save_state - def choose_task_hide(bot: Bot, update: Update, user_data: dict): - update.message.reply_text( - "Выберите задачу, которую хотите скрыть", - reply_markup=ReplyKeyboardMarkup(PublishTasksKeyboard.get_keyboard()) - ) - - return ADMIN_TASK_CHOOSE_HIDE - - @staticmethod - @save_state - def choose_task_publish(bot: Bot, update: Update, user_data: dict): - update.message.reply_text( - "Выберите задачу, которую хотите опубликовать", - reply_markup=ReplyKeyboardMarkup(PublishTasksKeyboard.get_keyboard()) - ) - - return ADMIN_TASK_CHOOSE_PUBLISH - - @staticmethod - @save_state - def hide_task(bot: Bot, update: Update, user_data: dict): - title = update.message.text - status, data = backend_api.hide_task(title) - if status != 200: - update.message.reply_text( - "Не удалось скрыть задачу.", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - else: - update.message.reply_text( - "Задача была скрыта.", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - return ADMIN_TASK_PUBLISHED - - @staticmethod - @save_state - def publish_task(bot: Bot, update: Update, user_data: dict): - title = update.message.text - status, data = backend_api.publish_task(title) - if status != 200: - update.message.reply_text( - "Не удалось опубликовать задачу.", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - else: - update.message.reply_text( - "Задача была опубликована.", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - return ADMIN_TASK_PUBLISHED - - @staticmethod - @save_state - def wait_for_announcement(bot: Bot, update: Update, user_data: dict): - update.message.reply_text( - "Введите объявление (Отправка сообщений может занять несколько секунд)", - reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard()) - ) - - return ADMIN_WAIT_FOR_ANNOUNCEMENT - - @staticmethod - @save_state - def wait_for_message(bot: Bot, update: Update, user_data: dict): - update.message.reply_text( - "Введите сообщение (Отправка сообщений может занять несколько секунд)", - reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard()) - ) - - return ADMIN_WAIT_FOR_MESSAGE - - @staticmethod - @save_state - def announce_message(bot: Bot, update: Update, user_data: dict): - status, profiles = backend_api.get_profiles() - if status != 200: - pass - else: - ids = [] - for profile in profiles: - ids.append(int(profile["tg_id"])) - - return AdminStates.send_to_ids(ids, "ОБЪЯВЛЕНИЕ ОТ МОДЕРАТОРОВ:", update.message.text, bot, update) - - @staticmethod - @save_state - def message_plr(bot: Bot, update: Update, user_data: dict): - text = update.message.text - _ = text.split(maxsplit=1) - - if len(_) != 2: - update.message.reply_text( - "Ошибка в сообщении, попробуйте еще раз", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - return ADMIN_TASK_PUBLISHED - - ids_text, msg = _ - ids_split = filter(lambda s: len(s) > 0 and s.isnumeric(), ids_text.split(',')) - ids = list(map(int, ids_split)) - - if len(ids) == 0: - update.message.reply_text( - "Все id пользователей неверно введены", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - return ADMIN_TASK_PUBLISHED - - else: - return AdminStates.send_to_ids(ids, "СООБЩЕНИЕ ОТ МОДЕРАТОРОВ:", msg, bot, update) - - @staticmethod - def send_to_ids(ids: List[int], prefix: str, message: str, bot: Bot, update: Update): - errors = [] - for tg_id in ids: - try: - bot.send_message(tg_id, f"*{prefix}*\n\n{message}", parse_mode="Markdown") - print(tg_id) - except Exception as e: - logger.debug(f"Got exception while announcing message: {e}") - errors.append((tg_id, str(e))) - sleep(0.3) - - sleep(0.05) - - if len(errors) == 0: - update.message.reply_text( - "Сообщение успешно отправлено всем пользователям", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()), - ) - - # OKAY - return ADMIN_TASK_PUBLISHED - - else: - error_msg = [] - user_ids = [] - - for err in errors: - user_ids.append(str(err[0])) - error_msg.append(f"{err[0]}. Reason: {err[1]}") - - msg = "\n".join(error_msg) - user_ids = ",".join(user_ids) - - update.message.reply_text( - "Во время отправки сообщений возникли следующие ошибки:\n" - f"{msg}\n\n{user_ids}", - reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) - ) - - # OKAY - return ADMIN_TASK_PUBLISHED |