Skip to content

Commit 701a1c7

Browse files
author
guoxuanlin
committed
[FLINK-38889][pipeline][kafka] Support serializing complex types(MAP, ARRAY, ROW) to JSON (Debezium / Canal)
1 parent 42580dc commit 701a1c7

File tree

6 files changed

+1184
-66
lines changed

6 files changed

+1184
-66
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,17 @@
1818
package org.apache.flink.cdc.connectors.kafka.json;
1919

2020
import org.apache.flink.api.common.serialization.SerializationSchema;
21+
import org.apache.flink.cdc.common.data.ArrayData;
22+
import org.apache.flink.cdc.common.data.MapData;
2123
import org.apache.flink.cdc.common.data.RecordData;
2224
import org.apache.flink.cdc.common.event.TableId;
2325
import org.apache.flink.cdc.common.schema.Schema;
26+
import org.apache.flink.cdc.common.types.ArrayType;
27+
import org.apache.flink.cdc.common.types.DataField;
2428
import org.apache.flink.cdc.common.types.DataType;
2529
import org.apache.flink.cdc.common.types.DataTypeChecks;
30+
import org.apache.flink.cdc.common.types.MapType;
31+
import org.apache.flink.cdc.common.types.RowType;
2632
import org.apache.flink.table.data.DecimalData;
2733
import org.apache.flink.table.data.GenericRowData;
2834
import org.apache.flink.table.data.RowData;
@@ -32,7 +38,9 @@
3238

3339
import java.time.ZoneId;
3440
import java.util.ArrayList;
41+
import java.util.HashMap;
3542
import java.util.List;
43+
import java.util.Map;
3644

