Skip to content

Commit 17a80a2

Browse files
committed
JAVA-2603: Wait for all connections pools to have no checked out connections after closing an asynchronous cursor.
Avoid a race condition between the async cursor closing and the check for checked out connections in the common cleanup method
1 parent ca7cf3b commit 17a80a2

File tree

3 files changed

+21
-7
lines changed

3 files changed

+21
-7
lines changed

driver-core/src/test/functional/com/mongodb/connection/ServerHelper.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@ public static void checkPool(final ServerAddress address) {
3434
checkPool(address, getAsyncCluster());
3535
}
3636

37+
public static void waitForLastRelease(final Cluster cluster) {
38+
for (ServerDescription cur : cluster.getCurrentDescription().getServerDescriptions()) {
39+
if (cur.isOk()) {
40+
waitForLastRelease(cur.getAddress(), cluster);
41+
}
42+
}
43+
}
44+
3745
public static void waitForLastRelease(final ServerAddress address, final Cluster cluster) {
3846
DefaultServer server = (DefaultServer) cluster.selectServer(new ServerAddressSelector(address));
3947
DefaultConnectionPool connectionProvider = (DefaultConnectionPool) server.getConnectionPool();

driver-core/src/test/functional/com/mongodb/operation/AggregateOperationSpecification.groovy

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,14 @@ import static com.mongodb.ClusterFixture.collectCursorResults
5555
import static com.mongodb.ClusterFixture.disableMaxTimeFailPoint
5656
import static com.mongodb.ClusterFixture.enableMaxTimeFailPoint
5757
import static com.mongodb.ClusterFixture.executeAsync
58+
import static com.mongodb.ClusterFixture.getAsyncCluster
5859
import static com.mongodb.ClusterFixture.getBinding
5960
import static com.mongodb.ClusterFixture.getCluster
60-
import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet
6161
import static com.mongodb.ClusterFixture.isSharded
62+
import static com.mongodb.ClusterFixture.isStandalone
6263
import static com.mongodb.ClusterFixture.serverVersionAtLeast
6364
import static com.mongodb.ExplainVerbosity.QUERY_PLANNER
65+
import static com.mongodb.connection.ServerHelper.waitForLastRelease
6466
import static com.mongodb.connection.ServerType.STANDALONE
6567
import static com.mongodb.operation.QueryOperationHelper.getKeyPattern
6668
import static com.mongodb.operation.ReadConcernHelper.appendReadConcernToCommand
@@ -191,7 +193,7 @@ class AggregateOperationSpecification extends OperationFunctionalSpecification {
191193
async << [true, false]
192194
}
193195

194-
@IgnoreIf({ !(serverVersionAtLeast(3, 6) && isDiscoverableReplicaSet()) })
196+
@IgnoreIf({ !(serverVersionAtLeast(3, 6) && !isStandalone()) })
195197
def 'should support changeStreams'() {
196198
given:
197199
def expected = [createExpectedChangeNotification(namespace, 0), createExpectedChangeNotification(namespace, 1)]
@@ -213,7 +215,7 @@ class AggregateOperationSpecification extends OperationFunctionalSpecification {
213215

214216
cleanup:
215217
cursor?.close()
216-
helper?.drop()
218+
waitForLastRelease(async ? getAsyncCluster() : getCluster())
217219

218220
where:
219221
async << [true, false]

driver-core/src/test/functional/com/mongodb/operation/ChangeStreamOperationSpecification.groovy

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,11 @@ import org.bson.codecs.DocumentCodec
3838
import org.bson.codecs.ValueCodecProvider
3939
import spock.lang.IgnoreIf
4040

41+
import static com.mongodb.ClusterFixture.getAsyncCluster
42+
import static com.mongodb.ClusterFixture.getCluster
4143
import static com.mongodb.ClusterFixture.isStandalone
4244
import static com.mongodb.ClusterFixture.serverVersionAtLeast
45+
import static com.mongodb.connection.ServerHelper.waitForLastRelease
4346
import static java.util.concurrent.TimeUnit.MILLISECONDS
4447
import static org.bson.codecs.configuration.CodecRegistries.fromProviders
4548

@@ -134,7 +137,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
134137

135138
cleanup:
136139
cursor?.close()
137-
helper?.drop()
140+
waitForLastRelease(async ? getAsyncCluster() : getCluster())
138141

139142
where:
140143
async << [true, false]
@@ -302,7 +305,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
302305

303306
cleanup:
304307
cursor?.close()
305-
helper?.drop()
308+
waitForLastRelease(async ? getAsyncCluster() : getCluster())
306309

307310
where:
308311
async << [true, false]
@@ -341,7 +344,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
341344

342345
cleanup:
343346
cursor?.close()
344-
helper?.drop()
347+
waitForLastRelease(async ? getAsyncCluster() : getCluster())
345348

346349
where:
347350
async << [true, false]
@@ -364,6 +367,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
364367

365368
when:
366369
cursor.close()
370+
waitForLastRelease(async ? getAsyncCluster() : getCluster())
367371

368372
operation.resumeAfter(result.head().getDocument('_id'))
369373
cursor = execute(operation, async)
@@ -374,7 +378,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
374378

375379
cleanup:
376380
cursor?.close()
377-
helper?.drop()
381+
waitForLastRelease(async ? getAsyncCluster() : getCluster())
378382

379383
where:
380384
async << [true, false]

0 commit comments

Comments
 (0)