2
2
#
3
3
# This program is release under the MIT license. You can find the full text of
4
4
# the license in the LICENSE file.
5
-
6
5
import enum
7
6
import itertools
8
7
import json
11
10
12
11
from werkzeug .datastructures import Headers
13
12
from werkzeug .serving import make_server
14
- from werkzeug .wrappers import Response , Request
13
+ from werkzeug .wrappers import Request
14
+ from werkzeug .wrappers import Response
15
15
16
16
17
17
class WSGIServer (threading .Thread ):
@@ -20,14 +20,12 @@ class WSGIServer(threading.Thread):
20
20
HTTP server running a WSGI application in its own thread.
21
21
"""
22
22
23
- def __init__ (self , host = ' 127.0.0.1' , port = 0 , application = None , ** kwargs ):
23
+ def __init__ (self , host = " 127.0.0.1" , port = 0 , application = None , ** kwargs ):
24
24
self .app = application
25
25
self ._server = make_server (host , port , self .app , ** kwargs )
26
26
self .server_address = self ._server .server_address
27
27
28
- super ().__init__ (
29
- name = self .__class__ ,
30
- target = self ._server .serve_forever )
28
+ super ().__init__ (name = self .__class__ , target = self ._server .serve_forever )
31
29
32
30
def __del__ (self ):
33
31
self .stop ()
@@ -38,8 +36,8 @@ def stop(self):
38
36
@property
39
37
def url (self ):
40
38
host , port = self .server_address
41
- proto = ' http' if self ._server .ssl_context is None else ' https'
42
- return ' %s://%s:%i' % (proto , host , port )
39
+ proto = " http" if self ._server .ssl_context is None else " https"
40
+ return " %s://%s:%i" % (proto , host , port )
43
41
44
42
45
43
class Chunked (enum .Enum ):
@@ -54,7 +52,7 @@ def __bool__(self):
54
52
def _encode_chunk (chunk , charset ):
55
53
if isinstance (chunk , str ):
56
54
chunk = chunk .encode (charset )
57
- return ' {:x}' .format (len (chunk )).encode (charset ) + b' \r \n ' + chunk + b' \r \n '
55
+ return " {:x}" .format (len (chunk )).encode (charset ) + b" \r \n " + chunk + b" \r \n "
58
56
59
57
60
58
class ContentServer (WSGIServer ):
@@ -78,9 +76,9 @@ class ContentServer(WSGIServer):
78
76
79
77
"""
80
78
81
- def __init__ (self , host = ' 127.0.0.1' , port = 0 , ssl_context = None ):
79
+ def __init__ (self , host = " 127.0.0.1" , port = 0 , ssl_context = None ):
82
80
super ().__init__ (host , port , self , ssl_context = ssl_context )
83
- self .content , self .code = ('' , 204 ) # HTTP 204: No Content
81
+ self .content , self .code = ("" , 204 ) # HTTP 204: No Content
84
82
self .headers = {}
85
83
self .show_post_vars = False
86
84
self .compress = None
@@ -93,25 +91,27 @@ def __call__(self, environ, start_response):
93
91
"""
94
92
request = Request (environ )
95
93
self .requests .append (request )
96
- if (request .content_type == 'application/x-www-form-urlencoded'
97
- and request .method == 'POST' and self .show_post_vars ):
94
+ if (
95
+ request .content_type == "application/x-www-form-urlencoded"
96
+ and request .method == "POST"
97
+ and self .show_post_vars
98
+ ):
98
99
content = json .dumps (request .form )
99
100
else :
100
101
content = self .content
101
102
102
- if (
103
- self .chunked == Chunked .YES
104
- or (self .chunked == Chunked .AUTO and 'chunked' in self .headers .get ('Transfer-encoding' , '' ))
103
+ if self .chunked == Chunked .YES or (
104
+ self .chunked == Chunked .AUTO and "chunked" in self .headers .get ("Transfer-encoding" , "" )
105
105
):
106
106
# If the code below ever changes to allow setting the charset of
107
107
# the Response object, the charset used here should also be changed
108
108
# to match. But until that happens, use UTF-8 since it is Werkzeug's
109
109
# default.
110
- charset = ' utf-8'
110
+ charset = " utf-8"
111
111
if isinstance (content , (str , bytes )):
112
- content = (_encode_chunk (content , charset ), ' 0\r \n \r \n ' )
112
+ content = (_encode_chunk (content , charset ), " 0\r \n \r \n " )
113
113
else :
114
- content = itertools .chain ((_encode_chunk (item , charset ) for item in content ), [' 0\r \n \r \n ' ])
114
+ content = itertools .chain ((_encode_chunk (item , charset ) for item in content ), [" 0\r \n \r \n " ])
115
115
116
116
response = Response (response = content , status = self .code )
117
117
response .headers .clear ()
@@ -152,28 +152,27 @@ def serve_content(self, content, code=200, headers=None, chunked=Chunked.NO):
152
152
self .headers = Headers (headers )
153
153
154
154
155
- if __name__ == ' __main__' : # pragma: no cover
155
+ if __name__ == " __main__" : # pragma: no cover
156
156
import os .path
157
157
import time
158
158
159
159
app = ContentServer ()
160
160
server = WSGIServer (application = app )
161
161
server .start ()
162
162
163
- print (' HTTP server is running at %s' % server .url )
164
- print (' Type <Ctrl-C> to stop' )
163
+ print (" HTTP server is running at %s" % server .url )
164
+ print (" Type <Ctrl-C> to stop" )
165
165
166
166
try :
167
167
path = sys .argv [1 ]
168
168
except IndexError :
169
- path = os .path .join (
170
- os .path .dirname (os .path .abspath (__file__ )), '..' , 'README.rst' )
169
+ path = os .path .join (os .path .dirname (os .path .abspath (__file__ )), ".." , "README.rst" )
171
170
172
171
app .serve_content (open (path ).read (), 302 )
173
172
174
173
try :
175
174
while True :
176
175
time .sleep (1 )
177
176
except KeyboardInterrupt :
178
- print (' \r stopping...' )
177
+ print (" \r stopping..." )
179
178
server .stop ()
0 commit comments