Skip to content

Commit cd7b1e1

Browse files
committed
JAVA-2778: Test that sessions and bindings have been properly released after every functional test in driver-core
1 parent 0a6781c commit cd7b1e1

9 files changed

+101
-25
lines changed

driver-core/src/main/com/mongodb/internal/session/ServerSessionPool.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,10 @@ public void close() {
9696
}
9797
}
9898

99+
public int getInUseCount() {
100+
return serverSessionPool.getInUseCount();
101+
}
102+
99103
private void closeSession(final ServerSessionImpl serverSession) {
100104
serverSession.close();
101105
// only track closed sessions when pool is in the process of closing

driver-core/src/test/functional/com/mongodb/ClusterFixture.java

Lines changed: 49 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.mongodb.binding.AsyncWriteBinding;
2929
import com.mongodb.binding.ClusterBinding;
3030
import com.mongodb.binding.ReadWriteBinding;
31+
import com.mongodb.binding.ReferenceCounted;
3132
import com.mongodb.binding.SessionBinding;
3233
import com.mongodb.binding.SingleConnectionBinding;
3334
import com.mongodb.connection.AsyncConnection;
@@ -72,6 +73,7 @@
7273
import static com.mongodb.connection.ClusterType.REPLICA_SET;
7374
import static com.mongodb.connection.ClusterType.SHARDED;
7475
import static com.mongodb.connection.ClusterType.STANDALONE;
76+
import static java.lang.String.format;
7577
import static java.lang.Thread.sleep;
7678
import static java.util.Arrays.asList;
7779
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -99,7 +101,7 @@ public final class ClusterFixture {
99101
static {
100102
String mongoURIProperty = System.getProperty(MONGODB_URI_SYSTEM_PROPERTY_NAME);
101103
String mongoURIString = mongoURIProperty == null || mongoURIProperty.isEmpty()
102-
? DEFAULT_URI : mongoURIProperty;
104+
? DEFAULT_URI : mongoURIProperty;
103105
connectionString = new ConnectionString(mongoURIString);
104106
Runtime.getRuntime().addShutdownHook(new ShutdownHook());
105107
}
@@ -159,7 +161,7 @@ public static Document getBuildInfo() {
159161

160162
public static Document getServerStatus() {
161163
return new CommandWriteOperation<Document>("admin", new BsonDocument("serverStatus", new BsonInt32(1)), new DocumentCodec())
162-
.execute(getBinding());
164+
.execute(getBinding());
163165
}
164166

165167
@SuppressWarnings("unchecked")
@@ -281,12 +283,12 @@ public static synchronized Cluster getAsyncCluster() {
281283
@SuppressWarnings("deprecation")
282284
public static Cluster createCluster(final StreamFactory streamFactory) {
283285
return new DefaultClusterFactory().createCluster(ClusterSettings.builder().applyConnectionString(getConnectionString()).build(),
284-
ServerSettings.builder().build(),
285-
ConnectionPoolSettings.builder().applyConnectionString(getConnectionString()).build(),
286-
streamFactory,
287-
new SocketStreamFactory(SocketSettings.builder().build(), getSslSettings()),
288-
getConnectionString().getCredentialList(), null, null, null,
289-
getConnectionString().getCompressorList());
286+
ServerSettings.builder().build(),
287+
ConnectionPoolSettings.builder().applyConnectionString(getConnectionString()).build(),
288+
streamFactory,
289+
new SocketStreamFactory(SocketSettings.builder().build(), getSslSettings()),
290+
getConnectionString().getCredentialList(), null, null, null,
291+
getConnectionString().getCompressorList());
290292
}
291293

292294
public static StreamFactory getAsyncStreamFactory() {
@@ -352,7 +354,7 @@ public static List<MongoCredential> getCredentialList() {
352354

353355
public static boolean isDiscoverableReplicaSet() {
354356
return getCluster().getDescription().getType() == REPLICA_SET
355-
&& getCluster().getDescription().getConnectionMode() == MULTIPLE;
357+
&& getCluster().getDescription().getConnectionMode() == MULTIPLE;
356358
}
357359

358360
public static boolean isSharded() {
@@ -496,20 +498,20 @@ public static <T> void loopCursor(final AsyncBatchCursor<T> batchCursor, final B
496498
batchCursor.next(new SingleResultCallback<List<T>>() {
497499
@Override
498500
public void onResult(final List<T> results, final Throwable t) {
499-
if (t != null || results == null) {
500-
batchCursor.close();
501-
callback.onResult(null, t);
502-
} else {
503-
try {
504-
for (T result : results) {
505-
block.apply(result);
506-
}
507-
loopCursor(batchCursor, block, callback);
508-
} catch (Throwable tr) {
509-
batchCursor.close();
510-
callback.onResult(null, tr);
501+
if (t != null || results == null) {
502+
batchCursor.close();
503+
callback.onResult(null, t);
504+
} else {
505+
try {
506+
for (T result : results) {
507+
block.apply(result);
511508
}
509+
loopCursor(batchCursor, block, callback);
510+
} catch (Throwable tr) {
511+
batchCursor.close();
512+
callback.onResult(null, tr);
512513
}
514+
}
513515
}
514516
});
515517
}
@@ -552,4 +554,30 @@ public static AsyncConnection getConnection(final AsyncConnectionSource source)
552554
source.getConnection(futureResultCallback);
553555
return futureResultCallback.get(TIMEOUT, SECONDS);
554556
}
557+
558+
public static synchronized void checkReferenceCountReachesTarget(final ReferenceCounted referenceCounted, final int target) {
559+
int count = getReferenceCountAfterTimeout(referenceCounted, target);
560+
if (count != target) {
561+
throw new MongoTimeoutException(
562+
format("Timed out waiting for reference count to drop to %d. Now at %d for %s", target, count,
563+
referenceCounted));
564+
}
565+
}
566+
567+
public static int getReferenceCountAfterTimeout(final ReferenceCounted referenceCounted, final int target) {
568+
long startTime = System.currentTimeMillis();
569+
int count = referenceCounted.getCount();
570+
while (count > target) {
571+
try {
572+
if (System.currentTimeMillis() > startTime + 5000) {
573+
return count;
574+
}
575+
sleep(10);
576+
count = referenceCounted.getCount();
577+
} catch (InterruptedException e) {
578+
throw new MongoInterruptedException("Interrupted", e);
579+
}
580+
}
581+
return count;
582+
}
555583
}

driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ import static com.mongodb.ClusterFixture.getAsyncBinding
6868
import static com.mongodb.ClusterFixture.getBinding
6969
import static com.mongodb.ClusterFixture.getPrimary
7070
import static com.mongodb.ClusterFixture.loopCursor
71+
import static com.mongodb.ClusterFixture.checkReferenceCountReachesTarget
7172
import static com.mongodb.WriteConcern.ACKNOWLEDGED
7273

7374
class OperationFunctionalSpecification extends Specification {
@@ -78,6 +79,8 @@ class OperationFunctionalSpecification extends Specification {
7879

7980
def cleanup() {
8081
CollectionHelper.drop(getNamespace())
82+
checkReferenceCountReachesTarget(getBinding(), 1)
83+
checkReferenceCountReachesTarget(getAsyncBinding(), 1)
8184
ServerHelper.checkPool(getPrimary())
8285
}
8386

@@ -173,6 +176,13 @@ class OperationFunctionalSpecification extends Specification {
173176
next
174177
}
175178

179+
def consumeAsyncResults(cursor) {
180+
def batch = next(cursor, true)
181+
while (batch != null) {
182+
batch = next(cursor, true)
183+
}
184+
}
185+
176186
void testOperation(Map params) {
177187
params.async = params.async ?: false
178188
params.result = params.result ?: null

driver-core/src/test/functional/com/mongodb/operation/AggregateOperationSpecification.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,7 @@ class AggregateOperationSpecification extends OperationFunctionalSpecification {
440440
thrown(MongoExecutionTimeoutException)
441441

442442
cleanup:
443+
cursor.close()
443444
disableMaxTimeFailPoint()
444445

445446
where:

driver-core/src/test/functional/com/mongodb/operation/AsyncQueryBatchCursorFunctionalSpecification.groovy

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import static com.mongodb.ClusterFixture.getAsyncCluster
4949
import static com.mongodb.ClusterFixture.getBinding
5050
import static com.mongodb.ClusterFixture.getConnection
5151
import static com.mongodb.ClusterFixture.getReadConnectionSource
52+
import static com.mongodb.ClusterFixture.getReferenceCountAfterTimeout
5253
import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet
5354
import static com.mongodb.ClusterFixture.isSharded
5455
import static com.mongodb.connection.ServerHelper.waitForLastRelease
@@ -122,8 +123,8 @@ class AsyncQueryBatchCursorFunctionalSpecification extends OperationFunctionalSp
122123
nextBatch()
123124

124125
then:
125-
connection.count == 1
126-
connectionSource.count == 1
126+
getReferenceCountAfterTimeout(connection, 1) == 1
127+
getReferenceCountAfterTimeout(connectionSource, 1) == 1
127128
}
128129

129130
def 'should not retain connection and source after cursor is exhausted after first batch'() {
@@ -132,8 +133,8 @@ class AsyncQueryBatchCursorFunctionalSpecification extends OperationFunctionalSp
132133
connection)
133134

134135
then:
135-
connection.count == 1
136-
connectionSource.count == 1
136+
getReferenceCountAfterTimeout(connection, 1) == 1
137+
getReferenceCountAfterTimeout(connectionSource, 1) == 1
137138
}
138139

139140
def 'should exhaust single batch with limit'() {

driver-core/src/test/functional/com/mongodb/operation/ChangeStreamOperationSpecification.groovy

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,9 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
163163
next.getNamespace() == helper.getNamespace()
164164
next.getOperationType() == OperationType.INSERT
165165
next.getUpdateDescription() == null
166+
167+
cleanup:
168+
cursor?.close()
166169
}
167170

168171
def 'should decode update to ChangeStreamDocument '() {
@@ -186,6 +189,9 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
186189
next.getNamespace() == helper.getNamespace()
187190
next.getOperationType() == OperationType.UPDATE
188191
next.getUpdateDescription() == new UpdateDescription(['y'], BsonDocument.parse('{x : 3}'))
192+
193+
cleanup:
194+
cursor?.close()
189195
}
190196

191197
def 'should decode replace to ChangeStreamDocument '() {
@@ -209,6 +215,9 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
209215
next.getNamespace() == helper.getNamespace()
210216
next.getOperationType() == OperationType.REPLACE
211217
next.getUpdateDescription() == null
218+
219+
cleanup:
220+
cursor?.close()
212221
}
213222

214223
def 'should decode delete to ChangeStreamDocument '() {
@@ -232,6 +241,9 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
232241
next.getNamespace() == helper.getNamespace()
233242
next.getOperationType() == OperationType.DELETE
234243
next.getUpdateDescription() == null
244+
245+
cleanup:
246+
cursor?.close()
235247
}
236248

237249
def 'should decode invalidate to ChangeStreamDocument '() {
@@ -255,6 +267,9 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
255267
next.getNamespace() == null
256268
next.getOperationType() == OperationType.INVALIDATE
257269
next.getUpdateDescription() == null
270+
271+
cleanup:
272+
cursor?.close()
258273
}
259274

260275
def 'should throw if the _id field is projected out'() {
@@ -271,6 +286,9 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
271286
then:
272287
thrown(MongoChangeStreamException)
273288

289+
cleanup:
290+
cursor?.close()
291+
274292
where:
275293
async << [true, false]
276294
}

driver-core/src/test/functional/com/mongodb/operation/ListCollectionsOperationSpecification.groovy

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,9 @@ class ListCollectionsOperationSpecification extends OperationFunctionalSpecifica
312312
collections.size() <= 2 // pre 3.0 items may be filtered out the batch by the driver
313313
cursor.hasNext()
314314
cursor.getBatchSize() == 2
315+
316+
cleanup:
317+
cursor?.close()
315318
}
316319

317320
@Category(Async)
@@ -341,6 +344,9 @@ class ListCollectionsOperationSpecification extends OperationFunctionalSpecifica
341344
then:
342345
callback.get().size() <= 2 // pre 3.0 items may be filtered out the batch by the driver
343346
cursor.getBatchSize() == 2
347+
348+
cleanup:
349+
consumeAsyncResults(cursor)
344350
}
345351

346352
@IgnoreIf({ isSharded() })

driver-core/src/test/functional/com/mongodb/operation/ListIndexesOperationSpecification.groovy

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,9 @@ class ListIndexesOperationSpecification extends OperationFunctionalSpecification
179179
collections.size() <= 2 // pre 3.0 items may be filtered out the batch by the driver
180180
cursor.hasNext()
181181
cursor.getBatchSize() == 2
182+
183+
cleanup:
184+
cursor?.close()
182185
}
183186

184187
@Category(Async)
@@ -207,6 +210,9 @@ class ListIndexesOperationSpecification extends OperationFunctionalSpecification
207210
then:
208211
callback.get().size() <= 2 // pre 3.0 items may be filtered out the batch by the driver
209212
cursor.getBatchSize() == 2
213+
214+
cleanup:
215+
consumeAsyncResults(cursor)
210216
}
211217

212218
@IgnoreIf({ isSharded() })

driver-core/src/test/functional/com/mongodb/operation/QueryBatchCursorFunctionalSpecification.groovy

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ class QueryBatchCursorFunctionalSpecification extends OperationFunctionalSpecifi
6868

6969
def cleanup() {
7070
cursor?.close()
71+
connectionSource?.release()
7172
}
7273

7374
def 'server cursor should not be null'() {
@@ -540,6 +541,7 @@ class QueryBatchCursorFunctionalSpecification extends OperationFunctionalSpecifi
540541
@IgnoreIf({ !isDiscoverableReplicaSet() })
541542
def 'should get more from a secondary'() {
542543
given:
544+
connectionSource.release() // release the connection source established in setup, since we're substituting our own here
543545
connectionSource = getBinding(ReadPreference.secondary()).getReadConnectionSource()
544546

545547
def firstBatch = executeQuery(2, ReadPreference.secondary())

0 commit comments

Comments
 (0)