Skip to content
This repository was archived by the owner on Dec 28, 2025. It is now read-only.

Commit 4f07c6f

Browse files
[Fix][Connector-v2][Doris] Array type data parsing failed (apache#10095)
1 parent 5f4e944 commit 4f07c6f

File tree

4 files changed, with 64 additions and 41 deletions
  • seatunnel-connectors-v2/connector-common/src
  • seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris
  • seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization

4 files changed, with 64 additions and 41 deletions

seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
import java.io.ByteArrayInputStream;
3939
import java.io.IOException;
40+
import java.lang.reflect.Array;
4041
import java.math.BigDecimal;
4142
import java.time.Instant;
4243
import java.time.LocalDate;
@@ -262,8 +263,8 @@ private Object convertArrowData(
262263

263264
private Object convertMap(
264265
int rowIndex, Converter converter, FieldVector fieldVector, MapType mapType) {
265-
SqlType keyType = mapType.getKeyType().getSqlType();
266-
SqlType valueType = mapType.getValueType().getSqlType();
266+
SeaTunnelDataType keyType = mapType.getKeyType();
267+
SeaTunnelDataType valueType = mapType.getValueType();
267268
Map<String, Function> fieldConverters = new HashMap<>();
268269
fieldConverters.put(Converter.MAP_KEY, genericsConvert(keyType));
269270
fieldConverters.put(Converter.MAP_VALUE, genericsConvert(valueType));
@@ -272,10 +273,20 @@ private Object convertMap(
272273

273274
private Object convertArray(
274275
int rowIndex, Converter converter, FieldVector fieldVector, ArrayType arrayType) {
275-
SqlType elementType = arrayType.getElementType().getSqlType();
276+
SeaTunnelDataType elementType = arrayType.getElementType();
276277
Map<String, Function> fieldConverters = new HashMap<>();
277278
fieldConverters.put(Converter.ARRAY_KEY, genericsConvert(elementType));
278-
return converter.convert(rowIndex, fieldVector, fieldConverters);
279+
Object convertedValue = converter.convert(rowIndex, fieldVector, fieldConverters);
280+
if (convertedValue instanceof List) {
281+
List<?> list = (List<?>) convertedValue;
282+
Class<?> componentType = arrayType.getElementType().getTypeClass();
283+
Object array = Array.newInstance(componentType, list.size());
284+
for (int i = 0; i < list.size(); i++) {
285+
Array.set(array, i, list.get(i));
286+
}
287+
return array;
288+
}
289+
return convertedValue;
279290
}
280291

281292
private Object convertRow(
@@ -284,13 +295,26 @@ private Object convertRow(
284295
List<SeaTunnelDataType<?>> fieldTypes = rowType.getChildren();
285296
Map<String, Function> fieldConverters = new HashMap<>();
286297
for (int i = 0; i < fieldTypes.size(); i++) {
287-
fieldConverters.put(fieldNames[i], genericsConvert(fieldTypes.get(i).getSqlType()));
298+
fieldConverters.put(fieldNames[i], genericsConvert(fieldTypes.get(i)));
288299
}
289300
return converter.convert(rowIndex, fieldVector, fieldConverters);
290301
}
291302

292-
private Function<Object, Object> genericsConvert(SqlType sqlType) {
293-
return value -> convertSeatunnelRowValue(sqlType, null, value);
303+
private Function<Object, Object> genericsConvert(SeaTunnelDataType dataType) {
304+
return value -> {
305+
if (dataType instanceof ArrayType) {
306+
if (value instanceof List) {
307+
List<?> list = (List<?>) value;
308+
Class<?> componentType = ((ArrayType) dataType).getElementType().getTypeClass();
309+
Object array = Array.newInstance(componentType, list.size());
310+
for (int i = 0; i < list.size(); i++) {
311+
Array.set(array, i, list.get(i));
312+
}
313+
return array;
314+
}
315+
}
316+
return convertSeatunnelRowValue(dataType.getSqlType(), null, value);
317+
};
294318
}
295319

296320
@Override

seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -393,11 +393,15 @@ public void testSeatunnelRow() throws Exception {
393393
Collections.singletonList(localDateTime.toLocalDate()), actualDate2Data);
394394
// check array int
395395
List<Object> actualArrayIntData =
396-
rows.stream().map(s -> s.getField(16)).collect(Collectors.toList());
396+
rows.stream()
397+
.map(s -> Arrays.asList((Integer[]) s.getField(16)))
398+
.collect(Collectors.toList());
397399
Assertions.assertIterableEquals(arrayData1, actualArrayIntData);
398400
// check array timestamp
399401
List<Object> actualArrayTimestampData =
400-
rows.stream().map(s -> s.getField(17)).collect(Collectors.toList());
402+
rows.stream()
403+
.map(s -> Arrays.asList((LocalDateTime[]) s.getField(17)))
404+
.collect(Collectors.toList());
401405
Assertions.assertIterableEquals(arrayData2, actualArrayTimestampData);
402406
// check SECOND timestamp without timezone
403407
List<Object> actualTimestampSecData =

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -717,24 +717,24 @@ private List<SeaTunnelRow> genDuplicateTableTestData(Long nums) {
717717
GenerateTestData.genDateString(),
718718
GenerateTestData.genJsonString(),
719719
GenerateTestData.genJsonString(),
720-
(new boolean[] {true, true, false}).toString(),
721-
(new int[] {1, 2, 3}).toString(),
722-
(new int[] {1, 2, 3}).toString(),
723-
(new int[] {1, 2, 3}).toString(),
724-
(new long[] {1L, 2L, 3L}).toString(),
725-
(new float[] {1.0F, 1.0F, 1.0F}).toString(),
726-
(new double[] {1.0, 1.0, 1.0}).toString(),
727-
(new String[] {"1", "1"}).toString(),
728-
(new String[] {"1", "1"}).toString(),
729-
(new String[] {"1", "1"}).toString(),
730-
(new String[] {"1", "1"}).toString(),
731-
(new BigDecimal[] {
720+
Arrays.toString(new boolean[] {true, true, false}),
721+
Arrays.toString(new byte[] {1, 2, 3}),
722+
Arrays.toString(new short[] {1, 2, 3}),
723+
Arrays.toString(new int[] {1, 2, 3}),
724+
Arrays.toString(new long[] {1L, 2L, 3L}),
725+
Arrays.toString(new float[] {1.0F, 1.0F, 1.0F}),
726+
Arrays.toString(new double[] {1.0, 1.0, 1.0}),
727+
Arrays.toString(new String[] {"1", "1"}),
728+
Arrays.toString(new String[] {"1", "1"}),
729+
Arrays.toString(new String[] {"1", "1"}),
730+
Arrays.toString(new String[] {"1", "1"}),
731+
Arrays.toString(
732+
new BigDecimal[] {
732733
new BigDecimal("10.02"), new BigDecimal("10.03")
733-
})
734-
.toString(),
735-
(new String[] {"2020-06-09", "2020-06-10"}).toString(),
736-
(new String[] {"2020-06-09 12:02:02", "2020-06-10 12:02:02"})
737-
.toString()
734+
}),
735+
Arrays.toString(new String[] {"2020-06-09", "2020-06-10"}),
736+
Arrays.toString(
737+
new String[] {"2020-06-09 12:02:02", "2020-06-10 12:02:02"})
738738
}));
739739
}
740740
log.info("generate test data succeed");

seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -114,36 +114,31 @@ private static Object convert(Object field, SeaTunnelDataType<?> dataType) {
114114
case DECIMAL:
115115
return Decimal.apply((BigDecimal) field);
116116
case ARRAY:
117-
Class<?> elementTypeClass =
118-
((ArrayType<?, ?>) dataType).getElementType().getTypeClass();
119-
120-
if (((ArrayType<?, ?>) dataType).getElementType() instanceof MapType) {
117+
SeaTunnelDataType<?> elementType = ((ArrayType<?, ?>) dataType).getElementType();
118+
if (elementType instanceof MapType) {
121119
Object arrayMap =
122120
Array.newInstance(ArrayBasedMapData.class, ((Map[]) field).length);
123121
for (int i = 0; i < ((Map[]) field).length; i++) {
124122
Map<?, ?> value = (Map<?, ?>) ((Map[]) field)[i];
125-
MapType<?, ?> type =
126-
(MapType<?, ?>) ((ArrayType<?, ?>) dataType).getElementType();
127-
Array.set(arrayMap, i, convertMap(value, type));
123+
Array.set(arrayMap, i, convertMap(value, (MapType<?, ?>) elementType));
128124
}
129125
return ArrayData.toArrayData(arrayMap);
130126
}
131-
// if string array, we need to covert every item in array from String to UTF8String
132-
if (((ArrayType<?, ?>) dataType).getElementType().equals(BasicType.STRING_TYPE)) {
127+
if (elementType.equals(BasicType.STRING_TYPE)) {
133128
Object[] fields = (Object[]) field;
134129
UTF8String[] objects =
135130
Arrays.stream(fields)
136131
.map(v -> UTF8String.fromString((String) v))
137132
.toArray(UTF8String[]::new);
138133
return ArrayData.toArrayData(objects);
139134
}
140-
// except string, now only support convert boolean int tinyint smallint bigint float
141-
// double, because SeaTunnel Array only support these types
142-
Object array = Array.newInstance(elementTypeClass, ((Object[]) field).length);
143-
for (int i = 0; i < ((Object[]) field).length; i++) {
144-
Array.set(array, i, ((Object[]) field)[i]);
135+
136+
Object[] arrayData = (Object[]) field;
137+
Object[] convertedArray = new Object[arrayData.length];
138+
for (int i = 0; i < arrayData.length; i++) {
139+
convertedArray[i] = convert(arrayData[i], elementType);
145140
}
146-
return ArrayData.toArrayData(field);
141+
return ArrayData.toArrayData(convertedArray);
147142
default:
148143
if (field instanceof Some) {
149144
return ((Some<?>) field).get();

0 commit comments

Comments
 (0)