Skip to content

PYTHON-4493 - Use asyncio protocols instead of sockets for network IO #2151

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 70 commits into from
Mar 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
4bff4fd
Standalone commands only
NoahStapp Dec 3, 2024
488c93f
remove socket code
NoahStapp Dec 3, 2024
4601fbf
Add TLS support
NoahStapp Dec 3, 2024
fc010ee
Support reading more than 64KB
NoahStapp Dec 3, 2024
39dcb1e
debugging
NoahStapp Dec 3, 2024
3e9d992
Spike
NoahStapp Dec 6, 2024
4853245
Test BufferedProtocol
NoahStapp Dec 16, 2024
09dbece
Remove TOTALs
NoahStapp Dec 17, 2024
79705b9
Restore TOTALS
NoahStapp Dec 17, 2024
39e9ea5
Restore TOTALS
NoahStapp Dec 17, 2024
0f165b7
Comment out unused networking
NoahStapp Dec 18, 2024
83e7e6b
Only one drain waiter
NoahStapp Dec 18, 2024
f638c04
Reuse protocol buffer
NoahStapp Dec 18, 2024
51b6537
Use sliding buffer for protocols
NoahStapp Dec 18, 2024
c2e62ce
Wrapping buffer WIP
NoahStapp Dec 19, 2024
63cfbbc
Don't unpack messages inside protocol
NoahStapp Jan 2, 2025
55dfaca
Working protocols with debugging prints
NoahStapp Jan 7, 2025
2d0f4c1
Final POC for Wire Protocol-handling protocol
NoahStapp Jan 7, 2025
4b25e95
Fix synchro
NoahStapp Jan 7, 2025
574c0ec
WIP abstraction of Connection.conn to Connection.NetworkingInterface
NoahStapp Jan 8, 2025
482485d
Sync tests all passing
NoahStapp Jan 9, 2025
cf27d65
WIP exhaust cursors should return all data in first read
NoahStapp Jan 13, 2025
fbd33cd
WIP exhaust + changestream support in protocols
NoahStapp Jan 14, 2025
8e56a93
Merge branch 'master' into PYTHON-4493-prod
NoahStapp Feb 19, 2025
e7aedbf
Test cleanup
NoahStapp Feb 19, 2025
bbe4ef8
More cleanup
NoahStapp Feb 19, 2025
cfeb7c4
No SSL no auth single tests passing
NoahStapp Feb 19, 2025
1eccc12
Merge branch 'master' into PYTHON-4493-prod
NoahStapp Feb 19, 2025
d73aab1
Correctly pause and resume reading
NoahStapp Feb 20, 2025
f13fe67
Fix typing and KMS requests
NoahStapp Feb 20, 2025
0abdcc0
More typing fixes
NoahStapp Feb 20, 2025
ca1d208
Fix overflow calculation
NoahStapp Feb 20, 2025
850ff33
Fix encryption
NoahStapp Feb 21, 2025
9aa51c5
Fix connection helpers
NoahStapp Feb 21, 2025
e528773
Fixes
NoahStapp Feb 21, 2025
427bc36
Re-combine into pool_shared
NoahStapp Feb 21, 2025
2d341b8
More typing fixes
NoahStapp Feb 21, 2025
f1d1486
Merge branch 'master' into PYTHON-4493-prod
NoahStapp Feb 21, 2025
f1dd064
Use synchronous getaddrinfo
NoahStapp Feb 21, 2025
77389e4
Typing
NoahStapp Feb 21, 2025
e00e44f
Fix buffer_updated race condition with read
NoahStapp Feb 26, 2025
01af881
Fix process_header offsets
NoahStapp Feb 26, 2025
02cf3c1
Store opcode in future for read
NoahStapp Feb 26, 2025
36fb9cf
Fix multiple message processing in buffer_updated
NoahStapp Feb 26, 2025
b5ac22d
Fix recursive buffer_updated condition
NoahStapp Feb 26, 2025
9fd9176
Cleanup + comments
NoahStapp Feb 28, 2025
2af8299
Fix request_id resetting
NoahStapp Feb 28, 2025
3e45d72
Fix synchronous socket pending
NoahStapp Feb 28, 2025
b94afb4
Update get_buffer docstring
NoahStapp Feb 28, 2025
2aff6b1
Reset Protocol after each message
NoahStapp Mar 3, 2025
ee1f308
Test edge case where get_buffer is called with a sizehint of 0 on SSL
NoahStapp Mar 3, 2025
344f1f9
Merge branch 'master' into PYTHON-4493-prod
NoahStapp Mar 4, 2025
d8f4734
Add SSL EOF check in get_buffer
NoahStapp Mar 4, 2025
f3ebf92
Merge branch 'master' into PYTHON-4493-prod
NoahStapp Mar 4, 2025
be4bec6
PyMongoProtocol.read() returns a copy of the buffer
NoahStapp Mar 4, 2025
04fa6db
Merge branch 'master' into PYTHON-4493-prod
NoahStapp Mar 5, 2025
09d5e49
Merge branch 'master' into PYTHON-4493-prod
NoahStapp Mar 12, 2025
0c11812
Merge branch 'master' into PYTHON-4493-prod
NoahStapp Mar 14, 2025
9b00835
PYTHON-4493 set_write_buffer_limits plus ignore paused writing
ShaneHarvey Mar 14, 2025
90e6720
PYTHON-4493 Fix read perf
ShaneHarvey Mar 18, 2025
95569ac
PYTHON-4493 Fix type errors
ShaneHarvey Mar 20, 2025
a8c9fc1
PYTHON-4493 Remove broken SSL workaround in get_buffer
ShaneHarvey Mar 21, 2025
f5cbce0
PYTHON-4493 Remove unused buffer size arg, await future in close()
ShaneHarvey Mar 25, 2025
993de69
PYTHON-4493 Add workaround for SSL ProtocolError issues
ShaneHarvey Mar 25, 2025
b69688b
Call transport.close instead of connection_lost
NoahStapp Mar 25, 2025
506527f
Test removing transport.is_closing check
NoahStapp Mar 26, 2025
988ee60
Resume reading only in read
NoahStapp Mar 26, 2025
7006cf4
Don't store request_id
NoahStapp Mar 26, 2025
712024c
Merge branch 'master' into PYTHON-4493-prod
NoahStapp Mar 26, 2025
78a2cd7
Address review
NoahStapp Mar 27, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions pymongo/asynchronous/encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,6 @@
from pymongo.asynchronous.cursor import AsyncCursor
from pymongo.asynchronous.database import AsyncDatabase
from pymongo.asynchronous.mongo_client import AsyncMongoClient
from pymongo.asynchronous.pool import (
_configured_socket,
_get_timeout_details,
_raise_connection_failure,
)
from pymongo.common import CONNECT_TIMEOUT
from pymongo.daemon import _spawn_daemon
from pymongo.encryption_options import AutoEncryptionOpts, RangeOpts
Expand All @@ -80,12 +75,17 @@
NetworkTimeout,
ServerSelectionTimeoutError,
)
from pymongo.network_layer import BLOCKING_IO_ERRORS, async_sendall
from pymongo.network_layer import async_socket_sendall
from pymongo.operations import UpdateOne
from pymongo.pool_options import PoolOptions
from pymongo.pool_shared import (
_async_configured_socket,
_get_timeout_details,
_raise_connection_failure,
)
from pymongo.read_concern import ReadConcern
from pymongo.results import BulkWriteResult, DeleteResult
from pymongo.ssl_support import get_ssl_context
from pymongo.ssl_support import BLOCKING_IO_ERRORS, get_ssl_context
from pymongo.typings import _DocumentType, _DocumentTypeArg
from pymongo.uri_parser_shared import parse_host
from pymongo.write_concern import WriteConcern
Expand Down Expand Up @@ -113,7 +113,7 @@

