diff options
Diffstat (limited to 'day9/task5_vue/backend/views.py')
| -rw-r--r-- | day9/task5_vue/backend/views.py | 160 |
1 files changed, 160 insertions, 0 deletions
diff --git a/day9/task5_vue/backend/views.py b/day9/task5_vue/backend/views.py new file mode 100644 index 0000000..d6d9351 --- /dev/null +++ b/day9/task5_vue/backend/views.py @@ -0,0 +1,160 @@ +from backend.core.utils import render_template, NOT_FOUND_CODE +from backend.settings import STATIC_FILES_PATH + +from backend.core.response_types import ErrorResponse, TextFileResponse, ImageResponse, HtmlResponse, JsonResponse +from backend.database.database import get_wrapper_for + +import csv + + +def return_static(query, *args): + if args[0] in ['css', 'js']: + try: + return TextFileResponse( + path=STATIC_FILES_PATH + f'/{args[0]}/{args[1]}', + extension={'js': 'javascript', 'css': 'css'}.get(args[0]) + ) + + except (PermissionError, FileNotFoundError): + return ErrorResponse(404) + + elif args[0] == 'images': + try: + return ImageResponse(STATIC_FILES_PATH + f'/{args[0]}/{args[1]}', args[1].split('.')[-1]) + + except (PermissionError, FileNotFoundError): + return ErrorResponse(404) + + +def upload_file(query, *args): + base_html = '<a href="/">Return to main page</a><br>%s' + + try: + data = query['files'][0]['data'].decode('utf-8') + except (UnicodeDecodeError, KeyError, IndexError): + return ErrorResponse(500, base_html % '<h1>Error while reading file</h1>') + + wrapper = get_wrapper_for('mysql') + + permitted_headers = wrapper.get_column_names() + + try: + data = list(csv.reader(data.strip().splitlines(), delimiter=';', quotechar='"')) + if len(data[0]) != len(permitted_headers) or set(data[0]) != set(permitted_headers): + return ErrorResponse(500, base_html % '<h1>File format error</h1>') + except IndexError: + return ErrorResponse(500, base_html % '<h1>File format error</h1>') + + wrapper.clear_table('table_task1') + + scheme = wrapper.schemes['table_task1'] + for row in data[1:]: + validation_results = scheme.validate(dict(zip(data[0], row))) + if not validation_results['error']: + wrapper.insert_one('table_task1', row, data[0]) + else: + return ErrorResponse(500) + + return HtmlResponse(base_html % '<h1>File uploaded</h1>') + + +def validate(query, *args): + wrapper = get_wrapper_for('mysql') + scheme = wrapper.schemes['table_task1'] + validation_results = scheme.validate(query) + return JsonResponse(validation_results) + + +def update_post(query, *args): + wrapper = get_wrapper_for('mysql') + service_id = query['service_id'] + permitted_fields = wrapper.get_column_names() + permitted_fields.remove('service_id') + + query_set = [] + for field_name, value in query.items(): + if field_name in permitted_fields: + query_set.append(f'{field_name}="{value}"') + + wrapper.update('table_task1', query_set, {'service_id': service_id}) + return HtmlResponse('<a href="/">Return to main page</a><br><h1>Database Updated</h1>') + + +def delete_post(query, *args): + get_wrapper_for('mysql').delete_from('table_task1', {'service_id': query['service_id']}) + return HtmlResponse('<a href="/">Return to main page</a><br><h1>Database Updated</h1>') + + +def add_post(query, *args): + wrapper = get_wrapper_for('mysql') + header_fields = wrapper.get_column_names() + headers = [] + values = [] + for field_name in query: + if field_name in header_fields: + headers.append(field_name) + values.append(f'"{query[field_name]}"') + + if len(headers) == len(header_fields): + wrapper.insert_one('table_task1', values, headers) + + return HtmlResponse('<a href="/">Return to main page</a><br><h1>Database Updated</h1>') + + +def db_get(query, *args): + wrapper = get_wrapper_for('mysql') + table_headers = wrapper.get_column_names() + + if query['type'] == 'full': + content = wrapper.get_data('table_task1') + + for row in content: + # creation_time + row[7] = ':'.join(_.rjust(2, '0') for _ in str(row[7]).split(':')) + + # creation_request_sent_date + if row[8] is not None: + creation_date, creation_time = str(row[8]).split() + creation_time = ':'.join(_.rjust(2, '0') for _ in creation_time.split(':')) + row[8] = creation_date + ' ' + creation_time + + json_content = [] + for row in content: + new_row = [] + for col in row: + if not isinstance(col, (float, bool, int, dict, list, tuple, str)): + col = str(col) + new_row.append(col) + json_content.append(new_row) + + return JsonResponse({'headers': table_headers, 'content': json_content}) + + elif query['type'] == 'single_id': + content = wrapper.get_rows('table_task1', {'service_id': query['service_id']})[0] + + # creation_time + content[7] = ':'.join(_.rjust(2, '0') for _ in str(content[7]).split(':')) + + # creation_request_sent_date + if content[8] is not None: + creation_date, creation_time = str(content[8]).split() + creation_time = ':'.join(_.rjust(2, '0') for _ in creation_time.split(':')) + content[8] = creation_date + ' ' + creation_time + + if content is not None: + json_content = [] + for col in content: + if not isinstance(col, (float, bool, int, dict, list, tuple)): + col = str(col) + json_content.append(col) + + return JsonResponse(dict(zip(table_headers, json_content))) + else: + return JsonResponse({}) + + return ErrorResponse(NOT_FOUND_CODE) + + +def index_get(query, *args): + data = render_template('index.html') + return HtmlResponse(data) |