Skip to content

Commit ec3a73d

Browse files
committed
In FAIL_FAST, try opening change stream cursor.
I expected it to be a common mistake for people to forget to use a replica set. In FAIL_FAST mode, it seems like that ought to reliably throw an exception before the Bosk constructor returns. Still won't help much as long as the default is DISCONNECT instead of FAIL_FAST though.
1 parent 8af30d4 commit ec3a73d

File tree

8 files changed

+165
-3
lines changed

8 files changed

+165
-3
lines changed

bosk-logback/src/main/java/works/bosk/logback/BoskLogFilter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import ch.qos.logback.core.spi.FilterReply;
77
import java.util.Map;
88
import java.util.concurrent.ConcurrentHashMap;
9+
import java.util.function.Function;
910
import java.util.stream.Stream;
1011
import org.slf4j.LoggerFactory;
1112
import org.slf4j.MDC;
@@ -61,7 +62,11 @@ public static final class LogController {
6162
// We'd like to use SLF4J's "Level" but that doesn't support OFF
6263
public void setLogging(Level level, Class<?>... loggers) {
6364
// Put them all in one atomic operation
64-
overrides.putAll(Stream.of(loggers).collect(toMap(Class::getName, c->level)));
65+
overrides.putAll(Stream.of(loggers).collect(toMap(Class::getName, _->level)));
66+
}
67+
68+
public void setLogging(Level level, String... loggers) {
69+
overrides.putAll(Stream.of(loggers).collect(toMap(Function.identity(),_->level)));
6570
}
6671
}
6772

