Skip to content

Commit 001cba0

Browse files
authored
[FLINK-38737][common] Fix potential backwards incompatibility with fallback keys (#4185)
1 parent 94e0c55 commit 001cba0

File tree

3 files changed

+77
-7
lines changed

3 files changed

+77
-7
lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.cdc.common.annotation.PublicEvolving;
2121
import org.apache.flink.cdc.common.configuration.ConfigOption;
2222
import org.apache.flink.cdc.common.configuration.Configuration;
23+
import org.apache.flink.cdc.common.configuration.FallbackKey;
2324
import org.apache.flink.cdc.common.utils.Preconditions;
2425
import org.apache.flink.configuration.ReadableConfig;
2526
import org.apache.flink.table.api.ValidationException;
@@ -32,6 +33,7 @@
3233
import java.util.Set;
3334
import java.util.stream.Collectors;
3435
import java.util.stream.Stream;
36+
import java.util.stream.StreamSupport;
3537

3638
/** A helper for working with {@link Factory}. */
3739
@PublicEvolving
@@ -70,7 +72,7 @@ public static void validateFactoryOptions(
7072
final List<String> missingRequiredOptions =
7173
requiredOptions.stream()
7274
.filter(option -> configuration.get(option) == null)
73-
.map(ConfigOption::key)
75+
.flatMap(FactoryHelper::allKeys)
7476
.sorted()
7577
.collect(Collectors.toList());
7678

@@ -109,8 +111,8 @@ public static void validateUnconsumedKeys(
109111
public void validate() {
110112
Set<String> allOptionKeys =
111113
Stream.concat(
112-
factory.requiredOptions().stream().map(ConfigOption::key),
113-
factory.optionalOptions().stream().map(ConfigOption::key))
114+
factory.requiredOptions().stream().flatMap(FactoryHelper::allKeys),
115+
factory.optionalOptions().stream().flatMap(FactoryHelper::allKeys))
114116
.collect(Collectors.toSet());
115117

116118
validateFactoryOptions(factory, context.getFactoryConfiguration());
@@ -135,8 +137,8 @@ public void validateExcept(String... prefixesToSkip) {
135137

136138
Set<String> allOptionKeys =
137139
Stream.concat(
138-
factory.requiredOptions().stream().map(ConfigOption::key),
139-
factory.optionalOptions().stream().map(ConfigOption::key))
140+
factory.requiredOptions().stream().flatMap(FactoryHelper::allKeys),
141+
factory.optionalOptions().stream().flatMap(FactoryHelper::allKeys))
140142
.collect(Collectors.toSet());
141143

142144
Set<String> filteredOptionKeys =
@@ -148,6 +150,13 @@ public void validateExcept(String... prefixesToSkip) {
148150
validateUnconsumedKeys(factory.identifier(), filteredOptionKeys, allOptionKeys);
149151
}
150152

153+
private static Stream<String> allKeys(ConfigOption<?> option) {
154+
return Stream.concat(
155+
Stream.of(option.key()),
156+
StreamSupport.stream(option.fallbackKeys().spliterator(), false)
157+
.map(FallbackKey::getKey));
158+
}
159+
151160
public ReadableConfig getFormatConfig(String formatPrefix) {
152161
final String prefix = formatPrefix + ".";
153162
Map<String, String> formatConfigMap = new HashMap<>();

flink-cdc-common/src/test/java/org/apache/flink/cdc/common/factories/FactoryHelperTests.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,21 @@ public String identifier() {
4646
@Override
4747
public Set<ConfigOption<?>> requiredOptions() {
4848
return Sets.newHashSet(
49-
ConfigOptions.key("id").intType().noDefaultValue(),
49+
ConfigOptions.key("id")
50+
.intType()
51+
.noDefaultValue()
52+
.withFallbackKeys("id_fallback"),
5053
ConfigOptions.key("name").stringType().noDefaultValue(),
5154
ConfigOptions.key("age").doubleType().noDefaultValue());
5255
}
5356

5457
@Override
5558
public Set<ConfigOption<?>> optionalOptions() {
5659
return Sets.newHashSet(
57-
ConfigOptions.key("hobby").stringType().noDefaultValue(),
60+
ConfigOptions.key("hobby")
61+
.stringType()
62+
.noDefaultValue()
63+
.withFallbackKeys("hobby_fallback"),
5864
ConfigOptions.key("location").stringType().defaultValue("Everywhere"),
5965
ConfigOptions.key("misc")
6066
.mapType()
@@ -79,6 +85,19 @@ void testCorrectConfigValidation() {
7985
Configuration.fromMap(configurations), null, null));
8086

8187
factoryHelper.validate();
88+
89+
// Validation for fallback keys.
90+
configurations.clear();
91+
configurations.put("id_fallback", "2");
92+
configurations.put("name", "Bob");
93+
configurations.put("age", "18");
94+
configurations.put("hobby_fallback", "Swimming");
95+
factoryHelper =
96+
FactoryHelper.createFactoryHelper(
97+
getDummyFactory(),
98+
new FactoryHelper.DefaultContext(
99+
Configuration.fromMap(configurations), null, null));
100+
factoryHelper.validate();
82101
}
83102

84103
@Test

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactoryTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,18 @@
2222
import org.apache.flink.cdc.common.factories.DataSinkFactory;
2323
import org.apache.flink.cdc.common.factories.FactoryHelper;
2424
import org.apache.flink.cdc.common.sink.DataSink;
25+
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
2526
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
27+
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonEventSink;
2628
import org.apache.flink.table.api.ValidationException;
2729

2830
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
2931

3032
import org.assertj.core.api.Assertions;
3133
import org.junit.jupiter.api.Test;
3234
import org.junit.jupiter.api.io.TempDir;
35+
import org.junit.jupiter.params.ParameterizedTest;
36+
import org.junit.jupiter.params.provider.ValueSource;
3337

3438
import java.io.File;
3539
import java.util.HashMap;
@@ -38,6 +42,8 @@
3842
import java.util.UUID;
3943
import java.util.stream.Collectors;
4044

45+
import static org.assertj.core.api.InstanceOfAssertFactories.type;
46+
4147
/** Tests for {@link PaimonDataSinkFactory}. */
4248
class PaimonDataSinkFactoryTest {
4349

@@ -164,4 +170,40 @@ void testPrefixRequireOption() {
164170
conf, conf, Thread.currentThread().getContextClassLoader()));
165171
Assertions.assertThat(dataSink).isInstanceOf(PaimonDataSink.class);
166172
}
173+
174+
@ParameterizedTest(name = "{0}")
175+
@ValueSource(strings = {"commit.user", "commit.user-prefix"})
176+
void testSpecifyingCommitUser(String commitUserKey) {
177+
DataSinkFactory sinkFactory =
178+
FactoryDiscoveryUtils.getFactoryByIdentifier("paimon", DataSinkFactory.class);
179+
Assertions.assertThat(sinkFactory).isInstanceOf(PaimonDataSinkFactory.class);
180+
Configuration conf =
181+
Configuration.fromMap(
182+
ImmutableMap.<String, String>builder()
183+
.put(PaimonDataSinkOptions.METASTORE.key(), "filesystem")
184+
.put(
185+
PaimonDataSinkOptions.WAREHOUSE.key(),
186+
new File(
187+
temporaryFolder.toFile(),
188+
UUID.randomUUID().toString())
189+
.toString())
190+
.put(commitUserKey, "yux")
191+
.build());
192+
193+
DataSink dataSink =
194+
sinkFactory.createDataSink(
195+
new FactoryHelper.DefaultContext(
196+
conf, conf, Thread.currentThread().getContextClassLoader()));
197+
Assertions.assertThat(dataSink).isInstanceOf(PaimonDataSink.class);
198+
Assertions.assertThat(dataSink).extracting("commitUser").isEqualTo("yux");
199+
Assertions.assertThat(dataSink.getEventSinkProvider())
200+
.isInstanceOf(FlinkSinkProvider.class)
201+
.asInstanceOf(type(FlinkSinkProvider.class))
202+
.extracting(FlinkSinkProvider::getSink)
203+
.isExactlyInstanceOf(PaimonEventSink.class)
204+
.extracting("commitUser")
205+
.asString()
206+
.hasSize(39) // 3 ("yux") + 36 (Random UUID)
207+
.startsWith("yux");
208+
}
167209
}

0 commit comments

Comments
 (0)