Skip to content

Commit 3d846d6

Browse files
authored
[hotfix] Added convert to properties with prefix key utility class. (#74)
1 parent f5292d8 commit 3d846d6

File tree

6 files changed

+84
-50
lines changed

6 files changed

+84
-50
lines changed
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.utils;
18+
19+
import java.util.Map;
20+
import java.util.stream.Collectors;
21+
22+
/** Utility class for properties related helper functions. */
23+
public class PropertiesUtils {
24+
25+
/** Returns the properties as a map copy with a prefix key. */
26+
public static <V> Map<String, V> asPrefixedMap(Map<String, V> properties, String prefix) {
27+
return properties.entrySet().stream()
28+
.collect(Collectors.toMap(e -> prefix + e.getKey(), Map.Entry::getValue));
29+
}
30+
31+
/**
32+
* Extracts the properties with the given prefix and removes the prefix from the keys.
33+
*
34+
* @param originalMap The original map
35+
* @param prefix The prefix to filter the keys
36+
*/
37+
public static <V> Map<String, V> extractAndRemovePrefix(
38+
Map<String, V> originalMap, String prefix) {
39+
return originalMap.entrySet().stream()
40+
.filter(entry -> entry.getKey().startsWith(prefix))
41+
.collect(
42+
Collectors.toMap(
43+
entry -> entry.getKey().substring(prefix.length()),
44+
Map.Entry::getValue));
45+
}
46+
47+
/**
48+
* Extracts the properties with the given prefix.
49+
*
50+
* @param originalMap The original map
51+
* @param prefix The prefix to filter the keys
52+
*/
53+
public static <V> Map<String, V> extractPrefix(Map<String, V> originalMap, String prefix) {
54+
return originalMap.entrySet().stream()
55+
.filter(entry -> entry.getKey().startsWith(prefix))
56+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
57+
}
58+
59+
/** Filter out keys that start with the given prefix from the original map. */
60+
public static <T> Map<String, T> excludeByPrefix(Map<String, T> originalMap, String prefix) {
61+
return originalMap.entrySet().stream()
62+
.filter(entry -> !entry.getKey().startsWith(prefix))
63+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
64+
}
65+
66+
// Make sure that we cannot instantiate this class
67+
private PropertiesUtils() {}
68+
}

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/CatalogPropertiesUtils.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.stream.IntStream;
3838
import java.util.stream.Stream;
3939

40+
import static com.alibaba.fluss.utils.PropertiesUtils.excludeByPrefix;
4041
import static org.apache.flink.util.Preconditions.checkNotNull;
4142

4243
/**
@@ -75,13 +76,7 @@ public class CatalogPropertiesUtils {
7576
private static final Pattern SCHEMA_COLUMN_NAME_SUFFIX = Pattern.compile("\\d+\\.name");
7677

7778
public static Map<String, String> deserializeOptions(Map<String, String> map) {
78-
return map.entrySet().stream()
79-
.filter(
80-
e -> {
81-
final String key = e.getKey();
82-
return !key.startsWith(SCHEMA + SEPARATOR);
83-
})
84-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
79+
return excludeByPrefix(map, SCHEMA + SEPARATOR);
8580
}
8681

8782
public static void deserializeWatermark(Map<String, String> map, Schema.Builder builder) {

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/DataLakeUtils.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@
2020
import com.alibaba.fluss.config.Configuration;
2121
import com.alibaba.fluss.metadata.DataLakeFormat;
2222

23-
import java.util.HashMap;
2423
import java.util.Map;
2524

25+
import static com.alibaba.fluss.utils.PropertiesUtils.extractAndRemovePrefix;
26+
2627
/** Utility class for accessing data lake related configurations. */
2728
public class DataLakeUtils {
2829

@@ -43,16 +44,7 @@ public static Map<String, String> extractLakeCatalogProperties(Configuration tab
4344
}
4445

4546
// currently, extract datalake catalog config
46-
Map<String, String> datalakeConfig = new HashMap<>();
47-
Map<String, String> flussConfig = tableOptions.toMap();
4847
String dataLakePrefix = "table.datalake." + datalakeFormat + ".";
49-
for (Map.Entry<String, String> configEntry : flussConfig.entrySet()) {
50-
String configKey = configEntry.getKey();
51-
String configValue = configEntry.getValue();
52-
if (configKey.startsWith(dataLakePrefix)) {
53-
datalakeConfig.put(configKey.substring(dataLakePrefix.length()), configValue);
54-
}
55-
}
56-
return datalakeConfig;
48+
return extractAndRemovePrefix(tableOptions.toMap(), dataLakePrefix);
5749
}
5850
}

fluss-lakehouse/fluss-lakehouse-cli/src/main/java/com/alibaba/fluss/lakehouse/cli/FlussLakehouseCli.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.List;
3636
import java.util.Map;
3737

