Skip to content

Commit 9c503da

Browse files
authored
[core] Introduce Table.fileIO interface to public fileIO to file operation (#5102)
1 parent ef16f2d commit 9c503da

21 files changed

+130
-8
lines changed

paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ public static Table loadTable(
185185
TableSchema schema = metadata.schema();
186186
CoreOptions options = CoreOptions.fromMap(schema.options());
187187
if (options.type() == TableType.FORMAT_TABLE) {
188-
return toFormatTable(identifier, schema);
188+
return toFormatTable(identifier, schema, dataFileIO);
189189
}
190190

191191
CatalogEnvironment catalogEnv =
@@ -249,7 +249,8 @@ private static Table createSystemTable(Identifier identifier, Table originTable)
249249
return table;
250250
}
251251

252-
private static FormatTable toFormatTable(Identifier identifier, TableSchema schema) {
252+
private static FormatTable toFormatTable(
253+
Identifier identifier, TableSchema schema, Function<Path, FileIO> fileIO) {
253254
Map<String, String> options = schema.options();
254255
FormatTable.Format format =
255256
FormatTable.parseFormat(
@@ -258,6 +259,7 @@ private static FormatTable toFormatTable(Identifier identifier, TableSchema sche
258259
CoreOptions.FILE_FORMAT.defaultValue()));
259260
String location = options.get(CoreOptions.PATH.key());
260261
return FormatTable.builder()
262+
.fileIO(fileIO.apply(new Path(location)))
261263
.identifier(identifier)
262264
.rowType(schema.logicalRowType())
263265
.partitionKeys(schema.partitionKeys())

paimon-core/src/main/java/org/apache/paimon/table/DataTable.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.paimon.table;
2020

2121
import org.apache.paimon.CoreOptions;
22-
import org.apache.paimon.fs.FileIO;
2322
import org.apache.paimon.fs.Path;
2423
import org.apache.paimon.schema.SchemaManager;
2524
import org.apache.paimon.table.source.DataTableScan;
@@ -53,6 +52,4 @@ public interface DataTable extends InnerTable {
5352
DataTable switchToBranch(String branchName);
5453

5554
Path location();
56-
57-
FileIO fileIO();
5855
}

paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.Snapshot;
2222
import org.apache.paimon.annotation.Public;
2323
import org.apache.paimon.catalog.Identifier;
24+
import org.apache.paimon.fs.FileIO;
2425
import org.apache.paimon.manifest.IndexManifestEntry;
2526
import org.apache.paimon.manifest.ManifestEntry;
2627
import org.apache.paimon.manifest.ManifestFileMeta;
@@ -92,6 +93,7 @@ static FormatTable.Builder builder() {
9293
/** Builder for {@link FormatTable}. */
9394
class Builder {
9495

96+
private FileIO fileIO;
9597
private Identifier identifier;
9698
private RowType rowType;
9799
private List<String> partitionKeys;
@@ -100,6 +102,11 @@ class Builder {
100102
private Map<String, String> options;
101103
@Nullable private String comment;
102104

105+
public FormatTable.Builder fileIO(FileIO fileIO) {
106+
this.fileIO = fileIO;
107+
return this;
108+
}
109+
103110
public FormatTable.Builder identifier(Identifier identifier) {
104111
this.identifier = identifier;
105112
return this;
@@ -137,13 +144,14 @@ public FormatTable.Builder comment(@Nullable String comment) {
137144

138145
public FormatTable build() {
139146
return new FormatTable.FormatTableImpl(
140-
identifier, rowType, partitionKeys, location, format, options, comment);
147+
fileIO, identifier, rowType, partitionKeys, location, format, options, comment);
141148
}
142149
}
143150

144151
/** An implementation for {@link FormatTable}. */
145152
class FormatTableImpl implements FormatTable {
146153

154+
private final FileIO fileIO;
147155
private final Identifier identifier;
148156
private final RowType rowType;
149157
private final List<String> partitionKeys;
@@ -153,13 +161,15 @@ class FormatTableImpl implements FormatTable {
153161
@Nullable private final String comment;
154162

155163
public FormatTableImpl(
164+
FileIO fileIO,
156165
Identifier identifier,
157166
RowType rowType,
158167
List<String> partitionKeys,
159168
String location,
160169
Format format,
161170
Map<String, String> options,
162171
@Nullable String comment) {
172+
this.fileIO = fileIO;
163173
this.identifier = identifier;
164174
this.rowType = rowType;
165175
this.partitionKeys = partitionKeys;
@@ -214,12 +224,24 @@ public Optional<String> comment() {
214224
return Optional.ofNullable(comment);
215225
}
216226

227+
@Override
228+
public FileIO fileIO() {
229+
return fileIO;
230+
}
231+
217232
@Override
218233
public FormatTable copy(Map<String, String> dynamicOptions) {
219234
Map<String, String> newOptions = new HashMap<>(options);
220235
newOptions.putAll(dynamicOptions);
221236
return new FormatTableImpl(
222-
identifier, rowType, partitionKeys, location, format, newOptions, comment);
237+
fileIO,
238+
identifier,
239+
rowType,
240+
partitionKeys,
241+
location,
242+
format,
243+
newOptions,
244+
comment);
223245
}
224246
}
225247

paimon-core/src/main/java/org/apache/paimon/table/KnownSplitsTable.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.table;
2020

21+
import org.apache.paimon.fs.FileIO;
2122
import org.apache.paimon.table.source.DataSplit;
2223
import org.apache.paimon.table.source.InnerTableRead;
2324
import org.apache.paimon.table.source.InnerTableScan;
@@ -30,6 +31,7 @@
3031
* A table to hold some known data splits. For now, it is only used by internal for Spark engine.
3132
*/
3233
public class KnownSplitsTable implements ReadonlyTable {
34+
3335
private final InnerTable origin;
3436
private final DataSplit[] splits;
3537

@@ -71,6 +73,11 @@ public Map<String, String> options() {
7173
return origin.options();
7274
}
7375

76+
@Override
77+
public FileIO fileIO() {
78+
return origin.fileIO();
79+
}
80+
7481
@Override
7582
public InnerTableRead newRead() {
7683
return origin.newRead();

paimon-core/src/main/java/org/apache/paimon/table/Table.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.Snapshot;
2222
import org.apache.paimon.annotation.Experimental;
2323
import org.apache.paimon.annotation.Public;
24+
import org.apache.paimon.fs.FileIO;
2425
import org.apache.paimon.manifest.IndexManifestEntry;
2526
import org.apache.paimon.manifest.ManifestEntry;
2627
import org.apache.paimon.manifest.ManifestFileMeta;
@@ -86,6 +87,9 @@ default String uuid() {
8687

8788
// ================= Table Operations ====================
8889

90+
/** File io of this table. */
91+
FileIO fileIO();
92+
8993
/** Copy this table with adding dynamic options. */
9094
Table copy(Map<String, String> dynamicOptions);
9195

paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,11 @@ public List<String> primaryKeys() {
100100
return Collections.singletonList("field_name");
101101
}
102102

103+
@Override
104+
public FileIO fileIO() {
105+
return fileIO;
106+
}
107+
103108
@Override
104109
public InnerTableScan newScan() {
105110
return new SchemasScan();

paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.paimon.data.GenericRow;
2424
import org.apache.paimon.data.InternalRow;
2525
import org.apache.paimon.disk.IOManager;
26+
import org.apache.paimon.fs.FileIO;
27+
import org.apache.paimon.fs.local.LocalFileIO;
2628
import org.apache.paimon.predicate.Predicate;
2729
import org.apache.paimon.reader.RecordReader;
2830
import org.apache.paimon.table.ReadonlyTable;
@@ -91,6 +93,12 @@ public List<String> primaryKeys() {
9193
return Collections.singletonList("table_name");
9294
}
9395

96+
@Override
97+
public FileIO fileIO() {
98+
// pass a useless file io, should never use this.
99+
return new LocalFileIO();
100+
}
101+
94102
@Override
95103
public InnerTableScan newScan() {
96104
return new AllTableOptionsScan();

paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ public List<String> primaryKeys() {
104104
return Collections.singletonList("branch_name");
105105
}
106106

107+
@Override
108+
public FileIO fileIO() {
109+
return dataTable.fileIO();
110+
}
111+
107112
@Override
108113
public InnerTableScan newScan() {
109114
return new BranchesScan();

paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.paimon.data.InternalRow;
2626
import org.apache.paimon.data.Timestamp;
2727
import org.apache.paimon.disk.IOManager;
28+
import org.apache.paimon.fs.FileIO;
2829
import org.apache.paimon.manifest.BucketEntry;
2930
import org.apache.paimon.predicate.Predicate;
3031
import org.apache.paimon.reader.RecordReader;
@@ -97,6 +98,11 @@ public List<String> primaryKeys() {
9798
return Arrays.asList("partition", "bucket");
9899
}
99100

101+
@Override
102+
public FileIO fileIO() {
103+
return storeTable.fileIO();
104+
}
105+
100106
@Override
101107
public InnerTableScan newScan() {
102108
return new BucketsScan();

paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.paimon.data.GenericRow;
2323
import org.apache.paimon.data.InternalRow;
2424
import org.apache.paimon.disk.IOManager;
25+
import org.apache.paimon.fs.FileIO;
26+
import org.apache.paimon.fs.local.LocalFileIO;
2527
import org.apache.paimon.options.Options;
2628
import org.apache.paimon.predicate.Predicate;
2729
import org.apache.paimon.reader.RecordReader;
@@ -93,6 +95,12 @@ public List<String> primaryKeys() {
9395
return Collections.singletonList("key");
9496
}
9597

98+
@Override
99+
public FileIO fileIO() {
100+
// pass a useless file io, should never use this.
101+
return new LocalFileIO();
102+
}
103+
96104
@Override
97105
public Table copy(Map<String, String> dynamicOptions) {
98106
return new CatalogOptionsTable(catalogOptions);

0 commit comments

Comments
 (0)