"""Request handlers: static files, CSV upload and CRUD on ``table_task1``."""
import csv
import io
import os

from backend.core.utils import render_template, NOT_FOUND_CODE
from backend.settings import STATIC_FILES_PATH
from backend.core.response_types import (
    ErrorResponse,
    TextFileResponse,
    ImageResponse,
    HtmlResponse,
    JsonResponse,
)
from backend.database.database import get_wrapper_for
from backend.database.validators import ValidationError
from backend.logger import Logger

# Name of the only table these handlers operate on.
_TABLE = 'table_task1'

# Static sub-directory -> extension passed to TextFileResponse.
_TEXT_EXTENSIONS = {'js': 'javascript', 'css': 'css'}

# Positions of the time-like columns inside a fetched row.
_CREATION_TIME_COL = 7
_CREATION_REQUEST_SENT_DATE_COL = 8

# Values of these types are passed to JsonResponse unchanged; everything
# else (including None) is converted with str(), as the handlers always did.
_JSON_NATIVE_TYPES = (float, bool, int, dict, list, tuple, str)


def _safe_static_path(category, file_name):
    """Return the static file path, or None if it escapes its directory.

    The returned string has the same form the handler always used
    (``STATIC_FILES_PATH/<category>/<file_name>``); it is only handed back
    when its normalized form stays strictly inside the category directory,
    so segments such as ``..`` cannot reach files outside of it.
    """
    raw_path = STATIC_FILES_PATH + f'/{category}/{file_name}'
    root = os.path.abspath(os.path.join(STATIC_FILES_PATH, category))
    candidate = os.path.abspath(raw_path)
    try:
        inside = os.path.commonpath([root, candidate]) == root
    except ValueError:
        # commonpath raises when the paths cannot be compared
        # (e.g. different drives on Windows).
        inside = False
    if not inside or candidate == root:
        return None
    return raw_path


def return_static(query, *args):
    """Serve a file from the ``css``, ``js`` or ``images`` static directory.

    ``args[0]`` is the static sub-directory and ``args[1]`` the file name.
    Returns ``ErrorResponse(404)`` when either segment is missing, the
    sub-directory is unknown, the file name points outside the
    sub-directory, or the file cannot be found or read.
    """
    if len(args) < 2:
        return ErrorResponse(404)
    category, file_name = args[0], args[1]
    if category not in _TEXT_EXTENSIONS and category != 'images':
        return ErrorResponse(404)

    path = _safe_static_path(category, file_name)
    if path is None:
        return ErrorResponse(404)

    try:
        if category == 'images':
            # The image type is taken from the text after the last dot.
            return ImageResponse(path, file_name.split('.')[-1])
        return TextFileResponse(
            path=path,
            extension=_TEXT_EXTENSIONS[category],
        )
    except (PermissionError, FileNotFoundError):
        return ErrorResponse(404)


def upload_file(query, *args):
    """Replace the contents of ``table_task1`` with an uploaded CSV file.

    The first entry of ``query['files']`` is decoded as UTF-8, parsed with
    :func:`parse_csv` and every row is validated against the table scheme.
    The table is cleared and refilled only after *all* rows passed
    validation, so a rejected upload leaves the existing data untouched.

    Returns ``HtmlResponse('File uploaded')`` on success, otherwise an
    ``ErrorResponse`` with code 500 describing the failure.
    """
    try:
        data = query['files'][0]['data'].decode('utf-8')
    except (UnicodeDecodeError, KeyError, IndexError):
        return ErrorResponse(500, 'Error while reading file')

    logger = Logger.get_logger()
    wrapper = get_wrapper_for('mysql')
    scheme = wrapper.schemes[_TABLE]

    rows = parse_csv(data, scheme)
    if rows is None:
        return ErrorResponse(500, 'Wrong file format')

    # Validate everything first; nothing is written while a row can still fail.
    for row in rows:
        logger.debug('Validating')
        validation_results = scheme.validate(row)
        if validation_results['error']:
            logger.debug(f'Error {validation_results}')
            return ErrorResponse(500, f'Validation error\n{validation_results}')
        logger.debug('No errors')

    wrapper.clear_table(_TABLE)
    for row in rows:
        wrapper.insert_one(_TABLE, row)
    return HtmlResponse('File uploaded')


def parse_csv(data, scheme):
    """Parse ``;``-delimited CSV text into a list of row dicts.

    The header must name exactly the fields of ``scheme``. The literal
    ``NULL`` in a nullable field becomes ``None``; any other value is cast
    with the field's ``data_type``, and kept as the raw string when the
    cast fails.

    Returns ``None`` when the input has no header, the header does not
    match the scheme, or a row has more or fewer cells than the header.
    """
    reader = csv.DictReader(io.StringIO(data), delimiter=';')
    header = reader.fieldnames
    # fieldnames is None for empty input.
    if header is None or set(header) != set(scheme.fields.keys()):
        return None

    logger = Logger.get_logger()
    result = []
    for row in reader:
        # DictReader stores surplus cells under the key None and fills
        # missing cells with the value None; both mean a malformed row.
        if None in row or any(value is None for value in row.values()):
            return None

        insert_row = {}
        for field_name, value in row.items():
            field = scheme.fields[field_name]
            if field.nullable and value == 'NULL':
                insert_row[field_name] = None
                continue
            data_type = field.data_type
            try:
                insert_row[field_name] = data_type(value)
            except (ValueError, TypeError):
                logger.debug(f'Unable to cast `{value}` to {data_type}')
                insert_row[field_name] = value
        result.append(insert_row)
    return result