bosk-mongo/README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,13 @@ containing `MongoDriver` and the associated machinery.
55
This library is documented in the [User's Guide](../docs/USERS.md);
66
see also the [unit tests](src/test) for usage examples.
77

8+
Add [MongoDriver](src/main/java/works/bosk/mongo/MongoDriver.java) to your
9+
driver stack for persistence and replication.
10+
During development, we recommend using `InitialDatabaseUnavailableMode.FAIL_FAST`
11+
to get helpful possible errors when the database is misconfigured or unavailable;
12+
in production, you should use the default `DISCONNECT` mode for better fault tolerance.
13+
14+
Bosk uses change streams, so it requires MongoDB to be configured as a replica set.
15+
Our unit tests demonstrate how to do this with TestContainers.
16+
817
See the [javadocs](https://javadoc.io/doc/works.bosk/bosk-mongo/latest/works.bosk.mongo/module-summary.html) for more information.

bosk-mongo/src/main/java/works/bosk/drivers/mongo/MongoDriverSettings.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import lombok.Builder;
44
import lombok.Builder.Default;
55
import lombok.Value;
6+
import works.bosk.Bosk;
67
import works.bosk.BoskDriver;
78

89
import static works.bosk.drivers.mongo.MongoDriverSettings.DatabaseFormat.SEQUOIA;
@@ -39,6 +40,12 @@ public class MongoDriverSettings {
3940
* @see PandoFormat
4041
*/
4142
@Default DatabaseFormat preferredDatabaseFormat = SEQUOIA;
43+
44+
/**
45+
* Default is {@link InitialDatabaseUnavailableMode#DISCONNECT DISCONNECT}
46+
* because it simplifies production deployments and repairs,
47+
* but these very fault-tolerance features can be confusing during development.
48+
*/
4249
@Default InitialDatabaseUnavailableMode initialDatabaseUnavailableMode = InitialDatabaseUnavailableMode.DISCONNECT;
4350

4451
@Default Experimental experimental = Experimental.builder().build();
@@ -102,7 +109,7 @@ public enum InitialDatabaseUnavailableMode {
102109

103110
/**
104111
* If the database state can't be loaded during {@link BoskDriver#initialRoot},
105-
* throw an exception.
112+
* throw an exception from the {@link Bosk#Bosk Bosk constructor} call.
106113
* This is probably the desired behaviour during development,
107114
* but in production, it creates a boot sequencing dependency between the application and the database.
108115
*/
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package works.bosk.drivers.mongo.exceptions;
2+
3+
public class InitialCursorCommandException extends InitialRootFailureException {
4+
public InitialCursorCommandException(String message) {
5+
super(message);
6+
}
7+
8+
public InitialCursorCommandException(String message, Throwable cause) {
9+
super(message, cause);
10+
}
11+
12+
public InitialCursorCommandException(Throwable cause) {
13+
super(cause);
14+
}
15+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package works.bosk.drivers.mongo.exceptions;
2+
3+
public class InitialCursorTimeoutException extends InitialRootFailureException {
4+
public InitialCursorTimeoutException(String message) {
5+
super(message);
6+
}
7+
8+
public InitialCursorTimeoutException(String message, Throwable cause) {
9+
super(message, cause);
10+
}
11+
12+
public InitialCursorTimeoutException(Throwable cause) {
13+
super(cause);
14+
}
15+
}

bosk-mongo/src/main/java/works/bosk/drivers/mongo/internal/ChangeReceiver.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package works.bosk.drivers.mongo.internal;
22

3+
import com.mongodb.MongoCommandException;
4+
import com.mongodb.MongoException;
35
import com.mongodb.MongoInterruptedException;
6+
import com.mongodb.MongoOperationTimeoutException;
47
import com.mongodb.client.MongoChangeStreamCursor;
58
import com.mongodb.client.MongoCollection;
69
import com.mongodb.client.model.changestream.ChangeStreamDocument;
@@ -18,10 +21,14 @@
1821
import works.bosk.Identifier;
1922
import works.bosk.drivers.mongo.MongoDriverSettings;
2023
import works.bosk.drivers.mongo.exceptions.DisconnectedException;
24+
import works.bosk.drivers.mongo.exceptions.InitialCursorCommandException;
25+
import works.bosk.drivers.mongo.exceptions.InitialRootFailureException;
26+
import works.bosk.drivers.mongo.exceptions.InitialCursorTimeoutException;
2127
import works.bosk.logging.MdcKeys;
2228

2329
import static java.lang.Thread.currentThread;
2430
import static java.util.concurrent.TimeUnit.MILLISECONDS;
31+
import static works.bosk.drivers.mongo.MongoDriverSettings.InitialDatabaseUnavailableMode.FAIL_FAST;
2532
import static works.bosk.logging.MappedDiagnosticContext.MDCScope;
2633
import static works.bosk.logging.MappedDiagnosticContext.setupMDC;
2734

@@ -58,6 +65,11 @@ class ChangeReceiver implements Closeable {
5865
this.settings = settings;
5966
this.collection = collection;
6067
this.creationPoint = new Exception("Additional context: ChangeReceiver creation stack trace:");
68+
if (settings.initialDatabaseUnavailableMode() == FAIL_FAST) {
69+
// User requested fail-fast behaviour; try to open the cursor right away
70+
// to ensure the database is set up for change streams.
71+
probeChangeStreamCursor();
72+
}
6173
ex.scheduleWithFixedDelay(
6274
this::connectionLoop,
6375
0,
@@ -66,6 +78,18 @@ class ChangeReceiver implements Closeable {
6678
);
6779
}
6880

81+
private void probeChangeStreamCursor() {
82+
try (var _ = openCursor()) {
83+
LOGGER.debug("Successfully opened MongoDB cursor");
84+
} catch (MongoOperationTimeoutException e) {
85+
throw new InitialCursorTimeoutException("Timed out attempting to open MongoDB cursor; check database connectivity", e);
86+
} catch (MongoCommandException e) {
87+
throw new InitialCursorCommandException("Failed to open change stream cursor; ensure MongoDB server configured as a replica set", e);
88+
} catch (MongoException e) {
89+
throw new InitialRootFailureException("Unexpected failure opening MongoDB cursor", e);
90+
}
91+
}
92+
6993
@Override
7094
public void close() {
7195
isClosed = true;
@@ -193,6 +217,7 @@ private void disconnect(String description, String remedy, Throwable e) {
193217
private void addContextToException(Throwable x) {
194218
x.addSuppressed(creationPoint);
195219
}
220+
196221
private MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> openCursor() {
197222
MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> result = collection
198223
.watch()

bosk-mongo/src/test/java/works/bosk/drivers/mongo/internal/MongoService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public class MongoService implements Closeable {
4545
// We do logging in some static initializers, so this needs to be initialized first
4646
private static final Logger LOGGER = LoggerFactory.getLogger(MongoService.class);
4747

48+
public static final DockerImageName MONGODB_IMAGE_NAME = DockerImageName.parse("mongo:8.0");
49+
4850
private final MongoClient mongoClient = MongoClients.create(normalClientSettings);
4951

5052
// Expensive stuff shared among instances as much as possible, hence static
@@ -100,7 +102,7 @@ public void close() {
100102
}
101103

102104
private static MongoDBContainer mongoContainer() {
103-
MongoDBContainer container = new MongoDBContainer(DockerImageName.parse("mongo:8.0"))
105+
MongoDBContainer container = new MongoDBContainer(MONGODB_IMAGE_NAME)
104106
.withReplicaSet()
105107
.withTmpFs(Map.of("/data/db", "rw"))
106108
.withNetwork(NETWORK);
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package works.bosk.drivers.mongo.internal;
2+
3+
import com.mongodb.MongoClientSettings;
4+
import com.mongodb.ServerAddress;
5+
import java.io.IOException;
6+
import java.net.ServerSocket;
7+
import org.junit.jupiter.api.Test;
8+
import org.testcontainers.mongodb.MongoDBContainer;
9+
import works.bosk.Bosk;
10+
import works.bosk.BoskConfig;
11+
import works.bosk.DriverStack;
12+
import works.bosk.drivers.mongo.BsonSerializer;
13+
import works.bosk.drivers.mongo.MongoDriver;
14+
import works.bosk.drivers.mongo.MongoDriverSettings;
15+
import works.bosk.drivers.mongo.exceptions.InitialCursorCommandException;
16+
import works.bosk.drivers.mongo.exceptions.InitialCursorTimeoutException;
17+
import works.bosk.logback.BoskLogFilter;
18+
import works.bosk.testing.drivers.state.TestEntity;
19+
20+
import static ch.qos.logback.classic.Level.ERROR;
21+
import static org.junit.jupiter.api.Assertions.assertThrows;
22+
import static works.bosk.drivers.mongo.MongoDriverSettings.InitialDatabaseUnavailableMode.FAIL_FAST;
23+
import static works.bosk.drivers.mongo.internal.MongoService.MONGODB_IMAGE_NAME;
24+
25+
public class ServerMisconfigurationTest {
26+
27+
@Test
28+
void unreachable() throws IOException {
29+
int port;
30+
try (var socket = new ServerSocket(0)) {
31+
port = socket.getLocalPort();
32+
}
33+
MongoClientSettings clientSettings = MongoService.mongoClientSettings(
34+
new ServerAddress("localhost", port)
35+
);
36+
assertThrows(InitialCursorTimeoutException.class, () -> createBosk(clientSettings));
37+
}
38+
39+
/**
40+
* This is an important case: it doesn't fail "naturally" because
41+
* a standalone MongoDB server will accept connections and return the initial state,
42+
* but will then fail when attempting to open a change stream.
43+
* Hence, special effort is required to detect this case and pass this test.
44+
*/
45+
@Test
46+
void notAReplicaSet() {
47+
try (var mongo = new MongoDBContainer(MONGODB_IMAGE_NAME)) {
48+
mongo.start();
49+
MongoClientSettings clientSettings = MongoService.mongoClientSettings(
50+
new ServerAddress(
51+
mongo.getHost(),
52+
mongo.getFirstMappedPort()
53+
)
54+
);
55+
assertThrows(InitialCursorCommandException.class, () -> createBosk(clientSettings));
56+
}
57+
}
58+
59+
private Bosk<TestEntity> createBosk(MongoClientSettings clientSettings) {
60+
// We're expecting some timeout warnings from the Mongo driver;
61+
BoskLogFilter.LogController logController = new BoskLogFilter.LogController();
62+
logController.setLogging(ERROR, "com.mongodb");
63+
64+
return new Bosk<>(
65+
ServerMisconfigurationTest.class.getSimpleName(),
66+
TestEntity.class,
67+
AbstractMongoDriverTest::initialRootWithEmptyCatalog,
68+
BoskConfig.<TestEntity>builder()
69+
.driverFactory(DriverStack.of(
70+
BoskLogFilter.withController(logController),
71+
MongoDriver.factory(
72+
clientSettings,
73+
MongoDriverSettings.builder()
74+
.database("bosk_" + getClass().getSimpleName())
75+
.timescaleMS(100)
76+
.initialDatabaseUnavailableMode(FAIL_FAST)
77+
.build(),
78+
new BsonSerializer()
79+
)
80+
))
81+
.build()
82+
);
83+
}
84+
}

0 commit comments

Comments
 (0)