1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
import re
from time import strftime, gmtime
_URI_RESERVED = {
'21': '!', '23': '#', '24': '$', '26': '&',
'27': '\'', '28': '(', '29': ')', '2A': '*',
'2B': '+', '2C': ',', '2F': '/', '3A': ':',
'3B': ';', '3D': '=', '3F': '?', '40': '@',
'5B': '[', '5D': ']'
}
BAD_REQUEST = 'HTTP/1.1 400 Bad Request'
NOT_FOUND = 'HTTP/1.1 404 Not Found'
SUCCESS = 'HTTP/1.1 200 OK'
METHOD_NOT_ALLOWED = 'HTTP/1.1 405 Method Not Allowed'
URL_REGEX_PATTERN = re.compile(r'/((.*?/?)+)?')
HTTP_METHODS = ['GET', 'POST', 'OPTIONS', 'HEAD', 'PUT', 'PATCH', 'DELETE', 'TRACE', 'CONNECT']
def add_headers(status, html: str):
return '\r\n'.join([
status,
f'Date: {strftime("%a, %d %b %Y %H:%M:%S GMT", gmtime())}',
'Server: BrandNewServer',
'Content-Type: text/html; charset=utf-8',
'Connection: keep-alive',
'', html
])
def validate_url(url):
return bool(URL_REGEX_PATTERN.fullmatch(url))
def validate_first_line(line):
methods_groups = f"({'|'.join(HTTP_METHODS)})"
first_line_pattern = re.compile(rf'{methods_groups} {URL_REGEX_PATTERN.pattern} HTTP/1\.[01]')
return bool(first_line_pattern.fullmatch(line))
def parse_cookies(cookies_line: str):
cookies_line = cookies_line.strip().strip(';')
pairs = cookies_line.split(';')
d = {}
for pair in pairs:
try:
key, value = pair.split('=', maxsplit=1)
d[key] = value
except ValueError:
raise ValueError('Wrong format of cookies')
return d
def parse_headers(request_line: str):
request = request_line.split('\r\n')
first_line = request.pop(0)
if validate_first_line(first_line):
method, url, http_ver = first_line.split()
else:
# Я не знаю зачем в условии необходимо завершать работу сервера, если пришел не-HTTP запрос,
# поэтому оставлю эту строку здесь.
# raise ValueError('Wrong format of HTTP request')
raise ConnectionError('Wrong format of HTTP request')
headers = {}
for line in request:
try:
field, value = line.split(': ', maxsplit=1)
except ValueError:
# raise ValueError('Wrong format of HTTP header')
raise ConnectionError('Wrong format of HTTP request')
headers[field] = value
return method, url, http_ver, headers
def parse_query(query_line: str):
pairs = query_line.strip().split('&')
d = {}
for pair in pairs:
try:
key, value = pair.split('=', maxsplit=1)
d[url_decoder(key)] = url_decoder(value)
except ValueError:
# raise ValueError('Wrong format of query')
raise ConnectionError('Wrong format of query')
return d
def url_decoder(url_line: str):
url_line = url_line.replace('+', ' ')
encoded = b''
i = 0
while i < len(url_line):
if url_line[i] == '%':
hex_value = url_line[i + 1: i + 3]
if hex_value in _URI_RESERVED:
integer = ord(_URI_RESERVED[hex_value])
else:
integer = int(hex_value, 16)
encoded += bytes([integer])
i += 3
continue
else:
encoded += bytes([ord(url_line[i])])
i += 1
return encoded.decode()
|