Skip to content

Commit 571f47e

Browse files
nareshmaharaj-consultantrozza
authored andcommitted
Added UpdateOneBusinessKeyTimestampStrategy
KAFKA-114
1 parent d7d3d6a commit 571f47e

File tree

4 files changed

+105
-0
lines changed

4 files changed

+105
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
Note: This defaults to false, which is a change of behaviour.
1111
- [KAFKA-118](https://jira.mongodb.org/browse/KAFKA-118) Made UuidStrategy configurable so can output BsonBinary Uuid values
1212
- [KAFKA-101](https://jira.mongodb.org/browse/KAFKA-101) Added `UuidProvidedInKeyStrategy` & `UuidProvidedInValueStrategy`
13+
- [KAFKA-114](https://jira.mongodb.org/browse/KAFKA-114) Added `UpdateOneBusinessKeyTimestampStrategy` write model strategy`
1314

1415
## 1.1.0
1516
- [KAFKA-45](https://jira.mongodb.org/browse/KAFKA-45) Allow the Sink connector to ignore unused source record key or value fields.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.kafka.connect.sink.writemodel.strategy;
18+
19+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.ID_FIELD;
20+
21+
import java.time.Instant;
22+
23+
import org.apache.kafka.connect.errors.DataException;
24+
25+
import org.bson.BSONException;
26+
import org.bson.BsonDateTime;
27+
import org.bson.BsonDocument;
28+
29+
import com.mongodb.client.model.UpdateOneModel;
30+
import com.mongodb.client.model.UpdateOptions;
31+
import com.mongodb.client.model.WriteModel;
32+
33+
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
34+
35+
public class UpdateOneBusinessKeyTimestampStrategy implements WriteModelStrategy {
36+
37+
private static final UpdateOptions UPDATE_OPTIONS = new UpdateOptions().upsert(true);
38+
static final String FIELD_NAME_MODIFIED_TS = "_modifiedTS";
39+
static final String FIELD_NAME_INSERTED_TS = "_insertedTS";
40+
41+
@Override
42+
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
43+
BsonDocument vd = document.getValueDoc().orElseThrow(
44+
() -> new DataException("Error: cannot build the WriteModel since the value document was missing unexpectedly"));
45+
46+
BsonDateTime dateTime = new BsonDateTime(Instant.now().toEpochMilli());
47+
48+
try {
49+
BsonDocument businessKey = vd.getDocument(ID_FIELD);
50+
vd.remove(ID_FIELD);
51+
52+
return new UpdateOneModel<>(
53+
businessKey,
54+
new BsonDocument("$set", vd.append(FIELD_NAME_MODIFIED_TS, dateTime))
55+
.append("$setOnInsert", new BsonDocument(FIELD_NAME_INSERTED_TS, dateTime)),
56+
UPDATE_OPTIONS);
57+
58+
} catch (BSONException e) {
59+
throw new DataException("Error: cannot build the WriteModel since the value document does not contain an _id field of"
60+
+ " type BsonDocument which holds the business key fields");
61+
}
62+
}
63+
64+
}

src/test/java/com/mongodb/kafka/connect/sink/MongoSinkConfigTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
import com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneDefaultStrategy;
9797
import com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy;
9898
import com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy;
99+
import com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy;
99100
import com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy;
100101
import com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategy;
101102

@@ -415,6 +416,7 @@ Collection<DynamicTest> testGetSingleValidWriteModelStrategy() {
415416
put(ReplaceOneBusinessKeyStrategy.class.getName(), ReplaceOneBusinessKeyStrategy.class);
416417
put(ReplaceOneDefaultStrategy.class.getName(), ReplaceOneDefaultStrategy.class);
417418
put(UpdateOneTimestampsStrategy.class.getName(), UpdateOneTimestampsStrategy.class);
419+
put(UpdateOneBusinessKeyTimestampStrategy.class.getName(), UpdateOneBusinessKeyTimestampStrategy.class);
418420
}};
419421

420422
candidates.forEach((key, value) -> {
@@ -442,6 +444,7 @@ void testGetMultipleCollectionSpecificValidWriteModelStrategy() {
442444
put("topic-1", ReplaceOneDefaultStrategy.class);
443445
put("topic-2", ReplaceOneBusinessKeyStrategy.class);
444446
put("topic-3", UpdateOneTimestampsStrategy.class);
447+
put("topic-5", UpdateOneBusinessKeyTimestampStrategy.class);
445448
put("topic-4", DeleteOneDefaultStrategy.class);
446449
}};
447450

src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ class WriteModelStrategyTest {
4646
private static final ReplaceOneBusinessKeyStrategy REPLACE_ONE_BUSINESS_KEY_STRATEGY = new ReplaceOneBusinessKeyStrategy();
4747
private static final UpdateOneTimestampsStrategy UPDATE_ONE_TIMESTAMPS_STRATEGY = new UpdateOneTimestampsStrategy();
4848

49+
private static final UpdateOneBusinessKeyTimestampStrategy
50+
UPDATE_ONE_BUSINESS_KEY_TIMESTAMPS_STRATEGY = new UpdateOneBusinessKeyTimestampStrategy();
51+
4952
private static final BsonDocument FILTER_DOC_DELETE_DEFAULT = BsonDocument.parse("{_id: {id: 1234}}");
5053
private static final BsonDocument FILTER_DOC_REPLACE_DEFAULT = BsonDocument.parse("{_id: 1234}");
5154
private static final BsonDocument REPLACEMENT_DOC_DEFAULT = BsonDocument.parse("{_id: 1234, first_name: 'Grace', last_name: 'Hopper'}");
@@ -189,4 +192,38 @@ void testUpdateOneTimestampsStrategyWithValidSinkDocument() {
189192
assertEquals(FILTER_DOC_UPDATE_TIMESTAMPS, writeModel.getFilter());
190193
assertTrue(writeModel.getOptions().isUpsert(), "update expected to be done in upsert mode");
191194
}
195+
196+
@Test
197+
@DisplayName("when value document is missing for UpdateOneBusinessKeyTimestampStrategy then DataException")
198+
void testUpdateOneBusinessKeyTimestampsStrategyWithMissingValueDocument() {
199+
assertThrows(DataException.class,
200+
() -> UPDATE_ONE_BUSINESS_KEY_TIMESTAMPS_STRATEGY.createWriteModel(new SinkDocument(new BsonDocument(), null)));
201+
}
202+
203+
@Test
204+
@DisplayName("when sink document is valid for UpdateOneBusinessKeyTimestampStrategy then correct UpdateOneModel")
205+
void testUpdateOneBusinessKeyTimestampsStrategyWithValidSinkDocument() {
206+
BsonDocument valueDoc = BsonDocument.parse("{_id: 1234, first_name: 'Grace', last_name: 'Hopper', active: false}");
207+
208+
WriteModel<BsonDocument> result = UPDATE_ONE_TIMESTAMPS_STRATEGY.createWriteModel(new SinkDocument(null, valueDoc));
209+
assertTrue(result instanceof UpdateOneModel, "result expected to be of type UpdateOneModel");
210+
211+
UpdateOneModel<BsonDocument> writeModel = (UpdateOneModel<BsonDocument>) result;
212+
213+
//NOTE: This test case can only check:
214+
// i) for both fields to be available
215+
// ii) having the correct BSON type (BsonDateTime)
216+
// iii) and be initially equal
217+
// The exact dateTime value is not directly testable here.
218+
BsonDocument updateDoc = (BsonDocument) writeModel.getUpdate();
219+
220+
BsonDateTime modifiedTS = updateDoc.getDocument("$set").getDateTime(UpdateOneBusinessKeyTimestampStrategy.FIELD_NAME_MODIFIED_TS);
221+
BsonDateTime insertedTS = updateDoc.getDocument("$setOnInsert")
222+
.getDateTime(UpdateOneBusinessKeyTimestampStrategy.FIELD_NAME_INSERTED_TS);
223+
224+
assertEquals(insertedTS, modifiedTS, "modified and inserted timestamps must initially be equal");
225+
assertTrue(writeModel.getFilter() instanceof BsonDocument, "filter expected to be of type BsonDocument");
226+
assertEquals(FILTER_DOC_UPDATE_TIMESTAMPS, writeModel.getFilter());
227+
assertTrue(writeModel.getOptions().isUpsert(), "update expected to be done in upsert mode");
228+
}
192229
}

0 commit comments

Comments
 (0)