Skip to content

Commit 39f5128

Browse files
committed
[core] introduce weighted robin strategy for external-path
1 parent e102abd commit 39f5128

File tree

14 files changed

+303
-3
lines changed

14 files changed

+303
-3
lines changed

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,23 @@ public InlineElement getDescription() {
209209
+ ExternalPathStrategy.SPECIFIC_FS
210210
+ ", should be the prefix scheme of the external path, now supported are s3 and oss.");
211211

212+
public static final ConfigOption<String> DATA_FILE_EXTERNAL_PATHS_WEIGHTS =
213+
key("data-file.external-paths.weights")
214+
.stringType()
215+
.noDefaultValue()
216+
.withDescription(
217+
"The weights for external paths when "
218+
+ DATA_FILE_EXTERNAL_PATHS_STRATEGY.key()
219+
+ " is set to "
220+
+ ExternalPathStrategy.WEIGHTED
221+
+ ". "
222+
+ "Format: 'weight1,weight2,...' "
223+
+ "with weights corresponding to paths in "
224+
+ DATA_FILE_EXTERNAL_PATHS.key()
225+
+ " by order. "
226+
+ "Example: '10,5,15' means first path has weight 10, second 5, third 15. "
227+
+ "Weights must be positive integers.");
228+
212229
public static final ConfigOption<Boolean> COMPACTION_FORCE_REWRITE_ALL_FILES =
213230
key("compaction.force-rewrite-all-files")
214231
.booleanType()
@@ -3178,6 +3195,21 @@ public String externalSpecificFS() {
31783195
return options.get(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS);
31793196
}
31803197

3198+
@Nullable
3199+
public int[] externalPathWeights() {
3200+
String weightsStr = options.get(DATA_FILE_EXTERNAL_PATHS_WEIGHTS);
3201+
if (weightsStr == null) {
3202+
return null;
3203+
}
3204+
String[] parts = weightsStr.split(",");
3205+
int[] weights = new int[parts.length];
3206+
for (int i = 0; i < parts.length; i++) {
3207+
weights[i] = Integer.parseInt(parts[i].trim());
3208+
checkArgument(weights[i] > 0, "Weight must be positive, got: %s", weights[i]);
3209+
}
3210+
return weights;
3211+
}
3212+
31813213
/** Returns the value configured for {@link #COMPACTION_FORCE_REWRITE_ALL_FILES}. */
public Boolean forceRewriteAllFiles() {
31823214
return options.get(COMPACTION_FORCE_REWRITE_ALL_FILES);
31833215
}
@@ -4088,7 +4120,11 @@ public enum ExternalPathStrategy implements DescribedEnum {
40884120

40894121
ENTROPY_INJECT(
40904122
"entropy-inject",
4091-
"When writing a new file, a path is chosen based on the hash value of the file's content.");
4123+
"When writing a new file, a path is chosen based on the hash value of the file's content."),
4124+
4125+
WEIGHTED(
4126+
"weight-robin",
4127+
"When writing a new file, a path is chosen based on configured weights.");
40924128

40934129
private final String value;
40944130

paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ public interface ExternalPathProvider extends Serializable {
3232

3333
@Nullable
3434
static ExternalPathProvider create(
35-
ExternalPathStrategy strategy, List<Path> externalTablePaths, Path relativeBucketPath) {
35+
ExternalPathStrategy strategy,
36+
List<Path> externalTablePaths,
37+
Path relativeBucketPath,
38+
@Nullable int[] weights) {
3639
switch (strategy) {
3740
case ENTROPY_INJECT:
3841
return new EntropyInjectExternalPathProvider(
@@ -41,6 +44,13 @@ static ExternalPathProvider create(
4144
// specific fs can use round-robin with only one path
4245
case ROUND_ROBIN:
4346
return new RoundRobinExternalPathProvider(externalTablePaths, relativeBucketPath);
47+
case WEIGHTED:
48+
if (externalTablePaths.size() < 2 || weights == null) {
49+
return new RoundRobinExternalPathProvider(
50+
externalTablePaths, relativeBucketPath);
51+
}
52+
return new WeightedExternalPathProvider(
53+
externalTablePaths, relativeBucketPath, weights);
4454
case NONE:
4555
return null;
4656
default:
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.paimon.fs;
21+
22+
import java.util.Arrays;
23+
import java.util.List;
24+
import java.util.NavigableMap;
25+
import java.util.TreeMap;
26+
import java.util.concurrent.ThreadLocalRandom;
27+
28+
import static org.apache.paimon.utils.Preconditions.checkArgument;
29+
30+
/**
31+
* Provider for weighted external data paths.
32+
*
33+
* <p>This provider uses a weighted random algorithm to select paths based on configured weights.
34+
* Higher weights result in higher probability of selection.
35+
*/
36+
public class WeightedExternalPathProvider implements ExternalPathProvider {
37+
38+
private final NavigableMap<Double, Path> cumulativeWeightMap;
39+
private final double totalWeight;
40+
private final Path relativeBucketPath;
41+
42+
public WeightedExternalPathProvider(
43+
List<Path> externalTablePaths, Path relativeBucketPath, int[] weights) {
44+
checkArgument(
45+
externalTablePaths.size() == weights.length,
46+
"The number of external paths and weights should be the same. Paths: "
47+
+ externalTablePaths.size()
48+
+ ", Weights: "
49+
+ weights.length);
50+
this.relativeBucketPath = relativeBucketPath;
51+
this.cumulativeWeightMap = buildCumulativeWeightMap(externalTablePaths, weights);
52+
this.totalWeight = Arrays.stream(weights).sum();
53+
}
54+
55+
@Override
56+
public Path getNextExternalDataPath(String fileName) {
57+
double randomValue = ThreadLocalRandom.current().nextDouble() * totalWeight;
58+
Path selectedPath = cumulativeWeightMap.higherEntry(randomValue).getValue();
59+
return new Path(new Path(selectedPath, relativeBucketPath), fileName);
60+
}
61+
62+
private NavigableMap<Double, Path> buildCumulativeWeightMap(
63+
List<Path> externalTablePaths, int[] weights) {
64+
NavigableMap<Double, Path> map = new TreeMap<>();
65+
double cumulativeWeight = 0;
66+
for (int i = 0; i < externalTablePaths.size(); i++) {
67+
cumulativeWeight += weights[i];
68+
map.put(cumulativeWeight, externalTablePaths.get(i));
69+
}
70+
return map;
71+
}
72+
}
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.paimon.fs;
21+
22+
import org.apache.paimon.CoreOptions;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
import java.util.ArrayList;
27+
import java.util.Arrays;
28+
import java.util.Collections;
29+
import java.util.HashMap;
30+
import java.util.List;
31+
import java.util.Map;
32+
33+
import static org.assertj.core.api.Assertions.assertThat;
34+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
35+
36+
/** Tests for {@link WeightedExternalPathProvider}. */
37+
public class WeightedExternalPathProviderTest {
38+
39+
@Test
40+
public void testEqualWeights() {
41+
int fileNum = 3000;
42+
int[] weights = {10, 10, 10};
43+
Map<String, Integer> pathCounts = generatePaths(fileNum, weights);
44+
45+
int expectedCount = fileNum / 3;
46+
for (int count : pathCounts.values()) {
47+
assertThat(count).isBetween(expectedCount - 100, expectedCount + 100);
48+
}
49+
}
50+
51+
@Test
52+
public void testDifferentWeights() {
53+
int[] weights = {10, 5, 15};
54+
int fileNum = 3000;
55+
Map<String, Integer> pathCounts = generatePaths(fileNum, weights);
56+
57+
int totalWeight = 30;
58+
assertThat(pathCounts.get("s3://bucket1/data"))
59+
.isBetween(
60+
(int) (fileNum * 10.0 / totalWeight) - 100,
61+
(int) (fileNum * 10.0 / totalWeight) + 100);
62+
assertThat(pathCounts.get("oss://bucket2/data"))
63+
.isBetween(
64+
(int) (fileNum * 5.0 / totalWeight) - 100,
65+
(int) (fileNum * 5.0 / totalWeight) + 100);
66+
assertThat(pathCounts.get("hdfs://namenode/data"))
67+
.isBetween(
68+
(int) (fileNum * 15.0 / totalWeight) - 100,
69+
(int) (fileNum * 15.0 / totalWeight) + 100);
70+
}
71+
72+
@Test
73+
public void testSinglePath() {
74+
List<Path> paths = new ArrayList<>();
75+
paths.add(new Path("s3://bucket1/data"));
76+
77+
int[] weights = {10};
78+
79+
Path relativeBucketPath = new Path("bucket-0");
80+
WeightedExternalPathProvider provider =
81+
new WeightedExternalPathProvider(paths, relativeBucketPath, weights);
82+
83+
for (int fileNum = 0; fileNum < 1000; fileNum++) {
84+
Path selectedPath = provider.getNextExternalDataPath("file-" + fileNum + ".parquet");
85+
assertThat(selectedPath.toString())
86+
.contains("s3://bucket1/data/bucket-0/file-" + fileNum + ".parquet");
87+
}
88+
}
89+
90+
@Test
91+
public void testMissingWeight() {
92+
List<Path> paths = new ArrayList<>();
93+
paths.add(new Path("s3://bucket1/data"));
94+
paths.add(new Path("oss://bucket2/data"));
95+
96+
int[] weights = {10};
97+
// Missing weight for oss://bucket2/data
98+
99+
Path relativeBucketPath = new Path("bucket-0");
100+
101+
assertThatThrownBy(
102+
() -> new WeightedExternalPathProvider(paths, relativeBucketPath, weights))
103+
.isInstanceOf(IllegalArgumentException.class)
104+
.hasMessageContaining(
105+
"The number of external paths and weights should be the same. Paths: 2, Weights: 1");
106+
}
107+
108+
@Test
109+
public void testPathConstruction() {
110+
List<Path> paths = new ArrayList<>();
111+
paths.add(new Path("s3://bucket1/data"));
112+
113+
int[] weights = {10};
114+
115+
Path relativeBucketPath = new Path("bucket-0");
116+
WeightedExternalPathProvider provider =
117+
new WeightedExternalPathProvider(paths, relativeBucketPath, weights);
118+
119+
Path selectedPath = provider.getNextExternalDataPath("test-file.parquet");
120+
assertThat(selectedPath.toString())
121+
.isEqualTo("s3://bucket1/data/bucket-0/test-file.parquet");
122+
}
123+
124+
@Test
125+
public void testCreateExternalPathProvider() {
126+
ExternalPathProvider provider1 =
127+
ExternalPathProvider.create(
128+
CoreOptions.ExternalPathStrategy.WEIGHTED,
129+
Arrays.asList(new Path("oss://path1"), new Path("oss://path2")),
130+
new Path("bucket-0"),
131+
null);
132+
assertThat(provider1).isInstanceOf(RoundRobinExternalPathProvider.class);
133+
134+
ExternalPathProvider provider2 =
135+
ExternalPathProvider.create(
136+
CoreOptions.ExternalPathStrategy.WEIGHTED,
137+
Collections.singletonList(new Path("oss://path1")),
138+
new Path("bucket-0"),
139+
new int[] {10});
140+
assertThat(provider2).isInstanceOf(RoundRobinExternalPathProvider.class);
141+
142+
ExternalPathProvider provider3 =
143+
ExternalPathProvider.create(
144+
CoreOptions.ExternalPathStrategy.WEIGHTED,
145+
Arrays.asList(new Path("oss://path1"), new Path("oss://path2")),
146+
new Path("bucket-0"),
147+
new int[] {10, 20});
148+
assertThat(provider3).isInstanceOf(WeightedExternalPathProvider.class);
149+
}
150+
151+
private Map<String, Integer> generatePaths(int fileNum, int[] weights) {
152+
List<Path> paths = new ArrayList<>();
153+
paths.add(new Path("s3://bucket1/data"));
154+
paths.add(new Path("oss://bucket2/data"));
155+
paths.add(new Path("hdfs://namenode/data"));
156+
157+
Path relativeBucketPath = new Path("bucket-0");
158+
WeightedExternalPathProvider provider =
159+
new WeightedExternalPathProvider(paths, relativeBucketPath, weights);
160+
161+
Map<String, Integer> pathCounts = new HashMap<>();
162+
for (int i = 0; i < fileNum; i++) {
163+
Path selectedPath = provider.getNextExternalDataPath("file-" + i + ".parquet");
164+
String basePath = selectedPath.getParent().getParent().toString();
165+
pathCounts.put(basePath, pathCounts.getOrDefault(basePath, 0) + 1);
166+
}
167+
return pathCounts;
168+
}
169+
}

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ protected FileStorePathFactory pathFactory(CoreOptions options, String format) {
143143
options.dataFilePathDirectory(),
144144
createExternalPaths(),
145145
options.externalPathStrategy(),
146+
options.externalPathWeights(),
146147
options.indexFileInDataFileDir(),
147148
options.globalIndexExternalPath());
148149
}

paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public FormatTableFileWriter(
6969
options.dataFilePathDirectory(),
7070
null,
7171
CoreOptions.ExternalPathStrategy.NONE,
72+
null,
7273
options.indexFileInDataFileDir(),
7374
null);
7475
}

paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public class FileStorePathFactory {
8383
private final AtomicInteger statsFileCount;
8484
private final List<Path> externalPaths;
8585
private final ExternalPathStrategy strategy;
86+
@Nullable private final int[] externalPathWeights;
8687
@Nullable private final Path globalIndexExternalRootDir;
8788

8889
public FileStorePathFactory(
@@ -98,6 +99,7 @@ public FileStorePathFactory(
9899
@Nullable String dataFilePathDirectory,
99100
List<Path> externalPaths,
100101
ExternalPathStrategy strategy,
102+
@Nullable int[] externalPathWeights,
101103
boolean indexFileInDataFileDir,
102104
@Nullable Path globalIndexExternalRootDir) {
103105
this.root = root;
@@ -120,6 +122,7 @@ public FileStorePathFactory(
120122
this.statsFileCount = new AtomicInteger(0);
121123
this.externalPaths = externalPaths;
122124
this.strategy = strategy;
125+
this.externalPathWeights = externalPathWeights;
123126
this.globalIndexExternalRootDir = globalIndexExternalRootDir;
124127
}
125128

@@ -215,7 +218,7 @@ private ExternalPathProvider createExternalPathProvider(BinaryRow partition, int
215218
return null;
216219
}
217220
return ExternalPathProvider.create(
218-
strategy, externalPaths, relativeBucketPath(partition, bucket));
221+
strategy, externalPaths, relativeBucketPath(partition, bucket), externalPathWeights);
219222
}
220223

221224
public List<Path> getExternalPaths() {

paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ protected void foreachIndexReader(
173173
null,
174174
null,
175175
CoreOptions.ExternalPathStrategy.NONE,
176+
null,
176177
false,
177178
null);
178179

paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,7 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f
327327
null,
328328
null,
329329
CoreOptions.ExternalPathStrategy.NONE,
330+
null,
330331
false,
331332
null);
332333

0 commit comments

Comments
 (0)