22
22
from .headers import Headers
23
23
24
24
25
- class FormData :
25
+ class _IFieldStorage :
26
+ """Interface with shared methods for QueryParams and FormData."""
27
+
28
+ _storage : Dict [str , List [Union [str , bytes ]]]
29
+
30
+ def _add_field_value (self , field_name : str , value : Union [str , bytes ]) -> None :
31
+ if field_name not in self ._storage :
32
+ self ._storage [field_name ] = [value ]
33
+ else :
34
+ self ._storage [field_name ].append (value )
35
+
36
+ def get (self , field_name : str , default : Any = None ) -> Union [str , bytes , None ]:
37
+ """Get the value of a field."""
38
+ return self ._storage .get (field_name , [default ])[0 ]
39
+
40
+ def get_list (self , field_name : str ) -> List [Union [str , bytes ]]:
41
+ """Get the list of values of a field."""
42
+ return self ._storage .get (field_name , [])
43
+
44
+ @property
45
+ def fields (self ):
46
+ """Returns a list of field names."""
47
+ return list (self ._storage .keys ())
48
+
49
+ def __getitem__ (self , field_name : str ):
50
+ return self .get (field_name )
51
+
52
+ def __iter__ (self ):
53
+ return iter (self ._storage )
54
+
55
+ def __len__ (self ):
56
+ return len (self ._storage )
57
+
58
+ def __contains__ (self , key : str ):
59
+ return key in self ._storage
60
+
61
+ def __repr__ (self ) -> str :
62
+ return f"{ self .__class__ .__name__ } ({ repr (self ._storage )} )"
63
+
64
+
65
+ class QueryParams (_IFieldStorage ):
66
+ """
67
+ Class for parsing and storing GET quer parameters requests.
68
+
69
+ Examples::
70
+
71
+ query_params = QueryParams(b"foo=bar&baz=qux&baz=quux")
72
+ # QueryParams({"foo": "bar", "baz": ["qux", "quux"]})
73
+
74
+ query_params.get("foo") # "bar"
75
+ query_params["foo"] # "bar"
76
+ query_params.get("non-existent-key") # None
77
+ query_params.get_list("baz") # ["qux", "quux"]
78
+ "unknown-key" in query_params # False
79
+ query_params.fields # ["foo", "baz"]
80
+ """
81
+
82
+ _storage : Dict [str , List [Union [str , bytes ]]]
83
+
84
+ def __init__ (self , query_string : str ) -> None :
85
+ self ._storage = {}
86
+
87
+ for query_param in query_string .split ("&" ):
88
+ if "=" in query_param :
89
+ key , value = query_param .split ("=" , 1 )
90
+ self ._add_field_value (key , value )
91
+ elif query_param :
92
+ self ._add_field_value (query_param , "" )
93
+
94
+
95
+ class FormData (_IFieldStorage ):
26
96
"""
27
97
Class for parsing and storing form data from POST requests.
28
98
@@ -31,18 +101,15 @@ class FormData:
31
101
32
102
Examples::
33
103
34
- form_data = FormData(b"foo=bar&baz=qux", "application/x-www-form-urlencoded")
35
-
104
+ form_data = FormData(b"foo=bar&baz=qux&baz=quuz", "application/x-www-form-urlencoded")
36
105
# or
37
-
38
- form_data = FormData(b"foo=bar\\ r\\ nbaz=qux", "text/plain")
39
-
106
+ form_data = FormData(b"foo=bar\\ r\\ nbaz=qux\\ r\\ nbaz=quux", "text/plain")
40
107
# FormData({"foo": "bar", "baz": "qux"})
41
108
42
109
form_data.get("foo") # "bar"
43
110
form_data["foo"] # "bar"
44
111
form_data.get("non-existent-key") # None
45
- form_data.get_list("baz") # ["qux"]
112
+ form_data.get_list("baz") # ["qux", "quux" ]
46
113
"unknown-key" in form_data # False
47
114
form_data.fields # ["foo", "baz"]
48
115
"""
@@ -63,12 +130,6 @@ def __init__(self, data: bytes, content_type: str) -> None:
63
130
elif content_type .startswith ("text/plain" ):
64
131
self ._parse_text_plain (data )
65
132
66
- def _add_field_value (self , field_name : str , value : Union [str , bytes ]) -> None :
67
- if field_name not in self ._storage :
68
- self ._storage [field_name ] = [value ]
69
- else :
70
- self ._storage [field_name ].append (value )
71
-
72
133
def _parse_x_www_form_urlencoded (self , data : bytes ) -> None :
73
134
decoded_data = data .decode ()
74
135
@@ -95,34 +156,6 @@ def _parse_text_plain(self, data: bytes) -> None:
95
156
96
157
self ._add_field_value (field_name .decode (), value .decode ())
97
158
98
- def get (self , field_name : str , default : Any = None ) -> Union [str , bytes , None ]:
99
- """Get the value of a field."""
100
- return self ._storage .get (field_name , [default ])[0 ]
101
-
102
- def get_list (self , field_name : str ) -> List [Union [str , bytes ]]:
103
- """Get the list of values of a field."""
104
- return self ._storage .get (field_name , [])
105
-
106
- @property
107
- def fields (self ):
108
- """Returns a list of field names."""
109
- return list (self ._storage .keys ())
110
-
111
- def __getitem__ (self , field_name : str ):
112
- return self .get (field_name )
113
-
114
- def __iter__ (self ):
115
- return iter (self ._storage )
116
-
117
- def __len__ (self ):
118
- return len (self ._storage )
119
-
120
- def __contains__ (self , key : str ):
121
- return key in self ._storage
122
-
123
- def __repr__ (self ) -> str :
124
- return f"FormData({ repr (self ._storage )} )"
125
-
126
159
127
160
class Request :
128
161
"""
@@ -156,15 +189,15 @@ class Request:
156
189
path : str
157
190
"""Path of the request, e.g. ``"/foo/bar"``."""
158
191
159
- query_params : Dict [ str , str ]
192
+ query_params : QueryParams
160
193
"""
161
194
Query/GET parameters in the request.
162
195
163
196
Example::
164
197
165
198
request = Request(raw_request=b"GET /?foo=bar HTTP/1.1...")
166
199
request.query_params
167
- # {"foo": "bar"}
200
+ # QueryParams( {"foo": "bar"})
168
201
"""
169
202
170
203
http_version : str
@@ -258,13 +291,7 @@ def _parse_start_line(header_bytes: bytes) -> Tuple[str, str, Dict[str, str], st
258
291
259
292
path , query_string = path .split ("?" , 1 )
260
293
261
- query_params = {}
262
- for query_param in query_string .split ("&" ):
263
- if "=" in query_param :
264
- key , value = query_param .split ("=" , 1 )
265
- query_params [key ] = value
266
- elif query_param :
267
- query_params [query_param ] = ""
294
+ query_params = QueryParams (query_string )
268
295
269
296
return method , path , query_params , http_version
270
297
0 commit comments