Skip to content

Commit 55dfaca

Browse files
committed
Working protocols with debugging prints
1 parent 63cfbbc commit 55dfaca

File tree

4 files changed

+123
-141
lines changed

4 files changed

+123
-141
lines changed

pymongo/asynchronous/network.py

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
from bson import CodecOptions
5454
from pymongo.asynchronous.client_session import AsyncClientSession
5555
from pymongo.asynchronous.mongo_client import AsyncMongoClient
56-
from pymongo.asynchronous.pool import AsyncConnection, AsyncStreamConnection, AsyncConnectionStream
56+
from pymongo.asynchronous.pool import AsyncConnection, AsyncStreamConnection, AsyncConnectionProtocol
5757
from pymongo.compression_support import SnappyContext, ZlibContext, ZstdContext
5858
from pymongo.monitoring import _EventListeners
5959
from pymongo.read_concern import ReadConcern
@@ -70,7 +70,7 @@
7070

7171

7272
async def command_stream(
73-
conn: AsyncConnectionStream,
73+
conn: AsyncConnectionProtocol,
7474
dbname: str,
7575
spec: MutableMapping[str, Any],
7676
is_mongos: bool,
@@ -317,7 +317,7 @@ async def command_stream(
317317

318318

319319
async def receive_message(
320-
conn: AsyncConnectionStream, request_id: Optional[int], max_message_size: int = MAX_MESSAGE_SIZE
320+
conn: AsyncConnectionProtocol, request_id: Optional[int], max_message_size: int = MAX_MESSAGE_SIZE
321321
) -> Union[_OpReply, _OpMsg]:
322322
"""Receive a raw BSON message or raise socket.error."""
323323
# if _csot.get_timeout():
@@ -330,28 +330,28 @@ async def receive_message(
330330
# deadline = None
331331
deadline = None
332332
# Ignore the response's request id.
333-
length, _, response_to, op_code = _UNPACK_HEADER(await async_receive_data(conn, 16, deadline))
333+
data, op_code = await async_receive_data(conn, 0, deadline)
334+
# length, _, response_to, op_code = _UNPACK_HEADER(await async_receive_data(conn, 16, deadline))
334335
# No request_id for exhaust cursor "getMore".
335-
if request_id is not None:
336-
if request_id != response_to:
337-
raise ProtocolError(f"Got response id {response_to!r} but expected {request_id!r}")
338-
if length <= 16:
339-
raise ProtocolError(
340-
f"Message length ({length!r}) not longer than standard message header size (16)"
341-
)
342-
if length > max_message_size:
343-
raise ProtocolError(
344-
f"Message length ({length!r}) is larger than server max "
345-
f"message size ({max_message_size!r})"
346-
)
347-
if op_code == 2012:
348-
op_code, _, compressor_id = _UNPACK_COMPRESSION_HEADER(
349-
await async_receive_data(conn, 9, deadline)
350-
)
351-
data = decompress(await async_receive_data(conn, length - 25, deadline), compressor_id)
352-
else:
353-
data = await async_receive_data(conn, length - 16, deadline)
354-
336+
# if request_id is not None:
337+
# if request_id != response_to:
338+
# raise ProtocolError(f"Got response id {response_to!r} but expected {request_id!r}")
339+
# if length <= 16:
340+
# raise ProtocolError(
341+
# f"Message length ({length!r}) not longer than standard message header size (16)"
342+
# )
343+
# if length > max_message_size:
344+
# raise ProtocolError(
345+
# f"Message length ({length!r}) is larger than server max "
346+
# f"message size ({max_message_size!r})"
347+
# )
348+
# if op_code == 2012:
349+
# op_code, _, compressor_id = _UNPACK_COMPRESSION_HEADER(
350+
# await async_receive_data(conn, 9, deadline)
351+
# )
352+
# data = decompress(await async_receive_data(conn, length - 25, deadline), compressor_id)
353+
# else:
354+
# data = await async_receive_data(conn, length - 16, deadline)
355355
try:
356356
unpack_reply = _UNPACK_REPLY[op_code]
357357
except KeyError:

pymongo/asynchronous/pool.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,7 @@ async def send_message(self, message: bytes, max_doc_size: int) -> None:
578578
try:
579579
await async_sendall(self.conn, message)
580580
except BaseException as error:
581+
print(error)
581582
self._raise_connection_failure(error)
582583

583584
async def receive_message(self, request_id: Optional[int]) -> Union[_OpReply, _OpMsg]:
@@ -784,7 +785,7 @@ def __repr__(self) -> str:
784785
)
785786

786787

787-
class AsyncConnectionStream:
788+
class AsyncConnectionProtocol:
788789
"""Store a connection with some metadata.
789790
790791
:param conn: a raw connection object
@@ -1818,7 +1819,7 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
18181819
self.requests -= 1
18191820
self.size_cond.notify()
18201821

1821-
async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> AsyncConnectionStream:
1822+
async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> AsyncConnectionProtocol:
18221823
"""Connect to Mongo and return a new AsyncConnection.
18231824
18241825
Can raise ConnectionFailure.
@@ -1874,7 +1875,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
18741875

18751876
raise
18761877

1877-
conn = AsyncConnectionStream(sock, self, self.address, conn_id) # type: ignore[arg-type]
1878+
conn = AsyncConnectionProtocol(sock, self, self.address, conn_id) # type: ignore[arg-type]
18781879
async with self.lock:
18791880
self.active_contexts.add(conn.cancel_context)
18801881
self.active_contexts.discard(tmp_context)
@@ -1899,7 +1900,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
18991900
@contextlib.asynccontextmanager
19001901
async def checkout(
19011902
self, handler: Optional[_MongoClientErrorHandler] = None
1902-
) -> AsyncGenerator[AsyncConnectionStream, None]:
1903+
) -> AsyncGenerator[AsyncConnectionProtocol, None]:
19031904
"""Get a connection from the pool. Use with a "with" statement.
19041905
19051906
Returns a :class:`AsyncConnection` object wrapping a connected
@@ -2002,7 +2003,7 @@ def _raise_if_not_ready(self, checkout_started_time: float, emit_event: bool) ->
20022003

20032004
async def _get_conn(
20042005
self, checkout_started_time: float, handler: Optional[_MongoClientErrorHandler] = None
2005-
) -> AsyncConnectionStream:
2006+
) -> AsyncConnectionProtocol:
20062007
"""Get or create a AsyncConnection. Can raise ConnectionFailure."""
20072008
# We use the pid here to avoid issues with fork / multiprocessing.
20082009
# See test.test_client:TestClient.test_fork for an example of

pymongo/message.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1547,7 +1547,7 @@ def unpack(cls, msg: bytes) -> _OpMsg:
15471547
raise ProtocolError(f"Unsupported OP_MSG payload type: 0x{first_payload_type:x}")
15481548

15491549
if len(msg) != first_payload_size + 5:
1550-
raise ProtocolError("Unsupported OP_MSG reply: >1 section")
1550+
raise ProtocolError(f"Unsupported OP_MSG reply: >1 section, {len(msg)} vs {first_payload_size + 5}")
15511551

15521552
payload_document = msg[5:]
15531553
return cls(flags, payload_document)

0 commit comments

Comments
 (0)