Skip to content

Commit c3c35bc

Browse files
authored
Added offset.partition.name configuration. (#41)
Allows for custom partition naming schemes. KAFKA-158
1 parent af537fc commit c3c35bc

File tree

5 files changed

+128
-50
lines changed

5 files changed

+128
-50
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
- [KAFKA-105](https://jira.mongodb.org/browse/KAFKA-105) Improve `errors.tolerance=all` support in the sink and source connectors.
2525
- [KAFKA-106](https://jira.mongodb.org/browse/KAFKA-106) Changed `max.num.retries` default to 1. A safer default especially as the driver now has retryable writes.
2626
- [KAFKA-147](https://jira.mongodb.org/browse/KAFKA-147) Added `copy.existing.namespace.regex` configuration, that allows the filtering of namespaces to be copied.
27-
27+
- [KAFKA-158](https://jira.mongodb.org/browse/KAFKA-158) Added `offset.partition.name` configuration, which allows for custom partition naming strategies.
28+
Note: This can be used to start a new change stream when an existing offset contains an invalid resume token.
2829

2930
## 1.2.0
3031
- [KAFKA-92](https://jira.mongodb.org/browse/KAFKA-92) Allow the Sink connector to use multiple tasks.

src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@
7878
@ExtendWith(MockitoExtension.class)
7979
@RunWith(JUnitPlatform.class)
8080
public class MongoSourceTaskIntegrationTest extends MongoKafkaTestCase {
81+
private static final Map<String, Object> INVALID_OFFSET =
82+
singletonMap(
83+
"_id",
84+
"{\"_data\": \"825F58DDF4000000032B022C0100296E5A1004BBCFDF90907247ABA61D94DF01D76200461E5F6964002B020004\"}");
8185

8286
@Mock private SourceTaskContext context;
8387
@Mock private OffsetStorageReader offsetStorageReader;
@@ -362,10 +366,8 @@ void testSourceCanHandleInvalidResumeTokenWhenErrorToleranceIsAll() {
362366
}
363367
};
364368

365-
String offsetToken =
366-
"{\"_data\": \"825F58DDF4000000032B022C0100296E5A1004BBCFDF90907247ABA61D94DF01D76200461E5F6964002B020004\"}";
367369
when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
368-
when(offsetStorageReader.offset(any())).thenReturn(singletonMap("_id", offsetToken));
370+
when(offsetStorageReader.offset(any())).thenReturn(INVALID_OFFSET);
369371
task.initialize(context);
370372
task.start(cfg);
371373

@@ -376,6 +378,62 @@ void testSourceCanHandleInvalidResumeTokenWhenErrorToleranceIsAll() {
376378
}
377379
}
378380

381+
@Test
382+
@DisplayName("Ensure source can use custom offset partition names")
383+
void testSourceCanUseCustomOffsetPartitionNames() {
384+
assumeTrue(isGreaterThanFourDotZero());
385+
try (AutoCloseableSourceTask task = createSourceTask(Logger.getLogger(MongoSourceTask.class))) {
386+
MongoCollection<Document> coll = getAndCreateCollection();
387+
388+
coll.insertOne(Document.parse("{a: 1}"));
389+
390+
HashMap<String, String> cfg =
391+
new HashMap<String, String>() {
392+
{
393+
put(MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
394+
put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
395+
put(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, "50");
396+
put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "5000");
397+
put(MongoSourceConfig.OFFSET_PARTITION_NAME_CONFIG, "oldPartitionName");
398+
}
399+
};
400+
401+
when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
402+
when(offsetStorageReader.offset(singletonMap("ns", "oldPartitionName")))
403+
.thenReturn(INVALID_OFFSET);
404+
task.initialize(context);
405+
task.start(cfg);
406+
407+
assertNull(task.poll());
408+
task.stop();
409+
410+
assertTrue(
411+
task.logCapture.getEvents().stream()
412+
.anyMatch(
413+
e ->
414+
e.getRenderedMessage()
415+
.contains("Resuming the change stream after the previous offset")));
416+
assertTrue(
417+
task.logCapture.getEvents().stream()
418+
.anyMatch(e -> e.getRenderedMessage().contains("Failed to resume change stream")));
419+
task.logCapture.reset();
420+
421+
when(offsetStorageReader.offset(singletonMap("ns", "newPartitionName"))).thenReturn(null);
422+
cfg.put(MongoSourceConfig.OFFSET_PARTITION_NAME_CONFIG, "newPartitionName");
423+
task.start(cfg);
424+
425+
insertMany(rangeClosed(1, 50), coll);
426+
assertSourceRecordValues(createInserts(1, 50), getNextResults(task), coll);
427+
428+
assertTrue(
429+
task.logCapture.getEvents().stream()
430+
.anyMatch(
431+
e ->
432+
e.getRenderedMessage()
433+
.contains("New change stream cursor created without offset")));
434+
}
435+
}
436+
379437
@Test
380438
@DisplayName("Copy existing with a restart midway through")
381439
void testCopyingExistingWithARestartMidwayThrough() {

src/main/java/com/mongodb/kafka/connect/source/MongoSourceConfig.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,13 @@ public class MongoSourceConfig extends AbstractConfig {
283283
+ HEARTBEAT_TOPIC_NAME_DEFAULT
284284
+ "'.";
285285

286+
public static final String OFFSET_PARTITION_NAME_CONFIG = "offset.partition.name";
287+
public static final String OFFSET_PARTITION_NAME_DISPLAY = "Offset partition name";
288+
public static final String OFFSET_PARTITION_NAME_DEFAULT = "";
289+
public static final String OFFSET_PARTITION_NAME_DOC =
290+
"Use a custom offset partition name. If blank the default partition name based on the "
291+
+ "connection details will be used.";
292+
286293
public static final ConfigDef CONFIG = createConfigDef();
287294
private static final List<Consumer<MongoSourceConfig>> INITIALIZERS =
288295
singletonList(MongoSourceConfig::validateCollection);
@@ -742,6 +749,20 @@ public Map<String, ConfigValue> validateAll(final Map<String, String> props) {
742749
Width.MEDIUM,
743750
HEARTBEAT_TOPIC_NAME_DISPLAY);
744751

752+
group = "Partition";
753+
orderInGroup = 0;
754+
755+
configDef.define(
756+
OFFSET_PARTITION_NAME_CONFIG,
757+
Type.STRING,
758+
OFFSET_PARTITION_NAME_DEFAULT,
759+
Importance.MEDIUM,
760+
OFFSET_PARTITION_NAME_DOC,
761+
group,
762+
++orderInGroup,
763+
Width.SHORT,
764+
OFFSET_PARTITION_NAME_DISPLAY);
765+
745766
return configDef;
746767
}
747768
}

src/main/java/com/mongodb/kafka/connect/source/MongoSourceTask.java

Lines changed: 36 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,8 @@ public void start(final Map<String, String> props) {
166166
}
167167

168168
heartbeatManager = null;
169-
createPartitionMap(sourceConfig, true);
169+
partitionMap = null;
170+
createPartitionMap(sourceConfig);
170171

171172
mongoClient =
172173
MongoClients.create(
@@ -407,17 +408,14 @@ private MongoChangeStreamCursor<? extends BsonDocument> tryCreateCursor(
407408
LOGGER.info("Namespace not found cursor closed.");
408409
} else {
409410
LOGGER.warn(
410-
"Failed to resume change stream: {} {}\n"
411+
"Failed to resume change stream: {} {}\n\n"
411412
+ "=====================================================================================\n"
412413
+ "If the resume token is no longer available then there is the potential for data loss.\n"
413414
+ "Saved resume tokens are managed by Kafka and stored with the offset data.\n\n"
414-
+ "To restarting the change stream with no resume token set `errors.tolerance=all` \n"
415-
+ "or by manually removing the old token.\n\n"
416-
+ "When running Connect in standalone mode offsets are configured using the:\n"
417-
+ "`offset.storage.file.filename` configuration.\n"
418-
+ "When running Connect in distributed mode the offsets are stored in a topic.\n\n"
419-
+ "Use the `kafka-consumer-groups.sh` tool with the `--reset-offsets` flag to reset\n"
420-
+ "offsets.\n\n"
415+
+ "To restart the change stream with no resume token either: \n"
416+
+ " * Create a new partition name using the `offset.partition.name` configuration.\n"
417+
+ " * Set `errors.tolerance=all` and ignore the erroring resume token. \n"
418+
+ " * Manually remove the old offset from its configured storage.\n\n"
421419
+ "Resetting the offset will allow for the connector to be resume from the latest resume\n"
422420
+ "token. Using `copy.existing=true` ensures that all data will be outputted by the\n"
423421
+ "connector but it will duplicate existing data.\n"
@@ -462,41 +460,40 @@ String getTopicNameFromNamespace(final String prefix, final BsonDocument namespa
462460
}
463461

464462
Map<String, Object> createPartitionMap(final MongoSourceConfig sourceConfig) {
465-
return createPartitionMap(sourceConfig, partitionMap == null);
466-
}
467-
468-
private Map<String, Object> createPartitionMap(
469-
final MongoSourceConfig sourceConfig, final boolean recreate) {
470-
if (recreate) {
471-
partitionMap = singletonMap(NS_KEY, createNamespaceString(sourceConfig, false));
463+
if (partitionMap == null) {
464+
String partitionName = sourceConfig.getString(MongoSourceConfig.OFFSET_PARTITION_NAME_CONFIG);
465+
if (partitionName.isEmpty()) {
466+
partitionName = createDefaultPartitionName(sourceConfig);
467+
}
468+
partitionMap = singletonMap(NS_KEY, partitionName);
472469
}
473470
return partitionMap;
474471
}
475472

476473
Map<String, Object> createLegacyPartitionMap(final MongoSourceConfig sourceConfig) {
477-
return singletonMap(NS_KEY, createNamespaceString(sourceConfig, true));
474+
return singletonMap(NS_KEY, createLegacyPartitionName(sourceConfig));
478475
}
479476

480-
String createNamespaceString(final MongoSourceConfig sourceConfig, final boolean legacy) {
481-
if (legacy) {
482-
return format(
483-
"%s/%s.%s",
484-
sourceConfig.getString(CONNECTION_URI_CONFIG),
485-
sourceConfig.getString(DATABASE_CONFIG),
486-
sourceConfig.getString(COLLECTION_CONFIG));
487-
} else {
488-
ConnectionString connectionString = sourceConfig.getConnectionString();
489-
StringBuilder builder = new StringBuilder();
490-
builder.append(connectionString.isSrvProtocol() ? "mongodb+srv://" : "mongodb://");
491-
builder.append(String.join(",", connectionString.getHosts()));
492-
builder.append("/");
493-
builder.append(sourceConfig.getString(DATABASE_CONFIG));
494-
if (!sourceConfig.getString(COLLECTION_CONFIG).isEmpty()) {
495-
builder.append(".");
496-
builder.append(sourceConfig.getString(COLLECTION_CONFIG));
497-
}
498-
return builder.toString();
477+
String createLegacyPartitionName(final MongoSourceConfig sourceConfig) {
478+
return format(
479+
"%s/%s.%s",
480+
sourceConfig.getString(CONNECTION_URI_CONFIG),
481+
sourceConfig.getString(DATABASE_CONFIG),
482+
sourceConfig.getString(COLLECTION_CONFIG));
483+
}
484+
485+
String createDefaultPartitionName(final MongoSourceConfig sourceConfig) {
486+
ConnectionString connectionString = sourceConfig.getConnectionString();
487+
StringBuilder builder = new StringBuilder();
488+
builder.append(connectionString.isSrvProtocol() ? "mongodb+srv://" : "mongodb://");
489+
builder.append(String.join(",", connectionString.getHosts()));
490+
builder.append("/");
491+
builder.append(sourceConfig.getString(DATABASE_CONFIG));
492+
if (!sourceConfig.getString(COLLECTION_CONFIG).isEmpty()) {
493+
builder.append(".");
494+
builder.append(sourceConfig.getString(COLLECTION_CONFIG));
499495
}
496+
return builder.toString();
500497
}
501498

502499
/**
@@ -642,7 +639,8 @@ Map<String, Object> getOffset(final MongoSourceConfig sourceConfig) {
642639
if (context != null) {
643640
Map<String, Object> offset =
644641
context.offsetStorageReader().offset(createPartitionMap(sourceConfig));
645-
if (offset == null) {
642+
if (offset == null
643+
&& sourceConfig.getString(MongoSourceConfig.OFFSET_PARTITION_NAME_CONFIG).isEmpty()) {
646644
offset = context.offsetStorageReader().offset(createLegacyPartitionMap(sourceConfig));
647645
}
648646
return offset;
@@ -659,7 +657,7 @@ BsonDocument getResumeToken(final MongoSourceConfig sourceConfig) {
659657
invalidatedCursor = false;
660658
} else {
661659
Map<String, Object> offset = getOffset(sourceConfig);
662-
if (offset != null && !offset.containsKey(COPY_KEY)) {
660+
if (offset != null && offset.containsKey(ID_FIELD) && !offset.containsKey(COPY_KEY)) {
663661
resumeToken = BsonDocument.parse((String) offset.get(ID_FIELD));
664662
if (offset.containsKey(HEARTBEAT_KEY)) {
665663
LOGGER.info("Resume token from heartbeat: {}", resumeToken);

src/test/java/com/mongodb/kafka/connect/source/MongoSourceTaskTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -386,31 +386,31 @@ void testCreatesTheExpectedPartitionMap() {
386386

387387
assertEquals(
388388
format("mongodb+srv://localhost/%s.%s", TEST_DATABASE, TEST_COLLECTION),
389-
task.createNamespaceString(cfg, false));
389+
task.createDefaultPartitionName(cfg));
390390
assertEquals(
391391
format("mongodb+srv://user:password@localhost//%s.%s", TEST_DATABASE, TEST_COLLECTION),
392-
task.createNamespaceString(cfg, true));
392+
task.createLegacyPartitionName(cfg));
393393

394394
cfgMap.put(CONNECTION_URI_CONFIG, "mongodb://localhost/");
395395
cfg = new MongoSourceConfig(cfgMap);
396396
assertEquals(
397397
format("mongodb://localhost/%s.%s", TEST_DATABASE, TEST_COLLECTION),
398-
task.createNamespaceString(cfg, false));
398+
task.createDefaultPartitionName(cfg));
399399
assertEquals(
400400
format("mongodb://localhost//%s.%s", TEST_DATABASE, TEST_COLLECTION),
401-
task.createNamespaceString(cfg, true));
401+
task.createLegacyPartitionName(cfg));
402402

403403
cfgMap.remove(COLLECTION_CONFIG);
404404
cfg = new MongoSourceConfig(cfgMap);
405405
assertEquals(
406-
format("mongodb://localhost/%s", TEST_DATABASE), task.createNamespaceString(cfg, false));
406+
format("mongodb://localhost/%s", TEST_DATABASE), task.createDefaultPartitionName(cfg));
407407
assertEquals(
408-
format("mongodb://localhost//%s.", TEST_DATABASE), task.createNamespaceString(cfg, true));
408+
format("mongodb://localhost//%s.", TEST_DATABASE), task.createLegacyPartitionName(cfg));
409409

410410
cfgMap.remove(DATABASE_CONFIG);
411411
cfg = new MongoSourceConfig(cfgMap);
412-
assertEquals("mongodb://localhost/", task.createNamespaceString(cfg, false));
413-
assertEquals("mongodb://localhost//.", task.createNamespaceString(cfg, true));
412+
assertEquals("mongodb://localhost/", task.createDefaultPartitionName(cfg));
413+
assertEquals("mongodb://localhost//.", task.createLegacyPartitionName(cfg));
414414
}
415415

416416
private void resetMocks() {

0 commit comments

Comments
 (0)