Skip to content

Commit 1c29c1a

Browse files
authored
PYTHON-2299 Add the "awaited" field to heartbeat events (#457)
1 parent 5d92b2f commit 1c29c1a

File tree

4 files changed

+64
-20
lines changed

4 files changed

+64
-20
lines changed

pymongo/monitor.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,11 +212,13 @@ def _check_server(self):
212212
except ReferenceError:
213213
raise
214214
except Exception as error:
215-
address = self._server_description.address
215+
sd = self._server_description
216+
address = sd.address
216217
duration = _time() - start
217218
if self._publish:
219+
awaited = sd.is_server_type_known and sd.topology_version
218220
self._listeners.publish_server_heartbeat_failed(
219-
address, duration, error)
221+
address, duration, error, awaited)
220222
self._reset_connection()
221223
if isinstance(error, _OperationCancelled):
222224
raise
@@ -231,7 +233,6 @@ def _check_once(self):
231233
"""
232234
address = self._server_description.address
233235
if self._publish:
234-
# PYTHON-2299: Add the "awaited" field to heartbeat events.
235236
self._listeners.publish_server_heartbeat_started(address)
236237

237238
if self._cancel_context and self._cancel_context.cancelled:
@@ -246,7 +247,7 @@ def _check_once(self):
246247
self._rtt_monitor.average())
247248
if self._publish:
248249
self._listeners.publish_server_heartbeat_succeeded(
249-
address, round_trip_time, response)
250+
address, round_trip_time, response, response.awaitable)
250251
return sd
251252

252253
def _check_with_socket(self, conn):

pymongo/monitoring.py

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1099,12 +1099,13 @@ class ServerHeartbeatSucceededEvent(_ServerHeartbeatEvent):
10991099
.. versionadded:: 3.3
11001100
"""
11011101

1102-
__slots__ = ('__duration', '__reply')
1102+
__slots__ = ('__duration', '__reply', '__awaited')
11031103

1104-
def __init__(self, duration, reply, *args):
1105-
super(ServerHeartbeatSucceededEvent, self).__init__(*args)
1104+
def __init__(self, duration, reply, connection_id, awaited=False):
1105+
super(ServerHeartbeatSucceededEvent, self).__init__(connection_id)
11061106
self.__duration = duration
11071107
self.__reply = reply
1108+
self.__awaited = awaited
11081109

11091110
@property
11101111
def duration(self):
@@ -1116,10 +1117,20 @@ def reply(self):
11161117
"""An instance of :class:`~pymongo.ismaster.IsMaster`."""
11171118
return self.__reply
11181119

1120+
@property
1121+
def awaited(self):
1122+
"""Whether the heartbeat was awaited.
1123+
1124+
If true, then :meth:`duration` reflects the sum of the round trip time
1125+
to the server and the time that the server waited before sending a
1126+
response.
1127+
"""
1128+
return self.__awaited
1129+
11191130
def __repr__(self):
1120-
return "<%s %s duration: %s, reply: %s>" % (
1131+
return "<%s %s duration: %s, awaited: %s, reply: %s>" % (
11211132
self.__class__.__name__, self.connection_id,
1122-
self.duration, self.reply)
1133+
self.duration, self.awaited, self.reply)
11231134

11241135

