Skip to content

Commit 823ea62

Browse files
abalanonline, stIncMale, rozza, Copilot
authored
Properly handle exceptions in threads created by MongoClient (#1764)
JAVA-5913

---------

Co-authored-by: Valentin Kovalenko <[email protected]>
Co-authored-by: Ross Lawley <[email protected]>
Co-authored-by: Copilot <[email protected]>
1 parent 8d7c0dc commit 823ea62

File tree

9 files changed: +177 additions, -133 deletions (lines changed)

driver-core/src/main/com/mongodb/internal/connection/AsynchronousClusterEventListener.java

Lines changed: 17 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,8 @@
3131
import com.mongodb.event.ServerMonitorListener;
3232
import com.mongodb.event.ServerOpeningEvent;
3333
import com.mongodb.internal.VisibleForTesting;
34+
import com.mongodb.internal.diagnostics.logging.Logger;
35+
import com.mongodb.internal.diagnostics.logging.Loggers;
3436

3537
import java.util.concurrent.BlockingQueue;
3638
import java.util.concurrent.LinkedBlockingQueue;
@@ -53,6 +55,8 @@
5355
*/
5456
@ThreadSafe
5557
final class AsynchronousClusterEventListener implements ClusterListener, ServerListener, ServerMonitorListener {
58+
private static final Logger LOGGER = Loggers.getLogger("cluster");
59+
5660
private final BlockingQueue<Supplier<Boolean>> eventPublishers = new LinkedBlockingQueue<>();
5761
private final ClusterListener clusterListener;
5862
private final ServerListener serverListener;
@@ -162,16 +166,21 @@ private void addEvent(final Supplier<Boolean> supplier) {
162166
}
163167

164168
private void publishEvents() {
165-
while (true) {
166-
try {
167-
Supplier<Boolean> eventPublisher = eventPublishers.take();
168-
boolean isLastEvent = eventPublisher.get();
169-
if (isLastEvent) {
170-
break;
169+
try {
170+
while (true) {
171+
try {
172+
Supplier<Boolean> eventPublisher = eventPublishers.take();
173+
boolean isLastEvent = eventPublisher.get();
174+
if (isLastEvent) {
175+
break;
176+
}
177+
} catch (Exception e) {
178+
// ignore exceptions thrown from listeners, also ignore interrupts that user code may cause
171179
}
172-
} catch (Exception e) {
173-
// ignore exceptions thrown from listeners, also ignore interrupts that user code may cause
174180
}
181+
} catch (Throwable t) {
182+
LOGGER.error(this + " stopped working. You may want to recreate the MongoClient", t);
183+
throw t;
175184
}
176185
}
177186
}

driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java

Lines changed: 33 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -501,39 +501,44 @@ private final class WaitQueueHandler implements Runnable {
501501
}
502502

503503
public void run() {
504-
while (!isClosed) {
505-
CountDownLatch currentPhase = phase.get();
506-
ClusterDescription curDescription = description;
504+
try {
505+
while (!isClosed) {
506+
CountDownLatch currentPhase = phase.get();
507+
ClusterDescription curDescription = description;
508+
509+
Timeout timeout = Timeout.infinite();
510+
boolean someWaitersNotSatisfied = false;
511+
for (Iterator<ServerSelectionRequest> iter = waitQueue.iterator(); iter.hasNext();) {
512+
ServerSelectionRequest currentRequest = iter.next();
513+
if (handleServerSelectionRequest(currentRequest, currentPhase, curDescription)) {
514+
iter.remove();
515+
} else {
516+
someWaitersNotSatisfied = true;
517+
timeout = Timeout.earliest(
518+
timeout,
519+
currentRequest.getTimeout(),
520+
startMinWaitHeartbeatTimeout());
521+
}
522+
}
507523

508-
Timeout timeout = Timeout.infinite();
509-
boolean someWaitersNotSatisfied = false;
510-
for (Iterator<ServerSelectionRequest> iter = waitQueue.iterator(); iter.hasNext();) {
511-
ServerSelectionRequest currentRequest = iter.next();
512-
if (handleServerSelectionRequest(currentRequest, currentPhase, curDescription)) {
513-
iter.remove();
514-
} else {
515-
someWaitersNotSatisfied = true;
516-
timeout = Timeout.earliest(
517-
timeout,
518-
currentRequest.getTimeout(),
519-
startMinWaitHeartbeatTimeout());
524+
if (someWaitersNotSatisfied) {
525+
connect();
520526
}
521-
}
522527

523-
if (someWaitersNotSatisfied) {
524-
connect();
528+
try {
529+
timeout.awaitOn(currentPhase, () -> "ignored");
530+
} catch (MongoInterruptedException closed) {
531+
// The cluster has been closed and the while loop will exit.
532+
}
525533
}
526-
527-
try {
528-
timeout.awaitOn(currentPhase, () -> "ignored");
529-
} catch (MongoInterruptedException closed) {
530-
// The cluster has been closed and the while loop will exit.
534+
// Notify all remaining waiters that a shutdown is in progress
535+
for (Iterator<ServerSelectionRequest> iter = waitQueue.iterator(); iter.hasNext();) {
536+
iter.next().onResult(null, new MongoClientException("Shutdown in progress"));
537+
iter.remove();
531538
}
532-
}
533-
// Notify all remaining waiters that a shutdown is in progress
534-
for (Iterator<ServerSelectionRequest> iter = waitQueue.iterator(); iter.hasNext();) {
535-
iter.next().onResult(null, new MongoClientException("Shutdown in progress"));
536-
iter.remove();
539+
} catch (Throwable t) {
540+
LOGGER.error(this + " stopped working. You may want to recreate the MongoClient", t);
541+
throw t;
537542
}
538543
}
539544
}

driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1371,7 +1371,7 @@ private void runAndLogUncaught(final Runnable runnable) {
13711371
try {
13721372
runnable.run();
13731373
} catch (Throwable t) {
1374-
LOGGER.error("The pool is not going to work correctly from now on. You may want to recreate the MongoClient", t);
1374+
LOGGER.error(this + " stopped working. You may want to recreate the MongoClient", t);
13751375
throw t;
13761376
}
13771377
}

driver-core/src/main/com/mongodb/internal/connection/DefaultDnsSrvRecordMonitor.java

Lines changed: 35 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -75,41 +75,46 @@ private class DnsSrvRecordMonitorRunnable implements Runnable {
7575

7676
@Override
7777
public void run() {
78-
while (!isClosed && shouldContinueMonitoring()) {
79-
try {
80-
List<String> resolvedHostNames = dnsResolver.resolveHostFromSrvRecords(hostName, srvServiceName);
81-
Set<ServerAddress> hosts = createServerAddressSet(resolvedHostNames);
82-
83-
if (isClosed) {
84-
return;
85-
}
78+
try {
79+
while (!isClosed && shouldContinueMonitoring()) {
80+
try {
81+
List<String> resolvedHostNames = dnsResolver.resolveHostFromSrvRecords(hostName, srvServiceName);
82+
Set<ServerAddress> hosts = createServerAddressSet(resolvedHostNames);
83+
84+
if (isClosed) {
85+
return;
86+
}
8687

87-
if (!hosts.equals(currentHosts)) {
88-
try {
89-
dnsSrvRecordInitializer.initialize(unmodifiableSet(hosts));
90-
currentHosts = hosts;
91-
} catch (Exception e) {
92-
LOGGER.warn("Exception in monitor thread during notification of DNS resolution state change", e);
88+
if (!hosts.equals(currentHosts)) {
89+
try {
90+
dnsSrvRecordInitializer.initialize(unmodifiableSet(hosts));
91+
currentHosts = hosts;
92+
} catch (Exception e) {
93+
LOGGER.warn("Exception in monitor thread during notification of DNS resolution state change", e);
94+
}
9395
}
96+
} catch (MongoException e) {
97+
if (currentHosts.isEmpty()) {
98+
dnsSrvRecordInitializer.initialize(e);
99+
}
100+
LOGGER.info("Exception while resolving SRV records", e);
101+
} catch (Exception e) {
102+
if (currentHosts.isEmpty()) {
103+
dnsSrvRecordInitializer.initialize(new MongoInternalException("Unexpected runtime exception", e));
104+
}
105+
LOGGER.info("Unexpected runtime exception while resolving SRV record", e);
94106
}
95-
} catch (MongoException e) {
96-
if (currentHosts.isEmpty()) {
97-
dnsSrvRecordInitializer.initialize(e);
98-
}
99-
LOGGER.info("Exception while resolving SRV records", e);
100-
} catch (Exception e) {
101-
if (currentHosts.isEmpty()) {
102-
dnsSrvRecordInitializer.initialize(new MongoInternalException("Unexpected runtime exception", e));
103-
}
104-
LOGGER.info("Unexpected runtime exception while resolving SRV record", e);
105-
}
106107

107-
try {
108-
Thread.sleep(getRescanFrequencyMillis());
109-
} catch (InterruptedException closed) {
110-
// fall through
108+
try {
109+
Thread.sleep(getRescanFrequencyMillis());
110+
} catch (InterruptedException closed) {
111+
// fall through
112+
}
113+
clusterType = dnsSrvRecordInitializer.getClusterType();
111114
}
112-
clusterType = dnsSrvRecordInitializer.getClusterType();
115+
} catch (Throwable t) {
116+
LOGGER.error(this + " stopped working. You may want to recreate the MongoClient", t);
117+
throw t;
113118
}
114119
}
115120

driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -231,8 +231,9 @@ public void run() {
231231
}
232232
} catch (InterruptedException | MongoInterruptedException closed) {
233233
// stop the monitor
234-
} catch (RuntimeException e) {
235-
LOGGER.error(format("Server monitor for %s exiting with exception", serverId), e);
234+
} catch (Throwable t) {
235+
LOGGER.error(format("%s for %s stopped working. You may want to recreate the MongoClient", this, serverId), t);
236+
throw t;
236237
} finally {
237238
if (connection != null) {
238239
connection.close();
@@ -261,7 +262,7 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
261262

262263
// Get existing connection
263264
return doHeartbeat(currentServerDescription, shouldStreamResponses);
264-
} catch (Throwable t) {
265+
} catch (Exception t) {
265266
roundTripTimeSampler.reset();
266267
InternalConnection localConnection = withLock(lock, () -> {
267268
InternalConnection result = connection;
@@ -532,7 +533,7 @@ public void run() {
532533
} else {
533534
pingServer(connection);
534535
}
535-
} catch (Throwable t) {
536+
} catch (Exception t) {
536537
if (connection != null) {
537538
connection.close();
538539
connection = null;
@@ -542,6 +543,9 @@ public void run() {
542543
}
543544
} catch (InterruptedException closed) {
544545
// stop the monitor
546+
} catch (Throwable t) {
547+
LOGGER.error(format("%s for %s stopped working. You may want to recreate the MongoClient", this, serverId), t);
548+
throw t;
545549
} finally {
546550
if (connection != null) {
547551
connection.close();

driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java

Lines changed: 50 additions & 45 deletions
Original file line number | Diff line number | Diff line change
@@ -363,55 +363,60 @@ private void notifyWaitQueueHandler(final ServerSelectionRequest request) {
363363

364364
private final class WaitQueueHandler implements Runnable {
365365
public void run() {
366-
List<ServerSelectionRequest> timeoutList = new ArrayList<>();
367-
while (!(isClosed() || initializationCompleted)) {
368-
lock.lock();
369-
try {
370-
if (isClosed() || initializationCompleted) {
371-
break;
372-
}
373-
Timeout waitTimeNanos = Timeout.infinite();
374-
375-
for (Iterator<ServerSelectionRequest> iterator = waitQueue.iterator(); iterator.hasNext();) {
376-
ServerSelectionRequest next = iterator.next();
377-
378-
Timeout nextTimeout = next.getTimeout();
379-
Timeout waitTimeNanosFinal = waitTimeNanos;
380-
waitTimeNanos = nextTimeout.call(NANOSECONDS,
381-
() -> Timeout.earliest(waitTimeNanosFinal, nextTimeout),
382-
(ns) -> Timeout.earliest(waitTimeNanosFinal, nextTimeout),
383-
() -> {
384-
timeoutList.add(next);
385-
iterator.remove();
386-
return waitTimeNanosFinal;
387-
});
388-
}
389-
if (timeoutList.isEmpty()) {
390-
try {
391-
waitTimeNanos.awaitOn(condition, () -> "ignored");
392-
} catch (MongoInterruptedException unexpected) {
393-
fail();
366+
try {
367+
List<ServerSelectionRequest> timeoutList = new ArrayList<>();
368+
while (!(isClosed() || initializationCompleted)) {
369+
lock.lock();
370+
try {
371+
if (isClosed() || initializationCompleted) {
372+
break;
373+
}
374+
Timeout waitTimeNanos = Timeout.infinite();
375+
376+
for (Iterator<ServerSelectionRequest> iterator = waitQueue.iterator(); iterator.hasNext();) {
377+
ServerSelectionRequest next = iterator.next();
378+
379+
Timeout nextTimeout = next.getTimeout();
380+
Timeout waitTimeNanosFinal = waitTimeNanos;
381+
waitTimeNanos = nextTimeout.call(NANOSECONDS,
382+
() -> Timeout.earliest(waitTimeNanosFinal, nextTimeout),
383+
(ns) -> Timeout.earliest(waitTimeNanosFinal, nextTimeout),
384+
() -> {
385+
timeoutList.add(next);
386+
iterator.remove();
387+
return waitTimeNanosFinal;
388+
});
389+
}
390+
if (timeoutList.isEmpty()) {
391+
try {
392+
waitTimeNanos.awaitOn(condition, () -> "ignored");
393+
} catch (MongoInterruptedException unexpected) {
394+
fail();
395+
}
394396
}
397+
} finally {
398+
lock.unlock();
395399
}
396-
} finally {
397-
lock.unlock();
400+
timeoutList.forEach(request -> request.onError(createTimeoutException(request
401+
.getOperationContext()
402+
.getTimeoutContext())));
403+
timeoutList.clear();
398404
}
399-
timeoutList.forEach(request -> request.onError(createTimeoutException(request
400-
.getOperationContext()
401-
.getTimeoutContext())));
402-
timeoutList.clear();
403-
}
404405

405-
// This code is executed either after closing the LoadBalancedCluster or after initializing it. In the latter case,
406-
// waitQueue is guaranteed to be empty (as DnsSrvRecordInitializer.initialize clears it and no thread adds new elements to
407-
// it after that). So shutdownList is not empty iff LoadBalancedCluster is closed, in which case we need to complete the
408-
// requests in it.
409-
List<ServerSelectionRequest> shutdownList = Locks.withLock(lock, () -> {
410-
ArrayList<ServerSelectionRequest> result = new ArrayList<>(waitQueue);
411-
waitQueue.clear();
412-
return result;
413-
});
414-
shutdownList.forEach(request -> request.onError(createShutdownException()));
406+
// This code is executed either after closing the LoadBalancedCluster or after initializing it. In the latter case,
407+
// waitQueue is guaranteed to be empty (as DnsSrvRecordInitializer.initialize clears it and no thread adds new elements to
408+
// it after that). So shutdownList is not empty iff LoadBalancedCluster is closed, in which case we need to complete the
409+
// requests in it.
410+
List<ServerSelectionRequest> shutdownList = Locks.withLock(lock, () -> {
411+
ArrayList<ServerSelectionRequest> result = new ArrayList<>(waitQueue);
412+
waitQueue.clear();
413+
return result;
414+
});
415+
shutdownList.forEach(request -> request.onError(createShutdownException()));
416+
} catch (Throwable t) {
417+
LOGGER.error(this + " stopped working. You may want to recreate the MongoClient", t);
418+
throw t;
419+
}
415420
}
416421
}
417422

driver-core/src/main/com/mongodb/internal/connection/PowerOfTwoBufferPool.java

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,8 @@
1616

1717
package com.mongodb.internal.connection;
1818

19+
import com.mongodb.internal.diagnostics.logging.Logger;
20+
import com.mongodb.internal.diagnostics.logging.Loggers;
1921
import com.mongodb.internal.thread.DaemonThreadFactory;
2022
import org.bson.ByteBuf;
2123
import org.bson.ByteBufNIO;
@@ -34,6 +36,7 @@
3436
* <p>This class is not part of the public API and may be removed or changed at any time</p>
3537
*/
3638
public class PowerOfTwoBufferPool implements BufferProvider {
39+
private static final Logger LOGGER = Loggers.getLogger("connection");
3740

3841
/**
3942
* The global default pool. Pruning is enabled on this pool. Idle buffers are pruned after one minute.
@@ -137,7 +140,12 @@ public void release(final ByteBuffer buffer) {
137140
}
138141

139142
private void prune() {
140-
powerOfTwoToPoolMap.values().forEach(BufferPool::prune);
143+
try {
144+
powerOfTwoToPoolMap.values().forEach(BufferPool::prune);
145+
} catch (Throwable t) {
146+
LOGGER.error(this + " stopped pruning idle buffer pools. You may want to recreate the MongoClient", t);
147+
throw t;
148+
}
141149
}
142150

143151
static int log2(final int powerOfTwo) {

driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -162,6 +162,9 @@ void start() {
162162
LOGGER.warn("Exception in selector loop", e);
163163
}
164164
}
165+
} catch (Throwable t) {
166+
LOGGER.error(this + " stopped working. You may want to recreate the MongoClient", t);
167+
throw t;
165168
} finally {
166169
try {
167170
selector.close();

0 commit comments

Comments
 (0)