Skip to content

Commit 68989ae

Browse files
Server connection handling. (#3672)
1 parent 4acf5c2 commit 68989ae

File tree

8 files changed

+62
-27
lines changed

8 files changed

+62
-27
lines changed

src/ahttpx/_parsers.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -375,13 +375,13 @@ async def recv_body(self) -> bytes:
375375
self.recv_state = State.DONE
376376
return body
377377

378-
async def complete(self):
378+
async def reset(self) -> bool:
379379
is_fully_complete = self.send_state == State.DONE and self.recv_state == State.DONE
380380
is_keepalive = self.send_keep_alive and self.recv_keep_alive
381381

382382
if not (is_fully_complete and is_keepalive):
383383
await self.close()
384-
return
384+
return False
385385

386386
if self.mode == Mode.CLIENT:
387387
self.send_state = State.SEND_METHOD_LINE
@@ -397,13 +397,21 @@ async def complete(self):
397397
self.send_keep_alive = True
398398
self.recv_keep_alive = True
399399
self.processing_1xx = False
400+
return True
400401

401402
async def close(self):
402403
if self.send_state != State.CLOSED:
403404
self.send_state = State.CLOSED
404405
self.recv_state = State.CLOSED
405406
await self.stream.close()
406407

408+
def is_keepalive(self) -> bool:
409+
return (
410+
self.send_keep_alive and
411+
self.recv_keep_alive and
412+
self.send_state != State.CLOSED
413+
)
414+
407415
def is_idle(self) -> bool:
408416
return (
409417
self.send_state == State.SEND_METHOD_LINE or

src/ahttpx/_pool.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ async def send(self, request: Request) -> Response:
170170
await self._send_head(request)
171171
await self._send_body(request)
172172
code, headers = await self._recv_head()
173-
stream = HTTPStream(self._recv_body, self._complete)
173+
stream = HTTPStream(self._recv_body, self._reset)
174174
# TODO...
175175
return Response(code, headers=headers, content=stream)
176176
# finally:
@@ -235,9 +235,9 @@ async def _recv_head(self) -> tuple[int, Headers]:
235235
async def _recv_body(self) -> bytes:
236236
return await self._parser.recv_body()
237237

238-
# Request/response cycle complete...
239-
async def _complete(self) -> None:
240-
await self._parser.complete()
238+
# Request/response cycle reset...
239+
async def _reset(self) -> None:
240+
await self._parser.reset()
241241
self._idle_expiry = time.monotonic() + self._keepalive_duration
242242

243243
async def _close(self) -> None:

src/ahttpx/_server.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ async def handle_requests(self):
3333
try:
3434
while not self._parser.is_closed():
3535
method, url, headers = await self._recv_head()
36-
stream = HTTPStream(self._recv_body, self._complete)
36+
stream = HTTPStream(self._recv_body, self._reset)
3737
# TODO: Handle endpoint exceptions
3838
async with Request(method, url, headers=headers, content=stream) as request:
3939
try:
@@ -43,12 +43,15 @@ async def handle_requests(self):
4343
except Exception:
4444
logger.error("Internal Server Error", exc_info=True)
4545
content = Text("Internal Server Error")
46-
err = Response(code=500, content=content)
46+
err = Response(500, content=content)
4747
await self._send_head(err)
4848
await self._send_body(err)
4949
else:
5050
await self._send_head(response)
5151
await self._send_body(response)
52+
if self._parser.is_keepalive():
53+
await stream.read()
54+
await self._reset()
5255
except Exception:
5356
logger.error("Internal Server Error", exc_info=True)
5457

@@ -88,8 +91,8 @@ async def _send_body(self, response: Response):
8891
await self._parser.send_body(b'')
8992

9093
# Start it all over again...
91-
async def _complete(self):
92-
await self._parser.complete
94+
async def _reset(self):
95+
await self._parser.reset()
9396
self._idle_expiry = time.monotonic() + self._keepalive_duration
9497

9598

src/httpx/_network.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ def close(self) -> None:
8383
self._is_closed = True
8484
self._socket.close()
8585

86+
def is_closed(self) -> bool:
87+
return self._is_closed
88+
8689
def __repr__(self):
8790
description = ""
8891
description += " TLS" if self._is_tls else ""
@@ -160,7 +163,7 @@ def __init__(self, listener: NetworkListener, handler: typing.Callable[[NetworkS
160163
self._max_workers = 5
161164
self._executor = None
162165
self._thread = None
163-
self._streams = list[NetworkStream]
166+
self._streams: list[NetworkStream] = []
164167

165168
@property
166169
def host(self):
@@ -177,11 +180,18 @@ def __enter__(self):
177180

178181
def __exit__(self, exc_type, exc_val, exc_tb):
179182
self.listener.close()
183+
for stream in self._streams:
184+
stream.close()
180185
self._executor.shutdown(wait=True)
181186

182187
def _serve(self):
183188
while stream := self.listener.accept():
184189
self._executor.submit(self._handler, stream)
190+
self._streams = [
191+
stream for stream in self._streams
192+
if not stream.is_closed()
193+
]
194+
self._streams.append(stream)
185195

186196
def _handler(self, stream):
187197
try:

src/httpx/_parsers.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -375,13 +375,13 @@ def recv_body(self) -> bytes:
375375
self.recv_state = State.DONE
376376
return body
377377

378-
def complete(self):
378+
def reset(self) -> bool:
379379
is_fully_complete = self.send_state == State.DONE and self.recv_state == State.DONE
380380
is_keepalive = self.send_keep_alive and self.recv_keep_alive
381381

382382
if not (is_fully_complete and is_keepalive):
383383
self.close()
384-
return
384+
return False
385385

386386
if self.mode == Mode.CLIENT:
387387
self.send_state = State.SEND_METHOD_LINE
@@ -397,13 +397,21 @@ def complete(self):
397397
self.send_keep_alive = True
398398
self.recv_keep_alive = True
399399
self.processing_1xx = False
400+
return True
400401

401402
def close(self):
402403
if self.send_state != State.CLOSED:
403404
self.send_state = State.CLOSED
404405
self.recv_state = State.CLOSED
405406
self.stream.close()
406407

408+
def is_keepalive(self) -> bool:
409+
return (
410+
self.send_keep_alive and
411+
self.recv_keep_alive and
412+
self.send_state != State.CLOSED
413+
)
414+
407415
def is_idle(self) -> bool:
408416
return (
409417
self.send_state == State.SEND_METHOD_LINE or

src/httpx/_pool.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ def send(self, request: Request) -> Response:
170170
self._send_head(request)
171171
self._send_body(request)
172172
code, headers = self._recv_head()
173-
stream = HTTPStream(self._recv_body, self._complete)
173+
stream = HTTPStream(self._recv_body, self._reset)
174174
# TODO...
175175
return Response(code, headers=headers, content=stream)
176176
# finally:
@@ -235,9 +235,9 @@ def _recv_head(self) -> tuple[int, Headers]:
235235
def _recv_body(self) -> bytes:
236236
return self._parser.recv_body()
237237

238-
# Request/response cycle complete...
239-
def _complete(self) -> None:
240-
self._parser.complete()
238+
# Request/response cycle reset...
239+
def _reset(self) -> None:
240+
self._parser.reset()
241241
self._idle_expiry = time.monotonic() + self._keepalive_duration
242242

243243
def _close(self) -> None:

src/httpx/_server.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def handle_requests(self):
3333
try:
3434
while not self._parser.is_closed():
3535
method, url, headers = self._recv_head()
36-
stream = HTTPStream(self._recv_body, self._complete)
36+
stream = HTTPStream(self._recv_body, self._reset)
3737
# TODO: Handle endpoint exceptions
3838
with Request(method, url, headers=headers, content=stream) as request:
3939
try:
@@ -43,12 +43,15 @@ def handle_requests(self):
4343
except Exception:
4444
logger.error("Internal Server Error", exc_info=True)
4545
content = Text("Internal Server Error")
46-
err = Response(code=500, content=content)
46+
err = Response(500, content=content)
4747
self._send_head(err)
4848
self._send_body(err)
4949
else:
5050
self._send_head(response)
5151
self._send_body(response)
52+
if self._parser.is_keepalive():
53+
stream.read()
54+
self._reset()
5255
except Exception:
5356
logger.error("Internal Server Error", exc_info=True)
5457

@@ -88,8 +91,8 @@ def _send_body(self, response: Response):
8891
self._parser.send_body(b'')
8992

9093
# Start it all over again...
91-
def _complete(self):
92-
self._parser.complete
94+
def _reset(self):
95+
self._parser.reset()
9396
self._idle_expiry = time.monotonic() + self._keepalive_duration
9497

9598

@@ -99,7 +102,10 @@ def __init__(self, host, port):
99102

100103
def wait(self):
101104
while(True):
102-
sleep(1)
105+
try:
106+
sleep(1)
107+
except KeyboardInterrupt:
108+
break
103109

104110

105111
@contextlib.contextmanager

tests/test_parsers.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def test_parser():
6767
assert terminator == b''
6868

6969
assert not p.is_idle()
70-
p.complete()
70+
p.reset()
7171
assert p.is_idle()
7272

7373

@@ -113,7 +113,7 @@ def test_parser_server():
113113
)
114114

115115
assert not p.is_idle()
116-
p.complete()
116+
p.reset()
117117
assert p.is_idle()
118118

119119

@@ -315,7 +315,7 @@ def test_parser_repr():
315315
p.recv_body()
316316
assert repr(p) == "<HTTPParser [client DONE, server DONE]>"
317317

318-
p.complete()
318+
p.reset()
319319
assert repr(p) == "<HTTPParser [client SEND_METHOD_LINE, server WAIT]>"
320320

321321

@@ -554,7 +554,7 @@ def test_client_connection_close():
554554

555555
assert repr(p) == "<HTTPParser [client DONE, server DONE]>"
556556

557-
p.complete()
557+
p.reset()
558558
assert repr(p) == "<HTTPParser [client CLOSED, server CLOSED]>"
559559
assert p.is_closed()
560560

@@ -591,7 +591,7 @@ def test_server_connection_close():
591591
assert terminator == b""
592592

593593
assert repr(p) == "<HTTPParser [client DONE, server DONE]>"
594-
p.complete()
594+
p.reset()
595595
assert repr(p) == "<HTTPParser [client CLOSED, server CLOSED]>"
596596

597597

0 commit comments

Comments
 (0)