Skip to content

Commit 1cd73b2

Browse files
committed
Refactor AsynchronousSocketChannelStream
AsynchronousSocketChannelStream now extends common base class AsynchronousChannelStream along with TlsChannelStream. JAVA-3038
1 parent 96f29a1 commit 1cd73b2

File tree

1 file changed

+67
-199
lines changed

1 file changed

+67
-199
lines changed

driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java

Lines changed: 67 additions & 199 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,12 @@
1818

1919
import com.mongodb.MongoSocketException;
2020
import com.mongodb.MongoSocketOpenException;
21-
import com.mongodb.MongoSocketReadException;
22-
import com.mongodb.MongoSocketReadTimeoutException;
2321
import com.mongodb.ServerAddress;
2422
import com.mongodb.connection.AsyncCompletionHandler;
2523
import com.mongodb.connection.BufferProvider;
2624
import com.mongodb.connection.SocketSettings;
2725
import com.mongodb.connection.Stream;
28-
import org.bson.ByteBuf;
26+
import com.mongodb.internal.connection.tlschannel.async.ExtendedAsynchronousByteChannel;
2927

3028
import java.io.IOException;
3129
import java.net.SocketAddress;
@@ -34,47 +32,30 @@
3432
import java.nio.channels.AsynchronousChannelGroup;
3533
import java.nio.channels.AsynchronousSocketChannel;
3634
import java.nio.channels.CompletionHandler;
37-
import java.nio.channels.InterruptedByTimeoutException;
38-
import java.util.Iterator;
3935
import java.util.LinkedList;
40-
import java.util.List;
4136
import java.util.Queue;
37+
import java.util.concurrent.Future;
38+
import java.util.concurrent.TimeUnit;
4239
import java.util.concurrent.atomic.AtomicReference;
4340

4441
import static com.mongodb.assertions.Assertions.isTrue;
45-
import static java.util.concurrent.TimeUnit.MILLISECONDS;
4642

