Skip to content

Commit d6abee3

Browse files
committed
add iceberg for alink
1 parent 6c45c7c commit d6abee3

File tree

6 files changed

+63
-103
lines changed

6 files changed

+63
-103
lines changed

connectors/connector-iceberg/iceberg-bridge/src/main/java/org/apache/iceberg/flink/InputOutputFormat.java renamed to connectors/connector-iceberg/iceberg-bridge/src/main/java/org/apache/flink/iceberg/InputOutputFormat.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,26 @@
1-
package org.apache.iceberg.flink;
1+
package org.apache.flink.iceberg;
22

33
import org.apache.flink.api.common.io.RichInputFormat;
44
import org.apache.flink.api.common.typeinfo.TypeInformation;
5-
import org.apache.flink.api.java.tuple.Tuple2;
5+
import org.apache.flink.api.java.tuple.Tuple3;
6+
import org.apache.flink.iceberg.source.FlinkInputFormat;
7+
import org.apache.flink.iceberg.source.FlinkSource;
8+
import org.apache.flink.iceberg.util.FlinkCompatibilityUtil;
69
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
10+
import org.apache.flink.table.api.TableSchema;
711
import org.apache.flink.table.catalog.Catalog;
812
import org.apache.flink.table.catalog.ObjectPath;
913
import org.apache.flink.table.data.RowData;
1014
import org.apache.flink.util.Preconditions;
1115
import org.apache.iceberg.Schema;
1216
import org.apache.iceberg.Table;
13-
import org.apache.iceberg.flink.source.FlinkInputFormat;
14-
import org.apache.iceberg.flink.source.FlinkSource;
15-
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
1617

1718
import java.io.IOException;
1819
import java.io.UncheckedIOException;
1920

2021
public class InputOutputFormat {
2122

22-
public static Tuple2<TypeInformation<RowData>, RichInputFormat<RowData, ?>> createInputFormat(
23+
public static Tuple3<TableSchema, TypeInformation<RowData>, RichInputFormat<RowData, ?>> createInputFormat(
2324
StreamExecutionEnvironment execEnv, Catalog catalog, ObjectPath objectPath) {
2425

2526
if (!(catalog instanceof FlinkCatalog)) {
@@ -38,13 +39,15 @@ public class InputOutputFormat {
3839
throw new UncheckedIOException(e);
3940
}
4041

42+
TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema));
43+
4144
TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(icebergSchema));
4245
FlinkInputFormat flinkInputFormat = FlinkSource.forRowData()
4346
.env(execEnv)
4447
.tableLoader(tableLoader)
4548
.table(table)
4649
.buildFormat();
47-
return Tuple2.of(typeInfo, flinkInputFormat);
50+
return Tuple3.of(tableSchema, typeInfo, flinkInputFormat);
4851
}
4952

5053
private static TableLoader createTableLoader(FlinkCatalog catalog, ObjectPath objectPath) {

0 commit comments

Comments
 (0)