
Commit 08f1a01

Add support for handling empty schemas and records (#237)
Sometimes record types contain fields whose struct type has no fields defined (null objects, marker classes, etc.). The converters must omit such fields, since an empty structure cannot be mapped onto a BigQuery table schema or its rows.
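As a rough sketch of the situation this commit handles (this is not code from the commit; the "id"/"marker" field names and the main-method harness are hypothetical), a Kafka Connect value with a field-less struct field looks like the following. After this change the empty struct field is dropped from both the generated BigQuery schema and the converted row.

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class EmptyStructSketch {
  public static void main(String[] args) {
    // A struct type with no fields, as produced for marker classes / null objects.
    Schema emptyMarkerSchema = SchemaBuilder.struct().name("EmptyMarker").build();

    // A record value that mixes an ordinary field with the field-less struct.
    Schema valueSchema = SchemaBuilder.struct()
        .field("id", Schema.STRING_SCHEMA)
        .field("marker", emptyMarkerSchema)
        .build();
    Struct value = new Struct(valueSchema)
        .put("id", "42")
        .put("marker", new Struct(emptyMarkerSchema));

    // With this commit, BigQuerySchemaConverter#convertSchema skips "marker" when building
    // the table schema, and BigQueryRecordConverter#convertRecord produces a row containing
    // only {"id": "42"}: an empty struct cannot be expressed as a BigQuery RECORD field.
    System.out.println(value);
  }
}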
1 parent 3da2ae2 commit 08f1a01


4 files changed: +175, -56 lines


kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverter.java

Lines changed: 11 additions & 6 deletions
@@ -178,12 +178,17 @@ private Map<String, Object> convertStruct(Object kafkaConnectObject, Schema kafk
     List<Field> kafkaConnectSchemaFields = kafkaConnectSchema.fields();
     Struct kafkaConnectStruct = (Struct) kafkaConnectObject;
     for (Field kafkaConnectField : kafkaConnectSchemaFields) {
-      Object bigQueryObject = convertObject(
-          kafkaConnectStruct.get(kafkaConnectField.name()),
-          kafkaConnectField.schema()
-      );
-      if (bigQueryObject != null) {
-        bigQueryRecord.put(kafkaConnectField.name(), bigQueryObject);
+      // ignore empty structures
+      boolean isEmptyStruct = kafkaConnectField.schema().type() == Schema.Type.STRUCT &&
+          kafkaConnectField.schema().fields().isEmpty();
+      if (!isEmptyStruct) {
+        Object bigQueryObject = convertObject(
+            kafkaConnectStruct.get(kafkaConnectField.name()),
+            kafkaConnectField.schema()
+        );
+        if (bigQueryObject != null) {
+          bigQueryRecord.put(kafkaConnectField.name(), bigQueryObject);
+        }
       }
     }
     return bigQueryRecord;

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/BigQuerySchemaConverter.java

Lines changed: 59 additions & 50 deletions
@@ -27,14 +27,14 @@
 import com.wepay.kafka.connect.bigquery.convert.logicaltype.LogicalTypeConverter;
 import com.wepay.kafka.connect.bigquery.exception.ConversionConnectException;
 
-import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Class for converting from {@link Schema Kafka Connect Schemas} to
@@ -102,25 +102,26 @@ public com.google.cloud.bigquery.Schema convertSchema(Schema kafkaConnectSchema)
           ConversionConnectException("Top-level Kafka Connect schema must be of type 'struct'");
     }
 
-    List<com.google.cloud.bigquery.Field> fields = new LinkedList<>();
-
-    for (Field kafkaConnectField : kafkaConnectSchema.fields()) {
-      com.google.cloud.bigquery.Field bigQuerySchemaField =
-          convertField(kafkaConnectField.schema(), kafkaConnectField.name()).build();
-      fields.add(bigQuerySchemaField);
-    }
+    List<com.google.cloud.bigquery.Field> fields = kafkaConnectSchema.fields().stream()
+        .flatMap(kafkaConnectField ->
+            convertField(kafkaConnectField.schema(), kafkaConnectField.name())
+                .map(Stream::of)
+                .orElse(Stream.empty())
+        )
+        .map(com.google.cloud.bigquery.Field.Builder::build)
+        .collect(Collectors.toList());
 
     return com.google.cloud.bigquery.Schema.of(fields);
   }
 
-  private com.google.cloud.bigquery.Field.Builder convertField(Schema kafkaConnectSchema,
-                                                               String fieldName) {
-    com.google.cloud.bigquery.Field.Builder result;
+  private Optional<com.google.cloud.bigquery.Field.Builder> convertField(Schema kafkaConnectSchema,
+                                                                         String fieldName) {
+    Optional<com.google.cloud.bigquery.Field.Builder> result;
     Schema.Type kafkaConnectSchemaType = kafkaConnectSchema.type();
     if (LogicalConverterRegistry.isRegisteredLogicalType(kafkaConnectSchema.name())) {
-      result = convertLogical(kafkaConnectSchema, fieldName);
+      result = Optional.of(convertLogical(kafkaConnectSchema, fieldName));
     } else if (PRIMITIVE_TYPE_MAP.containsKey(kafkaConnectSchemaType)) {
-      result = convertPrimitive(kafkaConnectSchema, fieldName);
+      result = Optional.of(convertPrimitive(kafkaConnectSchema, fieldName));
     } else {
       switch (kafkaConnectSchemaType) {
         case STRUCT:
@@ -138,11 +139,13 @@ private com.google.cloud.bigquery.Field.Builder convertField(Schema kafkaConnect
           );
       }
     }
-    setNullability(kafkaConnectSchema, result);
-    if (kafkaConnectSchema.doc() != null) {
-      result.setDescription(kafkaConnectSchema.doc());
-    }
-    return result;
+    return result.map(res -> {
+      setNullability(kafkaConnectSchema, res);
+      if (kafkaConnectSchema.doc() != null) {
+        res.setDescription(kafkaConnectSchema.doc());
+      }
+      return res;
+    });
   }
 
   private void setNullability(Schema kafkaConnectSchema,
@@ -160,48 +163,54 @@ private void setNullability(Schema kafkaConnectSchema,
     }
   }
 
-  private com.google.cloud.bigquery.Field.Builder convertStruct(Schema kafkaConnectSchema,
-                                                                String fieldName) {
-    List<com.google.cloud.bigquery.Field> bigQueryRecordFields = new ArrayList<>();
-
-    for (Field kafkaConnectField : kafkaConnectSchema.fields()) {
-      com.google.cloud.bigquery.Field.Builder bigQueryRecordFieldBuilder =
-          convertField(kafkaConnectField.schema(), kafkaConnectField.name());
-      bigQueryRecordFields.add(bigQueryRecordFieldBuilder.build());
+  private Optional<com.google.cloud.bigquery.Field.Builder> convertStruct(Schema kafkaConnectSchema,
+                                                                          String fieldName) {
+    List<com.google.cloud.bigquery.Field> bigQueryRecordFields = kafkaConnectSchema.fields()
+        .stream()
+        .flatMap(kafkaConnectField ->
+            convertField(kafkaConnectField.schema(), kafkaConnectField.name())
+                .map(Stream::of)
+                .orElse(Stream.empty())
+        )
+        .map(com.google.cloud.bigquery.Field.Builder::build)
+        .collect(Collectors.toList());
+    if (bigQueryRecordFields.isEmpty()) {
+      return Optional.empty();
     }
 
    FieldList fieldList = FieldList.of(bigQueryRecordFields);
 
-    return com.google.cloud.bigquery.Field.newBuilder(fieldName,
-        LegacySQLTypeName.RECORD,
-        fieldList);
+    return Optional.of(com.google.cloud.bigquery.Field.newBuilder(fieldName,
+        LegacySQLTypeName.RECORD,
+        fieldList));
   }
 
-  private com.google.cloud.bigquery.Field.Builder convertArray(Schema kafkaConnectSchema,
-                                                               String fieldName) {
+  private Optional<com.google.cloud.bigquery.Field.Builder> convertArray(Schema kafkaConnectSchema,
+                                                                         String fieldName) {
     Schema elementSchema = kafkaConnectSchema.valueSchema();
-    com.google.cloud.bigquery.Field.Builder elementFieldBuilder =
-        convertField(elementSchema, fieldName);
-    return elementFieldBuilder.setMode(com.google.cloud.bigquery.Field.Mode.REPEATED);
+    return convertField(elementSchema, fieldName)
+        .map(builder -> builder.setMode(com.google.cloud.bigquery.Field.Mode.REPEATED));
   }
 
-  private com.google.cloud.bigquery.Field.Builder convertMap(Schema kafkaConnectSchema,
-                                                             String fieldName) {
+  private Optional<com.google.cloud.bigquery.Field.Builder> convertMap(Schema kafkaConnectSchema,
+                                                                       String fieldName) {
     Schema keySchema = kafkaConnectSchema.keySchema();
     Schema valueSchema = kafkaConnectSchema.valueSchema();
 
-    com.google.cloud.bigquery.Field keyField =
-        convertField(keySchema, MAP_KEY_FIELD_NAME).build();
-    com.google.cloud.bigquery.Field valueField =
-        convertField(valueSchema, MAP_VALUE_FIELD_NAME).build();
-
-    com.google.cloud.bigquery.Field.Builder bigQueryRecordBuilder =
-        com.google.cloud.bigquery.Field.newBuilder(fieldName,
-            LegacySQLTypeName.RECORD,
-            keyField,
-            valueField);
-
-    return bigQueryRecordBuilder.setMode(com.google.cloud.bigquery.Field.Mode.REPEATED);
+    Optional<com.google.cloud.bigquery.Field> maybeKeyField = convertField(keySchema, MAP_KEY_FIELD_NAME)
+        .map(com.google.cloud.bigquery.Field.Builder::build);
+    Optional<com.google.cloud.bigquery.Field> maybeValueField = convertField(valueSchema, MAP_VALUE_FIELD_NAME)
+        .map(com.google.cloud.bigquery.Field.Builder::build);
+
+    return maybeKeyField.flatMap(keyField ->
+        maybeValueField.map(valueField ->
+            com.google.cloud.bigquery.Field.newBuilder(fieldName,
+                LegacySQLTypeName.RECORD,
+                keyField,
+                valueField)
+                .setMode(com.google.cloud.bigquery.Field.Mode.REPEATED)
+        )
+    );
   }
 
   private com.google.cloud.bigquery.Field.Builder convertPrimitive(Schema kafkaConnectSchema,

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverterTest.java

Lines changed: 50 additions & 0 deletions
@@ -313,6 +313,56 @@ public void testStruct() {
     assertEquals(bigQueryExpectedOuterRecord, bigQueryTestOuterRecord);
   }
 
+  @Test
+  public void testEmptyStruct() {
+    Schema kafkaConnectInnerSchema = SchemaBuilder
+        .struct()
+        .build();
+
+    Struct kafkaConnectInnerStruct = new Struct(kafkaConnectInnerSchema);
+
+    SinkRecord kafkaConnectSinkRecord =
+        spoofSinkRecord(kafkaConnectInnerSchema, kafkaConnectInnerStruct, false);
+    Map<String, Object> bigQueryTestInnerRecord =
+        new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE)
+            .convertRecord(kafkaConnectSinkRecord, KafkaSchemaRecordType.VALUE);
+    assertEquals(new HashMap<String, Object>(), bigQueryTestInnerRecord);
+  }
+
+  @Test
+  public void testEmptyInnerStruct() {
+    final String innerFieldStructName = "InnerStruct";
+    final String innerFieldStringName = "InnerString";
+    final String innerStringValue = "forty two";
+
+    Schema kafkaConnectInnerSchema = SchemaBuilder
+        .struct()
+        .build();
+
+    Struct kafkaConnectInnerStruct = new Struct(kafkaConnectInnerSchema);
+
+    Schema kafkaConnectOuterSchema = SchemaBuilder
+        .struct()
+        .field(innerFieldStructName, kafkaConnectInnerSchema)
+        .field(innerFieldStringName, Schema.STRING_SCHEMA)
+        .build();
+
+    Struct kafkaConnectOuterStruct = new Struct(kafkaConnectOuterSchema);
+    kafkaConnectOuterStruct.put(innerFieldStructName, kafkaConnectInnerStruct);
+    kafkaConnectOuterStruct.put(innerFieldStringName, innerStringValue);
+
+    Map<String, Object> bigQueryExpectedOuterRecord = new HashMap<>();
+    bigQueryExpectedOuterRecord.put(innerFieldStringName, innerStringValue);
+
+    SinkRecord kafkaConnectOuterSinkRecord =
+        spoofSinkRecord(kafkaConnectOuterSchema, kafkaConnectOuterStruct, false);
+    Map<String, Object> bigQueryTestOuterRecord =
+        new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE)
+            .convertRecord(kafkaConnectOuterSinkRecord, KafkaSchemaRecordType.VALUE);
+
+    assertEquals(bigQueryExpectedOuterRecord, bigQueryTestOuterRecord);
+  }
+
   @Test
   public void testMap() {
     final String fieldName = "StringIntegerMap";

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/convert/BigQuerySchemaConverterTest.java

Lines changed: 55 additions & 0 deletions
@@ -275,6 +275,61 @@ public void testStruct() { // Struct in a struct in a struct (wrapped in a struc
     assertEquals(bigQueryExpectedOuterSchema, bigQueryTestOuterSchema);
   }
 
+  @Test
+  public void testEmptyStruct() { // Empty struct
+    com.google.cloud.bigquery.Schema bigQueryTestOuterSchema =
+        new BigQuerySchemaConverter(false).convertSchema(
+            SchemaBuilder
+                .struct()
+                .build()
+        );
+    assertEquals(com.google.cloud.bigquery.Schema.of(), bigQueryTestOuterSchema);
+  }
+
+  @Test
+  public void testEmptyInnerStruct() { // Empty nested struct
+    final String outerFieldStructName = "OuterStruct";
+    final String innerFieldStructName = "InnerStruct";
+    final String innerFieldStringName = "InnerString";
+
+    Schema kafkaConnectInnerSchema = SchemaBuilder
+        .struct()
+        .build();
+
+    com.google.cloud.bigquery.Field bigQueryInnerString = com.google.cloud.bigquery.Field.newBuilder(
+        innerFieldStringName,
+        LegacySQLTypeName.STRING
+    ).setMode(
+        com.google.cloud.bigquery.Field.Mode.REQUIRED
+    ).build();
+
+    com.google.cloud.bigquery.Field bigQueryOuterRecord =
+        com.google.cloud.bigquery.Field.newBuilder(
+            outerFieldStructName,
+            LegacySQLTypeName.RECORD,
+            bigQueryInnerString
+        ).setMode(
+            com.google.cloud.bigquery.Field.Mode.REQUIRED
+        ).build();
+
+    Schema kafkaConnectOuterSchema = SchemaBuilder
+        .struct()
+        .field(innerFieldStructName, kafkaConnectInnerSchema)
+        .field(innerFieldStringName, Schema.STRING_SCHEMA)
+        .build();
+
+    com.google.cloud.bigquery.Schema bigQueryExpectedOuterSchema =
+        com.google.cloud.bigquery.Schema.of(bigQueryOuterRecord);
+    com.google.cloud.bigquery.Schema bigQueryTestOuterSchema =
+        new BigQuerySchemaConverter(false).convertSchema(
+            SchemaBuilder
+                .struct()
+                .field(outerFieldStructName, kafkaConnectOuterSchema)
+                .build()
+        );
+    assertEquals(bigQueryExpectedOuterSchema, bigQueryTestOuterSchema);
+  }
+
   @Test
   public void testMap() {
     final String fieldName = "StringIntegerMap";
