from telegram import ReplyKeyboardMarkup, ReplyKeyboardRemove from keyboards import ( MenuKeyboard, TasksKeyboard, TaskChosenKeyboard, ContinueKeyboard, AnsweringKeyboard, AdminKeyboard, PublishTasksKeyboard, BackToMenuKeyboard ) from utils import * import backend_api from time import sleep import datetime as dt from decimal import Decimal # Typing from telegram import Update, User, Bot from typing import List from botlogging import logger def save_state(func): def wrapper(bot: Bot, update: Update, user_data: dict, *args, **kwargs): last_state = func(bot, update, user_data, *args, **kwargs) backend_api.save_state(last_state, update.message.from_user.id, user_data) return last_state return wrapper def calc_score(t, base_score=1000): base_score = Decimal(base_score) min_score = base_score * Decimal(0.1) max_t = Decimal(720) k = (base_score - min_score) / (max_t * max_t) if t >= max_t: return round(min_score, 2) return round(min(k * (t - max_t) * (t - max_t) + min_score, base_score), 2) class States: @staticmethod @save_state def prompt_question(bot: Bot, update: Update, user_data: dict): update.message.reply_text( "Введите свой вопрос, он будет перенаправлен представителям компании. " "Вам ответят как только появится возможность, поэтому следите за сообщениями от бота.", reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard()) ) return ASKING_QUESTION @staticmethod @save_state def ask_question(bot: Bot, update: Update, user_data: dict): question = update.message.text code, profiles = backend_api.get_profiles() if code != 200: pass user_id = update.message.from_user.id username = update.message.from_user.full_name for profile in profiles: if profile["is_admin"]: bot.send_message( profile["tg_id"], f"*ВОПРОС ОТ ПОЛЬЗОВАТЕЛЯ {username} ({user_id})*:\n\n{question}", parse_mode="Markdown" ) update.message.reply_text( "Ваш вопрос отправлен на рассмотрение, ожидайте ответ.", reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) ) return States.main_menu(bot, update, user_data) @staticmethod @save_state def wait_for_username(bot: Bot, update: Update, user_data: dict): update.message.reply_text( "По правилам квиза ты не можешь участвовать, если у тебя не указано " "имя пользователя, поэтому укажи его и возвращайся как только это сделаешь!", reply_markup=ReplyKeyboardMarkup([['Я указал имя пользователя']]) ) return WAIT_FOR_USERNAME @staticmethod @save_state def main_menu(bot: Bot, update: Update, user_data: dict): user_data["chosen_task"] = None status_code, response = backend_api.get_attempts( tg_id=update.message.from_user.id, task_title=user_data["chosen_task"] ) menu_text = [ ] solved = False full_score = Decimal(0.0) if status_code == 200: if len(response) == 5: solved = True if len(response) != 0: menu_text.append("Твои решенные задачи:") for attempt in response: try: ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ") except ValueError: ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%SZ") try: fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%S.%fZ") except ValueError: fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%SZ") t = Decimal((ts - fp).total_seconds()) / Decimal(60) # plr_score = calc_score(t, attempt["task"]["base_score"]) plr_score = int(attempt["task"]["base_score"]) full_score += plr_score menu_text.append( f"_{attempt['task']['title']}_ " f"({plr_score})" ) else: menu_text.append("У тебя еще нет решенных задач") else: menu_text.append( "К сожалению, не удалось получить данные о твоих попытках =(\n" "Попробуй обратиться к боту чуть позже." ) menu_text.append(f"\n*Итоговый счет*: {full_score}") update.message.reply_text("\n".join(menu_text), parse_mode="Markdown") if solved: update.message.reply_text( "*У тебя решены все задачи! Подойди к нам, чтобы получить приз.*", parse_mode="Markdown" ) update.message.reply_text( "Выбери следующее действие...", reply_markup=ReplyKeyboardMarkup(MenuKeyboard.get_keyboard(update.message.from_user.id, solved)) ) return MAIN_MENU @staticmethod @save_state def choose_task(bot: Bot, update: Update, user_data: dict): user_data["chosen_task"] = None status, published = backend_api.get_published_tasks() if len(published) == 0: update.message.reply_text( "Пока что не опубликовано ни одной задачи =(", reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) ) # okay this is epic (pile of shit) return ANSWER_RIGHT else: update.message.reply_text( "Какую задачу ты хочешь сдать?", reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) ) return TASK_CHOOSING @staticmethod @save_state def top_10(bot: Bot, update: Update, user_data: dict): code, attempts = backend_api.get_attempts() top = {} for attempt in attempts: if not attempt["solved"] or attempt["profile"]["is_hidden"]: continue if attempt["profile"]["tg_id"] not in top: top[attempt["profile"]["tg_id"]] = { "fullname": attempt["profile"]["fullname"], "username": attempt["profile"]["username"], "score": Decimal(0.0) } try: ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ") except ValueError: ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%SZ") try: fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%S.%fZ") except ValueError: fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%SZ") t = Decimal((ts - fp).total_seconds()) / Decimal(60) plr_score = calc_score(t, attempt["task"]["base_score"]) top[attempt["profile"]["tg_id"]]["score"] += plr_score try: top_list = [] for tg_id, stats in top.items(): top_list.append(( stats['score'], tg_id, f"{stats['fullname']} (@{stats['username']}) -- {stats['score']}pts" )) top_list.sort(key=lambda p: p[0], reverse=True) top_10_ids = [] places = {tg_id: place for place, (score, tg_id, text) in enumerate(top_list, 1)} top = ["Топ-10:"] for place, (score, tg_id, text) in enumerate(top_list[:10], 1): top.append(str(place).rjust(2, " ") + ". " + text) top_10_ids.append(tg_id) if update.message.from_user.id not in top_10_ids: if len(top_10_ids) == 10 and places[update.message.from_user.id] > 11: top.append("...") this_place = places[update.message.from_user.id] top.append(str(this_place).rjust(2, " ") + ". " + top_list[this_place - 1][2]) except Exception as e: print(e) update.message.reply_text("\n".join(top)) return MAIN_MENU @staticmethod @save_state def rules(bot: Bot, update: Update, user_data: dict): r = """Вот несколько правил, которые следует соблюдать: 1) Всего в квизе 5 задач - после решения который можно получить призы. 2) Пожалуйста, постарайся решить каждую задачу самостоятельно и никому не подсказывать! 3) Если есть какие-то вопросы к нам, либо проблемы с задачами, нажми "Связаться с Базальт СПО" и напиши свой вопрос, мы ответим тебе в ближайшее время! 4) Если возникнут проблемы с интерфейсом, попробуй написать /stop, а затем /start, чтобы общение с ботом перезапустилось. Прогресс решённых задач никуда не пропадёт.""" # with open("rules.jpg", "rb") as f: # try: # print(update.message.reply_photo(f, timeout=5)) # except Exception as e: # print(e) update.message.reply_text(r) return MAIN_MENU @staticmethod @save_state def show_task(bot: Bot, update: Update, user_data: dict): if "chosen_task" in user_data and user_data["chosen_task"] is not None: task_title = user_data["chosen_task"] else: task_title = update.message.text user_data["chosen_task"] = task_title status_code, response = backend_api.get_attempts( tg_id=update.message.from_user.id, task_title=user_data["chosen_task"] ) if status_code != 200: user_data["chosen_task"] = None update.message.reply_text( "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!", reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) ) return TASK_CHOOSING if len(response) != 0: user_data["chosen_task"] = None update.message.reply_text( "Ты уже решил эту задачу! Выбери другую.", reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) ) return TASK_CHOOSING status_code, tasks_response = backend_api.get_published_tasks() if status_code != 200: update.message.reply_text( "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!", reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) ) return MAIN_MENU titles = {task.get("title"): task for task in tasks_response} if task_title not in titles.keys(): update.message.reply_text( "Такой задачи не найдено, попробуй ввести другое название!", reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard()) ) return TASK_CHOOSING task = titles[task_title] message = '\n'.join([ f"*{task['title']}*", f"{task['statement']}", ]) keyboard = TaskChosenKeyboard.get_keyboard() update.message.reply_text( message, parse_mode="Markdown", reply_markup=ReplyKeyboardMarkup(keyboard) ) update.message.reply_text( "Вводи свой ответ и я его проверю, " "или нажми кнопку Назад, чтобы выбрать другую задачу" ) return TASK_SHOWN @staticmethod @save_state def type_answer(bot: Bot, update: Update, user_data: dict): status_code, response = backend_api.get_attempts( tg_id=update.message.from_user.id, task_title=user_data["chosen_task"] ) if len(response) != 0: update.message.reply_text( "Ты уже решил эту задачу! Выбери другую.", reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) ) return TASK_CHOOSING else: update.message.reply_text( "Вводи свой ответ, я его проверю.", reply_markup=ReplyKeyboardMarkup(AnsweringKeyboard.get_keyboard()) ) return ANSWERING @staticmethod @save_state def accept_answer(bot: Bot, update: Update, user_data: dict): answer = update.message.text status_code, task = backend_api.get_task(user_data["chosen_task"]) if status_code == 200: backend_api.create_attempt(update.message.from_user.id, user_data["chosen_task"], answer.lower()) if answer.lower() == task["answer"].lower(): update.message.reply_text( "Ты ввел правильный ответ! Возвращаемся в главное меню." ) return States.main_menu(bot, update, user_data) else: update.message.reply_text( "К сожалению, твой ответ неверный =( Попробуй ввести другой ответ.", reply_markup=ReplyKeyboardMarkup(TaskChosenKeyboard.get_keyboard()) ) # return ANSWER_WRONG return TASK_SHOWN else: update.message.reply_text( "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!", reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) ) return ANSWER_RIGHT class AdminStates: @staticmethod @save_state def admin_panel(bot: Bot, update: Update, user_data: dict): status_code, data = backend_api.get_profile(update.message.from_user.id) if status_code != 200: update.message.reply_text( "Не удалось аутентифицировать пользователя. Доступ запрещен.", reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) ) return ADMIN_ACCESS_DENIED if not data["is_admin"]: update.message.reply_text( "Вы не являетесь администратором. Доступ запрещен.", reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) ) return ADMIN_ACCESS_DENIED update.message.reply_text( "Выберите действие", reply_markup=ReplyKeyboardMarkup(AdminKeyboard.get_keyboard()) ) return ADMIN_MENU @staticmethod @save_state def choose_task_hide(bot: Bot, update: Update, user_data: dict): update.message.reply_text( "Выберите задачу, которую хотите скрыть", reply_markup=ReplyKeyboardMarkup(PublishTasksKeyboard.get_keyboard()) ) return ADMIN_TASK_CHOOSE_HIDE @staticmethod @save_state def choose_task_publish(bot: Bot, update: Update, user_data: dict): update.message.reply_text( "Выберите задачу, которую хотите опубликовать", reply_markup=ReplyKeyboardMarkup(PublishTasksKeyboard.get_keyboard()) ) return ADMIN_TASK_CHOOSE_PUBLISH @staticmethod @save_state def hide_task(bot: Bot, update: Update, user_data: dict): title = update.message.text status, data = backend_api.hide_task(title) if status != 200: update.message.reply_text( "Не удалось скрыть задачу.", reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) ) else: update.message.reply_text( "Задача была скрыта.", reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) ) return ADMIN_TASK_PUBLISHED @staticmethod @save_state def publish_task(bot: Bot, update: Update, user_data: dict): title = update.message.text status, data = backend_api.publish_task(title) if status != 200: update.message.reply_text( "Не удалось опубликовать задачу.", reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) ) else: update.message.reply_text( "Задача была опубликована.", reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) ) return ADMIN_TASK_PUBLISHED @staticmethod @save_state def wait_for_announcement(bot: Bot, update: Update, user_data: dict): update.message.reply_text( "Введите объявление (Отправка сообщений может занять несколько секунд)", reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard()) ) return ADMIN_WAIT_FOR_ANNOUNCEMENT @staticmethod @save_state def wait_for_message(bot: Bot, update: Update, user_data: dict): update.message.reply_text( "Введите сообщение (Отправка сообщений может занять несколько секунд)", reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard()) ) return ADMIN_WAIT_FOR_MESSAGE @staticmethod @save_state def announce_message(bot: Bot, update: Update, user_data: dict): status, profiles = backend_api.get_profiles() if status != 200: pass else: ids = [] for profile in profiles: ids.append(int(profile["tg_id"])) return AdminStates.send_to_ids(ids, "ОБЪЯВЛЕНИЕ ОТ МОДЕРАТОРОВ:", update.message.text, bot, update) @staticmethod @save_state def message_plr(bot: Bot, update: Update, user_data: dict): text = update.message.text _ = text.split(maxsplit=1) if len(_) != 2: update.message.reply_text( "Ошибка в сообщении, попробуйте еще раз", reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) ) return ADMIN_TASK_PUBLISHED ids_text, msg = _ ids_split = filter(lambda s: len(s) > 0 and s.isnumeric(), ids_text.split(',')) ids = list(map(int, ids_split)) if len(ids) == 0: update.message.reply_text( "Все id пользователей неверно введены", reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) ) return ADMIN_TASK_PUBLISHED else: return AdminStates.send_to_ids(ids, "СООБЩЕНИЕ ОТ МОДЕРАТОРОВ:", msg, bot, update) @staticmethod def send_to_ids(ids: List[int], prefix: str, message: str, bot: Bot, update: Update): errors = [] for tg_id in ids: try: bot.send_message(tg_id, f"*{prefix}*\n\n{message}", parse_mode="Markdown") print(tg_id) except Exception as e: logger.debug(f"Got exception while announcing message: {e}") errors.append((tg_id, str(e))) sleep(0.3) sleep(0.05) if len(errors) == 0: update.message.reply_text( "Сообщение успешно отправлено всем пользователям", reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()), ) # OKAY return ADMIN_TASK_PUBLISHED else: error_msg = [] user_ids = [] for err in errors: user_ids.append(str(err[0])) error_msg.append(f"{err[0]}. Reason: {err[1]}") msg = "\n".join(error_msg) user_ids = ",".join(user_ids) update.message.reply_text( "Во время отправки сообщений возникли следующие ошибки:\n" f"{msg}\n\n{user_ids}", reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()) ) # OKAY return ADMIN_TASK_PUBLISHED