Skip to content

Commit 2ffdcf5

Browse files
committed
JAVA-1883: Since the AsynchronousSocketChannelStream implementation provided by the JDK may store references to completion handlers long after they are no longer used, ensure that the AsynchronousSocketChannelStream completion handlers release their references to upstream handlers as soon as they are no longer needed.
1 parent 169aa0c commit 2ffdcf5

File tree

1 file changed

+77
-40
lines changed

1 file changed

+77
-40
lines changed

driver-core/src/main/com/mongodb/connection/AsynchronousSocketChannelStream.java

Lines changed: 77 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.nio.channels.InterruptedByTimeoutException;
3232
import java.util.Iterator;
3333
import java.util.List;
34+
import java.util.concurrent.atomic.AtomicReference;
3435

3536
import static com.mongodb.assertions.Assertions.isTrue;
3637
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -75,21 +76,7 @@ public void openAsync(final AsyncCompletionHandler<Void> handler) {
7576
channel.setOption(StandardSocketOptions.SO_SNDBUF, settings.getSendBufferSize());
7677
}
7778

78-
channel.connect(serverAddress.getSocketAddress(), null, new CompletionHandler<Void, Object>() {
79-
@Override
80-
public void completed(final Void result, final Object attachment) {
81-
handler.completed(null);
82-
}
83-
84-
@Override
85-
public void failed(final Throwable exc, final Object attachment) {
86-
if (exc instanceof ConnectException) {
87-
handler.failed(new MongoSocketOpenException("Exception opening socket", getAddress(), exc));
88-
} else {
89-
handler.failed(exc);
90-
}
91-
}
92-
});
79+
channel.connect(serverAddress.getSocketAddress(), null, new OpenCompletionHandler(handler));
9380
} catch (IOException e) {
9481
handler.failed(new MongoSocketOpenException("Exception opening socket", serverAddress, e));
9582
} catch (Throwable t) {
@@ -188,51 +175,101 @@ public void failed(final Throwable t) {
188175
private class AsyncWritableByteChannelAdapter implements AsyncWritableByteChannel {
189176
@Override
190177
public void write(final ByteBuffer src, final AsyncCompletionHandler<Void> handler) {
191-
channel.write(src, null, new CompletionHandler<Integer, Object>() {
192-
@Override
193-
public void completed(final Integer result, final Object attachment) {
194-
handler.completed(null);
195-
}
178+
channel.write(src, null, new WriteCompletionHandler(handler));
179+
}
196180

197-
@Override
198-
public void failed(final Throwable exc, final Object attachment) {
199-
handler.failed(exc);
200-
}
201-
});
181+
private class WriteCompletionHandler extends BaseCompletionHandler<Void, Integer, Object> {
182+
183+
public WriteCompletionHandler(final AsyncCompletionHandler<Void> handler) {
184+
super(handler);
185+
}
186+
187+
@Override
188+
public void completed(final Integer result, final Object attachment) {
189+
AsyncCompletionHandler<Void> localHandler = getHandlerAndClear();
190+
localHandler.completed(null);
191+
}
192+
193+
@Override
194+
public void failed(final Throwable exc, final Object attachment) {
195+
AsyncCompletionHandler<Void> localHandler = getHandlerAndClear();
196+
localHandler.failed(exc);
197+
}
202198
}
203199
}
204200

205-
private final class BasicCompletionHandler implements CompletionHandler<Integer, Void> {
206-
private final ByteBuf dst;
207-
private final AsyncCompletionHandler<ByteBuf> handler;
201+
private final class BasicCompletionHandler extends BaseCompletionHandler<ByteBuf, Integer, Void> {
202+
private final AtomicReference<ByteBuf> byteBufReference;
208203

209204
private BasicCompletionHandler(final ByteBuf dst, final AsyncCompletionHandler<ByteBuf> handler) {
210-
this.dst = dst;
211-
this.handler = handler;
205+
super(handler);
206+
this.byteBufReference = new AtomicReference<ByteBuf>(dst);
212207
}
213208

214209
@Override
215210
public void completed(final Integer result, final Void attachment) {
211+
AsyncCompletionHandler<ByteBuf> localHandler = getHandlerAndClear();
212+
ByteBuf localByteBuf = byteBufReference.getAndSet(null);
216213
if (result == -1) {
217-
dst.release();
218-
handler.failed(new MongoSocketReadException("Prematurely reached end of stream", serverAddress));
219-
} else if (!dst.hasRemaining()) {
220-
dst.flip();
221-
handler.completed(dst);
214+
localByteBuf.release();
215+
localHandler.failed(new MongoSocketReadException("Prematurely reached end of stream", serverAddress));
216+
} else if (!localByteBuf.hasRemaining()) {
217+
localByteBuf.flip();
218+
localHandler.completed(localByteBuf);
222219
} else {
223-
channel.read(dst.asNIO(), settings.getReadTimeout(MILLISECONDS), MILLISECONDS, null,
224-
new BasicCompletionHandler(dst, handler));
220+
channel.read(localByteBuf.asNIO(), settings.getReadTimeout(MILLISECONDS), MILLISECONDS, null,
221+
new BasicCompletionHandler(localByteBuf, localHandler));
225222
}
226223
}
227224

228225
@Override
229226
public void failed(final Throwable t, final Void attachment) {
230-
dst.release();
227+
AsyncCompletionHandler<ByteBuf> localHandler = getHandlerAndClear();
228+
ByteBuf localByteBuf = byteBufReference.getAndSet(null);
229+
localByteBuf.release();
231230
if (t instanceof InterruptedByTimeoutException) {
232-
handler.failed(new MongoSocketReadTimeoutException("Timeout while receiving message", serverAddress, t));
231+
localHandler.failed(new MongoSocketReadTimeoutException("Timeout while receiving message", serverAddress, t));
233232
} else {
234-
handler.failed(t);
233+
localHandler.failed(t);
235234
}
236235
}
237236
}
237+
238+
private class OpenCompletionHandler extends BaseCompletionHandler<Void, Void, Object> {
239+
public OpenCompletionHandler(final AsyncCompletionHandler<Void> handler) {
240+
super(handler);
241+
}
242+
243+
@Override
244+
public void completed(final Void result, final Object attachment) {
245+
AsyncCompletionHandler<Void> localHandler = getHandlerAndClear();
246+
localHandler.completed(null);
247+
}
248+
249+
@Override
250+
public void failed(final Throwable exc, final Object attachment) {
251+
AsyncCompletionHandler<Void> localHandler = getHandlerAndClear();
252+
if (exc instanceof ConnectException) {
253+
localHandler.failed(new MongoSocketOpenException("Exception opening socket", getAddress(), exc));
254+
} else {
255+
localHandler.failed(exc);
256+
}
257+
}
258+
}
259+
260+
// Private base class for all CompletionHandler implementors that ensures the upstream handler is
261+
// set to null before it is used. This is to work around an observed issue with implementations of
262+
// AsynchronousSocketChannel that fail to clear references to handlers stored in instance fields of
263+
// the class.
264+
private abstract static class BaseCompletionHandler<T, V, A> implements CompletionHandler<V, A> {
265+
private final AtomicReference<AsyncCompletionHandler<T>> handlerReference;
266+
267+
public BaseCompletionHandler(final AsyncCompletionHandler<T> handler) {
268+
this.handlerReference = new AtomicReference<AsyncCompletionHandler<T>>(handler);
269+
}
270+
271+
protected AsyncCompletionHandler<T> getHandlerAndClear() {
272+
return handlerReference.getAndSet(null);
273+
}
274+
}
238275
}

0 commit comments

Comments
 (0)