Skip to content

Commit cc332e7

Browse files
committed
wip
1 parent dc1bfba commit cc332e7

File tree

64 files changed

+2128
-250
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+2128
-250
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,37 @@ public class ConfigOptions {
9696
"The directory used for storing the kv snapshot data files and remote log for log tiered storage "
9797
+ " in a Fluss supported filesystem.");
9898

99+
public static final ConfigOption<List<String>> REMOTE_DATA_DIRS =
100+
key("remote.data.dirs")
101+
.stringType()
102+
.asList()
103+
.defaultValues()
104+
.withDescription(
105+
"The directories used for storing the kv snapshot data files and remote log for log tiered storage "
106+
+ " in a Fluss supported filesystem. "
107+
+ "This is a list of remote data directory paths. "
108+
+ "Example: remote.data.dirs: oss://bucket1/fluss-remote-data, oss://bucket2/fluss-remote-data");
109+
110+
public static final ConfigOption<List<Integer>> REMOTE_DATA_DIRS_WEIGHTS =
111+
key("remote.data.dirs.weights")
112+
.intType()
113+
.asList()
114+
.defaultValues()
115+
.withDescription(
116+
"The weights of the remote data directories. "
117+
+ "This is a list of weights corresponding to the "
118+
+ REMOTE_DATA_DIRS.key()
119+
+ " in the same order. "
120+
+ "Example: remote.data.dir.weights: 1, 2");
121+
122+
public static final ConfigOption<RemoteDataDirStrategy> REMOTE_DATA_DIRS_STRATEGY =
123+
key("remote.data.dirs.strategy")
124+
.enumType(RemoteDataDirStrategy.class)
125+
.defaultValue(RemoteDataDirStrategy.ROUND_ROBIN)
126+
.withDescription(
127+
"The strategy for selecting the remote data directory. "
128+
+ "The default value is ROUND_ROBIN.");
129+
99130
public static final ConfigOption<MemorySize> REMOTE_FS_WRITE_BUFFER_SIZE =
100131
key("remote.fs.write-buffer-size")
101132
.memoryType()
@@ -1925,4 +1956,10 @@ private static class ConfigOptionsHolder {
19251956
public static ConfigOption<?> getConfigOption(String key) {
19261957
return ConfigOptionsHolder.CONFIG_OPTIONS_BY_KEY.get(key);
19271958
}
1959+
1960+
/** Remote data dir select strategy for Fluss. */
1961+
public enum RemoteDataDirStrategy {
1962+
ROUND_ROBIN,
1963+
WEIGHTED_ROUND_ROBIN
1964+
}
19281965
}

fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919

2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.annotation.VisibleForTesting;
22+
import org.apache.fluss.exception.IllegalConfigurationException;
2223

2324
import java.lang.reflect.Field;
2425
import java.util.Collections;
2526
import java.util.HashMap;
2627
import java.util.List;
2728
import java.util.Map;
29+
import java.util.Optional;
2830

2931
/** Utilities of Fluss {@link ConfigOptions}. */
3032
@Internal
@@ -74,4 +76,82 @@ static Map<String, ConfigOption<?>> extractConfigOptions(String prefix) {
7476
}
7577
return options;
7678
}
79+
80+
public static void validateCoordinatorConfigs(Configuration conf) {
81+
validServerConfigs(conf);
82+
83+
if (conf.get(ConfigOptions.DEFAULT_REPLICATION_FACTOR) < 1) {
84+
throw new IllegalConfigurationException(
85+
String.format(
86+
"Invalid configuration for %s, it must be greater than or equal 1.",
87+
ConfigOptions.DEFAULT_REPLICATION_FACTOR.key()));
88+
}
89+
90+
if (conf.get(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS) < 1) {
91+
throw new IllegalConfigurationException(
92+
String.format(
93+
"Invalid configuration for %s, it must be greater than or equal 1.",
94+
ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS.key()));
95+
}
96+
97+
if (conf.get(ConfigOptions.SERVER_IO_POOL_SIZE) < 1) {
98+
throw new IllegalConfigurationException(
99+
String.format(
100+
"Invalid configuration for %s, it must be greater than or equal 1.",
101+
ConfigOptions.SERVER_IO_POOL_SIZE.key()));
102+
}
103+
104+
// validate remote.data.dirs
105+
List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
106+
ConfigOptions.RemoteDataDirStrategy remoteDataDirStrategy =
107+
conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY);
108+
if (remoteDataDirStrategy == ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN) {
109+
List<Integer> weights = conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS);
110+
if (remoteDataDirs.size() != weights.size()) {
111+
throw new IllegalConfigurationException(
112+
String.format(
113+
"Invalid configuration for %s, its size must be equal to the size of %s.",
114+
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
115+
ConfigOptions.REMOTE_DATA_DIRS.key()));
116+
}
117+
}
118+
}
119+
120+
public static void validateTabletConfigs(Configuration conf) {
121+
validServerConfigs(conf);
122+
123+
Optional<Integer> serverId = conf.getOptional(ConfigOptions.TABLET_SERVER_ID);
124+
if (!serverId.isPresent()) {
125+
throw new IllegalConfigurationException(
126+
String.format("Configuration %s must be set.", ConfigOptions.TABLET_SERVER_ID));
127+
}
128+
129+
if (serverId.get() < 0) {
130+
throw new IllegalConfigurationException(
131+
String.format(
132+
"Invalid configuration for %s, it must be greater than or equal 0.",
133+
ConfigOptions.TABLET_SERVER_ID.key()));
134+
}
135+
136+
if (conf.get(ConfigOptions.BACKGROUND_THREADS) < 1) {
137+
throw new IllegalConfigurationException(
138+
String.format(
139+
"Invalid configuration for %s, it must be greater than or equal 1.",
140+
ConfigOptions.BACKGROUND_THREADS.key()));
141+
}
142+
143+
if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > Integer.MAX_VALUE) {
144+
throw new IllegalConfigurationException(
145+
String.format(
146+
"Invalid configuration for %s, it must be less than or equal %d bytes.",
147+
ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE));
148+
}
149+
}
150+
151+
private static void validServerConfigs(Configuration conf) {
152+
if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) {
153+
throw new IllegalConfigurationException(
154+
String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR));
155+
}
156+
}
77157
}

