from backend.core.utils import render_template, NOT_FOUND_CODE
from backend.settings import STATIC_FILES_PATH
from backend.core.response_types import (
    ErrorResponse,
    TextFileResponse,
    ImageResponse,
    HtmlResponse,
    JsonResponse,
)
from backend.database.database import get_wrapper_for
import csv

# Name of the single table every handler in this module works on.
_TABLE = 'table_task1'

# Static sub-directories served as text, mapped to the `extension` value
# handed to TextFileResponse.
_TEXT_EXTENSIONS = {'js': 'javascript', 'css': 'css'}

# Positions of the time-like columns inside a table row.
_CREATION_TIME_INDEX = 7
_CREATION_REQUEST_SENT_DATE_INDEX = 8

# Values of these types are passed to JsonResponse untouched; anything else
# (including None) is converted with str().
_JSON_PASSTHROUGH_TYPES = (float, bool, int, dict, list, tuple, str)

# NOTE(review): the user-facing messages below contain bare newlines where
# markup was presumably intended (e.g. a link and a heading) -- confirm the
# intended HTML before relying on these strings.
_DATABASE_UPDATED_HTML = 'Return to main page\n\nDatabase Updated\n\n'


def return_static(query, *args):
    """Serve a static file from STATIC_FILES_PATH.

    Args:
        query: Request data; not used by this handler.
        *args: Path parts; ``args[0]`` is the category (``'css'``, ``'js'``
            or ``'images'``) and ``args[1]`` is the file name.

    Returns:
        A TextFileResponse for css/js, an ImageResponse for images, or
        ErrorResponse(404) when the file cannot be read, the category is not
        supported, or fewer than two path parts were supplied.
    """
    if len(args) < 2:
        return ErrorResponse(404)

    category, file_name = args[0], args[1]
    # NOTE(review): file_name is concatenated into the path as-is; confirm
    # the router cannot deliver '..' segments here.
    path = STATIC_FILES_PATH + f'/{category}/{file_name}'

    try:
        if category in _TEXT_EXTENSIONS:
            return TextFileResponse(
                path=path,
                extension=_TEXT_EXTENSIONS[category],
            )
        if category == 'images':
            # The text after the last dot is passed as the image type.
            return ImageResponse(path, file_name.split('.')[-1])
    except (PermissionError, FileNotFoundError):
        return ErrorResponse(404)

    # Unknown category: answer explicitly instead of returning None.
    return ErrorResponse(404)


def upload_file(query, *args):
    """Replace the table contents with the rows of an uploaded CSV file.

    The file is read from ``query['files'][0]['data']`` (bytes, decoded as
    UTF-8) and parsed as ``;``-delimited CSV. Its first row must contain
    exactly the table's column names, in any order.

    Every data row is validated *before* the table is cleared, so a rejected
    upload leaves the existing data untouched.

    Args:
        query: Request data holding the uploaded file.
        *args: Unused.

    Returns:
        HtmlResponse on success; ErrorResponse(500) when the file cannot be
        read, the header does not match, or a row fails validation.
    """
    base_html = 'Return to main page\n%s'
    format_error_html = base_html % '\n\nFile format error\n\n'

    try:
        text = query['files'][0]['data'].decode('utf-8')
    except (UnicodeDecodeError, KeyError, IndexError):
        return ErrorResponse(
            500, base_html % '\n\nError while reading file\n\n'
        )

    wrapper = get_wrapper_for('mysql')
    permitted_headers = wrapper.get_column_names()

    try:
        rows = list(
            csv.reader(text.strip().splitlines(), delimiter=';', quotechar='"')
        )
    except csv.Error:
        return ErrorResponse(500, format_error_html)

    # An empty upload has no header row at all.
    if not rows:
        return ErrorResponse(500, format_error_html)

    header = rows[0]
    # The length check rejects duplicated column names, which the set
    # comparison alone would let through.
    if (len(header) != len(permitted_headers)
            or set(header) != set(permitted_headers)):
        return ErrorResponse(500, format_error_html)

    scheme = wrapper.schemes[_TABLE]
    data_rows = rows[1:]

    for row in data_rows:
        if scheme.validate(dict(zip(header, row)))['error']:
            return ErrorResponse(500)

    wrapper.clear_table(_TABLE)
    for row in data_rows:
        wrapper.insert_one(_TABLE, row, header)

    return HtmlResponse(base_html % '\n\nFile uploaded\n\n')


def validate(query, *args):
    """Validate ``query`` against the table scheme.

    Args:
        query: Field values to validate.
        *args: Unused.

    Returns:
        JsonResponse wrapping whatever ``scheme.validate`` returned.
    """
    scheme = get_wrapper_for('mysql').schemes[_TABLE]
    return JsonResponse(scheme.validate(query))


