Commit 5b9a5bd

mmolimar authored and rozza committed
Added Dynamic Topic Mapping for the Source
Added a new interface `TopicMapper` with a `getTopic` method. Implementations can use the change stream document to determine the correct topic to publish the data to. The source connector includes a single implementation, `DefaultTopicMapper`, which uses the 'ns' field to determine the topic to publish to. The following configuration options help configure topic mapping:

* `topic.prefix` - Add a prefix to the topic, e.g. `<prefix>.<databaseName>.<collectionName>`.
* `topic.suffix` - Add a suffix to the topic, e.g. `<databaseName>.<collectionName>.<suffix>`.
* `topic.namespace.map` - Set a JSON map that maps change stream document namespaces to topics. For example, `{"db": "dbTopic", "db.coll": "dbCollTopic"}` will map all change stream documents from the `db` database to `dbTopic.<collectionName>`, apart from any documents from the `db.coll` namespace, which map to the `dbCollTopic` topic.
* `topic.mapper` - Use an alternative mapping class; users can implement their own `TopicMapper` class to handle the mapping logic (see the sketch below).

KAFKA-185
1 parent 3d799f1 commit 5b9a5bd
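Since `topic.mapper` lets users supply their own `TopicMapper`, a minimal sketch of a custom implementation follows. It assumes the interface exposes a single `String getTopic(BsonDocument)` method (matching the `topicMapper.getTopic(changeStreamDocument)` call in `MongoSourceTask` below) and inherits a no-op `configure` default from the new `Configurable` interface shown further below; the class name and routing logic are purely illustrative.

package com.example;

import org.bson.BsonDocument;
import org.bson.BsonString;

import com.mongodb.kafka.connect.source.topic.mapping.TopicMapper;

// Illustrative only: route delete events to one topic and everything else to a
// catch-all topic, based on the change stream 'operationType' field.
public class OperationTypeTopicMapper implements TopicMapper {
  @Override
  public String getTopic(final BsonDocument changeStreamDocument) {
    String op = changeStreamDocument.getString("operationType", new BsonString("")).getValue();
    return "delete".equals(op) ? "deletes" : "changes";
  }
}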

File tree

13 files changed: +639 −58 lines changed


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -8,6 +8,7 @@
 - [KAFKA-167](https://jira.mongodb.org/browse/KAFKA-167) Updated MongoDB Java Driver to 4.1.
 - [KAFKA-51](https://jira.mongodb.org/browse/KAFKA-51) Added sink support for MongoDB Changestream events.
 - [KAFKA-159](https://jira.mongodb.org/browse/KAFKA-159) Added dynamic namespace mapping for the sink connector.
+- [KAFKA-185](https://jira.mongodb.org/browse/KAFKA-185) Added topic mapping for the source connector.

 ### Bug Fixes
 - [KAFKA-171](https://jira.mongodb.org/browse/KAFKA-171) Fixed bug which made the top level inferred schema optional

config/MongoSourceConnector.properties

Lines changed: 1 addition & 0 deletions
@@ -8,6 +8,7 @@ database=test
 collection=source

 topic.prefix=
+topic.suffix=
 poll.max.batch.size=1000
 poll.await.time.ms=5000
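The same options can also be set programmatically when building a source config, much as the integration test below does. A minimal sketch; the URI, database, collection, and topic names here are illustrative only:

import java.util.HashMap;
import java.util.Map;

final class TopicMappingSettings {
  // Illustrative source connector settings exercising the new options.
  static Map<String, String> sourceProps() {
    Map<String, String> props = new HashMap<>();
    props.put("connection.uri", "mongodb://localhost:27017"); // assumed local deployment
    props.put("database", "test");
    props.put("collection", "source");
    props.put("topic.prefix", "prod");   // with prefix alone -> prod.test.source
    props.put("topic.suffix", "events"); // with suffix alone -> test.source.events
    props.put("topic.namespace.map", "{\"test.source\": \"sourceTopic\"}");
    return props;
  }
}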

src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java

Lines changed: 44 additions & 0 deletions
@@ -21,6 +21,7 @@
 import static com.mongodb.kafka.connect.mongodb.ChangeStreamOperations.createDropDatabase;
 import static com.mongodb.kafka.connect.mongodb.ChangeStreamOperations.createInserts;
 import static com.mongodb.kafka.connect.source.schema.SchemaUtils.assertStructsEquals;
+import static java.lang.String.format;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static java.util.stream.Collectors.toList;
@@ -379,6 +380,49 @@ void testSourceCanHandleInvalidResumeTokenWhenErrorToleranceIsAll() {
     }
   }

+  @Test
+  @DisplayName("Ensure source sets the expected topic mapping")
+  void testSourceTopicMapping() {
+    assumeTrue(isGreaterThanThreeDotSix());
+    try (AutoCloseableSourceTask task = createSourceTask()) {
+
+      MongoDatabase db = getDatabaseWithPostfix();
+      MongoCollection<Document> coll1 = db.getCollection("coll1");
+      MongoCollection<Document> coll2 = db.getCollection("coll2");
+
+      insertMany(rangeClosed(1, 50), coll1, coll2);
+
+      HashMap<String, String> cfg =
+          new HashMap<String, String>() {
+            {
+              put(MongoSourceConfig.DATABASE_CONFIG, db.getName());
+              put(MongoSourceConfig.COPY_EXISTING_CONFIG, "true");
+              put(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, "100");
+              put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "1000");
+              put(
+                  MongoSourceConfig.TOPIC_NAMESPACE_MAP_CONFIG,
+                  format(
+                      "{'%s': 'myDB', '%s': 'altDB.altColl'}",
+                      db.getName(), coll2.getNamespace().getFullName()));
+            }
+          };
+
+      task.start(cfg);
+
+      List<SourceRecord> firstPoll = getNextResults(task);
+      assertAll(
+          () -> assertSourceRecordValues(createInserts(1, 50), firstPoll, "myDB.coll1"),
+          () -> assertSourceRecordValues(createInserts(1, 50), firstPoll, "altDB.altColl"));
+
+      insertMany(rangeClosed(51, 100), coll1, coll2);
+
+      List<SourceRecord> secondPoll = getNextResults(task);
+      assertAll(
+          () -> assertSourceRecordValues(createInserts(51, 100), secondPoll, "myDB.coll1"),
+          () -> assertSourceRecordValues(createInserts(51, 100), secondPoll, "altDB.altColl"));
+    }
+  }
+
   @Test
   @DisplayName("Ensure source can use custom offset partition names")
   void testSourceCanUseCustomOffsetPartitionNames() {
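For clarity, the namespace map this test builds mixes a database-level entry and an exact-namespace entry. Assuming the generated database name is `db`, it resolves as follows, per the `topic.namespace.map` semantics described in the commit message:

// {"db": "myDB", "db.coll2": "altDB.altColl"}
// db.coll1 -> myDB.coll1     (database-level entry keeps the collection name)
// db.coll2 -> altDB.altColl  (exact namespace entry wins outright)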
src/main/java/com/mongodb/kafka/connect/source/Configurable.java

Lines changed: 21 additions & 0 deletions

@@ -0,0 +1,21 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.kafka.connect.source;
+
+public interface Configurable {
+  default void configure(MongoSourceConfig configuration) {}
+}

src/main/java/com/mongodb/kafka/connect/source/MongoSourceConfig.java

Lines changed: 98 additions & 14 deletions
@@ -25,6 +25,7 @@
 import static com.mongodb.kafka.connect.util.Validators.emptyString;
 import static com.mongodb.kafka.connect.util.Validators.errorCheckingValueValidator;
 import static java.lang.String.format;
+import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.config.ConfigDef.Width;
@@ -51,6 +52,7 @@

 import com.mongodb.kafka.connect.source.json.formatter.JsonWriterSettingsProvider;
 import com.mongodb.kafka.connect.source.schema.AvroSchema;
+import com.mongodb.kafka.connect.source.topic.mapping.TopicMapper;
 import com.mongodb.kafka.connect.util.ConfigHelper;
 import com.mongodb.kafka.connect.util.ConnectConfigException;
 import com.mongodb.kafka.connect.util.Validators;
@@ -126,13 +128,43 @@ public class MongoSourceConfig extends AbstractConfig {
   private static final String OUTPUT_SCHEMA_INFER_VALUE_DISPLAY =
       "Enable Infer Schemas for the value";

+  public static final String TOPIC_MAPPER_CONFIG = "topic.mapper";
+  private static final String TOPIC_MAPPER_DISPLAY = "The topic mapper class";
+  private static final String TOPIC_MAPPER_DOC =
+      "The class that determines the topic to write the source data to. "
+          + "By default this will be based on the 'ns' field in the change stream document, "
+          + "along with any configured prefix and suffix.";
+  private static final String TOPIC_MAPPER_DEFAULT =
+      "com.mongodb.kafka.connect.source.topic.mapping.DefaultTopicMapper";
+
   public static final String TOPIC_PREFIX_CONFIG = "topic.prefix";
   private static final String TOPIC_PREFIX_DOC =
       "Prefix to prepend to database & collection names to generate the name of the Kafka "
-          + "topic to publish data to.";
+          + "topic to publish data to. Used by the 'DefaultTopicMapper'.";
   private static final String TOPIC_PREFIX_DISPLAY = "Topic Prefix";
   private static final String TOPIC_PREFIX_DEFAULT = "";

+  public static final String TOPIC_SUFFIX_CONFIG = "topic.suffix";
+  private static final String TOPIC_SUFFIX_DOC =
+      "Suffix to append to database & collection names to generate the name of the Kafka "
+          + "topic to publish data to. Used by the 'DefaultTopicMapper'.";
+  private static final String TOPIC_SUFFIX_DISPLAY = "Topic Suffix";
+  private static final String TOPIC_SUFFIX_DEFAULT = "";
+
+  public static final String TOPIC_NAMESPACE_MAP_CONFIG = "topic.namespace.map";
+  private static final String TOPIC_NAMESPACE_MAP_DISPLAY = "The namespace to topic map";
+  private static final String TOPIC_NAMESPACE_MAP_DOC =
+      "A JSON map that maps change stream document namespaces to topics.\n"
+          + "For example: `{\"db\": \"dbTopic\", \"db.coll\": \"dbCollTopic\"}` will map all "
+          + "change stream documents from the `db` database to `dbTopic.<collectionName>` apart from "
+          + "any documents from the `db.coll` namespace which map to the `dbCollTopic` topic.\n"
+          + "If you want to map all messages to a single topic use `*`: "
+          + "For example: `{\"*\": \"everyThingTopic\", \"db.coll\": \"exceptionToTheRuleTopic\"}` "
+          + "will map all change stream documents to the `everyThingTopic` apart from the `db.coll` "
+          + "messages. "
+          + "Note: Any prefix and suffix configuration will still apply.";
+  private static final String TOPIC_NAMESPACE_MAP_DEFAULT = "";
+
   public static final String PIPELINE_CONFIG = "pipeline";
   private static final String PIPELINE_DISPLAY = "The pipeline to apply to the change stream";
   private static final String PIPELINE_DOC =
@@ -292,7 +324,7 @@ public class MongoSourceConfig extends AbstractConfig {

   public static final ConfigDef CONFIG = createConfigDef();
   private static final List<Consumer<MongoSourceConfig>> INITIALIZERS =
-      singletonList(MongoSourceConfig::validateCollection);
+      asList(MongoSourceConfig::validateCollection, MongoSourceConfig::getTopicMapper);

   public enum OutputFormat {
     JSON,
@@ -314,6 +346,7 @@ public String value() {
   }

   private final ConnectionString connectionString;
+  private TopicMapper topicMapper;

   public MongoSourceConfig(final Map<?, ?> originals) {
     this(originals, true);
@@ -371,6 +404,16 @@ private void validateCollection() {
     }
   }

+  public TopicMapper getTopicMapper() {
+    if (topicMapper == null) {
+      topicMapper =
+          configureInstance(
+              createInstance(
+                  TOPIC_MAPPER_CONFIG, getString(TOPIC_MAPPER_CONFIG), TopicMapper.class));
+    }
+    return topicMapper;
+  }
+
   public JsonWriterSettings getJsonWriterSettings() {
     return createInstance(
         OUTPUT_JSON_FORMATTER_CONFIG,
@@ -388,6 +431,11 @@ public boolean tolerateErrors() {
         .equals(ErrorTolerance.ALL);
   }

+  private <T extends Configurable> T configureInstance(final T instance) {
+    instance.configure(this);
+    return instance;
+  }
+
   private static ConfigDef createConfigDef() {
     ConfigDef configDef =
         new ConfigDef() {
@@ -516,18 +564,6 @@ public Map<String, ConfigValue> validateAll(final Map<String, String> props) {
             Width.MEDIUM,
             COLLATION_DISPLAY);

-    configDef.define(
-        TOPIC_PREFIX_CONFIG,
-        Type.STRING,
-        TOPIC_PREFIX_DEFAULT,
-        null,
-        Importance.LOW,
-        TOPIC_PREFIX_DOC,
-        group,
-        ++orderInGroup,
-        Width.MEDIUM,
-        TOPIC_PREFIX_DISPLAY);
-
     configDef.define(
         POLL_MAX_BATCH_SIZE_CONFIG,
         Type.INT,
@@ -552,6 +588,54 @@ public Map<String, ConfigValue> validateAll(final Map<String, String> props) {
             Width.MEDIUM,
             POLL_AWAIT_TIME_MS_DISPLAY);

+    group = "Topic mapping";
+    orderInGroup = 0;
+    configDef.define(
+        TOPIC_MAPPER_CONFIG,
+        ConfigDef.Type.STRING,
+        TOPIC_MAPPER_DEFAULT,
+        Validators.matching(FULLY_QUALIFIED_CLASS_NAME),
+        ConfigDef.Importance.HIGH,
+        TOPIC_MAPPER_DOC,
+        group,
+        ++orderInGroup,
+        ConfigDef.Width.LONG,
+        TOPIC_MAPPER_DISPLAY);
+
+    configDef.define(
+        TOPIC_PREFIX_CONFIG,
+        Type.STRING,
+        TOPIC_PREFIX_DEFAULT,
+        null,
+        Importance.LOW,
+        TOPIC_PREFIX_DOC,
+        group,
+        ++orderInGroup,
+        Width.MEDIUM,
+        TOPIC_PREFIX_DISPLAY);
+    configDef.define(
+        TOPIC_SUFFIX_CONFIG,
+        Type.STRING,
+        TOPIC_SUFFIX_DEFAULT,
+        null,
+        Importance.LOW,
+        TOPIC_SUFFIX_DOC,
+        group,
+        ++orderInGroup,
+        Width.MEDIUM,
+        TOPIC_SUFFIX_DISPLAY);
+    configDef.define(
+        TOPIC_NAMESPACE_MAP_CONFIG,
+        Type.STRING,
+        TOPIC_NAMESPACE_MAP_DEFAULT,
+        errorCheckingValueValidator("A valid JSON document", ConfigHelper::documentFromString),
+        Importance.HIGH,
+        TOPIC_NAMESPACE_MAP_DOC,
+        group,
+        ++orderInGroup,
+        Width.MEDIUM,
+        TOPIC_NAMESPACE_MAP_DISPLAY);
+
     group = "Schema";
     orderInGroup = 0;
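Putting the pieces together: `getTopicMapper()` lazily creates the mapper from the `topic.mapper` class name and runs it through `configureInstance`, and its inclusion in `INITIALIZERS` means an invalid mapper class fails fast during config validation. A sketch fragment of the expected call flow, assuming a reachable `connection.uri` and the hypothetical mapper class from the earlier sketch:

// Sketch only, not library documentation.
Map<String, String> props = new HashMap<>();
props.put("connection.uri", "mongodb://localhost:27017");
props.put("topic.mapper", "com.example.OperationTypeTopicMapper"); // hypothetical class

MongoSourceConfig config = new MongoSourceConfig(props);
// Instantiated reflectively via createInstance, then configure(config) is
// called once through configureInstance; later calls return the cached instance.
TopicMapper mapper = config.getTopicMapper();
assert mapper == config.getTopicMapper(); // cached after the first call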

src/main/java/com/mongodb/kafka/connect/source/MongoSourceTask.java

Lines changed: 3 additions & 19 deletions
@@ -26,7 +26,6 @@
 import static com.mongodb.kafka.connect.source.MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG;
 import static com.mongodb.kafka.connect.source.MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG;
 import static com.mongodb.kafka.connect.source.MongoSourceConfig.PUBLISH_FULL_DOCUMENT_ONLY_CONFIG;
-import static com.mongodb.kafka.connect.source.MongoSourceConfig.TOPIC_PREFIX_CONFIG;
 import static com.mongodb.kafka.connect.source.heartbeat.HeartbeatManager.HEARTBEAT_KEY;
 import static com.mongodb.kafka.connect.source.producer.SchemaAndValueProducers.createKeySchemaAndValueProvider;
 import static com.mongodb.kafka.connect.source.producer.SchemaAndValueProducers.createValueSchemaAndValueProvider;
@@ -74,6 +73,7 @@
 import com.mongodb.kafka.connect.source.MongoSourceConfig.OutputFormat;
 import com.mongodb.kafka.connect.source.heartbeat.HeartbeatManager;
 import com.mongodb.kafka.connect.source.producer.SchemaAndValueProducer;
+import com.mongodb.kafka.connect.source.topic.mapping.TopicMapper;

 /**
  * A Kafka Connect source task that uses change streams to broadcast changes to the collection,
@@ -112,8 +112,6 @@ public final class MongoSourceTask extends SourceTask {
   private static final String CONNECTOR_TYPE = "source";
   public static final String ID_FIELD = "_id";
   private static final String COPY_KEY = "copy";
-  private static final String DB_KEY = "db";
-  private static final String COLL_KEY = "coll";
   private static final String NS_KEY = "ns";
   private static final String FULL_DOCUMENT = "fullDocument";
   private static final int NAMESPACE_NOT_FOUND_ERROR = 26;
@@ -187,10 +185,10 @@ public List<SourceRecord> poll() {
     final long startPoll = time.milliseconds();
     LOGGER.debug("Polling Start: {}", startPoll);
     List<SourceRecord> sourceRecords = new ArrayList<>();
+    TopicMapper topicMapper = sourceConfig.getTopicMapper();
     boolean publishFullDocumentOnly = sourceConfig.getBoolean(PUBLISH_FULL_DOCUMENT_ONLY_CONFIG);
     int maxBatchSize = sourceConfig.getInt(POLL_MAX_BATCH_SIZE_CONFIG);
     long nextUpdate = startPoll + sourceConfig.getLong(POLL_AWAIT_TIME_MS_CONFIG);
-    String prefix = sourceConfig.getString(TOPIC_PREFIX_CONFIG);
     Map<String, Object> partition = createPartitionMap(sourceConfig);

     SchemaAndValueProducer keySchemaAndValueProducer =
@@ -224,10 +222,7 @@ public List<SourceRecord> poll() {
           sourceOffset.put(COPY_KEY, "true");
         }

-        String topicName =
-            getTopicNameFromNamespace(
-                prefix, changeStreamDocument.getDocument("ns", new BsonDocument()));
-
+        String topicName = topicMapper.getTopic(changeStreamDocument);
         if (topicName.isEmpty()) {
           LOGGER.warn(
               "No topic set. Could not publish the message: {}", changeStreamDocument.toJson());
@@ -447,17 +442,6 @@ private boolean resumeTokenNotFound(final MongoCommandException e) {
         || errorMessage.contains(INVALID_RESUME_TOKEN));
   }

-  String getTopicNameFromNamespace(final String prefix, final BsonDocument namespaceDocument) {
-    String topicName = "";
-    if (namespaceDocument.containsKey(DB_KEY)) {
-      topicName = namespaceDocument.getString(DB_KEY).getValue();
-      if (namespaceDocument.containsKey(COLL_KEY)) {
-        topicName = format("%s.%s", topicName, namespaceDocument.getString(COLL_KEY).getValue());
-      }
-    }
-    return prefix.isEmpty() ? topicName : format("%s.%s", prefix, topicName);
-  }
-
   Map<String, Object> createPartitionMap(final MongoSourceConfig sourceConfig) {
     if (partitionMap == null) {
       String partitionName = sourceConfig.getString(MongoSourceConfig.OFFSET_PARTITION_NAME_CONFIG);
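The `DefaultTopicMapper` source is not part of this excerpt, but its 'ns'-based behaviour can be inferred from the removed `getTopicNameFromNamespace` above together with the new prefix and suffix options. The following is a reconstruction sketch only, not the shipped class; it omits the `topic.namespace.map` handling, and the `configure` override assumes `TopicMapper` extends the new `Configurable` interface, which the `configureInstance` bound implies:

package com.example;

import org.bson.BsonDocument;

import com.mongodb.kafka.connect.source.MongoSourceConfig;
import com.mongodb.kafka.connect.source.topic.mapping.TopicMapper;

// Inferred: topic = [prefix.]<db>[.<coll>][.suffix]; empty when 'ns' has no db,
// in which case poll() logs a warning and skips the record.
public class NsTopicMapperSketch implements TopicMapper {
  private String prefix = "";
  private String suffix = "";

  @Override
  public void configure(final MongoSourceConfig configuration) {
    prefix = configuration.getString(MongoSourceConfig.TOPIC_PREFIX_CONFIG);
    suffix = configuration.getString(MongoSourceConfig.TOPIC_SUFFIX_CONFIG);
  }

  @Override
  public String getTopic(final BsonDocument changeStreamDocument) {
    BsonDocument ns = changeStreamDocument.getDocument("ns", new BsonDocument());
    if (!ns.containsKey("db")) {
      return "";
    }
    String topic = ns.getString("db").getValue();
    if (ns.containsKey("coll")) {
      topic = topic + "." + ns.getString("coll").getValue();
    }
    if (!prefix.isEmpty()) {
      topic = prefix + "." + topic;
    }
    return suffix.isEmpty() ? topic : topic + "." + suffix;
  }
}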
