diff --git a/src/main/java/com/mongodb/kafka/connect/sink/MongoProcessedSinkRecordData.java b/src/main/java/com/mongodb/kafka/connect/sink/MongoProcessedSinkRecordData.java
index 4f2c80143..4d8d76c1c 100644
--- a/src/main/java/com/mongodb/kafka/connect/sink/MongoProcessedSinkRecordData.java
+++ b/src/main/java/com/mongodb/kafka/connect/sink/MongoProcessedSinkRecordData.java
@@ -16,6 +16,8 @@
 
 package com.mongodb.kafka.connect.sink;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Optional;
 import java.util.function.Supplier;
 
@@ -40,7 +42,7 @@ final class MongoProcessedSinkRecordData {
   private final MongoNamespace namespace;
   private final SinkRecord sinkRecord;
   private final SinkDocument sinkDocument;
-  private final WriteModel<BsonDocument> writeModel;
+  private final List<WriteModel<BsonDocument>> writeModelList;
   private Exception exception;
 
   MongoProcessedSinkRecordData(final SinkRecord sinkRecord, final MongoSinkConfig sinkConfig) {
@@ -48,7 +50,7 @@ final class MongoProcessedSinkRecordData {
     this.config = sinkConfig.getMongoSinkTopicConfig(sinkRecord.topic());
     this.sinkDocument = SINK_CONVERTER.convert(sinkRecord);
     this.namespace = createNamespace();
-    this.writeModel = createWriteModel();
+    this.writeModelList = createMultiRowWriteModel();
   }
 
   public MongoSinkTopicConfig getConfig() {
@@ -63,8 +65,8 @@ public SinkRecord getSinkRecord() {
     return sinkRecord;
   }
 
-  public WriteModel<BsonDocument> getWriteModel() {
-    return writeModel;
+  public List<WriteModel<BsonDocument>> getWriteModelList() {
+    return writeModelList;
   }
 
   public Exception getException() {
@@ -77,8 +79,21 @@ private MongoNamespace createNamespace() {
         .orElse(null);
   }
 
-  private WriteModel<BsonDocument> createWriteModel() {
-    return config.getCdcHandler().isPresent() ? buildWriteModelCDC() : buildWriteModel();
+  private List<WriteModel<BsonDocument>> createMultiRowWriteModel() {
+    return config.getCdcMultiRowHandler().isPresent()
+        ? buildMultiRowWriteModelCDC()
+        : config.getCdcHandler().isPresent()
+            ? Arrays.asList(buildWriteModelCDC())
+            : Arrays.asList(buildWriteModel());
+  }
+
+  private List<WriteModel<BsonDocument>> buildMultiRowWriteModelCDC() {
+    return tryProcess(
+            () ->
+                config
+                    .getCdcMultiRowHandler()
+                    .flatMap(cdcHandler -> cdcHandler.handle(sinkDocument)))
+        .orElse(null);
   }
 
   private WriteModel<BsonDocument> buildWriteModel() {
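Note on the hunk above: the nested ternary encodes a precedence — a configured multi-row handler wins over the single-model CDC handler, and both single-model paths are wrapped in a one-element list so downstream batching only ever sees a List<WriteModel<BsonDocument>>. A minimal sketch of the equivalent dispatch with explicit branches, assuming the same class context (not part of the patch):

    // Sketch only: equivalent to the patched createMultiRowWriteModel() above.
    private List<WriteModel<BsonDocument>> createMultiRowWriteModel() {
      if (config.getCdcMultiRowHandler().isPresent()) {
        return buildMultiRowWriteModelCDC(); // may be null for no-op CDC events
      }
      if (config.getCdcHandler().isPresent()) {
        return Arrays.asList(buildWriteModelCDC()); // single model, wrapped
      }
      return Arrays.asList(buildWriteModel()); // non-CDC path, wrapped
    }

A null result from buildMultiRowWriteModelCDC() is what the null check added to MongoSinkRecordProcessor below filters out.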
diff --git a/src/main/java/com/mongodb/kafka/connect/sink/MongoSinkRecordProcessor.java b/src/main/java/com/mongodb/kafka/connect/sink/MongoSinkRecordProcessor.java
index 09938dd53..082d85a68 100644
--- a/src/main/java/com/mongodb/kafka/connect/sink/MongoSinkRecordProcessor.java
+++ b/src/main/java/com/mongodb/kafka/connect/sink/MongoSinkRecordProcessor.java
@@ -48,7 +48,8 @@ static List<List<MongoProcessedSinkRecordData>> orderedGroupByTopicAndNamespace(
       if (processedData.getException() != null) {
         errorReporter.report(processedData.getSinkRecord(), processedData.getException());
         continue;
-      } else if (processedData.getNamespace() == null || processedData.getWriteModel() == null) {
+      } else if (processedData.getNamespace() == null
+          || processedData.getWriteModelList() == null) {
         // Some CDC events can be Noops (eg tombstone events)
         continue;
       }
diff --git a/src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTopicConfig.java b/src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTopicConfig.java
index 98c62d84c..60a6139a5 100644
--- a/src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTopicConfig.java
+++ b/src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTopicConfig.java
@@ -50,6 +50,7 @@
 import com.mongodb.client.model.TimeSeriesGranularity;
 
 import com.mongodb.kafka.connect.sink.cdc.CdcHandler;
+import com.mongodb.kafka.connect.sink.cdc.CdcMultiRowHandler;
 import com.mongodb.kafka.connect.sink.namespace.mapping.NamespaceMapper;
 import com.mongodb.kafka.connect.sink.processor.PostProcessors;
 import com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy;
@@ -335,6 +336,8 @@ public String value() {
 
   // Change Data Capture
   public static final String CHANGE_DATA_CAPTURE_HANDLER_CONFIG = "change.data.capture.handler";
+  public static final String CHANGE_DATA_CAPTURE_MULTI_ROW_HANDLER_CONFIG =
+      "change.data.capture.multi.row.handler";
   private static final String CHANGE_DATA_CAPTURE_HANDLER_DISPLAY = "The CDC handler";
   private static final String CHANGE_DATA_CAPTURE_HANDLER_DOC =
       "The class name of the CDC handler to use for processing";
@@ -417,7 +420,8 @@ public String value() {
           MongoSinkTopicConfig::getWriteModelStrategy,
           MongoSinkTopicConfig::getDeleteOneWriteModelStrategy,
           MongoSinkTopicConfig::getRateLimitSettings,
-          MongoSinkTopicConfig::getCdcHandler);
+          MongoSinkTopicConfig::getCdcHandler,
+          MongoSinkTopicConfig::getCdcMultiRowHandler);
 
   private final String topic;
   private NamespaceMapper namespaceMapper;
@@ -427,6 +431,7 @@ public String value() {
   private WriteModelStrategy deleteOneWriteModelStrategy;
   private RateLimitSettings rateLimitSettings;
   private CdcHandler cdcHandler;
+  private CdcMultiRowHandler cdcMultiRowHandler;
 
   MongoSinkTopicConfig(final String topic, final Map<String, String> originals) {
     this(topic, originals, true);
   }
@@ -571,6 +576,27 @@ Optional<CdcHandler> getCdcHandler() {
     return Optional.of(this.cdcHandler);
   }
 
+  Optional<CdcMultiRowHandler> getCdcMultiRowHandler() {
+    String cdcMultiRowHandler = getString(CHANGE_DATA_CAPTURE_MULTI_ROW_HANDLER_CONFIG);
+    if (cdcMultiRowHandler.isEmpty()) {
+      return Optional.empty();
+    }
+
+    if (this.cdcMultiRowHandler == null) {
+      this.cdcMultiRowHandler =
+          createInstance(
+              CHANGE_DATA_CAPTURE_MULTI_ROW_HANDLER_CONFIG,
+              cdcMultiRowHandler,
+              CdcMultiRowHandler.class,
+              () ->
+                  (CdcMultiRowHandler)
+                      Class.forName(cdcMultiRowHandler)
+                          .getConstructor(MongoSinkTopicConfig.class)
+                          .newInstance(this));
+    }
+    return Optional.of(this.cdcMultiRowHandler);
+  }
+
   RateLimitSettings getRateLimitSettings() {
     if (rateLimitSettings == null) {
       rateLimitSettings =
@@ -1083,6 +1109,18 @@ private static ConfigDef createConfigDef() {
         ConfigDef.Width.MEDIUM,
         CHANGE_DATA_CAPTURE_HANDLER_DISPLAY);
 
+    configDef.define(
+        CHANGE_DATA_CAPTURE_MULTI_ROW_HANDLER_CONFIG,
+        ConfigDef.Type.STRING,
+        CHANGE_DATA_CAPTURE_HANDLER_DEFAULT,
+        Validators.emptyString().or(Validators.matching(FULLY_QUALIFIED_CLASS_NAME)),
+        ConfigDef.Importance.LOW,
+        CHANGE_DATA_CAPTURE_HANDLER_DOC,
+        group,
+        ++orderInGroup,
+        ConfigDef.Width.MEDIUM,
+        CHANGE_DATA_CAPTURE_HANDLER_DISPLAY);
+
     group = "Time series";
     orderInGroup = 0;
     configDef.define(
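The new key is validated like the existing `change.data.capture.handler` (empty string or a fully qualified class name) and, as defined above, currently reuses that option's doc and display strings. A hypothetical standalone-worker sink configuration selecting a custom multi-row handler (the `com.example.kafka.MyMultiRowCdcHandler` class name is illustrative only, not part of this patch):

    # Sketch of a sink connector .properties file; only the last key is new.
    connector.class=com.mongodb.kafka.connect.MongoSinkConnector
    topics=orders
    connection.uri=mongodb://localhost:27017
    database=test
    collection=orders
    change.data.capture.multi.row.handler=com.example.kafka.MyMultiRowCdcHandler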
diff --git a/src/main/java/com/mongodb/kafka/connect/sink/StartedMongoSinkTask.java b/src/main/java/com/mongodb/kafka/connect/sink/StartedMongoSinkTask.java
index d3e3b55c6..11c9d38e4 100644
--- a/src/main/java/com/mongodb/kafka/connect/sink/StartedMongoSinkTask.java
+++ b/src/main/java/com/mongodb/kafka/connect/sink/StartedMongoSinkTask.java
@@ -138,7 +138,8 @@ private void bulkWriteBatch(final List<MongoProcessedSinkRecordData> batch) {
 
     List<WriteModel<BsonDocument>> writeModels =
         batch.stream()
-            .map(MongoProcessedSinkRecordData::getWriteModel)
+            .map(MongoProcessedSinkRecordData::getWriteModelList)
+            .flatMap(list -> list.stream())
             .collect(Collectors.toList());
 
     boolean bulkWriteOrdered = config.getBoolean(BULK_WRITE_ORDERED_CONFIG);
diff --git a/src/main/java/com/mongodb/kafka/connect/sink/cdc/CdcMultiOperation.java b/src/main/java/com/mongodb/kafka/connect/sink/cdc/CdcMultiOperation.java
new file mode 100644
index 000000000..d08fb1743
--- /dev/null
+++ b/src/main/java/com/mongodb/kafka/connect/sink/cdc/CdcMultiOperation.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Original Work: Apache License, Version 2.0, Copyright 2017 Hans-Peter Grahsl.
+ */
+
+package com.mongodb.kafka.connect.sink.cdc;
+
+import java.util.List;
+
+import org.bson.BsonDocument;
+
+import com.mongodb.client.model.WriteModel;
+
+import com.mongodb.kafka.connect.sink.converter.SinkDocument;
+
+public interface CdcMultiOperation {
+
+  List<WriteModel<BsonDocument>> perform(SinkDocument doc);
+}
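CdcMultiOperation mirrors the connector's existing single-model CdcOperation contract but returns a list of write models per event. A sketch of a possible implementation that fans one CDC event out into one replace per element of an embedded "rows" array — the class name, package, and "rows" field are illustrative assumptions, not part of this patch:

    package com.example.kafka;

    import java.util.ArrayList;
    import java.util.List;

    import org.bson.BsonDocument;
    import org.bson.BsonValue;

    import com.mongodb.client.model.Filters;
    import com.mongodb.client.model.ReplaceOneModel;
    import com.mongodb.client.model.ReplaceOptions;
    import com.mongodb.client.model.WriteModel;

    import com.mongodb.kafka.connect.sink.cdc.CdcMultiOperation;
    import com.mongodb.kafka.connect.sink.converter.SinkDocument;

    public class MultiRowReplace implements CdcMultiOperation {

      @Override
      public List<WriteModel<BsonDocument>> perform(final SinkDocument doc) {
        // Assumes the event value carries a "rows" array of documents, each with an _id.
        BsonDocument value = doc.getValueDoc().orElseThrow(IllegalArgumentException::new);
        List<WriteModel<BsonDocument>> models = new ArrayList<>();
        for (BsonValue row : value.getArray("rows")) {
          BsonDocument rowDoc = row.asDocument();
          models.add(
              new ReplaceOneModel<>(
                  Filters.eq("_id", rowDoc.get("_id")),
                  rowDoc,
                  new ReplaceOptions().upsert(true)));
        }
        return models;
      }
    }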
diff --git a/src/main/java/com/mongodb/kafka/connect/sink/cdc/CdcMultiRowHandler.java b/src/main/java/com/mongodb/kafka/connect/sink/cdc/CdcMultiRowHandler.java
new file mode 100644
index 000000000..a15c343e3
--- /dev/null
+++ b/src/main/java/com/mongodb/kafka/connect/sink/cdc/CdcMultiRowHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Original Work: Apache License, Version 2.0, Copyright 2017 Hans-Peter Grahsl.
+ */
+
+package com.mongodb.kafka.connect.sink.cdc;
+
+import java.util.List;
+import java.util.Optional;
+
+import org.bson.BsonDocument;
+
+import com.mongodb.client.model.WriteModel;
+
+import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
+import com.mongodb.kafka.connect.sink.converter.SinkDocument;
+
+public abstract class CdcMultiRowHandler {
+
+  private final MongoSinkTopicConfig config;
+
+  public CdcMultiRowHandler(final MongoSinkTopicConfig config) {
+    this.config = config;
+  }
+
+  public MongoSinkTopicConfig getConfig() {
+    return config;
+  }
+
+  public abstract Optional<List<WriteModel<BsonDocument>>> handle(SinkDocument doc);
+}
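Because getCdcMultiRowHandler() instantiates the configured class reflectively via getConstructor(MongoSinkTopicConfig.class), a concrete handler must expose exactly that constructor. A hypothetical subclass wiring in the MultiRowReplace sketch from above (all names illustrative; an empty Optional signals a no-op, e.g. a tombstone):

    package com.example.kafka;

    import java.util.List;
    import java.util.Optional;

    import org.bson.BsonDocument;

    import com.mongodb.client.model.WriteModel;

    import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
    import com.mongodb.kafka.connect.sink.cdc.CdcMultiRowHandler;
    import com.mongodb.kafka.connect.sink.converter.SinkDocument;

    public class MyMultiRowCdcHandler extends CdcMultiRowHandler {

      private final MultiRowReplace operation = new MultiRowReplace();

      // Required: the connector reflectively calls this exact constructor.
      public MyMultiRowCdcHandler(final MongoSinkTopicConfig config) {
        super(config);
      }

      @Override
      public Optional<List<WriteModel<BsonDocument>>> handle(final SinkDocument doc) {
        if (!doc.getValueDoc().isPresent()) {
          return Optional.empty(); // tombstone: treated as a no-op upstream
        }
        return Optional.of(operation.perform(doc));
      }
    }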
diff --git a/src/test/java/com/mongodb/kafka/connect/sink/MongoProcessedSinkRecordDataTest.java b/src/test/java/com/mongodb/kafka/connect/sink/MongoProcessedSinkRecordDataTest.java
index 87abd2ba5..90b22966c 100644
--- a/src/test/java/com/mongodb/kafka/connect/sink/MongoProcessedSinkRecordDataTest.java
+++ b/src/test/java/com/mongodb/kafka/connect/sink/MongoProcessedSinkRecordDataTest.java
@@ -225,7 +225,7 @@ void testRenameIdHandling() {
         new MongoProcessedSinkRecordData(sinkRecord, createSinkConfig(topicConfig));
     assertNull(processedData.getException());
     UpdateOneModel<BsonDocument> writeModel =
-        (UpdateOneModel<BsonDocument>) processedData.getWriteModel();
+        (UpdateOneModel<BsonDocument>) processedData.getWriteModelList().get(0);
     assertTrue(writeModel.getOptions().isUpsert());
     assertEquals(
         BsonDocument.parse("{'a': 'a', 'b': 'b', '_id': 'c'}"), writeModel.getFilter());
@@ -251,7 +251,7 @@ void assertWriteModel(
       final ReplaceOneModel<BsonDocument> expectedWriteModel) {
     assertNull(processedData.getException());
     ReplaceOneModel<BsonDocument> writeModel =
-        (ReplaceOneModel<BsonDocument>) processedData.getWriteModel();
+        (ReplaceOneModel<BsonDocument>) processedData.getWriteModelList().get(0);
     assertEquals(expectedWriteModel.getFilter(), writeModel.getFilter());
     assertEquals(expectedWriteModel.getReplacement(), writeModel.getReplacement());
     assertEquals(
diff --git a/src/test/java/com/mongodb/kafka/connect/sink/MongoSinkConfigTest.java b/src/test/java/com/mongodb/kafka/connect/sink/MongoSinkConfigTest.java
index df9d8adfb..ed5d02cb8 100644
--- a/src/test/java/com/mongodb/kafka/connect/sink/MongoSinkConfigTest.java
+++ b/src/test/java/com/mongodb/kafka/connect/sink/MongoSinkConfigTest.java
@@ -22,26 +22,7 @@
 import static com.mongodb.kafka.connect.sink.MongoSinkConfig.TOPICS_REGEX_CONFIG;
 import static com.mongodb.kafka.connect.sink.MongoSinkConfig.TOPIC_OVERRIDE_CONFIG;
 import static com.mongodb.kafka.connect.sink.MongoSinkConfig.createOverrideKey;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.CHANGE_DATA_CAPTURE_HANDLER_CONFIG;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.COLLECTION_CONFIG;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.DATABASE_CONFIG;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.DOCUMENT_ID_STRATEGY_CONFIG;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.FIELD_RENAMER_MAPPING_CONFIG;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.FIELD_RENAMER_REGEXP_CONFIG;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.KEY_PROJECTION_LIST_CONFIG;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.KEY_PROJECTION_TYPE_CONFIG;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.NAMESPACE_MAPPER_CONFIG;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.POST_PROCESSOR_CHAIN_CONFIG;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_EXPIRE_AFTER_SECONDS_CONFIG;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_GRANULARITY_CONFIG;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_METAFIELD_CONFIG;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_AUTO_CONVERSION_CONFIG;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_AUTO_CONVERSION_DATE_FORMAT_CONFIG;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_CONFIG;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TOPIC_OVERRIDE_PREFIX;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.VALUE_PROJECTION_LIST_CONFIG;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.VALUE_PROJECTION_TYPE_CONFIG;
-import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.WRITEMODEL_STRATEGY_CONFIG;
+import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.*;
 import static com.mongodb.kafka.connect.sink.SinkConfigSoftValidator.OBSOLETE_CONFIGS;
 import static com.mongodb.kafka.connect.sink.SinkTestHelper.CLIENT_URI_AUTH_SETTINGS;
 import static com.mongodb.kafka.connect.sink.SinkTestHelper.CLIENT_URI_DEFAULT_SETTINGS;
@@ -72,6 +53,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -83,10 +65,16 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestFactory;
 
+import org.bson.BsonDocument;
+
+import com.mongodb.client.model.WriteModel;
+
+import com.mongodb.kafka.connect.sink.cdc.CdcMultiRowHandler;
 import com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler;
 import com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.RdbmsHandler;
 import com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.mysql.MysqlHandler;
 import com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.postgres.PostgresHandler;
+import com.mongodb.kafka.connect.sink.converter.SinkDocument;
 import com.mongodb.kafka.connect.sink.namespace.mapping.DefaultNamespaceMapper;
 import com.mongodb.kafka.connect.sink.processor.BlacklistValueProjector;
 import com.mongodb.kafka.connect.sink.processor.DocumentIdAdder;
@@ -515,6 +503,47 @@ Collection<DynamicTest> testValidChangeDataCaptureHandlerNames() {
     return tests;
   }
 
+  public static class TestMultiRowHandler extends CdcMultiRowHandler {
+    public TestMultiRowHandler() {
+      super(null);
+    }
+
+    public TestMultiRowHandler(MongoSinkTopicConfig config) {
+      super(config);
+    }
+
+    @Override
+    public Optional<List<WriteModel<BsonDocument>>> handle(SinkDocument doc) {
+      return Optional.empty();
+    }
+  }
+
+  @TestFactory
+  @DisplayName("test valid change data capture multi row handler names")
+  Collection<DynamicTest> testValidChangeDataCaptureMultiRowHandlerNames() {
+    List<DynamicTest> tests = new ArrayList<>();
+    String json = "{'%s': '%s'}";
+    List<String> cdcHandlers = asList(TestMultiRowHandler.class.getName());
+    cdcHandlers.forEach(
+        s ->
+            tests.add(
+                dynamicTest(
+                    "cdc multi row Handler for " + s,
+                    () -> {
+                      MongoSinkConfig cfg =
+                          createSinkConfig(
+                              format(json, CHANGE_DATA_CAPTURE_MULTI_ROW_HANDLER_CONFIG, s));
+                      assertEquals(
+                          cfg.getMongoSinkTopicConfig(TEST_TOPIC)
+                              .getCdcMultiRowHandler()
+                              .get()
+                              .getClass()
+                              .getName(),
+                          s);
+                    })));
+    return tests;
+  }
+
   @Test
   @DisplayName("test invalid change data capture handler names")
   void testInvalidChangeDataCaptureHandlerNames() {