Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,53 @@ public class ConfigOptions {
.noDefaultValue()
.withDescription(
"The directory used for storing the kv snapshot data files and remote log for log tiered storage "
+ " in a Fluss supported filesystem.");
+ "in a Fluss supported filesystem.");

public static final ConfigOption<List<String>> REMOTE_DATA_DIRS =
key("remote.data.dirs")
.stringType()
.asList()
.defaultValues()
.withDescription(
"The directories used for storing the kv snapshot data files and remote log for log tiered storage "
+ "in a Fluss supported filesystem. "
+ "This should be a comma-separated list of remote URIs. "
+ "If not configured, it defaults to the path specified in `"
+ REMOTE_DATA_DIR.key()
+ "`. Otherwise, one of the paths from this configuration will be used.");

public static final ConfigOption<RemoteDataDirStrategy> REMOTE_DATA_DIRS_STRATEGY =
key("remote.data.dirs.strategy")
.enumType(RemoteDataDirStrategy.class)
.defaultValue(RemoteDataDirStrategy.ROUND_ROBIN)
.withDescription(
String.format(
"The strategy for selecting the remote data directory from `%s`. "
+ "The candidate strategies are: %s, the default strategy is %s.\n"
+ "%s: this strategy employs a round-robin approach to select one from the available remote directories.\n"
+ "%s: this strategy selects one of the available remote directories based on the weights configured in `remote.data.dirs.weights`.",
REMOTE_DATA_DIRS.key(),
Arrays.toString(RemoteDataDirStrategy.values()),
RemoteDataDirStrategy.ROUND_ROBIN,
RemoteDataDirStrategy.ROUND_ROBIN,
RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN));

public static final ConfigOption<List<Integer>> REMOTE_DATA_DIRS_WEIGHTS =
key("remote.data.dirs.weights")
.intType()
.asList()
.defaultValues()
.withDescription(
"The weights of the remote data directories. "
+ "This is a list of weights corresponding to the `"
+ REMOTE_DATA_DIRS.key()
+ "` in the same order. When `"
+ REMOTE_DATA_DIRS_STRATEGY.key()
+ "` is set to `"
+ RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN
+ "`, this must be configured, and its size must be equal to `"
+ REMOTE_DATA_DIRS.key()
+ "`; otherwise, it will be ignored.");

public static final ConfigOption<MemorySize> REMOTE_FS_WRITE_BUFFER_SIZE =
key("remote.fs.write-buffer-size")
Expand Down Expand Up @@ -2066,4 +2112,10 @@ private static class ConfigOptionsHolder {
public static ConfigOption<?> getConfigOption(String key) {
return ConfigOptionsHolder.CONFIG_OPTIONS_BY_KEY.get(key);
}

/** Remote data dir select strategy for Fluss. */
public enum RemoteDataDirStrategy {
ROUND_ROBIN,
WEIGHTED_ROUND_ROBIN
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@

import org.apache.fluss.annotation.Internal;
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.exception.IllegalConfigurationException;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.utils.FlussPaths;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Utilities of Fluss {@link ConfigOptions}. */
@Internal
Expand Down Expand Up @@ -77,4 +81,109 @@ static Map<String, ConfigOption<?>> extractConfigOptions(String prefix) {
}
return options;
}

public static void validateCoordinatorConfigs(Configuration conf) {
validServerConfigs(conf);

validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1);
validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1);
validMinValue(conf, ConfigOptions.SERVER_IO_POOL_SIZE, 1);

// Validate remote.data.dirs
List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
for (int i = 0; i < remoteDataDirs.size(); i++) {
String remoteDataDir = remoteDataDirs.get(i);
try {
new FsPath(remoteDataDir);
} catch (Exception e) {
throw new IllegalConfigurationException(
String.format(
"Invalid remote path for %s at index %d.",
ConfigOptions.REMOTE_DATA_DIRS.key(), i),
e);
}
}

