Skip to content

Commit 77a2abb

Browse files
committed
[server] Support Multi-Location for Remote Storage
1 parent 423e63a commit 77a2abb

File tree

69 files changed

+2882
-292
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+2882
-292
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloader.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,9 @@ void fetchOnce() throws Exception {
167167
(bytes, throwable) -> {
168168
if (throwable != null) {
169169
LOG.error(
170-
"Failed to download remote log segment file {}.",
170+
"Failed to download remote log segment file {} for bucket {}.",
171171
fsPathAndFileName.getFileName(),
172+
request.segment.tableBucket(),
172173
ExceptionUtils.stripExecutionException(throwable));
173174
// release the semaphore for the failed request
174175
prefetchSemaphore.release();
@@ -178,8 +179,9 @@ void fetchOnce() throws Exception {
178179
scannerMetricGroup.remoteFetchErrorCount().inc();
179180
} else {
180181
LOG.info(
181-
"Successfully downloaded remote log segment file {} to local cost {} ms.",
182+
"Successfully downloaded remote log segment file {} for bucket {} to local cost {} ms.",
182183
fsPathAndFileName.getFileName(),
184+
request.segment.tableBucket(),
183185
System.currentTimeMillis() - startTime);
184186
File localFile =
185187
new File(

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,44 @@ public class ConfigOptions {
9696
"The directory used for storing the kv snapshot data files and remote log for log tiered storage "
9797
+ " in a Fluss supported filesystem.");
9898

99+
public static final ConfigOption<List<String>> REMOTE_DATA_DIRS =
100+
key("remote.data.dirs")
101+
.stringType()
102+
.asList()
103+
.defaultValues()
104+
.withDescription(
105+
"The directories used for storing the kv snapshot data files and remote log for log tiered storage "
106+
+ " in a Fluss supported filesystem. "
107+
+ "This is a list of remote data directory paths. "
108+
+ "Example: `remote.data.dirs: oss://bucket1/fluss-remote-data, oss://bucket2/fluss-remote-data`.");
109+
110+
public static final ConfigOption<RemoteDataDirStrategy> REMOTE_DATA_DIRS_STRATEGY =
111+
key("remote.data.dirs.strategy")
112+
.enumType(RemoteDataDirStrategy.class)
113+
.defaultValue(RemoteDataDirStrategy.ROUND_ROBIN)
114+
.withDescription(
115+
"The strategy for selecting the remote data directory from `"
116+
+ REMOTE_DATA_DIRS.key()
117+
+ "`.");
118+
119+
public static final ConfigOption<List<Integer>> REMOTE_DATA_DIRS_WEIGHTS =
120+
key("remote.data.dirs.weights")
121+
.intType()
122+
.asList()
123+
.defaultValues()
124+
.withDescription(
125+
"The weights of the remote data directories. "
126+
+ "This is a list of weights corresponding to the `"
127+
+ REMOTE_DATA_DIRS.key()
128+
+ "` in the same order. When `"
129+
+ REMOTE_DATA_DIRS_STRATEGY.key()
130+
+ "` is set to `"
131+
+ RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN
132+
+ "`, this must be configured, and its size must be equal to `"
133+
+ REMOTE_DATA_DIRS.key()
134+
+ "`; otherwise, it will be ignored."
135+
+ "Example: `remote.data.dir.weights: 1, 2`");
136+
99137
public static final ConfigOption<MemorySize> REMOTE_FS_WRITE_BUFFER_SIZE =
100138
key("remote.fs.write-buffer-size")
101139
.memoryType()
@@ -1925,4 +1963,10 @@ private static class ConfigOptionsHolder {
19251963
public static ConfigOption<?> getConfigOption(String key) {
19261964
return ConfigOptionsHolder.CONFIG_OPTIONS_BY_KEY.get(key);
19271965
}
1966+
1967+
/** Strategies for choosing one remote data directory when several are configured. */
public enum RemoteDataDirStrategy {
    /** Default strategy; presumably cycles through the directories evenly (selector not shown here). */
    ROUND_ROBIN,

    /** Strategy that additionally relies on the configured per-directory weights. */
    WEIGHTED_ROUND_ROBIN
}
19281972
}

fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919

2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.annotation.VisibleForTesting;
22+
import org.apache.fluss.exception.IllegalConfigurationException;
2223

2324
import java.lang.reflect.Field;
2425
import java.util.Collections;
2526
import java.util.HashMap;
2627
import java.util.List;
2728
import java.util.Map;
29+
import java.util.Optional;
2830

2931
/** Utilities of Fluss {@link ConfigOptions}. */
3032
@Internal
@@ -74,4 +76,97 @@ static Map<String, ConfigOption<?>> extractConfigOptions(String prefix) {
7476
}
7577
return options;
7678
}
79+
80+
public static void validateCoordinatorConfigs(Configuration conf) {
81+
validServerConfigs(conf);
82+
83+
if (conf.get(ConfigOptions.DEFAULT_REPLICATION_FACTOR) < 1) {
84+
throw new IllegalConfigurationException(
85+
String.format(
86+
"Invalid configuration for %s, it must be greater than or equal 1.",
87+
ConfigOptions.DEFAULT_REPLICATION_FACTOR.key()));
88+
}
89+
90+
if (conf.get(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS) < 1) {
91+
throw new IllegalConfigurationException(
92+
String.format(
93+
"Invalid configuration for %s, it must be greater than or equal 1.",
94+
ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS.key()));
95+
}
96+
97+
if (conf.get(ConfigOptions.SERVER_IO_POOL_SIZE) < 1) {
98+
throw new IllegalConfigurationException(
99+
String.format(
100+
"Invalid configuration for %s, it must be greater than or equal 1.",
101+
ConfigOptions.SERVER_IO_POOL_SIZE.key()));
102+
}
103+
104+
// validate remote.data.dirs
105+
List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
106+
ConfigOptions.RemoteDataDirStrategy remoteDataDirStrategy =
107+
conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY);
108+
if (remoteDataDirStrategy == ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN) {
109+
List<Integer> weights = conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS);
110+
if (!remoteDataDirs.isEmpty() && !weights.isEmpty()) {
111+
if (remoteDataDirs.size() != weights.size()) {
112+
throw new IllegalConfigurationException(
113+
String.format(
114+
"The size of '%s' (%d) must match the size of '%s' (%d) when using WEIGHTED_ROUND_ROBIN strategy.",
115+
ConfigOptions.REMOTE_DATA_DIRS.key(),
116+
remoteDataDirs.size(),
117+
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
118+
weights.size()));
119+
}
120+
// validate all weights are positive
121+
for (int i = 0; i < weights.size(); i++) {
122+
if (weights.get(i) < 0) {
123+
throw new IllegalConfigurationException(
124+
String.format(
125+
"All weights in '%s' must be no less than 0, but found %d at index %d.",
126+
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
127+
weights.get(i),
128+
i));
129+
}
130+
}
131+
}
132+
}
133+
}
134+
135+
public static void validateTabletConfigs(Configuration conf) {
136+
validServerConfigs(conf);
137+
138+
Optional<Integer> serverId = conf.getOptional(ConfigOptions.TABLET_SERVER_ID);
139+
if (!serverId.isPresent()) {
140+
throw new IllegalConfigurationException(
141+
String.format("Configuration %s must be set.", ConfigOptions.TABLET_SERVER_ID));
142+
}
143+
144+
if (serverId.get() < 0) {
145+
throw new IllegalConfigurationException(
146+
String.format(
147+
"Invalid configuration for %s, it must be greater than or equal 0.",
148+
ConfigOptions.TABLET_SERVER_ID.key()));
149+
}
150+
151+
if (conf.get(ConfigOptions.BACKGROUND_THREADS) < 1) {
152+
throw new IllegalConfigurationException(
153+
String.format(
154+
"Invalid configuration for %s, it must be greater than or equal 1.",
155+
ConfigOptions.BACKGROUND_THREADS.key()));
156+
}
157+
158+
if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > Integer.MAX_VALUE) {
159+
throw new IllegalConfigurationException(
160+
String.format(
161+
"Invalid configuration for %s, it must be less than or equal %d bytes.",
162+
ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE));
163+
}
164+
}
165+
166+
private static void validServerConfigs(Configuration conf) {
167+
if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) {
168+
throw new IllegalConfigurationException(
169+
String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR));
170+
}
171+
}
77172
}

