8
8
"""
9
9
10
10
try :
11
- from typing import Dict , Tuple , Union , TYPE_CHECKING
11
+ from typing import List , Dict , Tuple , Union , Any , TYPE_CHECKING
12
12
from socket import socket
13
13
from socketpool import SocketPool
14
14
22
22
from .headers import Headers
23
23
24
24
25
+ class _IFieldStorage :
26
+ """Interface with shared methods for QueryParams and FormData."""
27
+
28
+ _storage : Dict [str , List [Union [str , bytes ]]]
29
+
30
+ def _add_field_value (self , field_name : str , value : Union [str , bytes ]) -> None :
31
+ if field_name not in self ._storage :
32
+ self ._storage [field_name ] = [value ]
33
+ else :
34
+ self ._storage [field_name ].append (value )
35
+
36
+ def get (self , field_name : str , default : Any = None ) -> Union [str , bytes , None ]:
37
+ """Get the value of a field."""
38
+ return self ._storage .get (field_name , [default ])[0 ]
39
+
40
+ def get_list (self , field_name : str ) -> List [Union [str , bytes ]]:
41
+ """Get the list of values of a field."""
42
+ return self ._storage .get (field_name , [])
43
+
44
+ @property
45
+ def fields (self ):
46
+ """Returns a list of field names."""
47
+ return list (self ._storage .keys ())
48
+
49
+ def __getitem__ (self , field_name : str ):
50
+ return self .get (field_name )
51
+
52
+ def __iter__ (self ):
53
+ return iter (self ._storage )
54
+
55
+ def __len__ (self ):
56
+ return len (self ._storage )
57
+
58
+ def __contains__ (self , key : str ):
59
+ return key in self ._storage
60
+
61
+ def __repr__ (self ) -> str :
62
+ return f"{ self .__class__ .__name__ } ({ repr (self ._storage )} )"
63
+
64
+
65
+ class QueryParams (_IFieldStorage ):
66
+ """
67
+ Class for parsing and storing GET quer parameters requests.
68
+
69
+ Examples::
70
+
71
+ query_params = QueryParams(b"foo=bar&baz=qux&baz=quux")
72
+ # QueryParams({"foo": "bar", "baz": ["qux", "quux"]})
73
+
74
+ query_params.get("foo") # "bar"
75
+ query_params["foo"] # "bar"
76
+ query_params.get("non-existent-key") # None
77
+ query_params.get_list("baz") # ["qux", "quux"]
78
+ "unknown-key" in query_params # False
79
+ query_params.fields # ["foo", "baz"]
80
+ """
81
+
82
+ _storage : Dict [str , List [Union [str , bytes ]]]
83
+
84
+ def __init__ (self , query_string : str ) -> None :
85
+ self ._storage = {}
86
+
87
+ for query_param in query_string .split ("&" ):
88
+ if "=" in query_param :
89
+ key , value = query_param .split ("=" , 1 )
90
+ self ._add_field_value (key , value )
91
+ elif query_param :
92
+ self ._add_field_value (query_param , "" )
93
+
94
+
95
+ class FormData (_IFieldStorage ):
96
+ """
97
+ Class for parsing and storing form data from POST requests.
98
+
99
+ Supports ``application/x-www-form-urlencoded``, ``multipart/form-data`` and ``text/plain``
100
+ content types.
101
+
102
+ Examples::
103
+
104
+ form_data = FormData(b"foo=bar&baz=qux&baz=quuz", "application/x-www-form-urlencoded")
105
+ # or
106
+ form_data = FormData(b"foo=bar\\ r\\ nbaz=qux\\ r\\ nbaz=quux", "text/plain")
107
+ # FormData({"foo": "bar", "baz": "qux"})
108
+
109
+ form_data.get("foo") # "bar"
110
+ form_data["foo"] # "bar"
111
+ form_data.get("non-existent-key") # None
112
+ form_data.get_list("baz") # ["qux", "quux"]
113
+ "unknown-key" in form_data # False
114
+ form_data.fields # ["foo", "baz"]
115
+ """
116
+
117
+ _storage : Dict [str , List [Union [str , bytes ]]]
118
+
119
+ def __init__ (self , data : bytes , content_type : str ) -> None :
120
+ self .content_type = content_type
121
+ self ._storage = {}
122
+
123
+ if content_type .startswith ("application/x-www-form-urlencoded" ):
124
+ self ._parse_x_www_form_urlencoded (data )
125
+
126
+ elif content_type .startswith ("multipart/form-data" ):
127
+ boundary = content_type .split ("boundary=" )[1 ]
128
+ self ._parse_multipart_form_data (data , boundary )
129
+
130
+ elif content_type .startswith ("text/plain" ):
131
+ self ._parse_text_plain (data )
132
+
133
+ def _parse_x_www_form_urlencoded (self , data : bytes ) -> None :
134
+ decoded_data = data .decode ()
135
+
136
+ for field_name , value in [
137
+ key_value .split ("=" , 1 ) for key_value in decoded_data .split ("&" )
138
+ ]:
139
+ self ._add_field_value (field_name , value )
140
+
141
+ def _parse_multipart_form_data (self , data : bytes , boundary : str ) -> None :
142
+ blocks = data .split (b"--" + boundary .encode ())[1 :- 1 ]
143
+
144
+ for block in blocks :
145
+ disposition , content = block .split (b"\r \n \r \n " , 1 )
146
+ field_name = disposition .split (b'"' , 2 )[1 ].decode ()
147
+ value = content [:- 2 ]
148
+
149
+ self ._add_field_value (field_name , value )
150
+
151
+ def _parse_text_plain (self , data : bytes ) -> None :
152
+ lines = data .split (b"\r \n " )[:- 1 ]
153
+
154
+ for line in lines :
155
+ field_name , value = line .split (b"=" , 1 )
156
+
157
+ self ._add_field_value (field_name .decode (), value .decode ())
158
+
159
+
25
160
class Request :
26
161
"""
27
162
Incoming request, constructed from raw incoming bytes.
@@ -54,15 +189,15 @@ class Request:
54
189
path : str
55
190
"""Path of the request, e.g. ``"/foo/bar"``."""
56
191
57
- query_params : Dict [ str , str ]
192
+ query_params : QueryParams
58
193
"""
59
194
Query/GET parameters in the request.
60
195
61
196
Example::
62
197
63
198
request = Request(raw_request=b"GET /?foo=bar HTTP/1.1...")
64
199
request.query_params
65
- # {"foo": "bar"}
200
+ # QueryParams( {"foo": "bar"})
66
201
"""
67
202
68
203
http_version : str
@@ -91,6 +226,7 @@ def __init__(
91
226
self .connection = connection
92
227
self .client_address = client_address
93
228
self .raw_request = raw_request
229
+ self ._form_data = None
94
230
95
231
if raw_request is None :
96
232
raise ValueError ("raw_request cannot be None" )
@@ -117,6 +253,13 @@ def body(self) -> bytes:
117
253
def body (self , body : bytes ) -> None :
118
254
self .raw_request = self ._raw_header_bytes + b"\r \n \r \n " + body
119
255
256
+ @property
257
+ def form_data (self ) -> Union [FormData , None ]:
258
+ """POST data of the request"""
259
+ if self ._form_data is None and self .method == "POST" :
260
+ self ._form_data = FormData (self .body , self .headers ["Content-Type" ])
261
+ return self ._form_data
262
+
120
263
def json (self ) -> Union [dict , None ]:
121
264
"""Body of the request, as a JSON-decoded dictionary."""
122
265
return json .loads (self .body ) if self .body else None
@@ -148,13 +291,7 @@ def _parse_start_line(header_bytes: bytes) -> Tuple[str, str, Dict[str, str], st
148
291
149
292
path , query_string = path .split ("?" , 1 )
150
293
151
- query_params = {}
152
- for query_param in query_string .split ("&" ):
153
- if "=" in query_param :
154
- key , value = query_param .split ("=" , 1 )
155
- query_params [key ] = value
156
- elif query_param :
157
- query_params [query_param ] = ""
294
+ query_params = QueryParams (query_string )
158
295
159
296
return method , path , query_params , http_version
160
297
0 commit comments