from backend.router import route
from backend.utils import render_template, NOT_FOUND_CODE
from backend.config import SERVER_HOST, SERVER_PORT, STATIC_FILES_PATH
from backend.core import ErrorResponse, TextFileResponse, ImageResponse, HtmlResponse, JsonResponse
import logging
import csv
from sys import stdout
@route('/static/(.*?)/(.*?)/?')
def return_static(query, *args):
if args[0] in ['css', 'js']:
try:
return TextFileResponse(
path=STATIC_FILES_PATH + f'/{args[0]}/{args[1]}',
extension={'js': 'javascript', 'css': 'css'}.get(args[0])
)
except (PermissionError, FileNotFoundError):
return ErrorResponse(404)
elif args[0] == 'images':
try:
return ImageResponse(STATIC_FILES_PATH + f'/{args[0]}/{args[1]}', args[1].split('.')[-1])
except (PermissionError, FileNotFoundError):
return ErrorResponse(404)
@route('/api/upload', ['POST'])
def upload_file(query, *args):
base_html = 'Return to main page
%s'
try:
data = query['files'][0]['data'].decode()
except (UnicodeDecodeError, KeyError, IndexError):
return ErrorResponse(500, base_html % '
Error while reading file
')
permitted_headers = db_column_names()
try:
data = list(csv.reader(data.strip().splitlines(), delimiter=';', quotechar='"'))
if len(data[0]) != len(permitted_headers) or set(data[0]) != set(permitted_headers):
return ErrorResponse(500, base_html % 'File format error
')
except IndexError:
return ErrorResponse(500, base_html % 'File format error
')
cursor = db.cursor()
cursor.execute("START TRANSACTION; DELETE FROM `table_task1`; COMMIT;")
cursor.close()
for row in data[1:]:
values = []
for i in range(len(row)):
if row[i] == 'NULL':
if data[0][i] == 'creation_request_sent_date':
value = 'NULL'
else:
value = '"NULL"'
else:
value = f'"{row[i]}"'
values.append(value)
cursor = db.cursor()
cursor.execute(
"START TRANSACTION; "
"INSERT INTO `table_task1` ({}) VALUES ({}); "
"COMMIT;".format(
','.join(data[0]), ','.join(values)
)
)
cursor.close()
return HtmlResponse(base_html % 'File uploaded
')
@route('/api/update', ['POST'])
def update_post(query, *args):
print(query)
service_id = query['service_id']
permitted_fields = db_column_names()
permitted_fields.remove('service_id')
query_set = []
for field_name, value in query.items():
if field_name in permitted_fields:
query_set.append(f'{field_name}="{value}"')
cursor = db.cursor()
cursor.execute("START TRANSACTION; UPDATE `table_task1` SET {} WHERE {}; COMMIT;".format(
','.join(query_set), f'service_id="{service_id}"'
))
cursor.close()
return HtmlResponse('Return to main page
Database Updated
')
@route('/api/delete', ['POST'])
def delete_post(query, *args):
service_id = query['service_id']
cursor = db.cursor()
cursor.execute(f'START TRANSACTION; DELETE FROM `table_task1` WHERE service_id="{service_id}"; COMMIT;')
return HtmlResponse('Return to main page
Database Updated
')
@route('/api/add', ['POST'])
def add_post(query, *args):
header_fields = db_column_names()
headers = []
values = []
for field_name in query:
if field_name in header_fields:
headers.append(field_name)
values.append(f'"{query[field_name]}"')
if len(headers) == len(header_fields):
cursor = db.cursor()
cursor.execute("START TRANSACTION; INSERT INTO `table_task1` ({}) VALUES ({}); COMMIT;".format(
','.join(headers), ','.join(values)
))
cursor.close()
return HtmlResponse('Return to main page
Database Updated
')
@route('/api/get/?', ['POST'])
def db_get(query, *args):
table_headers = db_column_names()
cursor = db.cursor()
if query['type'] == 'full':
cursor.execute('SELECT * FROM table_task1;')
content = list(map(list, cursor.fetchall()))
for row in content:
# creation_time
row[7] = ':'.join(_.rjust(2, '0') for _ in str(row[7]).split(':'))
# creation_request_sent_date
if row[8] is not None:
creation_date, creation_time = str(row[8]).split()
creation_time = ':'.join(_.rjust(2, '0') for _ in creation_time.split(':'))
row[8] = creation_date + ' ' + creation_time
cursor.close()
json_content = []
for row in content:
new_row = []
for col in row:
if not isinstance(col, (float, bool, int, dict, list, tuple, str)):
col = str(col)
new_row.append(col)
json_content.append(new_row)
return JsonResponse({'headers': table_headers, 'content': json_content})
elif query['type'] == 'single_id':
cursor.execute(f'SELECT * FROM table_task1 WHERE service_id="{query["service_id"]}";')
content = list(cursor.fetchall()[0])
# creation_time
content[7] = ':'.join(_.rjust(2, '0') for _ in str(content[7]).split(':'))
# creation_request_sent_date
if content[8] is not None:
creation_date, creation_time = str(content[8]).split()
creation_time = ':'.join(_.rjust(2, '0') for _ in creation_time.split(':'))
content[8] = creation_date + ' ' + creation_time
cursor.close()
if content is not None:
json_content = []
for col in content:
if not isinstance(col, (float, bool, int, dict, list, tuple)):
col = str(col)
json_content.append(col)
return JsonResponse(dict(zip(table_headers, json_content)))
else:
return JsonResponse({})
return ErrorResponse(NOT_FOUND_CODE)
@route('/')
def index_get(query, *args):
data = render_template('index.html')
return HtmlResponse(data)
def prepare_logger():
logger = logging.getLogger('tableApp')
logger.setLevel(logging.INFO)
sh = logging.StreamHandler(stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
sh.setFormatter(formatter)
logger.addHandler(sh)
if __name__ == '__main__':
prepare_logger()
from backend.server import start_server
from backend.database import db, db_column_names
logging.getLogger('tableApp').info(f'Starting server...')
start_server(SERVER_HOST, SERVER_PORT)