Skip to content

Commit f15bcfe

Browse files
authored
[core] Enable Entropy Inject for data file path to prevent being throttled by object storage (#6832)
1 parent 44b14ee commit f15bcfe

File tree

17 files changed

+195
-8
lines changed

17 files changed

+195
-8
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@
396396
<td><h5>data-file.external-paths.strategy</h5></td>
397397
<td style="word-wrap: break-word;">none</td>
398398
<td><p>Enum</p></td>
399-
<td>The strategy of selecting an external path when writing data.<br /><br />Possible values:<ul><li>"none": Do not choose any external storage, data will still be written to the default warehouse path.</li><li>"specific-fs": Select a specific file system as the external path. Currently supported are S3 and OSS.</li><li>"round-robin": When writing a new file, a path is chosen from data-file.external-paths in turn.</li></ul></td>
399+
<td>The strategy of selecting an external path when writing data.<br /><br />Possible values:<ul><li>"none": Do not choose any external storage, data will still be written to the default warehouse path.</li><li>"specific-fs": Select a specific file system as the external path. Currently supported are S3 and OSS.</li><li>"round-robin": When writing a new file, a path is chosen from data-file.external-paths in turn.</li><li>"entropy-inject": When writing a new file, a path is chosen based on the hash value of the file's content.</li></ul></td>
400400
</tr>
401401
<tr>
402402
<td><h5>data-file.path-directory</h5></td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3936,7 +3936,11 @@ public enum ExternalPathStrategy implements DescribedEnum {
39363936

39373937
ROUND_ROBIN(
39383938
"round-robin",
3939-
"When writing a new file, a path is chosen from data-file.external-paths in turn.");
3939+
"When writing a new file, a path is chosen from data-file.external-paths in turn."),
3940+
3941+
ENTROPY_INJECT(
3942+
"entropy-inject",
3943+
"When writing a new file, a path is chosen based on the hash value of the file's content.");
39403944

39413945
private final String value;
39423946

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.fs;
20+
21+
import org.apache.paimon.shade.guava30.com.google.common.hash.HashCode;
22+
import org.apache.paimon.shade.guava30.com.google.common.hash.HashFunction;
23+
import org.apache.paimon.shade.guava30.com.google.common.hash.Hashing;
24+
25+
import java.nio.charset.StandardCharsets;
26+
import java.util.List;
27+
28+
/** Provider for entropy inject external data paths. */
29+
public class EntropyInjectExternalPathProvider extends ExternalPathProvider {
30+
31+
private static final HashFunction HASH_FUNC = Hashing.murmur3_32();
32+
private static final int HASH_BINARY_STRING_BITS = 20;
33+
// Entropy generated will be divided into dirs with this lengths
34+
private static final int ENTROPY_DIR_LENGTH = 4;
35+
// Will create DEPTH many dirs from the entropy
36+
private static final int ENTROPY_DIR_DEPTH = 3;
37+
38+
public EntropyInjectExternalPathProvider(
39+
List<Path> externalTablePaths, Path relativeBucketPath) {
40+
super(externalTablePaths, relativeBucketPath);
41+
}
42+
43+
@Override
44+
public Path getNextExternalDataPath(String fileName) {
45+
String hashDirs = computeHash(fileName);
46+
Path filePathWithHashDirs = new Path(relativeBucketPath, hashDirs + "/" + fileName);
47+
position++;
48+
if (position == externalTablePaths.size()) {
49+
position = 0;
50+
}
51+
return new Path(externalTablePaths.get(position), filePathWithHashDirs);
52+
}
53+
54+
public String computeHash(String fileName) {
55+
HashCode hashCode = HASH_FUNC.hashString(fileName, StandardCharsets.UTF_8);
56+
String hashAsBinaryString = Integer.toBinaryString(hashCode.asInt() | Integer.MIN_VALUE);
57+
String hash =
58+
hashAsBinaryString.substring(hashAsBinaryString.length() - HASH_BINARY_STRING_BITS);
59+
return dirsFromHash(hash);
60+
}
61+
62+
/**
63+
* Divides hash into directories for optimized orphan removal operation using ENTROPY_DIR_DEPTH.
64+
*
65+
* @param hash 10011001100110011001
66+
* @return 1001/1001/1001/10011001 with depth 3 and length 4
67+
*/
68+
private String dirsFromHash(String hash) {
69+
StringBuilder hashWithDirs = new StringBuilder();
70+
71+
for (int i = 0; i < ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH; i += ENTROPY_DIR_LENGTH) {
72+
if (i > 0) {
73+
hashWithDirs.append("/");
74+
}
75+
hashWithDirs.append(hash, i, Math.min(i + ENTROPY_DIR_LENGTH, hash.length()));
76+
}
77+
78+
if (hash.length() > ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH) {
79+
hashWithDirs
80+
.append("/")
81+
.append(hash, ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH, hash.length());
82+
}
83+
84+
return hashWithDirs.toString();
85+
}
86+
}

paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@
2525
/** Provider for external data paths. */
2626
public class ExternalPathProvider implements Serializable {
2727

28-
private final List<Path> externalTablePaths;
29-
private final Path relativeBucketPath;
28+
protected final List<Path> externalTablePaths;
29+
protected final Path relativeBucketPath;
3030

31-
private int position;
31+
protected int position;
3232

3333
public ExternalPathProvider(List<Path> externalTablePaths, Path relativeBucketPath) {
3434
this.externalTablePaths = externalTablePaths;

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ protected FileStorePathFactory pathFactory(CoreOptions options, String format) {
135135
options.fileCompression(),
136136
options.dataFilePathDirectory(),
137137
createExternalPaths(),
138+
options.externalPathStrategy(),
138139
options.indexFileInDataFileDir());
139140
}
140141

@@ -168,7 +169,6 @@ private List<Path> createExternalPaths() {
168169
paths.add(path);
169170
}
170171
}
171-
172172
checkArgument(!paths.isEmpty(), "External paths should not be empty");
173173
return paths;
174174
}

paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ public FormatTableFileWriter(
6565
options.formatTableFileCompression(),
6666
options.dataFilePathDirectory(),
6767
null,
68-
false);
68+
CoreOptions.ExternalPathStrategy.NONE,
69+
options.indexFileInDataFileDir());
6970
}
7071

7172
public void withWriteType(RowType writeType) {

paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.annotation.VisibleForTesting;
2323
import org.apache.paimon.data.BinaryRow;
24+
import org.apache.paimon.fs.EntropyInjectExternalPathProvider;
2425
import org.apache.paimon.fs.ExternalPathProvider;
2526
import org.apache.paimon.fs.Path;
2627
import org.apache.paimon.index.IndexInDataFileDirPathFactory;
@@ -79,6 +80,7 @@ public class FileStorePathFactory {
7980
private final AtomicInteger indexFileCount;
8081
private final AtomicInteger statsFileCount;
8182
private final List<Path> externalPaths;
83+
private final CoreOptions.ExternalPathStrategy strategy;
8284

8385
public FileStorePathFactory(
8486
Path root,
@@ -92,6 +94,7 @@ public FileStorePathFactory(
9294
String fileCompression,
9395
@Nullable String dataFilePathDirectory,
9496
List<Path> externalPaths,
97+
CoreOptions.ExternalPathStrategy strategy,
9598
boolean indexFileInDataFileDir) {
9699
this.root = root;
97100
this.dataFilePathDirectory = dataFilePathDirectory;
@@ -112,6 +115,7 @@ public FileStorePathFactory(
112115
this.indexFileCount = new AtomicInteger(0);
113116
this.statsFileCount = new AtomicInteger(0);
114117
this.externalPaths = externalPaths;
118+
this.strategy = strategy;
115119
}
116120

117121
public Path root() {
@@ -243,7 +247,10 @@ private ExternalPathProvider createExternalPathProvider(BinaryRow partition, int
243247
if (externalPaths == null || externalPaths.isEmpty()) {
244248
return null;
245249
}
246-
250+
if (strategy == CoreOptions.ExternalPathStrategy.ENTROPY_INJECT) {
251+
return new EntropyInjectExternalPathProvider(
252+
externalPaths, relativeBucketPath(partition, bucket));
253+
}
247254
return new ExternalPathProvider(externalPaths, relativeBucketPath(partition, bucket));
248255
}
249256

paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ protected void foreachIndexReader(
172172
CoreOptions.FILE_COMPRESSION.defaultValue(),
173173
null,
174174
null,
175+
CoreOptions.ExternalPathStrategy.NONE,
175176
false);
176177

177178
Table table = fileSystemCatalog.getTable(Identifier.create(tableName, tableName));

paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919
package org.apache.paimon.io;
2020

2121
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.fs.EntropyInjectExternalPathProvider;
2223
import org.apache.paimon.fs.Path;
2324

2425
import org.junit.jupiter.api.Test;
2526
import org.junit.jupiter.api.io.TempDir;
2627

28+
import java.util.Collections;
29+
import java.util.List;
30+
2731
import static org.assertj.core.api.Assertions.assertThat;
2832

2933
/** Tests for {@link DataFilePathFactory}. */
@@ -91,4 +95,68 @@ public void testWithPartition() {
9195
new Path(tempDir.toString() + "/dt=20211224/bucket-123/my-data-file-name")
9296
.toString());
9397
}
98+
99+
@Test
100+
public void testEntropyInjectWithNoPartition() {
101+
EntropyInjectExternalPathProvider externalPathProvider =
102+
createExternalPathProvider(new Path(tempDir.toString()), "bucket-123");
103+
DataFilePathFactory pathFactory =
104+
new DataFilePathFactory(
105+
new Path(tempDir + "/bucket-123"),
106+
CoreOptions.FILE_FORMAT.defaultValue(),
107+
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
108+
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
109+
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
110+
CoreOptions.FILE_COMPRESSION.defaultValue(),
111+
externalPathProvider);
112+
String uuid = pathFactory.uuid();
113+
114+
for (int i = 0; i < 20; i++) {
115+
String filename =
116+
"data-" + uuid + "-" + i + "." + CoreOptions.FILE_FORMAT.defaultValue();
117+
assertThat(pathFactory.newPath())
118+
.isEqualTo(
119+
new Path(
120+
tempDir.toString()
121+
+ "/bucket-123/"
122+
+ externalPathProvider.computeHash(filename)
123+
+ "/"
124+
+ filename));
125+
}
126+
}
127+
128+
@Test
129+
public void testEntropyInjectWithPartition() {
130+
EntropyInjectExternalPathProvider externalPathProvider =
131+
createExternalPathProvider(new Path(tempDir.toString()), "dt=20211224/bucket-123");
132+
DataFilePathFactory pathFactory =
133+
new DataFilePathFactory(
134+
new Path(tempDir + "/dt=20211224/bucket-123"),
135+
CoreOptions.FILE_FORMAT.defaultValue(),
136+
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
137+
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
138+
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
139+
CoreOptions.FILE_COMPRESSION.defaultValue(),
140+
externalPathProvider);
141+
String uuid = pathFactory.uuid();
142+
143+
for (int i = 0; i < 20; i++) {
144+
String filename =
145+
"data-" + uuid + "-" + i + "." + CoreOptions.FILE_FORMAT.defaultValue();
146+
assertThat(pathFactory.newPath())
147+
.isEqualTo(
148+
new Path(
149+
tempDir.toString()
150+
+ "/dt=20211224/bucket-123/"
151+
+ externalPathProvider.computeHash(filename)
152+
+ "/"
153+
+ filename));
154+
}
155+
}
156+
157+
public EntropyInjectExternalPathProvider createExternalPathProvider(
158+
Path path, String relativeBucketPath) {
159+
List<Path> externalPaths = Collections.singletonList(path);
160+
return new EntropyInjectExternalPathProvider(externalPaths, new Path(relativeBucketPath));
161+
}
94162
}

paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f
239239
CoreOptions.FILE_COMPRESSION.defaultValue(),
240240
null,
241241
null,
242+
CoreOptions.ExternalPathStrategy.NONE,
242243
false);
243244
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
244245
FileIO fileIO = FileIOFinder.find(path);
@@ -261,6 +262,7 @@ public FileStorePathFactory apply(String format) {
261262
CoreOptions.FILE_COMPRESSION.defaultValue(),
262263
null,
263264
null,
265+
CoreOptions.ExternalPathStrategy.NONE,
264266
false);
265267
}
266268
};

0 commit comments

Comments
 (0)