diff --git a/aiohttp/__init__.py b/aiohttp/__init__.py index 3f8a1cc62dc..5d9bb5b2697 100644 --- a/aiohttp/__init__.py +++ b/aiohttp/__init__.py @@ -88,7 +88,13 @@ payload_type, ) from .resolver import AsyncResolver, DefaultResolver, ThreadedResolver -from .streams import EMPTY_PAYLOAD, DataQueue, EofStream, StreamReader +from .streams import ( + EMPTY_PAYLOAD, + DataQueue, + EofStream, + StreamReader, + empty_stream_reader, +) from .tracing import ( TraceConfig, TraceConnectionCreateEndParams, @@ -212,6 +218,7 @@ "EMPTY_PAYLOAD", "EofStream", "StreamReader", + "empty_stream_reader", # tracing "TraceConfig", "TraceConnectionCreateEndParams", diff --git a/aiohttp/_http_parser.pyx b/aiohttp/_http_parser.pyx index f5015b297b0..ee0d01295c6 100644 --- a/aiohttp/_http_parser.pyx +++ b/aiohttp/_http_parser.pyx @@ -38,7 +38,11 @@ from .http_writer import ( HttpVersion10 as _HttpVersion10, HttpVersion11 as _HttpVersion11, ) -from .streams import EMPTY_PAYLOAD as _EMPTY_PAYLOAD, StreamReader as _StreamReader +from .streams import ( + EMPTY_PAYLOAD as _EMPTY_PAYLOAD, + StreamReader as _StreamReader, + empty_stream_reader as _empty_stream_reader, +) cimport cython @@ -70,6 +74,7 @@ cdef object SEC_WEBSOCKET_KEY1 = hdrs.SEC_WEBSOCKET_KEY1 cdef object CONTENT_ENCODING = hdrs.CONTENT_ENCODING cdef object EMPTY_PAYLOAD = _EMPTY_PAYLOAD cdef object StreamReader = _StreamReader +cdef object empty_stream_reader = _empty_stream_reader cdef object DeflateBuffer = _DeflateBuffer cdef bytes EMPTY_BYTES = b"" @@ -463,14 +468,14 @@ cdef class HttpParser: self._protocol, timer=self._timer, loop=self._loop, limit=self._limit) else: - payload = EMPTY_PAYLOAD + payload = empty_stream_reader() self._payload = payload if encoding is not None and self._auto_decompress: self._payload = DeflateBuffer(payload, encoding) if not self._response_with_body: - payload = EMPTY_PAYLOAD + payload = empty_stream_reader() self._messages.append((msg, payload)) diff --git a/aiohttp/http_parser.py b/aiohttp/http_parser.py index 84b59afc486..f19564bbb9d 100644 --- a/aiohttp/http_parser.py +++ b/aiohttp/http_parser.py @@ -54,7 +54,7 @@ TransferEncodingError, ) from .http_writer import HttpVersion, HttpVersion10 -from .streams import EMPTY_PAYLOAD, StreamReader +from .streams import EMPTY_PAYLOAD, StreamReader, empty_stream_reader from .typedefs import RawHeaders __all__ = ( @@ -436,7 +436,7 @@ def get_content_length() -> Optional[int]: if not payload_parser.done: self._payload_parser = payload_parser else: - payload = EMPTY_PAYLOAD + payload = empty_stream_reader() messages.append((msg, payload)) should_close = msg.should_close diff --git a/aiohttp/streams.py b/aiohttp/streams.py index db22f162396..56f7c22c184 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -30,6 +30,7 @@ "EofStream", "StreamReader", "DataQueue", + "empty_stream_reader", ) _T = TypeVar("_T") @@ -604,6 +605,18 @@ def read_nowait(self, n: int = -1) -> bytes: EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader() +def empty_stream_reader() -> "EmptyStreamReader": + """Create a fresh EmptyStreamReader instance. + + This function should be used instead of the global EMPTY_PAYLOAD + to avoid state sharing issues between requests. + + Returns: + A new EmptyStreamReader instance. + """ + return EmptyStreamReader() + + class DataQueue(Generic[_T]): """DataQueue is a general-purpose blocking queue with one reader.""" diff --git a/tests/test_streams.py b/tests/test_streams.py index 4305f892eea..61cd5b6057f 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -1131,6 +1131,49 @@ async def test_empty_stream_reader_iter_chunks() -> None: await iter_chunks.__anext__() +async def test_empty_stream_reader_function() -> None: + """Test the empty_stream_reader() function creates fresh instances.""" + reader1 = streams.empty_stream_reader() + reader2 = streams.empty_stream_reader() + + # Should be different instances + assert reader1 is not reader2 + assert reader1 is not streams.EMPTY_PAYLOAD + assert reader2 is not streams.EMPTY_PAYLOAD + + # Both should start with fresh state + assert reader1._read_eof_chunk is False + assert reader2._read_eof_chunk is False + + +async def test_empty_stream_reader_no_state_sharing() -> None: + """Test that fresh EmptyStreamReader instances don't share state.""" + reader1 = streams.empty_stream_reader() + reader2 = streams.empty_stream_reader() + + # Use reader1 - this will modify its internal state + chunks = [] + async for chunk in reader1.iter_chunks(): + chunks.append(chunk) + if len(chunks) > 5: # Safety break + break + + assert len(chunks) == 0 # Should terminate normally with no chunks + assert reader1._read_eof_chunk is True # State should be modified + + # reader2 should still have fresh state and work correctly + assert reader2._read_eof_chunk is False + + chunks = [] + async for chunk in reader2.iter_chunks(): + chunks.append(chunk) + if len(chunks) > 5: # Safety break + break + + assert len(chunks) == 0 # Should also terminate normally with no chunks + assert reader2._read_eof_chunk is True + + @pytest.fixture async def buffer(loop: asyncio.AbstractEventLoop) -> streams.DataQueue[bytes]: return streams.DataQueue(loop)