Skip to content

Commit 2010fad

Browse files
committed
[WIP] server: having issues with POST partial reads
1 parent e1f4ddd commit 2010fad

File tree

3 files changed

+90
-11
lines changed

3 files changed

+90
-11
lines changed

src/py/extra/http/model.py

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from dataclasses import dataclass
2020
from pathlib import Path
2121
from enum import Enum
22+
from ..utils.logging import warning
2223
from ..utils.primitives import TPrimitive
2324
from .status import HTTP_STATUS
2425
from ..utils.io import DEFAULT_ENCODING, asWritable
@@ -139,26 +140,58 @@ def __init__(
139140

140141

141142
class HTTPBodyIO:
142-
__slots__ = ["reader", "read", "expected", "remaining"]
143+
__slots__ = ["reader", "read", "expected", "remaining", "existing"]
143144
"""Represents a body that is loaded from a reader IO."""
144145

145-
def __init__(self, reader: "HTTPBodyReader", expected: int | None = None):
146+
def __init__(
147+
self,
148+
reader: "HTTPBodyReader",
149+
expected: int | None = None,
150+
existing: bytes | None = None,
151+
):
146152
self.reader: HTTPBodyReader = reader
147153
self.read: int = 0
148154
self.expected: int | None = expected
149155
self.remaining: int | None = expected
156+
self.existing: bytes | None = existing
150157

151-
async def load(
158+
async def _read(
152159
self,
153160
) -> bytes | None:
154-
"""Loads all the data and returns a list of bodies."""
155-
payload = await self.reader.load()
156-
if payload:
157-
n = len(payload)
158-
self.read += n
159-
if self.remaining is not None:
161+
"""Reads the next available bytes"""
162+
if self.existing and self.read == 0:
163+
self.read += len(self.existing)
164+
return self.existing
165+
elif self.remaining:
166+
# FIXME: We should probably have a timeout there
167+
try:
168+
payload = await self.reader.load()
169+
except TimeoutError:
170+
warning(
171+
"Request body loading timed out",
172+
Remaining=self.remaining,
173+
Read=self.read,
174+
)
175+
return None
176+
if payload:
177+
n = len(payload)
178+
self.read += n
160179
self.remaining -= n
161-
return payload
180+
return payload
181+
else:
182+
return None
183+
184+
async def load(self) -> bytes:
185+
"""Fully loads the body."""
186+
res = bytearray()
187+
# FIXME: This would read other requests as well if there is no
188+
# remaining -- there should be at least a delimiter.
189+
while True:
190+
chunk = await self._read()
191+
if chunk:
192+
res += chunk
193+
else:
194+
return bytes(res)
162195

163196

164197
class HTTPBodyBlob(NamedTuple):
@@ -457,10 +490,17 @@ def contentType(self) -> str | None:
457490

458491
@property
459492
def body(self) -> HTTPBodyIO | HTTPBodyBlob:
493+
b = self._body
460494
if self._body is None:
461495
if not self._reader:
462496
raise RuntimeError("Request has no reader, can't read body")
463497
self._body = HTTPBodyIO(self._reader)
498+
elif isinstance(self._body, HTTPBodyBlob) and self._body.remaining:
499+
if not self._reader:
500+
raise RuntimeError("Request has no reader, can't read body")
501+
self._body = HTTPBodyIO(
502+
self._reader, expected=self._body.remaining, existing=self._body.payload
503+
)
464504
return self._body
465505

466506
@property

src/py/extra/http/parser.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,13 +324,19 @@ def feed(self, chunk: bytes) -> Iterator[HTTPAtom]:
324324
if self.requestLine is None or self.requestHeaders is None:
325325
yield HTTPProcessingStatus.BadFormat
326326
else:
327+
# FIXME: In some circumstances (for POST requests),
328+
# reading the body will time out, and then the request
329+
# will be duplicated.
327330
headers = self.requestHeaders
328331
line = self.requestLine
329332
# NOTE: This is an awkward dance around the type checker
330333
if self.parser is self.bodyRest:
331334
self.parser.feed(chunk[offset:])
332335
offset = len(chunk)
333336
body = self.parser.flush()
337+
# NOTE: Careful here as we may create a request that has
338+
# a body with remaining data. The HTTP request will need
339+
# to make sure it can continue reading the body.
334340
yield (
335341
HTTPRequest(
336342
method=line.method,

src/py/extra/server.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@ def __init__(
100100
async def _read(
101101
self, timeout: float = 1.0, size: int | None = None
102102
) -> bytes | None:
103+
info(
104+
"Reading Body",
105+
Client=f"{id(self.socket):x}",
106+
Size=size,
107+
Timeout=timeout,
108+
)
103109
return await asyncio.wait_for(
104110
self.loop.sock_recv(self.socket, size or self.size),
105111
timeout=timeout,
@@ -213,15 +219,31 @@ async def OnRequest(
213219
# request in the same payload, so we need to be prepared
214220
# to answer more than one request.
215221
stream = parser.feed(buffer[:n] if n != size else buffer)
222+
# TODO: Should be debug
223+
info(
224+
"Reading Requests(s)",
225+
Client=f"{id(client):x}",
226+
Read=n,
227+
Iteration=iteration,
228+
Count=req_count,
229+
)
216230
while True:
231+
217232
try:
218233
atom = next(stream)
234+
# TODO: Should be debug
235+
info("Request Atom", Atom=atom.__class__.__name__)
219236
except StopIteration:
237+
# TODO: Should be debug
238+
info("Request End")
220239
break
221240
if atom is HTTPProcessingStatus.Complete:
222241
status = atom
223242
elif isinstance(atom, HTTPRequest):
224243
req = atom
244+
# We pass the reader to the request, as for instance
245+
# the request may need more than what was available
246+
# from the socket.
225247
req._reader = reader
226248
# Logs the request method
227249
if options.logRequests:
@@ -234,8 +256,19 @@ async def OnRequest(
234256
keep_alive = False
235257
if await cls.SendResponse(req, app, writer):
236258
res_count += 1
259+
info("Request Sent", Iteration=iteration, Count=res_count)
260+
else:
261+
warning(
262+
"Sending Response Failed",
263+
Iteration=iteration,
264+
Count=res_count,
265+
)
237266
# We clear what we've read from the buffer
238-
del buffer[:n]
267+
if n != size:
268+
del buffer[:n]
269+
else:
270+
# FIXME: Or is it buffer.clear()?
271+
del buffer[:]
239272
iteration += 1
240273

241274
# NOTE: We'll need to think about the loading of the body, which

0 commit comments

Comments
 (0)