Skip to content
This repository was archived by the owner on Dec 28, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
Expand Down Expand Up @@ -262,8 +263,8 @@ private Object convertArrowData(

private Object convertMap(
int rowIndex, Converter converter, FieldVector fieldVector, MapType mapType) {
SqlType keyType = mapType.getKeyType().getSqlType();
SqlType valueType = mapType.getValueType().getSqlType();
SeaTunnelDataType keyType = mapType.getKeyType();
SeaTunnelDataType valueType = mapType.getValueType();
Map<String, Function> fieldConverters = new HashMap<>();
fieldConverters.put(Converter.MAP_KEY, genericsConvert(keyType));
fieldConverters.put(Converter.MAP_VALUE, genericsConvert(valueType));
Expand All @@ -272,10 +273,20 @@ private Object convertMap(

private Object convertArray(
int rowIndex, Converter converter, FieldVector fieldVector, ArrayType arrayType) {
SqlType elementType = arrayType.getElementType().getSqlType();
SeaTunnelDataType elementType = arrayType.getElementType();
Map<String, Function> fieldConverters = new HashMap<>();
fieldConverters.put(Converter.ARRAY_KEY, genericsConvert(elementType));
return converter.convert(rowIndex, fieldVector, fieldConverters);
Object convertedValue = converter.convert(rowIndex, fieldVector, fieldConverters);
if (convertedValue instanceof List) {
List<?> list = (List<?>) convertedValue;
Class<?> componentType = arrayType.getElementType().getTypeClass();
Object array = Array.newInstance(componentType, list.size());
for (int i = 0; i < list.size(); i++) {
Array.set(array, i, list.get(i));
}
return array;
}
return convertedValue;
}

private Object convertRow(
Expand All @@ -284,13 +295,26 @@ private Object convertRow(
List<SeaTunnelDataType<?>> fieldTypes = rowType.getChildren();
Map<String, Function> fieldConverters = new HashMap<>();
for (int i = 0; i < fieldTypes.size(); i++) {
fieldConverters.put(fieldNames[i], genericsConvert(fieldTypes.get(i).getSqlType()));
fieldConverters.put(fieldNames[i], genericsConvert(fieldTypes.get(i)));
}
return converter.convert(rowIndex, fieldVector, fieldConverters);
}

private Function<Object, Object> genericsConvert(SqlType sqlType) {
return value -> convertSeatunnelRowValue(sqlType, null, value);
private Function<Object, Object> genericsConvert(SeaTunnelDataType dataType) {
return value -> {
if (dataType instanceof ArrayType) {
if (value instanceof List) {
List<?> list = (List<?>) value;
Class<?> componentType = ((ArrayType) dataType).getElementType().getTypeClass();
Object array = Array.newInstance(componentType, list.size());
for (int i = 0; i < list.size(); i++) {
Array.set(array, i, list.get(i));
}
return array;
}
}
return convertSeatunnelRowValue(dataType.getSqlType(), null, value);
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,11 +393,15 @@ public void testSeatunnelRow() throws Exception {
Collections.singletonList(localDateTime.toLocalDate()), actualDate2Data);
// check array int
List<Object> actualArrayIntData =
rows.stream().map(s -> s.getField(16)).collect(Collectors.toList());
rows.stream()
.map(s -> Arrays.asList((Integer[]) s.getField(16)))
.collect(Collectors.toList());
Assertions.assertIterableEquals(arrayData1, actualArrayIntData);
// check array timestamp
List<Object> actualArrayTimestampData =
rows.stream().map(s -> s.getField(17)).collect(Collectors.toList());
rows.stream()
.map(s -> Arrays.asList((LocalDateTime[]) s.getField(17)))
.collect(Collectors.toList());
Assertions.assertIterableEquals(arrayData2, actualArrayTimestampData);
// check SECOND timestamp without timezone
List<Object> actualTimestampSecData =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,24 +717,24 @@ private List<SeaTunnelRow> genDuplicateTableTestData(Long nums) {
GenerateTestData.genDateString(),
GenerateTestData.genJsonString(),
GenerateTestData.genJsonString(),
(new boolean[] {true, true, false}).toString(),
(new int[] {1, 2, 3}).toString(),
(new int[] {1, 2, 3}).toString(),
(new int[] {1, 2, 3}).toString(),
(new long[] {1L, 2L, 3L}).toString(),
(new float[] {1.0F, 1.0F, 1.0F}).toString(),
(new double[] {1.0, 1.0, 1.0}).toString(),
(new String[] {"1", "1"}).toString(),
(new String[] {"1", "1"}).toString(),
(new String[] {"1", "1"}).toString(),
(new String[] {"1", "1"}).toString(),
(new BigDecimal[] {
Arrays.toString(new boolean[] {true, true, false}),
Arrays.toString(new byte[] {1, 2, 3}),
Arrays.toString(new short[] {1, 2, 3}),
Arrays.toString(new int[] {1, 2, 3}),
Arrays.toString(new long[] {1L, 2L, 3L}),
Arrays.toString(new float[] {1.0F, 1.0F, 1.0F}),
Arrays.toString(new double[] {1.0, 1.0, 1.0}),
Arrays.toString(new String[] {"1", "1"}),
Arrays.toString(new String[] {"1", "1"}),
Arrays.toString(new String[] {"1", "1"}),
Arrays.toString(new String[] {"1", "1"}),
Arrays.toString(
new BigDecimal[] {
new BigDecimal("10.02"), new BigDecimal("10.03")
})
.toString(),
(new String[] {"2020-06-09", "2020-06-10"}).toString(),
(new String[] {"2020-06-09 12:02:02", "2020-06-10 12:02:02"})
.toString()
}),
Arrays.toString(new String[] {"2020-06-09", "2020-06-10"}),
Arrays.toString(
new String[] {"2020-06-09 12:02:02", "2020-06-10 12:02:02"})
}));
}
log.info("generate test data succeed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,36 +114,31 @@ private static Object convert(Object field, SeaTunnelDataType<?> dataType) {
case DECIMAL:
return Decimal.apply((BigDecimal) field);
case ARRAY:
Class<?> elementTypeClass =
((ArrayType<?, ?>) dataType).getElementType().getTypeClass();

if (((ArrayType<?, ?>) dataType).getElementType() instanceof MapType) {
SeaTunnelDataType<?> elementType = ((ArrayType<?, ?>) dataType).getElementType();
if (elementType instanceof MapType) {
Object arrayMap =
Array.newInstance(ArrayBasedMapData.class, ((Map[]) field).length);
for (int i = 0; i < ((Map[]) field).length; i++) {
Map<?, ?> value = (Map<?, ?>) ((Map[]) field)[i];
MapType<?, ?> type =
(MapType<?, ?>) ((ArrayType<?, ?>) dataType).getElementType();
Array.set(arrayMap, i, convertMap(value, type));
Array.set(arrayMap, i, convertMap(value, (MapType<?, ?>) elementType));
}
return ArrayData.toArrayData(arrayMap);
}
// if string array, we need to covert every item in array from String to UTF8String
if (((ArrayType<?, ?>) dataType).getElementType().equals(BasicType.STRING_TYPE)) {
if (elementType.equals(BasicType.STRING_TYPE)) {
Object[] fields = (Object[]) field;
UTF8String[] objects =
Arrays.stream(fields)
.map(v -> UTF8String.fromString((String) v))
.toArray(UTF8String[]::new);
return ArrayData.toArrayData(objects);
}
// except string, now only support convert boolean int tinyint smallint bigint float
// double, because SeaTunnel Array only support these types
Object array = Array.newInstance(elementTypeClass, ((Object[]) field).length);
for (int i = 0; i < ((Object[]) field).length; i++) {
Array.set(array, i, ((Object[]) field)[i]);

Object[] arrayData = (Object[]) field;
Object[] convertedArray = new Object[arrayData.length];
for (int i = 0; i < arrayData.length; i++) {
convertedArray[i] = convert(arrayData[i], elementType);
}
return ArrayData.toArrayData(field);
return ArrayData.toArrayData(convertedArray);
default:
if (field instanceof Some) {
return ((Some<?>) field).get();
Expand Down
Loading