Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
4158326
PYTHON-4700 - Convert CSFLE tests to async
NoahStapp Sep 6, 2024
acc8bdc
Add async TestSpecCreator
NoahStapp Sep 9, 2024
e6d4b11
Fix imports
NoahStapp Sep 9, 2024
d6a0429
Merge branch 'master' into PYTHON-4700
NoahStapp Sep 9, 2024
1922702
Linting
NoahStapp Sep 9, 2024
97f39dd
Predicate
NoahStapp Sep 9, 2024
7a4df29
Add test duration
NoahStapp Sep 26, 2024
c589fca
Merge branch 'master' into PYTHON-4700
NoahStapp Sep 26, 2024
689b0b8
Cleanup
NoahStapp Sep 27, 2024
6feec44
WIP
NoahStapp Sep 27, 2024
17388d1
Vastly improve performance
NoahStapp Oct 3, 2024
b472d3a
Async KMS
NoahStapp Oct 3, 2024
3bdd381
Fix deadline
NoahStapp Oct 7, 2024
8e6a30d
Fix async KMS
NoahStapp Oct 7, 2024
8ca4b0e
Fix _require
NoahStapp Oct 7, 2024
a229746
Type error fixes
NoahStapp Oct 7, 2024
88bf8d8
Merge branch 'master' into PYTHON-4700
NoahStapp Oct 8, 2024
4f46e0f
testing
NoahStapp Oct 8, 2024
7ac5ede
testing
NoahStapp Oct 8, 2024
a99e928
testing
NoahStapp Oct 8, 2024
e1c1dce
testing
NoahStapp Oct 8, 2024
5d15c89
testing
NoahStapp Oct 8, 2024
357355e
testing
NoahStapp Oct 8, 2024
4d85111
testing
NoahStapp Oct 8, 2024
66b7212
testing
NoahStapp Oct 8, 2024
02939ce
testing
NoahStapp Oct 8, 2024
e3cf7fb
testing
NoahStapp Oct 8, 2024
54992ef
testing
NoahStapp Oct 8, 2024
b01fa8a
testing
NoahStapp Oct 8, 2024
0a9d2ad
testing
NoahStapp Oct 8, 2024
4997c18
testing
NoahStapp Oct 8, 2024
04596d6
testing
NoahStapp Oct 9, 2024
636f869
testing
NoahStapp Oct 9, 2024
b427c56
testing
NoahStapp Oct 9, 2024
a68d09f
testing
NoahStapp Oct 9, 2024
ad5c717
testing
NoahStapp Oct 9, 2024
c3a5619
testing
NoahStapp Oct 9, 2024
0c4350e
testing
NoahStapp Oct 9, 2024
adcc11d
testing
NoahStapp Oct 9, 2024
92eafbf
testing
NoahStapp Oct 9, 2024
7d4495d
testing
NoahStapp Oct 9, 2024
4424bfa
testing
NoahStapp Oct 9, 2024
a7eb209
testing
NoahStapp Oct 9, 2024
56c33c1
testing
NoahStapp Oct 9, 2024
5105652
testing
NoahStapp Oct 9, 2024
b3015f2
testing
NoahStapp Oct 9, 2024
5d2996a
testing
NoahStapp Oct 9, 2024
4d41081
testing
NoahStapp Oct 9, 2024
9236b99
testing
NoahStapp Oct 9, 2024
871f84d
testing
NoahStapp Oct 9, 2024
8c70547
testing
NoahStapp Oct 9, 2024
21e1842
testing
NoahStapp Oct 9, 2024
ac1f6eb
testing
NoahStapp Oct 9, 2024
80818b5
testing
NoahStapp Oct 9, 2024
9231b94
testing
NoahStapp Oct 9, 2024
b1c2b83
testing
NoahStapp Oct 9, 2024
8f4fde2
testing
NoahStapp Oct 9, 2024
b40ae85
testing
NoahStapp Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .evergreen/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,9 @@ if [ -z "$GREEN_FRAMEWORK" ]; then
# Use --capture=tee-sys so pytest prints test output inline:
# https://docs.pytest.org/en/stable/how-to/capture-stdout-stderr.html
if [ -z "$TEST_SUITES" ]; then
python -m pytest -v --capture=tee-sys --durations=5 --maxfail=10 $TEST_ARGS
python -m pytest -v --capture=tee-sys --durations=5 $TEST_ARGS
else
python -m pytest -v --capture=tee-sys --durations=5 --maxfail=10 -m $TEST_SUITES $TEST_ARGS
python -m pytest -v --capture=tee-sys --durations=5 -m $TEST_SUITES $TEST_ARGS
fi
else
python green_framework_test.py $GREEN_FRAMEWORK -v $TEST_ARGS
Expand Down
2 changes: 2 additions & 0 deletions pymongo/asynchronous/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,7 @@ async def _insert_command(
) -> None:
if bypass_doc_val:
command["bypassDocumentValidation"] = True
print("Running _insert_command")

