Skip to content

Commit 56a1ae0

Browse files
committed
JAVA-2053: When no stream factory factory is provided, ensure that any EventLoopGroup instances are shutdown cleanly
1 parent cb7f625 commit 56a1ae0

File tree

9 files changed

+203
-105
lines changed

9 files changed

+203
-105
lines changed

driver-async/src/main/com/mongodb/async/client/MongoClientImpl.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
import org.bson.BsonDocument;
3333
import org.bson.Document;
3434

35+
import java.io.Closeable;
36+
import java.io.IOException;
37+
3538
import static com.mongodb.assertions.Assertions.notNull;
3639
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
3740

@@ -40,15 +43,22 @@ class MongoClientImpl implements MongoClient {
4043
private final Cluster cluster;
4144
private final MongoClientSettings settings;
4245
private final AsyncOperationExecutor executor;
46+
private final Closeable externalResourceCloser;
4347

44-
MongoClientImpl(final MongoClientSettings settings, final Cluster cluster) {
45-
this(settings, cluster, createOperationExecutor(settings, cluster));
48+
MongoClientImpl(final MongoClientSettings settings, final Cluster cluster, final Closeable externalResourceCloser) {
49+
this(settings, cluster, createOperationExecutor(settings, cluster), externalResourceCloser);
4650
}
4751

4852
MongoClientImpl(final MongoClientSettings settings, final Cluster cluster, final AsyncOperationExecutor executor) {
53+
this(settings, cluster, executor, null);
54+
}
55+
56+
MongoClientImpl(final MongoClientSettings settings, final Cluster cluster, final AsyncOperationExecutor executor,
57+
final Closeable externalResourceCloser) {
4958
this.settings = notNull("settings", settings);
5059
this.cluster = notNull("cluster", cluster);
5160
this.executor = notNull("executor", executor);
61+
this.externalResourceCloser = externalResourceCloser;
5262
}
5363

5464
@Override
@@ -60,6 +70,14 @@ public MongoDatabase getDatabase(final String name) {
6070
@Override
6171
public void close() {
6272
cluster.close();
73+
if (externalResourceCloser != null) {
74+
try {
75+
externalResourceCloser.close();
76+
} catch (IOException e) {
77+
LOGGER.warn("Exception closing resource", e);
78+
}
79+
}
80+
6381
}
6482

6583
@Override

driver-async/src/main/com/mongodb/async/client/MongoClientSettings.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,12 @@
2222
import com.mongodb.WriteConcern;
2323
import com.mongodb.annotations.Immutable;
2424
import com.mongodb.annotations.NotThreadSafe;
25-
import com.mongodb.connection.AsynchronousSocketChannelStreamFactoryFactory;
2625
import com.mongodb.connection.ClusterSettings;
2726
import com.mongodb.connection.ConnectionPoolSettings;
2827
import com.mongodb.connection.ServerSettings;
2928
import com.mongodb.connection.SocketSettings;
3029
import com.mongodb.connection.SslSettings;
3130
import com.mongodb.connection.StreamFactoryFactory;
32-
import com.mongodb.connection.netty.NettyStreamFactoryFactory;
3331
import com.mongodb.event.CommandListener;
3432
import org.bson.codecs.configuration.CodecRegistry;
3533

@@ -95,7 +93,7 @@ public static final class Builder {
9593
private WriteConcern writeConcern = WriteConcern.ACKNOWLEDGED;
9694
private ReadConcern readConcern = ReadConcern.DEFAULT;
9795
private CodecRegistry codecRegistry = MongoClients.getDefaultCodecRegistry();
98-
private StreamFactoryFactory streamFactoryFactory = createDefaultStreamFactoryFactory();
96+
private StreamFactoryFactory streamFactoryFactory;
9997
private final List<CommandListener> commandListeners = new ArrayList<CommandListener>();
10098

10199
private ClusterSettings clusterSettings;
@@ -326,18 +324,6 @@ public Builder applicationName(final String applicationName) {
326324
public MongoClientSettings build() {
327325
return new MongoClientSettings(this);
328326
}
329-
330-
private static StreamFactoryFactory createDefaultStreamFactoryFactory() {
331-
String streamType = System.getProperty("org.mongodb.async.type", "nio2");
332-
333-
if (streamType.equals("netty")) {
334-
return NettyStreamFactoryFactory.builder().build();
335-
} else if (streamType.equals("nio2")) {
336-
return new AsynchronousSocketChannelStreamFactoryFactory();
337-
} else {
338-
throw new IllegalArgumentException("Unsupported stream type " + streamType);
339-
}
340-
}
341327
}
342328

343329
/**

driver-async/src/main/com/mongodb/async/client/MongoClients.java

Lines changed: 76 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,31 @@
2020
import com.mongodb.client.MongoDriverInformation;
2121
import com.mongodb.client.gridfs.codecs.GridFSFileCodecProvider;
2222
import com.mongodb.client.model.geojson.codecs.GeoJsonCodecProvider;
23-
import com.mongodb.connection.AsynchronousSocketChannelStreamFactoryFactory;
24-
import com.mongodb.connection.Cluster;
23+
import com.mongodb.connection.AsynchronousSocketChannelStreamFactory;
2524
import com.mongodb.connection.ClusterSettings;
2625
import com.mongodb.connection.ConnectionPoolSettings;
2726
import com.mongodb.connection.DefaultClusterFactory;
2827
import com.mongodb.connection.ServerSettings;
2928
import com.mongodb.connection.SocketSettings;
3029
import com.mongodb.connection.SslSettings;
3130
import com.mongodb.connection.StreamFactory;
32-
import com.mongodb.connection.netty.NettyStreamFactoryFactory;
31+
import com.mongodb.connection.StreamFactoryFactory;
32+
import com.mongodb.connection.netty.NettyStreamFactory;
3333
import com.mongodb.event.CommandEventMulticaster;
3434
import com.mongodb.event.CommandListener;
3535
import com.mongodb.management.JMXConnectionPoolListener;
36+
import io.netty.channel.EventLoopGroup;
37+
import io.netty.channel.nio.NioEventLoopGroup;
3638
import org.bson.codecs.BsonValueCodecProvider;
3739
import org.bson.codecs.DocumentCodecProvider;
3840
import org.bson.codecs.IterableCodecProvider;
3941
import org.bson.codecs.ValueCodecProvider;
4042
import org.bson.codecs.configuration.CodecRegistry;
4143

44+
import java.io.Closeable;
45+
import java.io.IOException;
4246
import java.util.List;
4347

44-
import static java.lang.String.format;
4548
import static java.util.Arrays.asList;
4649
import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
4750

@@ -91,7 +94,7 @@ public static MongoClient create(final String connectionString) {
9194
* </p>
9295
* <p>
9396
* The connection string's stream type is then applied by setting the
94-
* {@link com.mongodb.connection.StreamFactoryFactory} to an instance of {@link NettyStreamFactoryFactory},
97+
* {@link com.mongodb.connection.StreamFactory} to an instance of {@link NettyStreamFactory},
9598
* </p>
9699
*
97100
* @param connectionString the settings
@@ -115,21 +118,21 @@ public static MongoClient create(final ConnectionString connectionString) {
115118
*
116119
* <p>Note: Intended for driver and library authors to associate extra driver metadata with the connections.</p>
117120
*
118-
* @param settings the settings
121+
* @param settings the settings
119122
* @param mongoDriverInformation any driver information to associate with the MongoClient
120123
* @return the client
121124
* @since 3.4
122125
*/
123126
public static MongoClient create(final MongoClientSettings settings, final MongoDriverInformation mongoDriverInformation) {
124-
return new MongoClientImpl(settings, createCluster(settings, mongoDriverInformation));
127+
return create(settings, mongoDriverInformation, null);
125128
}
126129

127130
/**
128131
* Create a new client with the given connection string.
129132
*
130133
* <p>Note: Intended for driver and library authors to associate extra driver metadata with the connections.</p>
131134
*
132-
* @param connectionString the settings
135+
* @param connectionString the settings
133136
* @param mongoDriverInformation any driver information to associate with the MongoClient
134137
* @return the client
135138
* @throws IllegalArgumentException if the connection string's stream type is not one of "netty" or "nio2"
@@ -153,15 +156,7 @@ public static MongoClient create(final ConnectionString connectionString, final
153156
.socketSettings(SocketSettings.builder()
154157
.applyConnectionString(connectionString)
155158
.build());
156-
if (connectionString.getStreamType() != null) {
157-
if (connectionString.getStreamType().toLowerCase().equals("netty")) {
158-
builder.streamFactoryFactory(NettyStreamFactoryFactory.builder().build());
159-
} else if (connectionString.getStreamType().toLowerCase().equals("nio2")) {
160-
builder.streamFactoryFactory(new AsynchronousSocketChannelStreamFactoryFactory());
161-
} else if (!connectionString.getStreamType().toLowerCase().equals("nio2")) {
162-
throw new IllegalArgumentException(format("Unsupported stream type %s", connectionString.getStreamType()));
163-
}
164-
}
159+
165160
if (connectionString.getReadPreference() != null) {
166161
builder.readPreference(connectionString.getReadPreference());
167162
}
@@ -174,9 +169,27 @@ public static MongoClient create(final ConnectionString connectionString, final
174169
if (connectionString.getApplicationName() != null) {
175170
builder.applicationName(connectionString.getApplicationName());
176171
}
177-
return create(builder.build(), mongoDriverInformation);
172+
return create(builder.build(), mongoDriverInformation, connectionString.getStreamType());
178173
}
179174

175+
private static MongoClient create(final MongoClientSettings settings, final MongoDriverInformation mongoDriverInformation,
176+
final String requestedStreamType) {
177+
String streamType = getStreamType(requestedStreamType);
178+
EventLoopGroup eventLoopGroup = getEventLoopGroupIfNecessary(settings.getStreamFactoryFactory(), streamType);
179+
StreamFactory streamFactory = getStreamFactory(settings.getStreamFactoryFactory(), settings.getSocketSettings(),
180+
settings.getSslSettings(), streamType, eventLoopGroup);
181+
StreamFactory heartbeatStreamFactory = getStreamFactory(settings.getStreamFactoryFactory(), settings.getHeartbeatSocketSettings(),
182+
settings.getSslSettings(), streamType, eventLoopGroup);
183+
return new MongoClientImpl(settings, new DefaultClusterFactory().create(settings.getClusterSettings(), settings.getServerSettings(),
184+
settings.getConnectionPoolSettings(), streamFactory,
185+
heartbeatStreamFactory,
186+
settings.getCredentialList(), null, new JMXConnectionPoolListener(), null,
187+
createCommandListener(settings.getCommandListeners()),
188+
settings.getApplicationName(), mongoDriverInformation),
189+
getEventLoopGroupCloser(eventLoopGroup));
190+
}
191+
192+
180193
/**
181194
* Gets the default codec registry. It includes the following providers:
182195
*
@@ -203,23 +216,55 @@ public static CodecRegistry getDefaultCodecRegistry() {
203216
new GeoJsonCodecProvider(),
204217
new GridFSFileCodecProvider()));
205218

206-
private static Cluster createCluster(final MongoClientSettings settings, final MongoDriverInformation mongoDriverInformation) {
207-
StreamFactory streamFactory = getStreamFactory(settings);
208-
StreamFactory heartbeatStreamFactory = getHeartbeatStreamFactory(settings);
209-
return new DefaultClusterFactory().create(settings.getClusterSettings(), settings.getServerSettings(),
210-
settings.getConnectionPoolSettings(), streamFactory,
211-
heartbeatStreamFactory,
212-
settings.getCredentialList(), null, new JMXConnectionPoolListener(), null,
213-
createCommandListener(settings.getCommandListeners()),
214-
settings.getApplicationName(), mongoDriverInformation);
219+
private static StreamFactory getStreamFactory(final StreamFactoryFactory streamFactoryFactory,
220+
final SocketSettings socketSettings, final SslSettings sslSettings,
221+
final String streamType, final EventLoopGroup eventLoopGroup) {
222+
if (streamFactoryFactory != null) {
223+
return streamFactoryFactory.create(socketSettings, sslSettings);
224+
} else if (isNetty(streamType)) {
225+
return new NettyStreamFactory(socketSettings, sslSettings, eventLoopGroup);
226+
} else if (isNio2(streamType)) {
227+
return new AsynchronousSocketChannelStreamFactory(socketSettings, sslSettings);
228+
} else {
229+
throw new IllegalArgumentException("Unsupported stream type: " + streamType);
230+
}
231+
}
232+
233+
private static boolean isNetty(final String streamType) {
234+
return streamType.toLowerCase().equals("netty");
235+
}
236+
237+
private static boolean isNio2(final String streamType) {
238+
return streamType.toLowerCase().equals("nio2");
215239
}
216240

217-
private static StreamFactory getHeartbeatStreamFactory(final MongoClientSettings settings) {
218-
return settings.getStreamFactoryFactory().create(settings.getHeartbeatSocketSettings(), settings.getSslSettings());
241+
private static String getStreamType(final String requestedStreamType) {
242+
if (requestedStreamType != null) {
243+
return requestedStreamType;
244+
} else {
245+
return System.getProperty("org.mongodb.async.type", "nio2");
246+
}
219247
}
220248

221-
private static StreamFactory getStreamFactory(final MongoClientSettings settings) {
222-
return settings.getStreamFactoryFactory().create(settings.getSocketSettings(), settings.getSslSettings());
249+
private static Closeable getEventLoopGroupCloser(final EventLoopGroup eventLoopGroup) {
250+
if (eventLoopGroup == null) {
251+
return null;
252+
} else {
253+
return new Closeable() {
254+
@Override
255+
public void close() throws IOException {
256+
eventLoopGroup.shutdownGracefully().awaitUninterruptibly();
257+
}
258+
};
259+
}
260+
}
261+
private static EventLoopGroup getEventLoopGroupIfNecessary(final StreamFactoryFactory streamFactoryFactory,
262+
final String streamType) {
263+
if (isNetty(streamType) && streamFactoryFactory == null) {
264+
return new NioEventLoopGroup();
265+
} else {
266+
return null;
267+
}
223268
}
224269

225270
private static CommandListener createCommandListener(final List<CommandListener> commandListeners) {

driver-async/src/test/functional/com/mongodb/async/client/MongoClientsSpecification.groovy

Lines changed: 0 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,10 @@ import com.mongodb.ReadConcern
2222
import com.mongodb.ServerAddress
2323
import com.mongodb.WriteConcern
2424
import com.mongodb.client.MongoDriverInformation
25-
import com.mongodb.connection.AsynchronousSocketChannelStreamFactoryFactory
26-
import com.mongodb.connection.netty.NettyStreamFactoryFactory
2725
import org.bson.Document
2826
import spock.lang.IgnoreIf
2927
import spock.lang.Unroll
3028

31-
import static com.mongodb.ClusterFixture.getSslSettings
3229
import static com.mongodb.ClusterFixture.isStandalone
3330
import static com.mongodb.ClusterFixture.serverVersionAtLeast
3431
import static com.mongodb.ReadPreference.primary
@@ -96,29 +93,6 @@ class MongoClientsSpecification extends FunctionalSpecification {
9693
client?.close()
9794
}
9895

99-
def 'should apply connection string to netty stream type'() {
100-
when:
101-
def client = MongoClients.create('mongodb://localhost/?streamType=Netty')
102-
103-
then:
104-
client.settings.streamFactoryFactory instanceof NettyStreamFactoryFactory
105-
106-
cleanup:
107-
client?.close()
108-
}
109-
110-
@IgnoreIf({ javaVersion < 1.7 || getSslSettings().isEnabled() })
111-
def 'should apply connection string to nio2 stream type'() {
112-
when:
113-
def client = MongoClients.create('mongodb://localhost/?streamType=NIO2')
114-
115-
then:
116-
client.settings.streamFactoryFactory instanceof AsynchronousSocketChannelStreamFactoryFactory
117-
118-
cleanup:
119-
client?.close()
120-
}
121-
12296
def 'should apply connection string to socket settings'() {
12397
when:
12498
def client = MongoClients.create('mongodb://localhost/?connectTimeoutMS=300')
@@ -198,30 +172,6 @@ class MongoClientsSpecification extends FunctionalSpecification {
198172
'mongodb://localhost/?appname=app1' | 'app1'
199173
}
200174

201-
@Unroll
202-
def 'should respect the streamType over the system properties'() {
203-
given:
204-
def asyncType = System.getProperty('org.mongodb.async.type', null)
205-
System.setProperty('org.mongodb.async.type', systemType)
206-
207-
when:
208-
def client = MongoClients.create(uri)
209-
210-
then:
211-
client.settings.getStreamFactoryFactory().getClass() == streamFactoryFactoryClass
212-
213-
cleanup:
214-
client?.close()
215-
if (asyncType != null) {
216-
System.setProperty('org.mongodb.async.type', asyncType)
217-
}
218-
219-
where:
220-
uri | systemType | streamFactoryFactoryClass
221-
'mongodb://localhost/?streamType=nio2' | 'netty' | AsynchronousSocketChannelStreamFactoryFactory
222-
'mongodb://localhost/?streamType=netty' | 'nio2' | NettyStreamFactoryFactory
223-
}
224-
225175
@IgnoreIf({ !serverVersionAtLeast([3, 3, 9]) || !isStandalone() })
226176
def 'application name should appear in the system.profile collection'() {
227177
given:

driver-async/src/test/unit/com/mongodb/async/client/MongoClientSettingsSpecification.groovy

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import com.mongodb.ReadConcern
2121
import com.mongodb.ReadPreference
2222
import com.mongodb.ServerAddress
2323
import com.mongodb.WriteConcern
24-
import com.mongodb.connection.AsynchronousSocketChannelStreamFactoryFactory
2524
import com.mongodb.connection.ClusterSettings
2625
import com.mongodb.connection.ConnectionPoolSettings
2726
import com.mongodb.connection.ServerSettings
@@ -51,10 +50,7 @@ class MongoClientSettingsSpecification extends Specification {
5150
options.socketSettings == SocketSettings.builder().build()
5251
options.heartbeatSocketSettings == SocketSettings.builder().build()
5352
options.serverSettings == ServerSettings.builder().build()
54-
55-
System.getProperty('org.mongodb.async.type', 'nio2') == 'netty' ?
56-
options.streamFactoryFactory instanceof NettyStreamFactoryFactory :
57-
options.streamFactoryFactory instanceof AsynchronousSocketChannelStreamFactoryFactory
53+
options.streamFactoryFactory == null
5854
}
5955

6056
@SuppressWarnings('UnnecessaryObjectReferences')

driver-async/src/test/unit/com/mongodb/async/client/MongoClientSpecification.groovy

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,22 @@ class MongoClientSpecification extends Specification {
8989
WriteConcern.MAJORITY, ReadConcern.MAJORITY, new TestOperationExecutor([]))
9090
}
9191

92+
93+
def 'should cleanly close the external resource closer on close'() {
94+
given:
95+
def closed = false
96+
def client = new MongoClientImpl(MongoClientSettings.builder().build(), Mock(Cluster), {
97+
closed = true
98+
throw new IOException()
99+
})
100+
101+
when:
102+
client.close()
103+
104+
then:
105+
closed
106+
}
107+
92108
def 'default codec registry should contain all supported providers'() {
93109
given:
94110
def codecRegistry = MongoClients.getDefaultCodecRegistry()

0 commit comments

Comments
 (0)