Skip to content

Commit 0e68d97

Browse files
committed
MongoDriverSettings.timescaleMS.
Combine recoveryPollingMS and flushTimeoutMS into a single "timescale" setting. Tuning a multitude of individual timeouts is not a reasonable thing to ask users to do. Just ask them about how responsive they want the system to be, and then we can take the responsibility of balancing the actual individual timeout settings in a way that makes sense given how all the parts of this rather complex driver work together. This is something I already did in the SQL driver. I might end up copying over the patienceFactor idea too.
1 parent 6b73a67 commit 0e68d97

File tree

8 files changed

+75
-29
lines changed

8 files changed

+75
-29
lines changed

bosk-mongo/src/main/java/works/bosk/drivers/mongo/ChangeReceiver.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class ChangeReceiver implements Closeable {
5959
ex.scheduleWithFixedDelay(
6060
this::connectionLoop,
6161
0,
62-
settings.recoveryPollingMS(),
62+
settings.timescaleMS(),
6363
MILLISECONDS
6464
);
6565
}
@@ -73,7 +73,7 @@ public void close() {
7373

7474
/**
7575
* This method has a loop to do immediate reconnections and skip the
76-
* {@link MongoDriverSettings#recoveryPollingMS() recoveryPollingMS} delay,
76+
* {@link MongoDriverSettings#timescaleMS()} delay,
7777
* but besides that, exiting this method has the same effect as continuing
7878
* around the loop.
7979
*/
@@ -200,7 +200,7 @@ private void addContextToException(Throwable x) {
200200
private MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> openCursor() {
201201
MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> result = collection
202202
.watch()
203-
.maxAwaitTime(settings.recoveryPollingMS(), MILLISECONDS)
203+
.maxAwaitTime(settings.timescaleMS(), MILLISECONDS)
204204
.cursor();
205205
LOGGER.debug("Cursor is open");
206206
return result;

bosk-mongo/src/main/java/works/bosk/drivers/mongo/FlushLock.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
* that logic to include an epoch concept without touching other components.
5050
*/
5151
class FlushLock implements Closeable {
52-
private final MongoDriverSettings settings;
52+
private final long flushTimeoutMS;
5353
private final Lock queueLock = new ReentrantLock();
5454
private final PriorityBlockingQueue<Waiter> queue = new PriorityBlockingQueue<>();
5555
private volatile long alreadySeen;
@@ -60,9 +60,9 @@ class FlushLock implements Closeable {
6060
* too old, and we'll wait forever for intervening revisions that have already happened;
6161
* too new, and we'll proceed immediately without waiting for revisions that haven't happened yet.
6262
*/
63-
public FlushLock(MongoDriverSettings settings, long revisionAlreadySeen) {
63+
public FlushLock(long revisionAlreadySeen, long flushTimeoutMS) {
6464
LOGGER.debug("New flush lock at revision {} [{}]", revisionAlreadySeen, identityHashCode(this));
65-
this.settings = settings;
65+
this.flushTimeoutMS = flushTimeoutMS;
6666
this.alreadySeen = revisionAlreadySeen;
6767
}
6868

@@ -92,7 +92,7 @@ void awaitRevision(BsonInt64 revision) throws InterruptedException, FlushFailure
9292
}
9393
if (revisionValue > past) {
9494
LOGGER.debug("Awaiting revision {} > {} [{}]", revisionValue, past, identityHashCode(this));
95-
if (!semaphore.tryAcquire(settings.flushTimeoutMS(), MILLISECONDS)) {
95+
if (!semaphore.tryAcquire(flushTimeoutMS, MILLISECONDS)) {
9696
throw new FlushFailureException("Timed out waiting for revision " + revisionValue + " > " + alreadySeen);
9797
}
9898
if (isClosed) {

bosk-mongo/src/main/java/works/bosk/drivers/mongo/MainDriver.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,22 +30,22 @@
3030
import works.bosk.Identifier;
3131
import works.bosk.Reference;
3232
import works.bosk.StateTreeNode;
33-
import works.bosk.logging.MappedDiagnosticContext.MDCScope;
3433
import works.bosk.drivers.mongo.MongoDriverSettings.DatabaseFormat;
3534
import works.bosk.drivers.mongo.MongoDriverSettings.InitialDatabaseUnavailableMode;
3635
import works.bosk.drivers.mongo.bson.BsonFormatter.DocumentFields;
3736
import works.bosk.drivers.mongo.bson.BsonSerializer;
3837
import works.bosk.drivers.mongo.status.MongoStatus;
3938
import works.bosk.exceptions.FlushFailureException;
4039
import works.bosk.exceptions.InvalidTypeException;
40+
import works.bosk.logging.MappedDiagnosticContext.MDCScope;
4141

4242
import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL;
4343
import static java.util.concurrent.TimeUnit.MILLISECONDS;
4444
import static works.bosk.ReferenceUtils.rawClass;
4545
import static works.bosk.drivers.mongo.Formatter.REVISION_ONE;
4646
import static works.bosk.drivers.mongo.Formatter.REVISION_ZERO;
47-
import static works.bosk.logging.MappedDiagnosticContext.setupMDC;
4847
import static works.bosk.drivers.mongo.MongoDriverSettings.DatabaseFormat.SEQUOIA;
48+
import static works.bosk.logging.MappedDiagnosticContext.setupMDC;
4949

5050
/**
5151
* This is the driver returned to the user by {@link MongoDriver#factory}.
@@ -67,6 +67,10 @@ final class MainDriver<R extends StateTreeNode> implements MongoDriver {
6767
private final Listener listener;
6868
final Formatter formatter;
6969

70+
final long flushTimeout;
71+
final long initialRootTimeout;
72+
final long driverReinitializeTimeout;
73+
7074
/**
7175
* {@link MongoClient#close()} throws if called more than once.
7276
* {@link MongoDriver} is more civilized: subsequent calls do nothing.
@@ -117,6 +121,17 @@ final class MainDriver<R extends StateTreeNode> implements MongoDriver {
117121
this.formatter = new Formatter(boskInfo, bsonSerializer);
118122
this.receiver = new ChangeReceiver(boskInfo.name(), boskInfo.instanceID(), listener, driverSettings, rawCollection);
119123
}
124+
125+
// Flushes work by waiting for the latest version to arrive on the change stream.
126+
// If we wait twice as long as that takes, and we don't see the update, something
127+
// has gone wrong.
128+
flushTimeout = 2L * driverSettings.timescaleMS();
129+
130+
// TODO: Justify this calculation.
131+
initialRootTimeout = 5L * driverSettings.timescaleMS();
132+
133+
// TODO: Justify this calculation.
134+
driverReinitializeTimeout = 5L * driverSettings.timescaleMS();
120135
}
121136

122137
@Override
@@ -421,7 +436,7 @@ public void onConnectionSucceeded() throws
421436
private void runInitialRootAction(FutureTask<R> initialRootAction) throws InterruptedException, TimeoutException, InitialRootActionException {
422437
initialRootAction.run();
423438
try {
424-
initialRootAction.get(5 * driverSettings.recoveryPollingMS(), MILLISECONDS);
439+
initialRootAction.get(initialRootTimeout, MILLISECONDS);
425440
LOGGER.debug("initialRoot action completed successfully");
426441
} catch (ExecutionException e) {
427442
LOGGER.debug("initialRoot action failed", e);
@@ -541,7 +556,7 @@ private FormatDriver<R> newFormatDriver(long revisionAlreadySeen, DatabaseFormat
541556
collection,
542557
driverSettings,
543558
bsonSerializer,
544-
new FlushLock(driverSettings, revisionAlreadySeen),
559+
new FlushLock(revisionAlreadySeen, flushTimeout),
545560
downstream);
546561
} else if (format instanceof PandoFormat pandoFormat) {
547562
return new PandoFormatDriver<>(
@@ -550,7 +565,7 @@ private FormatDriver<R> newFormatDriver(long revisionAlreadySeen, DatabaseFormat
550565
driverSettings,
551566
pandoFormat,
552567
bsonSerializer,
553-
new FlushLock(driverSettings, revisionAlreadySeen),
568+
new FlushLock(revisionAlreadySeen, flushTimeout),
554569
downstream);
555570
}
556571
throw new IllegalArgumentException("Unexpected database format: " + format);
@@ -621,9 +636,8 @@ private <X extends Exception, Y extends Exception> void doRetryableDriverOperati
621636
private <X extends Exception, Y extends Exception> void waitAndRetry(RetryableOperation<X, Y> operation, String description, Object... args) throws X, Y {
622637
try {
623638
formatDriverLock.lock();
624-
long waitTimeMS = 5 * driverSettings.recoveryPollingMS();
625-
LOGGER.debug("Waiting for new FormatDriver for {} ms", waitTimeMS);
626-
boolean success = formatDriverChanged.await(waitTimeMS, MILLISECONDS);
639+
LOGGER.debug("Waiting for new FormatDriver for {} ms", driverReinitializeTimeout);
640+
boolean success = formatDriverChanged.await(driverReinitializeTimeout, MILLISECONDS);
627641
if (!success) {
628642
LOGGER.warn("Timed out waiting for MongoDB to recover; will retry anyway, but the operation may fail");
629643
}

bosk-mongo/src/main/java/works/bosk/drivers/mongo/MongoDriver.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import works.bosk.drivers.mongo.bson.BsonSerializer;
1111
import works.bosk.drivers.mongo.status.MongoStatus;
1212

13+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
14+
1315
/**
1416
* A {@link BoskDriver} that maintains the bosk state in a MongoDB database.
1517
* Multiple bosks, potentially in multiple separate processes,
@@ -71,7 +73,19 @@ static <RR extends StateTreeNode> MongoDriverFactory<RR> factory(
7173
BsonSerializer bsonSerializer
7274
) {
7375
driverSettings.validate();
74-
return (b, d) -> new MainDriver<>(b, clientSettings, driverSettings, bsonSerializer, d);
76+
var actualClientSettings = overrideTimeouts(clientSettings, driverSettings);
77+
return (b, d) -> new MainDriver<>(b, actualClientSettings, driverSettings, bsonSerializer, d);
78+
}
79+
80+
/**
81+
* Timeouts from the driver settings take precedence over those from {@code clientSettings}.
82+
* Without this, the driver settings would be ineffective, and we'd end up waiting
83+
* longer than desired for the Mongo client operations.
84+
*/
85+
static MongoClientSettings overrideTimeouts(MongoClientSettings clientSettings, MongoDriverSettings driverSettings) {
86+
return MongoClientSettings.builder(clientSettings)
87+
.timeout(2L * driverSettings.timescaleMS(), MILLISECONDS)
88+
.build();
7589
}
7690

7791
interface MongoDriverFactory<RR extends StateTreeNode> extends DriverFactory<RR> {

bosk-mongo/src/main/java/works/bosk/drivers/mongo/MongoDriverSettings.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
public class MongoDriverSettings {
1414
String database;
1515

16-
@Default long flushTimeoutMS = 30_000;
17-
@Default long recoveryPollingMS = 30_000;
16+
@Default int timescaleMS = 30_000;
17+
1818
/**
1919
* @see DatabaseFormat#SEQUOIA
2020
* @see PandoFormat

bosk-mongo/src/test/java/works/bosk/drivers/mongo/MongoDriverRecoveryTest.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,28 @@ static Stream<TestParameters.ParameterSet> parameters() {
5454
return TestParameters.driverSettings(
5555
Stream.of(
5656
MongoDriverSettings.DatabaseFormat.SEQUOIA,
57+
PandoFormat.oneBigDocument(),
5758
PandoFormat.withGraftPoints("/catalog", "/sideTable")
5859
),
5960
Stream.of(TestParameters.EventTiming.NORMAL)
6061
).map(b -> b.applyDriverSettings(s -> s
61-
.recoveryPollingMS(1500) // Note that some tests can take as long as 10x this
62-
.flushTimeoutMS(2000) // A little more than recoveryPollingMS
62+
.timescaleMS(100) // Note that some tests can take as long as 25x this
6363
));
6464
}
6565

66-
enum FlushOrWait { FLUSH, WAIT }
66+
enum FlushOrWait {
67+
FLUSH,
68+
69+
/**
70+
* Technically, these tests should be using {@link BoskDriver#flush()},
71+
* but we also want to exhibit some "liveness" so that users who don't
72+
* call {@code flush} eventually see updates anyway.
73+
* <p>
74+
* This test mode inserts a delay instead of {@code flush} to ensure
75+
* updates eventually arrive.
76+
*/
77+
WAIT,
78+
}
6779

6880
@SuppressWarnings("unused")
6981
static Stream<FlushOrWait> flushOrWait() {
@@ -127,7 +139,11 @@ private void waitFor(BoskDriver driver) throws IOException, InterruptedException
127139
driver.flush();
128140
break;
129141
case WAIT:
130-
Thread.sleep(2 * driverSettings.recoveryPollingMS());
142+
// The user really has no business expecting updates to occur promptly.
143+
// Let's wait several times the timescale so that the test
144+
// can set a short timescale to make FLUSH fast without risking
145+
// failures in the WAIT tests.
146+
Thread.sleep(5L * driverSettings.timescaleMS());
131147
break;
132148
}
133149
}
@@ -274,7 +290,7 @@ private void testRecovery(Runnable disruptiveAction, Function<TestEntity, TestEn
274290
LOGGER.debug("Setup database to beforeState");
275291
TestEntity beforeState = initializeDatabase("before disruption");
276292

277-
Bosk<TestEntity> bosk = new Bosk<TestEntity>(boskName(getClass().getSimpleName()), TestEntity.class, this::initialRoot, driverFactory, Bosk.simpleRegistrar());
293+
Bosk<TestEntity> bosk = new Bosk<>(boskName(getClass().getSimpleName()), TestEntity.class, this::initialRoot, driverFactory, Bosk.simpleRegistrar());
278294

279295
try (var _ = bosk.readContext()) {
280296
assertEquals(beforeState, bosk.rootReference().value());

bosk-mongo/src/test/java/works/bosk/drivers/mongo/MongoDriverSpecialTest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,13 @@ static Stream<TestParameters.ParameterSet> parameters() {
6969
return TestParameters.driverSettings(
7070
Stream.of(
7171
MongoDriverSettings.DatabaseFormat.SEQUOIA,
72-
// PandoFormat.oneBigDocument(),
72+
PandoFormat.oneBigDocument(),
7373
PandoFormat.withGraftPoints("/catalog", "/sideTable")
7474
),
7575
Stream.of(TestParameters.EventTiming.NORMAL)
76-
);
76+
).map(b -> b.applyDriverSettings(s -> s
77+
.timescaleMS(100) // Note that some tests can take as long as 25x this
78+
));
7779
}
7880

7981
@ParametersByName
@@ -132,8 +134,9 @@ public <T> void submitReplacement(Reference<T> target, T newValue) {
132134
Reference<ListingEntry> ref = listingRef.then(entity123);
133135
bosk.driver().submitReplacement(ref, LISTING_ENTRY);
134136

135-
// Give the driver a bit of time to make a mistake, if it's going to
136-
long budgetMillis = 2000;
137+
// Give the driver a bit of time to make a mistake, if it's going to,
138+
// but not so long that we cause a timeout that wouldn't otherwise happen
139+
long budgetMillis = (1+driverSettings.timescaleMS()) / 2;
137140
while (budgetMillis > 0) {
138141
long startTime = currentTimeMillis();
139142
Reference<?> updatedRef = replacementsSeen.poll(budgetMillis, MILLISECONDS);

bosk-mongo/src/test/java/works/bosk/drivers/mongo/TestParameters.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ public static ParameterSet from(MongoDriverSettings.DatabaseFormat format, Event
2525
timing + "," + format,
2626
MongoDriverSettings.builder()
2727
.preferredDatabaseFormat(format)
28-
.recoveryPollingMS(3000) // Note that some tests can take as long as 10x this
29-
.flushTimeoutMS(4000) // A little more than recoveryPollingMS
28+
.timescaleMS(90_000) // Tests that actually exercise timeouts should use a much shorter value
3029
.testing(MongoDriverSettings.Testing.builder()
3130
.eventDelayMS(timing.eventDelayMS)
3231
.build())

0 commit comments

Comments
 (0)