Skip to content

Commit 2971fbe

Browse files
rajdangwalrozza
authored andcommitted
Fix Mongo inferred names for schema records
1 parent 1d722d7 commit 2971fbe

File tree

3 files changed

+26
-19
lines changed

3 files changed

+26
-19
lines changed

src/main/java/com/mongodb/kafka/connect/source/producer/InferSchemaAndValueProducer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,6 @@ final class InferSchemaAndValueProducer implements SchemaAndValueProducer {
3535
@Override
3636
public SchemaAndValue get(final BsonDocument changeStreamDocument) {
3737
return bsonValueToSchemaAndValue.toSchemaAndValue(
38-
inferDocumentSchema(changeStreamDocument, false), changeStreamDocument);
38+
inferDocumentSchema(changeStreamDocument, false, "default"), changeStreamDocument);
3939
}
4040
}

src/main/java/com/mongodb/kafka/connect/source/schema/BsonDocumentToSchema.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,23 +36,24 @@ public final class BsonDocumentToSchema {
3636
private static final Schema DEFAULT_INFER_SCHEMA_TYPE = Schema.OPTIONAL_STRING_SCHEMA;
3737
public static final String SCHEMA_NAME_TEMPLATE = "inferred_name_%s";
3838

39-
public static Schema inferDocumentSchema(final BsonDocument document, final boolean optional) {
39+
public static Schema inferDocumentSchema(
40+
final BsonDocument document, final boolean optional, final String fieldName) {
4041
SchemaBuilder builder = SchemaBuilder.struct();
4142
if (document.containsKey(ID_FIELD)) {
42-
builder.field(ID_FIELD, inferSchema(document.get(ID_FIELD)));
43+
builder.field(ID_FIELD, inferSchema(document.get(ID_FIELD), ID_FIELD));
4344
}
4445
document.entrySet().stream()
4546
.filter(kv -> !kv.getKey().equals(ID_FIELD))
4647
.sorted(Map.Entry.comparingByKey())
47-
.forEach(kv -> builder.field(kv.getKey(), inferSchema(kv.getValue())));
48-
builder.name(generateName(builder));
48+
.forEach(kv -> builder.field(kv.getKey(), inferSchema(kv.getValue(), kv.getKey())));
49+
builder.name(generateName(builder, fieldName));
4950
if (optional) {
5051
builder.optional();
5152
}
5253
return builder.build();
5354
}
5455

55-
public static Schema inferSchema(final BsonValue bsonValue) {
56+
public static Schema inferSchema(final BsonValue bsonValue, final String fieldName) {
5657
switch (bsonValue.getBsonType()) {
5758
case BOOLEAN:
5859
return Schema.OPTIONAL_BOOLEAN_SCHEMA;
@@ -70,16 +71,17 @@ public static Schema inferSchema(final BsonValue bsonValue) {
7071
case TIMESTAMP:
7172
return Timestamp.builder().optional().build();
7273
case DOCUMENT:
73-
return inferDocumentSchema(bsonValue.asDocument(), true);
74+
return inferDocumentSchema(bsonValue.asDocument(), true, fieldName);
7475
case ARRAY:
7576
List<BsonValue> values = bsonValue.asArray().getValues();
7677
Schema firstItemSchema =
77-
values.isEmpty() ? DEFAULT_INFER_SCHEMA_TYPE : inferSchema(values.get(0));
78+
values.isEmpty() ? DEFAULT_INFER_SCHEMA_TYPE : inferSchema(values.get(0), fieldName);
7879
if (values.isEmpty()
79-
|| values.stream().anyMatch(bv -> !Objects.equals(inferSchema(bv), firstItemSchema))) {
80+
|| values.stream()
81+
.anyMatch(bv -> !Objects.equals(inferSchema(bv, fieldName), firstItemSchema))) {
8082
return SchemaBuilder.array(DEFAULT_INFER_SCHEMA_TYPE).optional().build();
8183
}
82-
return SchemaBuilder.array(inferSchema(bsonValue.asArray().getValues().get(0)))
84+
return SchemaBuilder.array(inferSchema(bsonValue.asArray().getValues().get(0), fieldName))
8385
.optional()
8486
.build();
8587
case BINARY:
@@ -101,8 +103,9 @@ public static Schema inferSchema(final BsonValue bsonValue) {
101103
}
102104
}
103105

104-
public static String generateName(final SchemaBuilder builder) {
105-
return format(SCHEMA_NAME_TEMPLATE, Objects.hashCode(builder.build())).replace("-", "_");
106+
public static String generateName(final SchemaBuilder builder, final String fieldName) {
107+
builder.build();
108+
return format(SCHEMA_NAME_TEMPLATE, fieldName);
106109
}
107110

108111
private BsonDocumentToSchema() {}

src/test/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducerTest.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -156,14 +156,16 @@ void testAvroSchemaAndValueProducer() {
156156
@DisplayName("test infer schema and value producer")
157157
void testInferSchemaAndValueProducer() {
158158

159+
String fieldName = "default";
159160
Schema expectedSchema =
160161
nameAndBuildSchema(
161162
SchemaBuilder.struct()
162163
.field(
163164
"arrayComplex",
164165
SchemaBuilder.array(
165166
nameAndBuildOptionalSchema(
166-
SchemaBuilder.struct().field("a", Schema.OPTIONAL_INT32_SCHEMA)))
167+
SchemaBuilder.struct().field("a", Schema.OPTIONAL_INT32_SCHEMA),
168+
"a"))
167169
.optional()
168170
.build())
169171
.field(
@@ -187,7 +189,8 @@ void testInferSchemaAndValueProducer() {
187189
.field(
188190
"document",
189191
nameAndBuildOptionalSchema(
190-
SchemaBuilder.struct().field("a", Schema.OPTIONAL_INT32_SCHEMA)))
192+
SchemaBuilder.struct().field("a", Schema.OPTIONAL_INT32_SCHEMA),
193+
"document"))
191194
.field("double", Schema.OPTIONAL_FLOAT64_SCHEMA)
192195
.field("int32", Schema.OPTIONAL_INT32_SCHEMA)
193196
.field("int64", Schema.OPTIONAL_INT64_SCHEMA)
@@ -199,7 +202,8 @@ void testInferSchemaAndValueProducer() {
199202
.field("string", Schema.OPTIONAL_STRING_SCHEMA)
200203
.field("symbol", Schema.OPTIONAL_STRING_SCHEMA)
201204
.field("timestamp", Timestamp.builder().optional().build())
202-
.field("undefined", Schema.OPTIONAL_STRING_SCHEMA));
205+
.field("undefined", Schema.OPTIONAL_STRING_SCHEMA),
206+
fieldName);
203207

204208
Schema arrayComplexValueSchema = expectedSchema.field("arrayComplex").schema().valueSchema();
205209
Schema documentSchema = expectedSchema.field("document").schema();
@@ -336,12 +340,12 @@ static Struct generateExpectedValue(final boolean simplified) {
336340
};
337341
}
338342

339-
static Schema nameAndBuildSchema(final SchemaBuilder builder) {
340-
return builder.name(generateName(builder)).build();
343+
static Schema nameAndBuildSchema(final SchemaBuilder builder, final String fieldName) {
344+
return builder.name(generateName(builder, fieldName)).build();
341345
}
342346

343-
static Schema nameAndBuildOptionalSchema(final SchemaBuilder builder) {
344-
return builder.name(generateName(builder)).optional().build();
347+
static Schema nameAndBuildOptionalSchema(final SchemaBuilder builder, final String fieldName) {
348+
return builder.name(generateName(builder, fieldName)).optional().build();
345349
}
346350

347351
static String getFullDocument(final boolean simplified) {

0 commit comments

Comments
 (0)