38+
import static com.alibaba.fluss.utils.PropertiesUtils.extractPrefix;
3839
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
3940

4041
/** Cli for Fluss lakehouse integrating. */
@@ -162,13 +163,7 @@ private static Map<String, String> getLakeStorageConfig(Configuration configurat
162163
+ ConfigOptions.DATALAKE_FORMAT.key());
163164
}
164165
String datalakeConfigPrefix = "datalake." + datalakeFormat + ".";
165-
Map<String, String> lakeStorageConfig = new HashMap<>();
166-
for (Map.Entry<String, String> entry : configuration.toMap().entrySet()) {
167-
if (entry.getKey().startsWith(datalakeConfigPrefix)) {
168-
lakeStorageConfig.put(entry.getKey(), entry.getValue());
169-
}
170-
}
171-
return lakeStorageConfig;
166+
return extractPrefix(configuration.toMap(), datalakeConfigPrefix);
172167
}
173168

174169
private static String getFlussBootStrapServers(Configuration configuration) {

fluss-lakehouse/fluss-lakehouse-paimon/src/main/java/com/alibaba/fluss/lakehouse/paimon/FlussLakehousePaimon.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@
3030
import org.apache.flink.streaming.api.datastream.DataStreamSource;
3131
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3232

33-
import java.util.HashMap;
3433
import java.util.Map;
3534
import java.util.regex.Pattern;
3635

36+
import static com.alibaba.fluss.utils.PropertiesUtils.extractAndRemovePrefix;
37+
3738
/** The entrypoint for fluss tier data to Paimon. */
3839
public class FlussLakehousePaimon {
3940

@@ -52,7 +53,7 @@ public static void main(String[] args) throws Exception {
5253
String database = paramsMap.get(DATABASE);
5354

5455
// extract fluss config
55-
Map<String, String> flussConfigMap = extractConfigStartWith(paramsMap, FLUSS_CONF_PREFIX);
56+
Map<String, String> flussConfigMap = extractAndRemovePrefix(paramsMap, FLUSS_CONF_PREFIX);
5657
// we need to get bootstrap.servers
5758
String bootstrapServers = paramsMap.get(ConfigOptions.BOOTSTRAP_SERVERS.key());
5859
if (bootstrapServers == null) {
@@ -61,7 +62,7 @@ public static void main(String[] args) throws Exception {
6162
flussConfigMap.put(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers);
6263

6364
// extract paimon config
64-
Map<String, String> paimonConfig = extractConfigStartWith(paramsMap, PAIMON_CONF_PREFIX);
65+
Map<String, String> paimonConfig = extractAndRemovePrefix(paramsMap, PAIMON_CONF_PREFIX);
6566

6667
// then build the fluss to paimon job
6768
final StreamExecutionEnvironment execEnv =
@@ -110,17 +111,4 @@ public boolean test(String database) {
110111
return databasePattern.matcher(database).matches();
111112
}
112113
}
113-
114-
private static Map<String, String> extractConfigStartWith(
115-
Map<String, String> configParams, String prefix) {
116-
Map<String, String> extractedConfig = new HashMap<>();
117-
for (Map.Entry<String, String> configEntry : configParams.entrySet()) {
118-
String configKey = configEntry.getKey();
119-
String configValue = configEntry.getValue();
120-
if (configKey.startsWith(prefix)) {
121-
extractedConfig.put(configKey.substring(prefix.length()), configValue);
122-
}
123-
}
124-
return extractedConfig;
125-
}
126114
}

fluss-server/src/main/java/com/alibaba/fluss/server/utils/LakeStorageUtils.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@
2222

2323
import javax.annotation.Nullable;
2424

25-
import java.util.HashMap;
2625
import java.util.Map;
2726
import java.util.Optional;
2827

28+
import static com.alibaba.fluss.utils.PropertiesUtils.asPrefixedMap;
29+
import static com.alibaba.fluss.utils.PropertiesUtils.extractPrefix;
30+
2931
/** Utils for Fluss lake storage. */
3032
public class LakeStorageUtils {
3133

@@ -36,14 +38,8 @@ public static Map<String, String> generateDefaultTableLakeOptions(Configuration
3638
if (!optDataLakeFormat.isPresent()) {
3739
return null;
3840
}
39-
Map<String, String> datalakeProperties = new HashMap<>();
41+
4042
String dataLakePrefix = "datalake." + optDataLakeFormat.get() + ".";
41-
for (Map.Entry<String, String> configurationEntry : clusterConf.toMap().entrySet()) {
42-
if (configurationEntry.getKey().startsWith(dataLakePrefix)) {
43-
datalakeProperties.put(
44-
"table." + configurationEntry.getKey(), configurationEntry.getValue());
45-
}
46-
}
47-
return datalakeProperties;
43+
return asPrefixedMap(extractPrefix(clusterConf.toMap(), dataLakePrefix), "table.");
4844
}
4945
}

0 commit comments

Comments
 (0)