async def _connect_kms(address: _Address, opts: PoolOptions) -> Union[socket.socket, _sslConn]:
try:
return await _configured_socket(address, opts)
return await _async_configured_socket(address, opts)
except Exception as exc:
_raise_connection_failure(address, exc, timeout_details=_get_timeout_details(opts))

Expand Down Expand Up @@ -196,7 +196,7 @@ async def kms_request(self, kms_context: MongoCryptKmsContext) -> None:
try:
conn = await _connect_kms(address, opts)
try:
await async_sendall(conn, message)
await async_socket_sendall(conn, message)
while kms_context.bytes_needed > 0:
# CSOT: update timeout.
conn.settimeout(max(_csot.clamp_remaining(_KMS_CONNECT_TIMEOUT), 0))
Expand Down
2 changes: 1 addition & 1 deletion pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2078,7 +2078,7 @@ async def _cleanup_cursor_lock(
# exhausted the result set we *must* close the socket
# to stop the server from sending more data.
assert conn_mgr.conn is not None
conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR)
await conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR)
else:
await self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr)
if conn_mgr:
Expand Down
6 changes: 5 additions & 1 deletion pymongo/asynchronous/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@
from pymongo.server_description import ServerDescription

if TYPE_CHECKING:
from pymongo.asynchronous.pool import AsyncConnection, Pool, _CancellationContext
from pymongo.asynchronous.pool import ( # type: ignore[attr-defined]
AsyncConnection,
Pool,
_CancellationContext,
)
from pymongo.asynchronous.settings import TopologySettings
from pymongo.asynchronous.topology import Topology

