Skip to content

Commit 7fb677e

Browse files
authored
Fixed bug in LazyBsonDocument#clone ignoring any changes made once unwrapped.
KAFKA-218
1 parent 9ecef4f commit 7fb677e

File tree

4 files changed

+66
-1
lines changed

4 files changed

+66
-1
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
the dependencies needed for running the connector with confluent. `mongo-kafka-connect-<version>-all.jar` now also includes `Avro`
1111
dependencies for ease of deployment for alternative Kafka connect runtimes.
1212

13+
### Bug Fixes
14+
- [KAFKA-218](https://jira.mongodb.org/browse/KAFKA-218) Fixed bug in LazyBsonDocument#clone ignoring any changes made once unwrapped.
15+
1316
## 1.5.1
1417

1518
### Bug Fixes

src/main/java/com/mongodb/kafka/connect/sink/converter/LazyBsonDocument.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,9 @@ public String toString() {
145145

146146
@Override
147147
public BsonDocument clone() {
148-
return new LazyBsonDocument(record, dataType, converter);
148+
return unwrapped != null
149+
? unwrapped.clone()
150+
: new LazyBsonDocument(record, dataType, converter);
149151
}
150152

151153
private BsonDocument getUnwrapped() {

src/test/java/com/mongodb/kafka/connect/sink/MongoProcessedSinkRecordDataTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import static org.junit.jupiter.api.Assertions.assertNotNull;
3333
import static org.junit.jupiter.api.Assertions.assertNull;
3434
import static org.junit.jupiter.api.Assertions.assertThrows;
35+
import static org.junit.jupiter.api.Assertions.assertTrue;
3536

3637
import org.apache.kafka.connect.data.Schema;
3738
import org.apache.kafka.connect.errors.DataException;
@@ -46,6 +47,7 @@
4647
import com.mongodb.MongoNamespace;
4748
import com.mongodb.client.model.ReplaceOneModel;
4849
import com.mongodb.client.model.ReplaceOptions;
50+
import com.mongodb.client.model.UpdateOneModel;
4951

5052
import com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler;
5153
import com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper;
@@ -197,6 +199,52 @@ INVALID_SINK_RECORD, createSinkConfig(ERRORS_TOLERANCE_CONFIG, "all"))
197199
FIELD_NAMESPACE_MAPPER_ERROR_IF_INVALID_CONFIG)))));
198200
}
199201

202+
@Test
203+
@DisplayName("Rename _id handling")
204+
void testRenameIdHandling() {
205+
SinkRecord sinkRecord =
206+
new SinkRecord(
207+
TEST_TOPIC,
208+
0,
209+
Schema.STRING_SCHEMA,
210+
"{_id: 1}",
211+
Schema.STRING_SCHEMA,
212+
"{a: 'a', b: 'b', c: 'c', d: 'd'}",
213+
1);
214+
215+
String topicConfig =
216+
"{"
217+
+ "'writemodel.strategy': 'com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy',"
218+
+ "'post.processor.chain': 'com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping, "
219+
+ "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder',"
220+
+ "'field.renamer.mapping': '[{oldName: \"value.c\", newName: \"_id\"}]',"
221+
+ "'document.id.strategy': 'com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy',"
222+
+ "'document.id.strategy.overwrite.existing': 'true',"
223+
+ "'document.id.strategy.partial.value.projection.type': 'allowlist',"
224+
+ "'document.id.strategy.partial.value.projection.list': 'a, b, _id'"
225+
+ "}";
226+
227+
MongoProcessedSinkRecordData processedData =
228+
new MongoProcessedSinkRecordData(sinkRecord, createSinkConfig(topicConfig));
229+
assertNull(processedData.getException());
230+
UpdateOneModel<BsonDocument> writeModel =
231+
(UpdateOneModel<BsonDocument>) processedData.getWriteModel();
232+
assertTrue(writeModel.getOptions().isUpsert());
233+
assertEquals(BsonDocument.parse("{'a': 'a', 'b': 'b', '_id': 'c'}"), writeModel.getFilter());
234+
235+
BsonDocument update = (BsonDocument) writeModel.getUpdate();
236+
assertNotNull(update);
237+
BsonDocument setDocument = update.getDocument("$set", new BsonDocument());
238+
assertTrue(setDocument.containsKey("_modifiedTS"));
239+
setDocument.remove("_modifiedTS");
240+
assertEquals(BsonDocument.parse("{'a': 'a', 'b': 'b', 'd': 'd'}"), setDocument);
241+
242+
BsonDocument setOnInsertDocument = update.getDocument("$setOnInsert", new BsonDocument());
243+
assertTrue(setOnInsertDocument.containsKey("_insertedTS"));
244+
setOnInsertDocument.remove("_insertedTS");
245+
assertTrue(setOnInsertDocument.isEmpty());
246+
}
247+
200248
void assertWriteModel(final MongoProcessedSinkRecordData processedData) {
201249
assertWriteModel(processedData, EXPECTED_WRITE_MODEL);
202250
}

src/test/java/com/mongodb/kafka/connect/sink/converter/LazyBsonDocumentTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public class LazyBsonDocumentTest {
4949
new SinkRecord("topic", 0, Schema.STRING_SCHEMA, NOT_JSON, Schema.STRING_SCHEMA, JSON, 1L);
5050
private static final SinkRecord SINK_RECORD_ALT_VALUE =
5151
new SinkRecord("topic", 0, Schema.STRING_SCHEMA, JSON, Schema.STRING_SCHEMA, NOT_JSON, 1L);
52+
private static final SinkRecord VALID_SINK_RECORD =
53+
new SinkRecord("topic", 0, Schema.STRING_SCHEMA, JSON, Schema.STRING_SCHEMA, JSON, 1L);
5254

5355
@Test
5456
@DisplayName("test invalid LazyBsonDocuments")
@@ -154,6 +156,16 @@ void testClone() {
154156
Type.VALUE,
155157
(Schema schema, Object data) -> BsonDocument.parse(data.toString()));
156158
assertDoesNotThrow(lazyBsonDocument::clone);
159+
},
160+
() -> {
161+
LazyBsonDocument lazyBsonDocument =
162+
new LazyBsonDocument(
163+
VALID_SINK_RECORD,
164+
Type.VALUE,
165+
(Schema schema, Object data) -> BsonDocument.parse(data.toString()));
166+
lazyBsonDocument.remove("_id");
167+
assertEquals(lazyBsonDocument, lazyBsonDocument.clone());
168+
assertEquals(BsonDocument.parse("{a: 'a', b: 'b', c: 'c'}"), lazyBsonDocument.clone());
157169
});
158170
}
159171
}

0 commit comments

Comments
 (0)