Skip to content

Commit 9cff218

Browse files
committed
Fix Async Streams timeout behaviour
AsynchronousSocketChannelStream no longer wraps open or timeout exceptions Netty now respects the read timeout setting correctly. JAVA-1934
1 parent ac6fa70 commit 9cff218

File tree

5 files changed

+239
-7
lines changed

5 files changed

+239
-7
lines changed

LICENSE.txt

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,20 @@
6868
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
6969
See the License for the specific language governing permissions and
7070
limitations under the License.
71+
72+
6) The following files: ReadTimeoutHandler.java
73+
74+
Copyright 2015 MongoDB, Inc.
75+
Copyright 2012 The Netty Project
76+
77+
Licensed under the Apache License, Version 2.0 (the "License");
78+
you may not use this file except in compliance with the License.
79+
You may obtain a copy of the License at
80+
81+
http://www.apache.org/licenses/LICENSE-2.0
82+
83+
Unless required by applicable law or agreed to in writing, software
84+
distributed under the License is distributed on an "AS IS" BASIS,
85+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
86+
See the License for the specific language governing permissions and
87+
limitations under the License.

driver-core/src/main/com/mongodb/connection/FutureAsyncCompletionHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.mongodb.connection;
1818

19+
import com.mongodb.MongoException;
1920
import com.mongodb.MongoInternalException;
2021
import com.mongodb.MongoInterruptedException;
2122

