Skip to content

Commit b2fa4a9

Browse files
[lake/iceberg] Support tier array type for iceberg (#2266)
1 parent 3d46efa commit b2fa4a9

File tree

11 files changed

+699
-11
lines changed

11 files changed

+699
-11
lines changed

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,30 @@ public class FlussDataTypeToIcebergDataType implements DataTypeVisitor<Type> {
4747
public static final FlussDataTypeToIcebergDataType INSTANCE =
4848
new FlussDataTypeToIcebergDataType();
4949

50+
private final RowType root;
51+
private int nextId;
52+
53+
FlussDataTypeToIcebergDataType() {
54+
this.root = null;
55+
this.nextId = 0;
56+
}
57+
58+
FlussDataTypeToIcebergDataType(int startId) {
59+
this.root = null;
60+
this.nextId = startId;
61+
}
62+
63+
FlussDataTypeToIcebergDataType(RowType root) {
64+
this.root = root;
65+
this.nextId = root.getFieldCount();
66+
}
67+
68+
private int getNextId() {
69+
int next = nextId;
70+
nextId += 1;
71+
return next;
72+
}
73+
5074
@Override
5175
public Type visit(CharType charType) {
5276
return Types.StringType.get();
@@ -129,7 +153,12 @@ public Type visit(LocalZonedTimestampType localZonedTimestampType) {
129153

130154
@Override
131155
public Type visit(ArrayType arrayType) {
132-
throw new UnsupportedOperationException("Unsupported array type");
156+
Type elementType = arrayType.getElementType().accept(this);
157+
if (arrayType.getElementType().isNullable()) {
158+
return Types.ListType.ofOptional(getNextId(), elementType);
159+
} else {
160+
return Types.ListType.ofRequired(getNextId(), elementType);
161+
}
133162
}
134163

135164
@Override

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,11 @@ public Schema convertToIcebergSchema(TableDescriptor tableDescriptor, boolean is
178178
List<Types.NestedField> fields = new ArrayList<>();
179179
int fieldId = 0;
180180

181+
int totalTopLevelFields =
182+
tableDescriptor.getSchema().getColumns().size() + SYSTEM_COLUMNS.size();
183+
FlussDataTypeToIcebergDataType converter =
184+
new FlussDataTypeToIcebergDataType(totalTopLevelFields);
185+
181186
// general columns
182187
for (org.apache.fluss.metadata.Schema.Column column :
183188
tableDescriptor.getSchema().getColumns()) {
@@ -192,16 +197,14 @@ public Schema convertToIcebergSchema(TableDescriptor tableDescriptor, boolean is
192197
Types.NestedField.optional(
193198
fieldId++,
194199
colName,
195-
column.getDataType()
196-
.accept(FlussDataTypeToIcebergDataType.INSTANCE),
200+
column.getDataType().accept(converter),
197201
column.getComment().orElse(null));
198202
} else {
199203
field =
200204
Types.NestedField.required(
201205
fieldId++,
202206
colName,
203-
column.getDataType()
204-
.accept(FlussDataTypeToIcebergDataType.INSTANCE),
207+
column.getDataType().accept(converter),
205208
column.getComment().orElse(null));
206209
}
207210
fields.add(field);
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.iceberg.source;
19+
20+
import org.apache.fluss.row.InternalArray;
21+
import org.apache.fluss.types.ArrayType;
22+
import org.apache.fluss.types.BigIntType;
23+
import org.apache.fluss.types.BinaryType;
24+
import org.apache.fluss.types.BooleanType;
25+
import org.apache.fluss.types.BytesType;
26+
import org.apache.fluss.types.CharType;
27+
import org.apache.fluss.types.DataType;
28+
import org.apache.fluss.types.DateType;
29+
import org.apache.fluss.types.DecimalType;
30+
import org.apache.fluss.types.DoubleType;
31+
import org.apache.fluss.types.FloatType;
32+
import org.apache.fluss.types.IntType;
33+
import org.apache.fluss.types.LocalZonedTimestampType;
34+
import org.apache.fluss.types.SmallIntType;
35+
import org.apache.fluss.types.StringType;
36+
import org.apache.fluss.types.TimeType;
37+
import org.apache.fluss.types.TimestampType;
38+
import org.apache.fluss.types.TinyIntType;
39+
import org.apache.fluss.utils.DateTimeUtils;
40+
41+
import java.nio.ByteBuffer;
42+
import java.time.Instant;
43+
import java.time.OffsetDateTime;
44+
import java.time.ZoneOffset;
45+
import java.util.AbstractList;
46+
47+
/** Adapter class for converting Fluss InternalArray to a Java List for Iceberg. */
48+
public class FlussArrayAsIcebergList extends AbstractList<Object> {
49+
50+
private final InternalArray flussArray;
51+
private final DataType elementType;
52+
53+
public FlussArrayAsIcebergList(InternalArray flussArray, DataType elementType) {
54+
this.flussArray = flussArray;
55+
this.elementType = elementType;
56+
}
57+
58+
@Override
59+
public Object get(int index) {
60+
if (flussArray.isNullAt(index)) {
61+
return null;
62+
}
63+
64+
if (elementType instanceof BooleanType) {
65+
return flussArray.getBoolean(index);
66+
} else if (elementType instanceof TinyIntType) {
67+
return (int) flussArray.getByte(index);
68+
} else if (elementType instanceof SmallIntType) {
69+
return (int) flussArray.getShort(index);
70+
} else if (elementType instanceof IntType) {
71+
return flussArray.getInt(index);
72+
} else if (elementType instanceof BigIntType) {
73+
return flussArray.getLong(index);
74+
} else if (elementType instanceof FloatType) {
75+
return flussArray.getFloat(index);
76+
} else if (elementType instanceof DoubleType) {
77+
return flussArray.getDouble(index);
78+
} else if (elementType instanceof StringType) {
79+
return flussArray.getString(index).toString();
80+
} else if (elementType instanceof CharType) {
81+
CharType charType = (CharType) elementType;
82+
return flussArray.getChar(index, charType.getLength()).toString();
83+
} else if (elementType instanceof BytesType || elementType instanceof BinaryType) {
84+
return ByteBuffer.wrap(flussArray.getBytes(index));
85+
} else if (elementType instanceof DecimalType) {
86+
DecimalType decimalType = (DecimalType) elementType;
87+
return flussArray
88+
.getDecimal(index, decimalType.getPrecision(), decimalType.getScale())
89+
.toBigDecimal();
90+
} else if (elementType instanceof LocalZonedTimestampType) {
91+
LocalZonedTimestampType ltzType = (LocalZonedTimestampType) elementType;
92+
return toIcebergTimestampLtz(
93+
flussArray.getTimestampLtz(index, ltzType.getPrecision()).toInstant());
94+
} else if (elementType instanceof TimestampType) {
95+
TimestampType tsType = (TimestampType) elementType;
96+
return flussArray.getTimestampNtz(index, tsType.getPrecision()).toLocalDateTime();
97+
} else if (elementType instanceof DateType) {
98+
return DateTimeUtils.toLocalDate(flussArray.getInt(index));
99+
} else if (elementType instanceof TimeType) {
100+
return DateTimeUtils.toLocalTime(flussArray.getInt(index));
101+
} else if (elementType instanceof ArrayType) {
102+
InternalArray innerArray = flussArray.getArray(index);
103+
return innerArray == null
104+
? null
105+
: new FlussArrayAsIcebergList(
106+
innerArray, ((ArrayType) elementType).getElementType());
107+
} else {
108+
throw new UnsupportedOperationException(
109+
"Unsupported array element type conversion for Fluss type: "
110+
+ elementType.getClass().getSimpleName());
111+
}
112+
}
113+
114+
@Override
115+
public int size() {
116+
return flussArray.size();
117+
}
118+
119+
private OffsetDateTime toIcebergTimestampLtz(Instant instant) {
120+
return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
121+
}
122+
}

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.fluss.lake.iceberg.source;
1919

20+
import org.apache.fluss.row.InternalArray;
2021
import org.apache.fluss.row.InternalRow;
22+
import org.apache.fluss.types.ArrayType;
2123
import org.apache.fluss.types.BigIntType;
2224
import org.apache.fluss.types.BinaryType;
2325
import org.apache.fluss.types.BooleanType;
@@ -169,6 +171,14 @@ private FlussRowToIcebergFieldConverter createTypeConverter(DataType flussType,
169171
return row -> DateTimeUtils.toLocalDate(row.getInt(pos));
170172
} else if (flussType instanceof TimeType) {
171173
return row -> DateTimeUtils.toLocalTime(row.getInt(pos));
174+
} else if (flussType instanceof ArrayType) {
175+
ArrayType arrayType = (ArrayType) flussType;
176+
return row -> {
177+
InternalArray array = row.getArray(pos);
178+
return array == null
179+
? null
180+
: new FlussArrayAsIcebergList(array, arrayType.getElementType());
181+
};
172182
} else {
173183
throw new UnsupportedOperationException(
174184
"Unsupported data type conversion for Fluss type: "

0 commit comments

Comments
 (0)