Skip to content

Commit aa42f60

Browse files
authored
[PR #9659/1bb146a backport][3.11] Refactor FlowControlDataQueue to improve performances (#9665)
1 parent 8f6d7b5 commit aa42f60

File tree

2 files changed

+34
-32
lines changed

2 files changed

+34
-32
lines changed

CHANGES/9659.misc.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Improved performance of the internal ``DataQueue`` -- by :user:`bdraco`.

aiohttp/streams.py

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,6 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
622622
self._eof = False
623623
self._waiter: Optional[asyncio.Future[None]] = None
624624
self._exception: Optional[BaseException] = None
625-
self._size = 0
626625
self._buffer: Deque[Tuple[_T, int]] = collections.deque()
627626

628627
def __len__(self) -> int:
@@ -644,48 +643,40 @@ def set_exception(
644643
) -> None:
645644
self._eof = True
646645
self._exception = exc
647-
648-
waiter = self._waiter
649-
if waiter is not None:
646+
if (waiter := self._waiter) is not None:
650647
self._waiter = None
651648
set_exception(waiter, exc, exc_cause)
652649

653650
def feed_data(self, data: _T, size: int = 0) -> None:
654-
self._size += size
655651
self._buffer.append((data, size))
656-
657-
waiter = self._waiter
658-
if waiter is not None:
652+
if (waiter := self._waiter) is not None:
659653
self._waiter = None
660654
set_result(waiter, None)
661655

662656
def feed_eof(self) -> None:
663657
self._eof = True
664-
665-
waiter = self._waiter
666-
if waiter is not None:
658+
if (waiter := self._waiter) is not None:
667659
self._waiter = None
668660
set_result(waiter, None)
669661

662+
async def _wait_for_data(self) -> None:
663+
assert not self._waiter
664+
self._waiter = self._loop.create_future()
665+
try:
666+
await self._waiter
667+
except (asyncio.CancelledError, asyncio.TimeoutError):
668+
self._waiter = None
669+
raise
670+
670671
async def read(self) -> _T:
671672
if not self._buffer and not self._eof:
672-
assert not self._waiter
673-
self._waiter = self._loop.create_future()
674-
try:
675-
await self._waiter
676-
except (asyncio.CancelledError, asyncio.TimeoutError):
677-
self._waiter = None
678-
raise
679-
673+
await self._wait_for_data()
680674
if self._buffer:
681-
data, size = self._buffer.popleft()
682-
self._size -= size
675+
data, _ = self._buffer.popleft()
683676
return data
684-
else:
685-
if self._exception is not None:
686-
raise self._exception
687-
else:
688-
raise EofStream
677+
if self._exception is not None:
678+
raise self._exception
679+
raise EofStream
689680

690681
def __aiter__(self) -> AsyncStreamIterator[_T]:
691682
return AsyncStreamIterator(self.read)
@@ -701,19 +692,29 @@ def __init__(
701692
self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
702693
) -> None:
703694
super().__init__(loop=loop)
704-
695+
self._size = 0
705696
self._protocol = protocol
706697
self._limit = limit * 2
698+
self._buffer: Deque[Tuple[_T, int]] = collections.deque()
707699

708700
def feed_data(self, data: _T, size: int = 0) -> None:
709-
super().feed_data(data, size)
710-
701+
self._size += size
702+
self._buffer.append((data, size))
703+
if (waiter := self._waiter) is not None:
704+
self._waiter = None
705+
set_result(waiter, None)
711706
if self._size > self._limit and not self._protocol._reading_paused:
712707
self._protocol.pause_reading()
713708

714709
async def read(self) -> _T:
715-
try:
716-
return await super().read()
717-
finally:
710+
if not self._buffer and not self._eof:
711+
await self._wait_for_data()
712+
if self._buffer:
713+
data, size = self._buffer.popleft()
714+
self._size -= size
718715
if self._size < self._limit and self._protocol._reading_paused:
719716
self._protocol.resume_reading()
717+
return data
718+
if self._exception is not None:
719+
raise self._exception
720+
raise EofStream

0 commit comments

Comments
 (0)