summaryrefslogtreecommitdiff
path: root/day9/task5_vue/backend/views.py
diff options
context:
space:
mode:
Diffstat (limited to 'day9/task5_vue/backend/views.py')
-rw-r--r--day9/task5_vue/backend/views.py160
1 files changed, 160 insertions, 0 deletions
diff --git a/day9/task5_vue/backend/views.py b/day9/task5_vue/backend/views.py
new file mode 100644
index 0000000..d6d9351
--- /dev/null
+++ b/day9/task5_vue/backend/views.py
@@ -0,0 +1,160 @@
+from backend.core.utils import render_template, NOT_FOUND_CODE
+from backend.settings import STATIC_FILES_PATH
+
+from backend.core.response_types import ErrorResponse, TextFileResponse, ImageResponse, HtmlResponse, JsonResponse
+from backend.database.database import get_wrapper_for
+
+import csv
+
+
+def return_static(query, *args):
+ if args[0] in ['css', 'js']:
+ try:
+ return TextFileResponse(
+ path=STATIC_FILES_PATH + f'/{args[0]}/{args[1]}',
+ extension={'js': 'javascript', 'css': 'css'}.get(args[0])
+ )
+
+ except (PermissionError, FileNotFoundError):
+ return ErrorResponse(404)
+
+ elif args[0] == 'images':
+ try:
+ return ImageResponse(STATIC_FILES_PATH + f'/{args[0]}/{args[1]}', args[1].split('.')[-1])
+
+ except (PermissionError, FileNotFoundError):
+ return ErrorResponse(404)
+
+
+def upload_file(query, *args):
+ base_html = '<a href="/">Return to main page</a><br>%s'
+
+ try:
+ data = query['files'][0]['data'].decode('utf-8')
+ except (UnicodeDecodeError, KeyError, IndexError):
+ return ErrorResponse(500, base_html % '<h1>Error while reading file</h1>')
+
+ wrapper = get_wrapper_for('mysql')
+
+ permitted_headers = wrapper.get_column_names()
+
+ try:
+ data = list(csv.reader(data.strip().splitlines(), delimiter=';', quotechar='"'))
+ if len(data[0]) != len(permitted_headers) or set(data[0]) != set(permitted_headers):
+ return ErrorResponse(500, base_html % '<h1>File format error</h1>')
+ except IndexError:
+ return ErrorResponse(500, base_html % '<h1>File format error</h1>')
+
+ wrapper.clear_table('table_task1')
+
+ scheme = wrapper.schemes['table_task1']
+ for row in data[1:]:
+ validation_results = scheme.validate(dict(zip(data[0], row)))
+ if not validation_results['error']:
+ wrapper.insert_one('table_task1', row, data[0])
+ else:
+ return ErrorResponse(500)
+
+ return HtmlResponse(base_html % '<h1>File uploaded</h1>')
+
+
+def validate(query, *args):
+ wrapper = get_wrapper_for('mysql')
+ scheme = wrapper.schemes['table_task1']
+ validation_results = scheme.validate(query)
+ return JsonResponse(validation_results)
+
+
+def update_post(query, *args):
+ wrapper = get_wrapper_for('mysql')
+ service_id = query['service_id']
+ permitted_fields = wrapper.get_column_names()
+ permitted_fields.remove('service_id')
+
+ query_set = []
+ for field_name, value in query.items():
+ if field_name in permitted_fields:
+ query_set.append(f'{field_name}="{value}"')
+
+ wrapper.update('table_task1', query_set, {'service_id': service_id})
+ return HtmlResponse('<a href="/">Return to main page</a><br><h1>Database Updated</h1>')
+
+
+def delete_post(query, *args):
+ get_wrapper_for('mysql').delete_from('table_task1', {'service_id': query['service_id']})
+ return HtmlResponse('<a href="/">Return to main page</a><br><h1>Database Updated</h1>')
+
+
+def add_post(query, *args):
+ wrapper = get_wrapper_for('mysql')
+ header_fields = wrapper.get_column_names()
+ headers = []
+ values = []
+ for field_name in query:
+ if field_name in header_fields:
+ headers.append(field_name)
+ values.append(f'"{query[field_name]}"')
+
+ if len(headers) == len(header_fields):
+ wrapper.insert_one('table_task1', values, headers)
+
+ return HtmlResponse('<a href="/">Return to main page</a><br><h1>Database Updated</h1>')
+
+
+def db_get(query, *args):
+ wrapper = get_wrapper_for('mysql')
+ table_headers = wrapper.get_column_names()
+
+ if query['type'] == 'full':
+ content = wrapper.get_data('table_task1')
+
+ for row in content:
+ # creation_time
+ row[7] = ':'.join(_.rjust(2, '0') for _ in str(row[7]).split(':'))
+
+ # creation_request_sent_date
+ if row[8] is not None:
+ creation_date, creation_time = str(row[8]).split()
+ creation_time = ':'.join(_.rjust(2, '0') for _ in creation_time.split(':'))
+ row[8] = creation_date + ' ' + creation_time
+
+ json_content = []
+ for row in content:
+ new_row = []
+ for col in row:
+ if not isinstance(col, (float, bool, int, dict, list, tuple, str)):
+ col = str(col)
+ new_row.append(col)
+ json_content.append(new_row)
+
+ return JsonResponse({'headers': table_headers, 'content': json_content})
+
+ elif query['type'] == 'single_id':
+ content = wrapper.get_rows('table_task1', {'service_id': query['service_id']})[0]
+
+ # creation_time
+ content[7] = ':'.join(_.rjust(2, '0') for _ in str(content[7]).split(':'))
+
+ # creation_request_sent_date
+ if content[8] is not None:
+ creation_date, creation_time = str(content[8]).split()
+ creation_time = ':'.join(_.rjust(2, '0') for _ in creation_time.split(':'))
+ content[8] = creation_date + ' ' + creation_time
+
+ if content is not None:
+ json_content = []
+ for col in content:
+ if not isinstance(col, (float, bool, int, dict, list, tuple)):
+ col = str(col)
+ json_content.append(col)
+
+ return JsonResponse(dict(zip(table_headers, json_content)))
+ else:
+ return JsonResponse({})
+
+ return ErrorResponse(NOT_FOUND_CODE)
+
+
+def index_get(query, *args):
+ data = render_template('index.html')
+ return HtmlResponse(data)