Skip to content

Commit 47835f1

Browse files
committed
Fixed using ns field in copy.existing pipeline bug
KAFKA-217
1 parent 983173e commit 47835f1

File tree

3 files changed

+15
-5
lines changed

3 files changed

+15
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
### Bug Fixes
88
- [KAFKA-227](https://jira.mongodb.org/browse/KAFKA-195) Fixed wrapping nullable value returned from WriteModelStrategy
9+
- [KAFKA-217](https://jira.mongodb.org/browse/KAFKA-217) Fixed using `ns` field in `copy.existing` pipeline bug
910

1011
## 1.5.0
1112

src/main/java/com/mongodb/kafka/connect/source/MongoCopyDataManager.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,9 @@
6565
*/
6666
class MongoCopyDataManager implements AutoCloseable {
6767
private static final Logger LOGGER = LoggerFactory.getLogger(MongoCopyDataManager.class);
68-
static final String NAMESPACE_FIELD = "__";
69-
private static final byte[] NAMESPACE_BYTES = "ns".getBytes(StandardCharsets.UTF_8);
68+
private static final String NAMESPACE_FIELD = "ns";
69+
static final String ALT_NAMESPACE_FIELD = "__";
70+
private static final byte[] NAMESPACE_BYTES = NAMESPACE_FIELD.getBytes(StandardCharsets.UTF_8);
7071

7172
private static final String PIPELINE_TEMPLATE =
7273
format(
@@ -80,6 +81,12 @@ class MongoCopyDataManager implements AutoCloseable {
8081
+ "}",
8182
NAMESPACE_FIELD);
8283

84+
private static final BsonDocument ADD_ALT_NAMESPACE_STAGE =
85+
BsonDocument.parse(
86+
format("{'$addFields': {'%s': '$%s'}}", ALT_NAMESPACE_FIELD, NAMESPACE_FIELD));
87+
private static final BsonDocument UNSET_ORIGINAL_NAMESPACE_STAGE =
88+
BsonDocument.parse(format("{'$project': {'%s': 0}}", NAMESPACE_FIELD));
89+
8390
private volatile boolean closed;
8491
private volatile Exception errorException;
8592
private final AtomicInteger namespacesToCopy;
@@ -187,6 +194,8 @@ static List<Bson> createPipeline(final MongoSourceConfig cfg, final MongoNamespa
187194
BsonDocument.parse(
188195
format(PIPELINE_TEMPLATE, namespace.getDatabaseName(), namespace.getCollectionName())));
189196
cfg.getPipeline().map(pipeline::addAll);
197+
pipeline.add(ADD_ALT_NAMESPACE_STAGE);
198+
pipeline.add(UNSET_ORIGINAL_NAMESPACE_STAGE);
190199
return pipeline;
191200
}
192201

@@ -196,7 +205,7 @@ static RawBsonDocument convertDocument(final RawBsonDocument original) {
196205
int currentPosition = 0;
197206
reader.readStartDocument();
198207
while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) {
199-
if (reader.readName().equals(NAMESPACE_FIELD)) {
208+
if (reader.readName().equals(ALT_NAMESPACE_FIELD)) {
200209
currentPosition++; // Adjust the current position to include the bson type
201210
byte[] sourceBytes = sourceBuffer.array();
202211
// Convert the namespace field in situ

src/test/java/com/mongodb/kafka/connect/source/MongoCopyDataManagerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package com.mongodb.kafka.connect.source;
1717

18-
import static com.mongodb.kafka.connect.source.MongoCopyDataManager.NAMESPACE_FIELD;
18+
import static com.mongodb.kafka.connect.source.MongoCopyDataManager.ALT_NAMESPACE_FIELD;
1919
import static com.mongodb.kafka.connect.source.MongoSourceConfig.COLLECTION_CONFIG;
2020
import static com.mongodb.kafka.connect.source.MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG;
2121
import static com.mongodb.kafka.connect.source.MongoSourceConfig.COPY_EXISTING_PIPELINE_CONFIG;
@@ -436,7 +436,7 @@ private static String createTemplate(final int id, final String dbName, final St
436436
}
437437

438438
private static RawBsonDocument createInput(final String json) {
439-
return RawBsonDocument.parse(format(json, NAMESPACE_FIELD));
439+
return RawBsonDocument.parse(format(json, ALT_NAMESPACE_FIELD));
440440
}
441441

442442
private static Optional<BsonDocument> createOutput(final String json) {

0 commit comments

Comments
 (0)