fluss-common/src/main/java/org/apache/fluss/remote/RemoteLogSegment.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.remote;
1919

2020
import org.apache.fluss.annotation.Internal;
21+
import org.apache.fluss.fs.FsPath;
2122
import org.apache.fluss.metadata.PhysicalTablePath;
2223
import org.apache.fluss.metadata.TableBucket;
2324

@@ -50,14 +51,17 @@ public class RemoteLogSegment {
5051

5152
private final int segmentSizeInBytes;
5253

54+
private final FsPath remoteLogDir;
55+
5356
private RemoteLogSegment(
5457
PhysicalTablePath physicalTablePath,
5558
TableBucket tableBucket,
5659
UUID remoteLogSegmentId,
5760
long remoteLogStartOffset,
5861
long remoteLogEndOffset,
5962
long maxTimestamp,
60-
int segmentSizeInBytes) {
63+
int segmentSizeInBytes,
64+
FsPath remoteLogDir) {
6165
this.physicalTablePath = checkNotNull(physicalTablePath);
6266
this.tableBucket = checkNotNull(tableBucket);
6367
this.remoteLogSegmentId = checkNotNull(remoteLogSegmentId);
@@ -79,6 +83,7 @@ private RemoteLogSegment(
7983
this.remoteLogEndOffset = remoteLogEndOffset;
8084
this.maxTimestamp = maxTimestamp;
8185
this.segmentSizeInBytes = segmentSizeInBytes;
86+
this.remoteLogDir = remoteLogDir;
8287
}
8388

8489
public PhysicalTablePath physicalTablePath() {
@@ -115,6 +120,10 @@ public int segmentSizeInBytes() {
115120
return segmentSizeInBytes;
116121
}
117122

123+
public FsPath remoteLogDir() {
124+
return remoteLogDir;
125+
}
126+
118127
@Override
119128
public boolean equals(Object o) {
120129
if (this == o) {
@@ -174,6 +183,7 @@ public static class Builder {
174183
private long remoteLogEndOffset;
175184
private long maxTimestamp;
176185
private int segmentSizeInBytes;
186+
private FsPath remoteLogDir;
177187

178188
public static Builder builder() {
179189
return new Builder();
@@ -214,6 +224,11 @@ public Builder tableBucket(TableBucket tableBucket) {
214224
return this;
215225
}
216226

227+
public Builder remoteLogDir(FsPath remoteLogDir) {
228+
this.remoteLogDir = remoteLogDir;
229+
return this;
230+
}
231+
217232
public RemoteLogSegment build() {
218233
return new RemoteLogSegment(
219234
physicalTablePath,
@@ -222,7 +237,8 @@ public RemoteLogSegment build() {
222237
remoteLogStartOffset,
223238
remoteLogEndOffset,
224239
maxTimestamp,
225-
segmentSizeInBytes);
240+
segmentSizeInBytes,
241+
remoteLogDir);
226242
}
227243
}
228244
}

fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,14 @@ public static UUID uuidFromRemoteIndexCacheFileName(String fileName) {
401401
fileName.substring(fileName.indexOf('_') + 1, fileName.indexOf('.')));
402402
}
403403

404+
// ----------------------------------------------------------------------------------------
405+
// Remote Data Paths
406+
// ----------------------------------------------------------------------------------------
407+
408+
public static FsPath remoteDataDir(Configuration conf) {
409+
return new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR));
410+
}
411+
404412
// ----------------------------------------------------------------------------------------
405413
// Remote Log Paths
406414
// ----------------------------------------------------------------------------------------
@@ -418,6 +426,10 @@ public static FsPath remoteLogDir(Configuration conf) {
418426
return new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR) + "/" + REMOTE_LOG_DIR_NAME);
419427
}
420428

