@@ -603,8 +603,9 @@ def buffer_updated(self, nbytes: int) -> None:
603
603
if self ._pending_messages :
604
604
done = self ._pending_messages .popleft ()
605
605
else :
606
- done = asyncio .get_event_loop ().create_future ()
606
+ done = asyncio .get_running_loop ().create_future ()
607
607
done .set_result ((self ._start , self ._body_length ))
608
+ self ._start = 0
608
609
self ._done_messages .append (done )
609
610
if self ._length > self ._body_length :
610
611
self ._read_waiter = asyncio .get_running_loop ().create_future ()
@@ -613,12 +614,14 @@ def buffer_updated(self, nbytes: int) -> None:
613
614
extra = self ._length - self ._body_length
614
615
self ._length -= extra
615
616
self ._expecting_header = True
617
+ self ._body_length = 0
618
+ self ._op_code = None # type: ignore[assignment]
616
619
self .buffer_updated (extra )
617
620
self .transport .pause_reading ()
618
621
619
622
def process_header (self ) -> tuple [int , int ]:
620
623
"""Unpack a MongoDB Wire Protocol header."""
621
- length , _ , response_to , op_code = _UNPACK_HEADER (self ._buffer [: 16 ])
624
+ length , _ , response_to , op_code = _UNPACK_HEADER (self ._buffer [self . _start : 16 ])
622
625
# No request_id for exhaust cursor "getMore".
623
626
if self ._request_id is not None :
624
627
if self ._request_id != response_to :
0 commit comments