diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java index 5f5b5fec17ad..f7821ed5fdd7 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java @@ -37,6 +37,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; +import java.lang.reflect.Array; import java.math.BigDecimal; import java.time.Instant; import java.time.LocalDate; @@ -262,8 +263,8 @@ private Object convertArrowData( private Object convertMap( int rowIndex, Converter converter, FieldVector fieldVector, MapType mapType) { - SqlType keyType = mapType.getKeyType().getSqlType(); - SqlType valueType = mapType.getValueType().getSqlType(); + SeaTunnelDataType keyType = mapType.getKeyType(); + SeaTunnelDataType valueType = mapType.getValueType(); Map fieldConverters = new HashMap<>(); fieldConverters.put(Converter.MAP_KEY, genericsConvert(keyType)); fieldConverters.put(Converter.MAP_VALUE, genericsConvert(valueType)); @@ -272,10 +273,20 @@ private Object convertMap( private Object convertArray( int rowIndex, Converter converter, FieldVector fieldVector, ArrayType arrayType) { - SqlType elementType = arrayType.getElementType().getSqlType(); + SeaTunnelDataType elementType = arrayType.getElementType(); Map fieldConverters = new HashMap<>(); fieldConverters.put(Converter.ARRAY_KEY, genericsConvert(elementType)); - return converter.convert(rowIndex, fieldVector, fieldConverters); + Object convertedValue = converter.convert(rowIndex, fieldVector, fieldConverters); + if (convertedValue instanceof List) { + List list = (List) convertedValue; + Class componentType = arrayType.getElementType().getTypeClass(); + Object array = Array.newInstance(componentType, list.size()); + for (int i = 0; i < list.size(); i++) { + Array.set(array, i, list.get(i)); + } + return array; + } + return convertedValue; } private Object convertRow( @@ -284,13 +295,26 @@ private Object convertRow( List> fieldTypes = rowType.getChildren(); Map fieldConverters = new HashMap<>(); for (int i = 0; i < fieldTypes.size(); i++) { - fieldConverters.put(fieldNames[i], genericsConvert(fieldTypes.get(i).getSqlType())); + fieldConverters.put(fieldNames[i], genericsConvert(fieldTypes.get(i))); } return converter.convert(rowIndex, fieldVector, fieldConverters); } - private Function genericsConvert(SqlType sqlType) { - return value -> convertSeatunnelRowValue(sqlType, null, value); + private Function genericsConvert(SeaTunnelDataType dataType) { + return value -> { + if (dataType instanceof ArrayType) { + if (value instanceof List) { + List list = (List) value; + Class componentType = ((ArrayType) dataType).getElementType().getTypeClass(); + Object array = Array.newInstance(componentType, list.size()); + for (int i = 0; i < list.size(); i++) { + Array.set(array, i, list.get(i)); + } + return array; + } + } + return convertSeatunnelRowValue(dataType.getSqlType(), null, value); + }; } @Override diff --git a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java index 79a26743dfb7..24e923c9ca38 100644 --- a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java +++ b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java @@ -393,11 +393,15 @@ public void testSeatunnelRow() throws Exception { Collections.singletonList(localDateTime.toLocalDate()), actualDate2Data); // check array int List actualArrayIntData = - rows.stream().map(s -> s.getField(16)).collect(Collectors.toList()); + rows.stream() + .map(s -> Arrays.asList((Integer[]) s.getField(16))) + .collect(Collectors.toList()); Assertions.assertIterableEquals(arrayData1, actualArrayIntData); // check array timestamp List actualArrayTimestampData = - rows.stream().map(s -> s.getField(17)).collect(Collectors.toList()); + rows.stream() + .map(s -> Arrays.asList((LocalDateTime[]) s.getField(17))) + .collect(Collectors.toList()); Assertions.assertIterableEquals(arrayData2, actualArrayTimestampData); // check SECOND timestamp without timezone List actualTimestampSecData = diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java index d817c7743b43..f62454a8612c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java @@ -717,24 +717,24 @@ private List genDuplicateTableTestData(Long nums) { GenerateTestData.genDateString(), GenerateTestData.genJsonString(), GenerateTestData.genJsonString(), - (new boolean[] {true, true, false}).toString(), - (new int[] {1, 2, 3}).toString(), - (new int[] {1, 2, 3}).toString(), - (new int[] {1, 2, 3}).toString(), - (new long[] {1L, 2L, 3L}).toString(), - (new float[] {1.0F, 1.0F, 1.0F}).toString(), - (new double[] {1.0, 1.0, 1.0}).toString(), - (new String[] {"1", "1"}).toString(), - (new String[] {"1", "1"}).toString(), - (new String[] {"1", "1"}).toString(), - (new String[] {"1", "1"}).toString(), - (new BigDecimal[] { + Arrays.toString(new boolean[] {true, true, false}), + Arrays.toString(new byte[] {1, 2, 3}), + Arrays.toString(new short[] {1, 2, 3}), + Arrays.toString(new int[] {1, 2, 3}), + Arrays.toString(new long[] {1L, 2L, 3L}), + Arrays.toString(new float[] {1.0F, 1.0F, 1.0F}), + Arrays.toString(new double[] {1.0, 1.0, 1.0}), + Arrays.toString(new String[] {"1", "1"}), + Arrays.toString(new String[] {"1", "1"}), + Arrays.toString(new String[] {"1", "1"}), + Arrays.toString(new String[] {"1", "1"}), + Arrays.toString( + new BigDecimal[] { new BigDecimal("10.02"), new BigDecimal("10.03") - }) - .toString(), - (new String[] {"2020-06-09", "2020-06-10"}).toString(), - (new String[] {"2020-06-09 12:02:02", "2020-06-10 12:02:02"}) - .toString() + }), + Arrays.toString(new String[] {"2020-06-09", "2020-06-10"}), + Arrays.toString( + new String[] {"2020-06-09 12:02:02", "2020-06-10 12:02:02"}) })); } log.info("generate test data succeed"); diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java index f68c2892f9d3..0258c526616a 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java @@ -114,22 +114,17 @@ private static Object convert(Object field, SeaTunnelDataType dataType) { case DECIMAL: return Decimal.apply((BigDecimal) field); case ARRAY: - Class elementTypeClass = - ((ArrayType) dataType).getElementType().getTypeClass(); - - if (((ArrayType) dataType).getElementType() instanceof MapType) { + SeaTunnelDataType elementType = ((ArrayType) dataType).getElementType(); + if (elementType instanceof MapType) { Object arrayMap = Array.newInstance(ArrayBasedMapData.class, ((Map[]) field).length); for (int i = 0; i < ((Map[]) field).length; i++) { Map value = (Map) ((Map[]) field)[i]; - MapType type = - (MapType) ((ArrayType) dataType).getElementType(); - Array.set(arrayMap, i, convertMap(value, type)); + Array.set(arrayMap, i, convertMap(value, (MapType) elementType)); } return ArrayData.toArrayData(arrayMap); } - // if string array, we need to covert every item in array from String to UTF8String - if (((ArrayType) dataType).getElementType().equals(BasicType.STRING_TYPE)) { + if (elementType.equals(BasicType.STRING_TYPE)) { Object[] fields = (Object[]) field; UTF8String[] objects = Arrays.stream(fields) @@ -137,13 +132,13 @@ private static Object convert(Object field, SeaTunnelDataType dataType) { .toArray(UTF8String[]::new); return ArrayData.toArrayData(objects); } - // except string, now only support convert boolean int tinyint smallint bigint float - // double, because SeaTunnel Array only support these types - Object array = Array.newInstance(elementTypeClass, ((Object[]) field).length); - for (int i = 0; i < ((Object[]) field).length; i++) { - Array.set(array, i, ((Object[]) field)[i]); + + Object[] arrayData = (Object[]) field; + Object[] convertedArray = new Object[arrayData.length]; + for (int i = 0; i < arrayData.length; i++) { + convertedArray[i] = convert(arrayData[i], elementType); } - return ArrayData.toArrayData(field); + return ArrayData.toArrayData(convertedArray); default: if (field instanceof Some) { return ((Some) field).get();