Skip to content

Commit c09c9d4

Browse files
committed
[server] Fluss server support multiple paths for remote storage
1 parent fbf50fd commit c09c9d4

File tree

106 files changed

+3929
-390
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

106 files changed

+3929
-390
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.fluss.config.cluster.AlterConfig;
3232
import org.apache.fluss.config.cluster.ConfigEntry;
3333
import org.apache.fluss.exception.LeaderNotAvailableException;
34+
import org.apache.fluss.fs.FsPath;
3435
import org.apache.fluss.metadata.DatabaseDescriptor;
3536
import org.apache.fluss.metadata.DatabaseInfo;
3637
import org.apache.fluss.metadata.DatabaseSummary;
@@ -291,6 +292,11 @@ public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
291292
r.getTableId(),
292293
r.getSchemaId(),
293294
TableDescriptor.fromJsonBytes(r.getTableJson()),
295+
// For backward compatibility, results returned by old
296+
// clusters do not include the remote data dir
297+
r.hasRemoteDataDir()
298+
? new FsPath(r.getRemoteDataDir())
299+
: null,
294300
r.getCreatedTime(),
295301
r.getModifiedTime()));
296302
}

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloader.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,9 @@ void fetchOnce() throws Exception {
167167
(bytes, throwable) -> {
168168
if (throwable != null) {
169169
LOG.error(
170-
"Failed to download remote log segment file {}.",
170+
"Failed to download remote log segment file {} for bucket {}.",
171171
fsPathAndFileName.getFileName(),
172+
request.segment.tableBucket(),
172173
ExceptionUtils.stripExecutionException(throwable));
173174
// release the semaphore for the failed request
174175
prefetchSemaphore.release();
@@ -178,8 +179,9 @@ void fetchOnce() throws Exception {
178179
scannerMetricGroup.remoteFetchErrorCount().inc();
179180
} else {
180181
LOG.info(
181-
"Successfully downloaded remote log segment file {} to local cost {} ms.",
182+
"Successfully downloaded remote log segment file {} for bucket {} to local cost {} ms.",
182183
fsPathAndFileName.getFileName(),
184+
request.segment.tableBucket(),
183185
System.currentTimeMillis() - startTime);
184186
File localFile =
185187
new File(

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
6868
import static org.apache.fluss.record.TestData.DATA2_TABLE_INFO;
6969
import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH;
70+
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
7071
import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
7172
import static org.apache.fluss.row.BinaryString.fromString;
7273
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toByteBuffer;
@@ -169,6 +170,7 @@ void testProjection(LogFormat logFormat, byte magic) throws Exception {
169170
.distributedBy(3)
170171
.logFormat(logFormat)
171172
.build(),
173+
DEFAULT_REMOTE_DATA_DIR,
172174
System.currentTimeMillis(),
173175
System.currentTimeMillis());
174176
long fetchOffset = 0L;
@@ -313,6 +315,7 @@ void testComplexTypeFetch() throws Exception {
313315
.distributedBy(3)
314316
.logFormat(LogFormat.ARROW)
315317
.build(),
318+
DEFAULT_REMOTE_DATA_DIR,
316319
System.currentTimeMillis(),
317320
System.currentTimeMillis());
318321
long fetchOffset = 0L;

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ void testFetchWithSchemaChange() throws Exception {
172172
DATA1_TABLE_INFO.getNumBuckets(),
173173
DATA1_TABLE_INFO.getProperties(),
174174
DATA1_TABLE_INFO.getCustomProperties(),
175+
DATA1_TABLE_INFO.getRemoteDataDir(),
175176
DATA1_TABLE_INFO.getComment().orElse(null),
176177
DATA1_TABLE_INFO.getCreatedTime(),
177178
DATA1_TABLE_INFO.getModifiedTime()),

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetchTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
6262
import static org.apache.fluss.record.TestData.DATA2_TABLE_INFO;
6363
import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH;
64+
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
6465
import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
6566
import static org.apache.fluss.testutils.DataTestUtils.genLogFile;
6667
import static org.apache.fluss.utils.FlussPaths.remoteLogSegmentDir;
@@ -218,6 +219,7 @@ void testProjection(String format) throws Exception {
218219
.distributedBy(3)
219220
.logFormat(logFormat)
220221
.build(),
222+
DEFAULT_REMOTE_DATA_DIR,
221223
System.currentTimeMillis(),
222224
System.currentTimeMillis());
223225
long fetchOffset = 0L;

fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
7777
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
7878
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
79+
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
7980
import static org.apache.fluss.testutils.DataTestUtils.indexedRow;
8081
import static org.apache.fluss.testutils.DataTestUtils.row;
8182
import static org.assertj.core.api.Assertions.assertThat;
@@ -96,6 +97,7 @@ class RecordAccumulatorTest {
9697
.distributedBy(3)
9798
.property(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_TYPE.key(), "zstd")
9899
.build(),
100+
DEFAULT_REMOTE_DATA_DIR,
99101
System.currentTimeMillis(),
100102
System.currentTimeMillis());
101103

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,44 @@ public class ConfigOptions {
9999
"The directory used for storing the kv snapshot data files and remote log for log tiered storage "
100100
+ " in a Fluss supported filesystem.");
101101

102+
public static final ConfigOption<List<String>> REMOTE_DATA_DIRS =
103+
key("remote.data.dirs")
104+
.stringType()
105+
.asList()
106+
.defaultValues()
107+
.withDescription(
108+
"The directories used for storing the kv snapshot data files and remote log for log tiered storage "
109+
+ " in a Fluss supported filesystem. "
110+
+ "This is a list of remote data directory paths. "
111+
+ "Example: `remote.data.dirs: oss://bucket1/fluss-remote-data, oss://bucket2/fluss-remote-data`.");
112+
113+
public static final ConfigOption<RemoteDataDirStrategy> REMOTE_DATA_DIRS_STRATEGY =
114+
key("remote.data.dirs.strategy")
115+
.enumType(RemoteDataDirStrategy.class)
116+
.defaultValue(RemoteDataDirStrategy.ROUND_ROBIN)
117+
.withDescription(
118+
"The strategy for selecting the remote data directory from `"
119+
+ REMOTE_DATA_DIRS.key()
120+
+ "`.");
121+
122+
public static final ConfigOption<List<Integer>> REMOTE_DATA_DIRS_WEIGHTS =
123+
key("remote.data.dirs.weights")
124+
.intType()
125+
.asList()
126+
.defaultValues()
127+
.withDescription(
128+
"The weights of the remote data directories. "
129+
+ "This is a list of weights corresponding to the `"
130+
+ REMOTE_DATA_DIRS.key()
131+
+ "` in the same order. When `"
132+
+ REMOTE_DATA_DIRS_STRATEGY.key()
133+
+ "` is set to `"
134+
+ RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN
135+
+ "`, this must be configured, and its size must be equal to `"
136+
+ REMOTE_DATA_DIRS.key()
137+
+ "`; otherwise, it will be ignored."
138+
+ "Example: `remote.data.dir.weights: 1, 2`");
139+
102140
public static final ConfigOption<MemorySize> REMOTE_FS_WRITE_BUFFER_SIZE =
103141
key("remote.fs.write-buffer-size")
104142
.memoryType()
@@ -2058,4 +2096,10 @@ private static class ConfigOptionsHolder {
20582096
public static ConfigOption<?> getConfigOption(String key) {
20592097
return ConfigOptionsHolder.CONFIG_OPTIONS_BY_KEY.get(key);
20602098
}
2099+
2100+
/** Remote data dir select strategy for Fluss. */
2101+
public enum RemoteDataDirStrategy {
2102+
ROUND_ROBIN,
2103+
WEIGHTED_ROUND_ROBIN
2104+
}
20612105
}

fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@
1919

2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.annotation.VisibleForTesting;
22+
import org.apache.fluss.exception.IllegalConfigurationException;
23+
import org.apache.fluss.fs.FsPath;
24+
import org.apache.fluss.utils.FlussPaths;
2225

2326
import java.lang.reflect.Field;
2427
import java.util.Arrays;
2528
import java.util.HashMap;
2629
import java.util.List;
2730
import java.util.Map;
31+
import java.util.Optional;
2832

2933
/** Utilities of Fluss {@link ConfigOptions}. */
3034
@Internal
@@ -76,4 +80,109 @@ static Map<String, ConfigOption<?>> extractConfigOptions(String prefix) {
7680
}
7781
return options;
7882
}
83+
84+
public static void validateCoordinatorConfigs(Configuration conf) {
85+
validServerConfigs(conf);
86+
87+
validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1);
88+
validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1);
89+
validMinValue(conf, ConfigOptions.SERVER_IO_POOL_SIZE, 1);
90+
91+
// Validate remote.data.dirs
92+
List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
93+
for (int i = 0; i < remoteDataDirs.size(); i++) {
94+
String remoteDataDir = remoteDataDirs.get(i);
95+
try {
96+
new FsPath(remoteDataDir);
97+
} catch (Exception e) {
98+
throw new IllegalConfigurationException(
99+
String.format(
100+
"Invalid remote path for %s at index %d.",
101+
ConfigOptions.REMOTE_DATA_DIRS.key(), i),
102+
e);
103+
}
104+
}
105+
106+
// Validate remote.data.dirs.strategy
107+
ConfigOptions.RemoteDataDirStrategy remoteDataDirStrategy =
108+
conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY);
109+
if (remoteDataDirStrategy == ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN) {
110+
List<Integer> weights = conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS);
111+
if (!remoteDataDirs.isEmpty() && !weights.isEmpty()) {
112+
if (remoteDataDirs.size() != weights.size()) {
113+
throw new IllegalConfigurationException(
114+
String.format(
115+
"The size of '%s' (%d) must match the size of '%s' (%d) when using WEIGHTED_ROUND_ROBIN strategy.",
116+
ConfigOptions.REMOTE_DATA_DIRS.key(),
117+
remoteDataDirs.size(),
118+
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
119+
weights.size()));
120+
}
121+
// Validate all weights are positive
122+
for (int i = 0; i < weights.size(); i++) {
123+
if (weights.get(i) < 0) {
124+
throw new IllegalConfigurationException(
125+
String.format(
126+
"All weights in '%s' must be no less than 0, but found %d at index %d.",
127+
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
128+
weights.get(i),
129+
i));
130+
}
131+
}
132+
}
133+
}
134+
}
135+
136+
public static void validateTabletConfigs(Configuration conf) {
137+
validServerConfigs(conf);
138+
139+
Optional<Integer> serverId = conf.getOptional(ConfigOptions.TABLET_SERVER_ID);
140+
if (!serverId.isPresent()) {
141+
throw new IllegalConfigurationException(
142+
String.format("Configuration %s must be set.", ConfigOptions.TABLET_SERVER_ID));
143+
}
144+
validMinValue(ConfigOptions.TABLET_SERVER_ID, serverId.get(), 0);
145+
146+
validMinValue(conf, ConfigOptions.BACKGROUND_THREADS, 1);
147+
148+
if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > Integer.MAX_VALUE) {
149+
throw new IllegalConfigurationException(
150+
String.format(
151+
"Invalid configuration for %s, it must be less than or equal %d bytes.",
152+
ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE));
153+
}
154+
}
155+
156+
/** Validate common server configs. */
157+
private static void validServerConfigs(Configuration conf) {
158+
if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) {
159+
throw new IllegalConfigurationException(
160+
String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR));
161+
} else {
162+
// Must validate that remote.data.dir is a valid FsPath
163+
try {
164+
FlussPaths.remoteDataDir(conf);
165+
} catch (Exception e) {
166+
throw new IllegalConfigurationException(
167+
String.format(
168+
"Invalid configuration for %s.",
169+
ConfigOptions.REMOTE_DATA_DIR.key()),
170+
e);
171+
}
172+
}
173+
}
174+
175+
private static void validMinValue(
176+
Configuration conf, ConfigOption<Integer> option, int minValue) {
177+
validMinValue(option, conf.get(option), minValue);
178+
}
179+
180+
private static void validMinValue(ConfigOption<Integer> option, int value, int minValue) {
181+
if (value < minValue) {
182+
throw new IllegalConfigurationException(
183+
String.format(
184+
"Invalid configuration for %s, it must be greater than or equal %d.",
185+
option.key(), minValue));
186+
}
187+
}
79188
}

