Skip to content

Commit 0a71a88

Browse files
committed
add test
1 parent 1ee55c3 commit 0a71a88

File tree

2 files changed

+31
-0
lines changed

2 files changed

+31
-0
lines changed

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,4 +269,9 @@ private static void addAdditionalJars(
269269
}
270270
});
271271
}
272+
273+
@VisibleForTesting
274+
public StreamExecutionEnvironment getEnv() {
275+
return env;
276+
}
272277
}

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,21 @@
2424
import org.apache.flink.cdc.composer.definition.SinkDef;
2525
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
2626
import org.apache.flink.cdc.composer.utils.factory.DataSinkFactory1;
27+
import org.apache.flink.configuration.PipelineOptions;
28+
import org.apache.flink.configuration.ReadableConfig;
29+
import org.apache.flink.core.fs.Path;
2730

2831
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
2932

3033
import org.assertj.core.api.Assertions;
3134
import org.junit.jupiter.api.Test;
3235

36+
import java.time.Duration;
37+
import java.util.ArrayList;
38+
import java.util.List;
39+
40+
import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTING_INTERVAL;
41+
3342
/** A test for the {@link FlinkPipelineComposer}. */
3443
class FlinkPipelineComposerTest {
3544

@@ -59,4 +68,21 @@ void testCreateDataSinkFromSinkDef() {
5968
Assertions.assertThat(((DataSinkFactory1.TestDataSink) dataSink).getHost())
6069
.isEqualTo("0.0.0.0");
6170
}
71+
72+
@Test
73+
void testOfMiniCluster() {
74+
org.apache.flink.configuration.Configuration flinkConfig =
75+
new org.apache.flink.configuration.Configuration();
76+
flinkConfig.set(CHECKPOINTING_INTERVAL, Duration.ofSeconds(30));
77+
List<Path> additionalJars = new ArrayList<>();
78+
additionalJars.add(new Path("/path/to/additionalJars.jar"));
79+
FlinkPipelineComposer flinkPipelineComposer =
80+
FlinkPipelineComposer.ofMiniCluster(flinkConfig, additionalJars);
81+
ReadableConfig configuration = flinkPipelineComposer.getEnv().getConfiguration();
82+
83+
Assertions.assertThat(configuration.get(CHECKPOINTING_INTERVAL))
84+
.isEqualTo(Duration.ofSeconds(30));
85+
Assertions.assertThat(configuration.get(PipelineOptions.JARS))
86+
.contains("file:/path/to/additionalJars.jar");
87+
}
6288
}

0 commit comments

Comments
 (0)