11251136
class ServerHeartbeatFailedEvent(_ServerHeartbeatEvent):
@@ -1129,12 +1140,13 @@ class ServerHeartbeatFailedEvent(_ServerHeartbeatEvent):
11291140
.. versionadded:: 3.3
11301141
"""
11311142

1132-
__slots__ = ('__duration', '__reply')
1143+
__slots__ = ('__duration', '__reply', '__awaited')
11331144

1134-
def __init__(self, duration, reply, *args):
1135-
super(ServerHeartbeatFailedEvent, self).__init__(*args)
1145+
def __init__(self, duration, reply, connection_id, awaited=False):
1146+
super(ServerHeartbeatFailedEvent, self).__init__(connection_id)
11361147
self.__duration = duration
11371148
self.__reply = reply
1149+
self.__awaited = awaited
11381150

11391151
@property
11401152
def duration(self):
@@ -1146,10 +1158,20 @@ def reply(self):
11461158
"""A subclass of :exc:`Exception`."""
11471159
return self.__reply
11481160

1161+
@property
1162+
def awaited(self):
1163+
"""Whether the heartbeat was awaited.
1164+
1165+
If true, then :meth:`duration` reflects the sum of the round trip time
1166+
to the server and the time that the server waited before sending a
1167+
response.
1168+
"""
1169+
return self.__awaited
1170+
11491171
def __repr__(self):
1150-
return "<%s %s duration: %s, reply: %r>" % (
1172+
return "<%s %s duration: %s, awaited: %s, reply: %r>" % (
11511173
self.__class__.__name__, self.connection_id,
1152-
self.duration, self.reply)
1174+
self.duration, self.awaited, self.reply)
11531175

11541176

11551177
class _EventListeners(object):
@@ -1303,7 +1325,7 @@ def publish_server_heartbeat_started(self, connection_id):
13031325
_handle_exception()
13041326

13051327
def publish_server_heartbeat_succeeded(self, connection_id, duration,
1306-
reply):
1328+
reply, awaited):
13071329
"""Publish a ServerHeartbeatSucceededEvent to all server heartbeat
13081330
listeners.
13091331
@@ -1312,15 +1334,18 @@ def publish_server_heartbeat_succeeded(self, connection_id, duration,
13121334
- `duration`: The execution time of the event in the highest possible
13131335
resolution for the platform.
13141336
- `reply`: The command reply.
1337+
- `awaited`: True if the response was awaited.
13151338
"""
1316-
event = ServerHeartbeatSucceededEvent(duration, reply, connection_id)
1339+
event = ServerHeartbeatSucceededEvent(duration, reply, connection_id,
1340+
awaited)
13171341
for subscriber in self.__server_heartbeat_listeners:
13181342
try:
13191343
subscriber.succeeded(event)
13201344
except Exception:
13211345
_handle_exception()
13221346

1323-
def publish_server_heartbeat_failed(self, connection_id, duration, reply):
1347+
def publish_server_heartbeat_failed(self, connection_id, duration, reply,
1348+
awaited):
13241349
"""Publish a ServerHeartbeatFailedEvent to all server heartbeat
13251350
listeners.
13261351
@@ -1329,8 +1354,10 @@ def publish_server_heartbeat_failed(self, connection_id, duration, reply):
13291354
- `duration`: The execution time of the event in the highest possible
13301355
resolution for the platform.
13311356
- `reply`: The command reply.
1357+
- `awaited`: True if the response was awaited.
13321358
"""
1333-
event = ServerHeartbeatFailedEvent(duration, reply, connection_id)
1359+
event = ServerHeartbeatFailedEvent(duration, reply, connection_id,
1360+
awaited)
13341361
for subscriber in self.__server_heartbeat_listeners:
13351362
try:
13361363
subscriber.failed(event)

test/test_monitoring.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1466,13 +1466,13 @@ def test_server_heartbeat_event_repr(self):
14661466
self.assertEqual(
14671467
repr(event),
14681468
"<ServerHeartbeatSucceededEvent ('localhost', 27017) "
1469-
"duration: 0.1, reply: {'ok': 1}>")
1469+
"duration: 0.1, awaited: False, reply: {'ok': 1}>")
14701470
event = monitoring.ServerHeartbeatFailedEvent(
14711471
delta, 'ERROR', connection_id)
14721472
self.assertEqual(
14731473
repr(event),
14741474
"<ServerHeartbeatFailedEvent ('localhost', 27017) "
1475-
"duration: 0.1, reply: 'ERROR'>")
1475+
"duration: 0.1, awaited: False, reply: 'ERROR'>")
14761476

14771477
def test_server_event_repr(self):
14781478
server_address = ('localhost', 27017)

test/test_streaming_protocol.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,14 @@ def hb_started(event):
160160
return (isinstance(event, monitoring.ServerHeartbeatStartedEvent)
161161
and event.connection_id == address)
162162

163+
def hb_succeeded(event):
164+
return (isinstance(event, monitoring.ServerHeartbeatSucceededEvent)
165+
and event.connection_id == address)
166+
167+
def hb_failed(event):
168+
return (isinstance(event, monitoring.ServerHeartbeatFailedEvent)
169+
and event.connection_id == address)
170+
163171
hb_started_events = hb_listener.matching(hb_started)
164172
# Explanation of the expected heartbeat events:
165173
# Time: event
@@ -178,6 +186,14 @@ def hb_started(event):
178186
# This can be reduced to ~15 after SERVER-49220 is fixed.
179187
self.assertLess(len(hb_started_events), 40)
180188

189+
# Check the awaited flag.
190+
hb_succeeded_events = hb_listener.matching(hb_succeeded)
191+
hb_failed_events = hb_listener.matching(hb_failed)
192+
self.assertFalse(hb_succeeded_events[0].awaited)
193+
self.assertTrue(hb_succeeded_events[1].awaited)
194+
self.assertTrue(hb_failed_events[0].awaited)
195+
self.assertFalse(hb_failed_events[1].awaited)
196+
181197

182198
if __name__ == "__main__":
183199
unittest.main()

0 commit comments

Comments
 (0)