from backend.core.utils import render_template, NOT_FOUND_CODE
from backend.settings import STATIC_FILES_PATH
from backend.core.response_types import ErrorResponse, TextFileResponse, ImageResponse, HtmlResponse, JsonResponse
from backend.database.database import get_wrapper_for
import csv
def return_static(query, *args):
    """Serve a static asset stored under ``STATIC_FILES_PATH``.

    ``args[0]`` selects the asset category (``'css'``, ``'js'`` or
    ``'images'``) and ``args[1]`` is the file name inside that category.
    ``query`` is not used.

    Returns a ``TextFileResponse`` for stylesheets and scripts, an
    ``ImageResponse`` for images, and ``ErrorResponse(404)`` when the
    arguments are incomplete, the category is unknown, the file name contains
    a ``..`` segment, or the file is missing or unreadable.
    """
    # Both the category and the file name are required.
    if len(args) < 2:
        return ErrorResponse(404)
    category, filename = args[0], args[1]

    # The file name presumably originates from the request URL; refuse any
    # ".." path segment so the lookup cannot climb out of the static folder.
    if '..' in str(filename).replace('\\', '/').split('/'):
        return ErrorResponse(404)

    path = STATIC_FILES_PATH + f'/{category}/{filename}'
    text_extensions = {'js': 'javascript', 'css': 'css'}
    try:
        if category in text_extensions:
            return TextFileResponse(path=path, extension=text_extensions[category])
        if category == 'images':
            # The image type is whatever follows the last dot of the name.
            return ImageResponse(path, filename.split('.')[-1])
    except (PermissionError, FileNotFoundError):
        return ErrorResponse(404)

    # Unknown category: answer explicitly instead of falling through to None.
    return ErrorResponse(404)
def upload_file(query, *args):
    """Replace the contents of ``table_task1`` with rows from an uploaded CSV.

    The upload is read from ``query['files'][0]['data']`` (bytes, decoded as
    UTF-8) and parsed as ``;``-delimited CSV with ``"`` as the quote
    character. The first line must list exactly the column names reported by
    the wrapper, in any order; every following line is one record.

    Returns an ``HtmlResponse`` when all records were stored, otherwise an
    ``ErrorResponse`` with status 500. The table is only touched once every
    record has passed validation.
    """
    # NOTE(review): these literals spanned several physical lines in the
    # source; the text is kept as it appeared, with the line breaks written
    # as "\n". Confirm no markup was lost from the messages.
    base_html = 'Return to main page\n%s'
    format_error = base_html % 'File format error\n'

    try:
        text = query['files'][0]['data'].decode('utf-8')
    except (UnicodeDecodeError, KeyError, IndexError):
        return ErrorResponse(500, base_html % '\nError while reading file\n')

    wrapper = get_wrapper_for('mysql')
    permitted_headers = wrapper.get_column_names()

    try:
        rows = list(csv.reader(text.strip().splitlines(), delimiter=';', quotechar='"'))
    except csv.Error:
        return ErrorResponse(500, format_error)
    if not rows:
        # An empty upload has no header line to check.
        return ErrorResponse(500, format_error)

    header, records = rows[0], rows[1:]
    if len(header) != len(permitted_headers) or set(header) != set(permitted_headers):
        return ErrorResponse(500, format_error)

    # Validate everything before the destructive clear, so a bad record
    # cannot leave the table emptied and half-filled.
    # NOTE(review): assumes scheme.validate() does not depend on the current
    # table contents - confirm.
    scheme = wrapper.schemes['table_task1']
    for record in records:
        if scheme.validate(dict(zip(header, record)))['error']:
            return ErrorResponse(500)

    wrapper.clear_table('table_task1')
    for record in records:
        wrapper.insert_one('table_task1', record, header)
    return HtmlResponse(base_html % 'File uploaded\n')
def validate(query, *args):
    """Validate ``query`` against the ``table_task1`` scheme.

    The outcome of ``scheme.validate(query)`` is returned unchanged inside a
    ``JsonResponse``. Positional ``args`` are ignored.
    """
    table_scheme = get_wrapper_for('mysql').schemes['table_task1']
    return JsonResponse(table_scheme.validate(query))
def update_post(query, *args):
    """Update the ``table_task1`` row identified by ``query['service_id']``.

    Every key of ``query`` that names a table column other than
    ``service_id`` becomes a ``column="value"`` assignment handed to
    ``wrapper.update``. Keys that are not column names are ignored.

    Raises ``KeyError`` when ``query`` has no ``'service_id'`` entry.
    Returns an ``HtmlResponse`` confirming the update.
    """
    wrapper = get_wrapper_for('mysql')
    service_id = query['service_id']

    # Build a fresh collection rather than calling .remove() on the list the
    # wrapper returned: that list may be shared, and mutating it would drop
    # 'service_id' for every later caller.
    permitted_fields = {
        name for name in wrapper.get_column_names() if name != 'service_id'
    }

    query_set = []
    for field_name, value in query.items():
        if field_name not in permitted_fields:
            continue
        # The value ends up inside a double-quoted SQL string literal, so
        # backslashes and double quotes must not be able to terminate it.
        # NOTE(review): a parameterised wrapper.update() would be the better
        # fix if the wrapper supports it.
        escaped = str(value).replace('\\', '\\\\').replace('"', '\\"')
        query_set.append(f'{field_name}="{escaped}"')

    wrapper.update('table_task1', query_set, {'service_id': service_id})
    # NOTE(review): this literal spanned several physical lines in the
    # source; line breaks are written as "\n". Confirm no markup was lost.
    return HtmlResponse('Return to main page\nDatabase Updated\n')