47-
public final class AsynchronousSocketChannelStream implements Stream {
43+
public final class AsynchronousSocketChannelStream extends AsynchronousChannelStream implements Stream {
4844
private final ServerAddress serverAddress;
4945
private final SocketSettings settings;
50-
private final BufferProvider bufferProvider;
5146
private final AsynchronousChannelGroup group;
52-
private volatile AsynchronousSocketChannel channel;
53-
private volatile boolean isClosed;
5447

5548
public AsynchronousSocketChannelStream(final ServerAddress serverAddress, final SocketSettings settings,
5649
final BufferProvider bufferProvider, final AsynchronousChannelGroup group) {
50+
super(serverAddress, settings, bufferProvider);
5751
this.serverAddress = serverAddress;
5852
this.settings = settings;
59-
this.bufferProvider = bufferProvider;
6053
this.group = group;
6154
}
6255

63-
@Override
64-
public ByteBuf getBuffer(final int size) {
65-
return bufferProvider.getBuffer(size);
66-
}
67-
68-
@Override
69-
public void open() throws IOException {
70-
FutureAsyncCompletionHandler<Void> handler = new FutureAsyncCompletionHandler<Void>();
71-
openAsync(handler);
72-
handler.getOpen();
73-
}
74-
7556
@Override
7657
public void openAsync(final AsyncCompletionHandler<Void> handler) {
77-
isTrue("unopened", channel == null);
58+
isTrue("unopened", getChannel() == null);
7859
initializeSocketChannel(handler, new LinkedList<SocketAddress>(serverAddress.getSocketAddresses()));
7960
}
8061

@@ -106,216 +87,103 @@ private void initializeSocketChannel(final AsyncCompletionHandler<Void> handler,
10687
}
10788
}
10889

109-
@Override
110-
public void write(final List<ByteBuf> buffers) throws IOException {
111-
FutureAsyncCompletionHandler<Void> handler = new FutureAsyncCompletionHandler<Void>();
112-
writeAsync(buffers, handler);
113-
handler.getWrite();
114-
}
115-
116-
@Override
117-
public ByteBuf read(final int numBytes) throws IOException {
118-
FutureAsyncCompletionHandler<ByteBuf> handler = new FutureAsyncCompletionHandler<ByteBuf>();
119-
readAsync(numBytes, handler);
120-
return handler.getRead();
121-
}
122-
123-
@Override
124-
public void writeAsync(final List<ByteBuf> buffers, final AsyncCompletionHandler<Void> handler) {
125-
final AsyncWritableByteChannel byteChannel = new AsyncWritableByteChannelAdapter();
126-
final Iterator<ByteBuf> iter = buffers.iterator();
127-
pipeOneBuffer(byteChannel, iter.next(), new AsyncCompletionHandler<Void>() {
128-
@Override
129-
public void completed(final Void t) {
130-
if (iter.hasNext()) {
131-
pipeOneBuffer(byteChannel, iter.next(), this);
132-
} else {
133-
handler.completed(null);
134-
}
135-
}
136-
137-
@Override
138-
public void failed(final Throwable t) {
139-
handler.failed(t);
140-
}
141-
});
142-
}
143-
144-
@Override
145-
public void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf> handler) {
146-
ByteBuf buffer = bufferProvider.getBuffer(numBytes);
147-
channel.read(buffer.asNIO(), settings.getReadTimeout(MILLISECONDS), MILLISECONDS, null,
148-
new BasicCompletionHandler(buffer, handler));
149-
}
150-
151-
@Override
152-
public ServerAddress getAddress() {
153-
return serverAddress;
154-
}
155-
156-
/**
157-
* Closes the connection.
158-
*/
159-
@Override
160-
public void close() {
161-
try {
162-
if (channel != null) {
163-
channel.close();
164-
}
165-
} catch (IOException e) { // NOPMD
166-
// ignore
167-
} finally {
168-
channel = null;
169-
isClosed = true;
170-
}
171-
}
172-
173-
@Override
174-
public boolean isClosed() {
175-
return isClosed;
176-
}
177-
178-
public ServerAddress getServerAddress() {
179-
return serverAddress;
180-
}
181-
182-
public SocketSettings getSettings() {
183-
return settings;
184-
}
185-
18690
public AsynchronousChannelGroup getGroup() {
18791
return group;
18892
}
18993

