Skip to content

Commit f28fe94

Browse files
patwakeemrozza
authored andcommitted
Add UuidInValueStrategy
Update IdStrategyTest KAFKA-101
1 parent 38356f1 commit f28fe94

File tree

2 files changed

+95
-0
lines changed

2 files changed

+95
-0
lines changed
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* Original Work: Apache License, Version 2.0, Copyright 2017 Hans-Peter Grahsl.
17+
*/
18+
package com.mongodb.kafka.connect.sink.processor.id.strategy;
19+
20+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.ID_FIELD;
21+
22+
import java.util.Optional;
23+
import java.util.UUID;
24+
25+
import org.apache.kafka.connect.errors.DataException;
26+
import org.apache.kafka.connect.sink.SinkRecord;
27+
28+
import org.bson.BsonBinary;
29+
import org.bson.BsonDocument;
30+
import org.bson.BsonNull;
31+
import org.bson.BsonValue;
32+
import org.bson.UuidRepresentation;
33+
34+
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
35+
36+
public class UuidInValueStrategy implements IdStrategy {
37+
38+
private static final int UUID_LENGTH = 36;
39+
private static final int UUID_LENGTH_NO_DASHES = 32;
40+
41+
@Override
42+
public BsonValue generateId(final SinkDocument doc, final SinkRecord orig) {
43+
Optional<BsonDocument> optionalDoc = doc.getValueDoc();
44+
BsonValue id = optionalDoc.map(d -> d.get(ID_FIELD))
45+
.orElseThrow(() -> new DataException("Error: provided id strategy is used but the document structure either contained"
46+
+ " no _id field or it was null"));
47+
48+
if (id instanceof BsonNull) {
49+
throw new DataException("Error: provided id strategy used but the document structure contained an _id of type BsonNull");
50+
}
51+
52+
return new BsonBinary(constructUuidObjectFromString(id.asString().getValue()), UuidRepresentation.STANDARD);
53+
}
54+
55+
private UUID constructUuidObjectFromString(String uuid) {
56+
if (uuid.length() == UUID_LENGTH) {
57+
return UUID.fromString(uuid);
58+
} else if (uuid.length() == UUID_LENGTH_NO_DASHES) {
59+
return UUID.fromString(uuid.replaceFirst(
60+
"(\\p{XDigit}{8})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}+)",
61+
"$1-$2-$3-$4-$5"
62+
));
63+
}
64+
65+
throw new DataException("UUID cannot be constructed from provided string.");
66+
}
67+
}

src/test/java/com/mongodb/kafka/connect/sink/processor/id/strategy/IdStrategyTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import java.util.ArrayList;
3636
import java.util.List;
37+
import java.util.UUID;
3738

3839
import org.apache.kafka.connect.errors.DataException;
3940
import org.apache.kafka.connect.sink.SinkRecord;
@@ -44,12 +45,14 @@
4445
import org.junit.platform.runner.JUnitPlatform;
4546
import org.junit.runner.RunWith;
4647

48+
import org.bson.BsonBinary;
4749
import org.bson.BsonDocument;
4850
import org.bson.BsonInt32;
4951
import org.bson.BsonNull;
5052
import org.bson.BsonObjectId;
5153
import org.bson.BsonString;
5254
import org.bson.BsonValue;
55+
import org.bson.UuidRepresentation;
5356

5457
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
5558
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
@@ -166,6 +169,31 @@ List<DynamicTest> testIdGenerationStrategies() {
166169
);
167170
assertEquals(new BsonDocument(), idS6.generateId(sdWithoutKeyDoc, null));
168171
}));
172+
173+
IdStrategy idS7 = new UuidInValueStrategy();
174+
idTests.add(dynamicTest(UuidInValueStrategy.class.getSimpleName() + " in value", () -> {
175+
String idValue = "6d01622d-b3d5-466d-ae48-e414901af8f2";
176+
UUID idUuid = UUID.fromString(idValue);
177+
SinkDocument sdWithIdInValueDoc = new SinkDocument(null, new BsonDocument("_id", new BsonString(idValue)));
178+
SinkDocument sdWithoutIdInValueDoc = new SinkDocument(null, new BsonDocument());
179+
SinkDocument sdWithBsonNullIdInValueDoc = new SinkDocument(null, new BsonDocument());
180+
SinkDocument sdWithInvalidUuidInValueDoc = new SinkDocument(null, new BsonDocument("_id", new BsonString("invalid")));
181+
BsonValue id = idS7.generateId(sdWithIdInValueDoc, null);
182+
183+
assertAll("id checks",
184+
() -> assertTrue(id instanceof BsonBinary),
185+
() -> {
186+
BsonBinary bin = (BsonBinary) id;
187+
UUID foundUuid = bin.asUuid(UuidRepresentation.STANDARD);
188+
assertEquals(idUuid, foundUuid);
189+
assertEquals(idValue, foundUuid.toString());
190+
}
191+
);
192+
assertThrows(DataException.class, () -> idS7.generateId(sdWithoutIdInValueDoc, null));
193+
assertThrows(DataException.class, () -> idS7.generateId(sdWithBsonNullIdInValueDoc, null));
194+
assertThrows(DataException.class, () -> idS7.generateId(sdWithInvalidUuidInValueDoc, null));
195+
}));
196+
169197
return idTests;
170198
}
171199

0 commit comments

Comments
 (0)