from backend.core.utils import render_template, NOT_FOUND_CODE
from backend.settings import STATIC_FILES_PATH
from backend.core.response_types import ErrorResponse, TextFileResponse, ImageResponse, HtmlResponse, JsonResponse
from backend.database.database import get_wrapper_for
from backend.database.validators import ValidationError
import csv
def return_static(query, *args):
    """Serve a static asset located under ``STATIC_FILES_PATH``.

    Args:
        query: Request data; not used by this handler.
        *args: Positional parts supplied by the caller (presumably URL path
            captures -- confirm against the router). ``args[0]`` is the asset
            category (``'css'``, ``'js'`` or ``'images'``) and ``args[1]`` is
            the file name inside that category directory.

    Returns:
        A ``TextFileResponse`` for css/js assets, an ``ImageResponse`` for
        images, or ``ErrorResponse(404)`` when the category or file name is
        missing, the category is unknown, the file name contains a ``..``
        segment, or building the response raises ``PermissionError`` or
        ``FileNotFoundError``.
    """
    # Both the category and the file name are required to build a path.
    if len(args) < 2:
        return ErrorResponse(404)
    category, file_name = args[0], args[1]

    # The file name is concatenated into a filesystem path, so refuse
    # parent-directory segments that could point outside the static root.
    if '..' in file_name.replace('\\', '/').split('/'):
        return ErrorResponse(404)

    path = STATIC_FILES_PATH + f'/{category}/{file_name}'
    try:
        if category in ('css', 'js'):
            return TextFileResponse(
                path=path,
                extension={'js': 'javascript', 'css': 'css'}[category],
            )
        if category == 'images':
            # The second argument is the text after the last dot of the name.
            return ImageResponse(path, file_name.split('.')[-1])
    except (PermissionError, FileNotFoundError):
        return ErrorResponse(404)

    # Unknown category: answer explicitly instead of implicitly returning None.
    return ErrorResponse(404)
def upload_file(query, *args):
    """Replace the contents of ``table_task1`` with rows from an uploaded CSV.

    The upload is read from ``query['files'][0]['data']`` (bytes decoded as
    UTF-8) and parsed as ``;``-delimited CSV with ``"`` as the quote
    character. The first CSV row must contain exactly the column names
    reported by ``wrapper.get_column_names()`` (in any order). Every
    following row is validated with the ``table_task1`` scheme.

    All rows are validated *before* the table is cleared, so a rejected file
    leaves the existing table contents untouched.

    Args:
        query: Request data; must provide ``query['files'][0]['data']``.
        *args: Unused.

    Returns:
        ``HtmlResponse`` on success; ``ErrorResponse(500, ...)`` with an
        explanatory page when the file cannot be read or its header is wrong;
        a bare ``ErrorResponse(500)`` when a data row fails validation.
    """
    # NOTE(review): the message literals below are reproduced exactly as they
    # appear in this file, including their line breaks -- confirm they match
    # the intended page markup.
    base_html = 'Return to main page\n%s'
    format_error = base_html % 'File format error\n'

    try:
        text = query['files'][0]['data'].decode('utf-8')
    except (UnicodeDecodeError, KeyError, IndexError):
        return ErrorResponse(500, base_html % '\nError while reading file\n')

    wrapper = get_wrapper_for('mysql')
    permitted_headers = wrapper.get_column_names()

    try:
        rows = list(csv.reader(text.strip().splitlines(), delimiter=';', quotechar='"'))
    except csv.Error:
        return ErrorResponse(500, format_error)

    # An empty upload has no header row at all.
    if not rows:
        return ErrorResponse(500, format_error)

    header = rows[0]
    # The length check rejects duplicated column names that the set
    # comparison alone would let through.
    if len(header) != len(permitted_headers) or set(header) != set(permitted_headers):
        return ErrorResponse(500, format_error)

    scheme = wrapper.schemes['table_task1']

    # Validate everything first; only touch the table once the whole file is
    # known to be acceptable.
    records = []
    for row in rows[1:]:
        record = dict(zip(header, row))
        if scheme.validate(record)['error']:
            return ErrorResponse(500)
        records.append(record)

    wrapper.clear_table('table_task1')
    for record in records:
        wrapper.insert_one('table_task1', record)

    return HtmlResponse(base_html % 'File uploaded\n')
def validate(query, *args):
    """Validate the request fields against the ``table_task1`` scheme.

    Args:
        query: Field values to check; passed unchanged to ``scheme.validate``.
        *args: Unused.

    Returns:
        ``JsonResponse`` wrapping whatever ``scheme.validate(query)`` returns.
    """
    table_scheme = get_wrapper_for('mysql').schemes['table_task1']
    report = table_scheme.validate(query)
    return JsonResponse(report)
def update_post(query, *args):
    """Update the ``table_task1`` row identified by ``query['service_id']``.

    The request is validated with the ``table_task1`` scheme. When validation
    reports no error, every request field that names a table column other
    than ``service_id`` is written to the row whose ``service_id`` matches.

    Args:
        query: Request fields, including ``service_id`` and the new values.
        *args: Unused.

    Returns:
        ``JsonResponse`` wrapping the validation result, whether or not the
        update was performed.
    """
    wrapper = get_wrapper_for('mysql')
    scheme = wrapper.schemes['table_task1']

    # Build a fresh collection instead of calling .remove() on the list that
    # get_column_names() returned: if the wrapper hands out a shared list,
    # removing from it in place would corrupt it for every later caller.
    permitted_fields = {
        name for name in wrapper.get_column_names() if name != 'service_id'
    }

    validation_results = scheme.validate(query)
    if not validation_results['error']:
        # service_id is the lookup key, so it is never part of the new values.
        expressions = {
            key: value for key, value in query.items() if key in permitted_fields
        }
        wrapper.update('table_task1', expressions, {'service_id': query['service_id']})
    return JsonResponse(validation_results)
