
Commit 3d799f1

Added Dynamic Namespace Mapping for the Sink
Added a new interface `NamespaceMapper` with a `getNamespace` method. Implementations can use either the raw `SinkRecord` or the `SinkDocument` to determine the correct `MongoNamespace` to sink the data to. The sink connector includes two implementations:

* `DefaultNamespaceMapper`: uses the configured database as the database name, and the configured collection (or the topic name when no collection is configured) as the collection name.
* `FieldPathNamespaceMapper`: uses a string field from either the key or value document as the database or collection name.

KAFKA-159
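For reference, here is a minimal sketch of the `NamespaceMapper` contract described above. The signature is inferred from its use in `MongoProcessedSinkRecordData` below (`config.getNamespaceMapper().getNamespace(sinkRecord, sinkDocument)`), so the committed interface may differ in detail:

```java
import org.apache.kafka.connect.sink.SinkRecord;

import com.mongodb.MongoNamespace;

import com.mongodb.kafka.connect.sink.converter.SinkDocument;

// Sketch only: an implementation maps each record to the MongoNamespace
// (database + collection) the sink should write it to.
public interface NamespaceMapper {
  MongoNamespace getNamespace(SinkRecord sinkRecord, SinkDocument sinkDocument);
}
```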
1 parent a791b0d commit 3d799f1

19 files changed: +1533 −853 lines

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -7,6 +7,7 @@
 ### Improvements
 - [KAFKA-167](https://jira.mongodb.org/browse/KAFKA-167) Updated MongoDB Java Driver to 4.1.
 - [KAFKA-51](https://jira.mongodb.org/browse/KAFKA-51) Added sink support for MongoDB Changestream events.
+- [KAFKA-159](https://jira.mongodb.org/browse/KAFKA-159) Added dynamic namespace mapping for the sink connector.
 
 ### Bug Fixes
 - [KAFKA-171](https://jira.mongodb.org/browse/KAFKA-171) Fixed bug which made the top level inferred schema optional
```
src/main/java/com/mongodb/kafka/connect/sink/MongoProcessedSinkRecordData.java (new file)

Lines changed: 118 additions & 0 deletions
@@ -0,0 +1,118 @@

```java
/*
 * Copyright 2008-present MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.mongodb.kafka.connect.sink;

import java.util.Optional;
import java.util.function.Supplier;

import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.bson.BsonDocument;

import com.mongodb.MongoNamespace;
import com.mongodb.client.model.WriteModel;

import com.mongodb.kafka.connect.sink.converter.SinkConverter;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategyHelper;

final class MongoProcessedSinkRecordData {
  private static final Logger LOGGER = LoggerFactory.getLogger(MongoProcessedSinkRecordData.class);
  private static final SinkConverter SINK_CONVERTER = new SinkConverter();

  private final MongoSinkTopicConfig config;
  private final MongoNamespace namespace;
  private final SinkRecord sinkRecord;
  private final SinkDocument sinkDocument;
  private final WriteModel<BsonDocument> writeModel;

  MongoProcessedSinkRecordData(final SinkRecord sinkRecord, final MongoSinkConfig sinkConfig) {
    this.sinkRecord = sinkRecord;
    this.config = sinkConfig.getMongoSinkTopicConfig(sinkRecord.topic());
    this.sinkDocument = SINK_CONVERTER.convert(sinkRecord);
    this.namespace = createNamespace();
    this.writeModel = createWriteModel();
  }

  public MongoSinkTopicConfig getConfig() {
    return config;
  }

  public MongoNamespace getNamespace() {
    return namespace;
  }

  public SinkRecord getSinkRecord() {
    return sinkRecord;
  }

  public WriteModel<BsonDocument> getWriteModel() {
    if (writeModel == null) {
      throw new DataException("Unable to create a valid WriteModel for the SinkRecord");
    }
    return writeModel;
  }

  public boolean canProcess() {
    return namespace != null && writeModel != null;
  }

  private MongoNamespace createNamespace() {
    return tryProcess(
            () -> Optional.of(config.getNamespaceMapper().getNamespace(sinkRecord, sinkDocument)))
        .orElse(null);
  }

  private WriteModel<BsonDocument> createWriteModel() {
    return config.getCdcHandler().isPresent() ? buildWriteModelCDC() : buildWriteModel();
  }

  private WriteModel<BsonDocument> buildWriteModel() {
    return tryProcess(
            () -> {
              config
                  .getPostProcessors()
                  .getPostProcessorList()
                  .forEach(pp -> pp.process(sinkDocument, sinkRecord));
              return WriteModelStrategyHelper.createWriteModel(config, sinkDocument);
            })
        .orElse(null);
  }

  private WriteModel<BsonDocument> buildWriteModelCDC() {
    return tryProcess(
            () -> config.getCdcHandler().flatMap(cdcHandler -> cdcHandler.handle(sinkDocument)))
        .orElse(null);
  }

  private <T> Optional<T> tryProcess(final Supplier<Optional<T>> supplier) {
    try {
      return supplier.get();
    } catch (Exception e) {
      if (config.logErrors()) {
        LOGGER.error("Unable to process record {}", sinkRecord, e);
      }
      if (!config.tolerateErrors()) {
        throw e;
      }
    }
    return Optional.empty();
  }
}
```
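The `createNamespace()` method above delegates entirely to the configured `NamespaceMapper`. As a rough illustration of the `FieldPathNamespaceMapper` behavior the commit message describes, a simplified mapper could read the names from the value document. The field names (`db`, `coll`), the fallback logic, and the constructor here are assumptions for illustration, not the committed implementation:

```java
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;

import org.bson.BsonDocument;

import com.mongodb.MongoNamespace;

import com.mongodb.kafka.connect.sink.converter.SinkDocument;

// Illustrative only: picks the database and collection names out of the
// value document, falling back to configured defaults when a field is absent.
public class FieldPathNamespaceMapperSketch implements NamespaceMapper {
  private final String defaultDatabase;   // e.g. the configured database name
  private final String defaultCollection; // e.g. the configured collection or the topic name

  public FieldPathNamespaceMapperSketch(
      final String defaultDatabase, final String defaultCollection) {
    this.defaultDatabase = defaultDatabase;
    this.defaultCollection = defaultCollection;
  }

  @Override
  public MongoNamespace getNamespace(final SinkRecord sinkRecord, final SinkDocument sinkDocument) {
    BsonDocument value =
        sinkDocument.getValueDoc().orElseThrow(() -> new DataException("Missing value document"));
    String db = value.containsKey("db") ? value.getString("db").getValue() : defaultDatabase;
    String coll = value.containsKey("coll") ? value.getString("coll").getValue() : defaultCollection;
    return new MongoNamespace(db, coll);
  }
}
```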
src/main/java/com/mongodb/kafka/connect/sink/MongoSinkRecordProcessor.java (new file)

Lines changed: 71 additions & 0 deletions
@@ -0,0 +1,71 @@

```java
/*
 * Copyright 2008-present MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.mongodb.kafka.connect.sink;

import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.MAX_BATCH_SIZE_CONFIG;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class MongoSinkRecordProcessor {
  private static final Logger LOGGER = LoggerFactory.getLogger(MongoSinkRecordProcessor.class);

  static List<List<MongoProcessedSinkRecordData>> orderedGroupByTopicAndNamespace(
      final Collection<SinkRecord> records, final MongoSinkConfig sinkConfig) {
    LOGGER.debug("Number of sink records to process: {}", records.size());

    List<List<MongoProcessedSinkRecordData>> orderedProcessedSinkRecordData = new ArrayList<>();
    List<MongoProcessedSinkRecordData> currentGroup = new ArrayList<>();
    MongoProcessedSinkRecordData previous = null;

    for (SinkRecord record : records) {
      MongoProcessedSinkRecordData processedData =
          new MongoProcessedSinkRecordData(record, sinkConfig);

      if (!processedData.canProcess()) {
        continue;
      }

      if (previous == null) {
        previous = processedData;
      }

      int maxBatchSize = processedData.getConfig().getInt(MAX_BATCH_SIZE_CONFIG);
      if (maxBatchSize > 0 && currentGroup.size() == maxBatchSize
          || !previous.getSinkRecord().topic().equals(processedData.getSinkRecord().topic())
          || !previous.getNamespace().equals(processedData.getNamespace())) {

        orderedProcessedSinkRecordData.add(currentGroup);
        currentGroup = new ArrayList<>();
      }
      previous = processedData;
      currentGroup.add(processedData);
    }

    if (!currentGroup.isEmpty()) {
      orderedProcessedSinkRecordData.add(currentGroup);
    }
    return orderedProcessedSinkRecordData;
  }

  private MongoSinkRecordProcessor() {}
}
```
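Each inner list returned by `orderedGroupByTopicAndNamespace` shares a single topic and namespace, so a caller can turn one group into one bulk write while preserving record order. A hypothetical caller sketch follows; this is not the committed `MongoSinkTask` code, and it must live in the same package because both classes are package-private:

```java
package com.mongodb.kafka.connect.sink;

import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.kafka.connect.sink.SinkRecord;

import org.bson.BsonDocument;

import com.mongodb.MongoNamespace;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.WriteModel;

// Hypothetical caller: one bulk write per ordered group. Every group produced
// above is non-empty, so the first element's namespace applies to the batch.
final class BulkWriteSketch {

  static void write(
      final MongoClient client,
      final Collection<SinkRecord> records,
      final MongoSinkConfig sinkConfig) {
    for (List<MongoProcessedSinkRecordData> batch :
        MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(records, sinkConfig)) {
      MongoNamespace namespace = batch.get(0).getNamespace();
      List<WriteModel<BsonDocument>> writeModels =
          batch.stream()
              .map(MongoProcessedSinkRecordData::getWriteModel)
              .collect(Collectors.toList());
      client
          .getDatabase(namespace.getDatabaseName())
          .getCollection(namespace.getCollectionName(), BsonDocument.class)
          .bulkWrite(writeModels);
    }
  }

  private BulkWriteSketch() {}
}
```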
