PYTHON-5846 Consolidate CMAP, heartbeat, and SDAM telemetry into _telemetry.py #2907

Open
blink1073 wants to merge 21 commits into mongodb:master from blink1073:PYTHON-5846

@blink1073 blink1073 commented Jul 1, 2026

Member

PYTHON-5846

Changes in this PR

Consolidates all APM event publishing and structured logging into pymongo/_telemetry.py, giving OpenTelemetry support a single place to hook into. Adds four new telemetry classes and one helper following the _CommandTelemetry pattern from #2891:

  • _CmapTelemetry — connection pool and connection lifecycle events, owned by Pool
  • _HeartbeatTelemetry — server heartbeat events, owned per-check by Monitor
  • _SdamTelemetry — topology and server description change events, owned by Topology and Server
  • _ServerSelectionTelemetry — server selection log entries (log-only per spec), constructed per select_server call
  • log_command_retry — retry log entries for retryable reads and writes
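
As a rough illustration of the pattern described above (not the actual contents of pymongo/_telemetry.py), a telemetry class can put the listener publish and the structured log behind one method so call sites never repeat the enabled checks. `SketchCmapTelemetry`, `RecordingListener`, and the logger name `sketch.connection` are hypothetical names made up for this sketch.

```python
import logging
from typing import Any, Optional


class RecordingListener:
    """Hypothetical stand-in for an APM listener; it only records events."""

    def __init__(self) -> None:
        self.events: list[tuple[str, dict[str, Any]]] = []

    def publish(self, name: str, **fields: Any) -> None:
        self.events.append((name, fields))


class SketchCmapTelemetry:
    """One place that decides whether to publish an event and/or log it."""

    def __init__(self, address: tuple[str, int], listener: Optional[RecordingListener]) -> None:
        self._address = address
        self._listener = listener
        self._logger = logging.getLogger("sketch.connection")

    def _emit(self, name: str, **fields: Any) -> None:
        # Publish to the listener if one is registered.
        if self._listener is not None:
            self._listener.publish(name, address=self._address, **fields)
        # Emit the structured log entry only when DEBUG logging is enabled.
        if self._logger.isEnabledFor(logging.DEBUG):
            self._logger.debug("%s %s %s", name, self._address, fields)

    def pool_created(self, non_default_options: dict[str, Any]) -> None:
        self._emit("PoolCreated", options=non_default_options)

    def connection_created(self, conn_id: int) -> None:
        self._emit("ConnectionCreated", connection_id=conn_id)


listener = RecordingListener()
telemetry = SketchCmapTelemetry(("localhost", 27017), listener)
telemetry.pool_created({"maxPoolSize": 10})
telemetry.connection_created(1)
```

With this shape, a future OpenTelemetry hook would only need to be added inside `_emit` rather than at every call site.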

Test Plan

Covered by existing test suites (test_connection_monitoring.py, test_heartbeat_monitoring.py, test_sdam_monitoring_spec.py, test_connection_logging.py, test_server_selection_logging.py, unified format spec tests). No new public API.

Checklist

Checklist for Author

  • Did you update the changelog (if necessary)?
  • Is there test coverage?
  • Is any followup work tracked in a JIRA ticket? If so, add link(s).

Checklist for Reviewer

  • Does the title of the PR reference a JIRA Ticket?
  • Do you fully understand the implementation? (Would you be comfortable explaining how this code works to someone else?)
  • Is all relevant documentation (README or docstring) updated?

blink1073 added 10 commits June 30, 2026 08:27
…emetry.py

Add _CmapTelemetry, _HeartbeatTelemetry, and _SdamTelemetry classes to
eliminate the repetitive if-enabled_for_cmap / if-logger.isEnabledFor
boilerplate spread across pool.py, monitor.py, topology.py, and server.py.

- Rename _CmapTelemetry._log -> _emit_log for consistency with _CommandTelemetry
- Rename _HeartbeatTelemetry.apm_started -> started to match started/succeeded/failed lifecycle
- Rename _HeartbeatTelemetry.log_started -> emit_started_log to signal it is the deferred log-only half
- Replace is_sdam flag with separate publish/log bool parameters on _CmapTelemetry

_CmapTelemetry now owns connection-creation and checkout durations:
connection_created() starts the clock, connection_ready() computes it;
checkout_started() starts the clock, checkout_succeeded/failed() compute it.

_HeartbeatTelemetry.started() starts the clock; failed() computes its
own duration instead of receiving it as a parameter.

Removes checkout_started_time from Pool._get_conn, _raise_if_not_ready,
and _raise_wait_queue_timeout, and removes AsyncConnection.creation_time.

Use response.awaitable (ground truth from wire) rather than self._awaited
(pre-computed before response) for ServerHeartbeatSucceededEvent.awaited.
Also fix stale method references in _HeartbeatTelemetry docstring.

_CmapTelemetry, _HeartbeatTelemetry, and _SdamTelemetry now compute
_should_publish and _should_log as properties that check listener state
and logger level at call time rather than caching derived booleans in
the constructor.
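
The difference between caching a flag in the constructor and checking it at call time can be sketched with the standard `logging` module alone. The class names and the logger name `sketch.sdam` below are hypothetical; only the `_should_log` attribute name comes from the commit message.

```python
import logging

logger = logging.getLogger("sketch.sdam")


class CachedFlagTelemetry:
    """Evaluates the logger level once, when the object is built."""

    def __init__(self) -> None:
        self._should_log = logger.isEnabledFor(logging.DEBUG)


class PropertyFlagTelemetry:
    """Re-evaluates the logger level every time the flag is read."""

    @property
    def _should_log(self) -> bool:
        return logger.isEnabledFor(logging.DEBUG)


logger.setLevel(logging.INFO)
cached = CachedFlagTelemetry()
live = PropertyFlagTelemetry()

# Debug logging is switched on after both objects already exist.
logger.setLevel(logging.DEBUG)
```

After the level change, `cached._should_log` still holds the value computed at construction, while `live._should_log` reflects the current level. This matters mostly for long-lived owners such as a pool or topology.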
@codecov-commenter

codecov-commenter commented Jul 1, 2026

Codecov Report

❌ Patch coverage is 98.92183% with 4 lines in your changes missing coverage. Please review.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| pymongo/_telemetry.py | 98.23% | 1 Missing and 3 partials ⚠️ |

@blink1073 blink1073 requested a review from NoahStapp July 1, 2026 17:47
@blink1073 blink1073 marked this pull request as ready for review July 1, 2026 17:47
@blink1073 blink1073 requested a review from a team as a code owner July 1, 2026 17:47
Comment thread pymongo/asynchronous/mongo_client.py Outdated
Comment thread pymongo/asynchronous/mongo_client.py Outdated
clientId=self._client._topology_id,
commandName=self._operation,
operationId=self._operation_id,
log_command_retry(

Contributor

What do you think about a helper method here to avoid duplicating the entire log_command_retry call except for is_write between this and the next usage?

Member Author

Done
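
A helper of the kind suggested here might look like the sketch below. The `log_command_retry` stand-in and its keyword parameters are assumptions for illustration and do not reflect the real signature in pymongo/_telemetry.py; `SketchRetryable` is likewise hypothetical.

```python
from typing import Any

retry_log: list[dict[str, Any]] = []


def log_command_retry(*, client_id: str, command_name: str, operation_id: int, is_write: bool) -> None:
    """Hypothetical stand-in that records what would be logged."""
    retry_log.append(
        {
            "clientId": client_id,
            "commandName": command_name,
            "operationId": operation_id,
            "isWrite": is_write,
        }
    )


class SketchRetryable:
    def __init__(self, client_id: str, operation: str, operation_id: int) -> None:
        self._client_id = client_id
        self._operation = operation
        self._operation_id = operation_id

    def _log_retry(self, is_write: bool) -> None:
        # The shared arguments live in one place; only is_write varies.
        log_command_retry(
            client_id=self._client_id,
            command_name=self._operation,
            operation_id=self._operation_id,
            is_write=is_write,
        )

    def retry_read(self) -> None:
        self._log_retry(is_write=False)

    def retry_write(self) -> None:
        self._log_retry(is_write=True)


op = SketchRetryable("client-1", "find", 7)
op.retry_read()
op.retry_write()
```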

Comment thread pymongo/asynchronous/monitor.py Outdated
from pymongo.hello import Hello
from pymongo.lock import _async_create_lock
-from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage
+from pymongo.logger import _SDAM_LOGGER, _debug_log

Contributor

Unused imports.

Member Author

They were used; I had missed one direct usage, which has now been moved to _telemetry.py.

Comment thread pymongo/asynchronous/pool.py Outdated
Comment thread pymongo/asynchronous/pool.py
Comment thread pymongo/_telemetry.py
Comment thread pymongo/_telemetry.py Outdated
should_publish = self._should_publish
# Always record start time: logging or publishing may be enabled by the time
# checkout_succeeded or checkout_failed is called to compute the duration.
self._checkout_start = time.monotonic()

Contributor

This is shared across all connections in the pool and is called outside of a lock, introducing a possible race condition that can cause timing inconsistencies.

Member Author

Updated to keep the timing info in the pool class as it was before.

Comment thread pymongo/_telemetry.py Outdated
def connection_created(self, conn_id: int) -> None:
# Always record start time: logging or publishing may be enabled by the time
# connection_ready is called to compute the duration.
self._conn_created_start = time.monotonic()

@NoahStapp NoahStapp Jul 1, 2026

Contributor

Same race condition here as above.

Member Author

Ditto

Comment thread pymongo/asynchronous/topology.py Outdated
clientId=self.description._topology_settings._topology_id,
failure=self._error_message(selector),
)
ss.failed(self._error_message(selector))

Contributor

This will log self.description at the time of creation instead of at the time of failure. If SDAM updates self.description while this loop is waiting and before it calls ss.failed, we log the old description instead of the new.

Member Author

Fixed
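
The stale-snapshot problem raised in this thread can be reproduced in a few lines. Everything below is a hypothetical model: `SketchSelectionTelemetry`, `SketchTopology`, and the description strings are invented for illustration and only mirror the shape of the discussion.

```python
from dataclasses import dataclass, field


@dataclass
class SketchSelectionTelemetry:
    """Hypothetical stand-in for a per-call server selection logger."""

    snapshot: str  # description captured when selection started
    records: list = field(default_factory=list)

    def failed_with_snapshot(self, failure: str) -> None:
        # Logs whatever the topology looked like at construction time.
        self.records.append((failure, self.snapshot))

    def failed(self, failure: str, topology_description: str) -> None:
        # Logs the description the caller observed at failure time.
        self.records.append((failure, topology_description))


class SketchTopology:
    def __init__(self) -> None:
        self.description = "Unknown"

    def apply_sdam_update(self, new_description: str) -> None:
        self.description = new_description


topology = SketchTopology()
ss = SketchSelectionTelemetry(snapshot=topology.description)
topology.apply_sdam_update("ReplicaSetNoPrimary")  # arrives while selection waits
ss.failed_with_snapshot("timed out")
ss.failed("timed out", topology.description)
```

The first record carries the description from before the update and the second carries the current one, which is the behavior the reviewer asked for.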

blink1073 and others added 2 commits July 1, 2026 13:32
Co-authored-by: Noah Stapp <noah@noahstapp.com>
- Fix race condition in _CmapTelemetry: remove shared _conn_created_start
  and _checkout_start slots; connection_ready now takes creation_time from
  AsyncConnection.creation_time, checkout_started returns the start time
  and checkout_succeeded/failed accept it as a parameter
- Move SRV monitor failure log into _telemetry.py as log_srv_monitor_failure
- Add _log_retry helper to _ClientConnectionRetryable to deduplicate
  log_command_retry call sites
- Fix _ServerSelectionTelemetry.failed to accept live topology_description
  at failure time instead of using the stale snapshot from construction
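
The race fix in the first bullet can be sketched as follows: instead of storing the start time on a telemetry object shared by every checkout, the start time is returned to the caller and passed back in. `SketchCheckoutTelemetry` is a hypothetical class; only the method names `checkout_started` and `checkout_succeeded` come from the commit message, and their real signatures may differ.

```python
import time


class SketchCheckoutTelemetry:
    """Holds no per-checkout state, so concurrent checkouts cannot clobber each other."""

    def __init__(self) -> None:
        self.durations: list[float] = []

    def checkout_started(self) -> float:
        # Return the start time instead of storing it on self.
        return time.monotonic()

    def checkout_succeeded(self, checkout_start: float) -> float:
        duration = time.monotonic() - checkout_start
        self.durations.append(duration)
        return duration


telemetry = SketchCheckoutTelemetry()

# Two overlapping checkouts against the same shared telemetry object.
first_start = telemetry.checkout_started()
time.sleep(0.05)
second_start = telemetry.checkout_started()

first_duration = telemetry.checkout_succeeded(first_start)
second_duration = telemetry.checkout_succeeded(second_start)
```

Had the start time lived in a single shared attribute, the second `checkout_started` call would have overwritten the first, and the first checkout would have reported a duration that was too short.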
@blink1073 blink1073 requested a review from NoahStapp July 1, 2026 19:06
Comment thread pymongo/asynchronous/pool.py Outdated
await self.checkin(conn)

-def _raise_if_not_ready(self, checkout_started_time: float, emit_event: bool) -> None:
+def _raise_if_not_ready(self, emit_event: bool, checkout_start: float) -> None:

Contributor

Is the re-ordering and renaming of checkout_started_time intentional?

Member Author

Restored to original signature

Comment thread pymongo/asynchronous/pool.py Outdated

async def _get_conn(
-self, checkout_started_time: float, handler: Optional[_MongoClientErrorHandler] = None
+self, handler: Optional[_MongoClientErrorHandler] = None, checkout_start: float = 0.0

Contributor

Same here. Also, does a default of 0 make sense here?

Member Author

Restored to original signature

Comment thread pymongo/asynchronous/pool.py Outdated
-self._raise_wait_queue_timeout(checkout_started_time)
-self._raise_if_not_ready(checkout_started_time, emit_event=False)
+self._raise_wait_queue_timeout(checkout_start)
+self._raise_if_not_ready(emit_event=False, checkout_start=checkout_start)

Contributor

All these uses too.

Member Author

Done

Comment thread pymongo/_telemetry.py
**extra,
)

def pool_created(self, non_default_options: dict[str, Any]) -> None:

Contributor

Need docstrings on these public methods for consistency with the other telemetry classes.

Member Author

Added :allthedocstrings:

topologyDescription=self.description,
clientId=self.description._topology_settings._topology_id,
)
ss = _ServerSelectionTelemetry(

Contributor

Can we pass this through as an argument from select_server so we don't have to make two _ServerSelectionTelemetry per call?

Member Author

Done

Comment thread pymongo/asynchronous/server.py Outdated
Comment on lines 67 to 69
@@ -74,6 +69,7 @@ def __init__(
self._events = None

Contributor

Can we cut all three of these now that _SdamTelemetry exists?

Member Author

Done

Comment thread pymongo/_telemetry.py Outdated
remainingTimeMS=remaining_time_ms,
)

def failed(self, failure: str, topology_description: Any = None) -> None:

Contributor

topology_description appears to always be passed

Member Author

Fixed

- Restore original _raise_if_not_ready and _get_conn parameter names/order
- Add docstrings to all public methods on _CmapTelemetry and _SdamTelemetry
- Remove redundant _publish/_listener/_events attrs from Server.__init__
- Avoid creating two _ServerSelectionTelemetry per select_server call by
  threading ss through select_servers and _select_server
- Make _ServerSelectionTelemetry.failed topology_description a required arg
- Move SRV monitor failure log to _telemetry.py (log_srv_monitor_failure)
@blink1073 blink1073 requested a review from NoahStapp July 1, 2026 20:32
Comment thread pymongo/asynchronous/topology.py Outdated
serverHost=server.description.address[0],
serverPort=server.description.address[1],
)
ss.succeeded(server.description.address[0], server.description.address[1])

Contributor

This is similar to the ss.failed() issue earlier. Should we pass the current description to be logged instead of the potentially stale one saved to ss at the start of selection?

Member Author

Done

Copilot AI left a comment

Contributor

Pull request overview

This PR extends the existing internal _CommandTelemetry pattern to CMAP, heartbeat, SDAM, and server-selection telemetry by consolidating structured logging and APM event publishing into pymongo/_telemetry.py, creating a single integration point for future OpenTelemetry hooks.

Changes:

  • Introduces internal telemetry helpers (_CmapTelemetry, _HeartbeatTelemetry, _SdamTelemetry, _ServerSelectionTelemetry) and logging helpers (log_srv_monitor_failure, log_command_retry) in pymongo/_telemetry.py.
  • Refactors sync/async Topology, Pool, Monitor, Server, and MongoClient codepaths to delegate telemetry/logging behavior to the consolidated helpers.
  • Updates affected tests/utilities to account for Topology.select_servers() / _select_server() now returning an additional telemetry value.
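
The return-shape change mentioned in the last bullet affects every caller that previously iterated the result directly. A minimal sketch, assuming a hypothetical module-level `select_servers` function and `SketchSelectionTelemetry` class rather than the real `Topology` method:

```python
from typing import Callable


class SketchSelectionTelemetry:
    """Hypothetical stand-in for the telemetry object returned alongside the servers."""

    def __init__(self, operation: str) -> None:
        self.operation = operation


KNOWN_SERVERS = ["a.example:27017", "b.example:27018", "c.example:27017"]


def select_servers(
    selector: Callable[[str], bool], operation: str
) -> tuple[list[str], SketchSelectionTelemetry]:
    # One telemetry object is created per call and handed back to the caller.
    ss = SketchSelectionTelemetry(operation)
    servers = [server for server in KNOWN_SERVERS if selector(server)]
    return servers, ss


def on_default_port(server: str) -> bool:
    return server.endswith(":27017")


# Callers that need both values unpack the tuple.
servers, ss = select_servers(on_default_port, "test")

# Callers that only want the servers index the first element,
# as the updated test helpers in this PR do.
addresses = {server for server in select_servers(on_default_port, "test")[0]}
```

Returning the telemetry object lets the caller reuse it for the success or failure entry instead of constructing a second one per call.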

Reviewed changes

Copilot reviewed 19 out of 19 changed files in this pull request and generated 10 comments.

| File | Description |
|---|---|
| test/utils.py | Updates helper(s) to unpack the new Topology._select_server / select_servers return shape. |
| test/utils_selection_tests.py | Updates server selection scenario harness to unpack select_servers results. |
| test/test_read_preferences.py | Adjusts nearest-mode test to iterate servers from the updated select_servers return shape. |
| test/test_mongos_load_balancing.py | Adjusts helper to iterate servers from the updated select_servers return shape. |
| test/asynchronous/utils.py | Async equivalent updates for unpacking updated topology selection returns. |
| test/asynchronous/utils_selection_tests.py | Async equivalent updates for server selection scenario harness. |
| test/asynchronous/test_read_preferences.py | Async equivalent updates for nearest-mode test iteration. |
| test/asynchronous/test_mongos_load_balancing.py | Async equivalent updates for helper iteration. |
| pymongo/synchronous/topology.py | Routes SDAM + server-selection structured logging/APM through _telemetry.py; changes select_servers to return telemetry info. |
| pymongo/synchronous/server.py | Routes server closed events/logging through _SdamTelemetry. |
| pymongo/synchronous/pool.py | Routes CMAP structured logging/APM through _CmapTelemetry across pool/connection lifecycle. |
| pymongo/synchronous/monitor.py | Routes heartbeat structured logging/APM through _HeartbeatTelemetry and SRV monitor failure logging helper. |
| pymongo/synchronous/mongo_client.py | Routes retry log entries through log_command_retry. |
| pymongo/asynchronous/topology.py | Async equivalent of telemetry consolidation and select_servers return change. |
| pymongo/asynchronous/server.py | Async equivalent of routing server close telemetry through _SdamTelemetry. |
| pymongo/asynchronous/pool.py | Async equivalent of routing CMAP telemetry through _CmapTelemetry. |
| pymongo/asynchronous/monitor.py | Async equivalent of routing heartbeat + SRV failure telemetry through _telemetry.py. |
| pymongo/asynchronous/mongo_client.py | Async equivalent of routing retry log entries through log_command_retry. |
| pymongo/_telemetry.py | Centralizes CMAP, heartbeat, SDAM, server-selection telemetry classes and retry/SRV logging helpers. |

deprioritized_servers: Optional[list[Server]] = None,
-) -> list[Server]:
+) -> tuple[list[Server], _ServerSelectionTelemetry]:
"""Return a list of Servers matching selector, or time out.
deprioritized_servers: Optional[list[Server]] = None,
-) -> list[Server]:
+) -> tuple[list[Server], _ServerSelectionTelemetry]:
"""Return a list of Servers matching selector, or time out.
Comment thread pymongo/_telemetry.py
Comment on lines +413 to +417
# Cached at construction: this object is short-lived (one heartbeat check) so
# listener registration and logging level are stable for its lifetime.
self._should_publish = listeners is not None and listeners.enabled_for_server_heartbeat
self._should_log = _SDAM_LOGGER.isEnabledFor(logging.DEBUG)

Comment thread pymongo/_telemetry.py

def __init__(
self,
topology_id: ObjectId,
Comment on lines +67 to +68
_events = events() if listeners is not None and listeners.enabled_for_server else None # type: ignore[misc]
self._sdam = _SdamTelemetry(topology_id, listeners, _events) # type: ignore[arg-type]
Comment on lines +67 to +68
_events = events() if listeners is not None and listeners.enabled_for_server else None # type: ignore[misc]
self._sdam = _SdamTelemetry(topology_id, listeners, _events) # type: ignore[arg-type]
Comment on lines 293 to +297
latencies = ", ".join(
"%s: %sms" % (server.description.address, server.description.round_trip_time)
-for server in (c._get_topology()).select_servers(readable_server_selector, _Op.TEST)
+for server in ((c._get_topology()).select_servers(readable_server_selector, _Op.TEST))[
+0
+]
Comment on lines 311 to +315
latencies = ", ".join(
"%s: %sms" % (server.description.address, server.description.round_trip_time)
-for server in await (await c._get_topology()).select_servers(
-readable_server_selector, _Op.TEST
-)
+for server in (
+await (await c._get_topology()).select_servers(readable_server_selector, _Op.TEST)
+)[0]
Comment on lines 61 to 65
def writable_addresses(topology):
return {
server.description.address
-for server in topology.select_servers(writable_server_selector, _Op.TEST)
+for server in (topology.select_servers(writable_server_selector, _Op.TEST))[0]
}
Comment on lines 61 to 65
async def writable_addresses(topology):
return {
server.description.address
-for server in await topology.select_servers(writable_server_selector, _Op.TEST)
+for server in (await topology.select_servers(writable_server_selector, _Op.TEST))[0]
}
4 participants