result = await conn.command(
self._database.name,
Expand Down Expand Up @@ -886,6 +887,7 @@ async def insert_one(
common.validate_is_document_type("document", document)
if not (isinstance(document, RawBSONDocument) or "_id" in document):
document["_id"] = ObjectId() # type: ignore[index]
print("Running insert_one")

write_concern = self._write_concern_for(session)
return InsertOneResult(
Expand Down
8 changes: 6 additions & 2 deletions pymongo/asynchronous/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,7 @@ async def _list_collections(
AsyncCollection[MutableMapping[str, Any]],
self.get_collection("$cmd", read_preference=read_preference),
)
print("Calling _list_collections")
cmd = {"listCollections": 1, "cursor": {}}
cmd.update(kwargs)
async with self._client._tmp_session(session, close=False) as tmp_session:
Expand Down Expand Up @@ -1125,13 +1126,14 @@ async def _list_collections_helper(

.. versionadded:: 3.6
"""
print("Calling _list_collections_helper")
if filter is not None:
kwargs["filter"] = filter
read_pref = (session and session._txn_read_preference()) or ReadPreference.PRIMARY
if comment is not None:
kwargs["comment"] = comment

async def _cmd(
async def _cmd_list_collections(
session: Optional[AsyncClientSession],
_server: Server,
conn: AsyncConnection,
Expand All @@ -1141,8 +1143,9 @@ async def _cmd(
conn, session, read_preference=read_preference, **kwargs
)

print("Calling _list_collections_helper _cmd")
return await self._client._retryable_read(
_cmd, read_pref, session, operation=_Op.LIST_COLLECTIONS
_cmd_list_collections, read_pref, session, operation=_Op.LIST_COLLECTIONS
)

async def list_collections(
Expand Down Expand Up @@ -1171,6 +1174,7 @@ async def list_collections(

.. versionadded:: 3.6
"""
print("Calling list_collections")
return await self._list_collections_helper(session, filter, comment, **kwargs)

async def _list_collection_names(
Expand Down
63 changes: 49 additions & 14 deletions pymongo/asynchronous/encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import contextlib
import enum
import socket
import traceback
import uuid
import weakref
from copy import deepcopy
Expand Down Expand Up @@ -142,6 +143,37 @@ def __init__(
self.opts = opts
self._spawned = False

async def _async_kms_request(
self,
kms_context: MongoCryptKmsContext,
host: str,
port: Optional[int],
opts: PoolOptions,
message: bytes,
) -> None:
from pymongo.network_layer import async_receive_data_socket # type: ignore[attr-defined]

try:
conn = await _configured_socket((host, port), opts)
try:
await async_sendall(conn, message)
while kms_context.bytes_needed > 0:
# CSOT: update timeout.
conn.settimeout(max(_csot.clamp_remaining(_KMS_CONNECT_TIMEOUT), 0))
data = await async_receive_data_socket(conn, kms_context.bytes_needed)
kms_context.feed(data)
except OSError as err:
raise OSError("KMS connection closed") from err
except BLOCKING_IO_ERRORS:
raise socket.timeout("timed out") from None
finally:
conn.close()
except (PyMongoError, MongoCryptError):
raise # Propagate pymongo errors directly.
except Exception as error:
# Wrap I/O errors in PyMongo exceptions.
_raise_connection_failure((host, port), error)

async def kms_request(self, kms_context: MongoCryptKmsContext) -> None:
"""Complete a KMS request.

Expand Down Expand Up @@ -174,20 +206,23 @@ async def kms_request(self, kms_context: MongoCryptKmsContext) -> None:
)
host, port = parse_host(endpoint, _HTTPS_PORT)
try:
conn = await _configured_socket((host, port), opts)
try:
await async_sendall(conn, message)
while kms_context.bytes_needed > 0:
# CSOT: update timeout.
conn.settimeout(max(_csot.clamp_remaining(_KMS_CONNECT_TIMEOUT), 0))
data = conn.recv(kms_context.bytes_needed)
if not data:
raise OSError("KMS connection closed")
kms_context.feed(data)
except BLOCKING_IO_ERRORS:
raise socket.timeout("timed out") from None
finally:
conn.close()
if _IS_SYNC:
conn = await _configured_socket((host, port), opts)
try:
await async_sendall(conn, message)
while kms_context.bytes_needed > 0:
# CSOT: update timeout.
conn.settimeout(max(_csot.clamp_remaining(_KMS_CONNECT_TIMEOUT), 0))
data = conn.recv(kms_context.bytes_needed)
if not data:
raise OSError("KMS connection closed")
kms_context.feed(data)
except BLOCKING_IO_ERRORS:
raise socket.timeout("timed out") from None
finally:
conn.close()
else:
await self._async_kms_request(kms_context, host, port, opts, message)
except (PyMongoError, MongoCryptError):
raise # Propagate pymongo errors directly.
except Exception as error:
Expand Down
29 changes: 28 additions & 1 deletion pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

import contextlib
import os
import threading
import traceback
import warnings
import weakref
from collections import defaultdict
Expand Down Expand Up @@ -2565,7 +2567,12 @@ async def run(self) -> T:
while True:
self._check_last_error(check_csot=True)
try:
return await self._read() if self._is_read else await self._write()
if self._operation in (_Op.LIST_COLLECTIONS, _Op.INSERT):
print(f"{threading.current_thread().name} -- Retryable read for {self._operation}")
res = await self._read() if self._is_read else await self._write()
if self._operation in (_Op.LIST_COLLECTIONS, _Op.INSERT):
print(f"Result for {self._operation}: {res}")
return res
except ServerSelectionTimeoutError:
# The application may think the write was never attempted
# if we raise ServerSelectionTimeoutError on the retry
Expand All @@ -2574,8 +2581,12 @@ async def run(self) -> T:
# A ServerSelectionTimeoutError error indicates that there may
# be a persistent outage. Attempting to retry in this case will
# most likely be a waste of time.
if self._operation in (_Op.LIST_COLLECTIONS, _Op.INSERT):
print(f"Raising error for {self._operation} at {traceback.print_stack(limit=1)}")
raise
except PyMongoError as exc:
if self._operation in (_Op.LIST_COLLECTIONS, _Op.INSERT):
print(f"Error for {self._operation}: {exc}, {traceback.print_exception(exc)}")
# Execute specialized catch on read
if self._is_read:
if isinstance(exc, (ConnectionFailure, OperationFailure)):
Expand All @@ -2585,15 +2596,21 @@ async def run(self) -> T:
isinstance(exc, OperationFailure)
and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES
):
if self._operation in (_Op.LIST_COLLECTIONS, _Op.INSERT):
print(f"Raising error for {self._operation} at {traceback.print_stack(limit=1)}")
raise
self._retrying = True
self._last_error = exc
else:
if self._operation in (_Op.LIST_COLLECTIONS, _Op.INSERT):
print(f"Raising error for {self._operation} at {traceback.print_stack(limit=1)}")
raise

# Specialized catch on write operation
if not self._is_read:
if not self._retryable:
if self._operation in (_Op.LIST_COLLECTIONS, _Op.INSERT):
print(f"Raising error for {self._operation} at {traceback.print_stack(limit=1)}")
raise
if isinstance(exc, ClientBulkWriteException) and exc.error:
retryable_write_error_exc = isinstance(
Expand All @@ -2606,8 +2623,12 @@ async def run(self) -> T:
await self._session._unpin()
if not retryable_write_error_exc or self._is_not_eligible_for_retry():
if exc.has_error_label("NoWritesPerformed") and self._last_error:
if self._operation in (_Op.LIST_COLLECTIONS, _Op.INSERT):
print(f"Raising error for {self._operation} at {traceback.print_stack(limit=1)}")
raise self._last_error from exc
else:
if self._operation in (_Op.LIST_COLLECTIONS, _Op.INSERT):
print(f"Raising error for {self._operation} at {traceback.print_stack(limit=1)}")
raise
if self._bulk:
self._bulk.retrying = True
Expand Down Expand Up @@ -2670,10 +2691,12 @@ async def _write(self) -> T:

:return: Output for func()'s call
"""
print(f"Calling _write for {self._func.__name__}")
try:
max_wire_version = 0
is_mongos = False
self._server = await self._get_server()
print(f"Got server for _write for {self._func.__name__}")
async with self._client._checkout(self._server, self._session) as conn:
max_wire_version = conn.max_wire_version
sessions_supported = (
Expand All @@ -2689,6 +2712,7 @@ async def _write(self) -> T:
self._retryable = False
return await self._func(self._session, conn, self._retryable) # type: ignore
except PyMongoError as exc:
print(f"Got error for _write for {self._func.__name__}")
if not self._retryable:
raise
# Add the RetryableWriteError label, if applicable.
Expand All @@ -2700,12 +2724,15 @@ async def _read(self) -> T:

:return: Output for func()'s call
"""
print(f"Calling _read for {self._func.__name__}")
self._server = await self._get_server()
assert self._read_pref is not None, "Read Preference required on read calls"
print(f"Got server for _read for {self._func.__name__}")
async with self._client._conn_from_server(self._read_pref, self._server, self._session) as (
conn,
read_pref,
):
print(f"Got conn for _read for {self._func.__name__}")
if self._retrying and not self._retryable:
self._check_last_error()
return await self._func(self._session, self._server, conn, read_pref) # type: ignore
Expand Down
5 changes: 5 additions & 0 deletions pymongo/asynchronous/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import datetime
import logging
import threading
import time
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -117,6 +118,8 @@ async def command(
name = next(iter(spec))
ns = dbname + ".$cmd"
speculative_hello = False
if name == "insertOne":
print(f"Running insertOne with {spec}")

# Publish the original command document, perhaps with lsid and $clusterTime.
orig = spec
Expand All @@ -132,6 +135,8 @@ async def command(
spec["collation"] = collation

publish = listeners is not None and listeners.enabled_for_commands
if name == "listCollections":
print(f"{threading.current_thread().name} -- Publishing is {publish} listeners: {listeners} {listeners.enabled_for_commands if listeners is not None else 'NONE'}!")
start = datetime.datetime.now()
if publish:
speculative_hello = _is_speculative_authenticate(name, spec)
Expand Down
2 changes: 2 additions & 0 deletions pymongo/asynchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -838,10 +838,12 @@ def _create_connection(address: _Address, options: PoolOptions) -> socket.socket
sock.settimeout(timeout)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
_set_keepalive_times(sock)
print(f"Socket: {sock}, {sock.timeout}")
sock.connect(sa)
return sock
except OSError as e:
err = e
print(f"Socket err: {e}")
sock.close()

if err is not None:
Expand Down
26 changes: 25 additions & 1 deletion pymongo/network_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def _is_ready(fut: Future) -> None:
loop.remove_writer(fd)

async def _async_receive_ssl(
conn: _sslConn, length: int, loop: AbstractEventLoop
conn: _sslConn, length: int, loop: AbstractEventLoop, once: Optional[bool] = False
) -> memoryview:
mv = memoryview(bytearray(length))
total_read = 0
Expand All @@ -145,6 +145,8 @@ def _is_ready(fut: Future) -> None:
read = conn.recv_into(mv[total_read:])
if read == 0:
raise OSError("connection closed")
if once:
return mv[:read]
total_read += read
except BLOCKING_IO_ERRORS as exc:
fd = conn.fileno()
Expand Down Expand Up @@ -275,6 +277,28 @@ async def async_receive_data(
sock.settimeout(sock_timeout)


async def async_receive_data_socket(
sock: Union[socket.socket, _sslConn], length: int
) -> memoryview:
sock_timeout = sock.gettimeout()
timeout = sock_timeout

sock.settimeout(0.0)
loop = asyncio.get_event_loop()
try:
if _HAVE_SSL and isinstance(sock, (SSLSocket, _sslConn)):
return await asyncio.wait_for(
_async_receive_ssl(sock, length, loop, once=True), # type: ignore[arg-type]
timeout=timeout,
)
else:
return await asyncio.wait_for(_async_receive(sock, length, loop), timeout=timeout) # type: ignore[arg-type]
except asyncio.TimeoutError as err:
raise socket.timeout("timed out") from err
finally:
sock.settimeout(sock_timeout)


async def _async_receive(conn: socket.socket, length: int, loop: AbstractEventLoop) -> memoryview:
mv = memoryview(bytearray(length))
bytes_read = 0
Expand Down
Loading
Loading