def validate(query, *args):
    """Validate ``query`` against the table scheme and return the result as JSON."""
    scheme = get_wrapper_for('mysql').schemes[_TABLE]
    return JsonResponse(scheme.validate(query))


def update_post(query, *args):
    """Update the row identified by ``query['service_id']``.

    ``query`` is validated first; when validation reports no error, every
    query item that names a table column other than ``service_id`` is
    written to the row. The validation result is returned as JSON either way.
    """
    wrapper = get_wrapper_for('mysql')
    scheme = wrapper.schemes[_TABLE]
    # Build a new collection instead of calling .remove() on the list the
    # wrapper returned, so the wrapper's own data is never modified.
    permitted_fields = {
        name for name in wrapper.get_column_names() if name != 'service_id'
    }

    validation_results = scheme.validate(query)
    if not validation_results['error']:
        expressions = {
            key: value for key, value in query.items()
            if key in permitted_fields
        }
        wrapper.update(_TABLE, expressions, {'service_id': query['service_id']})
    return JsonResponse(validation_results)


def delete_post(query, *args):
    """Delete the row identified by ``query['service_id']``.

    Returns JSON ``{'error': False}`` after deleting, or
    ``{'error': True, 'service_id': <message>}`` when the id is missing or
    rejected by the ``service_id`` field validator.
    """
    if 'service_id' not in query:
        return JsonResponse({'error': True, 'service_id': 'service_id is required'})

    wrapper = get_wrapper_for('mysql')
    scheme = wrapper.schemes[_TABLE]
    service_id = query['service_id']
    try:
        scheme.fields['service_id'].validate(service_id)
    except ValidationError as e:
        return JsonResponse({'error': True, 'service_id': str(e)})

    wrapper.delete_from(_TABLE, {'service_id': service_id})
    return JsonResponse({'error': False})


def add_post(query, *args):
    """Insert a new row built from ``query``.

    The whole ``query`` is validated; when validation reports no error,
    the query items that name table columns are inserted. The validation
    result is returned as JSON either way.
    """
    wrapper = get_wrapper_for('mysql')
    header_fields = wrapper.get_column_names()
    insert_data = {
        field_name: value for field_name, value in query.items()
        if field_name in header_fields
    }

    validation_result = wrapper.schemes[_TABLE].validate(query)
    if not validation_result['error']:
        wrapper.insert_one(_TABLE, insert_data)
    return JsonResponse(validation_result)


def _pad_time(value):
    """Left-pad every ``:``-separated part of ``str(value)`` with zeros to width 2."""
    return ':'.join(part.rjust(2, '0') for part in str(value).split(':'))


def _format_row(row):
    """Return a copy of ``row`` that is ready to be placed in a JsonResponse.

    The creation-time column is zero-padded, the time part of the
    creation-request-sent-date column is zero-padded when that column is
    not ``None``, and every value that is not of a JSON-native type is
    converted with ``str()``. The input row is left unmodified.
    """
    cells = list(row)
    cells[_CREATION_TIME_COL] = _pad_time(cells[_CREATION_TIME_COL])

    sent_date = cells[_CREATION_REQUEST_SENT_DATE_COL]
    if sent_date is not None:
        # str(sent_date) is expected to be "<date> <time>" -- presumably a
        # datetime value coming from the database; verify against the wrapper.
        date_part, time_part = str(sent_date).split()
        cells[_CREATION_REQUEST_SENT_DATE_COL] = date_part + ' ' + _pad_time(time_part)

    return [
        cell if isinstance(cell, _JSON_NATIVE_TYPES) else str(cell)
        for cell in cells
    ]


def db_get(query, *args):
    """Return table data as JSON.

    * ``query['type'] == 'full'``: ``{'headers': [...], 'content': [[...], ...]}``
      with every row of the table.
    * ``query['type'] == 'single_id'``: a ``{column: value}`` mapping for the
      row matching ``query['service_id']``, or ``{}`` when no row matches.
    * anything else (including a missing ``type``):
      ``ErrorResponse(NOT_FOUND_CODE)``.
    """
    wrapper = get_wrapper_for('mysql')
    table_headers = wrapper.get_column_names()
    request_type = query.get('type')

    if request_type == 'full':
        json_content = [_format_row(row) for row in wrapper.get_data(_TABLE)]
        return JsonResponse({'headers': table_headers, 'content': json_content})

    if request_type == 'single_id':
        rows = wrapper.get_rows(_TABLE, {'service_id': query['service_id']})
        # Check for "no match" before touching the row's columns.
        if not rows or rows[0] is None:
            return JsonResponse({})
        return JsonResponse(dict(zip(table_headers, _format_row(rows[0]))))

    return ErrorResponse(NOT_FOUND_CODE)


def index_get(query, *args):
    """Render ``index.html`` and return it as an HTML response."""
    return HtmlResponse(render_template('index.html'))