Skip to content

Commit ae25e86

Browse files
Add .wait_ready to parser for clean server disconnects (#3690)
1 parent 2038049 commit ae25e86

File tree

4 files changed

+51
-6
lines changed

4 files changed

+51
-6
lines changed

src/ahttpx/_parsers.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,13 @@ async def send_body(self, body: bytes) -> None:
224224
# Handle body close
225225
self.send_state = State.DONE
226226

227+
async def wait_ready(self) -> bool:
228+
"""
229+
Wait until read data starts arriving, and return `True`.
230+
Return `False` if the stream closes.
231+
"""
232+
return await self.parser.wait_ready()
233+
227234
async def recv_method_line(self) -> tuple[bytes, bytes, bytes]:
228235
"""
229236
Receive the initial request method line:
@@ -453,6 +460,15 @@ def _push_back(self, buffer):
453460
assert self._buffer == b''
454461
self._buffer = buffer
455462

463+
async def wait_ready(self) -> bool:
464+
"""
465+
Attempt a read, and return True if read succeeds or False if the
466+
stream is closed. The data remains in the read buffer.
467+
"""
468+
data = await self._read_some()
469+
self._push_back(data)
470+
return data != b''
471+
456472
async def read(self, size: int) -> bytes:
457473
"""
458474
Read and return up to 'size' bytes from the stream, with I/O buffering provided.

src/ahttpx/_server.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,14 @@ def __init__(self, stream, endpoint):
3232
async def handle_requests(self):
3333
try:
3434
while not self._parser.is_closed():
35+
if not await self._parser.wait_ready():
36+
# Wait until we have read data, or return
37+
# if the stream closes.
38+
return
39+
# Read the initial part of the request,
40+
# and setup a stream for reading the body.
3541
method, url, headers = await self._recv_head()
3642
stream = HTTPStream(self._recv_body, self._reset)
37-
# TODO: Handle endpoint exceptions
3843
async with Request(method, url, headers=headers, content=stream) as request:
3944
try:
4045
response = await self._endpoint(request)
@@ -50,7 +55,10 @@ async def handle_requests(self):
5055
await self._send_head(response)
5156
await self._send_body(response)
5257
if self._parser.is_keepalive():
58+
# If the client hasn't read the request body to
59+
# completion, then do that here.
5360
await stream.read()
61+
# Either revert to idle, or close the connection.
5462
await self._reset()
5563
except Exception:
5664
logger.error("Internal Server Error", exc_info=True)

src/httpx/_parsers.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,13 @@ def send_body(self, body: bytes) -> None:
224224
# Handle body close
225225
self.send_state = State.DONE
226226

227+
def wait_ready(self) -> bool:
228+
"""
229+
Wait until read data starts arriving, and return `True`.
230+
Return `False` if the stream closes.
231+
"""
232+
return self.parser.wait_ready()
233+
227234
def recv_method_line(self) -> tuple[bytes, bytes, bytes]:
228235
"""
229236
Receive the initial request method line:
@@ -453,6 +460,15 @@ def _push_back(self, buffer):
453460
assert self._buffer == b''
454461
self._buffer = buffer
455462

463+
def wait_ready(self) -> bool:
464+
"""
465+
Attempt a read, and return True if read succeeds or False if the
466+
stream is closed. The data remains in the read buffer.
467+
"""
468+
data = self._read_some()
469+
self._push_back(data)
470+
return data != b''
471+
456472
def read(self, size: int) -> bytes:
457473
"""
458474
Read and return up to 'size' bytes from the stream, with I/O buffering provided.

src/httpx/_server.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,14 @@ def __init__(self, stream, endpoint):
3232
def handle_requests(self):
3333
try:
3434
while not self._parser.is_closed():
35+
if not self._parser.wait_ready():
36+
# Wait until we have read data, or return
37+
# if the stream closes.
38+
return
39+
# Read the initial part of the request,
40+
# and setup a stream for reading the body.
3541
method, url, headers = self._recv_head()
3642
stream = HTTPStream(self._recv_body, self._reset)
37-
# TODO: Handle endpoint exceptions
3843
with Request(method, url, headers=headers, content=stream) as request:
3944
try:
4045
response = self._endpoint(request)
@@ -50,7 +55,10 @@ def handle_requests(self):
5055
self._send_head(response)
5156
self._send_body(response)
5257
if self._parser.is_keepalive():
58+
# If the client hasn't read the request body to
59+
# completion, then do that here.
5360
stream.read()
61+
# Either revert to idle, or close the connection.
5462
self._reset()
5563
except Exception:
5664
logger.error("Internal Server Error", exc_info=True)
@@ -102,10 +110,7 @@ def __init__(self, host, port):
102110

103111
def wait(self):
104112
while(True):
105-
try:
106-
sleep(1)
107-
except KeyboardInterrupt:
108-
break
113+
sleep(1)
109114

110115

111116
@contextlib.contextmanager

0 commit comments

Comments
 (0)