// Validate remote.data.dirs.strategy
ConfigOptions.RemoteDataDirStrategy remoteDataDirStrategy =
conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY);
if (remoteDataDirStrategy == ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN) {
List<Integer> weights = conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS);
if (!remoteDataDirs.isEmpty() && !weights.isEmpty()) {
if (remoteDataDirs.size() != weights.size()) {
throw new IllegalConfigurationException(
String.format(
"The size of '%s' (%d) must match the size of '%s' (%d) when using WEIGHTED_ROUND_ROBIN strategy.",
ConfigOptions.REMOTE_DATA_DIRS.key(),
remoteDataDirs.size(),
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
weights.size()));
}
// Validate all weights are positive
for (int i = 0; i < weights.size(); i++) {
if (weights.get(i) < 0) {
throw new IllegalConfigurationException(
String.format(
"All weights in '%s' must be no less than 0, but found %d at index %d.",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition weights.get(i) < 0 should be weights.get(i) <= 0, and the error message should be "must be greater than 0".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need 0. Imagine a scenario where the capacity of a remote storage has reached its limit and we don’t want to transfer any more files to it; in that case, we can set its weight to 0.

ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
weights.get(i),
i));
}
}
}
}
}

public static void validateTabletConfigs(Configuration conf) {
validServerConfigs(conf);

Optional<Integer> serverId = conf.getOptional(ConfigOptions.TABLET_SERVER_ID);
if (!serverId.isPresent()) {
throw new IllegalConfigurationException(
String.format("Configuration %s must be set.", ConfigOptions.TABLET_SERVER_ID));
}
validMinValue(ConfigOptions.TABLET_SERVER_ID, serverId.get(), 0);

validMinValue(conf, ConfigOptions.BACKGROUND_THREADS, 1);

if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > Integer.MAX_VALUE) {
throw new IllegalConfigurationException(
String.format(
"Invalid configuration for %s, it must be less than or equal %d bytes.",
ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE));
}
}

/** Validate common server configs. */
private static void validServerConfigs(Configuration conf) {
if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) {
throw new IllegalConfigurationException(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should allow remote.data.dirs is set and remote.data.dir is empty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I updated in the document. We should always set remote.data.dir, as the recently introduced producer offsets and kv snapshot lease files are not belong to a specific table.

  • kv snapshot lease dir: {$remote.data.dir}/lease/kv-snapshot/{leaseId}/{tableId}/
  • producer offset dir: {$remote.data.dir}/producers

I think we must keep remote.data.dir to store producer offsets and kv snapshot lease files for now.

String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR));
} else {
// Must validate that remote.data.dir is a valid FsPath
try {
FlussPaths.remoteDataDir(conf);
} catch (Exception e) {
throw new IllegalConfigurationException(
String.format(
"Invalid configuration for %s.",
ConfigOptions.REMOTE_DATA_DIR.key()),
e);
}
}
}

private static void validMinValue(
Configuration conf, ConfigOption<Integer> option, int minValue) {
validMinValue(option, conf.get(option), minValue);
}

private static void validMinValue(ConfigOption<Integer> option, int value, int minValue) {
if (value < minValue) {
throw new IllegalConfigurationException(
String.format(
"Invalid configuration for %s, it must be greater than or equal %d.",
option.key(), minValue));
}
}
}
17 changes: 17 additions & 0 deletions fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,23 @@ public static UUID uuidFromRemoteIndexCacheFileName(String fileName) {
fileName.substring(fileName.indexOf('_') + 1, fileName.indexOf('.')));
}

// ----------------------------------------------------------------------------------------
// Remote Data Paths
// ----------------------------------------------------------------------------------------

/**
* Returns the remote root directory path for storing data files.
*
* <p>The path contract:
*
* <pre>
* {$remote.data.dir}
* </pre>
*/
public static FsPath remoteDataDir(Configuration conf) {
return new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR));
}

// ----------------------------------------------------------------------------------------
// Remote Log Paths
// ----------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@

package org.apache.fluss.config;

import org.apache.fluss.exception.IllegalConfigurationException;

import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

import static org.apache.fluss.config.FlussConfigUtils.CLIENT_OPTIONS;
import static org.apache.fluss.config.FlussConfigUtils.TABLE_OPTIONS;
import static org.apache.fluss.config.FlussConfigUtils.extractConfigOptions;
import static org.apache.fluss.config.FlussConfigUtils.validateCoordinatorConfigs;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Test for {@link FlussConfigUtils}. */
class FlussConfigUtilsTest {
Expand All @@ -49,4 +55,105 @@ void testExtractOptions() {
});
assertThat(clientOptions.size()).isEqualTo(CLIENT_OPTIONS.size());
}