3745
import static org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision;
3846
import static org.apache.flink.cdc.common.types.DataTypeChecks.getScale;
@@ -173,6 +181,27 @@ record ->
173181
DataTypeChecks.getPrecision(fieldType))
174182
.toInstant());
175183
break;
184+
case ARRAY:
185+
fieldGetter =
186+
record ->
187+
convertArrayData(
188+
record.getArray(fieldPos), (ArrayType) fieldType, zoneId);
189+
break;
190+
case MAP:
191+
fieldGetter =
192+
record ->
193+
convertMapData(
194+
record.getMap(fieldPos), (MapType) fieldType, zoneId);
195+
break;
196+
case ROW:
197+
fieldGetter =
198+
record ->
199+
convertRowData(
200+
record.getRow(
201+
fieldPos, ((RowType) fieldType).getFieldCount()),
202+
(RowType) fieldType,
203+
zoneId);
204+
break;
176205
default:
177206
throw new IllegalArgumentException(
178207
"don't support type of " + fieldType.getTypeRoot());
@@ -195,4 +224,231 @@ public Schema getSchema() {
195224
public SerializationSchema<RowData> getSerializationSchema() {
196225
return serializationSchema;
197226
}
227+
228+
/**
229+
* Convert CDC ArrayData to Flink Table ArrayData.
230+
*
231+
* @param arrayData CDC array data
232+
* @param arrayType array type information
233+
* @param zoneId time zone for temporal type conversion
234+
* @return Flink Table ArrayData
235+
*/
236+
private static org.apache.flink.table.data.ArrayData convertArrayData(
237+
ArrayData arrayData, ArrayType arrayType, ZoneId zoneId) {
238+
if (arrayData == null) {
239+
return null;
240+
}
241+
242+
DataType elementType = arrayType.getElementType();
243+
int size = arrayData.size();
244+
Object[] result = new Object[size];
245+
246+
for (int i = 0; i < size; i++) {
247+
result[i] = convertElement(arrayData, i, elementType, zoneId);
248+
}
249+
250+
return new org.apache.flink.table.data.GenericArrayData(result);
251+
}
252+
253+
/**
254+
* Convert CDC MapData to Flink Table MapData.
255+
*
256+
* @param mapData CDC map data
257+
* @param mapType map type information
258+
* @param zoneId time zone for temporal type conversion
259+
* @return Flink Table MapData
260+
*/
261+
private static org.apache.flink.table.data.MapData convertMapData(
262+
MapData mapData, MapType mapType, ZoneId zoneId) {
263+
if (mapData == null) {
264+
return null;
265+
}
266+
267+
ArrayData keyArray = mapData.keyArray();
268+
ArrayData valueArray = mapData.valueArray();
269+
270+
int size = keyArray.size();
271+
Map<Object, Object> result = new HashMap<>();
272+
273+
DataType keyType = mapType.getKeyType();
274+
DataType valueType = mapType.getValueType();
275+
276+
for (int i = 0; i < size; i++) {
277+
Object key = convertElement(keyArray, i, keyType, zoneId);
278+
Object value = convertElement(valueArray, i, valueType, zoneId);
279+
result.put(key, value);
280+
}
281+
282+
return new org.apache.flink.table.data.GenericMapData(result);
283+
}
284+
285+
/**
286+
* Convert CDC RecordData to Flink Table RowData.
287+
*
288+
* @param recordData CDC record data
289+
* @param rowType row type information
290+
* @param zoneId time zone for temporal type conversion
291+
* @return Flink Table RowData
292+
*/
293+
private static RowData convertRowData(RecordData recordData, RowType rowType, ZoneId zoneId) {
294+
if (recordData == null) {
295+
return null;
296+
}
297+
298+
List<DataField> fields = rowType.getFields();
299+
GenericRowData rowData = new GenericRowData(fields.size());
300+
301+
for (int i = 0; i < fields.size(); i++) {
302+
DataField field = fields.get(i);
303+
Object value = convertFieldValue(recordData, i, field.getType(), zoneId);
304+
rowData.setField(i, value);
305+
}
306+
307+
return rowData;
308+
}
309+
310+
/**
311+
* Convert a single element from ArrayData.
312+
*
313+
* @param arrayData the array data
314+
* @param pos position in the array
315+
* @param elementType element type
316+
* @param zoneId time zone for temporal type conversion
317+
* @return converted element
318+
*/
319+
private static Object convertElement(
320+
ArrayData arrayData, int pos, DataType elementType, ZoneId zoneId) {
321+
if (arrayData.isNullAt(pos)) {
322+
return null;
323+
}
324+
325+
switch (elementType.getTypeRoot()) {
326+
case CHAR:
327+
case VARCHAR:
328+
return BinaryStringData.fromString(arrayData.getString(pos).toString());
329+
330+
case BOOLEAN:
331+
return arrayData.getBoolean(pos);
332+
case BINARY:
333+
case VARBINARY:
334+
return arrayData.getBinary(pos);
335+
case DECIMAL:
336+
final int decimalPrecision = getPrecision(elementType);
337+
final int decimalScale = getScale(elementType);
338+
return DecimalData.fromBigDecimal(
339+
arrayData
340+
.getDecimal(pos, getPrecision(elementType), getScale(elementType))
341+
.toBigDecimal(),
342+
decimalPrecision,
343+
decimalScale);
344+
case TINYINT:
345+
return arrayData.getByte(pos);
346+
case SMALLINT:
347+
return arrayData.getShort(pos);
348+
case INTEGER:
349+
case DATE:
350+
case TIME_WITHOUT_TIME_ZONE:
351+
return arrayData.getInt(pos);
352+
case BIGINT:
353+
return arrayData.getLong(pos);
354+
case FLOAT:
355+
return arrayData.getFloat(pos);
356+
case DOUBLE:
357+
return arrayData.getDouble(pos);
358+
case TIMESTAMP_WITHOUT_TIME_ZONE:
359+
return TimestampData.fromTimestamp(
360+
arrayData.getTimestamp(pos, getPrecision(elementType)).toTimestamp());
361+
case TIMESTAMP_WITH_TIME_ZONE:
362+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
363+
return TimestampData.fromInstant(
364+
arrayData
365+
.getLocalZonedTimestamp(pos, getPrecision(elementType))
366+
.toInstant());
367+
case ARRAY:
368+
return convertArrayData(arrayData.getArray(pos), (ArrayType) elementType, zoneId);
369+
case MAP:
370+
return convertMapData(arrayData.getMap(pos), (MapType) elementType, zoneId);
371+
case ROW:
372+
return convertRowData(
373+
arrayData.getRecord(pos, ((RowType) elementType).getFieldCount()),
374+
(RowType) elementType,
375+
zoneId);
376+
default:
377+
throw new IllegalArgumentException(
378+
"Unsupported element type: " + elementType.getTypeRoot());
379+
}
380+
}
381+
382+
/**
383+
* Convert a field value from RecordData.
384+
*
385+
* @param recordData the record data
386+
* @param pos position in the record
387+
* @param fieldType field type
388+
* @param zoneId time zone for temporal type conversion
389+
* @return converted field value
390+
*/
391+
private static Object convertFieldValue(
392+
RecordData recordData, int pos, DataType fieldType, ZoneId zoneId) {
393+
if (recordData.isNullAt(pos)) {
394+
return null;
395+
}
396+
397+
switch (fieldType.getTypeRoot()) {
398+
case CHAR:
399+
case VARCHAR:
400+
return BinaryStringData.fromString(recordData.getString(pos).toString());
401+
case BOOLEAN:
402+
return recordData.getBoolean(pos);
403+
case BINARY:
404+
case VARBINARY:
405+
return recordData.getBinary(pos);
406+
case DECIMAL:
407+
final int decimalPrecision = getPrecision(fieldType);
408+
final int decimalScale = getScale(fieldType);
409+
return DecimalData.fromBigDecimal(
410+
recordData
411+
.getDecimal(pos, getPrecision(fieldType), getScale(fieldType))
412+
.toBigDecimal(),
413+
decimalPrecision,
414+
decimalScale);
415+
case TINYINT:
416+
return recordData.getByte(pos);
417+
case SMALLINT:
418+
return recordData.getShort(pos);
419+
case INTEGER:
420+
return recordData.getInt(pos);
421+
case DATE:
422+
return recordData.getDate(pos).toEpochDay();
423+
case TIME_WITHOUT_TIME_ZONE:
424+
return recordData.getTime(pos).toMillisOfDay();
425+
case BIGINT:
426+
return recordData.getLong(pos);
427+
case FLOAT:
428+
return recordData.getFloat(pos);
429+
case DOUBLE:
430+
return recordData.getDouble(pos);
431+
case TIMESTAMP_WITHOUT_TIME_ZONE:
432+
return TimestampData.fromTimestamp(
433+
recordData.getTimestamp(pos, getPrecision(fieldType)).toTimestamp());
434+
case TIMESTAMP_WITH_TIME_ZONE:
435+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
436+
return TimestampData.fromInstant(
437+
recordData
438+
.getLocalZonedTimestampData(pos, getPrecision(fieldType))
439+
.toInstant());
440+
case ARRAY:
441+
return convertArrayData(recordData.getArray(pos), (ArrayType) fieldType, zoneId);
442+
case MAP:
443+
return convertMapData(recordData.getMap(pos), (MapType) fieldType, zoneId);
444+
case ROW:
445+
return convertRowData(
446+
recordData.getRow(pos, ((RowType) fieldType).getFieldCount()),
447+
(RowType) fieldType,
448+
zoneId);
449+
default:
450+
throw new IllegalArgumentException(
451+
"Unsupported field type: " + fieldType.getTypeRoot());
452+
}
453+
}
198454
}

0 commit comments

Comments
 (0)