|
33 | 33 | async_client_context,
|
34 | 34 | unittest,
|
35 | 35 | )
|
| 36 | +from test.asynchronous.helpers import client_knobs |
36 | 37 | from test.utils import (
|
37 | 38 | EventListener,
|
38 | 39 | ExceptionCatchingThread,
|
| 40 | + HeartbeatEventListener, |
39 | 41 | OvertCommandListener,
|
40 | 42 | async_wait_until,
|
41 | 43 | )
|
@@ -1133,12 +1135,10 @@ async def asyncSetUp(self):
|
1133 | 1135 | if "$clusterTime" not in (await async_client_context.hello):
|
1134 | 1136 | raise SkipTest("$clusterTime not supported")
|
1135 | 1137 |
|
| 1138 | + # Sessions prose test: 3) $clusterTime in commands |
1136 | 1139 | async def test_cluster_time(self):
|
1137 | 1140 | listener = SessionTestListener()
|
1138 |
| - # Prevent heartbeats from updating $clusterTime between operations. |
1139 |
| - client = await self.async_rs_or_single_client( |
1140 |
| - event_listeners=[listener], heartbeatFrequencyMS=999999 |
1141 |
| - ) |
| 1141 | + client = await self.async_rs_or_single_client(event_listeners=[listener]) |
1142 | 1142 | collection = client.pymongo_test.collection
|
1143 | 1143 | # Prepare for tests of find() and aggregate().
|
1144 | 1144 | await collection.insert_many([{} for _ in range(10)])
|
@@ -1217,6 +1217,40 @@ async def aggregate():
|
1217 | 1217 | f"{f.__name__} sent wrong $clusterTime with {event.command_name}",
|
1218 | 1218 | )
|
1219 | 1219 |
|
| 1220 | + # Sessions prose test: 20) $clusterTime in commands |
| 1221 | + async def test_cluster_time_not_used_by_sdam(self): |
| 1222 | + heartbeat_listener = HeartbeatEventListener() |
| 1223 | + cmd_listener = OvertCommandListener() |
| 1224 | + with client_knobs(min_heartbeat_interval=0.01): |
| 1225 | + c1 = await self.async_single_client( |
| 1226 | + event_listeners=[heartbeat_listener, cmd_listener], heartbeatFrequencyMS=10 |
| 1227 | + ) |
| 1228 | + cluster_time = (await c1.admin.command({"ping": 1}))["$clusterTime"] |
| 1229 | + self.assertEqual(c1._topology.max_cluster_time(), cluster_time) |
| 1230 | + |
| 1231 | + # Advance the server's $clusterTime by performing an insert via another client. |
| 1232 | + await self.db.test.insert_one({"advance": "$clusterTime"}) |
| 1233 | + # Wait until the client C1 processes the next pair of SDAM heartbeat started + succeeded events. |
| 1234 | + heartbeat_listener.reset() |
| 1235 | + |
| 1236 | + async def next_heartbeat(): |
| 1237 | + events = heartbeat_listener.events |
| 1238 | + for i in range(len(events) - 1): |
| 1239 | + if isinstance(events[i], monitoring.ServerHeartbeatStartedEvent): |
| 1240 | + if isinstance(events[i + 1], monitoring.ServerHeartbeatSucceededEvent): |
| 1241 | + return True |
| 1242 | + return False |
| 1243 | + |
| 1244 | + await async_wait_until( |
| 1245 | + next_heartbeat, "never found pair of heartbeat started + succeeded events" |
| 1246 | + ) |
| 1247 | + # Assert that C1's max $clusterTime is still the same and has not been updated by SDAM. |
| 1248 | + cmd_listener.reset() |
| 1249 | + await c1.admin.command({"ping": 1}) |
| 1250 | + started = cmd_listener.started_events[0] |
| 1251 | + self.assertEqual(started.command_name, "ping") |
| 1252 | + self.assertEqual(started.command["$clusterTime"], cluster_time) |
| 1253 | + |
1220 | 1254 |
|
1221 | 1255 | if __name__ == "__main__":
|
1222 | 1256 | unittest.main()
|
0 commit comments