Skip to content

Commit 8a09d1c

Browse files
committed
address jark's comments
1 parent 4cd10a0 commit 8a09d1c

File tree

5 files changed

+62
-70
lines changed

5 files changed

+62
-70
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,21 +96,32 @@ public class ConfigOptions {
9696
.stringType()
9797
.noDefaultValue()
9898
.withDescription(
99-
"The directory used for storing the kv snapshot data files and remote log for log tiered storage "
100-
+ "in a Fluss supported filesystem.");
99+
"The directory in a Fluss supported filesystem for remote data storage. "
100+
+ "This configuration is required. "
101+
+ "If `remote.data.dirs` is not configured, all remote data files "
102+
+ "(kv snapshots, remote log, producer offsets, kv snapshot leases, etc.) "
103+
+ "will be stored under this directory. "
104+
+ "If `remote.data.dirs` is configured, the kv snapshot data files and remote log files "
105+
+ "for tables/partitions will be stored in one of the directories specified by `remote.data.dirs`, "
106+
+ "while producer offsets and kv snapshot leases are always stored under this directory.");
101107

102108
public static final ConfigOption<List<String>> REMOTE_DATA_DIRS =
103109
key("remote.data.dirs")
104110
.stringType()
105111
.asList()
106112
.defaultValues()
107113
.withDescription(
108-
"The directories used for storing the kv snapshot data files and remote log for log tiered storage "
109-
+ "in a Fluss supported filesystem. "
110-
+ "This should be a comma-separated list of remote URIs. "
111-
+ "If not configured, it defaults to the path specified in `"
114+
"A comma-separated list of directories in Fluss supported filesystems "
115+
+ "for storing the kv snapshot data files and remote log files of tables/partitions. "
116+
+ "This configuration is optional. "
117+
+ "If configured, when a new table or a new partition is created, "
118+
+ "one of the directories from this list will be selected according to the strategy "
119+
+ "specified by `remote.data.dirs.strategy` (`ROUND_ROBIN` by default). "
120+
+ "Once assigned, the table/partition will keep using the selected directory for "
121+
+ "storing the kv snapshot data files and remote log files. "
122+
+ "If not configured, the system uses `"
112123
+ REMOTE_DATA_DIR.key()
113-
+ "`. Otherwise, one of the paths from this configuration will be used.");
124+
+ "` as the sole remote data directory for all data.");
114125

115126
public static final ConfigOption<RemoteDataDirStrategy> REMOTE_DATA_DIRS_STRATEGY =
116127
key("remote.data.dirs.strategy")

fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java

Lines changed: 41 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.fluss.annotation.VisibleForTesting;
2222
import org.apache.fluss.exception.IllegalConfigurationException;
2323
import org.apache.fluss.fs.FsPath;
24-
import org.apache.fluss.utils.FlussPaths;
2524

2625
import java.lang.reflect.Field;
2726
import java.util.Arrays;
@@ -83,11 +82,49 @@ static Map<String, ConfigOption<?>> extractConfigOptions(String prefix) {
8382
}
8483

8584
public static void validateCoordinatorConfigs(Configuration conf) {
86-
validServerConfigs(conf);
85+
validateServerConfigs(conf);
86+
}
87+
88+
public static void validateTabletConfigs(Configuration conf) {
89+
validateServerConfigs(conf);
90+
91+
Optional<Integer> serverId = conf.getOptional(ConfigOptions.TABLET_SERVER_ID);
92+
if (!serverId.isPresent()) {
93+
throw new IllegalConfigurationException(
94+
String.format("Configuration %s must be set.", ConfigOptions.TABLET_SERVER_ID));
95+
}
96+
validMinValue(ConfigOptions.TABLET_SERVER_ID, serverId.get(), 0);
97+
}
98+
99+
/** Validate common server configs. */
100+
private static void validateServerConfigs(Configuration conf) {
101+
if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) {
102+
throw new IllegalConfigurationException(
103+
String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR));
104+
} else {
105+
// Must validate that remote.data.dir is a valid FsPath
106+
try {
107+
new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR));
108+
} catch (Exception e) {
109+
throw new IllegalConfigurationException(
110+
String.format(
111+
"Invalid configuration for %s.",
112+
ConfigOptions.REMOTE_DATA_DIR.key()),
113+
e);
114+
}
115+
}
87116