def delete_post(query, *args):
    """Delete the ``table_task1`` row identified by ``query['service_id']``.

    Raises ``KeyError`` when ``query`` has no ``'service_id'`` entry.
    Returns an ``HtmlResponse`` confirming the change.
    """
    wrapper = get_wrapper_for('mysql')
    wrapper.delete_from('table_task1', {'service_id': query['service_id']})
    # NOTE(review): this literal spanned several physical lines in the
    # source, which is not valid for a single-quoted string; the line breaks
    # are written as "\n". Confirm no markup was lost.
    return HtmlResponse('Return to main page\nDatabase Updated\n')
def add_post(query, *args):
    """Insert a new row into ``table_task1`` from the fields in ``query``.

    Only keys of ``query`` that name a table column are used. The insert is
    performed only when the number of matching keys equals the number of
    columns; otherwise nothing is written.

    Returns an ``HtmlResponse`` in both cases.
    """
    wrapper = get_wrapper_for('mysql')
    header_fields = wrapper.get_column_names()

    headers = [field_name for field_name in query if field_name in header_fields]
    # Each value is wrapped in double quotes before being handed to the
    # wrapper, so backslashes and double quotes are escaped to keep the value
    # inside its literal.
    values = [
        '"' + str(query[field_name]).replace('\\', '\\\\').replace('"', '\\"') + '"'
        for field_name in headers
    ]

    # NOTE(review): an incomplete submission is skipped silently and still
    # answered with "Database Updated" - confirm this is intended.
    if len(headers) == len(header_fields):
        wrapper.insert_one('table_task1', values, headers)
    # NOTE(review): this literal spanned several physical lines in the
    # source; line breaks are written as "\n". Confirm no markup was lost.
    return HtmlResponse('Return to main page\nDatabase Updated\n')
def _db_get_pad_time(text):
    """Left-pad every ``:``-separated part of *text* with zeros to width 2."""
    return ':'.join(part.rjust(2, '0') for part in text.split(':'))


def _db_get_prepare_row(row):
    """Return a copy of *row* with time columns padded and cells JSON-ready.

    Cells that are not ``float``, ``bool``, ``int``, ``dict``, ``list``,
    ``tuple`` or ``str`` (this includes ``None``) are converted with ``str()``.
    """
    # Work on a copy so the data owned by the wrapper is never modified.
    cells = list(row)
    # creation_time
    cells[7] = _db_get_pad_time(str(cells[7]))
    # creation_request_sent_date
    if cells[8] is not None:
        date_part, time_part = str(cells[8]).split()
        cells[8] = date_part + ' ' + _db_get_pad_time(time_part)
    return [
        cell if isinstance(cell, (float, bool, int, dict, list, tuple, str)) else str(cell)
        for cell in cells
    ]


def db_get(query, *args):
    """Return ``table_task1`` data as JSON.

    ``query['type']`` selects the mode:

    * ``'full'`` - ``{'headers': [...], 'content': [[...], ...]}`` with every
      row of the table.
    * ``'single_id'`` - a ``{column: value}`` mapping for the row whose
      ``service_id`` equals ``query['service_id']``, or ``{}`` when no such
      row exists.

    Any other (or missing) type yields ``ErrorResponse(NOT_FOUND_CODE)``.
    """
    wrapper = get_wrapper_for('mysql')
    table_headers = wrapper.get_column_names()
    request_type = query.get('type')

    if request_type == 'full':
        content = [_db_get_prepare_row(row) for row in wrapper.get_data('table_task1')]
        return JsonResponse({'headers': table_headers, 'content': content})

    if request_type == 'single_id':
        rows = wrapper.get_rows('table_task1', {'service_id': query['service_id']})
        # Check for "no match" before indexing into the row, so an unknown id
        # produces the empty object instead of an exception.
        if not rows or rows[0] is None:
            return JsonResponse({})
        return JsonResponse(dict(zip(table_headers, _db_get_prepare_row(rows[0]))))

    return ErrorResponse(NOT_FOUND_CODE)
def index_get(query, *args):
    """Render ``index.html`` and return it as an ``HtmlResponse``.

    Neither ``query`` nor ``args`` is used.
    """
    return HtmlResponse(render_template('index.html'))