def delete_post(query, *args):
    """Delete the ``table_task1`` row identified by ``query['service_id']``.

    Only the ``service_id`` field is validated. The row is deleted when that
    validation succeeds.

    Args:
        query: Request fields; must contain ``service_id``.
        *args: Unused.

    Returns:
        ``JsonResponse`` with ``{'error': False}`` after deleting, or with
        ``{'error': True, 'service_id': <message>}`` when the id is rejected
        with ``ValidationError``.
    """
    db = get_wrapper_for('mysql')
    table_scheme = db.schemes['table_task1']
    try:
        table_scheme.fields['service_id'].validate(query['service_id'])
    except ValidationError as exc:
        # Invalid id: report the validator's message and leave the table alone.
        return JsonResponse({'error': True, 'service_id': str(exc)})

    outcome = {'error': False}
    db.delete_from('table_task1', {'service_id': query['service_id']})
    return JsonResponse(outcome)
def add_post(query, *args):
    """Insert a new ``table_task1`` row built from the request fields.

    Request fields whose names match a table column are collected for the
    insert; the complete request is then validated with the ``table_task1``
    scheme, and the row is inserted only when no error is reported.

    Args:
        query: Request fields for the new row.
        *args: Unused.

    Returns:
        ``JsonResponse`` wrapping the validation result.
    """
    db = get_wrapper_for('mysql')
    columns = db.get_column_names()

    # Collected before validation runs, mirroring the established order.
    new_row = {name: val for name, val in query.items() if name in columns}

    outcome = db.schemes['table_task1'].validate(query)
    if outcome['error']:
        return JsonResponse(outcome)

    db.insert_one('table_task1', new_row)
    return JsonResponse(outcome)
# Row positions that need reformatting; the original inline comments label
# them "creation_time" (7) and "creation_request_sent_date" (8).
_CREATION_TIME_COL = 7
_CREATION_REQUEST_SENT_COL = 8

# Values of these types are passed to JsonResponse unchanged; anything else
# is converted with str().
_JSON_NATIVE_TYPES = (float, bool, int, dict, list, tuple, str)


def _zero_pad_time(value):
    """Left-pad each colon-separated part of ``str(value)`` to two characters.

    For example ``'9:05:00'`` becomes ``'09:05:00'``.
    """
    return ':'.join(part.rjust(2, '0') for part in str(value).split(':'))


def _serialise_row(row):
    """Return a new list with *row*'s time columns padded and values JSON-ready.

    The input row is not modified.
    """
    cells = list(row)

    cells[_CREATION_TIME_COL] = _zero_pad_time(cells[_CREATION_TIME_COL])

    sent = cells[_CREATION_REQUEST_SENT_COL]
    if sent is not None:
        # str(sent) is expected to be '<date> <time>'; only the time part is padded.
        date_part, time_part = str(sent).split()
        cells[_CREATION_REQUEST_SENT_COL] = date_part + ' ' + _zero_pad_time(time_part)

    # NOTE(review): None is not in _JSON_NATIVE_TYPES, so empty cells are sent
    # as the string 'None'. Kept as-is because clients may rely on it.
    return [
        cell if isinstance(cell, _JSON_NATIVE_TYPES) else str(cell)
        for cell in cells
    ]


def db_get(query, *args):
    """Return ``table_task1`` data as JSON.

    Args:
        query: Request fields. ``query['type']`` selects the mode:
            ``'full'`` returns every row; ``'single_id'`` returns the first
            row matching ``query['service_id']``.
        *args: Unused.

    Returns:
        * ``'full'``: ``JsonResponse({'headers': [...], 'content': [[...], ...]})``.
        * ``'single_id'``: ``JsonResponse`` mapping column name to value, or
          ``JsonResponse({})`` when no row matches.
        * anything else (including a missing ``type``):
          ``ErrorResponse(NOT_FOUND_CODE)``.
    """
    wrapper = get_wrapper_for('mysql')
    table_headers = wrapper.get_column_names()
    request_type = query.get('type')

    if request_type == 'full':
        content = [_serialise_row(row) for row in wrapper.get_data('table_task1')]
        return JsonResponse({'headers': table_headers, 'content': content})

    if request_type == 'single_id':
        rows = wrapper.get_rows('table_task1', {'service_id': query['service_id']})
        # Check for "no match" before touching the row; indexing first would
        # raise and make the empty-object answer unreachable.
        if not rows or rows[0] is None:
            return JsonResponse({})
        return JsonResponse(dict(zip(table_headers, _serialise_row(rows[0]))))

    return ErrorResponse(NOT_FOUND_CODE)
def index_get(query, *args):
    """Serve the main page.

    Args:
        query: Request data; not used.
        *args: Unused.

    Returns:
        ``HtmlResponse`` wrapping the output of ``render_template('index.html')``.
    """
    return HtmlResponse(render_template('index.html'))