Skip to content

Commit e6c1ad7

Browse files
committed
[core] Refactor ExternalPathProviders abstraction
1 parent 55e4d28 commit e6c1ad7

File tree

6 files changed

+112
-92
lines changed

6 files changed

+112
-92
lines changed

paimon-common/src/main/java/org/apache/paimon/fs/EntropyInjectExternalPathProvider.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import java.util.List;
2727

2828
/** Provider for entropy inject external data paths. */
29-
public class EntropyInjectExternalPathProvider extends ExternalPathProvider {
29+
public class EntropyInjectExternalPathProvider implements ExternalPathProvider {
3030

3131
private static final HashFunction HASH_FUNC = Hashing.murmur3_32();
3232
private static final int HASH_BINARY_STRING_BITS = 20;
@@ -35,9 +35,15 @@ public class EntropyInjectExternalPathProvider extends ExternalPathProvider {
3535
// Will create DEPTH many dirs from the entropy
3636
private static final int ENTROPY_DIR_DEPTH = 3;
3737

38+
private final List<Path> externalTablePaths;
39+
private final Path relativeBucketPath;
40+
41+
private int position;
42+
3843
public EntropyInjectExternalPathProvider(
3944
List<Path> externalTablePaths, Path relativeBucketPath) {
40-
super(externalTablePaths, relativeBucketPath);
45+
this.externalTablePaths = externalTablePaths;
46+
this.relativeBucketPath = relativeBucketPath;
4147
}
4248

4349
@Override

paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,34 @@
1818

1919
package org.apache.paimon.fs;
2020

21+
import org.apache.paimon.CoreOptions.ExternalPathStrategy;
22+
23+
import javax.annotation.Nullable;
24+
2125
import java.io.Serializable;
2226
import java.util.List;
23-
import java.util.concurrent.ThreadLocalRandom;
2427

2528
/** Provider for external data paths. */
26-
public class ExternalPathProvider implements Serializable {
27-
28-
protected final List<Path> externalTablePaths;
29-
protected final Path relativeBucketPath;
29+
public interface ExternalPathProvider extends Serializable {
3030

31-
protected int position;
32-
33-
public ExternalPathProvider(List<Path> externalTablePaths, Path relativeBucketPath) {
34-
this.externalTablePaths = externalTablePaths;
35-
this.relativeBucketPath = relativeBucketPath;
36-
this.position = ThreadLocalRandom.current().nextInt(externalTablePaths.size());
37-
}
31+
Path getNextExternalDataPath(String fileName);
3832

39-
/**
40-
* Get the next external data path.
41-
*
42-
* @return the next external data path
43-
*/
44-
public Path getNextExternalDataPath(String fileName) {
45-
position++;
46-
if (position == externalTablePaths.size()) {
47-
position = 0;
33+
@Nullable
34+
static ExternalPathProvider create(
35+
ExternalPathStrategy strategy, List<Path> externalTablePaths, Path relativeBucketPath) {
36+
switch (strategy) {
37+
case ENTROPY_INJECT:
38+
return new EntropyInjectExternalPathProvider(
39+
externalTablePaths, relativeBucketPath);
40+
case SPECIFIC_FS:
41+
// specific fs can use round-robin with only one path
42+
case ROUND_ROBIN:
43+
return new RoundRobinExternalPathProvider(externalTablePaths, relativeBucketPath);
44+
case NONE:
45+
return null;
46+
default:
47+
throw new UnsupportedOperationException(
48+
"Cannot support create external path provider for: " + strategy);
4849
}
49-
return new Path(new Path(externalTablePaths.get(position), relativeBucketPath), fileName);
5050
}
5151
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.fs;
20+
21+
import java.util.List;
22+
import java.util.concurrent.ThreadLocalRandom;
23+
24+
/** Provider for the round-robin external data paths. */
25+
public class RoundRobinExternalPathProvider implements ExternalPathProvider {
26+
27+
private final List<Path> externalTablePaths;
28+
private final Path relativeBucketPath;
29+
30+
private int position;
31+
32+
public RoundRobinExternalPathProvider(List<Path> externalTablePaths, Path relativeBucketPath) {
33+
this.externalTablePaths = externalTablePaths;
34+
this.relativeBucketPath = relativeBucketPath;
35+
this.position = ThreadLocalRandom.current().nextInt(externalTablePaths.size());
36+
}
37+
38+
/**
39+
* Get the next external data path.
40+
*
41+
* @return the next external data path
42+
*/
43+
@Override
44+
public Path getNextExternalDataPath(String fileName) {
45+
position++;
46+
if (position == externalTablePaths.size()) {
47+
position = 0;
48+
}
49+
return new Path(new Path(externalTablePaths.get(position), relativeBucketPath), fileName);
50+
}
51+
}

paimon-core/src/main/java/org/apache/paimon/io/ChainReadDataFilePathFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
package org.apache.paimon.io;
2020

21-
import org.apache.paimon.fs.ExternalPathProvider;
2221
import org.apache.paimon.fs.Path;
22+
import org.apache.paimon.fs.RoundRobinExternalPathProvider;
2323
import org.apache.paimon.manifest.FileEntry;
2424

2525
import javax.annotation.Nullable;
@@ -38,7 +38,7 @@ public ChainReadDataFilePathFactory(
3838
String changelogFilePrefix,
3939
boolean fileSuffixIncludeCompression,
4040
String fileCompression,
41-
@Nullable ExternalPathProvider externalPathProvider,
41+
@Nullable RoundRobinExternalPathProvider externalPathProvider,
4242
ChainReadContext chainReadContext) {
4343
super(
4444
parent,

paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,20 @@
2323
import org.apache.paimon.data.InternalRow;
2424
import org.apache.paimon.format.FileFormat;
2525
import org.apache.paimon.fs.FileIO;
26+
import org.apache.paimon.fs.Path;
2627
import org.apache.paimon.fs.TwoPhaseOutputStream;
2728
import org.apache.paimon.table.sink.CommitMessage;
2829
import org.apache.paimon.types.RowType;
2930
import org.apache.paimon.utils.FileStorePathFactory;
3031

3132
import java.util.ArrayList;
3233
import java.util.HashMap;
34+
import java.util.LinkedHashMap;
3335
import java.util.List;
3436
import java.util.Map;
3537

3638
import static org.apache.paimon.format.FileFormat.fileFormat;
39+
import static org.apache.paimon.utils.PartitionPathUtils.generatePartitionPathUtil;
3740

3841
/** File writer for format table. */
3942
public class FormatTableFileWriter {
@@ -99,12 +102,21 @@ public List<CommitMessage> prepareCommit() throws Exception {
99102
}
100103

101104
private FormatTableRecordWriter createWriter(BinaryRow partition) {
105+
Path parent = pathFactory.root();
106+
if (partition.getFieldCount() > 0) {
107+
LinkedHashMap<String, String> partValues =
108+
pathFactory.partitionComputer().generatePartValues(partition);
109+
parent =
110+
new Path(
111+
parent,
112+
generatePartitionPathUtil(
113+
partValues, options.formatTablePartitionOnlyValueInPath()));
114+
}
102115
return new FormatTableRecordWriter(
103116
fileIO,
104117
fileFormat,
105118
options.targetFileSize(false),
106-
pathFactory.createFormatTableDataFilePathFactory(
107-
partition, options.formatTablePartitionOnlyValueInPath()),
119+
pathFactory.createDataFilePathFactory(parent, null),
108120
writeRowType,
109121
options.formatTableFileCompression());
110122
}

paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java

Lines changed: 15 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
package org.apache.paimon.utils;
2020

2121
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.CoreOptions.ExternalPathStrategy;
2223
import org.apache.paimon.annotation.VisibleForTesting;
2324
import org.apache.paimon.data.BinaryRow;
24-
import org.apache.paimon.fs.EntropyInjectExternalPathProvider;
2525
import org.apache.paimon.fs.ExternalPathProvider;
2626
import org.apache.paimon.fs.Path;
2727
import org.apache.paimon.index.IndexInDataFileDirPathFactory;
@@ -80,7 +80,7 @@ public class FileStorePathFactory {
8080
private final AtomicInteger indexFileCount;
8181
private final AtomicInteger statsFileCount;
8282
private final List<Path> externalPaths;
83-
private final CoreOptions.ExternalPathStrategy strategy;
83+
private final ExternalPathStrategy strategy;
8484

8585
public FileStorePathFactory(
8686
Path root,
@@ -94,7 +94,7 @@ public FileStorePathFactory(
9494
String fileCompression,
9595
@Nullable String dataFilePathDirectory,
9696
List<Path> externalPaths,
97-
CoreOptions.ExternalPathStrategy strategy,
97+
ExternalPathStrategy strategy,
9898
boolean indexFileInDataFileDir) {
9999
this.root = root;
100100
this.dataFilePathDirectory = dataFilePathDirectory;
@@ -168,14 +168,20 @@ public Path toManifestListPath(String manifestListName) {
168168
}
169169

170170
public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bucket) {
171+
return createDataFilePathFactory(
172+
bucketPath(partition, bucket), createExternalPathProvider(partition, bucket));
173+
}
174+
175+
public DataFilePathFactory createDataFilePathFactory(
176+
Path parent, @Nullable ExternalPathProvider externalPathProvider) {
171177
return new DataFilePathFactory(
172-
bucketPath(partition, bucket),
178+
parent,
173179
formatIdentifier,
174180
dataFilePrefix,
175181
changelogFilePrefix,
176182
fileSuffixIncludeCompression,
177183
fileCompression,
178-
createExternalPathProvider(partition, bucket));
184+
externalPathProvider);
179185
}
180186

181187
public ChainReadDataFilePathFactory createChainReadDataFilePathFactory(
@@ -194,64 +200,13 @@ public ChainReadDataFilePathFactory createChainReadDataFilePathFactory(
194200
chainReadContext);
195201
}
196202

197-
public DataFilePathFactory createFormatTableDataFilePathFactory(
198-
BinaryRow partition, boolean onlyValue) {
199-
return new DataFilePathFactory(
200-
partitionPath(partition, onlyValue),
201-
formatIdentifier,
202-
dataFilePrefix,
203-
changelogFilePrefix,
204-
fileSuffixIncludeCompression,
205-
fileCompression,
206-
createExternalPartitionPathProvider(partition));
207-
}
208-
209-
private ExternalPathProvider createExternalPartitionPathProvider(
210-
BinaryRow partition, boolean onlyValue) {
211-
if (externalPaths == null || externalPaths.isEmpty()) {
212-
return null;
213-
}
214-
215-
return new ExternalPathProvider(externalPaths, partitionPath(partition, onlyValue));
216-
}
217-
218-
private ExternalPathProvider createExternalPartitionPathProvider(BinaryRow partition) {
219-
if (externalPaths == null || externalPaths.isEmpty()) {
220-
return null;
221-
}
222-
223-
return new ExternalPathProvider(externalPaths, partitionPath(partition));
224-
}
225-
226-
private Path partitionPath(BinaryRow partition, boolean onlyValue) {
227-
Path relativeBucketPath = null;
228-
String partitionPath = getPartitionString(partition, onlyValue);
229-
if (!partitionPath.isEmpty()) {
230-
relativeBucketPath = new Path(partitionPath);
231-
}
232-
if (dataFilePathDirectory != null) {
233-
relativeBucketPath =
234-
relativeBucketPath != null
235-
? new Path(dataFilePathDirectory, relativeBucketPath)
236-
: new Path(dataFilePathDirectory);
237-
}
238-
return relativeBucketPath != null ? new Path(root, relativeBucketPath) : root;
239-
}
240-
241-
public Path partitionPath(BinaryRow partition) {
242-
return partitionPath(partition, false);
243-
}
244-
245203
@Nullable
246204
private ExternalPathProvider createExternalPathProvider(BinaryRow partition, int bucket) {
247205
if (externalPaths == null || externalPaths.isEmpty()) {
248206
return null;
249207
}
250-
if (strategy == CoreOptions.ExternalPathStrategy.ENTROPY_INJECT) {
251-
return new EntropyInjectExternalPathProvider(
252-
externalPaths, relativeBucketPath(partition, bucket));
253-
}
254-
return new ExternalPathProvider(externalPaths, relativeBucketPath(partition, bucket));
208+
return ExternalPathProvider.create(
209+
strategy, externalPaths, relativeBucketPath(partition, bucket));
255210
}
256211

257212
public List<Path> getExternalPaths() {
@@ -286,12 +241,8 @@ public String getPartitionString(BinaryRow partition) {
286241
partition, "Partition row data is null. This is unexpected.")));
287242
}
288243

289-
public String getPartitionString(BinaryRow partition, boolean onlyValue) {
290-
return PartitionPathUtils.generatePartitionPathUtil(
291-
partitionComputer.generatePartValues(
292-
Preconditions.checkNotNull(
293-
partition, "Partition row data is null. This is unexpected.")),
294-
onlyValue);
244+
public InternalRowPartitionComputer partitionComputer() {
245+
return partitionComputer;
295246
}
296247

297248
// @TODO, need to be changed

0 commit comments

Comments
 (0)