fluss-common/src/main/java/org/apache/fluss/metadata/PartitionInfo.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,31 @@
1818
package org.apache.fluss.metadata;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.fs.FsPath;
2122

2223
import java.util.Objects;
2324

2425
/**
25-
* Information of a partition metadata, includes the partition's name and the partition id that
26-
* represents the unique identifier of the partition.
26+
* Information of a partition metadata, includes partition id (unique identifier of the partition),
27+
* partition name, etc.
2728
*
2829
* @since 0.2
2930
*/
3031
@PublicEvolving
3132
public class PartitionInfo {
3233
private final long partitionId;
3334
private final ResolvedPartitionSpec partitionSpec;
35+
private final FsPath remoteDataDir;
3436

3537
public PartitionInfo(long partitionId, ResolvedPartitionSpec partitionSpec) {
38+
this(partitionId, partitionSpec, null);
39+
}
40+
41+
public PartitionInfo(
42+
long partitionId, ResolvedPartitionSpec partitionSpec, FsPath remoteDataDir) {
3643
this.partitionId = partitionId;
3744
this.partitionSpec = partitionSpec;
45+
this.remoteDataDir = remoteDataDir;
3846
}
3947

4048
/** Get the partition id. The id is globally unique in the Fluss cluster. */
@@ -58,6 +66,10 @@ public PartitionSpec getPartitionSpec() {
5866
return partitionSpec.toPartitionSpec();
5967
}
6068

69+
public FsPath getRemoteDataDir() {
70+
return remoteDataDir;
71+
}
72+
6173
@Override
6274
public boolean equals(Object o) {
6375
if (this == o) {
@@ -67,16 +79,25 @@ public boolean equals(Object o) {
6779
return false;
6880
}
6981
PartitionInfo that = (PartitionInfo) o;
70-
return partitionId == that.partitionId && Objects.equals(partitionSpec, that.partitionSpec);
82+
return partitionId == that.partitionId
83+
&& Objects.equals(partitionSpec, that.partitionSpec)
84+
&& Objects.equals(remoteDataDir, that.remoteDataDir);
7185
}
7286

7387
@Override
7488
public int hashCode() {
75-
return Objects.hash(partitionId, partitionSpec);
89+
return Objects.hash(partitionId, partitionSpec, remoteDataDir);
7690
}
7791

7892
@Override
7993
public String toString() {
80-
return "Partition{name='" + getPartitionName() + '\'' + ", id=" + partitionId + '}';
94+
return "Partition{name='"
95+
+ getPartitionName()
96+
+ '\''
97+
+ ", id="
98+
+ partitionId
99+
+ ", remoteDataDir="
100+
+ remoteDataDir
101+
+ '}';
81102
}
82103
}

0 commit comments

Comments
 (0)