Skip to content

Commit f654812

Browse files
authored
Merge pull request boskworks#235 from prdoyle/timeouts
Revamp Mongo timeouts
2 parents f1da858 + 3eb7c77 commit f654812

File tree

10 files changed

+380
-58
lines changed

10 files changed

+380
-58
lines changed

bosk-mongo/src/main/java/works/bosk/drivers/mongo/internal/ChangeReceiver.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.concurrent.TimeoutException;
1616
import java.util.concurrent.atomic.AtomicLong;
1717
import org.bson.BsonDocument;
18+
import org.jetbrains.annotations.Nullable;
1819
import org.slf4j.Logger;
1920
import org.slf4j.LoggerFactory;
2021
import org.slf4j.MDC;
@@ -56,6 +57,7 @@ class ChangeReceiver implements Closeable {
5657
private final MongoCollection<BsonDocument> collection;
5758
private final ScheduledExecutorService ex = Executors.newScheduledThreadPool(1);
5859
private final Exception creationPoint;
60+
private volatile @Nullable Thread thread = null;
5961
private volatile boolean isClosed = false;
6062

6163
ChangeReceiver(String boskName, Identifier boskID, ChangeListener listener, MongoDriverSettings settings, MongoCollection<BsonDocument> collection) {
@@ -70,6 +72,7 @@ class ChangeReceiver implements Closeable {
7072
// to ensure the database is set up for change streams.
7173
probeChangeStreamCursor();
7274
}
75+
LOGGER.debug("Scheduling ChangeReceiver connectionLoop task with {} ms interval", settings.timescaleMS());
7376
ex.scheduleWithFixedDelay(
7477
this::connectionLoop,
7578
0,
@@ -78,6 +81,23 @@ class ChangeReceiver implements Closeable {
7881
);
7982
}
8083

84+
/**
85+
* If the connectionLoop is running, interrupt it.
86+
* (Otherwise it's going to reinitialize on its own anyway,
87+
* so there's no need to do anything because the effect is the same.)
88+
*/
89+
public void interrupt() {
90+
Thread t = thread;
91+
if (t == null) {
92+
LOGGER.debug("ChangeReceiver thread is not running; no need to interrupt");
93+
} else if (t == currentThread()) {
94+
LOGGER.debug("ChangeReceiver thread is doing the disconnecting; no need to interrupt");
95+
} else {
96+
LOGGER.debug("Interrupting ChangeReceiver thread {}", t.getName());
97+
t.interrupt();
98+
}
99+
}
100+
81101
private void probeChangeStreamCursor() {
82102
try (var _ = openCursor()) {
83103
LOGGER.debug("Successfully opened MongoDB cursor");
@@ -109,6 +129,7 @@ private void connectionLoop() {
109129
try (MDCScope _ = setupMDC(boskName, boskID)) {
110130
LOGGER.debug("Starting connectionLoop task");
111131
try {
132+
thread = currentThread();
112133
while (!isClosed) {
113134
// Design notes:
114135
//
@@ -197,6 +218,7 @@ private void connectionLoop() {
197218
} finally {
198219
LOGGER.debug("Ending connectionLoop task; isClosed={}", isClosed);
199220
currentThread().setName(oldThreadName);
221+
thread = null;
200222
}
201223
} catch (RuntimeException e) {
202224
addContextToException(e);
@@ -221,7 +243,6 @@ private void addContextToException(Throwable x) {
221243
private MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> openCursor() {
222244
MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> result = collection
223245
.watch()
224-
.maxAwaitTime(settings.timescaleMS(), MILLISECONDS)
225246
.cursor();
226247
LOGGER.debug("Cursor is open");
227248
return result;

bosk-mongo/src/main/java/works/bosk/drivers/mongo/internal/MainDriver.java

Lines changed: 86 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ public final class MainDriver<R extends StateTreeNode> implements MongoDriver {
7878
final Formatter formatter;
7979

8080
final long flushTimeout;
81-
final long initialRootTimeout;
82-
final long driverReinitializeTimeout;
81+
final long initializeTimeout;
82+
final long reinitializationTimeout;
8383

8484
/**
8585
* {@link MongoClient#close()} throws if called more than once.
@@ -123,26 +123,77 @@ public MainDriver(
123123
this.bsonSerializer = bsonSerializer;
124124
this.downstream = downstream;
125125

126-
MongoClientSettings.Builder builder = MongoClientSettings
127-
.builder(clientSettings);
126+
// Flushes work by waiting for the latest version to arrive on the change stream.
127+
// If we wait for two heartbeats and don't see the update, something has gone wrong.
128+
//
129+
// (Note that flush does a retry, so it will actually take
130+
// twice as long as this before throwing a FlushFailureException.)
131+
flushTimeout = 2L * driverSettings.timescaleMS();
132+
133+
// Initialization must wait for a heartbeat to succeed, so we wait twice that long,
134+
// plus one more for the network connection and initial query.
135+
initializeTimeout = 3L * driverSettings.timescaleMS();
136+
137+
// The sum of the steps required to reinitialize after a disconnect,
138+
// plus a little extra to make sure we don't cut it off when it's about to succeed.
139+
reinitializationTimeout =
140+
2L * driverSettings.timescaleMS() // ChangeStream reconnect
141+
+ initializeTimeout // Initialize after reconnecting
142+
+ driverSettings.timescaleMS() // Extra buffer
143+
;
144+
145+
MongoClientSettings.Builder commonSettingsBuilder = MongoClientSettings
146+
.builder(clientSettings)
147+
.applyToServerSettings(s ->
148+
// If timescaleMS is shorter than the default min heartbeat,
149+
// then we need to reduce this setting to prevent the client
150+
// from using a stale view of the server state for too long.
151+
// If timescaleMS is longer, then the user has told us
152+
// they don't mind longer delays and want the increased
153+
// efficiency of fewer heartbeats.
154+
// Either way, timescaleMS is the right value for this setting.
155+
//
156+
// Note that this doesn't set the heartbeat frequency itself.
157+
// That is left at the default value, since it is only used
158+
// to "notice" connectivity problems when the driver is quiescent,
159+
// which is not time-critical and is not governed by timescaleMS:
160+
// the actual behaviour of the bosk during a network partition
161+
// is that its contents remain fixed, and it doesn't matter much
162+
// whether that is achieved by formally disconnecting or simply
163+
// by doing nothing.
164+
s.minHeartbeatFrequency(driverSettings.timescaleMS(), MILLISECONDS))
165+
;
128166

129167
// By default, we deal only with durable data that won't get rolled back.
130168
// In some circumstances, we need the very latest possible data for correctness,
131169
// so we override the ReadConcern in those cases.
132-
builder
170+
commonSettingsBuilder
133171
.readConcern(ReadConcern.MAJORITY)
134172
.writeConcern(WriteConcern.MAJORITY);
135173

174+
var changeStreamSettingsBuilder = MongoClientSettings.builder(commonSettingsBuilder.build())
175+
.applyToClusterSettings(c ->
176+
c.serverSelectionTimeout(initializeTimeout, MILLISECONDS))
177+
.applyToSocketSettings(s ->
178+
s.connectTimeout(initializeTimeout, MILLISECONDS)
179+
// No read timeout for change streams; they can be idle indefinitely
180+
.readTimeout(0, MILLISECONDS))
181+
;
182+
183+
var changeStreamClient = MongoClients.create(changeStreamSettingsBuilder.build());
184+
closeables.addFirst(changeStreamClient);
185+
136186
// Override timeouts to make them compatible with driverSettings.timescaleMS()
137-
builder
138-
.timeout(2L * driverSettings.timescaleMS(), MILLISECONDS);
187+
var querySettingsBuilder = MongoClientSettings.builder(commonSettingsBuilder.build());
188+
querySettingsBuilder
189+
.timeout(flushTimeout, MILLISECONDS);
190+
191+
var queryClient = MongoClients.create(querySettingsBuilder.build());
192+
closeables.addFirst(queryClient);
139193

140-
var mongoClient = MongoClients.create(builder.build());
141-
closeables.addFirst(mongoClient);
142-
MongoCollection<BsonDocument> changeStreamCollection = mongoClient
194+
this.queryCollection = TransactionalCollection.of(queryClient
143195
.getDatabase(driverSettings.database())
144-
.getCollection(COLLECTION_NAME, BsonDocument.class);
145-
this.queryCollection = TransactionalCollection.of(changeStreamCollection, mongoClient);
196+
.getCollection(COLLECTION_NAME, BsonDocument.class), queryClient);
146197
LOGGER.debug("Using database \"{}\" collection \"{}\"", driverSettings.database(), COLLECTION_NAME);
147198

148199
this.formatter = new Formatter(boskInfo, bsonSerializer);
@@ -153,20 +204,13 @@ public MainDriver(
153204
if (factory != null) {
154205
listener = factory.apply(listener);
155206
}
207+
208+
MongoCollection<BsonDocument> changeStreamCollection = changeStreamClient
209+
.getDatabase(driverSettings.database())
210+
.getCollection(COLLECTION_NAME, BsonDocument.class);
156211
this.receiver = new ChangeReceiver(boskInfo.name(), boskInfo.instanceID(), listener, driverSettings, changeStreamCollection);
157212
}
158213

159-
// Flushes work by waiting for the latest version to arrive on the change stream.
160-
// If we wait twice as long as that takes, and we don't see the update, something
161-
// has gone wrong.
162-
flushTimeout = 2L * driverSettings.timescaleMS();
163-
164-
// TODO: Justify this calculation.
165-
initialRootTimeout = 5L * driverSettings.timescaleMS();
166-
167-
// The ChangeStream resets itself after timescaleMS, so it needs
168-
// several times that long to restore itself and publish a new driver.
169-
driverReinitializeTimeout = 5L * driverSettings.timescaleMS();
170214
}
171215

172216
@Override
@@ -478,7 +522,7 @@ public void onConnectionSucceeded() throws
478522
private void runInitialRootAction(FutureTask<R> initialRootAction) throws InterruptedException, TimeoutException, InitialRootActionException {
479523
initialRootAction.run();
480524
try {
481-
initialRootAction.get(initialRootTimeout, MILLISECONDS);
525+
initialRootAction.get(initializeTimeout, MILLISECONDS);
482526
LOGGER.debug("initialRoot action completed successfully");
483527
} catch (ExecutionException e) {
484528
LOGGER.debug("initialRoot action failed", e);
@@ -544,6 +588,7 @@ private FormatDriver<R> newPreferredFormatDriver() {
544588
}
545589

546590
private FormatDriver<R> detectFormat() throws UninitializedCollectionException, UnrecognizedFormatException {
591+
LOGGER.debug("Detecting format");
547592
Manifest manifest = loadManifest();
548593
DatabaseFormat format = manifest.pando().isPresent()? manifest.pando().get() : SEQUOIA;
549594
BsonString documentId = (format == SEQUOIA)
@@ -621,7 +666,7 @@ private MDCScope beginDriverOperation(String description, Object... args) {
621666
throw new IllegalStateException("Driver is closed");
622667
}
623668
MDCScope ex = setupMDC(boskInfo.name(), boskInfo.instanceID());
624-
LOGGER.debug(description, args);
669+
LOGGER.debug(description + " w/" + this.formatDriver.getClass().getSimpleName(), args);
625670
if (driverSettings.testing().eventDelayMS() < 0) {
626671
LOGGER.debug("| eventDelayMS {}ms ", driverSettings.testing().eventDelayMS());
627672
try {
@@ -655,8 +700,9 @@ private <X extends Exception, Y extends Exception> void doRetryableDriverOperati
655700
throw new DisconnectedException(e);
656701
}
657702
} else {
658-
LOGGER.debug("MongoException is not recoverable; rethrowing", e);
659-
throw e;
703+
LOGGER.debug("MongoException is not recoverable; disconnecting", e);
704+
setDisconnectedDriver(e);
705+
throw new DisconnectedException(e);
660706
}
661707
}
662708
break;
@@ -680,8 +726,8 @@ private <X extends Exception, Y extends Exception> void doRetryableDriverOperati
680726
private <X extends Exception, Y extends Exception> void waitAndRetry(RetryableOperation<X, Y> operation, String description, Object... args) throws X, Y {
681727
try {
682728
formatDriverLock.lock();
683-
LOGGER.debug("Waiting for new FormatDriver for {} ms", driverReinitializeTimeout);
684-
boolean success = formatDriverChanged.await(driverReinitializeTimeout, MILLISECONDS);
729+
LOGGER.debug("Waiting for new FormatDriver for {} ms", reinitializationTimeout);
730+
boolean success = formatDriverChanged.await(reinitializationTimeout, MILLISECONDS);
685731
if (!success) {
686732
LOGGER.warn("Timed out waiting for MongoDB to recover; will retry anyway, but the operation may fail");
687733
}
@@ -705,7 +751,7 @@ private <X extends Exception, Y extends Exception> void waitAndRetry(RetryableOp
705751
} finally {
706752
formatDriverLock.unlock();
707753
}
708-
LOGGER.debug("Retrying " + description, args);
754+
LOGGER.debug("Retrying " + description + " w/" + this.formatDriver.getClass().getSimpleName(), args);
709755
operation.run();
710756
}
711757

@@ -716,14 +762,22 @@ private <X extends Exception, Y extends Exception> void waitAndRetry(RetryableOp
716762
* better driver to arrive instead.
717763
*/
718764
void setDisconnectedDriver(Throwable reason) {
719-
LOGGER.debug("quietlySetDisconnectedDriver({}) (previously {})", reason.getClass().getSimpleName(), formatDriver.getClass().getSimpleName());
765+
LOGGER.debug("setDisconnectedDriver({}) (previously {})", reason.getClass().getSimpleName(), formatDriver.getClass().getSimpleName());
766+
FormatDriver<R> oldDriver;
720767
try {
721768
formatDriverLock.lock();
722-
formatDriver.close();
769+
oldDriver = formatDriver;
770+
oldDriver.close();
723771
formatDriver = new DisconnectedDriver<>(reason);
724772
} finally {
725773
formatDriverLock.unlock();
726774
}
775+
776+
if (!(oldDriver instanceof DisconnectedDriver<?>)) {
777+
// The receiver is what reconnects us. Poke it to make sure it knows things
778+
// have gone south, and we need to try to reconnect.
779+
receiver.interrupt();
780+
}
727781
}
728782

729783
/**
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package works.bosk.drivers.mongo.internal;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.concurrent.TimeoutException;
6+
7+
import static org.junit.jupiter.api.Assertions.assertTrue;
8+
9+
class ErrorRecordingChangeListener extends ForwardingChangeListener {
10+
final ErrorRecorder errorRecorder;
11+
12+
public static class ErrorRecorder {
13+
public int failureCount = 0;
14+
public final List<Throwable> disconnections = new ArrayList<>();
15+
16+
public void assertAllClear(String description) {
17+
assertTrue(failureCount == 0 && disconnections.isEmpty(),
18+
"Expected no errors " + description + "; found " + this);
19+
}
20+
21+
@Override
22+
public String toString() {
23+
return "ErrorRecorder{" +
24+
"failureCount=" + failureCount +
25+
", disconnections=" + disconnections +
26+
'}';
27+
}
28+
}
29+
30+
public ErrorRecordingChangeListener(ErrorRecorder errorRecorder, ChangeListener downstream) {
31+
super(downstream);
32+
this.errorRecorder = errorRecorder;
33+
}
34+
35+
@Override
36+
public void onConnectionFailed() throws InterruptedException, TimeoutException {
37+
errorRecorder.failureCount++;
38+
super.onConnectionFailed();
39+
}
40+
41+
@Override
42+
public void onDisconnect(Throwable e) {
43+
errorRecorder.disconnections.add(e);
44+
super.onDisconnect(e);
45+
}
46+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package works.bosk.drivers.mongo.internal;
2+
3+
import com.mongodb.client.model.changestream.ChangeStreamDocument;
4+
import java.io.IOException;
5+
import java.util.concurrent.TimeoutException;
6+
import org.bson.BsonDocument;
7+
8+
class ForwardingChangeListener implements ChangeListener {
9+
final ChangeListener downstream;
10+
11+
public ForwardingChangeListener(ChangeListener downstream) {
12+
this.downstream = downstream;
13+
}
14+
15+
@Override
16+
public void onConnectionSucceeded() throws UnrecognizedFormatException, UninitializedCollectionException, InterruptedException, IOException, InitialRootActionException, TimeoutException, FailedSessionException {
17+
downstream.onConnectionSucceeded();
18+
}
19+
20+
@Override
21+
public void onEvent(ChangeStreamDocument<BsonDocument> event) throws UnprocessableEventException {
22+
downstream.onEvent(event);
23+
}
24+
25+
@Override
26+
public void onConnectionFailed() throws InterruptedException, TimeoutException {
27+
downstream.onConnectionFailed();
28+
}
29+
30+
@Override
31+
public void onDisconnect(Throwable e) {
32+
downstream.onDisconnect(e);
33+
}
34+
}

bosk-mongo/src/test/java/works/bosk/drivers/mongo/internal/MongoDriverConformanceTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,19 @@ class MongoDriverConformanceTest extends SharedDriverConformanceTest {
3434
private static MongoService mongoService;
3535
private final MongoDriverSettings driverSettings;
3636
private final AtomicInteger numOpenDrivers = new AtomicInteger(0);
37+
private ErrorRecordingChangeListener.ErrorRecorder errorRecorder;
38+
39+
@BeforeEach
40+
void setupErrorRecording() {
41+
errorRecorder = new ErrorRecordingChangeListener.ErrorRecorder();
42+
MainDriver.LISTENER_FACTORY.set(downstream -> new ErrorRecordingChangeListener(errorRecorder, downstream));
43+
}
44+
45+
@AfterEach
46+
void teardownErrorRecording() {
47+
errorRecorder.assertAllClear("after test");
48+
MainDriver.LISTENER_FACTORY.remove();
49+
}
3750

3851
public MongoDriverConformanceTest(ParameterSet parameters) {
3952
this.driverSettings = parameters.driverSettingsBuilder().build();

0 commit comments

Comments
 (0)