Skip to content

Fix infinite iter_chunks() on empty response #11397

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion aiohttp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,13 @@
payload_type,
)
from .resolver import AsyncResolver, DefaultResolver, ThreadedResolver
from .streams import EMPTY_PAYLOAD, DataQueue, EofStream, StreamReader
from .streams import (
EMPTY_PAYLOAD,
DataQueue,
EofStream,
StreamReader,
empty_stream_reader,
)
from .tracing import (
TraceConfig,
TraceConnectionCreateEndParams,
Expand Down Expand Up @@ -212,6 +218,7 @@
"EMPTY_PAYLOAD",
"EofStream",
"StreamReader",
"empty_stream_reader",
# tracing
"TraceConfig",
"TraceConnectionCreateEndParams",
Expand Down
11 changes: 8 additions & 3 deletions aiohttp/_http_parser.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ from .http_writer import (
HttpVersion10 as _HttpVersion10,
HttpVersion11 as _HttpVersion11,
)
from .streams import EMPTY_PAYLOAD as _EMPTY_PAYLOAD, StreamReader as _StreamReader
from .streams import (
EMPTY_PAYLOAD as _EMPTY_PAYLOAD,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is presumably also unused now.

StreamReader as _StreamReader,
empty_stream_reader as _empty_stream_reader,
)

cimport cython

Expand Down Expand Up @@ -70,6 +74,7 @@ cdef object SEC_WEBSOCKET_KEY1 = hdrs.SEC_WEBSOCKET_KEY1
cdef object CONTENT_ENCODING = hdrs.CONTENT_ENCODING
cdef object EMPTY_PAYLOAD = _EMPTY_PAYLOAD
cdef object StreamReader = _StreamReader
cdef object empty_stream_reader = _empty_stream_reader
cdef object DeflateBuffer = _DeflateBuffer
cdef bytes EMPTY_BYTES = b""

Expand Down Expand Up @@ -463,14 +468,14 @@ cdef class HttpParser:
self._protocol, timer=self._timer, loop=self._loop,
limit=self._limit)
else:
payload = EMPTY_PAYLOAD
payload = empty_stream_reader()

self._payload = payload
if encoding is not None and self._auto_decompress:
self._payload = DeflateBuffer(payload, encoding)

if not self._response_with_body:
payload = EMPTY_PAYLOAD
payload = empty_stream_reader()

self._messages.append((msg, payload))

Expand Down
4 changes: 2 additions & 2 deletions aiohttp/http_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
TransferEncodingError,
)
from .http_writer import HttpVersion, HttpVersion10
from .streams import EMPTY_PAYLOAD, StreamReader
from .streams import EMPTY_PAYLOAD, StreamReader, empty_stream_reader

Check notice

Code scanning / CodeQL

Unused import Note

Import of 'EMPTY_PAYLOAD' is not used.
from .typedefs import RawHeaders

__all__ = (
Expand Down Expand Up @@ -436,7 +436,7 @@
if not payload_parser.done:
self._payload_parser = payload_parser
else:
payload = EMPTY_PAYLOAD
payload = empty_stream_reader()

messages.append((msg, payload))
should_close = msg.should_close
Expand Down
13 changes: 13 additions & 0 deletions aiohttp/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"EofStream",
"StreamReader",
"DataQueue",
"empty_stream_reader",
)

_T = TypeVar("_T")
Expand Down Expand Up @@ -604,6 +605,18 @@ def read_nowait(self, n: int = -1) -> bytes:
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll probably want a followup PR to remove EMPTY_PAYLOAD too, without backporting.



def empty_stream_reader() -> "EmptyStreamReader":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a point to having a function here, and not just importing EmptyStreamReader?

"""Create a fresh EmptyStreamReader instance.

This function should be used instead of the global EMPTY_PAYLOAD
to avoid state sharing issues between requests.

Returns:
A new EmptyStreamReader instance.
"""
return EmptyStreamReader()


class DataQueue(Generic[_T]):
"""DataQueue is a general-purpose blocking queue with one reader."""

Expand Down
43 changes: 43 additions & 0 deletions tests/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,49 @@ async def test_empty_stream_reader_iter_chunks() -> None:
await iter_chunks.__anext__()


async def test_empty_stream_reader_function() -> None:
"""Test the empty_stream_reader() function creates fresh instances."""
reader1 = streams.empty_stream_reader()
reader2 = streams.empty_stream_reader()

# Should be different instances
assert reader1 is not reader2
assert reader1 is not streams.EMPTY_PAYLOAD
assert reader2 is not streams.EMPTY_PAYLOAD

# Both should start with fresh state
assert reader1._read_eof_chunk is False
assert reader2._read_eof_chunk is False


async def test_empty_stream_reader_no_state_sharing() -> None:
"""Test that fresh EmptyStreamReader instances don't share state."""
reader1 = streams.empty_stream_reader()
reader2 = streams.empty_stream_reader()

# Use reader1 - this will modify its internal state
chunks = []
async for chunk in reader1.iter_chunks():
chunks.append(chunk)
if len(chunks) > 5: # Safety break
break
Comment on lines +1156 to +1159
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coverage doesn't seem happy with these. Not sure if it's just confused, as it seems to suggest that the break line is reached without any of the above lines being reached...


assert len(chunks) == 0 # Should terminate normally with no chunks
Comment on lines +1155 to +1161
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the expected behaviour is that we don't iterate here, then we should do something like:

Suggested change
chunks = []
async for chunk in reader1.iter_chunks():
chunks.append(chunk)
if len(chunks) > 5: # Safety break
break
assert len(chunks) == 0 # Should terminate normally with no chunks
with pytest.raises(StopIteration):
await anext(reader1.iter_chunks())

assert reader1._read_eof_chunk is True # State should be modified

# reader2 should still have fresh state and work correctly
assert reader2._read_eof_chunk is False

chunks = []
async for chunk in reader2.iter_chunks():
chunks.append(chunk)
if len(chunks) > 5: # Safety break
break

assert len(chunks) == 0 # Should also terminate normally with no chunks
assert reader2._read_eof_chunk is True


@pytest.fixture
async def buffer(loop: asyncio.AbstractEventLoop) -> streams.DataQueue[bytes]:
return streams.DataQueue(loop)
Expand Down
Loading