|
17 | 17 |
|
18 | 18 | import datetime |
19 | 19 | import logging |
20 | | -import time |
21 | 20 | from typing import ( |
22 | 21 | TYPE_CHECKING, |
23 | 22 | Any, |
|
31 | 30 |
|
32 | 31 | from bson import _decode_all_selective |
33 | 32 | from pymongo import _csot, helpers_shared, message |
34 | | -from pymongo.common import MAX_MESSAGE_SIZE |
35 | | -from pymongo.compression_support import _NO_COMPRESSION, decompress |
| 33 | +from pymongo.compression_support import _NO_COMPRESSION |
36 | 34 | from pymongo.errors import ( |
37 | 35 | NotPrimaryError, |
38 | 36 | OperationFailure, |
39 | | - ProtocolError, |
40 | 37 | ) |
41 | 38 | from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log |
42 | | -from pymongo.message import _UNPACK_REPLY, _OpMsg, _OpReply |
| 39 | +from pymongo.message import _OpMsg |
43 | 40 | from pymongo.monitoring import _is_speculative_authenticate |
44 | 41 | from pymongo.network_layer import ( |
45 | | - _UNPACK_COMPRESSION_HEADER, |
46 | | - _UNPACK_HEADER, |
47 | | - async_receive_data, |
| 42 | + async_receive_message, |
48 | 43 | async_sendall, |
49 | 44 | ) |
50 | 45 |
|
@@ -194,13 +189,13 @@ async def command( |
194 | 189 | ) |
195 | 190 |
|
196 | 191 | try: |
197 | | - await async_sendall(conn.conn, msg) |
| 192 | + await async_sendall(conn.conn.get_conn, msg) |
198 | 193 | if use_op_msg and unacknowledged: |
199 | 194 | # Unacknowledged, fake a successful command response. |
200 | 195 | reply = None |
201 | 196 | response_doc: _DocumentOut = {"ok": 1} |
202 | 197 | else: |
203 | | - reply = await receive_message(conn, request_id) |
| 198 | + reply = await async_receive_message(conn, request_id) |
204 | 199 | conn.more_to_come = reply.more_to_come |
205 | 200 | unpacked_docs = reply.unpack_response( |
206 | 201 | codec_options=codec_options, user_fields=user_fields |
@@ -301,47 +296,3 @@ async def command( |
301 | 296 | ) |
302 | 297 |
|
303 | 298 | return response_doc # type: ignore[return-value] |
304 | | - |
305 | | - |
306 | | -async def receive_message( |
307 | | - conn: AsyncConnection, request_id: Optional[int], max_message_size: int = MAX_MESSAGE_SIZE |
308 | | -) -> Union[_OpReply, _OpMsg]: |
309 | | - """Receive a raw BSON message or raise socket.error.""" |
310 | | - if _csot.get_timeout(): |
311 | | - deadline = _csot.get_deadline() |
312 | | - else: |
313 | | - timeout = conn.conn.gettimeout() |
314 | | - if timeout: |
315 | | - deadline = time.monotonic() + timeout |
316 | | - else: |
317 | | - deadline = None |
318 | | - # Ignore the response's request id. |
319 | | - length, _, response_to, op_code = _UNPACK_HEADER(await async_receive_data(conn, 16, deadline)) |
320 | | - # No request_id for exhaust cursor "getMore". |
321 | | - if request_id is not None: |
322 | | - if request_id != response_to: |
323 | | - raise ProtocolError(f"Got response id {response_to!r} but expected {request_id!r}") |
324 | | - if length <= 16: |
325 | | - raise ProtocolError( |
326 | | - f"Message length ({length!r}) not longer than standard message header size (16)" |
327 | | - ) |
328 | | - if length > max_message_size: |
329 | | - raise ProtocolError( |
330 | | - f"Message length ({length!r}) is larger than server max " |
331 | | - f"message size ({max_message_size!r})" |
332 | | - ) |
333 | | - if op_code == 2012: |
334 | | - op_code, _, compressor_id = _UNPACK_COMPRESSION_HEADER( |
335 | | - await async_receive_data(conn, 9, deadline) |
336 | | - ) |
337 | | - data = decompress(await async_receive_data(conn, length - 25, deadline), compressor_id) |
338 | | - else: |
339 | | - data = await async_receive_data(conn, length - 16, deadline) |
340 | | - |
341 | | - try: |
342 | | - unpack_reply = _UNPACK_REPLY[op_code] |
343 | | - except KeyError: |
344 | | - raise ProtocolError( |
345 | | - f"Got opcode {op_code!r} but expected {_UNPACK_REPLY.keys()!r}" |
346 | | - ) from None |
347 | | - return unpack_reply(data) |
0 commit comments