@@ -511,6 +511,8 @@ async def read(self, bytes_needed: int) -> bytes:
511
511
raise OSError ("connection is already closed" ) from None
512
512
if self .transport and self .transport .is_closing ():
513
513
raise OSError ("connection is already closed" )
514
+ if self ._bytes_ready >= bytes_needed :
515
+ return self ._read (bytes_needed )
514
516
self ._pending_reads .append (bytes_needed )
515
517
read_waiter = asyncio .get_running_loop ().create_future ()
516
518
self ._pending_listeners .append (read_waiter )
@@ -540,18 +542,24 @@ def buffer_updated(self, nbytes: int) -> None:
540
542
self ._bytes_ready += nbytes
541
543
542
544
# Bail we don't have the current requested number of bytes.
543
- n_requested = self ._bytes_requested
544
- if n_requested == 0 and self ._pending_reads :
545
- n_requested = self ._pending_reads .popleft ()
546
- if n_requested == 0 or self ._bytes_ready < n_requested :
545
+ bytes_needed = self ._bytes_requested
546
+ if bytes_needed == 0 and self ._pending_reads :
547
+ bytes_needed = self ._pending_reads .popleft ()
548
+ if bytes_needed == 0 or self ._bytes_ready < bytes_needed :
547
549
return
548
550
551
+ data = self ._read (bytes_needed )
552
+ waiter = self ._pending_listeners .popleft ()
553
+ waiter .set_result (data )
554
+
555
+ def _read (self , bytes_needed ):
556
+ """Read bytes from the buffer."""
549
557
# Send the bytes to the listener.
550
- self ._bytes_ready -= n_requested
558
+ self ._bytes_ready -= bytes_needed
551
559
self ._bytes_requested = 0
552
- waiter = self . _pending_listeners . popleft ()
553
- output_buf = bytearray (n_requested )
554
- n_remaining = n_requested
560
+
561
+ output_buf = bytearray (bytes_needed )
562
+ n_remaining = bytes_needed
555
563
out_index = 0
556
564
while n_remaining > 0 :
557
565
buffer = self ._buffers .popleft ()
@@ -572,7 +580,7 @@ def buffer_updated(self, nbytes: int) -> None:
572
580
n_remaining -= buffer_remaining
573
581
buffer .start_index = 0
574
582
self ._buffer_pool .append (buffer )
575
- waiter . set_result ( output_buf )
583
+ return output_buf
576
584
577
585
578
586
async def async_sendall (conn : PyMongoBaseProtocol , buf : bytes ) -> None :
0 commit comments