Skip to content
18 changes: 10 additions & 8 deletions pymongo/asynchronous/bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,18 +140,18 @@ def add_update(
self,
selector: Mapping[str, Any],
update: Union[Mapping[str, Any], _Pipeline],
multi: bool = False,
upsert: bool = False,
multi: bool,
upsert: Optional[bool],
collation: Optional[Mapping[str, Any]] = None,
array_filters: Optional[list[Mapping[str, Any]]] = None,
hint: Union[str, dict[str, Any], None] = None,
sort: Optional[Mapping[str, Any]] = None,
) -> None:
"""Create an update document and add it to the list of ops."""
validate_ok_for_update(update)
cmd: dict[str, Any] = dict( # noqa: C406
[("q", selector), ("u", update), ("multi", multi), ("upsert", upsert)]
)
cmd: dict[str, Any] = {"q": selector, "u": update, "multi": multi}
if upsert is not None:
cmd["upsert"] = upsert
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation
Expand All @@ -173,14 +173,16 @@ def add_replace(
self,
selector: Mapping[str, Any],
replacement: Mapping[str, Any],
upsert: bool = False,
upsert: Optional[bool],
collation: Optional[Mapping[str, Any]] = None,
hint: Union[str, dict[str, Any], None] = None,
sort: Optional[Mapping[str, Any]] = None,
) -> None:
"""Create a replace document and add it to the list of ops."""
validate_ok_for_replace(replacement)
cmd = {"q": selector, "u": replacement, "multi": False, "upsert": upsert}
cmd: dict[str, Any] = {"q": selector, "u": replacement}
if upsert is not None:
cmd["upsert"] = upsert
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation
Expand All @@ -200,7 +202,7 @@ def add_delete(
hint: Union[str, dict[str, Any], None] = None,
) -> None:
"""Create a delete document and add it to the list of ops."""
cmd = {"q": selector, "limit": limit}
cmd: dict[str, Any] = {"q": selector, "limit": limit}
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation
Expand Down
16 changes: 1 addition & 15 deletions pymongo/asynchronous/client_bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,20 +106,13 @@ def __init__(
self.bypass_doc_val = bypass_document_validation
self.comment = comment
self.verbose_results = verbose_results

self.ops: list[tuple[str, Mapping[str, Any]]] = []
self.namespaces: list[str] = []
self.idx_offset: int = 0
self.total_ops: int = 0

self.executed = False
self.uses_upsert = False
self.uses_collation = False
self.uses_array_filters = False
self.uses_hint_update = False
self.uses_hint_delete = False
self.uses_sort = False

self.is_retryable = self.client.options.retry_writes
self.retrying = False
self.started_retryable_write = False
Expand All @@ -144,7 +137,7 @@ def add_update(
namespace: str,
selector: Mapping[str, Any],
update: Union[Mapping[str, Any], _Pipeline],
multi: bool = False,
multi: bool,
upsert: Optional[bool] = None,
collation: Optional[Mapping[str, Any]] = None,
array_filters: Optional[list[Mapping[str, Any]]] = None,
Expand All @@ -160,19 +153,16 @@ def add_update(
"multi": multi,
}
if upsert is not None:
self.uses_upsert = True
cmd["upsert"] = upsert
if array_filters is not None:
self.uses_array_filters = True
cmd["arrayFilters"] = array_filters
if hint is not None:
self.uses_hint_update = True
cmd["hint"] = hint
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation
if sort is not None:
self.uses_sort = True
cmd["sort"] = sort
if multi:
# A bulk_write containing an update_many is not retryable.
Expand Down Expand Up @@ -200,16 +190,13 @@ def add_replace(
"multi": False,
}
if upsert is not None:
self.uses_upsert = True
cmd["upsert"] = upsert
if hint is not None:
self.uses_hint_update = True
cmd["hint"] = hint
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation
if sort is not None:
self.uses_sort = True
cmd["sort"] = sort
self.ops.append(("replace", cmd))
self.namespaces.append(namespace)
Expand All @@ -226,7 +213,6 @@ def add_delete(
"""Create a delete document and add it to the list of ops."""
cmd = {"delete": -1, "filter": selector, "multi": multi}
if hint is not None:
self.uses_hint_delete = True
cmd["hint"] = hint
if collation is not None:
self.uses_collation = True
Expand Down
5 changes: 5 additions & 0 deletions pymongo/asynchronous/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def __init__(
self._listeners = self._settings._pool_options._event_listeners
self._publish = self._listeners is not None and self._listeners.enabled_for_server_heartbeat
self._cancel_context: Optional[_CancellationContext] = None
self._conn_id: Optional[int] = None
self._rtt_monitor = _RttMonitor(
topology,
topology_settings,
Expand Down Expand Up @@ -243,6 +244,7 @@ async def _check_server(self) -> ServerDescription:

Returns a ServerDescription.
"""
self._conn_id = None
start = time.monotonic()
try:
try:
Expand Down Expand Up @@ -272,6 +274,7 @@ async def _check_server(self) -> ServerDescription:
awaited=awaited,
durationMS=duration * 1000,
failure=error,
driverConnectionId=self._conn_id,
message=_SDAMStatusMessage.HEARTBEAT_FAIL,
)
await self._reset_connection()
Expand Down Expand Up @@ -314,6 +317,8 @@ async def _check_once(self) -> ServerDescription:
)

self._cancel_context = conn.cancel_context
# Record the connection id so we can later attach it to the failed log message.
self._conn_id = conn.id
response, round_trip_time = await self._check_with_socket(conn)
if not response.awaitable:
self._rtt_monitor.add_sample(round_trip_time)
Expand Down
4 changes: 2 additions & 2 deletions pymongo/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ def __init__(
self,
filter: Mapping[str, Any],
replacement: Union[_DocumentType, RawBSONDocument],
upsert: bool = False,
upsert: Optional[bool] = None,
collation: Optional[_CollationIn] = None,
hint: Optional[_IndexKeyHint] = None,
namespace: Optional[str] = None,
Expand Down Expand Up @@ -693,7 +693,7 @@ def _add_to_bulk(self, bulkobj: _AgnosticBulk) -> None:
self._filter,
self._doc,
True,
bool(self._upsert),
self._upsert,
collation=validate_collation_or_none(self._collation),
array_filters=self._array_filters,
hint=self._hint,
Expand Down
18 changes: 10 additions & 8 deletions pymongo/synchronous/bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,18 +140,18 @@ def add_update(
self,
selector: Mapping[str, Any],
update: Union[Mapping[str, Any], _Pipeline],
multi: bool = False,
upsert: bool = False,
multi: bool,
upsert: Optional[bool],
collation: Optional[Mapping[str, Any]] = None,
array_filters: Optional[list[Mapping[str, Any]]] = None,
hint: Union[str, dict[str, Any], None] = None,
sort: Optional[Mapping[str, Any]] = None,
) -> None:
"""Create an update document and add it to the list of ops."""
validate_ok_for_update(update)
cmd: dict[str, Any] = dict( # noqa: C406
[("q", selector), ("u", update), ("multi", multi), ("upsert", upsert)]
)
cmd: dict[str, Any] = {"q": selector, "u": update, "multi": multi}
if upsert is not None:
cmd["upsert"] = upsert
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation
Expand All @@ -173,14 +173,16 @@ def add_replace(
self,
selector: Mapping[str, Any],
replacement: Mapping[str, Any],
upsert: bool = False,
upsert: Optional[bool],
collation: Optional[Mapping[str, Any]] = None,
hint: Union[str, dict[str, Any], None] = None,
sort: Optional[Mapping[str, Any]] = None,
) -> None:
"""Create a replace document and add it to the list of ops."""
validate_ok_for_replace(replacement)
cmd = {"q": selector, "u": replacement, "multi": False, "upsert": upsert}
cmd: dict[str, Any] = {"q": selector, "u": replacement}
if upsert is not None:
cmd["upsert"] = upsert
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation
Expand All @@ -200,7 +202,7 @@ def add_delete(
hint: Union[str, dict[str, Any], None] = None,
) -> None:
"""Create a delete document and add it to the list of ops."""
cmd = {"q": selector, "limit": limit}
cmd: dict[str, Any] = {"q": selector, "limit": limit}
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation
Expand Down
16 changes: 1 addition & 15 deletions pymongo/synchronous/client_bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,20 +106,13 @@ def __init__(
self.bypass_doc_val = bypass_document_validation
self.comment = comment
self.verbose_results = verbose_results

self.ops: list[tuple[str, Mapping[str, Any]]] = []
self.namespaces: list[str] = []
self.idx_offset: int = 0
self.total_ops: int = 0

self.executed = False
self.uses_upsert = False
self.uses_collation = False
self.uses_array_filters = False
self.uses_hint_update = False
self.uses_hint_delete = False
self.uses_sort = False

self.is_retryable = self.client.options.retry_writes
self.retrying = False
self.started_retryable_write = False
Expand All @@ -144,7 +137,7 @@ def add_update(
namespace: str,
selector: Mapping[str, Any],
update: Union[Mapping[str, Any], _Pipeline],
multi: bool = False,
multi: bool,
upsert: Optional[bool] = None,
collation: Optional[Mapping[str, Any]] = None,
array_filters: Optional[list[Mapping[str, Any]]] = None,
Expand All @@ -160,19 +153,16 @@ def add_update(
"multi": multi,
}
if upsert is not None:
self.uses_upsert = True
cmd["upsert"] = upsert
if array_filters is not None:
self.uses_array_filters = True
cmd["arrayFilters"] = array_filters
if hint is not None:
self.uses_hint_update = True
cmd["hint"] = hint
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation
if sort is not None:
self.uses_sort = True
cmd["sort"] = sort
if multi:
# A bulk_write containing an update_many is not retryable.
Expand Down Expand Up @@ -200,16 +190,13 @@ def add_replace(
"multi": False,
}
if upsert is not None:
self.uses_upsert = True
cmd["upsert"] = upsert
if hint is not None:
self.uses_hint_update = True
cmd["hint"] = hint
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation
if sort is not None:
self.uses_sort = True
cmd["sort"] = sort
self.ops.append(("replace", cmd))
self.namespaces.append(namespace)
Expand All @@ -226,7 +213,6 @@ def add_delete(
"""Create a delete document and add it to the list of ops."""
cmd = {"delete": -1, "filter": selector, "multi": multi}
if hint is not None:
self.uses_hint_delete = True
cmd["hint"] = hint
if collation is not None:
self.uses_collation = True
Expand Down
5 changes: 5 additions & 0 deletions pymongo/synchronous/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def __init__(
self._listeners = self._settings._pool_options._event_listeners
self._publish = self._listeners is not None and self._listeners.enabled_for_server_heartbeat
self._cancel_context: Optional[_CancellationContext] = None
self._conn_id: Optional[int] = None
self._rtt_monitor = _RttMonitor(
topology,
topology_settings,
Expand Down Expand Up @@ -243,6 +244,7 @@ def _check_server(self) -> ServerDescription:

Returns a ServerDescription.
"""
self._conn_id = None
start = time.monotonic()
try:
try:
Expand Down Expand Up @@ -272,6 +274,7 @@ def _check_server(self) -> ServerDescription:
awaited=awaited,
durationMS=duration * 1000,
failure=error,
driverConnectionId=self._conn_id,
message=_SDAMStatusMessage.HEARTBEAT_FAIL,
)
self._reset_connection()
Expand Down Expand Up @@ -314,6 +317,8 @@ def _check_once(self) -> ServerDescription:
)

self._cancel_context = conn.cancel_context
# Record the connection id so we can later attach it to the failed log message.
self._conn_id = conn.id
response, round_trip_time = self._check_with_socket(conn)
if not response.awaitable:
self._rtt_monitor.add_sample(round_trip_time)
Expand Down
4 changes: 2 additions & 2 deletions test/asynchronous/unified_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -1328,8 +1328,8 @@ def format_logs(log_list):
if log.module == "ocsp_support":
continue
data = json_util.loads(log.getMessage())
client = data.pop("clientId") if "clientId" in data else data.pop("topologyId")
client_to_log[client].append(
client_id = data.get("clientId", data.get("topologyId"))
client_to_log[client_id].append(
{
"level": log.levelname.lower(),
"component": log.name.replace("pymongo.", "", 1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
Expand Down Expand Up @@ -398,6 +399,7 @@
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
Expand Down Expand Up @@ -439,6 +441,7 @@
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
Expand Down Expand Up @@ -589,6 +592,7 @@
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
Expand Down
2 changes: 2 additions & 0 deletions test/discovery_and_monitoring/unified/logging-sharded.json
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
Expand Down Expand Up @@ -475,6 +476,7 @@
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
Expand Down
Loading
Loading