
Commit 68b7a2a

Core, Data, Flink: Moving Flink to use the new FormatModel API (apache#15329)
1 parent bfcb979 commit 68b7a2a

File tree

6 files changed: +104 -183 lines changed

core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java

Lines changed: 2 additions & 1 deletion

@@ -57,7 +57,8 @@ private FormatModelRegistry() {}
   private static final List<String> CLASSES_TO_REGISTER =
       ImmutableList.of(
           "org.apache.iceberg.data.GenericFormatModels",
-          "org.apache.iceberg.arrow.vectorized.ArrowFormatModels");
+          "org.apache.iceberg.arrow.vectorized.ArrowFormatModels",
+          "org.apache.iceberg.flink.data.FlinkFormatModels");

   // Format models indexed by file format and object model class
   private static final Map<Pair<FileFormat, Class<?>>, FormatModel<?, ?>> MODELS =
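For context, CLASSES_TO_REGISTER is how optional modules contribute format models without becoming compile-time dependencies of core: each listed class is loaded reflectively and its static register() hook is invoked, while classes missing from the classpath are skipped. A minimal sketch of that pattern, assuming the real registry behaves along these lines (its loading code is not part of this diff):

// Sketch only: load each optional FormatModels class if present and invoke its
// static register() method. The real FormatModelRegistry may use different
// reflection utilities; the control flow is the point here.
for (String className : CLASSES_TO_REGISTER) {
  try {
    Class.forName(className).getMethod("register").invoke(null);
  } catch (ClassNotFoundException e) {
    // module (arrow, flink, ...) not on the classpath; nothing to register
  } catch (ReflectiveOperationException e) {
    throw new IllegalStateException("Failed to register " + className, e);
  }
}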
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java (new file)

Lines changed: 58 additions & 0 deletions

@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.avro.AvroFormatModel;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.orc.ORCFormatModel;
+import org.apache.iceberg.parquet.ParquetFormatModel;
+
+public class FlinkFormatModels {
+  public static void register() {
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            RowData.class,
+            RowType.class,
+            (icebergSchema, fileSchema, engineSchema) ->
+                FlinkParquetWriters.buildWriter(engineSchema, fileSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        AvroFormatModel.create(
+            RowData.class,
+            RowType.class,
+            (icebergSchema, fileSchema, engineSchema) -> new FlinkAvroWriter(engineSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                FlinkPlannedAvroReader.create(icebergSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ORCFormatModel.create(
+            RowData.class,
+            RowType.class,
+            (icebergSchema, fileSchema, engineSchema) ->
+                FlinkOrcWriter.buildWriter(engineSchema, icebergSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                new FlinkOrcReader(icebergSchema, fileSchema, idToConstant)));
+  }
+
+  private FlinkFormatModels() {}
+}
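With these three registrations in place, any Flink read or write path can obtain format-specific builders from the registry keyed by (FileFormat, RowData.class). A hypothetical smoke check, assuming the registry loads FlinkFormatModels on first use via CLASSES_TO_REGISTER; `localInput` is a placeholder for any org.apache.iceberg.io.InputFile:

// Each supported format should resolve to a RowData read builder once the
// Flink models are registered.
for (FileFormat format :
    ImmutableList.of(FileFormat.PARQUET, FileFormat.AVRO, FileFormat.ORC)) {
  ReadBuilder<RowData, RowType> builder =
      FormatModelRegistry.readBuilder(format, RowData.class, localInput);
  Preconditions.checkState(builder != null, "No RowData model registered for %s", format);
}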

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java

Lines changed: 17 additions & 81 deletions

@@ -25,28 +25,19 @@
 import java.io.Serializable;
 import java.util.Map;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.data.RegistryBasedFileWriterFactory;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
-import org.apache.iceberg.flink.data.FlinkAvroWriter;
-import org.apache.iceberg.flink.data.FlinkOrcWriter;
-import org.apache.iceberg.flink.data.FlinkParquetWriters;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

-public class FlinkFileWriterFactory extends BaseFileWriterFactory<RowData> implements Serializable {
-  private RowType dataFlinkType;
-  private RowType equalityDeleteFlinkType;
-
-  private FlinkFileWriterFactory(
+public class FlinkFileWriterFactory extends RegistryBasedFileWriterFactory<RowData, RowType>
+    implements Serializable {
+  FlinkFileWriterFactory(
       Table table,
       FileFormat dataFileFormat,
       Schema dataSchema,

@@ -62,85 +53,30 @@ private FlinkFileWriterFactory(
     super(
         table,
         dataFileFormat,
+        RowData.class,
         dataSchema,
         dataSortOrder,
         deleteFileFormat,
         equalityFieldIds,
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
-        writeProperties);
-
-    this.dataFlinkType = dataFlinkType;
-    this.equalityDeleteFlinkType = equalityDeleteFlinkType;
-  }
-
-  static Builder builderFor(Table table) {
-    return new Builder(table);
-  }
-
-  @Override
-  protected void configureDataWrite(Avro.DataWriteBuilder builder) {
-    builder.createWriterFunc(ignore -> new FlinkAvroWriter(dataFlinkType()));
-  }
-
-  @Override
-  protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(ignored -> new FlinkAvroWriter(equalityDeleteFlinkType()));
-  }
-
-  @Override
-  protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {}
-
-  @Override
-  protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
-    builder.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(dataFlinkType(), msgType));
-  }
-
-  @Override
-  protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        msgType -> FlinkParquetWriters.buildWriter(equalityDeleteFlinkType(), msgType));
-  }
-
-  @Override
-  protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.transformPaths(path -> StringData.fromString(path.toString()));
-  }
-
-  @Override
-  protected void configureDataWrite(ORC.DataWriteBuilder builder) {
-    builder.createWriterFunc(
-        (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(dataFlinkType(), iSchema));
-  }
-
-  @Override
-  protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(equalityDeleteFlinkType(), iSchema));
+        writeProperties,
+        dataFlinkType == null ? FlinkSchemaUtil.convert(dataSchema) : dataFlinkType,
+        equalityDeleteInputSchema(equalityDeleteFlinkType, equalityDeleteRowSchema));
   }

-  @Override
-  protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
-    builder.transformPaths(path -> StringData.fromString(path.toString()));
-  }
-
-  private RowType dataFlinkType() {
-    if (dataFlinkType == null) {
-      Preconditions.checkNotNull(dataSchema(), "Data schema must not be null");
-      this.dataFlinkType = FlinkSchemaUtil.convert(dataSchema());
+  private static RowType equalityDeleteInputSchema(RowType rowType, Schema rowSchema) {
+    if (rowType != null) {
+      return rowType;
+    } else if (rowSchema != null) {
+      return FlinkSchemaUtil.convert(rowSchema);
+    } else {
+      return null;
     }
-
-    return dataFlinkType;
   }

-  private RowType equalityDeleteFlinkType() {
-    if (equalityDeleteFlinkType == null) {
-      Preconditions.checkNotNull(
-          equalityDeleteRowSchema(), "Equality delete schema must not be null");
-      this.equalityDeleteFlinkType = FlinkSchemaUtil.convert(equalityDeleteRowSchema());
-    }
-
-    return equalityDeleteFlinkType;
+  static Builder builderFor(Table table) {
+    return new Builder(table);
   }

   public static class Builder {
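Note the constructor now resolves the Flink row types eagerly: an explicitly supplied RowType wins, otherwise the Iceberg schema is converted, and the equality-delete type may legitimately stay null when no equality-delete schema was configured. The conversion is the existing FlinkSchemaUtil utility; a small illustration with a made-up schema:

// Illustrative only: derive the Flink RowType that the factory previously
// computed lazily in dataFlinkType().
Schema icebergSchema =
    new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));
RowType flinkType = FlinkSchemaUtil.convert(icebergSchema); // roughly ROW<id BIGINT NOT NULL, data STRING>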

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java

Lines changed: 19 additions & 93 deletions

@@ -24,30 +24,23 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
-import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.encryption.InputFilesDecryptor;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.FlinkSourceFilter;
 import org.apache.iceberg.flink.RowDataWrapper;
-import org.apache.iceberg.flink.data.FlinkOrcReader;
-import org.apache.iceberg.flink.data.FlinkParquetReaders;
-import org.apache.iceberg.flink.data.FlinkPlannedAvroReader;
 import org.apache.iceberg.flink.data.RowDataProjection;
 import org.apache.iceberg.flink.data.RowDataUtil;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PartitionUtil;

 @Internal

@@ -73,8 +66,7 @@ public RowDataFileScanTaskReader(
     if (filters != null && !filters.isEmpty()) {
       Expression combinedExpression =
           filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
-      this.rowFilter =
-          new FlinkSourceFilter(this.projectedSchema, combinedExpression, this.caseSensitive);
+      this.rowFilter = new FlinkSourceFilter(projectedSchema, combinedExpression, caseSensitive);
     } else {
       this.rowFilter = null;
     }

@@ -112,23 +104,23 @@ private CloseableIterable<RowData> newIterable(
     if (task.isDataTask()) {
       throw new UnsupportedOperationException("Cannot read data task.");
     } else {
-      switch (task.file().format()) {
-        case PARQUET:
-          iter = newParquetIterable(task, schema, idToConstant, inputFilesDecryptor);
-          break;
-
-        case AVRO:
-          iter = newAvroIterable(task, schema, idToConstant, inputFilesDecryptor);
-          break;
-
-        case ORC:
-          iter = newOrcIterable(task, schema, idToConstant, inputFilesDecryptor);
-          break;
-
-        default:
-          throw new UnsupportedOperationException(
-              "Cannot read unknown format: " + task.file().format());
+      ReadBuilder<RowData, RowType> builder =
+          FormatModelRegistry.readBuilder(
+              task.file().format(), RowData.class, inputFilesDecryptor.getInputFile(task));
+
+      if (nameMapping != null) {
+        builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
       }
+
+      iter =
+          builder
+              .project(schema)
+              .idToConstant(idToConstant)
+              .split(task.start(), task.length())
+              .caseSensitive(caseSensitive)
+              .filter(task.residual())
+              .reuseContainers()
+              .build();
     }

     if (rowFilter != null) {

@@ -137,72 +129,6 @@ private CloseableIterable<RowData> newIterable(
     return iter;
   }

-  private CloseableIterable<RowData> newAvroIterable(
-      FileScanTask task,
-      Schema schema,
-      Map<Integer, ?> idToConstant,
-      InputFilesDecryptor inputFilesDecryptor) {
-    Avro.ReadBuilder builder =
-        Avro.read(inputFilesDecryptor.getInputFile(task))
-            .reuseContainers()
-            .project(schema)
-            .split(task.start(), task.length())
-            .createReaderFunc(readSchema -> FlinkPlannedAvroReader.create(schema, idToConstant));
-
-    if (nameMapping != null) {
-      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-    }
-
-    return builder.build();
-  }
-
-  private CloseableIterable<RowData> newParquetIterable(
-      FileScanTask task,
-      Schema schema,
-      Map<Integer, ?> idToConstant,
-      InputFilesDecryptor inputFilesDecryptor) {
-    Parquet.ReadBuilder builder =
-        Parquet.read(inputFilesDecryptor.getInputFile(task))
-            .split(task.start(), task.length())
-            .project(schema)
-            .createReaderFunc(
-                fileSchema -> FlinkParquetReaders.buildReader(schema, fileSchema, idToConstant))
-            .filter(task.residual())
-            .caseSensitive(caseSensitive)
-            .reuseContainers();
-
-    if (nameMapping != null) {
-      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-    }
-
-    return builder.build();
-  }
-
-  private CloseableIterable<RowData> newOrcIterable(
-      FileScanTask task,
-      Schema schema,
-      Map<Integer, ?> idToConstant,
-      InputFilesDecryptor inputFilesDecryptor) {
-    Schema readSchemaWithoutConstantAndMetadataFields =
-        TypeUtil.selectNot(
-            schema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
-
-    ORC.ReadBuilder builder =
-        ORC.read(inputFilesDecryptor.getInputFile(task))
-            .project(readSchemaWithoutConstantAndMetadataFields)
-            .split(task.start(), task.length())
-            .createReaderFunc(
-                readOrcSchema -> new FlinkOrcReader(schema, readOrcSchema, idToConstant))
-            .filter(task.residual())
-            .caseSensitive(caseSensitive);
-
-    if (nameMapping != null) {
-      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-    }
-
-    return builder.build();
-  }
-
   private static class FlinkDeleteFilter extends DeleteFilter<RowData> {
     private final RowType requiredRowType;
     private final RowDataWrapper asStructLike;
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java

Lines changed: 5 additions & 5 deletions

@@ -35,7 +35,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.common.DynFields;
-import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.data.RegistryBasedFileWriterFactory;
 import org.apache.iceberg.flink.FlinkWriteConf;
 import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.SimpleDataUtil;

@@ -238,21 +238,21 @@ private static Map<String, String> appenderProperties(
     testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1);

     testHarness.prepareSnapshotPreBarrier(1L);
-    DynFields.BoundField<IcebergStreamWriter> operatorField =
+    DynFields.BoundField<IcebergStreamWriter<?>> operatorField =
         DynFields.builder()
            .hiddenImpl(testHarness.getOperatorFactory().getClass(), "operator")
            .build(testHarness.getOperatorFactory());
-    DynFields.BoundField<TaskWriter> writerField =
+    DynFields.BoundField<TaskWriter<?>> writerField =
         DynFields.builder()
            .hiddenImpl(IcebergStreamWriter.class, "writer")
            .build(operatorField.get());
-    DynFields.BoundField<FileWriterFactory> writerFactoryField =
+    DynFields.BoundField<FileWriterFactory<?>> writerFactoryField =
         DynFields.builder()
            .hiddenImpl(BaseTaskWriter.class, "writerFactory")
            .build(writerField.get());
     DynFields.BoundField<Map<String, String>> propsField =
         DynFields.builder()
-            .hiddenImpl(BaseFileWriterFactory.class, "writerProperties")
+            .hiddenImpl(RegistryBasedFileWriterFactory.class, "writerProperties")
            .build(writerFactoryField.get());
     return propsField.get();
   }
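The only substantive change in this test is the declaring class of the private writerProperties field, plus generics cleanup on the DynFields bound-field types. For reference, the reflective access pattern in isolation — a sketch where `factory` stands in for any writer-factory instance:

// Bind a private field by declaring class and name, then read it from an
// instance; this lets the test inspect writerProperties without widening the
// production API.
DynFields.BoundField<Map<String, String>> props =
    DynFields.builder()
        .hiddenImpl(RegistryBasedFileWriterFactory.class, "writerProperties")
        .build(factory); // `factory` is a placeholder
Map<String, String> writerProperties = props.get();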