fluss-common/src/main/java/org/apache/fluss/remote/RemoteLogSegment.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.remote;
1919

2020
import org.apache.fluss.annotation.Internal;
21+
import org.apache.fluss.fs.FsPath;
2122
import org.apache.fluss.metadata.PhysicalTablePath;
2223
import org.apache.fluss.metadata.TableBucket;
2324

@@ -50,14 +51,17 @@ public class RemoteLogSegment {
5051

5152
private final int segmentSizeInBytes;
5253

54+
private final FsPath remoteLogDir;
55+
5356
private RemoteLogSegment(
5457
PhysicalTablePath physicalTablePath,
5558
TableBucket tableBucket,
5659
UUID remoteLogSegmentId,
5760
long remoteLogStartOffset,
5861
long remoteLogEndOffset,
5962
long maxTimestamp,
60-
int segmentSizeInBytes) {
63+
int segmentSizeInBytes,
64+
FsPath remoteLogDir) {
6165
this.physicalTablePath = checkNotNull(physicalTablePath);
6266
this.tableBucket = checkNotNull(tableBucket);
6367
this.remoteLogSegmentId = checkNotNull(remoteLogSegmentId);
@@ -79,6 +83,7 @@ private RemoteLogSegment(
7983
this.remoteLogEndOffset = remoteLogEndOffset;
8084
this.maxTimestamp = maxTimestamp;
8185
this.segmentSizeInBytes = segmentSizeInBytes;
86+
this.remoteLogDir = remoteLogDir;
8287
}
8388

8489
public PhysicalTablePath physicalTablePath() {
@@ -115,6 +120,10 @@ public int segmentSizeInBytes() {
115120
return segmentSizeInBytes;
116121
}
117122

123+
public FsPath remoteLogDir() {
124+
return remoteLogDir;
125+
}
126+
118127
@Override
119128
public boolean equals(Object o) {
120129
if (this == o) {
@@ -174,6 +183,7 @@ public static class Builder {
174183
private long remoteLogEndOffset;
175184
private long maxTimestamp;
176185
private int segmentSizeInBytes;
186+
private FsPath remoteLogDir;
177187

178188
public static Builder builder() {
179189
return new Builder();
@@ -214,6 +224,11 @@ public Builder tableBucket(TableBucket tableBucket) {
214224
return this;
215225
}
216226

227+
public Builder remoteLogDir(FsPath remoteLogDir) {
228+
this.remoteLogDir = remoteLogDir;
229+
return this;
230+
}
231+
217232
public RemoteLogSegment build() {
218233
return new RemoteLogSegment(
219234
physicalTablePath,
@@ -222,7 +237,8 @@ public RemoteLogSegment build() {
222237
remoteLogStartOffset,
223238
remoteLogEndOffset,
224239
maxTimestamp,
225-
segmentSizeInBytes);
240+
segmentSizeInBytes,
241+
remoteLogDir);
226242
}
227243
}
228244
}

fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,14 @@ public static UUID uuidFromRemoteIndexCacheFileName(String fileName) {
401401
fileName.substring(fileName.indexOf('_') + 1, fileName.indexOf('.')));
402402
}
403403

404+
// ----------------------------------------------------------------------------------------
405+
// Remote Data Paths
406+
// ----------------------------------------------------------------------------------------
407+
408+
public static FsPath remoteDataDir(Configuration conf) {
409+
return new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR));
410+
}
411+
404412
// ----------------------------------------------------------------------------------------
405413
// Remote Log Paths
406414
// ----------------------------------------------------------------------------------------
@@ -418,6 +426,10 @@ public static FsPath remoteLogDir(Configuration conf) {
418426
return new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR) + "/" + REMOTE_LOG_DIR_NAME);
419427
}
420428

429+
public static FsPath remoteLogDir(FsPath remoteDataDir) {
430+
return new FsPath(remoteDataDir, REMOTE_LOG_DIR_NAME);
431+
}
432+
421433
/**
422434
* Returns the remote directory path for storing log files for a log tablet.
423435
*
@@ -584,6 +596,10 @@ public static FsPath remoteKvDir(Configuration conf) {
584596
return new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR) + "/" + REMOTE_KV_DIR_NAME);
585597
}
586598

599+
public static FsPath remoteKvDir(FsPath remoteDataDir) {
600+
return new FsPath(remoteDataDir, REMOTE_KV_DIR_NAME);
601+
}
602+
587603
/**
588604
* Returns the remote directory path for storing kv snapshot files for a kv tablet.
589605
*

0 commit comments

Comments
 (0)