Skip to content

Commit 57e0cc8

Browse files
authored
Fix business key update strategies to use dot notation for filters (#39)
KAFKA-155
1 parent b64a11b commit 57e0cc8

File tree

6 files changed

+293
-59
lines changed

6 files changed

+293
-59
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
- [KAFKA-154](https://jira.mongodb.org/browse/KAFKA-154) Improve the handling and error messaging for Json array config values.
2121
- [KAFKA-78](https://jira.mongodb.org/browse/KAFKA-78) Added dead letter queue support for the source connector.
2222
- [KAFKA-157](https://jira.mongodb.org/browse/KAFKA-157) Improved error message for business key errors.
23+
- [KAFKA-155](https://jira.mongodb.org/browse/KAFKA-155) Fix business key update strategies to use dot notation for filters
2324

2425

2526
## 1.2.0

src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/ReplaceOneBusinessKeyStrategy.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.mongodb.kafka.connect.sink.writemodel.strategy;
2020

2121
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.ID_FIELD;
22+
import static com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelHelper.flattenKeys;
2223

2324
import org.apache.kafka.connect.errors.DataException;
2425

@@ -29,11 +30,17 @@
2930
import com.mongodb.client.model.ReplaceOptions;
3031
import com.mongodb.client.model.WriteModel;
3132

33+
import com.mongodb.kafka.connect.sink.Configurable;
34+
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
3235
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
36+
import com.mongodb.kafka.connect.sink.processor.id.strategy.IdStrategy;
37+
import com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy;
38+
import com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy;
3339

34-
public class ReplaceOneBusinessKeyStrategy implements WriteModelStrategy {
40+
public class ReplaceOneBusinessKeyStrategy implements WriteModelStrategy, Configurable {
3541

3642
private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions().upsert(true);
43+
private boolean isPartialId = false;
3744

3845
@Override
3946
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
@@ -48,6 +55,9 @@ public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
4855
try {
4956
BsonDocument businessKey = vd.getDocument(ID_FIELD);
5057
vd.remove(ID_FIELD);
58+
if (isPartialId) {
59+
businessKey = flattenKeys(businessKey);
60+
}
5161
return new ReplaceOneModel<>(businessKey, vd, REPLACE_OPTIONS);
5262
} catch (BSONException e) {
5363
throw new DataException(
@@ -56,4 +66,11 @@ public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
5666
+ " existing `_id` value in the business key then ensure `document.id.strategy.overwrite.existing=true`.");
5767
}
5868
}
69+
70+
@Override
71+
public void configure(final MongoSinkTopicConfig configuration) {
72+
IdStrategy idStrategy = configuration.getIdStrategy();
73+
isPartialId =
74+
idStrategy instanceof PartialKeyStrategy || idStrategy instanceof PartialValueStrategy;
75+
}
5976
}

src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/UpdateOneBusinessKeyTimestampStrategy.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.mongodb.kafka.connect.sink.writemodel.strategy;
1818

1919
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.ID_FIELD;
20+
import static com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelHelper.flattenKeys;
2021

2122
import java.time.Instant;
2223

@@ -30,13 +31,19 @@
3031
import com.mongodb.client.model.UpdateOptions;
3132
import com.mongodb.client.model.WriteModel;
3233

34+
import com.mongodb.kafka.connect.sink.Configurable;
35+
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
3336
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
37+
import com.mongodb.kafka.connect.sink.processor.id.strategy.IdStrategy;
38+
import com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy;
39+
import com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy;
3440

35-
public class UpdateOneBusinessKeyTimestampStrategy implements WriteModelStrategy {
41+
public class UpdateOneBusinessKeyTimestampStrategy implements WriteModelStrategy, Configurable {
3642

3743
private static final UpdateOptions UPDATE_OPTIONS = new UpdateOptions().upsert(true);
3844
static final String FIELD_NAME_MODIFIED_TS = "_modifiedTS";
3945
static final String FIELD_NAME_INSERTED_TS = "_insertedTS";
46+
private boolean isPartialId = false;
4047

4148
@Override
4249
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
@@ -53,7 +60,9 @@ public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
5360
try {
5461
BsonDocument businessKey = vd.getDocument(ID_FIELD);
5562
vd.remove(ID_FIELD);
56-
63+
if (isPartialId) {
64+
businessKey = flattenKeys(businessKey);
65+
}
5766
return new UpdateOneModel<>(
5867
businessKey,
5968
new BsonDocument("$set", vd.append(FIELD_NAME_MODIFIED_TS, dateTime))
@@ -67,4 +76,11 @@ public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
6776
+ "existing `_id` value in the business key then ensure `document.id.strategy.overwrite.existing=true`.");
6877
}
6978
}
79+
80+
@Override
81+
public void configure(final MongoSinkTopicConfig configuration) {
82+
IdStrategy idStrategy = configuration.getIdStrategy();
83+
isPartialId =
84+
idStrategy instanceof PartialKeyStrategy || idStrategy instanceof PartialValueStrategy;
85+
}
7086
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.kafka.connect.sink.writemodel.strategy;
18+
19+
import static java.lang.String.format;
20+
import static java.util.Collections.singletonList;
21+
import static java.util.stream.Collectors.toList;
22+
23+
import java.util.List;
24+
25+
import org.bson.BsonDocument;
26+
import org.bson.BsonElement;
27+
28+
final class WriteModelHelper {
29+
30+
private static final String CREATE_PREFIX = "%s%s.";
31+
private static final String ELEMENT_NAME_PREFIX = "%s%s";
32+
33+
static BsonDocument flattenKeys(final BsonDocument original) {
34+
BsonDocument businessKey = new BsonDocument();
35+
original.forEach(
36+
(key, value) ->
37+
flattenBsonElement("", new BsonElement(key, value))
38+
.forEach(b -> businessKey.append(b.getName(), b.getValue())));
39+
return businessKey;
40+
}
41+
42+
static List<BsonElement> flattenBsonElement(final String prefix, final BsonElement element) {
43+
if (element.getValue().isDocument()) {
44+
return element.getValue().asDocument().entrySet().stream()
45+
.flatMap(
46+
e ->
47+
flattenBsonElement(
48+
format(CREATE_PREFIX, prefix, element.getName()),
49+
new BsonElement(e.getKey(), e.getValue()))
50+
.stream())
51+
.collect(toList());
52+
}
53+
return singletonList(
54+
new BsonElement(
55+
format(ELEMENT_NAME_PREFIX, prefix, element.getName()), element.getValue()));
56+
}
57+
58+
private WriteModelHelper() {}
59+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.kafka.connect.sink.writemodel.strategy;
18+
19+
import static com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelHelper.flattenKeys;
20+
import static org.junit.jupiter.api.Assertions.assertAll;
21+
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
23+
import org.junit.jupiter.api.DisplayName;
24+
import org.junit.jupiter.api.Test;
25+
import org.junit.platform.runner.JUnitPlatform;
26+
import org.junit.runner.RunWith;
27+
28+
import org.bson.BsonDocument;
29+
30+
@RunWith(JUnitPlatform.class)
31+
class WriteModelHelperTest {
32+
33+
@Test
34+
@DisplayName("flatten keys works as expected")
35+
void testFlattenKeys() {
36+
assertAll(
37+
() -> assertEquals(BsonDocument.parse("{}"), flattenKeys(BsonDocument.parse("{}"))),
38+
() ->
39+
assertEquals(
40+
BsonDocument.parse("{a: 1, b: 1, c: [1]}"),
41+
flattenKeys(BsonDocument.parse("{a: 1, b: 1, c: [1]}"))),
42+
() ->
43+
assertEquals(
44+
BsonDocument.parse(
45+
"{'a.a': 1, 'b.b': 1, 'b.c.d': 1, 'b.c.e.f': 1, g: [{h: 1, i: {j: 1}}]}"),
46+
flattenKeys(
47+
BsonDocument.parse(
48+
"{a: {a: 1}, b: {b: 1, c: {d: 1, e: {f: 1}}}, g: [{h: 1, i: {j: 1}}]}"))));
49+
}
50+
}

0 commit comments

Comments
 (0)