Skip to content

Commit 9422995

Browse files
authored
[core] Support writing into postpone bucket tables with table API (#5437)
1 parent 2cb9acc commit 9422995

File tree

15 files changed

+107
-200
lines changed

15 files changed

+107
-200
lines changed

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,10 @@ protected AbstractFileStore(
121121

122122
@Override
123123
public FileStorePathFactory pathFactory() {
124-
return pathFactory(options.fileFormatString());
124+
return pathFactory(options, options.fileFormatString());
125125
}
126126

127-
protected FileStorePathFactory pathFactory(String format) {
127+
protected FileStorePathFactory pathFactory(CoreOptions options, String format) {
128128
return new FileStorePathFactory(
129129
options.path(),
130130
partitionType,

paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ public AbstractFileStoreWrite<KeyValue> newWrite(
170170
return new PostponeBucketFileStoreWrite(
171171
fileIO,
172172
schema,
173+
commitUser,
173174
partitionType,
174175
keyType,
175176
valueType,

paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,7 @@ protected AbstractFileStoreWrite(
9999
@Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
100100
String tableName,
101101
CoreOptions options,
102-
int numBuckets,
103-
RowType partitionType,
104-
int writerNumberMax,
105-
boolean legacyPartitionName) {
102+
RowType partitionType) {
106103
this.snapshotManager = snapshotManager;
107104
this.scan = scan;
108105
// Statistic is useless in writer
@@ -113,12 +110,12 @@ protected AbstractFileStoreWrite(
113110
}
114111
this.indexFactory = indexFactory;
115112
this.dvMaintainerFactory = dvMaintainerFactory;
116-
this.numBuckets = numBuckets;
113+
this.numBuckets = options.bucket();
117114
this.partitionType = partitionType;
118115
this.writers = new HashMap<>();
119116
this.tableName = tableName;
120-
this.writerNumberMax = writerNumberMax;
121-
this.legacyPartitionName = legacyPartitionName;
117+
this.writerNumberMax = options.writeMaxWritersToSpill();
118+
this.legacyPartitionName = options.legacyPartitionName();
122119
}
123120

124121
@Override

paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import java.util.Comparator;
8383
import java.util.List;
8484
import java.util.concurrent.ExecutorService;
85+
import java.util.function.BiFunction;
8586
import java.util.function.Function;
8687
import java.util.function.Supplier;
8788

@@ -124,7 +125,7 @@ public KeyValueFileStoreWrite(
124125
Supplier<RecordEqualiser> logDedupEqualSupplier,
125126
MergeFunctionFactory<KeyValue> mfFactory,
126127
FileStorePathFactory pathFactory,
127-
Function<String, FileStorePathFactory> formatPathFactory,
128+
BiFunction<CoreOptions, String, FileStorePathFactory> formatPathFactory,
128129
SnapshotManager snapshotManager,
129130
FileStoreScan scan,
130131
@Nullable IndexMaintainer.Factory<KeyValue> indexFactory,

paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,7 @@ public MemoryFileStoreWrite(
7474
dvMaintainerFactory,
7575
tableName,
7676
options,
77-
options.bucket(),
78-
partitionType,
79-
options.writeMaxWritersToSpill(),
80-
options.legacyPartitionName());
77+
partitionType);
8178
this.options = options;
8279
this.cacheManager =
8380
new CacheManager(

paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.paimon.operation.AbstractFileStoreWrite;
2929
import org.apache.paimon.operation.FileStoreScan;
3030
import org.apache.paimon.operation.FileStoreWrite;
31+
import org.apache.paimon.options.Options;
3132
import org.apache.paimon.schema.TableSchema;
3233
import org.apache.paimon.table.BucketMode;
3334
import org.apache.paimon.types.RowType;
@@ -40,6 +41,8 @@
4041

4142
import java.util.List;
4243
import java.util.concurrent.ExecutorService;
44+
import java.util.concurrent.ThreadLocalRandom;
45+
import java.util.function.BiFunction;
4346
import java.util.function.Function;
4447

4548
import static org.apache.paimon.utils.FileStorePathFactory.createFormatPathFactories;
@@ -53,32 +56,31 @@ public class PostponeBucketFileStoreWrite extends AbstractFileStoreWrite<KeyValu
5356
public PostponeBucketFileStoreWrite(
5457
FileIO fileIO,
5558
TableSchema schema,
59+
String commitUser,
5660
RowType partitionType,
5761
RowType keyType,
5862
RowType valueType,
59-
Function<String, FileStorePathFactory> formatPathFactory,
63+
BiFunction<CoreOptions, String, FileStorePathFactory> formatPathFactory,
6064
SnapshotManager snapshotManager,
6165
FileStoreScan scan,
6266
CoreOptions options,
6367
String tableName) {
64-
super(
65-
snapshotManager,
66-
scan,
67-
null,
68-
null,
69-
tableName,
70-
options,
71-
options.bucket(),
72-
partitionType,
73-
options.writeMaxWritersToSpill(),
74-
options.legacyPartitionName());
75-
76-
// copy options for postpone bucket
77-
this.options = new CoreOptions(options.toConfiguration().toMap());
68+
super(snapshotManager, scan, null, null, tableName, options, partitionType);
7869

70+
Options newOptions = new Options(options.toMap());
7971
// use avro for postpone bucket
80-
this.options.toConfiguration().set(CoreOptions.FILE_FORMAT, "avro");
81-
this.options.toConfiguration().set(CoreOptions.METADATA_STATS_MODE, "none");
72+
newOptions.set(CoreOptions.FILE_FORMAT, "avro");
73+
newOptions.set(CoreOptions.METADATA_STATS_MODE, "none");
74+
// each writer should have its unique prefix, so files from the same writer can be consumed
75+
// by the same compaction reader to keep the input order
76+
newOptions.set(
77+
CoreOptions.DATA_FILE_PREFIX,
78+
String.format(
79+
"%s-u-%s-s-%d-w-",
80+
options.dataFilePrefix(),
81+
commitUser,
82+
ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE)));
83+
this.options = new CoreOptions(newOptions);
8284

8385
this.writerFactoryBuilder =
8486
KeyValueFileWriterFactory.builder(
@@ -88,7 +90,7 @@ public PostponeBucketFileStoreWrite(
8890
valueType,
8991
this.options.fileFormat(),
9092
createFormatPathFactories(this.options, formatPathFactory),
91-
options.targetFileSize(true));
93+
this.options.targetFileSize(true));
9294

9395
// Ignoring previous files saves scanning time.
9496
//

paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.KeyValue;
2323
import org.apache.paimon.KeyValueFileStore;
24+
import org.apache.paimon.annotation.VisibleForTesting;
2425
import org.apache.paimon.fs.FileIO;
2526
import org.apache.paimon.fs.Path;
2627
import org.apache.paimon.manifest.ManifestCacheFilter;
@@ -32,6 +33,8 @@
3233
import org.apache.paimon.schema.KeyValueFieldsExtractor;
3334
import org.apache.paimon.schema.TableSchema;
3435
import org.apache.paimon.table.query.LocalTableQuery;
36+
import org.apache.paimon.table.sink.PostponeBucketRowKeyExtractor;
37+
import org.apache.paimon.table.sink.RowKeyExtractor;
3538
import org.apache.paimon.table.sink.TableWriteImpl;
3639
import org.apache.paimon.table.source.InnerTableRead;
3740
import org.apache.paimon.table.source.KeyValueTableRead;
@@ -55,6 +58,7 @@ public class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
5558

5659
private transient KeyValueFileStore lazyStore;
5760

61+
@VisibleForTesting
5862
PrimaryKeyFileStoreTable(FileIO fileIO, Path path, TableSchema tableSchema) {
5963
this(fileIO, path, tableSchema, CatalogEnvironment.empty());
6064
}
@@ -133,7 +137,7 @@ protected BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer() {
133137
splitAnd(predicate),
134138
tableSchema.fieldNames(),
135139
tableSchema.trimmedPrimaryKeys());
136-
if (keyFilters.size() > 0) {
140+
if (!keyFilters.isEmpty()) {
137141
((KeyValueFileStoreScan) scan).withKeyFilter(and(keyFilters));
138142
}
139143

@@ -185,4 +189,13 @@ protected Runnable newExpireRunnable() {
185189
return super.newExpireRunnable();
186190
}
187191
}
192+
193+
@Override
194+
public RowKeyExtractor createRowKeyExtractor() {
195+
if (coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) {
196+
return new PostponeBucketRowKeyExtractor(schema());
197+
} else {
198+
return super.createRowKeyExtractor();
199+
}
200+
}
188201
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.table.sink;
20+
21+
import org.apache.paimon.schema.TableSchema;
22+
import org.apache.paimon.table.BucketMode;
23+
24+
/**
25+
* {@link RowKeyExtractor} for postpone bucket tables. The bucket is always {@link
26+
* BucketMode#POSTPONE_BUCKET}.
27+
*/
28+
public class PostponeBucketRowKeyExtractor extends RowKeyExtractor {
29+
30+
public PostponeBucketRowKeyExtractor(TableSchema schema) {
31+
super(schema);
32+
}
33+
34+
@Override
35+
public int bucket() {
36+
return BucketMode.POSTPONE_BUCKET;
37+
}
38+
}

paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import java.util.Set;
3838
import java.util.UUID;
3939
import java.util.concurrent.atomic.AtomicInteger;
40-
import java.util.function.Function;
40+
import java.util.function.BiFunction;
4141
import java.util.stream.Collectors;
4242

4343
/** Factory which produces {@link Path}s for manifest files. */
@@ -298,11 +298,13 @@ public Path toPath(String fileName) {
298298
}
299299

300300
public static Map<String, FileStorePathFactory> createFormatPathFactories(
301-
CoreOptions options, Function<String, FileStorePathFactory> formatPathFactory) {
301+
CoreOptions options,
302+
BiFunction<CoreOptions, String, FileStorePathFactory> formatPathFactory) {
302303
Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
303304
Set<String> formats = new HashSet<>(options.fileFormatPerLevel().values());
304305
formats.add(options.fileFormatString());
305-
formats.forEach(format -> pathFactoryMap.put(format, formatPathFactory.apply(format)));
306+
formats.forEach(
307+
format -> pathFactoryMap.put(format, formatPathFactory.apply(options, format)));
306308
return pathFactoryMap;
307309
}
308310
}

paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.paimon.io.BundleRecords;
3333
import org.apache.paimon.io.DataFileMeta;
3434
import org.apache.paimon.manifest.FileKind;
35+
import org.apache.paimon.manifest.ManifestEntry;
3536
import org.apache.paimon.manifest.ManifestFileMeta;
3637
import org.apache.paimon.operation.FileStoreScan;
3738
import org.apache.paimon.options.MemorySize;
@@ -130,19 +131,23 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
130131

131132
@Test
132133
public void testPostponeBucket() throws Exception {
133-
FileStoreTable table = createFileStoreTable(options -> options.set(BUCKET, -2));
134+
FileStoreTable table =
135+
createFileStoreTable(options -> options.set(BUCKET, BucketMode.POSTPONE_BUCKET));
134136

135137
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
136138
try (BatchTableWrite write = writeBuilder.newWrite();
137139
BatchTableCommit commit = writeBuilder.newCommit()) {
138-
write.write(rowData(0, 0, 0L), BucketMode.POSTPONE_BUCKET);
140+
write.write(rowData(0, 0, 0L));
139141
commit.commit(write.prepareCommit());
140142
}
141143

142144
Snapshot snapshot = table.latestSnapshot().get();
143145
ManifestFileMeta manifest =
144146
table.manifestListReader().read(snapshot.deltaManifestList()).get(0);
145-
DataFileMeta file = table.manifestFileReader().read(manifest.fileName()).get(0).file();
147+
ManifestEntry entry = table.manifestFileReader().read(manifest.fileName()).get(0);
148+
assertThat(entry.bucket()).isEqualTo(BucketMode.POSTPONE_BUCKET);
149+
150+
DataFileMeta file = entry.file();
146151
assertThat(file.fileName()).endsWith(".avro");
147152
assertThat(file.level()).isEqualTo(0);
148153
assertThat(file.valueStatsCols()).isEmpty();

0 commit comments

Comments
 (0)