Skip to content

Commit 63fab6e

Browse files
committed
CDC: Remove oplog $v field from update
KAFKA-44
1 parent 065737c commit 63fab6e

File tree

2 files changed

+26
-0
lines changed

2 files changed

+26
-0
lines changed

src/main/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbUpdate.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
public class MongoDbUpdate implements CdcOperation {
3838
private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions().upsert(true);
3939
private static final String JSON_DOC_FIELD_PATH = "patch";
40+
public static final String INTERNAL_OPLOG_FIELD_V = "$v";
4041

4142
@Override
4243
public WriteModel<BsonDocument> perform(final SinkDocument doc) {
@@ -47,6 +48,13 @@ public WriteModel<BsonDocument> perform(final SinkDocument doc) {
4748

4849
try {
4950
BsonDocument updateDoc = BsonDocument.parse(valueDoc.getString(JSON_DOC_FIELD_PATH).getValue());
51+
52+
//Check if the internal "$v" field is contained which was added to the
53+
//oplog format in 3.6+ If so, then we simply remove it for now since
54+
//it's not used by the sink connector at the moment and would break
55+
//CDC-mode based "replication".
56+
updateDoc.remove(INTERNAL_OPLOG_FIELD_V);
57+
5058
//patch contains full new document for replacement
5159
if (updateDoc.containsKey(ID_FIELD)) {
5260
BsonDocument filterDoc = new BsonDocument(ID_FIELD, updateDoc.get(ID_FIELD));

src/test/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb/MongoDbUpdateTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.junit.runner.RunWith;
3030

3131
import org.bson.BsonDocument;
32+
import org.bson.BsonInt32;
3233
import org.bson.BsonString;
3334

3435
import com.mongodb.client.model.ReplaceOneModel;
@@ -43,6 +44,7 @@ class MongoDbUpdateTest {
4344
private static final BsonDocument FILTER_DOC = BsonDocument.parse("{_id: 1234}");
4445
private static final BsonDocument REPLACEMENT_DOC = BsonDocument.parse("{_id: 1234, first_name: 'Grace', last_name: 'Hopper'}");
4546
private static final BsonDocument UPDATE_DOC = BsonDocument.parse("{$set: {first_name: 'Grace', last_name: 'Hopper'}}");
47+
private static final BsonDocument UPDATE_DOC_WITH_OPLOG_INTERNALS = UPDATE_DOC.clone().append("$v", new BsonInt32(1));
4648

4749
@Test
4850
@DisplayName("when valid doc replace cdc event then correct ReplaceOneModel")
@@ -79,6 +81,22 @@ void testValidSinkDocumentForUpdate() {
7981
assertEquals(FILTER_DOC, writeModel.getFilter());
8082
}
8183

84+
@Test
85+
@DisplayName("when valid doc change cdc event containing internal oplog fields then correct UpdateOneModel")
86+
public void testValidSinkDocumentWithInternalOploagFieldForUpdate() {
87+
BsonDocument keyDoc = BsonDocument.parse("{id: '1234'}");
88+
BsonDocument valueDoc = new BsonDocument("op", new BsonString("u"))
89+
.append("patch", new BsonString(UPDATE_DOC_WITH_OPLOG_INTERNALS.toJson()));
90+
91+
WriteModel<BsonDocument> result = UPDATE.perform(new SinkDocument(keyDoc, valueDoc));
92+
assertTrue(result instanceof UpdateOneModel, () -> "result expected to be of type UpdateOneModel");
93+
94+
UpdateOneModel<BsonDocument> writeModel = (UpdateOneModel<BsonDocument>) result;
95+
assertEquals(UPDATE_DOC, writeModel.getUpdate(), () -> "update doc not matching what is expected");
96+
assertTrue(writeModel.getFilter() instanceof BsonDocument, () -> "filter expected to be of type BsonDocument");
97+
assertEquals(FILTER_DOC, writeModel.getFilter());
98+
}
99+
82100
@Test
83101
@DisplayName("when missing value doc then DataException")
84102
void testMissingValueDocument() {

0 commit comments

Comments
 (0)