Skip to content

Commit d2e2927

Browse files
committed
Fix inferred schema naming conventions
Ensures that schemas can be backwards compatible if optional fields don't exist. KAFKA-210
1 parent 2971fbe commit d2e2927

File tree

5 files changed

+110
-81
lines changed

5 files changed

+110
-81
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
- [KAFKA-203](https://jira.mongodb.org/browse/KAFKA-203) Fixed sink NPE issue when using with confluent connect 6.1.0
1919
- [KAFKA-209](https://jira.mongodb.org/browse/KAFKA-209) Fixed `_id` always being projected even if not explicitly allowed or blocked.
2020
Log a warning message when there the `_id` value and the id strategy is configured not to overwrite the `_id`.
21+
- [KAFKA-210](https://jira.mongodb.org/browse/KAFKA-210) Fix inferred schema naming convention and ensure schemas can be backwards compatible.
2122

2223
## 1.4.0
2324

src/main/java/com/mongodb/kafka/connect/source/producer/InferSchemaAndValueProducer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,6 @@ final class InferSchemaAndValueProducer implements SchemaAndValueProducer {
3535
@Override
3636
public SchemaAndValue get(final BsonDocument changeStreamDocument) {
3737
return bsonValueToSchemaAndValue.toSchemaAndValue(
38-
inferDocumentSchema(changeStreamDocument, false, "default"), changeStreamDocument);
38+
inferDocumentSchema(changeStreamDocument), changeStreamDocument);
3939
}
4040
}

src/main/java/com/mongodb/kafka/connect/source/schema/BsonDocumentToSchema.java

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package com.mongodb.kafka.connect.source.schema;
1818

19-
import static java.lang.String.format;
20-
2119
import java.util.List;
2220
import java.util.Map;
2321
import java.util.Objects;
@@ -34,26 +32,35 @@ public final class BsonDocumentToSchema {
3432

3533
private static final String ID_FIELD = "_id";
3634
private static final Schema DEFAULT_INFER_SCHEMA_TYPE = Schema.OPTIONAL_STRING_SCHEMA;
37-
public static final String SCHEMA_NAME_TEMPLATE = "inferred_name_%s";
35+
public static final String DEFAULT_FIELD_NAME = "default";
36+
37+
public static Schema inferDocumentSchema(final BsonDocument document) {
38+
return createSchemaBuilder(DEFAULT_FIELD_NAME, document).required().build();
39+
}
3840

39-
public static Schema inferDocumentSchema(
40-
final BsonDocument document, final boolean optional, final String fieldName) {
41+
private static Schema inferDocumentSchema(final String fieldPath, final BsonDocument document) {
42+
return createSchemaBuilder(fieldPath, document).optional().build();
43+
}
44+
45+
private static SchemaBuilder createSchemaBuilder(
46+
final String fieldPath, final BsonDocument document) {
4147
SchemaBuilder builder = SchemaBuilder.struct();
48+
builder.name(fieldPath);
4249
if (document.containsKey(ID_FIELD)) {
43-
builder.field(ID_FIELD, inferSchema(document.get(ID_FIELD), ID_FIELD));
50+
builder.field(ID_FIELD, inferSchema(ID_FIELD, document.get(ID_FIELD)));
4451
}
4552
document.entrySet().stream()
4653
.filter(kv -> !kv.getKey().equals(ID_FIELD))
4754
.sorted(Map.Entry.comparingByKey())
48-
.forEach(kv -> builder.field(kv.getKey(), inferSchema(kv.getValue(), kv.getKey())));
49-
builder.name(generateName(builder, fieldName));
50-
if (optional) {
51-
builder.optional();
52-
}
53-
return builder.build();
55+
.forEach(
56+
kv ->
57+
builder.field(
58+
kv.getKey(),
59+
inferSchema(createFieldPath(fieldPath, kv.getKey()), kv.getValue())));
60+
return builder;
5461
}
5562

56-
public static Schema inferSchema(final BsonValue bsonValue, final String fieldName) {
63+
private static Schema inferSchema(final String fieldPath, final BsonValue bsonValue) {
5764
switch (bsonValue.getBsonType()) {
5865
case BOOLEAN:
5966
return Schema.OPTIONAL_BOOLEAN_SCHEMA;
@@ -71,17 +78,18 @@ public static Schema inferSchema(final BsonValue bsonValue, final String fieldNa
7178
case TIMESTAMP:
7279
return Timestamp.builder().optional().build();
7380
case DOCUMENT:
74-
return inferDocumentSchema(bsonValue.asDocument(), true, fieldName);
81+
return inferDocumentSchema(fieldPath, bsonValue.asDocument());
7582
case ARRAY:
7683
List<BsonValue> values = bsonValue.asArray().getValues();
7784
Schema firstItemSchema =
78-
values.isEmpty() ? DEFAULT_INFER_SCHEMA_TYPE : inferSchema(values.get(0), fieldName);
85+
values.isEmpty() ? DEFAULT_INFER_SCHEMA_TYPE : inferSchema(fieldPath, values.get(0));
7986
if (values.isEmpty()
8087
|| values.stream()
81-
.anyMatch(bv -> !Objects.equals(inferSchema(bv, fieldName), firstItemSchema))) {
82-
return SchemaBuilder.array(DEFAULT_INFER_SCHEMA_TYPE).optional().build();
88+
.anyMatch(bv -> !Objects.equals(inferSchema(fieldPath, bv), firstItemSchema))) {
89+
return SchemaBuilder.array(DEFAULT_INFER_SCHEMA_TYPE).name(fieldPath).optional().build();
8390
}
84-
return SchemaBuilder.array(inferSchema(bsonValue.asArray().getValues().get(0), fieldName))
91+
return SchemaBuilder.array(inferSchema(fieldPath, bsonValue.asArray().getValues().get(0)))
92+
.name(fieldPath)
8593
.optional()
8694
.build();
8795
case BINARY:
@@ -103,9 +111,12 @@ public static Schema inferSchema(final BsonValue bsonValue, final String fieldNa
103111
}
104112
}
105113

106-
public static String generateName(final SchemaBuilder builder, final String fieldName) {
107-
builder.build();
108-
return format(SCHEMA_NAME_TEMPLATE, fieldName);
114+
private static String createFieldPath(final String fieldPath, final String fieldName) {
115+
if (fieldPath.equals(DEFAULT_FIELD_NAME)) {
116+
return fieldName;
117+
} else {
118+
return fieldPath + "_" + fieldName;
119+
}
109120
}
110121

111122
private BsonDocumentToSchema() {}

src/test/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducerTest.java

Lines changed: 66 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import static com.mongodb.kafka.connect.source.schema.AvroSchemaDefaults.DEFAULT_AVRO_VALUE_SCHEMA;
2121
import static com.mongodb.kafka.connect.source.schema.AvroSchemaDefaults.DEFAULT_KEY_SCHEMA;
2222
import static com.mongodb.kafka.connect.source.schema.AvroSchemaDefaults.DEFAULT_VALUE_SCHEMA;
23-
import static com.mongodb.kafka.connect.source.schema.BsonDocumentToSchema.generateName;
23+
import static com.mongodb.kafka.connect.source.schema.BsonDocumentToSchema.DEFAULT_FIELD_NAME;
2424
import static com.mongodb.kafka.connect.source.schema.SchemaUtils.assertSchemaAndValueEquals;
2525
import static java.lang.String.format;
2626
import static java.util.Arrays.asList;
@@ -155,60 +155,75 @@ void testAvroSchemaAndValueProducer() {
155155
@Test
156156
@DisplayName("test infer schema and value producer")
157157
void testInferSchemaAndValueProducer() {
158-
159-
String fieldName = "default";
160158
Schema expectedSchema =
161-
nameAndBuildSchema(
162-
SchemaBuilder.struct()
163-
.field(
164-
"arrayComplex",
165-
SchemaBuilder.array(
166-
nameAndBuildOptionalSchema(
167-
SchemaBuilder.struct().field("a", Schema.OPTIONAL_INT32_SCHEMA),
168-
"a"))
169-
.optional()
170-
.build())
171-
.field(
172-
"arrayComplexMixedTypes",
173-
SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build())
174-
.field(
175-
"arrayEmpty",
176-
SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build())
177-
.field(
178-
"arrayMixedTypes",
179-
SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build())
180-
.field(
181-
"arraySimple",
182-
SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA).optional().build())
183-
.field("binary", Schema.OPTIONAL_BYTES_SCHEMA)
184-
.field("boolean", Schema.OPTIONAL_BOOLEAN_SCHEMA)
185-
.field("code", Schema.OPTIONAL_STRING_SCHEMA)
186-
.field("codeWithScope", Schema.OPTIONAL_STRING_SCHEMA)
187-
.field("dateTime", Timestamp.builder().optional().build())
188-
.field("decimal128", Decimal.builder(1).optional().build())
189-
.field(
190-
"document",
191-
nameAndBuildOptionalSchema(
192-
SchemaBuilder.struct().field("a", Schema.OPTIONAL_INT32_SCHEMA),
193-
"document"))
194-
.field("double", Schema.OPTIONAL_FLOAT64_SCHEMA)
195-
.field("int32", Schema.OPTIONAL_INT32_SCHEMA)
196-
.field("int64", Schema.OPTIONAL_INT64_SCHEMA)
197-
.field("maxKey", Schema.OPTIONAL_STRING_SCHEMA)
198-
.field("minKey", Schema.OPTIONAL_STRING_SCHEMA)
199-
.field("null", Schema.OPTIONAL_STRING_SCHEMA)
200-
.field("objectId", Schema.OPTIONAL_STRING_SCHEMA)
201-
.field("regex", Schema.OPTIONAL_STRING_SCHEMA)
202-
.field("string", Schema.OPTIONAL_STRING_SCHEMA)
203-
.field("symbol", Schema.OPTIONAL_STRING_SCHEMA)
204-
.field("timestamp", Timestamp.builder().optional().build())
205-
.field("undefined", Schema.OPTIONAL_STRING_SCHEMA),
206-
fieldName);
159+
SchemaBuilder.struct()
160+
.name(DEFAULT_FIELD_NAME)
161+
.field(
162+
"arrayComplex",
163+
SchemaBuilder.array(
164+
SchemaBuilder.struct()
165+
.field("a", Schema.OPTIONAL_INT32_SCHEMA)
166+
.name("arrayComplex_a")
167+
.optional()
168+
.build())
169+
.optional()
170+
.name("arrayComplex")
171+
.build())
172+
.field(
173+
"arrayComplexMixedTypes",
174+
SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA)
175+
.optional()
176+
.name("arrayComplexMixedTypes")
177+
.build())
178+
.field(
179+
"arrayEmpty",
180+
SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA)
181+
.optional()
182+
.name("arrayEmpty")
183+
.build())
184+
.field(
185+
"arrayMixedTypes",
186+
SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA)
187+
.optional()
188+
.name("arrayMixedTypes")
189+
.build())
190+
.field(
191+
"arraySimple",
192+
SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA)
193+
.optional()
194+
.name("arraySimple")
195+
.build())
196+
.field("binary", Schema.OPTIONAL_BYTES_SCHEMA)
197+
.field("boolean", Schema.OPTIONAL_BOOLEAN_SCHEMA)
198+
.field("code", Schema.OPTIONAL_STRING_SCHEMA)
199+
.field("codeWithScope", Schema.OPTIONAL_STRING_SCHEMA)
200+
.field("dateTime", Timestamp.builder().optional().build())
201+
.field("decimal128", Decimal.builder(1).optional().build())
202+
.field(
203+
"document",
204+
SchemaBuilder.struct()
205+
.field("a", Schema.OPTIONAL_INT32_SCHEMA)
206+
.name("document")
207+
.optional()
208+
.build())
209+
.field("double", Schema.OPTIONAL_FLOAT64_SCHEMA)
210+
.field("int32", Schema.OPTIONAL_INT32_SCHEMA)
211+
.field("int64", Schema.OPTIONAL_INT64_SCHEMA)
212+
.field("maxKey", Schema.OPTIONAL_STRING_SCHEMA)
213+
.field("minKey", Schema.OPTIONAL_STRING_SCHEMA)
214+
.field("null", Schema.OPTIONAL_STRING_SCHEMA)
215+
.field("objectId", Schema.OPTIONAL_STRING_SCHEMA)
216+
.field("regex", Schema.OPTIONAL_STRING_SCHEMA)
217+
.field("string", Schema.OPTIONAL_STRING_SCHEMA)
218+
.field("symbol", Schema.OPTIONAL_STRING_SCHEMA)
219+
.field("timestamp", Timestamp.builder().optional().build())
220+
.field("undefined", Schema.OPTIONAL_STRING_SCHEMA)
221+
.build();
207222

208223
Schema arrayComplexValueSchema = expectedSchema.field("arrayComplex").schema().valueSchema();
209224
Schema documentSchema = expectedSchema.field("document").schema();
210225

211-
SchemaAndValue expectedValue =
226+
SchemaAndValue expectedSchemaAndValue =
212227
new SchemaAndValue(
213228
expectedSchema,
214229
new Struct(expectedSchema)
@@ -247,7 +262,7 @@ void testInferSchemaAndValueProducer() {
247262
new InferSchemaAndValueProducer(SIMPLE_JSON_WRITER_SETTINGS);
248263

249264
assertSchemaAndValueEquals(
250-
expectedValue, valueProducer.get(BsonDocument.parse(FULL_DOCUMENT_JSON)));
265+
expectedSchemaAndValue, valueProducer.get(BsonDocument.parse(FULL_DOCUMENT_JSON)));
251266
}
252267

253268
@Test
@@ -340,14 +355,6 @@ static Struct generateExpectedValue(final boolean simplified) {
340355
};
341356
}
342357

343-
static Schema nameAndBuildSchema(final SchemaBuilder builder, final String fieldName) {
344-
return builder.name(generateName(builder, fieldName)).build();
345-
}
346-
347-
static Schema nameAndBuildOptionalSchema(final SchemaBuilder builder, final String fieldName) {
348-
return builder.name(generateName(builder, fieldName)).optional().build();
349-
}
350-
351358
static String getFullDocument(final boolean simplified) {
352359
return simplified ? SIMPLIFIED_FULL_DOCUMENT_JSON : FULL_DOCUMENT_JSON;
353360
}

src/test/java/com/mongodb/kafka/connect/source/schema/SchemaUtils.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.util.ArrayList;
2424
import java.util.List;
25+
import java.util.stream.Collectors;
2526

2627
import org.apache.kafka.connect.data.Field;
2728
import org.apache.kafka.connect.data.Schema;
@@ -57,6 +58,11 @@ private static Object convertData(final Object value) {
5758
// Doing equals on Struct just tests instance equals and not the actual values
5859
return getStructData((Struct) value);
5960
}
61+
62+
if (value instanceof List<?>) {
63+
// List values can contain Structs which need converting
64+
return getListData((List<?>) value);
65+
}
6066
return value;
6167
}
6268

@@ -74,6 +80,10 @@ private static Object getStructData(final Struct value) {
7480
return structValues;
7581
}
7682

83+
private static List<?> getListData(final List<?> value) {
84+
return value.stream().map(SchemaUtils::convertData).collect(Collectors.toList());
85+
}
86+
7787
public static void assertSchemaEquals(final Schema expected, final Schema actual) {
7888
assertEquals(
7989
expected.isOptional(), actual.isOptional(), "Optional value differs: " + actual.schema());

0 commit comments

Comments
 (0)