def update_post(query, *args):
    """Update the row identified by ``query['service_id']``.

    Only keys of ``query`` that are table columns other than ``service_id``
    are written.

    Args:
        query: Field values, including ``service_id``.
        *args: Unused.

    Returns:
        HtmlResponse on success, ErrorResponse(500) when validation reports
        an error.
    """
    wrapper = get_wrapper_for('mysql')
    scheme = wrapper.schemes[_TABLE]

    # Filter instead of list.remove(): leaves the wrapper's list untouched
    # and does not raise when 'service_id' is absent.
    editable_fields = {
        name for name in wrapper.get_column_names() if name != 'service_id'
    }

    if scheme.validate(query)['error']:
        return ErrorResponse(500)

    changes = {
        key: value for key, value in query.items() if key in editable_fields
    }
    wrapper.update(_TABLE, changes, {'service_id': query['service_id']})
    return HtmlResponse(_DATABASE_UPDATED_HTML)


def delete_post(query, *args):
    """Delete the row identified by ``query['service_id']``.

    Args:
        query: Request data containing ``service_id``.
        *args: Unused.

    Returns:
        HtmlResponse confirming the update.
    """
    wrapper = get_wrapper_for('mysql')
    wrapper.delete_from(_TABLE, {'service_id': query['service_id']})
    return HtmlResponse(_DATABASE_UPDATED_HTML)


def add_post(query, *args):
    """Insert a new row built from ``query``.

    The insert happens only when ``query`` supplies every table column;
    otherwise nothing is written. The same confirmation page is returned in
    both cases.

    Args:
        query: Field values for the new row.
        *args: Unused.

    Returns:
        HtmlResponse confirming the update.
    """
    wrapper = get_wrapper_for('mysql')
    header_fields = wrapper.get_column_names()

    headers = [name for name in query if name in header_fields]
    # NOTE(review): values are wrapped in double quotes here, while
    # upload_file passes raw values to insert_one -- confirm which form
    # insert_one expects and whether it escapes its input.
    values = [f'"{query[name]}"' for name in headers]

    if len(headers) == len(header_fields):
        wrapper.insert_one(_TABLE, values, headers)
    return HtmlResponse(_DATABASE_UPDATED_HTML)


def _pad_time(text):
    """Left-pad every ':'-separated part of ``text`` with zeros to width 2."""
    return ':'.join(part.rjust(2, '0') for part in text.split(':'))


def _normalize_times(row):
    """Return a copy of ``row`` (as a list) with its time columns zero-padded."""
    cells = list(row)

    # creation_time
    cells[_CREATION_TIME_INDEX] = _pad_time(str(cells[_CREATION_TIME_INDEX]))

    # creation_request_sent_date: "<date> <time>", may be None
    sent = cells[_CREATION_REQUEST_SENT_DATE_INDEX]
    if sent is not None:
        date_part, time_part = str(sent).split()
        cells[_CREATION_REQUEST_SENT_DATE_INDEX] = (
            date_part + ' ' + _pad_time(time_part)
        )
    return cells


def _to_json_value(value):
    """Return ``value`` unchanged if it is a pass-through type, else ``str(value)``."""
    if isinstance(value, _JSON_PASSTHROUGH_TYPES):
        return value
    return str(value)


def db_get(query, *args):
    """Return table data as JSON.

    ``query['type']`` selects the mode:

    * ``'full'`` -- ``{'headers': [...], 'content': [[...], ...]}`` with
      every row of the table.
    * ``'single_id'`` -- a ``{column: value}`` mapping for the row whose
      ``service_id`` equals ``query['service_id']``, or ``{}`` when no such
      row exists.

    Args:
        query: Request data with ``type`` and, for ``single_id``,
            ``service_id``.
        *args: Unused.

    Returns:
        JsonResponse for the modes above, ErrorResponse(NOT_FOUND_CODE) for
        a missing or unknown ``type``.
    """
    wrapper = get_wrapper_for('mysql')
    table_headers = wrapper.get_column_names()
    request_type = query.get('type')

    if request_type == 'full':
        json_content = [
            [_to_json_value(cell) for cell in _normalize_times(row)]
            for row in wrapper.get_data(_TABLE)
        ]
        return JsonResponse(
            {'headers': table_headers, 'content': json_content}
        )

    if request_type == 'single_id':
        rows = wrapper.get_rows(_TABLE, {'service_id': query['service_id']})
        # Check for "no such row" before indexing into the result.
        if not rows or rows[0] is None:
            return JsonResponse({})
        json_content = [
            _to_json_value(cell) for cell in _normalize_times(rows[0])
        ]
        return JsonResponse(dict(zip(table_headers, json_content)))

    return ErrorResponse(NOT_FOUND_CODE)


def index_get(query, *args):
    """Render ``index.html`` with render_template and return it as an HtmlResponse.

    Args:
        query: Request data; not used by this handler.
        *args: Unused.
    """
    return HtmlResponse(render_template('index.html'))