@@ -61,6 +62,8 @@ private T get(final String prefix) throws IOException {
6162
if (error != null) {
6263
if (error instanceof IOException) {
6364
throw (IOException) error;
65+
} else if (error instanceof MongoException) {
66+
throw (MongoException) error;
6467
} else {
6568
throw new MongoInternalException(prefix + " the AsynchronousSocketChannelStream failed", error);
6669
}

driver-core/src/main/com/mongodb/connection/netty/NettyStream.java

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.mongodb.connection.netty;
1818

19+
import com.mongodb.MongoException;
1920
import com.mongodb.MongoInternalException;
2021
import com.mongodb.MongoInterruptedException;
2122
import com.mongodb.MongoSocketOpenException;
@@ -32,6 +33,7 @@
3233
import io.netty.channel.Channel;
3334
import io.netty.channel.ChannelFuture;
3435
import io.netty.channel.ChannelFutureListener;
36+
import io.netty.channel.ChannelHandler;
3537
import io.netty.channel.ChannelHandlerContext;
3638
import io.netty.channel.ChannelInitializer;
3739
import io.netty.channel.ChannelOption;
@@ -41,7 +43,7 @@
4143
import io.netty.channel.socket.nio.NioSocketChannel;
4244
import io.netty.handler.ssl.SslHandler;
4345
import io.netty.handler.timeout.ReadTimeoutException;
44-
import io.netty.handler.timeout.ReadTimeoutHandler;
46+
import io.netty.util.concurrent.EventExecutor;
4547
import org.bson.ByteBuf;
4648

4749
import javax.net.ssl.SSLContext;
@@ -59,6 +61,7 @@
5961
* A Stream implementation based on Netty 4.0.
6062
*/
6163
final class NettyStream implements Stream {
64+
private static final String READ_HANDLER_NAME = "ReadTimeoutHandler";
6265
private final ServerAddress address;
6366
private final SocketSettings settings;
6467
private final SslSettings sslSettings;
@@ -122,7 +125,10 @@ public void initChannel(final SocketChannel ch) throws Exception {
122125
}
123126
ch.pipeline().addFirst("ssl", new SslHandler(engine, false));
124127
}
125-
ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(settings.getReadTimeout(MILLISECONDS), MILLISECONDS));
128+
int readTimeout = settings.getReadTimeout(MILLISECONDS);
129+
if (readTimeout > 0) {
130+
ch.pipeline().addLast(READ_HANDLER_NAME, new ReadTimeoutHandler(readTimeout));
131+
}
126132
ch.pipeline().addLast(new InboundBufferHandler());
127133
}
128134
});
@@ -134,7 +140,7 @@ public void operationComplete(final ChannelFuture future) throws Exception {
134140
channel = channelFuture.channel();
135141
handler.completed(null);
136142
} else {
137-
handler.failed(future.cause());
143+
handler.failed(new MongoSocketOpenException("Exception opening socket", getAddress(), future.cause()));
138144
}
139145
}
140146
});
@@ -177,6 +183,7 @@ public void operationComplete(final ChannelFuture future) throws Exception {
177183

178184
@Override
179185
public void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf> handler) {
186+
scheduleReadTimeout();
180187
ByteBuf buffer = null;
181188
Throwable exceptionResult = null;
182189
synchronized (this) {
@@ -208,9 +215,11 @@ public void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf>
208215
}
209216
}
210217
if (exceptionResult != null) {
218+
disableReadTimeout();
211219
handler.failed(exceptionResult);
212220
}
213221
if (buffer != null) {
222+
disableReadTimeout();
214223
handler.completed(buffer);
215224
}
216225
}
@@ -322,10 +331,8 @@ public T get() throws IOException {
322331
if (throwable != null) {
323332
if (throwable instanceof IOException) {
324333
throw (IOException) throwable;
325-
} else if (throwable instanceof MongoSocketReadTimeoutException) {
326-
throw (MongoSocketReadTimeoutException) throwable;
327-
} else if (throwable instanceof MongoSocketOpenException) {
328-
throw (MongoSocketOpenException) throwable;
334+
} else if (throwable instanceof MongoException) {
335+
throw (MongoException) throwable;
329336
} else {
330337
throw new MongoInternalException("Exception thrown from Netty Stream", throwable);
331338
}
@@ -336,4 +343,47 @@ public T get() throws IOException {
336343
}
337344
}
338345
}
346+
347+
348+
private void scheduleReadTimeout() {
349+
adjustTimeout(false);
350+
}
351+
352+
private void disableReadTimeout() {
353+
adjustTimeout(true);
354+
}
355+
356+
private void adjustTimeout(final boolean disable) {
357+
ChannelHandler timeoutHandler = channel.pipeline().get(READ_HANDLER_NAME);
358+
if (timeoutHandler != null) {
359+
final ReadTimeoutHandler readTimeoutHandler = (ReadTimeoutHandler) timeoutHandler;
360+
final ChannelHandlerContext handlerContext = channel.pipeline().context(timeoutHandler);
361+
EventExecutor executor = handlerContext.executor();
362+
363+
if (disable) {
364+
if (executor.inEventLoop()) {
365+
readTimeoutHandler.removeTimeout(handlerContext);
366+
} else {
367+
executor.submit(new Runnable() {
368+
@Override
369+
public void run() {
370+
readTimeoutHandler.removeTimeout(handlerContext);
371+
}
372+
});
373+
}
374+
} else {
375+
if (executor.inEventLoop()) {
376+
readTimeoutHandler.scheduleTimeout(handlerContext);
377+
} else {
378+
executor.submit(new Runnable() {
379+
@Override
380+
public void run() {
381+
readTimeoutHandler.scheduleTimeout(handlerContext);
382+
}
383+
});
384+
}
385+
}
386+
}
387+
388+
}
339389
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2015 MongoDB, Inc.
3+
* Copyright 2012 The Netty Project
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.mongodb.connection.netty;
19+
20+
import io.netty.channel.ChannelHandlerContext;
21+
import io.netty.channel.ChannelInboundHandlerAdapter;
22+
import io.netty.handler.timeout.ReadTimeoutException;
23+
24+
import java.util.concurrent.ScheduledFuture;
25+
import java.util.concurrent.TimeUnit;
26+
27+
import static com.mongodb.assertions.Assertions.isTrueArgument;
28+
29+
/**
30+
* Passes a {@link ReadTimeoutException} if the time between a {@link #scheduleTimeout} and {@link #removeTimeout} is longer than the set
31+
* timeout.
32+
*/
33+
final class ReadTimeoutHandler extends ChannelInboundHandlerAdapter {
34+
private final long readTimeout;
35+
private volatile ScheduledFuture<?> timeout;
36+
37+
public ReadTimeoutHandler(final long readTimeout) {
38+
isTrueArgument("readTimeout must be greater than zero.", readTimeout > 0);
39+
this.readTimeout = readTimeout;
40+
}
41+
42+
void scheduleTimeout(final ChannelHandlerContext ctx) {
43+
timeout = ctx.executor().schedule(new ReadTimeoutTask(ctx), readTimeout, TimeUnit.MILLISECONDS);
44+
}
45+
46+
void removeTimeout(final ChannelHandlerContext ctx) {
47+
if (ctx.channel().eventLoop().inEventLoop()) {
48+
if (timeout != null) {
49+
timeout.cancel(false);
50+
}
51+
}
52+
}
53+
54+
private static final class ReadTimeoutTask implements Runnable {
55+
56+
private final ChannelHandlerContext ctx;
57+
58+
ReadTimeoutTask(final ChannelHandlerContext ctx) {
59+
this.ctx = ctx;
60+
}
61+
62+
@Override
63+
public void run() {
64+
if (ctx.channel().isOpen()) {
65+
try {
66+
ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
67+
ctx.close();
68+
} catch (Throwable t) {
69+
ctx.fireExceptionCaught(t);
70+
}
71+
}
72+
}
73+
}
74+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2015 MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.connection
18+
19+
import category.Slow
20+
import com.mongodb.MongoSocketOpenException
21+
import com.mongodb.MongoSocketReadTimeoutException
22+
import com.mongodb.OperationFunctionalSpecification
23+
import com.mongodb.ServerAddress
24+
import com.mongodb.connection.netty.NettyStreamFactory
25+
import org.bson.BsonDocument
26+
import org.bson.BsonInt32
27+
import org.bson.BsonString
28+
import org.junit.experimental.categories.Category
29+
import spock.lang.Unroll
30+
31+
import java.util.concurrent.TimeUnit
32+
33+
import static com.mongodb.ClusterFixture.getCredentialList
34+
import static com.mongodb.ClusterFixture.getPrimary
35+
import static com.mongodb.ClusterFixture.getSslSettings
36+
import static com.mongodb.connection.CommandHelper.executeCommand
37+
38+
@Category(Slow)
39+
class AsyncStreamTimeoutsSpecification extends OperationFunctionalSpecification {
40+
41+
static SocketSettings openSocketSettings = SocketSettings.builder().connectTimeout(1, TimeUnit.MILLISECONDS).build();
42+
static SocketSettings readSocketSettings = SocketSettings.builder().readTimeout(5, TimeUnit.SECONDS).build();
43+
44+
@Unroll
45+
def 'should throw a MongoSocketOpenException when the #description Stream fails to open'() {
46+
given:
47+
def connection = new InternalStreamConnectionFactory(streamFactory, getCredentialList(), new NoOpConnectionListener())
48+
.create(new ServerId(new ClusterId(), new ServerAddress(new InetSocketAddress('192.168.255.255', 27017))));
49+
50+
when:
51+
connection.open()
52+
53+
then:
54+
thrown(MongoSocketOpenException)
55+
56+
where:
57+
description | streamFactory
58+
'AsynchronousSocket' | new AsynchronousSocketChannelStreamFactory(openSocketSettings, getSslSettings())
59+
'NettyStream' | new NettyStreamFactory(openSocketSettings, getSslSettings())
60+
}
61+
62+
@Unroll
63+
def 'should throw a MongoSocketReadTimeoutException with the #description stream'() {
64+
given:
65+
def connection = new InternalStreamConnectionFactory(streamFactory, getCredentialList(), new NoOpConnectionListener())
66+
.create(new ServerId(new ClusterId(), getPrimary()))
67+
connection.open()
68+
69+
getCollectionHelper().insertDocuments(new BsonDocument('_id', new BsonInt32(1)));
70+
def countCommand = new BsonDocument('count', new BsonString(getCollectionName()))
71+
countCommand.put('query', new BsonDocument('$where', new BsonString('sleep(5050); return true;')))
72+
73+
when:
74+
executeCommand(getDatabaseName(), countCommand, connection)
75+
76+
then:
77+
thrown(MongoSocketReadTimeoutException)
78+
79+
cleanup:
80+
connection?.close()
81+
82+
where:
83+
description | streamFactory
84+
'AsynchronousSocket' | new AsynchronousSocketChannelStreamFactory(readSocketSettings, getSslSettings())
85+
'NettyStream' | new NettyStreamFactory(readSocketSettings, getSslSettings())
86+
}
87+
88+
}

0 commit comments

Comments
 (0)