Skip to content

Commit a78c23e

Browse files
committed
Improve infer schema usability
All fields are now optional. Field order is now naturally sorted by key. KAFKA-163
1 parent 7c3d1ec commit a78c23e

File tree

4 files changed

+121
-75
lines changed

4 files changed

+121
-75
lines changed

src/integrationTest/java/com/mongodb/kafka/connect/FullDocumentRoundTripIntegrationTest.java

Lines changed: 55 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import static com.mongodb.kafka.connect.source.MongoSourceConfig.COLLECTION_CONFIG;
1919
import static com.mongodb.kafka.connect.source.MongoSourceConfig.COPY_EXISTING_CONFIG;
2020
import static com.mongodb.kafka.connect.source.MongoSourceConfig.DATABASE_CONFIG;
21+
import static com.mongodb.kafka.connect.source.MongoSourceConfig.ERRORS_LOG_ENABLE_CONFIG;
22+
import static com.mongodb.kafka.connect.source.MongoSourceConfig.ERRORS_TOLERANCE_CONFIG;
2123
import static com.mongodb.kafka.connect.source.MongoSourceConfig.OUTPUT_FORMAT_VALUE_CONFIG;
2224
import static com.mongodb.kafka.connect.source.MongoSourceConfig.OUTPUT_JSON_FORMATTER_CONFIG;
2325
import static com.mongodb.kafka.connect.source.MongoSourceConfig.OUTPUT_SCHEMA_INFER_VALUE_CONFIG;
@@ -26,27 +28,31 @@
2628
import static com.mongodb.kafka.connect.source.MongoSourceConfig.TOPIC_PREFIX_CONFIG;
2729
import static java.lang.String.format;
2830
import static java.util.stream.Collectors.toList;
31+
import static org.junit.jupiter.api.Assertions.assertTrue;
2932
import static org.junit.jupiter.api.Assumptions.assumeTrue;
3033

3134
import java.util.ArrayList;
3235
import java.util.List;
3336
import java.util.Properties;
3437
import java.util.stream.IntStream;
38+
import java.util.stream.Stream;
3539

3640
import org.apache.kafka.connect.converters.ByteArrayConverter;
3741
import org.apache.kafka.connect.storage.StringConverter;
42+
import org.apache.log4j.Logger;
3843
import org.junit.jupiter.api.AfterEach;
3944
import org.junit.jupiter.api.BeforeEach;
4045
import org.junit.jupiter.api.DisplayName;
4146
import org.junit.jupiter.api.Test;
4247

4348
import org.bson.BsonDocument;
44-
import org.bson.BsonString;
4549

4650
import com.mongodb.client.MongoCollection;
4751
import com.mongodb.client.MongoDatabase;
4852

53+
import com.mongodb.kafka.connect.log.LogCapture;
4954
import com.mongodb.kafka.connect.mongodb.MongoKafkaTestCase;
55+
import com.mongodb.kafka.connect.source.MongoSourceConfig.ErrorTolerance;
5056
import com.mongodb.kafka.connect.source.MongoSourceConfig.OutputFormat;
5157

