Skip to content

Commit 3b49cc9

Browse files
authored
Merge pull request boskworks#137 from prdoyle/mongo-timeouts
MongoDriver timescale setting
2 parents 6b73a67 + 4195259 commit 3b49cc9

File tree

8 files changed

+94
-29
lines changed

8 files changed

+94
-29
lines changed

bosk-mongo/src/main/java/works/bosk/drivers/mongo/ChangeReceiver.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class ChangeReceiver implements Closeable {
5959
ex.scheduleWithFixedDelay(
6060
this::connectionLoop,
6161
0,
62-
settings.recoveryPollingMS(),
62+
settings.timescaleMS(),
6363
MILLISECONDS
6464
);
6565
}
@@ -73,7 +73,7 @@ public void close() {
7373

7474
/**
7575
* This method has a loop to do immediate reconnections and skip the
76-
* {@link MongoDriverSettings#recoveryPollingMS() recoveryPollingMS} delay,
76+
* {@link MongoDriverSettings#timescaleMS()} delay,
7777
* but besides that, exiting this method has the same effect as continuing
7878
* around the loop.
7979
*/
@@ -200,7 +200,7 @@ private void addContextToException(Throwable x) {
200200
private MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> openCursor() {
201201
MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> result = collection
202202
.watch()
203-
.maxAwaitTime(settings.recoveryPollingMS(), MILLISECONDS)
203+
.maxAwaitTime(settings.timescaleMS(), MILLISECONDS)
204204
.cursor();
205205
LOGGER.debug("Cursor is open");
206206
return result;

bosk-mongo/src/main/java/works/bosk/drivers/mongo/FlushLock.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
* that logic to include an epoch concept without touching other components.
5050
*/
5151
class FlushLock implements Closeable {
52-
private final MongoDriverSettings settings;
52+
private final long flushTimeoutMS;
5353
private final Lock queueLock = new ReentrantLock();
5454
private final PriorityBlockingQueue<Waiter> queue = new PriorityBlockingQueue<>();
5555
private volatile long alreadySeen;
@@ -60,9 +60,9 @@ class FlushLock implements Closeable {
6060
* too old, and we'll wait forever for intervening revisions that have already happened;
6161
* too new, and we'll proceed immediately without waiting for revisions that haven't happened yet.
6262
*/
63-
public FlushLock(MongoDriverSettings settings, long revisionAlreadySeen) {
63+
public FlushLock(long revisionAlreadySeen, long flushTimeoutMS) {
6464
LOGGER.debug("New flush lock at revision {} [{}]", revisionAlreadySeen, identityHashCode(this));
65-
this.settings = settings;
65+
this.flushTimeoutMS = flushTimeoutMS;
6666
this.alreadySeen = revisionAlreadySeen;
6767
}
6868

@@ -92,7 +92,7 @@ void awaitRevision(BsonInt64 revision) throws InterruptedException, FlushFailure
9292
}
9393
if (revisionValue > past) {
9494
LOGGER.debug("Awaiting revision {} > {} [{}]", revisionValue, past, identityHashCode(this));
95-
if (!semaphore.tryAcquire(settings.flushTimeoutMS(), MILLISECONDS)) {
95+
if (!semaphore.tryAcquire(flushTimeoutMS, MILLISECONDS)) {
9696
throw new FlushFailureException("Timed out waiting for revision " + revisionValue + " > " + alreadySeen);
9797
}
9898
if (isClosed) {

bosk-mongo/src/main/java/works/bosk/drivers/mongo/MainDriver.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,22 +30,22 @@
3030
import works.bosk.Identifier;
3131
import works.bosk.Reference;
3232
import works.bosk.StateTreeNode;
33-
import works.bosk.logging.MappedDiagnosticContext.MDCScope;
3433
import works.bosk.drivers.mongo.MongoDriverSettings.DatabaseFormat;
3534
import works.bosk.drivers.mongo.MongoDriverSettings.InitialDatabaseUnavailableMode;
3635
import works.bosk.drivers.mongo.bson.BsonFormatter.DocumentFields;
3736
import works.bosk.drivers.mongo.bson.BsonSerializer;
3837
import works.bosk.drivers.mongo.status.MongoStatus;
3938
import works.bosk.exceptions.FlushFailureException;
4039
import works.bosk.exceptions.InvalidTypeException;
40+
import works.bosk.logging.MappedDiagnosticContext.MDCScope;
4141

4242
import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL;
4343
import static java.util.concurrent.TimeUnit.MILLISECONDS;
4444
import static works.bosk.ReferenceUtils.rawClass;
4545
import static works.bosk.drivers.mongo.Formatter.REVISION_ONE;
4646
import static works.bosk.drivers.mongo.Formatter.REVISION_ZERO;
47-
import static works.bosk.logging.MappedDiagnosticContext.setupMDC;
4847
import static works.bosk.drivers.mongo.MongoDriverSettings.DatabaseFormat.SEQUOIA;
48+
import static works.bosk.logging.MappedDiagnosticContext.setupMDC;
4949

5050
/**
5151
* This is the driver returned to the user by {@link MongoDriver#factory}.
@@ -67,6 +67,10 @@ final class MainDriver<R extends StateTreeNode> implements MongoDriver {
6767
private final Listener listener;
6868
final Formatter formatter;
6969

70+
final long flushTimeout;
71+
final long initialRootTimeout;
72+
final long driverReinitializeTimeout;
73+
7074
/**
7175
* {@link MongoClient#close()} throws if called more than once.
7276
* {@link MongoDriver} is more civilized: subsequent calls do nothing.
@@ -117,6 +121,17 @@ final class MainDriver<R extends StateTreeNode> implements MongoDriver {
117121
this.formatter = new Formatter(boskInfo, bsonSerializer);
118122
this.receiver = new ChangeReceiver(boskInfo.name(), boskInfo.instanceID(), listener, driverSettings, rawCollection);
119123
}
124+
125+
// Flushes work by waiting for the latest version to arrive on the change stream.
126+
// If we wait twice as long as that takes, and we don't see the update, something
127+
// has gone wrong.
128+
flushTimeout = 2L * driverSettings.timescaleMS();
129+
130+
// TODO: Justify this calculation.
131+
initialRootTimeout = 5L * driverSettings.timescaleMS();
132+
133+
// TODO: Justify this calculation.
134+
driverReinitializeTimeout = 5L * driverSettings.timescaleMS();
120135
}
121136

122137
@Override
@@ -421,7 +436,7 @@ public void onConnectionSucceeded() throws
421436
private void runInitialRootAction(FutureTask<R> initialRootAction) throws InterruptedException, TimeoutException, InitialRootActionException {
422437
initialRootAction.run();
423438
try {
424-
initialRootAction.get(5 * driverSettings.recoveryPollingMS(), MILLISECONDS);
439+
initialRootAction.get(initialRootTimeout, MILLISECONDS);
425440
LOGGER.debug("initialRoot action completed successfully");
426441
} catch (ExecutionException e) {
427442
LOGGER.debug("initialRoot action failed", e);
@@ -541,7 +556,7 @@ private FormatDriver<R> newFormatDriver(long revisionAlreadySeen, DatabaseFormat
541556
collection,
542557
driverSettings,
543558
bsonSerializer,
544-
new FlushLock(driverSettings, revisionAlreadySeen),
559+
new FlushLock(revisionAlreadySeen, flushTimeout),
545560
downstream);
546561
} else if (format instanceof PandoFormat pandoFormat) {
547562
return new PandoFormatDriver<>(
@@ -550,7 +565,7 @@ private FormatDriver<R> newFormatDriver(long revisionAlreadySeen, DatabaseFormat
550565
driverSettings,
551566
pandoFormat,
552567
bsonSerializer,
553-
new FlushLock(driverSettings, revisionAlreadySeen),
568+
new FlushLock(revisionAlreadySeen, flushTimeout),
554569
downstream);
555570
}
556571
throw new IllegalArgumentException("Unexpected database format: " + format);
@@ -621,9 +636,8 @@ private <X extends Exception, Y extends Exception> void doRetryableDriverOperati
621636
private <X extends Exception, Y extends Exception> void waitAndRetry(RetryableOperation<X, Y> operation, String description, Object... args) throws X, Y {
622637
try {
623638
formatDriverLock.lock();
624-
long waitTimeMS = 5 * driverSettings.recoveryPollingMS();
625-
LOGGER.debug("Waiting for new FormatDriver for {} ms", waitTimeMS);
626-
boolean success = formatDriverChanged.await(waitTimeMS, MILLISECONDS);
639+
LOGGER.debug("Waiting for new FormatDriver for {} ms", driverReinitializeTimeout);
640+
boolean success = formatDriverChanged.await(driverReinitializeTimeout, MILLISECONDS);
627641
if (!success) {
628642
LOGGER.warn("Timed out waiting for MongoDB to recover; will retry anyway, but the operation may fail");
629643
}

bosk-mongo/src/main/java/works/bosk/drivers/mongo/MongoDriver.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import works.bosk.drivers.mongo.bson.BsonSerializer;
1111
import works.bosk.drivers.mongo.status.MongoStatus;
1212

13+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
14+
1315
/**
1416
* A {@link BoskDriver} that maintains the bosk state in a MongoDB database.
1517
* Multiple bosks, potentially in multiple separate processes,
@@ -71,7 +73,19 @@ static <RR extends StateTreeNode> MongoDriverFactory<RR> factory(
7173
BsonSerializer bsonSerializer
7274
) {
7375
driverSettings.validate();
74-
return (b, d) -> new MainDriver<>(b, clientSettings, driverSettings, bsonSerializer, d);
76+
var actualClientSettings = overrideTimeouts(clientSettings, driverSettings);
77+
return (b, d) -> new MainDriver<>(b, actualClientSettings, driverSettings, bsonSerializer, d);
78+
}
79+
80+
/**
81+
* Timeouts from the driver settings take precedence over those from {@code clientSettings}.
82+
* Without this, the driver settings would be ineffective, and we'd end up waiting
83+
* longer than desired for the Mongo client operations.
84+
*/
85+
static MongoClientSettings overrideTimeouts(MongoClientSettings clientSettings, MongoDriverSettings driverSettings) {
86+
return MongoClientSettings.builder(clientSettings)
87+
.timeout(2L * driverSettings.timescaleMS(), MILLISECONDS)
88+
.build();
7589
}
7690

7791
interface MongoDriverFactory<RR extends StateTreeNode> extends DriverFactory<RR> {

bosk-mongo/src/main/java/works/bosk/drivers/mongo/MongoDriverSettings.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,27 @@
1313
public class MongoDriverSettings {
1414
String database;
1515

16-
@Default long flushTimeoutMS = 30_000;
17-
@Default long recoveryPollingMS = 30_000;
16+
/**
17+
* The general responsiveness of the system. Changes to the database will be
18+
* "noticed" in about this many milliseconds, and other time-related behaviours
19+
* will be scaled accordingly. Lower values are more responsive, while higher
20+
* values are more efficient because they'll do less polling.
21+
* <p>
22+
* For test cases,
23+
*
24+
* <ul>
25+
* <li>
26+
* If you are exercising timeout conditions, set this to a low value
27+
* (say, 1/10 of its default) to make your tests run quickly;
28+
* </li>
29+
* <li>
30+
* otherwise, set it to a high value (say, 10x its default)
31+
* to avoid spurious test failures.
32+
* </li>
33+
* </ul>
34+
*/
35+
@Default int timescaleMS = 1000;
36+
1837
/**
1938
* @see DatabaseFormat#SEQUOIA
2039
* @see PandoFormat

bosk-mongo/src/test/java/works/bosk/drivers/mongo/MongoDriverRecoveryTest.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,28 @@ static Stream<TestParameters.ParameterSet> parameters() {
5454
return TestParameters.driverSettings(
5555
Stream.of(
5656
MongoDriverSettings.DatabaseFormat.SEQUOIA,
57+
PandoFormat.oneBigDocument(),
5758
PandoFormat.withGraftPoints("/catalog", "/sideTable")
5859
),
5960
Stream.of(TestParameters.EventTiming.NORMAL)
6061
).map(b -> b.applyDriverSettings(s -> s
61-
.recoveryPollingMS(1500) // Note that some tests can take as long as 10x this
62-
.flushTimeoutMS(2000) // A little more than recoveryPollingMS
62+
.timescaleMS(100) // Note that some tests can take as long as 25x this
6363
));
6464
}
6565

66-
enum FlushOrWait { FLUSH, WAIT }
66+
enum FlushOrWait {
67+
FLUSH,
68+
69+
/**
70+
* Technically, these tests should be using {@link BoskDriver#flush()},
71+
* but we also want to exhibit some "liveness" so that users who don't
72+
* call {@code flush} eventually see updates anyway.
73+
* <p>
74+
* This test mode inserts a delay instead of {@code flush} to ensure
75+
* updates eventually arrive.
76+
*/
77+
WAIT,
78+
}
6779

6880
@SuppressWarnings("unused")
6981
static Stream<FlushOrWait> flushOrWait() {
@@ -127,7 +139,11 @@ private void waitFor(BoskDriver driver) throws IOException, InterruptedException
127139
driver.flush();
128140
break;
129141
case WAIT:
130-
Thread.sleep(2 * driverSettings.recoveryPollingMS());
142+
// The user really has no business expecting updates to occur promptly.
143+
// Let's wait several times the timescale so that the test
144+
// can set a short timescale to make FLUSH fast without risking
145+
// failures in the WAIT tests.
146+
Thread.sleep(5L * driverSettings.timescaleMS());
131147
break;
132148
}
133149
}
@@ -274,7 +290,7 @@ private void testRecovery(Runnable disruptiveAction, Function<TestEntity, TestEn
274290
LOGGER.debug("Setup database to beforeState");
275291
TestEntity beforeState = initializeDatabase("before disruption");
276292

277-
Bosk<TestEntity> bosk = new Bosk<TestEntity>(boskName(getClass().getSimpleName()), TestEntity.class, this::initialRoot, driverFactory, Bosk.simpleRegistrar());
293+
Bosk<TestEntity> bosk = new Bosk<>(boskName(getClass().getSimpleName()), TestEntity.class, this::initialRoot, driverFactory, Bosk.simpleRegistrar());
278294

279295
try (var _ = bosk.readContext()) {
280296
assertEquals(beforeState, bosk.rootReference().value());

bosk-mongo/src/test/java/works/bosk/drivers/mongo/MongoDriverSpecialTest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,13 @@ static Stream<TestParameters.ParameterSet> parameters() {
6969
return TestParameters.driverSettings(
7070
Stream.of(
7171
MongoDriverSettings.DatabaseFormat.SEQUOIA,
72-
// PandoFormat.oneBigDocument(),
72+
PandoFormat.oneBigDocument(),
7373
PandoFormat.withGraftPoints("/catalog", "/sideTable")
7474
),
7575
Stream.of(TestParameters.EventTiming.NORMAL)
76-
);
76+
).map(b -> b.applyDriverSettings(s -> s
77+
.timescaleMS(100) // Note that some tests can take as long as 25x this
78+
));
7779
}
7880

7981
@ParametersByName
@@ -132,8 +134,9 @@ public <T> void submitReplacement(Reference<T> target, T newValue) {
132134
Reference<ListingEntry> ref = listingRef.then(entity123);
133135
bosk.driver().submitReplacement(ref, LISTING_ENTRY);
134136

135-
// Give the driver a bit of time to make a mistake, if it's going to
136-
long budgetMillis = 2000;
137+
// Give the driver a bit of time to make a mistake, if it's going to,
138+
// but not so long that we cause a timeout that wouldn't otherwise happen
139+
long budgetMillis = (1+driverSettings.timescaleMS()) / 2;
137140
while (budgetMillis > 0) {
138141
long startTime = currentTimeMillis();
139142
Reference<?> updatedRef = replacementsSeen.poll(budgetMillis, MILLISECONDS);

bosk-mongo/src/test/java/works/bosk/drivers/mongo/TestParameters.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ public static ParameterSet from(MongoDriverSettings.DatabaseFormat format, Event
2525
timing + "," + format,
2626
MongoDriverSettings.builder()
2727
.preferredDatabaseFormat(format)
28-
.recoveryPollingMS(3000) // Note that some tests can take as long as 10x this
29-
.flushTimeoutMS(4000) // A little more than recoveryPollingMS
28+
.timescaleMS(90_000) // Tests that actually exercise timeouts should use a much shorter value
3029
.testing(MongoDriverSettings.Testing.builder()
3130
.eventDelayMS(timing.eventDelayMS)
3231
.build())

0 commit comments

Comments
 (0)