Skip to content

Commit c2e62ce

Browse files
committed
Wrapping buffer WIP
1 parent 51b6537 commit c2e62ce

File tree

3 files changed

+25
-10
lines changed

3 files changed

+25
-10
lines changed

pymongo/asynchronous/network.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,11 @@ async def receive_message_stream(
332332
# Ignore the response's request id.
333333
# data = bytearray(max_message_size)
334334
conn.conn[1].reset()
335-
data, op_code = await asyncio.wait_for(conn.conn[1].read(), timeout=None)
335+
# try:
336+
data, op_code = await asyncio.wait_for(conn.conn[1].read(), timeout=5)
337+
# except asyncio.TimeoutError:
338+
# print(f"Timed out on read in {asyncio.current_task()}. Start of reading memory at {conn.conn[1].ready_offset}, start of writing memory at {conn.conn[1].empty_offset}, max of {MAX_MESSAGE_SIZE}, messages: {conn.conn[1]._messages}")
339+
336340

337341
# length, _, response_to, op_code = _UNPACK_HEADER(await async_receive_data_stream(conn, 16, deadline))
338342
# # No request_id for exhaust cursor "getMore".

pymongo/message.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
"""
2222
from __future__ import annotations
2323

24+
import asyncio
2425
import datetime
2526
import random
2627
import struct

pymongo/network_layer.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,9 @@
7676

7777
class PyMongoProtocol(asyncio.BufferedProtocol):
7878
def __init__(self):
79+
self._buffer_size = MAX_MESSAGE_SIZE
7980
self.transport = None
80-
self._buffer = memoryview(bytearray(MAX_MESSAGE_SIZE))
81+
self._buffer = memoryview(bytearray(self._buffer_size))
8182
self.expected_length = 0
8283
self.expecting_header = False
8384
self.ready_offset = 0
@@ -118,13 +119,15 @@ async def read(self):
118119
def unpack_message(self, message):
119120
start, end, opcode = message.result()
120121
if isinstance(start, tuple):
122+
# print(f"Unpacking message with start {start} and end {end} on {asyncio.current_task()}")
121123
return memoryview(
122-
self._buffer[start[0]:end[0]].tobytes() + self._buffer[start[1]:end[1]].tobytes()), opcode
124+
bytearray(self._buffer[start[0]:end[0]]) + bytearray(self._buffer[start[1]:end[1]])), opcode
123125
else:
124126
return self._buffer[start:end], opcode
125127

126128
def get_buffer(self, sizehint: int):
127-
if self.empty_offset + sizehint >= MAX_MESSAGE_SIZE - 1:
129+
# print(f"get_buffer with empty {self.empty_offset} and sizehint {sizehint}, ready {self.ready_offset}")
130+
if self.empty_offset + sizehint >= self._buffer_size:
128131
self.empty_offset = 0
129132
if self.empty_offset < self.ready_offset:
130133
return self._buffer[self.empty_offset:self.ready_offset]
@@ -139,18 +142,25 @@ def buffer_updated(self, nbytes: int):
139142
if self.expecting_header:
140143
self.expected_length, _, _, self.op_code = _UNPACK_HEADER(self._buffer[self.ready_offset:self.ready_offset + 16])
141144
self.expecting_header = False
145+
self.ready_offset += 16
146+
self.expected_length -= 16
142147

148+
# print(f"Ready: {self.ready_offset} out of {self._buffer_size}")
143149
if self.ready_offset < self.empty_offset:
144150
if self.empty_offset - self.ready_offset >= self.expected_length:
145-
self.store_message(self.ready_offset + 16, self.ready_offset + self.expected_length, self.op_code)
151+
self.store_message(self.ready_offset, self.ready_offset + self.expected_length, self.op_code)
146152
self.ready_offset += self.expected_length
147153
else:
148-
if self.ready_offset + self.expected_length <= MAX_MESSAGE_SIZE - 1:
149-
self.store_message(self.ready_offset + 16, self.ready_offset + self.expected_length, self.op_code)
154+
# print(f"Ready: {self.ready_offset}, Empty: {self.empty_offset}, expecting: {self.expected_length}")
155+
# print(f"Is linear: {self.ready_offset + self.expected_length <= self._buffer_size}, {self.ready_offset + self.expected_length} vs {self._buffer_size}")
156+
# print(f"Is wrapped: {self._buffer_size - self.ready_offset + self.empty_offset >= self.expected_length}, {self._buffer_size - self.ready_offset + self.empty_offset} vs {self.expected_length}")
157+
if self.ready_offset + self.expected_length <= self._buffer_size:
158+
self.store_message(self.ready_offset, self.ready_offset + self.expected_length, self.op_code)
150159
self.ready_offset += self.expected_length
151-
elif MAX_MESSAGE_SIZE - 1 - self.ready_offset + self.empty_offset >= self.expected_length:
152-
self.store_message((self.ready_offset, 0), (MAX_MESSAGE_SIZE - 1, self.expected_length - (MAX_MESSAGE_SIZE - 1 - self.ready_offset)), self.op_code)
153-
self.ready_offset = self.expected_length - (MAX_MESSAGE_SIZE - 1 - self.ready_offset)
160+
elif self._buffer_size - self.ready_offset + self.empty_offset >= self.expected_length:
161+
# print(f"{asyncio.current_task()} First chunk: {self._buffer_size - self.ready_offset}, second chunk: {self.expected_length - (self._buffer_size - self.ready_offset)}, total: {self._buffer_size - self.ready_offset + self.expected_length - (self._buffer_size - self.ready_offset)} of {self.expected_length}")
162+
self.store_message((self.ready_offset, 0), (self._buffer_size, self.expected_length - (self._buffer_size - self.ready_offset)), self.op_code)
163+
self.ready_offset = self.expected_length - (self._buffer_size - self.ready_offset)
154164

155165
def store_message(self, start, end, opcode):
156166
stored = False

0 commit comments

Comments
 (0)