Skip to content

Commit d7d3d6a

Browse files
committed
Made the provided UUID code work for keys and values
KAFKA-101
1 parent f28fe94 commit d7d3d6a

File tree

6 files changed

+154
-73
lines changed

6 files changed

+154
-73
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
- [KAFKA-110](https://jira.mongodb.org/browse/KAFKA-110) Added `document.id.strategy.overwrite.existing` configuration.
1010
Note: This defaults to false, which is a change of behaviour.
1111
- [KAFKA-118](https://jira.mongodb.org/browse/KAFKA-118) Made UuidStrategy configurable so can output BsonBinary Uuid values
12+
- [KAFKA-101](https://jira.mongodb.org/browse/KAFKA-101) Added `UuidProvidedInKeyStrategy` & `UuidProvidedInValueStrategy`
1213

1314
## 1.1.0
1415
- [KAFKA-45](https://jira.mongodb.org/browse/KAFKA-45) Allow the Sink connector to ignore unused source record key or value fields.

src/main/java/com/mongodb/kafka/connect/sink/processor/id/strategy/UuidInValueStrategy.java

Lines changed: 0 additions & 67 deletions
This file was deleted.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* Original Work: Apache License, Version 2.0, Copyright 2017 Hans-Peter Grahsl.
17+
*/
18+
package com.mongodb.kafka.connect.sink.processor.id.strategy;
19+
20+
public class UuidProvidedInKeyStrategy extends UuidProvidedStrategy {
21+
22+
UuidProvidedInKeyStrategy() {
23+
super(ProvidedIn.KEY);
24+
}
25+
26+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* Original Work: Apache License, Version 2.0, Copyright 2017 Hans-Peter Grahsl.
17+
*/
18+
package com.mongodb.kafka.connect.sink.processor.id.strategy;
19+
20+
public class UuidProvidedInValueStrategy extends UuidProvidedStrategy {
21+
22+
UuidProvidedInValueStrategy() {
23+
super(ProvidedIn.VALUE);
24+
}
25+
26+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package com.mongodb.kafka.connect.sink.processor.id.strategy;
18+
19+
import static java.lang.String.format;
20+
21+
import java.util.UUID;
22+
23+
import org.apache.kafka.connect.errors.DataException;
24+
import org.apache.kafka.connect.sink.SinkRecord;
25+
26+
import org.bson.BsonBinary;
27+
import org.bson.BsonBinarySubType;
28+
import org.bson.BsonValue;
29+
import org.bson.UuidRepresentation;
30+
31+
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
32+
33+
class UuidProvidedStrategy extends ProvidedStrategy {
34+
35+
private static final int UUID_LENGTH = 36;
36+
private static final int UUID_LENGTH_NO_DASHES = 32;
37+
38+
UuidProvidedStrategy(final ProvidedIn where) {
39+
super(where);
40+
}
41+
42+
@Override
43+
public BsonValue generateId(final SinkDocument doc, final SinkRecord orig) {
44+
BsonValue id = super.generateId(doc, orig);
45+
46+
if (id.isBinary() && BsonBinarySubType.isUuid(id.asBinary().getType())) {
47+
return id;
48+
} else if (id.isString()) {
49+
return new BsonBinary(constructUuidObjectFromString(id.asString().getValue()), UuidRepresentation.STANDARD);
50+
}
51+
52+
throw new DataException(format("UUID cannot be constructed from provided value: `%s`", id));
53+
}
54+
55+
private UUID constructUuidObjectFromString(final String uuid) {
56+
try {
57+
if (uuid.length() == UUID_LENGTH) {
58+
return UUID.fromString(uuid);
59+
} else if (uuid.length() == UUID_LENGTH_NO_DASHES) {
60+
return UUID.fromString(uuid.replaceFirst(
61+
"(\\p{XDigit}{8})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}+)",
62+
"$1-$2-$3-$4-$5"
63+
));
64+
}
65+
} catch (Exception e) {
66+
// ignore
67+
}
68+
69+
throw new DataException(format("UUID cannot be constructed from provided value: `%s`", uuid));
70+
}
71+
}

src/test/java/com/mongodb/kafka/connect/sink/processor/id/strategy/IdStrategyTest.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -170,15 +170,39 @@ List<DynamicTest> testIdGenerationStrategies() {
170170
assertEquals(new BsonDocument(), idS6.generateId(sdWithoutKeyDoc, null));
171171
}));
172172

173-
IdStrategy idS7 = new UuidInValueStrategy();
174-
idTests.add(dynamicTest(UuidInValueStrategy.class.getSimpleName() + " in value", () -> {
173+
IdStrategy idS7 = new UuidProvidedInKeyStrategy();
174+
idTests.add(dynamicTest(UuidProvidedInKeyStrategy.class.getSimpleName() + " in key", () -> {
175+
String idValue = "6d01622d-b3d5-466d-ae48-e414901af8f2";
176+
UUID idUuid = UUID.fromString(idValue);
177+
SinkDocument sdWithIdInKeyDoc = new SinkDocument(new BsonDocument("_id", new BsonString(idValue)), null);
178+
SinkDocument sdWithoutIdInKeyDoc = new SinkDocument(new BsonDocument(), null);
179+
SinkDocument sdWithBsonNullIdInKeyDoc = new SinkDocument(new BsonDocument(), null);
180+
SinkDocument sdWithInvalidUuidInKeyDoc = new SinkDocument(new BsonDocument("_id", new BsonString("invalid")), null);
181+
BsonValue id = idS7.generateId(sdWithIdInKeyDoc, null);
182+
183+
assertAll("id checks",
184+
() -> assertTrue(id instanceof BsonBinary),
185+
() -> {
186+
BsonBinary bin = (BsonBinary) id;
187+
UUID foundUuid = bin.asUuid(UuidRepresentation.STANDARD);
188+
assertEquals(idUuid, foundUuid);
189+
assertEquals(idValue, foundUuid.toString());
190+
}
191+
);
192+
assertThrows(DataException.class, () -> idS7.generateId(sdWithoutIdInKeyDoc, null));
193+
assertThrows(DataException.class, () -> idS7.generateId(sdWithBsonNullIdInKeyDoc, null));
194+
assertThrows(DataException.class, () -> idS7.generateId(sdWithInvalidUuidInKeyDoc, null));
195+
}));
196+
197+
IdStrategy idS8 = new UuidProvidedInValueStrategy();
198+
idTests.add(dynamicTest(UuidProvidedInValueStrategy.class.getSimpleName() + " in value", () -> {
175199
String idValue = "6d01622d-b3d5-466d-ae48-e414901af8f2";
176200
UUID idUuid = UUID.fromString(idValue);
177201
SinkDocument sdWithIdInValueDoc = new SinkDocument(null, new BsonDocument("_id", new BsonString(idValue)));
178202
SinkDocument sdWithoutIdInValueDoc = new SinkDocument(null, new BsonDocument());
179203
SinkDocument sdWithBsonNullIdInValueDoc = new SinkDocument(null, new BsonDocument());
180204
SinkDocument sdWithInvalidUuidInValueDoc = new SinkDocument(null, new BsonDocument("_id", new BsonString("invalid")));
181-
BsonValue id = idS7.generateId(sdWithIdInValueDoc, null);
205+
BsonValue id = idS8.generateId(sdWithIdInValueDoc, null);
182206

183207
assertAll("id checks",
184208
() -> assertTrue(id instanceof BsonBinary),
@@ -189,9 +213,9 @@ List<DynamicTest> testIdGenerationStrategies() {
189213
assertEquals(idValue, foundUuid.toString());
190214
}
191215
);
192-
assertThrows(DataException.class, () -> idS7.generateId(sdWithoutIdInValueDoc, null));
193-
assertThrows(DataException.class, () -> idS7.generateId(sdWithBsonNullIdInValueDoc, null));
194-
assertThrows(DataException.class, () -> idS7.generateId(sdWithInvalidUuidInValueDoc, null));
216+
assertThrows(DataException.class, () -> idS8.generateId(sdWithoutIdInValueDoc, null));
217+
assertThrows(DataException.class, () -> idS8.generateId(sdWithBsonNullIdInValueDoc, null));
218+
assertThrows(DataException.class, () -> idS8.generateId(sdWithInvalidUuidInValueDoc, null));
195219
}));
196220

197221
return idTests;

0 commit comments

Comments
 (0)