Skip to content

Commit 70bf114

Browse files
committed
JAVA-2327: Remove race conditions in closing InternalConnection implementations
1 parent caa40b6 commit 70bf114

File tree

3 files changed

+26
-34
lines changed

3 files changed

+26
-34
lines changed

driver-core/src/main/com/mongodb/connection/DefaultConnectionPool.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.util.concurrent.Executors;
4545
import java.util.concurrent.ScheduledExecutorService;
4646
import java.util.concurrent.TimeUnit;
47+
import java.util.concurrent.atomic.AtomicBoolean;
4748
import java.util.concurrent.atomic.AtomicInteger;
4849

4950
import static com.mongodb.assertions.Assertions.isTrue;
@@ -372,47 +373,48 @@ private ConnectionId getId(final InternalConnection internalConnection) {
372373
}
373374

374375
private class PooledConnection implements InternalConnection {
375-
private volatile UsageTrackingInternalConnection wrapped;
376+
private final UsageTrackingInternalConnection wrapped;
377+
private final AtomicBoolean isClosed = new AtomicBoolean();
376378

377379
public PooledConnection(final UsageTrackingInternalConnection wrapped) {
378380
this.wrapped = notNull("wrapped", wrapped);
379381
}
380382

381383
@Override
382384
public void open() {
383-
isTrue("open", wrapped != null);
385+
isTrue("open", !isClosed.get());
384386
wrapped.open();
385387
}
386388

387389
@Override
388390
public void openAsync(final SingleResultCallback<Void> callback) {
389-
isTrue("open", wrapped != null);
391+
isTrue("open", !isClosed.get());
390392
wrapped.openAsync(callback);
391393
}
392394

393395
@Override
394396
public void close() {
395-
if (wrapped != null) {
396-
if (!closed) {
397+
// All but the first call is a no-op
398+
if (!isClosed.getAndSet(true)) {
399+
if (!DefaultConnectionPool.this.closed) {
397400
connectionPoolListener.connectionCheckedIn(new ConnectionCheckedInEvent(getId(wrapped)));
398401
if (LOGGER.isTraceEnabled()) {
399402
LOGGER.trace(format("Checked in connection [%s] to server %s", getId(wrapped), serverId.getAddress()));
400403
}
401404
}
402405
pool.release(wrapped, wrapped.isClosed() || shouldPrune(wrapped));
403-
wrapped = null;
404406
}
405407
}
406408

407409
@Override
408410
public boolean opened() {
409-
isTrue("open", wrapped != null);
411+
isTrue("open", !isClosed.get());
410412
return wrapped.opened();
411413
}
412414

413415
@Override
414416
public boolean isClosed() {
415-
return wrapped == null || wrapped.isClosed();
417+
return isClosed.get() || wrapped.isClosed();
416418
}
417419

418420
@Override
@@ -422,7 +424,7 @@ public ByteBuf getBuffer(final int capacity) {
422424

423425
@Override
424426
public void sendMessage(final List<ByteBuf> byteBuffers, final int lastRequestId) {
425-
isTrue("open", wrapped != null);
427+
isTrue("open", !isClosed.get());
426428
try {
427429
wrapped.sendMessage(byteBuffers, lastRequestId);
428430
} catch (MongoException e) {
@@ -433,7 +435,7 @@ public void sendMessage(final List<ByteBuf> byteBuffers, final int lastRequestId
433435

434436
@Override
435437
public ResponseBuffers receiveMessage(final int responseTo) {
436-
isTrue("open", wrapped != null);
438+
isTrue("open", !isClosed.get());
437439
try {
438440
return wrapped.receiveMessage(responseTo);
439441
} catch (MongoException e) {
@@ -444,7 +446,7 @@ public ResponseBuffers receiveMessage(final int responseTo) {
444446

445447
@Override
446448
public void sendMessageAsync(final List<ByteBuf> byteBuffers, final int lastRequestId, final SingleResultCallback<Void> callback) {
447-
isTrue("open", wrapped != null);
449+
isTrue("open", !isClosed.get());
448450
wrapped.sendMessageAsync(byteBuffers, lastRequestId, new SingleResultCallback<Void>() {
449451
@Override
450452
public void onResult(final Void result, final Throwable t) {
@@ -458,7 +460,7 @@ public void onResult(final Void result, final Throwable t) {
458460

459461
@Override
460462
public void receiveMessageAsync(final int responseTo, final SingleResultCallback<ResponseBuffers> callback) {
461-
isTrue("open", wrapped != null);
463+
isTrue("open", !isClosed.get());
462464
wrapped.receiveMessageAsync(responseTo, new SingleResultCallback<ResponseBuffers>() {
463465
@Override
464466
public void onResult(final ResponseBuffers result, final Throwable t) {
@@ -472,7 +474,7 @@ public void onResult(final ResponseBuffers result, final Throwable t) {
472474

473475
@Override
474476
public ConnectionDescription getDescription() {
475-
isTrue("open", wrapped != null);
477+
isTrue("open", !isClosed.get());
476478
return wrapped.getDescription();
477479
}
478480
}

driver-core/src/main/com/mongodb/connection/InternalStreamConnection.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -168,14 +168,16 @@ public void failed(final Throwable t) {
168168

169169
@Override
170170
public void close() {
171-
if (LOGGER.isDebugEnabled()) {
172-
LOGGER.debug(String.format("Closing connection %s", getId()));
173-
}
174-
if (stream != null) {
175-
stream.close();
171+
// All but the first call is a no-op
172+
if (!isClosed.getAndSet(true)) {
173+
if (LOGGER.isDebugEnabled()) {
174+
LOGGER.debug(String.format("Closing connection %s", getId()));
175+
}
176+
if (stream != null) {
177+
stream.close();
178+
}
179+
connectionListener.connectionClosed(new ConnectionClosedEvent(getId()));
176180
}
177-
isClosed.set(true);
178-
connectionListener.connectionClosed(new ConnectionClosedEvent(getId()));
179181
}
180182

181183
@Override

driver-core/src/main/com/mongodb/connection/UsageTrackingInternalConnection.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
import java.util.List;
2525

26-
import static com.mongodb.assertions.Assertions.isTrue;
2726
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
2827

2928
/**
@@ -34,7 +33,7 @@ class UsageTrackingInternalConnection implements InternalConnection {
3433
private volatile long openedAt;
3534
private volatile long lastUsedAt;
3635
private final int generation;
37-
private volatile InternalConnection wrapped;
36+
private final InternalConnection wrapped;
3837

3938
UsageTrackingInternalConnection(final InternalConnection wrapped, final int generation) {
4039
this.wrapped = wrapped;
@@ -45,15 +44,13 @@ class UsageTrackingInternalConnection implements InternalConnection {
4544

4645
@Override
4746
public void open() {
48-
isTrue("open", wrapped != null);
4947
wrapped.open();
5048
openedAt = System.currentTimeMillis();
5149
lastUsedAt = openedAt;
5250
}
5351

5452
@Override
5553
public void openAsync(final SingleResultCallback<Void> callback) {
56-
isTrue("open", wrapped != null);
5754
wrapped.openAsync(new SingleResultCallback<Void>() {
5855
@Override
5956
public void onResult(final Void result, final Throwable t) {
@@ -70,46 +67,39 @@ public void onResult(final Void result, final Throwable t) {
7067

7168
@Override
7269
public void close() {
73-
isTrue("open", wrapped != null);
7470
wrapped.close();
75-
wrapped = null;
7671
}
7772

7873
@Override
7974
public boolean opened() {
80-
isTrue("open", wrapped != null);
8175
return wrapped.opened();
8276
}
8377

8478
@Override
8579
public boolean isClosed() {
86-
return wrapped == null || wrapped.isClosed();
80+
return wrapped.isClosed();
8781
}
8882

8983
@Override
9084
public ByteBuf getBuffer(final int size) {
91-
isTrue("open", wrapped != null);
9285
return wrapped.getBuffer(size);
9386
}
9487

9588
@Override
9689
public void sendMessage(final List<ByteBuf> byteBuffers, final int lastRequestId) {
97-
isTrue("open", wrapped != null);
9890
wrapped.sendMessage(byteBuffers, lastRequestId);
9991
lastUsedAt = System.currentTimeMillis();
10092
}
10193

10294
@Override
10395
public ResponseBuffers receiveMessage(final int responseTo) {
104-
isTrue("open", wrapped != null);
10596
ResponseBuffers responseBuffers = wrapped.receiveMessage(responseTo);
10697
lastUsedAt = System.currentTimeMillis();
10798
return responseBuffers;
10899
}
109100

110101
@Override
111102
public void sendMessageAsync(final List<ByteBuf> byteBuffers, final int lastRequestId, final SingleResultCallback<Void> callback) {
112-
isTrue("open", wrapped != null);
113103
SingleResultCallback<Void> errHandlingCallback = errorHandlingCallback(new SingleResultCallback<Void>() {
114104
@Override
115105
public void onResult(final Void result, final Throwable t) {
@@ -122,7 +112,6 @@ public void onResult(final Void result, final Throwable t) {
122112

123113
@Override
124114
public void receiveMessageAsync(final int responseTo, final SingleResultCallback<ResponseBuffers> callback) {
125-
isTrue("open", wrapped != null);
126115
SingleResultCallback<ResponseBuffers> errHandlingCallback = errorHandlingCallback(new SingleResultCallback<ResponseBuffers>() {
127116
@Override
128117
public void onResult(final ResponseBuffers result, final Throwable t) {
@@ -135,7 +124,6 @@ public void onResult(final ResponseBuffers result, final Throwable t) {
135124

136125
@Override
137126
public ConnectionDescription getDescription() {
138-
isTrue("open", wrapped != null);
139127
return wrapped.getDescription();
140128
}
141129

0 commit comments

Comments
 (0)