429+
public static FsPath remoteLogDir(FsPath remoteDataDir) {
430+
return new FsPath(remoteDataDir, REMOTE_LOG_DIR_NAME);
431+
}
432+
421433
/**
422434
* Returns the remote directory path for storing log files for a log tablet.
423435
*
@@ -584,6 +596,10 @@ public static FsPath remoteKvDir(Configuration conf) {
584596
return new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR) + "/" + REMOTE_KV_DIR_NAME);
585597
}
586598

599+
public static FsPath remoteKvDir(FsPath remoteDataDir) {
600+
return new FsPath(remoteDataDir, REMOTE_KV_DIR_NAME);
601+
}
602+
587603
/**
588604
* Returns the remote directory path for storing kv snapshot files for a kv tablet.
589605
*

fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,20 @@
1717

1818
package org.apache.fluss.config;
1919

20+
import org.apache.fluss.exception.IllegalConfigurationException;
21+
2022
import org.junit.jupiter.api.Test;
2123

24+
import java.util.Arrays;
25+
import java.util.Collections;
2226
import java.util.Map;
2327

2428
import static org.apache.fluss.config.FlussConfigUtils.CLIENT_OPTIONS;
2529
import static org.apache.fluss.config.FlussConfigUtils.TABLE_OPTIONS;
2630
import static org.apache.fluss.config.FlussConfigUtils.extractConfigOptions;
31+
import static org.apache.fluss.config.FlussConfigUtils.validateCoordinatorConfigs;
2732
import static org.assertj.core.api.Assertions.assertThat;
33+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2834

2935
/** Test for {@link FlussConfigUtils}. */
3036
class FlussConfigUtilsTest {
@@ -49,4 +55,72 @@ void testExtractOptions() {
4955
});
5056
assertThat(clientOptions.size()).isEqualTo(CLIENT_OPTIONS.size());
5157
}
58+
59+
@Test
60+
void testValidateCoordinatorConfigs() {
61+
// Test valid configuration
62+
Configuration validConf = new Configuration();
63+
validConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
64+
validateCoordinatorConfigs(validConf);
65+
66+
// Test invalid DEFAULT_REPLICATION_FACTOR
67+
Configuration invalidReplicationConf = new Configuration();
68+
invalidReplicationConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
69+
invalidReplicationConf.set(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 0);
70+
assertThatThrownBy(() -> validateCoordinatorConfigs(invalidReplicationConf))
71+
.isInstanceOf(IllegalConfigurationException.class)
72+
.hasMessageContaining(ConfigOptions.DEFAULT_REPLICATION_FACTOR.key())
73+
.hasMessageContaining("must be greater than or equal 1");
74+
75+
// Test invalid KV_MAX_RETAINED_SNAPSHOTS
76+
Configuration invalidSnapshotConf = new Configuration();
77+
invalidSnapshotConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
78+
invalidSnapshotConf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 0);
79+
assertThatThrownBy(() -> validateCoordinatorConfigs(invalidSnapshotConf))
80+
.isInstanceOf(IllegalConfigurationException.class)
81+
.hasMessageContaining(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS.key())
82+
.hasMessageContaining("must be greater than or equal 1");
83+
84+
// Test invalid SERVER_IO_POOL_SIZE
85+
Configuration invalidIoPoolConf = new Configuration();
86+
invalidIoPoolConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
87+
invalidIoPoolConf.set(ConfigOptions.SERVER_IO_POOL_SIZE, 0);
88+
assertThatThrownBy(() -> validateCoordinatorConfigs(invalidIoPoolConf))
89+
.isInstanceOf(IllegalConfigurationException.class)
90+
.hasMessageContaining(ConfigOptions.SERVER_IO_POOL_SIZE.key())
91+
.hasMessageContaining("must be greater than or equal 1");
92+
93+
// Test REMOTE_DATA_DIR not set
94+
Configuration noRemoteDirConf = new Configuration();
95+
assertThatThrownBy(() -> validateCoordinatorConfigs(noRemoteDirConf))
96+
.isInstanceOf(IllegalConfigurationException.class)
97+
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIR.key())
98+
.hasMessageContaining("must be set");
99+
100+
// Test WEIGHTED_ROUND_ROBIN with mismatched sizes
101+
Configuration mismatchedWeightsConf = new Configuration();
102+
mismatchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
103+
mismatchedWeightsConf.set(
104+
ConfigOptions.REMOTE_DATA_DIRS_STRATEGY,
105+
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN);
106+
mismatchedWeightsConf.set(
107+
ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2"));
108+
mismatchedWeightsConf.set(
109+
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Collections.singletonList(1));
110+
assertThatThrownBy(() -> validateCoordinatorConfigs(mismatchedWeightsConf))
111+
.isInstanceOf(IllegalConfigurationException.class)
112+
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key())
113+
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key());
114+
115+
// Test WEIGHTED_ROUND_ROBIN with matched sizes
116+
Configuration matchedWeightsConf = new Configuration();
117+
matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
118+
matchedWeightsConf.set(
119+
ConfigOptions.REMOTE_DATA_DIRS_STRATEGY,
120+
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN);
121+
matchedWeightsConf.set(
122+
ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2"));
123+
matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(1, 2));
124+
validateCoordinatorConfigs(matchedWeightsConf);
125+
}
52126
}

0 commit comments

Comments
 (0)