88117
validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1);
89118
validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1);
90119
validMinValue(conf, ConfigOptions.SERVER_IO_POOL_SIZE, 1);
120+
validMinValue(conf, ConfigOptions.BACKGROUND_THREADS, 1);
121+
122+
if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > Integer.MAX_VALUE) {
123+
throw new IllegalConfigurationException(
124+
String.format(
125+
"Invalid configuration for %s, it must be less than or equal %d bytes.",
126+
ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE));
127+
}
91128

92129
// Validate remote.data.dirs
93130
List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
@@ -109,7 +146,7 @@ public static void validateCoordinatorConfigs(Configuration conf) {
109146
conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY);
110147
if (remoteDataDirStrategy == ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN) {
111148
List<Integer> weights = conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS);
112-
if (!remoteDataDirs.isEmpty() && !weights.isEmpty()) {
149+
if (!remoteDataDirs.isEmpty()) {
113150
if (remoteDataDirs.size() != weights.size()) {
114151
throw new IllegalConfigurationException(
115152
String.format(
@@ -119,7 +156,7 @@ public static void validateCoordinatorConfigs(Configuration conf) {
119156
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
120157
weights.size()));
121158
}
122-
// Validate all weights are positive
159+
// Validate that all weights are non-negative (zero is allowed)
123160
for (int i = 0; i < weights.size(); i++) {
124161
if (weights.get(i) < 0) {
125162
throw new IllegalConfigurationException(
@@ -134,45 +171,6 @@ public static void validateCoordinatorConfigs(Configuration conf) {
134171
}
135172
}
136173

137-
public static void validateTabletConfigs(Configuration conf) {
138-
validServerConfigs(conf);
139-
140-
Optional<Integer> serverId = conf.getOptional(ConfigOptions.TABLET_SERVER_ID);
141-
if (!serverId.isPresent()) {
142-
throw new IllegalConfigurationException(
143-
String.format("Configuration %s must be set.", ConfigOptions.TABLET_SERVER_ID));
144-
}
145-
validMinValue(ConfigOptions.TABLET_SERVER_ID, serverId.get(), 0);
146-
147-
validMinValue(conf, ConfigOptions.BACKGROUND_THREADS, 1);
148-
149-
if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > Integer.MAX_VALUE) {
150-
throw new IllegalConfigurationException(
151-
String.format(
152-
"Invalid configuration for %s, it must be less than or equal %d bytes.",
153-
ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE));
154-
}
155-
}
156-
157-
/** Validate common server configs. */
158-
private static void validServerConfigs(Configuration conf) {
159-
if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) {
160-
throw new IllegalConfigurationException(
161-
String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR));
162-
} else {
163-
// Must validate that remote.data.dir is a valid FsPath
164-
try {
165-
FlussPaths.remoteDataDir(conf);
166-
} catch (Exception e) {
167-
throw new IllegalConfigurationException(
168-
String.format(
169-
"Invalid configuration for %s.",
170-
ConfigOptions.REMOTE_DATA_DIR.key()),
171-
e);
172-
}
173-
}
174-
}
175-
176174
private static void validMinValue(
177175
Configuration conf, ConfigOption<Integer> option, int minValue) {
178176
validMinValue(option, conf.get(option), minValue);

fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -409,23 +409,6 @@ public static UUID uuidFromRemoteIndexCacheFileName(String fileName) {
409409
fileName.substring(fileName.indexOf('_') + 1, fileName.indexOf('.')));
410410
}
411411

412-
// ----------------------------------------------------------------------------------------
413-
// Remote Data Paths
414-
// ----------------------------------------------------------------------------------------
415-
416-
/**
417-
* Returns the remote root directory path for storing data files.
418-
*
419-
* <p>The path contract:
420-
*
421-
* <pre>
422-
* {$remote.data.dir}
423-
* </pre>
424-
*/
425-
public static FsPath remoteDataDir(Configuration conf) {
426-
return new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR));
427-
}
428-
429412
// ----------------------------------------------------------------------------------------
430413
// Remote Log Paths
431414
// ----------------------------------------------------------------------------------------

fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ void testValidateCoordinatorConfigs() {
138138
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN);
139139
matchedWeightsConf.set(
140140
ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2"));
141-
matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(1, 2));
141+
matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(0, 2));
142142
validateCoordinatorConfigs(matchedWeightsConf);
143143

144144
// Test negative weight

website/docs/maintenance/configuration.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ during the Fluss cluster working.
3535
| `security.${protocol}.*` | String | (none) | Protocol-specific configuration properties. For example, security.sasl.jaas.config for SASL authentication settings. |
3636
| default.bucket.number | Integer | 1 | The default number of buckets for a table in Fluss cluster. It's a cluster-level parameter and all the tables without specifying bucket number in the cluster will use the value as the bucket number. |
3737
| default.replication.factor | Integer | 1 | The default replication factor for the log of a table in Fluss cluster. It's a cluster-level parameter, and all the tables without specifying replication factor in the cluster will use the value as replication factor. |
38-
| remote.data.dir | String | (None) | The directory used for storing the kv snapshot data files and remote log for log tiered storage in a Fluss supported filesystem. |
39-
| remote.data.dirs | List&lt;String&gt; | (None) | The directories used for storing the kv snapshot data files and remote log for log tiered storage in a Fluss supported filesystem. This should be a comma-separated list of remote URIs. If not configured, it defaults to the path specified in `remote.data.dir`. Otherwise, one of the paths from this configuration will be used. |
38+
| remote.data.dir | String | (None) | The directory in a Fluss supported filesystem for remote data storage. This configuration is required. If `remote.data.dirs` is not configured, all remote data files (kv snapshots, remote log, producer offsets, kv snapshot leases, etc.) will be stored under this directory. If `remote.data.dirs` is configured, the kv snapshot data files and remote log files for tables/partitions will be stored in one of the directories specified by `remote.data.dirs`, while producer offsets and kv snapshot leases are always stored under this directory. |
39+
| remote.data.dirs | List&lt;String&gt; | (None) | A comma-separated list of directories in Fluss supported filesystems for storing the kv snapshot data files and remote log files of tables/partitions. This configuration is optional. If configured, when a new table or a new partition is created, one of the directories from this list will be selected according to the strategy specified by `remote.data.dirs.strategy` (`ROUND_ROBIN` by default). Once assigned, the table/partition will keep using the selected directory for storing the kv snapshot data files and remote log files. If not configured, the system uses `remote.data.dir` as the sole remote data directory for all data. |
4040
| remote.data.dirs.strategy | Enum | ROUND_ROBIN | The strategy for selecting the remote data directory from `remote.data.dirs`. The candidate strategies are: [ROUND_ROBIN, WEIGHTED_ROUND_ROBIN], the default strategy is ROUND_ROBIN.<br/>ROUND_ROBIN: this strategy employs a round-robin approach to select one from the available remote directories.<br/>WEIGHTED_ROUND_ROBIN: this strategy selects one of the available remote directories based on the weights configured in `remote.data.dirs.weights`. |
4141
| remote.data.dirs.weights | List&lt;Integer&gt; | (None) | The weights of the remote data directories. This is a list of weights corresponding to the `remote.data.dirs` in the same order. When `remote.data.dirs.strategy` is set to `WEIGHTED_ROUND_ROBIN`, this must be configured, and its size must be equal to the number of directories in `remote.data.dirs`; for any other strategy, it will be ignored. |
4242
| remote.fs.write-buffer-size | MemorySize | 4kb | The default size of the write buffer for writing the local files to remote file systems. |

0 commit comments

Comments
 (0)