summaryrefslogtreecommitdiff
path: root/src/states.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/states.py')
-rw-r--r--src/states.py568
1 files changed, 568 insertions, 0 deletions
diff --git a/src/states.py b/src/states.py
new file mode 100644
index 0000000..afb00cb
--- /dev/null
+++ b/src/states.py
@@ -0,0 +1,568 @@
+from telegram import ReplyKeyboardMarkup, ReplyKeyboardRemove
+
+from keyboards import (
+ MenuKeyboard, TasksKeyboard, TaskChosenKeyboard, ContinueKeyboard,
+ AnsweringKeyboard, AdminKeyboard, PublishTasksKeyboard, BackToMenuKeyboard
+)
+from utils import *
+import backend_api
+from time import sleep
+import datetime as dt
+from decimal import Decimal
+
+# Typing
+from telegram import Update, User, Bot
+from typing import List
+
+from botlogging import logger
+
+
+def save_state(func):
+ def wrapper(bot: Bot, update: Update, user_data: dict, *args, **kwargs):
+ last_state = func(bot, update, user_data, *args, **kwargs)
+ backend_api.save_state(last_state, update.message.from_user.id, user_data)
+ return last_state
+
+ return wrapper
+
+
+def calc_score(t, base_score=1000):
+ base_score = Decimal(base_score)
+ min_score = base_score * Decimal(0.1)
+ max_t = Decimal(720)
+ k = (base_score - min_score) / (max_t * max_t)
+
+ if t >= max_t:
+ return round(min_score, 2)
+
+ return round(min(k * (t - max_t) * (t - max_t) + min_score, base_score), 2)
+
+
+class States:
+ @staticmethod
+ @save_state
+ def prompt_question(bot: Bot, update: Update, user_data: dict):
+ update.message.reply_text(
+ "Введите свой вопрос, он будет перенаправлен представителям компании. "
+ "Вам ответят как только появится возможность, поэтому следите за сообщениями от бота.",
+ reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard())
+ )
+
+ return ASKING_QUESTION
+
+ @staticmethod
+ @save_state
+ def ask_question(bot: Bot, update: Update, user_data: dict):
+ question = update.message.text
+
+ code, profiles = backend_api.get_profiles()
+ if code != 200:
+ pass
+
+ user_id = update.message.from_user.id
+ username = update.message.from_user.full_name
+
+ for profile in profiles:
+ if profile["is_admin"]:
+ bot.send_message(
+ profile["tg_id"], f"*ВОПРОС ОТ ПОЛЬЗОВАТЕЛЯ {username} ({user_id})*:\n\n{question}",
+ parse_mode="Markdown"
+ )
+
+ update.message.reply_text(
+ "Ваш вопрос отправлен на рассмотрение, ожидайте ответ.",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ return States.main_menu(bot, update, user_data)
+
+
+ @staticmethod
+ @save_state
+ def wait_for_username(bot: Bot, update: Update, user_data: dict):
+ update.message.reply_text(
+ "По правилам квиза ты не можешь участвовать, если у тебя не указано "
+ "имя пользователя, поэтому укажи его и возвращайся как только это сделаешь!",
+ reply_markup=ReplyKeyboardMarkup([['Я указал имя пользователя']])
+ )
+ return WAIT_FOR_USERNAME
+
+ @staticmethod
+ @save_state
+ def main_menu(bot: Bot, update: Update, user_data: dict):
+ user_data["chosen_task"] = None
+
+ status_code, response = backend_api.get_attempts(
+ tg_id=update.message.from_user.id,
+ task_title=user_data["chosen_task"]
+ )
+
+ menu_text = [
+
+ ]
+ solved = False
+
+ full_score = Decimal(0.0)
+ if status_code == 200:
+ if len(response) == 5:
+ solved = True
+ if len(response) != 0:
+ menu_text.append("Твои решенные задачи:")
+
+ for attempt in response:
+ try:
+ ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ")
+ except ValueError:
+ ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%SZ")
+
+ try:
+ fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%S.%fZ")
+ except ValueError:
+ fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%SZ")
+
+ t = Decimal((ts - fp).total_seconds()) / Decimal(60)
+ # plr_score = calc_score(t, attempt["task"]["base_score"])
+ plr_score = int(attempt["task"]["base_score"])
+ full_score += plr_score
+
+ menu_text.append(
+ f"_{attempt['task']['title']}_ "
+ f"({plr_score})"
+ )
+ else:
+ menu_text.append("У тебя еще нет решенных задач")
+ else:
+ menu_text.append(
+ "К сожалению, не удалось получить данные о твоих попытках =(\n"
+ "Попробуй обратиться к боту чуть позже."
+ )
+
+ menu_text.append(f"\n*Итоговый счет*: {full_score}")
+
+ update.message.reply_text("\n".join(menu_text), parse_mode="Markdown")
+
+ if solved:
+ update.message.reply_text(
+ "*У тебя решены все задачи! Подойди к нам, чтобы получить приз.*",
+ parse_mode="Markdown"
+ )
+
+ update.message.reply_text(
+ "Выбери следующее действие...",
+ reply_markup=ReplyKeyboardMarkup(MenuKeyboard.get_keyboard(update.message.from_user.id, solved))
+ )
+
+ return MAIN_MENU
+
+ @staticmethod
+ @save_state
+ def choose_task(bot: Bot, update: Update, user_data: dict):
+ user_data["chosen_task"] = None
+
+ status, published = backend_api.get_published_tasks()
+ if len(published) == 0:
+ update.message.reply_text(
+ "Пока что не опубликовано ни одной задачи =(",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ # okay this is epic (pile of shit)
+ return ANSWER_RIGHT
+ else:
+ update.message.reply_text(
+ "Какую задачу ты хочешь сдать?",
+ reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard())
+ )
+ return TASK_CHOOSING
+
+ @staticmethod
+ @save_state
+ def top_10(bot: Bot, update: Update, user_data: dict):
+ code, attempts = backend_api.get_attempts()
+
+ top = {}
+
+ for attempt in attempts:
+ if not attempt["solved"] or attempt["profile"]["is_hidden"]:
+ continue
+
+ if attempt["profile"]["tg_id"] not in top:
+ top[attempt["profile"]["tg_id"]] = {
+ "fullname": attempt["profile"]["fullname"],
+ "username": attempt["profile"]["username"],
+ "score": Decimal(0.0)
+ }
+
+ try:
+ ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ")
+ except ValueError:
+ ts = dt.datetime.strptime(attempt["timestamp"], "%Y-%m-%dT%H:%M:%SZ")
+
+ try:
+ fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%S.%fZ")
+ except ValueError:
+ fp = dt.datetime.strptime(attempt["task"]["first_published"], "%Y-%m-%dT%H:%M:%SZ")
+
+ t = Decimal((ts - fp).total_seconds()) / Decimal(60)
+ plr_score = calc_score(t, attempt["task"]["base_score"])
+ top[attempt["profile"]["tg_id"]]["score"] += plr_score
+
+ try:
+ top_list = []
+ for tg_id, stats in top.items():
+ top_list.append((
+ stats['score'], tg_id,
+ f"{stats['fullname']} (@{stats['username']}) -- {stats['score']}pts"
+ ))
+
+ top_list.sort(key=lambda p: p[0], reverse=True)
+
+ top_10_ids = []
+ places = {tg_id: place for place, (score, tg_id, text) in enumerate(top_list, 1)}
+
+ top = ["Топ-10:"]
+ for place, (score, tg_id, text) in enumerate(top_list[:10], 1):
+ top.append(str(place).rjust(2, " ") + ". " + text)
+ top_10_ids.append(tg_id)
+
+ if update.message.from_user.id not in top_10_ids:
+ if len(top_10_ids) == 10 and places[update.message.from_user.id] > 11:
+ top.append("...")
+ this_place = places[update.message.from_user.id]
+ top.append(str(this_place).rjust(2, " ") + ". " + top_list[this_place - 1][2])
+
+ except Exception as e:
+ print(e)
+
+ update.message.reply_text("\n".join(top))
+ return MAIN_MENU
+
+ @staticmethod
+ @save_state
+ def rules(bot: Bot, update: Update, user_data: dict):
+ r = """Вот несколько правил, которые следует соблюдать:
+1) Всего в квизе 5 задач - после решения который можно получить призы.
+2) Пожалуйста, постарайся решить каждую задачу самостоятельно и никому не подсказывать!
+3) Если есть какие-то вопросы к нам, либо проблемы с задачами, нажми "Связаться с Базальт СПО" и напиши свой вопрос, мы ответим тебе в ближайшее время!
+4) Если возникнут проблемы с интерфейсом, попробуй написать /stop, а затем /start, чтобы общение с ботом перезапустилось. Прогресс решённых задач никуда не пропадёт."""
+ # with open("rules.jpg", "rb") as f:
+ # try:
+ # print(update.message.reply_photo(f, timeout=5))
+ # except Exception as e:
+ # print(e)
+ update.message.reply_text(r)
+
+ return MAIN_MENU
+
+ @staticmethod
+ @save_state
+ def show_task(bot: Bot, update: Update, user_data: dict):
+ if "chosen_task" in user_data and user_data["chosen_task"] is not None:
+ task_title = user_data["chosen_task"]
+ else:
+ task_title = update.message.text
+ user_data["chosen_task"] = task_title
+
+ status_code, response = backend_api.get_attempts(
+ tg_id=update.message.from_user.id,
+ task_title=user_data["chosen_task"]
+ )
+ if status_code != 200:
+ user_data["chosen_task"] = None
+
+ update.message.reply_text(
+ "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!",
+ reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard())
+ )
+
+ return TASK_CHOOSING
+
+ if len(response) != 0:
+ user_data["chosen_task"] = None
+
+ update.message.reply_text(
+ "Ты уже решил эту задачу! Выбери другую.",
+ reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard())
+ )
+
+ return TASK_CHOOSING
+
+ status_code, tasks_response = backend_api.get_published_tasks()
+ if status_code != 200:
+ update.message.reply_text(
+ "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ return MAIN_MENU
+
+ titles = {task.get("title"): task for task in tasks_response}
+
+ if task_title not in titles.keys():
+ update.message.reply_text(
+ "Такой задачи не найдено, попробуй ввести другое название!",
+ reply_markup=ReplyKeyboardMarkup(TasksKeyboard.get_keyboard())
+ )
+
+ return TASK_CHOOSING
+
+ task = titles[task_title]
+
+ message = '\n'.join([
+ f"*{task['title']}*",
+ f"{task['statement']}",
+ ])
+ keyboard = TaskChosenKeyboard.get_keyboard()
+
+ update.message.reply_text(
+ message, parse_mode="Markdown",
+ reply_markup=ReplyKeyboardMarkup(keyboard)
+ )
+
+ update.message.reply_text(
+ "Вводи свой ответ и я его проверю, "
+ "или нажми кнопку Назад, чтобы выбрать другую задачу"
+ )
+
+ return TASK_SHOWN
+
+ @staticmethod
+ @save_state
+ def type_answer(bot: Bot, update: Update, user_data: dict):
+ status_code, response = backend_api.get_attempts(
+ tg_id=update.message.from_user.id,
+ task_title=user_data["chosen_task"]
+ )
+ if len(response) != 0:
+ update.message.reply_text(
+ "Ты уже решил эту задачу! Выбери другую.",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ return TASK_CHOOSING
+
+ else:
+ update.message.reply_text(
+ "Вводи свой ответ, я его проверю.",
+ reply_markup=ReplyKeyboardMarkup(AnsweringKeyboard.get_keyboard())
+ )
+ return ANSWERING
+
+ @staticmethod
+ @save_state
+ def accept_answer(bot: Bot, update: Update, user_data: dict):
+ answer = update.message.text
+ status_code, task = backend_api.get_task(user_data["chosen_task"])
+ if status_code == 200:
+ backend_api.create_attempt(update.message.from_user.id, user_data["chosen_task"], answer.lower())
+
+ if answer.lower() == task["answer"].lower():
+ update.message.reply_text(
+ "Ты ввел правильный ответ! Возвращаемся в главное меню."
+ )
+ return States.main_menu(bot, update, user_data)
+ else:
+ update.message.reply_text(
+ "К сожалению, твой ответ неверный =( Попробуй ввести другой ответ.",
+ reply_markup=ReplyKeyboardMarkup(TaskChosenKeyboard.get_keyboard())
+ )
+
+ # return ANSWER_WRONG
+ return TASK_SHOWN
+
+ else:
+ update.message.reply_text(
+ "Произошла ошибка в работе квиза. Мы уже работаем над её устранением!",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ return ANSWER_RIGHT
+
+
+class AdminStates:
+ @staticmethod
+ @save_state
+ def admin_panel(bot: Bot, update: Update, user_data: dict):
+ status_code, data = backend_api.get_profile(update.message.from_user.id)
+ if status_code != 200:
+ update.message.reply_text(
+ "Не удалось аутентифицировать пользователя. Доступ запрещен.",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ return ADMIN_ACCESS_DENIED
+
+ if not data["is_admin"]:
+ update.message.reply_text(
+ "Вы не являетесь администратором. Доступ запрещен.",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ return ADMIN_ACCESS_DENIED
+
+ update.message.reply_text(
+ "Выберите действие",
+ reply_markup=ReplyKeyboardMarkup(AdminKeyboard.get_keyboard())
+ )
+
+ return ADMIN_MENU
+
+ @staticmethod
+ @save_state
+ def choose_task_hide(bot: Bot, update: Update, user_data: dict):
+ update.message.reply_text(
+ "Выберите задачу, которую хотите скрыть",
+ reply_markup=ReplyKeyboardMarkup(PublishTasksKeyboard.get_keyboard())
+ )
+
+ return ADMIN_TASK_CHOOSE_HIDE
+
+ @staticmethod
+ @save_state
+ def choose_task_publish(bot: Bot, update: Update, user_data: dict):
+ update.message.reply_text(
+ "Выберите задачу, которую хотите опубликовать",
+ reply_markup=ReplyKeyboardMarkup(PublishTasksKeyboard.get_keyboard())
+ )
+
+ return ADMIN_TASK_CHOOSE_PUBLISH
+
+ @staticmethod
+ @save_state
+ def hide_task(bot: Bot, update: Update, user_data: dict):
+ title = update.message.text
+ status, data = backend_api.hide_task(title)
+ if status != 200:
+ update.message.reply_text(
+ "Не удалось скрыть задачу.",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+ else:
+ update.message.reply_text(
+ "Задача была скрыта.",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ return ADMIN_TASK_PUBLISHED
+
+ @staticmethod
+ @save_state
+ def publish_task(bot: Bot, update: Update, user_data: dict):
+ title = update.message.text
+ status, data = backend_api.publish_task(title)
+ if status != 200:
+ update.message.reply_text(
+ "Не удалось опубликовать задачу.",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+ else:
+ update.message.reply_text(
+ "Задача была опубликована.",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ return ADMIN_TASK_PUBLISHED
+
+ @staticmethod
+ @save_state
+ def wait_for_announcement(bot: Bot, update: Update, user_data: dict):
+ update.message.reply_text(
+ "Введите объявление (Отправка сообщений может занять несколько секунд)",
+ reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard())
+ )
+
+ return ADMIN_WAIT_FOR_ANNOUNCEMENT
+
+ @staticmethod
+ @save_state
+ def wait_for_message(bot: Bot, update: Update, user_data: dict):
+ update.message.reply_text(
+ "Введите сообщение (Отправка сообщений может занять несколько секунд)",
+ reply_markup=ReplyKeyboardMarkup(BackToMenuKeyboard.get_keyboard())
+ )
+
+ return ADMIN_WAIT_FOR_MESSAGE
+
+ @staticmethod
+ @save_state
+ def announce_message(bot: Bot, update: Update, user_data: dict):
+ status, profiles = backend_api.get_profiles()
+ if status != 200:
+ pass
+ else:
+ ids = []
+ for profile in profiles:
+ ids.append(int(profile["tg_id"]))
+
+ return AdminStates.send_to_ids(ids, "ОБЪЯВЛЕНИЕ ОТ МОДЕРАТОРОВ:", update.message.text, bot, update)
+
+ @staticmethod
+ @save_state
+ def message_plr(bot: Bot, update: Update, user_data: dict):
+ text = update.message.text
+ _ = text.split(maxsplit=1)
+
+ if len(_) != 2:
+ update.message.reply_text(
+ "Ошибка в сообщении, попробуйте еще раз",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+ return ADMIN_TASK_PUBLISHED
+
+ ids_text, msg = _
+ ids_split = filter(lambda s: len(s) > 0 and s.isnumeric(), ids_text.split(','))
+ ids = list(map(int, ids_split))
+
+ if len(ids) == 0:
+ update.message.reply_text(
+ "Все id пользователей неверно введены",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ return ADMIN_TASK_PUBLISHED
+
+ else:
+ return AdminStates.send_to_ids(ids, "СООБЩЕНИЕ ОТ МОДЕРАТОРОВ:", msg, bot, update)
+
+ @staticmethod
+ def send_to_ids(ids: List[int], prefix: str, message: str, bot: Bot, update: Update):
+ errors = []
+ for tg_id in ids:
+ try:
+ bot.send_message(tg_id, f"*{prefix}*\n\n{message}", parse_mode="Markdown")
+ print(tg_id)
+ except Exception as e:
+ logger.debug(f"Got exception while announcing message: {e}")
+ errors.append((tg_id, str(e)))
+ sleep(0.3)
+
+ sleep(0.05)
+
+ if len(errors) == 0:
+ update.message.reply_text(
+ "Сообщение успешно отправлено всем пользователям",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard()),
+ )
+
+ # OKAY
+ return ADMIN_TASK_PUBLISHED
+
+ else:
+ error_msg = []
+ user_ids = []
+
+ for err in errors:
+ user_ids.append(str(err[0]))
+ error_msg.append(f"{err[0]}. Reason: {err[1]}")
+
+ msg = "\n".join(error_msg)
+ user_ids = ",".join(user_ids)
+
+ update.message.reply_text(
+ "Во время отправки сообщений возникли следующие ошибки:\n"
+ f"{msg}\n\n{user_ids}",
+ reply_markup=ReplyKeyboardMarkup(ContinueKeyboard.get_keyboard())
+ )
+
+ # OKAY
+ return ADMIN_TASK_PUBLISHED