190-
private void pipeOneBuffer(final AsyncWritableByteChannel byteChannel, final ByteBuf byteBuffer,
191-
final AsyncCompletionHandler<Void> outerHandler) {
192-
byteChannel.write(byteBuffer.asNIO(), new AsyncCompletionHandler<Void>() {
193-
@Override
194-
public void completed(final Void t) {
195-
if (byteBuffer.hasRemaining()) {
196-
byteChannel.write(byteBuffer.asNIO(), this);
197-
} else {
198-
outerHandler.completed(null);
199-
}
200-
}
94+
private class OpenCompletionHandler implements CompletionHandler<Void, Object> {
95+
private AtomicReference<AsyncCompletionHandler<Void>> handlerReference;
96+
private final Queue<SocketAddress> socketAddressQueue;
97+
private final AsynchronousSocketChannel attemptConnectionChannel;
20198

202-
@Override
203-
public void failed(final Throwable t) {
204-
outerHandler.failed(t);
205-
}
206-
});
207-
}
99+
OpenCompletionHandler(final AsyncCompletionHandler<Void> handler, final Queue<SocketAddress> socketAddressQueue,
100+
final AsynchronousSocketChannel attemptConnectionChannel) {
101+
this.handlerReference = new AtomicReference<AsyncCompletionHandler<Void>>(handler);
102+
this.socketAddressQueue = socketAddressQueue;
103+
this.attemptConnectionChannel = attemptConnectionChannel;
104+
}
208105

209-
private class AsyncWritableByteChannelAdapter implements AsyncWritableByteChannel {
210106
@Override
211-
public void write(final ByteBuffer src, final AsyncCompletionHandler<Void> handler) {
212-
channel.write(src, null, new WriteCompletionHandler(handler));
107+
public void completed(final Void result, final Object attachment) {
108+
setChannel(new AsynchronousSocketChannelAdapter(attemptConnectionChannel));
109+
handlerReference.getAndSet(null).completed(null);
213110
}
214111

215-
private class WriteCompletionHandler extends BaseCompletionHandler<Void, Integer, Object> {
216-
217-
WriteCompletionHandler(final AsyncCompletionHandler<Void> handler) {
218-
super(handler);
219-
}
220-
221-
@Override
222-
public void completed(final Integer result, final Object attachment) {
223-
AsyncCompletionHandler<Void> localHandler = getHandlerAndClear();
224-
localHandler.completed(null);
225-
}
112+
@Override
113+
public void failed(final Throwable exc, final Object attachment) {
114+
AsyncCompletionHandler<Void> localHandler = handlerReference.getAndSet(null);
226115

227-
@Override
228-
public void failed(final Throwable exc, final Object attachment) {
229-
AsyncCompletionHandler<Void> localHandler = getHandlerAndClear();
230-
localHandler.failed(exc);
116+
if (socketAddressQueue.isEmpty()) {
117+
if (exc instanceof IOException) {
118+
localHandler.failed(new MongoSocketOpenException("Exception opening socket", getAddress(), exc));
119+
} else {
120+
localHandler.failed(exc);
121+
}
122+
} else {
123+
initializeSocketChannel(localHandler, socketAddressQueue);
231124
}
232125
}
233126
}
234127

235-
private final class BasicCompletionHandler extends BaseCompletionHandler<ByteBuf, Integer, Void> {
236-
private final AtomicReference<ByteBuf> byteBufReference;
128+
private static final class AsynchronousSocketChannelAdapter implements ExtendedAsynchronousByteChannel {
129+
private final AsynchronousSocketChannel channel;
237130

238-
private BasicCompletionHandler(final ByteBuf dst, final AsyncCompletionHandler<ByteBuf> handler) {
239-
super(handler);
240-
this.byteBufReference = new AtomicReference<ByteBuf>(dst);
131+
private AsynchronousSocketChannelAdapter(final AsynchronousSocketChannel channel) {
132+
this.channel = channel;
241133
}
242134

243135
@Override
244-
public void completed(final Integer result, final Void attachment) {
245-
AsyncCompletionHandler<ByteBuf> localHandler = getHandlerAndClear();
246-
ByteBuf localByteBuf = byteBufReference.getAndSet(null);
247-
if (result == -1) {
248-
localByteBuf.release();
249-
localHandler.failed(new MongoSocketReadException("Prematurely reached end of stream", serverAddress));
250-
} else if (!localByteBuf.hasRemaining()) {
251-
localByteBuf.flip();
252-
localHandler.completed(localByteBuf);
253-
} else {
254-
channel.read(localByteBuf.asNIO(), settings.getReadTimeout(MILLISECONDS), MILLISECONDS, null,
255-
new BasicCompletionHandler(localByteBuf, localHandler));
256-
}
136+
public <A> void read(final ByteBuffer dst, final long timeout, final TimeUnit unit, final A attach,
137+
final CompletionHandler<Integer, ? super A> handler) {
138+
channel.read(dst, timeout, unit, attach, handler);
257139
}
258140

259141
@Override
260-
public void failed(final Throwable t, final Void attachment) {
261-
AsyncCompletionHandler<ByteBuf> localHandler = getHandlerAndClear();
262-
ByteBuf localByteBuf = byteBufReference.getAndSet(null);
263-
localByteBuf.release();
264-
if (t instanceof InterruptedByTimeoutException) {
265-
localHandler.failed(new MongoSocketReadTimeoutException("Timeout while receiving message", serverAddress, t));
266-
} else {
267-
localHandler.failed(t);
268-
}
142+
public <A> void read(final ByteBuffer[] dsts, final int offset, final int length, final long timeout, final TimeUnit unit,
143+
final A attach, final CompletionHandler<Long, ? super A> handler) {
144+
channel.read(dsts, offset, length, timeout, unit, attach, handler);
269145
}
270-
}
271146

272-
private class OpenCompletionHandler extends BaseCompletionHandler<Void, Void, Object> {
273-
private final Queue<SocketAddress> socketAddressQueue;
274-
private final AsynchronousSocketChannel attemptConnectionChannel;
147+
@Override
148+
public <A> void write(final ByteBuffer src, final long timeout, final TimeUnit unit, final A attach,
149+
final CompletionHandler<Integer, ? super A> handler) {
150+
channel.write(src, timeout, unit, attach, handler);
151+
}
275152

276-
OpenCompletionHandler(final AsyncCompletionHandler<Void> handler, final Queue<SocketAddress> socketAddressQueue,
277-
final AsynchronousSocketChannel attemptConnectionChannel) {
278-
super(handler);
279-
this.socketAddressQueue = socketAddressQueue;
280-
this.attemptConnectionChannel = attemptConnectionChannel;
153+
@Override
154+
public <A> void write(final ByteBuffer[] srcs, final int offset, final int length, final long timeout, final TimeUnit unit,
155+
final A attach, final CompletionHandler<Long, ? super A> handler) {
156+
channel.write(srcs, offset, length, timeout, unit, attach, handler);
281157
}
282158

283159
@Override
284-
public void completed(final Void result, final Object attachment) {
285-
channel = attemptConnectionChannel;
286-
AsyncCompletionHandler<Void> localHandler = getHandlerAndClear();
287-
localHandler.completed(null);
160+
public <A> void read(final ByteBuffer dst, final A attachment, final CompletionHandler<Integer, ? super A> handler) {
161+
channel.read(dst, attachment, handler);
288162
}
289163

290164
@Override
291-
public void failed(final Throwable exc, final Object attachment) {
292-
AsyncCompletionHandler<Void> localHandler = getHandlerAndClear();
165+
public Future<Integer> read(final ByteBuffer dst) {
166+
return channel.read(dst);
167+
}
293168

294-
if (socketAddressQueue.isEmpty()) {
295-
if (exc instanceof IOException) {
296-
localHandler.failed(new MongoSocketOpenException("Exception opening socket", getAddress(), exc));
297-
} else {
298-
localHandler.failed(exc);
299-
}
300-
} else {
301-
initializeSocketChannel(localHandler, socketAddressQueue);
302-
}
169+
@Override
170+
public <A> void write(final ByteBuffer src, final A attachment, final CompletionHandler<Integer, ? super A> handler) {
171+
channel.write(src, attachment, handler);
303172
}
304-
}
305173

306-
// Private base class for all CompletionHandler implementors that ensures the upstream handler is
307-
// set to null before it is used. This is to work around an observed issue with implementations of
308-
// AsynchronousSocketChannel that fail to clear references to handlers stored in instance fields of
309-
// the class.
310-
private abstract static class BaseCompletionHandler<T, V, A> implements CompletionHandler<V, A> {
311-
private final AtomicReference<AsyncCompletionHandler<T>> handlerReference;
174+
@Override
175+
public Future<Integer> write(final ByteBuffer src) {
176+
return channel.write(src);
177+
}
312178

313-
BaseCompletionHandler(final AsyncCompletionHandler<T> handler) {
314-
this.handlerReference = new AtomicReference<AsyncCompletionHandler<T>>(handler);
179+
@Override
180+
public boolean isOpen() {
181+
return channel.isOpen();
315182
}
316183

317-
protected AsyncCompletionHandler<T> getHandlerAndClear() {
318-
return handlerReference.getAndSet(null);
184+
@Override
185+
public void close() throws IOException {
186+
channel.close();
319187
}
320188
}
321189
}

0 commit comments

Comments
 (0)