Skip to content

Commit 491c485

Browse files
committed
Added key to SourceRecord
KAFKA-49
1 parent 6a5dbec commit 491c485

File tree

1 file changed

+3
-1
lines changed

1 file changed

+3
-1
lines changed

src/main/java/com/mongodb/kafka/connect/source/MongoSourceTask.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,9 @@ public List<SourceRecord> poll() {
133133

134134
jsonDocument.ifPresent((json) -> {
135135
LOGGER.trace("Adding {} to {}", json, topicName);
136-
sourceRecords.add(new SourceRecord(partition, sourceOffset, topicName, Schema.STRING_SCHEMA, json));
136+
String keyJson = new BsonDocument("_id", changeStreamDocument.get("_id")).toJson();
137+
sourceRecords.add(new SourceRecord(partition, sourceOffset, topicName, Schema.STRING_SCHEMA, keyJson,
138+
Schema.STRING_SCHEMA, json));
137139
});
138140

139141
// If the cursor is invalidated add the record and return calls

0 commit comments

Comments
 (0)