Skip to content

Commit e0c08f7

Browse files
authored
[hotfix][kafka] Fix Debezium schema generation for complex types (ARRAY, MAP, ROW) (#4240)
1 parent ea30fd6 commit e0c08f7

File tree

4 files changed

+277
-13
lines changed

4 files changed

+277
-13
lines changed

docs/content/docs/connectors/pipeline-connectors/kafka.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,36 +285,42 @@ Data Type Mapping
285285
<td>TINYINT</td>
286286
<td>INT16</td>
287287
<td></td>
288+
<td></td>
288289
</tr>
289290
<tr>
290291
<td>SMALLINT</td>
291292
<td>SMALLINT</td>
292293
<td>INT16</td>
293294
<td></td>
295+
<td></td>
294296
</tr>
295297
<tr>
296298
<td>INT</td>
297299
<td>INT</td>
298300
<td>INT32</td>
299301
<td></td>
302+
<td></td>
300303
</tr>
301304
<tr>
302305
<td>BIGINT</td>
303306
<td>BIGINT</td>
304307
<td>INT64</td>
305308
<td></td>
309+
<td></td>
306310
</tr>
307311
<tr>
308312
<td>FLOAT</td>
309313
<td>FLOAT</td>
310314
<td>FLOAT</td>
311315
<td></td>
316+
<td></td>
312317
</tr>
313318
<tr>
314319
<td>DOUBLE</td>
315320
<td>DOUBLE</td>
316321
<td>DOUBLE</td>
317322
<td></td>
323+
<td></td>
318324
</tr>
319325
<tr>
320326
<td>DECIMAL(p, s)</td>
@@ -328,6 +334,7 @@ Data Type Mapping
328334
<td>BOOLEAN</td>
329335
<td>BOOLEAN</td>
330336
<td></td>
337+
<td></td>
331338
</tr>
332339
<tr>
333340
<td>DATE</td>
@@ -355,12 +362,35 @@ Data Type Mapping
355362
<td>CHAR(n)</td>
356363
<td>STRING</td>
357364
<td></td>
365+
<td></td>
358366
</tr>
359367
<tr>
360368
<td>VARCHAR(n)</td>
361369
<td>VARCHAR(n)</td>
362370
<td>STRING</td>
363371
<td></td>
372+
<td></td>
373+
</tr>
374+
<tr>
375+
<td>ARRAY</td>
376+
<td>ARRAY</td>
377+
<td>ARRAY</td>
378+
<td></td>
379+
                <td>Serialized as a JSON array. The element type is recursively converted according to this table.</td>
380+
</tr>
381+
<tr>
382+
<td>MAP</td>
383+
<td>MAP</td>
384+
<td>MAP</td>
385+
<td></td>
386+
<td>Serialized as JSON object. Key and value types are recursively converted according to this table.</td>
387+
</tr>
388+
<tr>
389+
<td>ROW</td>
390+
<td>ROW</td>
391+
<td>STRUCT</td>
392+
<td></td>
393+
<td>Serialized as JSON object. Field types are recursively converted according to this table.</td>
364394
</tr>
365395
</tbody>
366396
</table>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@
2525
import org.apache.flink.cdc.common.event.TableId;
2626
import org.apache.flink.cdc.common.schema.Column;
2727
import org.apache.flink.cdc.common.schema.Schema;
28+
import org.apache.flink.cdc.common.types.ArrayType;
29+
import org.apache.flink.cdc.common.types.DataField;
2830
import org.apache.flink.cdc.common.types.DecimalType;
31+
import org.apache.flink.cdc.common.types.MapType;
2932
import org.apache.flink.cdc.common.types.TimestampType;
3033
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
3134
import org.apache.flink.cdc.common.utils.SchemaUtils;
@@ -249,6 +252,24 @@ public String convertSchemaToDebeziumSchema(Schema schema) {
249252

250253
private static SchemaBuilder convertCDCDataTypeToDebeziumDataType(Column column) {
251254
org.apache.flink.cdc.common.types.DataType columnType = column.getType();
255+
SchemaBuilder field = convertCDCDataTypeToDebeziumDataType(columnType);
256+
257+
if (columnType.isNullable()) {
258+
field.optional();
259+
} else {
260+
field.required();
261+
}
262+
if (column.getDefaultValueExpression() != null) {
263+
field.defaultValue(column.getDefaultValueExpression());
264+
}
265+
if (column.getComment() != null) {
266+
field.doc(column.getComment());
267+
}
268+
return field;
269+
}
270+
271+
private static SchemaBuilder convertCDCDataTypeToDebeziumDataType(
272+
org.apache.flink.cdc.common.types.DataType columnType) {
252273
final SchemaBuilder field;
253274
switch (columnType.getTypeRoot()) {
254275
case TINYINT:
@@ -310,23 +331,37 @@ private static SchemaBuilder convertCDCDataTypeToDebeziumDataType(Column column)
310331
.orElse(0)))
311332
.version(1);
312333
break;
334+
case ARRAY:
335+
ArrayType arrayType = (ArrayType) columnType;
336+
org.apache.kafka.connect.data.Schema elementSchema =
337+
convertCDCDataTypeToDebeziumDataType(arrayType.getElementType()).build();
338+
field = SchemaBuilder.array(elementSchema);
339+
break;
340+
case MAP:
341+
MapType mapType = (MapType) columnType;
342+
org.apache.kafka.connect.data.Schema keySchema =
343+
convertCDCDataTypeToDebeziumDataType(mapType.getKeyType()).build();
344+
org.apache.kafka.connect.data.Schema valueSchema =
345+
convertCDCDataTypeToDebeziumDataType(mapType.getValueType()).build();
346+
field = SchemaBuilder.map(keySchema, valueSchema);
347+
break;
348+
case ROW:
349+
org.apache.flink.cdc.common.types.RowType rowType =
350+
(org.apache.flink.cdc.common.types.RowType) columnType;
351+
SchemaBuilder structBuilder = SchemaBuilder.struct();
352+
for (DataField dataField : rowType.getFields()) {
353+
org.apache.kafka.connect.data.Schema fieldSchema =
354+
convertCDCDataTypeToDebeziumDataType(dataField.getType()).build();
355+
structBuilder.field(dataField.getName(), fieldSchema);
356+
}
357+
field = structBuilder;
358+
break;
313359
case CHAR:
314360
case VARCHAR:
315361
default:
316362
field = SchemaBuilder.string();
317363
}
318364

319-
if (columnType.isNullable()) {
320-
field.optional();
321-
} else {
322-
field.required();
323-
}
324-
if (column.getDefaultValueExpression() != null) {
325-
field.defaultValue(column.getDefaultValueExpression());
326-
}
327-
if (column.getComment() != null) {
328-
field.doc(column.getComment());
329-
}
330365
return field;
331366
}
332367

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,4 +340,101 @@ void testSerializeComplexTypes() throws Exception {
340340
assertThat(rowNode.has("f1")).isTrue();
341341
assertThat(rowNode.has("f2")).isTrue();
342342
}
343+
344+
@Test
345+
void testSerializeWithSchemaComplexTypes() throws Exception {
346+
ObjectMapper mapper =
347+
JacksonMapperFactory.createObjectMapper()
348+
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
349+
Map<String, String> properties = new HashMap<>();
350+
properties.put("include-schema.enabled", "true");
351+
Configuration configuration = Configuration.fromMap(properties);
352+
SerializationSchema<Event> serializationSchema =
353+
ChangeLogJsonFormatFactory.createSerializationSchema(
354+
configuration, JsonSerializationType.DEBEZIUM_JSON, ZoneId.systemDefault());
355+
serializationSchema.open(new MockInitializationContext());
356+
357+
// create table with complex types
358+
Schema schema =
359+
Schema.newBuilder()
360+
.physicalColumn("id", DataTypes.INT())
361+
.physicalColumn("arr", DataTypes.ARRAY(DataTypes.STRING()))
362+
.physicalColumn("map", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
363+
.physicalColumn(
364+
"row",
365+
DataTypes.ROW(
366+
DataTypes.FIELD("f1", DataTypes.STRING()),
367+
DataTypes.FIELD("f2", DataTypes.INT())))
368+
.primaryKey("id")
369+
.build();
370+
371+
RowType rowType =
372+
RowType.of(
373+
DataTypes.INT(),
374+
DataTypes.ARRAY(DataTypes.STRING()),
375+
DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
376+
DataTypes.ROW(
377+
DataTypes.FIELD("f1", DataTypes.STRING()),
378+
DataTypes.FIELD("f2", DataTypes.INT())));
379+
380+
CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
381+
assertThat(serializationSchema.serialize(createTableEvent)).isNull();
382+
383+
BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
384+
385+
// Create test data with complex types
386+
org.apache.flink.cdc.common.data.GenericArrayData arrayData =
387+
new org.apache.flink.cdc.common.data.GenericArrayData(
388+
new Object[] {
389+
BinaryStringData.fromString("item1"),
390+
BinaryStringData.fromString("item2")
391+
});
392+
393+
Map<Object, Object> mapValues = new HashMap<>();
394+
mapValues.put(BinaryStringData.fromString("key1"), 100);
395+
mapValues.put(BinaryStringData.fromString("key2"), 200);
396+
org.apache.flink.cdc.common.data.GenericMapData mapData =
397+
new org.apache.flink.cdc.common.data.GenericMapData(mapValues);
398+
399+
BinaryRecordDataGenerator nestedRowGenerator =
400+
new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.INT()));
401+
org.apache.flink.cdc.common.data.RecordData nestedRow =
402+
nestedRowGenerator.generate(
403+
new Object[] {BinaryStringData.fromString("nested"), 42});
404+
405+
// insert event with complex types
406+
DataChangeEvent insertEvent =
407+
DataChangeEvent.insertEvent(
408+
TABLE_1,
409+
generator.generate(new Object[] {1, arrayData, mapData, nestedRow}));
410+
411+
byte[] serialized = serializationSchema.serialize(insertEvent);
412+
JsonNode actual = mapper.readTree(serialized);
413+
414+
JsonNode expected =
415+
mapper.readTree(
416+
"{\"schema\":{\"type\":\"struct\",\"fields\":["
417+
+ "{\"type\":\"struct\",\"fields\":["
418+
+ "{\"type\":\"int32\",\"optional\":true,\"field\":\"id\"},"
419+
+ "{\"type\":\"array\",\"items\":{\"type\":\"string\",\"optional\":false},\"optional\":true,\"field\":\"arr\"},"
420+
+ "{\"type\":\"map\",\"keys\":{\"type\":\"string\",\"optional\":false},\"values\":{\"type\":\"int32\",\"optional\":false},\"optional\":true,\"field\":\"map\"},"
421+
+ "{\"type\":\"struct\",\"fields\":["
422+
+ "{\"type\":\"string\",\"optional\":false,\"field\":\"f1\"},"
423+
+ "{\"type\":\"int32\",\"optional\":false,\"field\":\"f2\"}"
424+
+ "],\"optional\":true,\"field\":\"row\"}"
425+
+ "],\"optional\":true,\"field\":\"before\"},"
426+
+ "{\"type\":\"struct\",\"fields\":["
427+
+ "{\"type\":\"int32\",\"optional\":true,\"field\":\"id\"},"
428+
+ "{\"type\":\"array\",\"items\":{\"type\":\"string\",\"optional\":false},\"optional\":true,\"field\":\"arr\"},"
429+
+ "{\"type\":\"map\",\"keys\":{\"type\":\"string\",\"optional\":false},\"values\":{\"type\":\"int32\",\"optional\":false},\"optional\":true,\"field\":\"map\"},"
430+
+ "{\"type\":\"struct\",\"fields\":["
431+
+ "{\"type\":\"string\",\"optional\":false,\"field\":\"f1\"},"
432+
+ "{\"type\":\"int32\",\"optional\":false,\"field\":\"f2\"}"
433+
+ "],\"optional\":true,\"field\":\"row\"}"
434+
+ "],\"optional\":true,\"field\":\"after\"}"
435+
+ "],\"optional\":false},"
436+
+ "\"payload\":{\"before\":null,\"after\":{\"id\":1,\"arr\":[\"item1\",\"item2\"],\"map\":{\"key1\":100,\"key2\":200},\"row\":{\"f1\":\"nested\",\"f2\":42}},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}}");
437+
438+
assertThat(actual).isEqualTo(expected);
439+
}
343440
}

0 commit comments

Comments (0)