5258
public class FullDocumentRoundTripIntegrationTest extends MongoKafkaTestCase {
@@ -196,36 +202,54 @@ void testRoundTripSchema() {
196202
@Test
197203
@DisplayName("Ensure collection round trip inferring schema value")
198204
void testRoundTripInferSchemaValue() {
199-
Properties sourceProperties = new Properties();
200-
sourceProperties.put(
201-
OUTPUT_JSON_FORMATTER_CONFIG,
202-
"com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson");
203-
sourceProperties.put(OUTPUT_FORMAT_VALUE_CONFIG, OutputFormat.SCHEMA.name());
204-
sourceProperties.put(OUTPUT_SCHEMA_INFER_VALUE_CONFIG, "true");
205-
sourceProperties.put(PUBLISH_FULL_DOCUMENT_ONLY_CONFIG, "true");
206-
sourceProperties.put("value.converter", "io.confluent.connect.avro.AvroConverter");
207-
sourceProperties.put("value.converter.schema.registry.url", KAFKA.schemaRegistryUrl());
208-
209-
Properties sinkProperties = new Properties();
210-
sinkProperties.put("value.converter", "io.confluent.connect.avro.AvroConverter");
211-
sinkProperties.put("value.converter.schema.registry.url", KAFKA.schemaRegistryUrl());
212-
213-
assertRoundTrip(
214-
IntStream.range(1, 100)
215-
.mapToObj(i -> BsonDocument.parse(format(FULL_DOCUMENT_JSON, i)))
216-
.collect(toList()),
217-
IntStream.range(1, 100)
218-
.mapToObj(
219-
i -> {
220-
BsonDocument doc = BsonDocument.parse(format(FULL_DOCUMENT_JSON, i));
221-
doc.put(
222-
"myObjectId",
223-
new BsonString(doc.getObjectId("myObjectId").getValue().toHexString()));
224-
return doc;
225-
})
226-
.collect(toList()),
227-
sourceProperties,
228-
sinkProperties);
205+
try (LogCapture logCapture =
206+
new LogCapture(
207+
Logger.getLogger("io.confluent.rest.exceptions.DebuggableExceptionMapper"))) {
208+
Properties sourceProperties = new Properties();
209+
sourceProperties.put(
210+
OUTPUT_JSON_FORMATTER_CONFIG,
211+
"com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson");
212+
sourceProperties.put(OUTPUT_FORMAT_VALUE_CONFIG, OutputFormat.SCHEMA.name());
213+
sourceProperties.put(OUTPUT_SCHEMA_INFER_VALUE_CONFIG, "true");
214+
sourceProperties.put(PUBLISH_FULL_DOCUMENT_ONLY_CONFIG, "true");
215+
sourceProperties.put("value.converter", "io.confluent.connect.avro.AvroConverter");
216+
sourceProperties.put("value.converter.schema.registry.url", KAFKA.schemaRegistryUrl());
217+
sourceProperties.put(ERRORS_TOLERANCE_CONFIG, ErrorTolerance.ALL.value());
218+
sourceProperties.put(ERRORS_LOG_ENABLE_CONFIG, "true");
219+
220+
Properties sinkProperties = new Properties();
221+
sinkProperties.put("value.converter", "io.confluent.connect.avro.AvroConverter");
222+
sinkProperties.put("value.converter.schema.registry.url", KAFKA.schemaRegistryUrl());
223+
224+
List<BsonDocument> originals =
225+
Stream.of(
226+
"{_id: 1, a: 1, b: 1}",
227+
"{b: 1, _id: 2, a: 1}", // Different field order
228+
"{_id: 3, b: 1, c: 1, d: 1}", // Missing a field and added two new fields
229+
"{_id: 4, E: 1, f: 1, g: 1, h: {h1: 2, h2: '2'}}", // All new fields
230+
"{_id: 5, h: {h2: '3', h1: 2, h4: [1]}}", // Nested field order
231+
"{_id: 10, h: ['1']}", // Invalid schema ignored due to errors.tolerance
232+
"{_id: 6, g: 3, a: 2, h: {h1: 2, h2: '2'}}" // Different field order
233+
)
234+
.map(BsonDocument::parse)
235+
.collect(toList());
236+
237+
List<BsonDocument> expected =
238+
originals.stream().filter(d -> d.getInt32("_id").getValue() != 10).collect(toList());
239+
240+
assertRoundTrip(originals, expected, sourceProperties, sinkProperties);
241+
242+
assertTrue(
243+
logCapture.getEvents().stream()
244+
.filter(e -> e.getThrowableInformation() != null)
245+
.anyMatch(
246+
e ->
247+
e.getThrowableInformation()
248+
.getThrowable()
249+
.getMessage()
250+
.equals(
251+
"Schema being registered is incompatible with an earlier schema")));
252+
}
229253
}
230254

231255
void assertRoundTrip(final List<BsonDocument> originals) {

src/main/java/com/mongodb/kafka/connect/source/schema/BsonDocumentToSchema.java

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,54 +19,67 @@
1919
import static java.lang.String.format;
2020

2121
import java.util.List;
22+
import java.util.Map;
2223
import java.util.Objects;
2324

2425
import org.apache.kafka.connect.data.Decimal;
2526
import org.apache.kafka.connect.data.Schema;
2627
import org.apache.kafka.connect.data.SchemaBuilder;
2728
import org.apache.kafka.connect.data.Timestamp;
2829

30+
import org.bson.BsonDocument;
2931
import org.bson.BsonValue;
3032

3133
public final class BsonDocumentToSchema {
3234

33-
private static final Schema DEFAULT_INFER_SCHEMA_TYPE = Schema.STRING_SCHEMA;
35+
private static final String ID_FIELD = "_id";
36+
private static final Schema DEFAULT_INFER_SCHEMA_TYPE = Schema.OPTIONAL_STRING_SCHEMA;
3437
public static final String SCHEMA_NAME_TEMPLATE = "inferred_name_%s";
3538

3639
public static Schema inferSchema(final BsonValue bsonValue) {
3740
switch (bsonValue.getBsonType()) {
3841
case BOOLEAN:
39-
return Schema.BOOLEAN_SCHEMA;
42+
return Schema.OPTIONAL_BOOLEAN_SCHEMA;
4043
case INT32:
41-
return Schema.INT32_SCHEMA;
44+
return Schema.OPTIONAL_INT32_SCHEMA;
4245
case INT64:
43-
return Schema.INT64_SCHEMA;
46+
return Schema.OPTIONAL_INT64_SCHEMA;
4447
case DOUBLE:
45-
return Schema.FLOAT64_SCHEMA;
48+
return Schema.OPTIONAL_FLOAT64_SCHEMA;
4649
case DECIMAL128:
47-
return Decimal.schema(bsonValue.asDecimal128().getValue().bigDecimalValue().scale());
50+
return Decimal.builder(bsonValue.asDecimal128().getValue().bigDecimalValue().scale())
51+
.optional()
52+
.build();
4853
case DATE_TIME:
4954
case TIMESTAMP:
50-
return Timestamp.SCHEMA;
55+
return Timestamp.builder().optional().build();
5156
case DOCUMENT:
5257
SchemaBuilder builder = SchemaBuilder.struct();
53-
bsonValue.asDocument().forEach((k, v) -> builder.field(k, inferSchema(v)));
58+
BsonDocument document = bsonValue.asDocument();
59+
if (document.containsKey(ID_FIELD)) {
60+
builder.field(ID_FIELD, inferSchema(document.get(ID_FIELD)));
61+
}
62+
document.entrySet().stream()
63+
.filter(kv -> !kv.getKey().equals(ID_FIELD))
64+
.sorted(Map.Entry.comparingByKey())
65+
.forEach(kv -> builder.field(kv.getKey(), inferSchema(kv.getValue())));
5466
builder.name(generateName(builder));
55-
return builder.build();
67+
return builder.optional().build();
5668
case ARRAY:
5769
List<BsonValue> values = bsonValue.asArray().getValues();
5870
Schema firstItemSchema =
5971
values.isEmpty() ? DEFAULT_INFER_SCHEMA_TYPE : inferSchema(values.get(0));
6072
if (values.isEmpty()
6173
|| values.stream().anyMatch(bv -> !Objects.equals(inferSchema(bv), firstItemSchema))) {
62-
return SchemaBuilder.array(DEFAULT_INFER_SCHEMA_TYPE).build();
74+
return SchemaBuilder.array(DEFAULT_INFER_SCHEMA_TYPE).optional().build();
6375
}
64-
return SchemaBuilder.array(inferSchema(bsonValue.asArray().getValues().get(0))).build();
76+
return SchemaBuilder.array(inferSchema(bsonValue.asArray().getValues().get(0)))
77+
.optional()
78+
.build();
6579
case BINARY:
66-
return Schema.BYTES_SCHEMA;
80+
return Schema.OPTIONAL_BYTES_SCHEMA;
6781
case SYMBOL:
6882
case STRING:
69-
return Schema.STRING_SCHEMA;
7083
case NULL:
7184
return Schema.OPTIONAL_STRING_SCHEMA;
7285
case OBJECT_ID:

src/test/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducerTest.java

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -159,41 +159,49 @@ void testInferSchemaAndValueProducer() {
159159
Schema expectedSchema =
160160
nameAndBuildSchema(
161161
SchemaBuilder.struct()
162-
.field("arrayEmpty", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
163-
.field("arraySimple", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
164162
.field(
165163
"arrayComplex",
166164
SchemaBuilder.array(
167165
nameAndBuildSchema(
168-
SchemaBuilder.struct().field("a", Schema.INT32_SCHEMA)))
166+
SchemaBuilder.struct().field("a", Schema.OPTIONAL_INT32_SCHEMA)))
167+
.optional()
169168
.build())
170-
.field("arrayMixedTypes", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
171-
.field("arrayComplexMixedTypes", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
172-
.field("binary", Schema.BYTES_SCHEMA)
173-
.field("boolean", Schema.BOOLEAN_SCHEMA)
174-
.field("code", Schema.STRING_SCHEMA)
175-
.field("codeWithScope", Schema.STRING_SCHEMA)
176-
.field("dateTime", Timestamp.SCHEMA)
177-
.field("decimal128", Decimal.schema(1))
169+
.field(
170+
"arrayComplexMixedTypes",
171+
SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build())
172+
.field(
173+
"arrayEmpty",
174+
SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build())
175+
.field(
176+
"arrayMixedTypes",
177+
SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build())
178+
.field(
179+
"arraySimple",
180+
SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA).optional().build())
181+
.field("binary", Schema.OPTIONAL_BYTES_SCHEMA)
182+
.field("boolean", Schema.OPTIONAL_BOOLEAN_SCHEMA)
183+
.field("code", Schema.OPTIONAL_STRING_SCHEMA)
184+
.field("codeWithScope", Schema.OPTIONAL_STRING_SCHEMA)
185+
.field("dateTime", Timestamp.builder().optional().build())
186+
.field("decimal128", Decimal.builder(1).optional().build())
178187
.field(
179188
"document",
180-
nameAndBuildSchema(SchemaBuilder.struct().field("a", Schema.INT32_SCHEMA)))
181-
.field("double", Schema.FLOAT64_SCHEMA)
182-
.field("int32", Schema.INT32_SCHEMA)
183-
.field("int64", Schema.INT64_SCHEMA)
184-
.field("maxKey", Schema.STRING_SCHEMA)
185-
.field("minKey", Schema.STRING_SCHEMA)
189+
nameAndBuildSchema(
190+
SchemaBuilder.struct().field("a", Schema.OPTIONAL_INT32_SCHEMA)))
191+
.field("double", Schema.OPTIONAL_FLOAT64_SCHEMA)
192+
.field("int32", Schema.OPTIONAL_INT32_SCHEMA)
193+
.field("int64", Schema.OPTIONAL_INT64_SCHEMA)
194+
.field("maxKey", Schema.OPTIONAL_STRING_SCHEMA)
195+
.field("minKey", Schema.OPTIONAL_STRING_SCHEMA)
186196
.field("null", Schema.OPTIONAL_STRING_SCHEMA)
187-
.field("objectId", Schema.STRING_SCHEMA)
188-
.field("regex", Schema.STRING_SCHEMA)
189-
.field("string", Schema.STRING_SCHEMA)
190-
.field("symbol", Schema.STRING_SCHEMA)
191-
.field("timestamp", Timestamp.SCHEMA)
192-
.field("undefined", Schema.STRING_SCHEMA));
197+
.field("objectId", Schema.OPTIONAL_STRING_SCHEMA)
198+
.field("regex", Schema.OPTIONAL_STRING_SCHEMA)
199+
.field("string", Schema.OPTIONAL_STRING_SCHEMA)
200+
.field("symbol", Schema.OPTIONAL_STRING_SCHEMA)
201+
.field("timestamp", Timestamp.builder().optional().build())
202+
.field("undefined", Schema.OPTIONAL_STRING_SCHEMA));
193203

194204
Schema arrayComplexValueSchema = expectedSchema.field("arrayComplex").schema().valueSchema();
195-
Schema arrayComplexMixedTypesValueSchema =
196-
expectedSchema.field("arrayComplexMixedTypes").schema().valueSchema();
197205
Schema documentSchema = expectedSchema.field("document").schema();
198206

199207
SchemaAndValue expectedValue =
@@ -305,7 +313,7 @@ static Struct generateExpectedValue(final boolean simplified) {
305313
}
306314

307315
static Schema nameAndBuildSchema(final SchemaBuilder builder) {
308-
return builder.name(generateName(builder)).build();
316+
return builder.name(generateName(builder)).optional().build();
309317
}
310318

311319
static String getFullDocument(final boolean simplified) {

src/test/java/com/mongodb/kafka/connect/source/schema/SchemaUtils.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,12 @@ private static Object getStructData(final Struct value) {
7575
}
7676

7777
public static void assertSchemaEquals(final Schema expected, final Schema actual) {
78-
assertEquals(expected.isOptional(), actual.isOptional(), "Optional value differs");
79-
assertEquals(expected.version(), actual.version(), "version differs");
80-
assertEquals(expected.name(), actual.name(), "name differs");
78+
assertEquals(
79+
expected.isOptional(), actual.isOptional(), "Optional value differs: " + actual.schema());
80+
assertEquals(expected.version(), actual.version(), "version differs: " + actual.schema());
8181

82-
assertEquals(expected.doc(), actual.doc(), "docs differ");
83-
assertEquals(expected.type(), actual.type(), "type differs");
82+
assertEquals(expected.doc(), actual.doc(), "docs differ: " + actual.schema());
83+
assertEquals(expected.type(), actual.type(), "type differs: " + actual.schema());
8484

8585
Object expectedDefaultValue = expected.defaultValue();
8686
Object actualDefaultValue = actual.defaultValue();
@@ -96,6 +96,7 @@ public static void assertSchemaEquals(final Schema expected, final Schema actual
9696
assertStructSchemaEquals(expected, actual);
9797
}
9898
assertEquals(expected.parameters(), actual.parameters(), "parameters differs");
99+
assertEquals(expected.name(), actual.name(), "name differs: " + actual.schema());
99100
}
100101

101102
static void assertStructSchemaEquals(final Schema expected, final Schema actual) {

0 commit comments

Comments
 (0)