diff --git a/test/asynchronous/unified_format.py b/test/asynchronous/unified_format.py index cc516ee822..9099efbf0f 100644 --- a/test/asynchronous/unified_format.py +++ b/test/asynchronous/unified_format.py @@ -222,7 +222,6 @@ def __init__(self, test_class): self._listeners: Dict[str, EventListenerUtil] = {} self._session_lsids: Dict[str, Mapping[str, Any]] = {} self.test: UnifiedSpecTestMixinV1 = test_class - self._cluster_time: Mapping[str, Any] = {} def __contains__(self, item): return item in self._entities @@ -421,13 +420,11 @@ def get_lsid_for_session(self, session_name): # session has been closed. return self._session_lsids[session_name] - async def advance_cluster_times(self) -> None: + async def advance_cluster_times(self, cluster_time) -> None: """Manually synchronize entities when desired""" - if not self._cluster_time: - self._cluster_time = (await self.test.client.admin.command("ping")).get("$clusterTime") for entity in self._entities.values(): - if isinstance(entity, AsyncClientSession) and self._cluster_time: - entity.advance_cluster_time(self._cluster_time) + if isinstance(entity, AsyncClientSession) and cluster_time: + entity.advance_cluster_time(cluster_time) class UnifiedSpecTestMixinV1(AsyncIntegrationTest): @@ -1044,7 +1041,7 @@ async def _testOperation_targetedFailPoint(self, spec): async def _testOperation_createEntities(self, spec): await self.entity_map.create_entities_from_spec(spec["entities"], uri=self._uri) - await self.entity_map.advance_cluster_times() + await self.entity_map.advance_cluster_times(self._cluster_time) def _testOperation_assertSessionTransactionState(self, spec): session = self.entity_map[spec["session"]] @@ -1443,11 +1440,12 @@ async def _run_scenario(self, spec, uri=None): await self.entity_map.create_entities_from_spec( self.TEST_SPEC.get("createEntities", []), uri=uri ) + self._cluster_time = None # process initialData if "initialData" in self.TEST_SPEC: await self.insert_initial_data(self.TEST_SPEC["initialData"]) - self._cluster_time = (await self.client.admin.command("ping")).get("$clusterTime") - await self.entity_map.advance_cluster_times() + self._cluster_time = self.client._topology.max_cluster_time() + await self.entity_map.advance_cluster_times(self._cluster_time) if "expectLogMessages" in spec: expect_log_messages = spec["expectLogMessages"] diff --git a/test/csot/waitQueueTimeout.json b/test/csot/waitQueueTimeout.json new file mode 100644 index 0000000000..138d5cc161 --- /dev/null +++ b/test/csot/waitQueueTimeout.json @@ -0,0 +1,176 @@ +{ + "description": "WaitQueueTimeoutError does not clear the pool", + "schemaVersion": "1.9", + "runOnRequirements": [ + { + "minServerVersion": "4.4", + "topologies": [ + "single", + "replicaset", + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "failPointClient", + "useMultipleMongoses": false + } + }, + { + "client": { + "id": "client", + "uriOptions": { + "maxPoolSize": 1, + "appname": "waitQueueTimeoutErrorTest" + }, + "useMultipleMongoses": false, + "observeEvents": [ + "commandStartedEvent", + "poolClearedEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "test" + } + } + ], + "tests": [ + { + "description": "WaitQueueTimeoutError does not clear the pool", + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "ping" + ], + "blockConnection": true, + "blockTimeMS": 500, + "appName": "waitQueueTimeoutErrorTest" + } + } + } + }, + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "thread": { + "id": "thread0" + } + } + ] + } + }, + { + "name": "runOnThread", + "object": "testRunner", + "arguments": { + "thread": "thread0", + "operation": { + "name": "runCommand", + "object": "database", + "arguments": { + "command": { + "ping": 1 + }, + "commandName": "ping" + } + } + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "commandStartedEvent": { + "commandName": "ping" + } + }, + "count": 1 + } + }, + { + "name": "runCommand", + "object": "database", + "arguments": { + "timeoutMS": 100, + "command": { + "hello": 1 + }, + "commandName": "hello" + }, + "expectError": { + "isTimeoutError": true + } + }, + { + "name": "waitForThread", + "object": "testRunner", + "arguments": { + "thread": "thread0" + } + }, + { + "name": "runCommand", + "object": "database", + "arguments": { + "command": { + "hello": 1 + }, + "commandName": "hello" + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "command", + "events": [ + { + "commandStartedEvent": { + "commandName": "ping", + "databaseName": "test", + "command": { + "ping": 1 + } + } + }, + { + "commandStartedEvent": { + "commandName": "hello", + "databaseName": "test", + "command": { + "hello": 1 + } + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [] + } + ] + } + ] +} diff --git a/test/unified_format.py b/test/unified_format.py index fd7f92909e..71d6cd50d4 100644 --- a/test/unified_format.py +++ b/test/unified_format.py @@ -221,7 +221,6 @@ def __init__(self, test_class): self._listeners: Dict[str, EventListenerUtil] = {} self._session_lsids: Dict[str, Mapping[str, Any]] = {} self.test: UnifiedSpecTestMixinV1 = test_class - self._cluster_time: Mapping[str, Any] = {} def __contains__(self, item): return item in self._entities @@ -420,13 +419,11 @@ def get_lsid_for_session(self, session_name): # session has been closed. return self._session_lsids[session_name] - def advance_cluster_times(self) -> None: + def advance_cluster_times(self, cluster_time) -> None: """Manually synchronize entities when desired""" - if not self._cluster_time: - self._cluster_time = (self.test.client.admin.command("ping")).get("$clusterTime") for entity in self._entities.values(): - if isinstance(entity, ClientSession) and self._cluster_time: - entity.advance_cluster_time(self._cluster_time) + if isinstance(entity, ClientSession) and cluster_time: + entity.advance_cluster_time(cluster_time) class UnifiedSpecTestMixinV1(IntegrationTest): @@ -1035,7 +1032,7 @@ def _testOperation_targetedFailPoint(self, spec): def _testOperation_createEntities(self, spec): self.entity_map.create_entities_from_spec(spec["entities"], uri=self._uri) - self.entity_map.advance_cluster_times() + self.entity_map.advance_cluster_times(self._cluster_time) def _testOperation_assertSessionTransactionState(self, spec): session = self.entity_map[spec["session"]] @@ -1428,11 +1425,12 @@ def _run_scenario(self, spec, uri=None): self._uri = uri self.entity_map = EntityMapUtil(self) self.entity_map.create_entities_from_spec(self.TEST_SPEC.get("createEntities", []), uri=uri) + self._cluster_time = None # process initialData if "initialData" in self.TEST_SPEC: self.insert_initial_data(self.TEST_SPEC["initialData"]) - self._cluster_time = (self.client.admin.command("ping")).get("$clusterTime") - self.entity_map.advance_cluster_times() + self._cluster_time = self.client._topology.max_cluster_time() + self.entity_map.advance_cluster_times(self._cluster_time) if "expectLogMessages" in spec: expect_log_messages = spec["expectLogMessages"]