@Test
void testValidateCoordinatorConfigs() {
// Test valid configuration
Configuration validConf = new Configuration();
validConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
validateCoordinatorConfigs(validConf);

// Test invalid DEFAULT_REPLICATION_FACTOR
Configuration invalidReplicationConf = new Configuration();
invalidReplicationConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
invalidReplicationConf.set(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 0);
assertThatThrownBy(() -> validateCoordinatorConfigs(invalidReplicationConf))
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining(ConfigOptions.DEFAULT_REPLICATION_FACTOR.key())
.hasMessageContaining("must be greater than or equal 1");

// Test invalid KV_MAX_RETAINED_SNAPSHOTS
Configuration invalidSnapshotConf = new Configuration();
invalidSnapshotConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
invalidSnapshotConf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 0);
assertThatThrownBy(() -> validateCoordinatorConfigs(invalidSnapshotConf))
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS.key())
.hasMessageContaining("must be greater than or equal 1");

// Test invalid SERVER_IO_POOL_SIZE
Configuration invalidIoPoolConf = new Configuration();
invalidIoPoolConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
invalidIoPoolConf.set(ConfigOptions.SERVER_IO_POOL_SIZE, 0);
assertThatThrownBy(() -> validateCoordinatorConfigs(invalidIoPoolConf))
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining(ConfigOptions.SERVER_IO_POOL_SIZE.key())
.hasMessageContaining("must be greater than or equal 1");

// Test REMOTE_DATA_DIR not set
Configuration noRemoteDirConf = new Configuration();
assertThatThrownBy(() -> validateCoordinatorConfigs(noRemoteDirConf))
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIR.key())
.hasMessageContaining("must be set");

// Test invalid REMOTE_DATA_DIR
Configuration invalidRemoteDirConf = new Configuration();
invalidRemoteDirConf.set(ConfigOptions.REMOTE_DATA_DIR, "123://invalid.com");
assertThatThrownBy(() -> validateCoordinatorConfigs(invalidRemoteDirConf))
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIR.key())
.hasMessageContaining("Invalid configuration for remote.data.dir");

// Test REMOTE_DATA_DIRS contains invalid path
Configuration invalidRemoteDirsConf = new Configuration();
invalidRemoteDirsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
invalidRemoteDirsConf.set(
ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "123://invalid.com"));
assertThatThrownBy(() -> validateCoordinatorConfigs(invalidRemoteDirsConf))
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key())
.hasMessageContaining("Invalid remote path for remote.data.dirs");

// Test WEIGHTED_ROUND_ROBIN with mismatched sizes
Configuration mismatchedWeightsConf = new Configuration();
mismatchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
mismatchedWeightsConf.set(
ConfigOptions.REMOTE_DATA_DIRS_STRATEGY,
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN);
mismatchedWeightsConf.set(
ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2"));
mismatchedWeightsConf.set(
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Collections.singletonList(1));
assertThatThrownBy(() -> validateCoordinatorConfigs(mismatchedWeightsConf))
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key())
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key());

// Test WEIGHTED_ROUND_ROBIN with matched sizes
Configuration matchedWeightsConf = new Configuration();
matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
matchedWeightsConf.set(
ConfigOptions.REMOTE_DATA_DIRS_STRATEGY,
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN);
matchedWeightsConf.set(
ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2"));
matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(1, 2));
validateCoordinatorConfigs(matchedWeightsConf);

// Test negative weight
Configuration negativeWeightConf = new Configuration();
negativeWeightConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
negativeWeightConf.set(
ConfigOptions.REMOTE_DATA_DIRS_STRATEGY,
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN);
negativeWeightConf.set(
ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2"));
negativeWeightConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(-1, 2));
assertThatThrownBy(() -> validateCoordinatorConfigs(negativeWeightConf))
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key())
.hasMessageContaining(
"All weights in 'remote.data.dirs.weights' must be no less than 0");
}
}
Loading