Skip to content

Commit ef1c166

Browse files
committed
Override forEach(Consumer<T>) in MongoIterable implementations
The implementations of this method now all properly close the MongoCursor<T> so that cursors are not leaked when the forEach method doesn't complete normally. JAVA-2010
1 parent 59906de commit ef1c166

28 files changed

+1279
-72
lines changed

driver-legacy/src/main/com/mongodb/MongoClient.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@
2121
import com.mongodb.client.ListDatabasesIterable;
2222
import com.mongodb.client.MongoDatabase;
2323
import com.mongodb.client.MongoIterable;
24-
import com.mongodb.client.internal.ChangeStreamIterableImpl;
25-
import com.mongodb.client.internal.ListDatabasesIterableImpl;
2624
import com.mongodb.client.internal.MongoDatabaseImpl;
25+
import com.mongodb.client.internal.MongoIterables;
2726
import com.mongodb.client.model.changestream.ChangeStreamLevel;
2827
import com.mongodb.lang.Nullable;
2928
import org.bson.BsonDocument;
@@ -537,7 +536,7 @@ public <T> ListDatabasesIterable<T> listDatabases(final ClientSession clientSess
537536
}
538537

539538
private <T> ListDatabasesIterable<T> createListDatabasesIterable(@Nullable final ClientSession clientSession, final Class<T> clazz) {
540-
return new ListDatabasesIterableImpl<T>(clientSession, clazz, getMongoClientOptions().getCodecRegistry(),
539+
return MongoIterables.listDatabasesOf(clientSession, clazz, getMongoClientOptions().getCodecRegistry(),
541540
ReadPreference.primary(), createOperationExecutor());
542541
}
543542

@@ -700,7 +699,7 @@ private <TResult> ChangeStreamIterable<TResult> createChangeStreamIterable(@Null
700699
final List<? extends Bson> pipeline,
701700
final Class<TResult> resultClass) {
702701
MongoClientOptions clientOptions = getMongoClientOptions();
703-
return new ChangeStreamIterableImpl<TResult>(clientSession, "admin",
702+
return MongoIterables.changeStreamOf(clientSession, "admin",
704703
clientOptions.getCodecRegistry(), clientOptions.getReadPreference(), clientOptions.getReadConcern(),
705704
createOperationExecutor(), pipeline, resultClass, ChangeStreamLevel.CLIENT);
706705
}

driver-legacy/src/test/unit/com/mongodb/MongoClientSpecification.groovy

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,18 @@
1616

1717
package com.mongodb
1818

19-
import com.mongodb.client.internal.ChangeStreamIterableImpl
20-
import com.mongodb.client.internal.ListDatabasesIterableImpl
19+
import com.mongodb.client.ClientSession
2120
import com.mongodb.client.internal.MongoClientImpl
21+
import com.mongodb.client.internal.MongoIterables
2222
import com.mongodb.client.internal.TestOperationExecutor
2323
import com.mongodb.client.model.changestream.ChangeStreamLevel
2424
import com.mongodb.client.model.geojson.MultiPolygon
25-
import com.mongodb.client.ClientSession
2625
import com.mongodb.connection.Cluster
2726
import org.bson.BsonDocument
2827
import org.bson.Document
2928
import spock.lang.Specification
3029

3130
import static com.mongodb.CustomMatchers.isTheSameAs
32-
import static com.mongodb.MongoClient.getDefaultCodecRegistry
3331
import static com.mongodb.MongoClientSettings.getDefaultCodecRegistry
3432
import static com.mongodb.ReadPreference.primary
3533
import static com.mongodb.client.internal.TestHelper.execute
@@ -65,22 +63,22 @@ class MongoClientSpecification extends Specification {
6563
def listDatabasesIterable = execute(listDatabasesMethod, session)
6664

6765
then:
68-
expect listDatabasesIterable, isTheSameAs(new ListDatabasesIterableImpl<Document>(session, Document, getDefaultCodecRegistry(),
66+
expect listDatabasesIterable, isTheSameAs(MongoIterables.listDatabasesOf(session, Document, getDefaultCodecRegistry(),
6967
primary(), executor))
7068

7169
when:
7270
listDatabasesIterable = execute(listDatabasesMethod, session, BsonDocument)
7371

7472
then:
75-
expect listDatabasesIterable, isTheSameAs(new ListDatabasesIterableImpl<BsonDocument>(session, BsonDocument,
73+
expect listDatabasesIterable, isTheSameAs(MongoIterables.listDatabasesOf(session, BsonDocument,
7674
getDefaultCodecRegistry(), primary(), executor))
7775

7876
when:
7977
def listDatabaseNamesIterable = execute(listDatabasesNamesMethod, session)
8078

8179
then:
8280
// listDatabaseNamesIterable is an instance of a MappingIterable, so have to get the mapped iterable inside it
83-
expect listDatabaseNamesIterable.getMapped(), isTheSameAs(new ListDatabasesIterableImpl<BsonDocument>(session, BsonDocument,
81+
expect listDatabaseNamesIterable.getMapped(), isTheSameAs(MongoIterables.listDatabasesOf(session, BsonDocument,
8482
getDefaultCodecRegistry(), primary(), executor).nameOnly(true))
8583

8684
cleanup:
@@ -110,21 +108,22 @@ class MongoClientSpecification extends Specification {
110108
def changeStreamIterable = execute(watchMethod, session)
111109

112110
then:
113-
expect changeStreamIterable, isTheSameAs(new ChangeStreamIterableImpl(session, namespace, codecRegistry, readPreference,
111+
expect changeStreamIterable, isTheSameAs(MongoIterables.changeStreamOf(session, namespace, codecRegistry,
112+
readPreference,
114113
readConcern, executor, [], Document, ChangeStreamLevel.CLIENT), ['codec'])
115114

116115
when:
117116
changeStreamIterable = execute(watchMethod, session, [new Document('$match', 1)])
118117

119118
then:
120-
expect changeStreamIterable, isTheSameAs(new ChangeStreamIterableImpl(session, namespace, codecRegistry, readPreference,
119+
expect changeStreamIterable, isTheSameAs(MongoIterables.changeStreamOf(session, namespace, codecRegistry, readPreference,
121120
readConcern, executor, [new Document('$match', 1)], Document, ChangeStreamLevel.CLIENT), ['codec'])
122121

123122
when:
124123
changeStreamIterable = execute(watchMethod, session, [new Document('$match', 1)], BsonDocument)
125124

126125
then:
127-
expect changeStreamIterable, isTheSameAs(new ChangeStreamIterableImpl(session, namespace, codecRegistry, readPreference,
126+
expect changeStreamIterable, isTheSameAs(MongoIterables.changeStreamOf(session, namespace, codecRegistry, readPreference,
128127
readConcern, executor, [new Document('$match', 1)], BsonDocument, ChangeStreamLevel.CLIENT), ['codec'])
129128

130129
where:

driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,7 @@
4646
/**
4747
* This class is NOT part of the public API. It may change at any time without notification.
4848
*/
49-
public final class ChangeStreamIterableImpl<TResult> extends MongoIterableImpl<ChangeStreamDocument<TResult>>
50-
implements ChangeStreamIterable<TResult> {
49+
class ChangeStreamIterableImpl<TResult> extends MongoIterableImpl<ChangeStreamDocument<TResult>> implements ChangeStreamIterable<TResult> {
5150
private final MongoNamespace namespace;
5251
private final CodecRegistry codecRegistry;
5352
private final List<? extends Bson> pipeline;
@@ -60,18 +59,18 @@ public final class ChangeStreamIterableImpl<TResult> extends MongoIterableImpl<C
6059
private Collation collation;
6160
private BsonTimestamp startAtOperationTime;
6261

63-
public ChangeStreamIterableImpl(@Nullable final ClientSession clientSession, final String databaseName,
64-
final CodecRegistry codecRegistry, final ReadPreference readPreference, final ReadConcern readConcern,
65-
final OperationExecutor executor, final List<? extends Bson> pipeline, final Class<TResult> resultClass,
66-
final ChangeStreamLevel changeStreamLevel) {
62+
ChangeStreamIterableImpl(@Nullable final ClientSession clientSession, final String databaseName,
63+
final CodecRegistry codecRegistry, final ReadPreference readPreference, final ReadConcern readConcern,
64+
final OperationExecutor executor, final List<? extends Bson> pipeline, final Class<TResult> resultClass,
65+
final ChangeStreamLevel changeStreamLevel) {
6766
this(clientSession, new MongoNamespace(databaseName, "ignored"), codecRegistry, readPreference, readConcern, executor, pipeline,
6867
resultClass, changeStreamLevel);
6968
}
7069

71-
public ChangeStreamIterableImpl(@Nullable final ClientSession clientSession, final MongoNamespace namespace,
72-
final CodecRegistry codecRegistry, final ReadPreference readPreference, final ReadConcern readConcern,
73-
final OperationExecutor executor, final List<? extends Bson> pipeline, final Class<TResult> resultClass,
74-
final ChangeStreamLevel changeStreamLevel) {
70+
ChangeStreamIterableImpl(@Nullable final ClientSession clientSession, final MongoNamespace namespace,
71+
final CodecRegistry codecRegistry, final ReadPreference readPreference, final ReadConcern readConcern,
72+
final OperationExecutor executor, final List<? extends Bson> pipeline, final Class<TResult> resultClass,
73+
final ChangeStreamLevel changeStreamLevel) {
7574
super(clientSession, executor, readConcern, readPreference);
7675
this.namespace = notNull("namespace", namespace);
7776
this.codecRegistry = notNull("codecRegistry", codecRegistry);
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.client.internal;
18+
19+
import com.mongodb.MongoNamespace;
20+
import com.mongodb.ReadConcern;
21+
import com.mongodb.ReadPreference;
22+
import com.mongodb.WriteConcern;
23+
import com.mongodb.client.AggregateIterable;
24+
import com.mongodb.client.ChangeStreamIterable;
25+
import com.mongodb.client.ClientSession;
26+
import com.mongodb.client.DistinctIterable;
27+
import com.mongodb.client.FindIterable;
28+
import com.mongodb.client.ListCollectionsIterable;
29+
import com.mongodb.client.ListDatabasesIterable;
30+
import com.mongodb.client.ListIndexesIterable;
31+
import com.mongodb.client.MapReduceIterable;
32+
import com.mongodb.client.model.changestream.ChangeStreamLevel;
33+
import com.mongodb.lang.Nullable;
34+
import org.bson.codecs.configuration.CodecRegistry;
35+
import org.bson.conversions.Bson;
36+
37+
import java.util.List;
38+
39+
class FallbackMongoIterableFactory implements MongoIterableFactory {
40+
@Override
41+
public <TDocument, TResult>
42+
FindIterable<TResult> findOf(final @Nullable ClientSession clientSession, final MongoNamespace namespace,
43+
final Class<TDocument> documentClass, final Class<TResult> resultClass, final CodecRegistry codecRegistry,
44+
final ReadPreference readPreference, final ReadConcern readConcern, final OperationExecutor executor,
45+
final Bson filter) {
46+
return new FindIterableImpl<TDocument, TResult>(clientSession, namespace, documentClass, resultClass, codecRegistry,
47+
readPreference, readConcern, executor, filter);
48+
}
49+
50+
@Override
51+
public <TDocument, TResult>
52+
AggregateIterable<TResult> aggregateOf(final @Nullable ClientSession clientSession, final MongoNamespace namespace,
53+
final Class<TDocument> documentClass, final Class<TResult> resultClass,
54+
final CodecRegistry codecRegistry, final ReadPreference readPreference,
55+
final ReadConcern readConcern, final WriteConcern writeConcern, final OperationExecutor executor,
56+
final List<? extends Bson> pipeline) {
57+
return new AggregateIterableImpl<TDocument, TResult>(clientSession, namespace, documentClass, resultClass, codecRegistry,
58+
readPreference, readConcern, writeConcern, executor, pipeline);
59+
}
60+
61+
@Override
62+
public <TResult>
63+
ChangeStreamIterable<TResult> changeStreamOf(final @Nullable ClientSession clientSession, final String databaseName,
64+
final CodecRegistry codecRegistry, final ReadPreference readPreference,
65+
final ReadConcern readConcern, final OperationExecutor executor,
66+
final List<? extends Bson> pipeline, final Class<TResult> resultClass,
67+
final ChangeStreamLevel changeStreamLevel) {
68+
return new ChangeStreamIterableImpl<TResult>(clientSession, databaseName, codecRegistry, readPreference, readConcern, executor,
69+
pipeline, resultClass, changeStreamLevel);
70+
}
71+
72+
@Override
73+
public <TResult>
74+
ChangeStreamIterable<TResult> changeStreamOf(final @Nullable ClientSession clientSession, final MongoNamespace namespace,
75+
final CodecRegistry codecRegistry, final ReadPreference readPreference,
76+
final ReadConcern readConcern, final OperationExecutor executor,
77+
final List<? extends Bson> pipeline, final Class<TResult> resultClass,
78+
final ChangeStreamLevel changeStreamLevel) {
79+
return new ChangeStreamIterableImpl<TResult>(clientSession, namespace, codecRegistry, readPreference, readConcern, executor,
80+
pipeline, resultClass, changeStreamLevel);
81+
}
82+
83+
84+
@Override
85+
public <TDocument, TResult>
86+
DistinctIterable<TResult> distinctOf(final @Nullable ClientSession clientSession, final MongoNamespace namespace,
87+
final Class<TDocument> documentClass, final Class<TResult> resultClass,
88+
final CodecRegistry codecRegistry, final ReadPreference readPreference,
89+
final ReadConcern readConcern, final OperationExecutor executor, final String fieldName,
90+
final Bson filter) {
91+
return new DistinctIterableImpl<TDocument, TResult>(clientSession, namespace, documentClass, resultClass, codecRegistry,
92+
readPreference, readConcern, executor, fieldName, filter);
93+
}
94+
95+
@Override
96+
public <TResult>
97+
ListDatabasesIterable<TResult> listDatabasesOf(final @Nullable ClientSession clientSession, final Class<TResult> resultClass,
98+
final CodecRegistry codecRegistry, final ReadPreference readPreference,
99+
final OperationExecutor executor) {
100+
return new ListDatabasesIterableImpl<TResult>(clientSession, resultClass, codecRegistry, readPreference, executor);
101+
}
102+
103+
@Override
104+
public <TResult>
105+
ListCollectionsIterable<TResult> listCollectionsOf(final @Nullable ClientSession clientSession, final String databaseName,
106+
final boolean collectionNamesOnly, final Class<TResult> resultClass,
107+
final CodecRegistry codecRegistry, final ReadPreference readPreference,
108+
final OperationExecutor executor) {
109+
return new ListCollectionsIterableImpl<TResult>(clientSession, databaseName, collectionNamesOnly, resultClass, codecRegistry,
110+
readPreference, executor);
111+
}
112+
113+
@Override
114+
public <TResult>
115+
ListIndexesIterable<TResult> listIndexesOf(final @Nullable ClientSession clientSession, final MongoNamespace namespace,
116+
final Class<TResult> resultClass, final CodecRegistry codecRegistry,
117+
final ReadPreference readPreference, final OperationExecutor executor) {
118+
return new ListIndexesIterableImpl<TResult>(clientSession, namespace, resultClass, codecRegistry, readPreference, executor);
119+
}
120+
121+
@Override
122+
public <TDocument, TResult>
123+
MapReduceIterable<TResult> mapReduceOf(final @Nullable ClientSession clientSession, final MongoNamespace namespace,
124+
final Class<TDocument> documentClass, final Class<TResult> resultClass,
125+
final CodecRegistry codecRegistry, final ReadPreference readPreference,
126+
final ReadConcern readConcern, final WriteConcern writeConcern, final OperationExecutor executor,
127+
final String mapFunction, final String reduceFunction) {
128+
return new MapReduceIterableImpl<TDocument, TResult>(clientSession, namespace, documentClass, resultClass, codecRegistry,
129+
readPreference, readConcern, writeConcern, executor, mapFunction, reduceFunction);
130+
}
131+
}

driver-sync/src/main/com/mongodb/client/internal/FindIterableImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@
2020
import com.mongodb.MongoNamespace;
2121
import com.mongodb.ReadConcern;
2222
import com.mongodb.ReadPreference;
23+
import com.mongodb.client.ClientSession;
2324
import com.mongodb.client.FindIterable;
2425
import com.mongodb.client.model.Collation;
2526
import com.mongodb.client.model.FindOptions;
2627
import com.mongodb.internal.operation.SyncOperations;
2728
import com.mongodb.lang.Nullable;
2829
import com.mongodb.operation.BatchCursor;
2930
import com.mongodb.operation.ReadOperation;
30-
import com.mongodb.client.ClientSession;
3131
import org.bson.codecs.configuration.CodecRegistry;
3232
import org.bson.conversions.Bson;
3333

@@ -36,7 +36,7 @@
3636
import static com.mongodb.assertions.Assertions.notNull;
3737

3838
@SuppressWarnings("deprecation")
39-
final class FindIterableImpl<TDocument, TResult> extends MongoIterableImpl<TResult> implements FindIterable<TResult> {
39+
class FindIterableImpl<TDocument, TResult> extends MongoIterableImpl<TResult> implements FindIterable<TResult> {
4040

4141
private final SyncOperations<TDocument> operations;
4242

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.client.internal;
18+
19+
import com.mongodb.MongoNamespace;
20+
import com.mongodb.ReadConcern;
21+
import com.mongodb.ReadPreference;
22+
import com.mongodb.WriteConcern;
23+
import com.mongodb.client.ClientSession;
24+
import com.mongodb.lang.Nullable;
25+
import org.bson.codecs.configuration.CodecRegistry;
26+
import org.bson.conversions.Bson;
27+
28+
import java.util.List;
29+
import java.util.function.Consumer;
30+
31+
class Java8AggregateIterableImpl<TDocument, TResult> extends AggregateIterableImpl<TDocument, TResult> {
32+
Java8AggregateIterableImpl(@Nullable final ClientSession clientSession, final MongoNamespace namespace,
33+
final Class<TDocument> documentClass, final Class<TResult> resultClass, final CodecRegistry codecRegistry,
34+
final ReadPreference readPreference, final ReadConcern readConcern, final WriteConcern writeConcern,
35+
final OperationExecutor executor, final List<? extends Bson> pipeline) {
36+
super(clientSession, namespace, documentClass, resultClass, codecRegistry, readPreference, readConcern, writeConcern, executor,
37+
pipeline);
38+
}
39+
40+
@Override
41+
public void forEach(final Consumer<? super TResult> action) {
42+
Java8ForEachHelper.forEach(this, action);
43+
}
44+
}

0 commit comments

Comments
 (0)