Expand Down
59 changes: 5 additions & 54 deletions pymongo/asynchronous/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import datetime
import logging
import time
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -31,20 +30,16 @@

from bson import _decode_all_selective
from pymongo import _csot, helpers_shared, message
from pymongo.common import MAX_MESSAGE_SIZE
from pymongo.compression_support import _NO_COMPRESSION, decompress
from pymongo.compression_support import _NO_COMPRESSION
from pymongo.errors import (
NotPrimaryError,
OperationFailure,
ProtocolError,
)
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
from pymongo.message import _UNPACK_REPLY, _OpMsg, _OpReply
from pymongo.message import _OpMsg
from pymongo.monitoring import _is_speculative_authenticate
from pymongo.network_layer import (
_UNPACK_COMPRESSION_HEADER,
_UNPACK_HEADER,
async_receive_data,
async_receive_message,
async_sendall,
)

Expand Down Expand Up @@ -194,13 +189,13 @@ async def command(
)

try:
await async_sendall(conn.conn, msg)
await async_sendall(conn.conn.get_conn, msg)
if use_op_msg and unacknowledged:
# Unacknowledged, fake a successful command response.
reply = None
response_doc: _DocumentOut = {"ok": 1}
else:
reply = await receive_message(conn, request_id)
reply = await async_receive_message(conn, request_id)
conn.more_to_come = reply.more_to_come
unpacked_docs = reply.unpack_response(
codec_options=codec_options, user_fields=user_fields
Expand Down Expand Up @@ -301,47 +296,3 @@ async def command(
)

return response_doc # type: ignore[return-value]


async def receive_message(
conn: AsyncConnection, request_id: Optional[int], max_message_size: int = MAX_MESSAGE_SIZE
) -> Union[_OpReply, _OpMsg]:
"""Receive a raw BSON message or raise socket.error."""
if _csot.get_timeout():
deadline = _csot.get_deadline()
else:
timeout = conn.conn.gettimeout()
if timeout:
deadline = time.monotonic() + timeout
else:
deadline = None
# Ignore the response's request id.
length, _, response_to, op_code = _UNPACK_HEADER(await async_receive_data(conn, 16, deadline))
# No request_id for exhaust cursor "getMore".
if request_id is not None:
if request_id != response_to:
raise ProtocolError(f"Got response id {response_to!r} but expected {request_id!r}")
if length <= 16:
raise ProtocolError(
f"Message length ({length!r}) not longer than standard message header size (16)"
)
if length > max_message_size:
raise ProtocolError(
f"Message length ({length!r}) is larger than server max "
f"message size ({max_message_size!r})"
)
if op_code == 2012:
op_code, _, compressor_id = _UNPACK_COMPRESSION_HEADER(
await async_receive_data(conn, 9, deadline)
)
data = decompress(await async_receive_data(conn, length - 25, deadline), compressor_id)
else:
data = await async_receive_data(conn, length - 16, deadline)

try:
unpack_reply = _UNPACK_REPLY[op_code]
except KeyError:
raise ProtocolError(
f"Got opcode {op_code!r} but expected {_UNPACK_REPLY.keys()!r}"
) from None
return unpack_reply(data)
Loading
Loading