Skip to content

Commit 65d8c00

Browse files
authored
[Improve] support nested type fields in ArrayData (#606)
1 parent 9e06de9 commit 65d8c00

File tree

2 files changed

+88
-17
lines changed

2 files changed

+88
-17
lines changed

flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -339,26 +339,40 @@ private RowData convertRowData(Map<String, ?> row, LogicalType type) {
339339
}
340340

341341
private static List<Object> convertArrayData(ArrayData array, LogicalType type) {
342+
LogicalType elementType = ((ArrayType) type).getElementType();
343+
List<Object> values;
342344
if (array instanceof GenericArrayData) {
343-
return Arrays.asList(((GenericArrayData) array).toObjectArray());
345+
values = Arrays.asList(((GenericArrayData) array).toObjectArray());
346+
} else if (array instanceof BinaryArrayData) {
347+
values = Arrays.asList(((BinaryArrayData) array).toObjectArray(elementType));
348+
} else {
349+
throw new UnsupportedOperationException("Unsupported array data: " + array.getClass());
344350
}
345-
if (array instanceof BinaryArrayData) {
346-
LogicalType elementType = ((ArrayType) type).getElementType();
347-
List<Object> values =
348-
Arrays.asList(((BinaryArrayData) array).toObjectArray(elementType));
349-
if (LogicalTypeRoot.DATE.equals(elementType.getTypeRoot())) {
350-
return values.stream()
351-
.map(date -> Date.valueOf(LocalDate.ofEpochDay((Integer) date)))
352-
.collect(Collectors.toList());
353-
}
354-
if (LogicalTypeRoot.ARRAY.equals(elementType.getTypeRoot())) {
355-
return values.stream()
356-
.map(arr -> convertArrayData((ArrayData) arr, elementType))
357-
.collect(Collectors.toList());
358-
}
359-
return values;
351+
352+
if (LogicalTypeRoot.DATE.equals(elementType.getTypeRoot())) {
353+
return values.stream()
354+
.map(date -> Date.valueOf(LocalDate.ofEpochDay((Integer) date)))
355+
.collect(Collectors.toList());
356+
}
357+
if (LogicalTypeRoot.ARRAY.equals(elementType.getTypeRoot())) {
358+
return values.stream()
359+
.map(arr -> convertArrayData((ArrayData) arr, elementType))
360+
.collect(Collectors.toList());
361+
}
362+
if (LogicalTypeRoot.MAP.equals(elementType.getTypeRoot())) {
363+
return values.stream()
364+
.map(arr -> writeValueAsString(convertMapData((MapData) arr, elementType)))
365+
.collect(Collectors.toList());
366+
}
367+
if (LogicalTypeRoot.ROW.equals(elementType.getTypeRoot())) {
368+
return values.stream()
369+
.map(
370+
arr ->
371+
writeValueAsString(
372+
convertRowData(GenericRowData.of(arr), 0, elementType)))
373+
.collect(Collectors.toList());
360374
}
361-
throw new UnsupportedOperationException("Unsupported array data: " + array.getClass());
375+
return values;
362376
}
363377

364378
private static Object convertMapData(MapData map, LogicalType type) {

flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.table.catalog.Column;
2222
import org.apache.flink.table.catalog.ResolvedSchema;
2323
import org.apache.flink.table.data.DecimalData;
24+
import org.apache.flink.table.data.GenericArrayData;
2425
import org.apache.flink.table.data.GenericMapData;
2526
import org.apache.flink.table.data.GenericRowData;
2627
import org.apache.flink.table.data.StringData;
@@ -366,4 +367,60 @@ public static ResolvedSchema getRowMapSchema() {
366367
Column.physical(
367368
"f16", DataTypes.MAP(DataTypes.VARCHAR(256), DataTypes.VARCHAR(256))));
368369
}
370+
371+
@Test
372+
public void testArrayExternalConvert() {
373+
ResolvedSchema schema =
374+
ResolvedSchema.of(
375+
// list with string
376+
Column.physical("f1", DataTypes.ARRAY(DataTypes.STRING())),
377+
// list with list
378+
Column.physical("f2", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING()))),
379+
// list with row
380+
Column.physical(
381+
"f3",
382+
DataTypes.ARRAY(
383+
DataTypes.ROW(
384+
DataTypes.FIELD("l1", DataTypes.STRING()),
385+
DataTypes.FIELD("l2", DataTypes.INT())))),
386+
// list with map
387+
Column.physical(
388+
"f4",
389+
DataTypes.ARRAY(
390+
DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))),
391+
// list with date
392+
Column.physical("f5", DataTypes.ARRAY(DataTypes.DATE())));
393+
394+
DorisRowConverter converter =
395+
new DorisRowConverter((RowType) schema.toPhysicalRowDataType().getLogicalType());
396+
397+
Map<String, Integer> mapData1 = createMapAndPut(new HashMap<>(), "hello", 1);
398+
Map<String, Integer> mapData2 = createMapAndPut(new HashMap<>(), "world", 2);
399+
GenericRowData rowData =
400+
GenericRowData.of(
401+
new GenericArrayData(new String[] {"1", "2", "3"}),
402+
new GenericArrayData(
403+
new GenericArrayData[] {
404+
new GenericArrayData(new String[] {"1", "2", "3"}),
405+
new GenericArrayData(new String[] {"4", "5", "6"})
406+
}),
407+
new GenericArrayData(
408+
new GenericRowData[] {
409+
GenericRowData.of(StringData.fromString("on"), 1),
410+
GenericRowData.of(StringData.fromString("off"), 2)
411+
}),
412+
new GenericArrayData(
413+
new GenericMapData[] {
414+
new GenericMapData(mapData1), new GenericMapData(mapData2)
415+
}),
416+
new GenericArrayData(new int[] {1, 2, 3}));
417+
418+
List<Object> row = new ArrayList<>();
419+
for (int i = 0; i < rowData.getArity(); i++) {
420+
row.add(converter.convertExternal(rowData, i));
421+
}
422+
String expected =
423+
"[[1, 2, 3], [[1, 2, 3], [4, 5, 6]], [{\"l1\":\"on\",\"l2\":\"1\"}, {\"l1\":\"off\",\"l2\":\"2\"}], [{\"hello\":\"1\"}, {\"world\":\"2\"}], [1970-01-02, 1970-01-03, 1970-01-04]]";
424+
Assert.assertEquals(expected, row.toString());
425+
}
369426
}

0 commit comments

Comments
 (0)