Skip to content

Commit 48362ca

Browse files
committed
[FLINK-38729] Add Flink2 support for Source/Pipeline connectors.
1 parent 323b2bc commit 48362ca

File tree

144 files changed

+2388
-587
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

144 files changed

+2388
-587
lines changed

.github/workflows/flink_cdc_ci.yml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,28 @@ jobs:
7777
with:
7878
java-versions: "[11]"
7979
modules: "['doris, elasticsearch, fluss, hudi, iceberg, kafka, maxcompute, mysql-pipeline, oceanbase-pipeline, paimon, postgres-pipeline, starrocks, oracle-pipeline']"
80+
pipeline-connectors-ut-2-x:
81+
name: Pipeline Connectors Unit Tests 2.x
82+
uses: ./.github/workflows/flink_cdc_base.yml
83+
with:
84+
java-versions: "[11]"
85+
flink-versions: "['2.2.0']"
86+
custom-maven-parameter: "-Pflink2"
87+
modules: "['pipeline_connectors_2_x']"
8088
source-ut:
8189
name: Source Unit Tests
8290
uses: ./.github/workflows/flink_cdc_base.yml
8391
with:
8492
java-versions: "[11]"
8593
modules: "['mysql-source', 'postgres-source, oceanbase-source, tidb, vitess', 'oracle, sqlserver', 'db2, mongodb']"
94+
source-ut-2-x:
95+
name: Source Unit Tests 2.x
96+
uses: ./.github/workflows/flink_cdc_base.yml
97+
with:
98+
java-versions: "[11]"
99+
flink-versions: "['2.2.0']"
100+
custom-maven-parameter: "-Pflink2"
101+
modules: "['mysql-source', 'postgres-source, oceanbase-source, tidb, vitess', 'oracle, sqlserver', 'db2, mongodb']"
86102
pipeline_e2e:
87103
strategy:
88104
max-parallel: 2

.github/workflows/modules.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,19 @@
4343
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values"
4444
]
4545

46+
MODULES_PIPELINE_CONNECTORS_2_X = [
47+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql",
48+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle",
49+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss",
50+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values",
51+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres",
52+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg",
53+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute",
54+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris",
55+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks",
56+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch"
57+
]
58+
4659
MODULES_MYSQL_SOURCE = [
4760
"flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc",
4861
"flink-cdc-connect/flink-cdc-source-connectors/flink-sql-connector-mysql-cdc"
@@ -152,6 +165,7 @@
152165
MODULES_CORE +
153166
MODULES_CORE_2_X +
154167
MODULES_PIPELINE_CONNECTORS +
168+
MODULES_PIPELINE_CONNECTORS_2_X +
155169
MODULES_MYSQL_SOURCE +
156170
MODULES_MYSQL_PIPELINE +
157171
MODULES_POSTGRES_SOURCE +

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
import org.apache.flink.runtime.jobgraph.OperatorID;
4141
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
4242
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
43+
import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
44+
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
4345
import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
4446
import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
4547
import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
@@ -131,6 +133,8 @@ void sinkTo(
131133
// Pre-write topology
132134
if (sink instanceof WithPreWriteTopology) {
133135
stream = ((WithPreWriteTopology<Event>) sink).addPreWriteTopology(stream);
136+
} else if (sink instanceof SupportsPreWriteTopology) {
137+
stream = ((SupportsPreWriteTopology<Event>) sink).addPreWriteTopology(stream);
134138
}
135139

136140
if (sink instanceof TwoPhaseCommittingSink) {
@@ -193,6 +197,8 @@ private <CommT> void addCommittingTopology(
193197
if (sink instanceof WithPreCommitTopology) {
194198
preCommitted =
195199
((WithPreCommitTopology<Event, CommT>) sink).addPreCommitTopology(written);
200+
} else if (sink instanceof SupportsPreCommitTopology) {
201+
preCommitted = ((SupportsPreCommitTopology) sink).addPreCommitTopology(written);
196202
}
197203

198204
// TODO: Hard coding checkpoint
@@ -221,7 +227,7 @@ private static <CommT> SimpleVersionedSerializer<CommT> getCommittableSerializer
221227
// during Flink 1.18 to 1.19. Remove this when Flink 1.18 is no longer supported.
222228
try {
223229
return (SimpleVersionedSerializer<CommT>)
224-
sink.getClass().getDeclaredMethod("getCommittableSerializer").invoke(sink);
230+
sink.getClass().getMethod("getCommittableSerializer").invoke(sink);
225231
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
226232
throw new RuntimeException("Failed to get CommittableSerializer", e);
227233
}

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/testsource/source/DistributedSourceFunction.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,10 @@ private void sendFromTables(Consumer<TableId> tableIdConsumer) {
143143
}
144144

145145
@Override
146-
public void run(SourceContext<Event> context) throws InterruptedException {
146+
public void run(
147+
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Event>
148+
context)
149+
throws InterruptedException {
147150
Schema initialSchema =
148151
Schema.newBuilder()
149152
.physicalColumn("id", DataTypes.STRING())
@@ -265,7 +268,10 @@ private BinaryRecordData generateBinRec(Schema schema) {
265268
return generator.generate(rowObjects);
266269
}
267270

268-
private void collect(SourceContext<Event> sourceContext, Event event) {
271+
private void collect(
272+
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Event>
273+
sourceContext,
274+
Event event) {
269275
LOG.info("{}> Emitting event {}", subTaskId, event);
270276
sourceContext.collect(event);
271277
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ limitations under the License.
2929
<properties>
3030
<doris.connector.version>25.1.0</doris.connector.version>
3131
<mysql.connector.version>8.0.26</mysql.connector.version>
32+
<doris.flink.major.version>1.20</doris.flink.major.version>
3233
</properties>
3334

3435
<dependencies>
@@ -48,7 +49,7 @@ limitations under the License.
4849

4950
<dependency>
5051
<groupId>org.apache.doris</groupId>
51-
<artifactId>flink-doris-connector-${flink.major.version}</artifactId>
52+
<artifactId>flink-doris-connector-${doris.flink.major.version}</artifactId>
5253
<version>${doris.connector.version}</version>
5354
</dependency>
5455

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.flink.cdc.connectors.doris.sink;
1919

20-
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
2120
import org.apache.flink.api.common.typeinfo.TypeInformation;
2221
import org.apache.flink.cdc.common.configuration.Configuration;
2322
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
@@ -48,6 +47,7 @@
4847
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase;
4948
import org.apache.flink.cdc.connectors.doris.utils.DorisSchemaUtils;
5049
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
50+
import org.apache.flink.configuration.RestartStrategyOptions;
5151
import org.apache.flink.runtime.client.JobExecutionException;
5252
import org.apache.flink.streaming.api.datastream.DataStream;
5353
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -56,6 +56,7 @@
5656
import org.junit.jupiter.api.AfterEach;
5757
import org.junit.jupiter.api.BeforeAll;
5858
import org.junit.jupiter.api.BeforeEach;
59+
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
5960
import org.junit.jupiter.params.ParameterizedTest;
6061
import org.junit.jupiter.params.provider.ValueSource;
6162

@@ -85,7 +86,9 @@ class DorisMetadataApplierITCase extends DorisSinkTestBase {
8586
public static void before() {
8687
env.setParallelism(DEFAULT_PARALLELISM);
8788
env.enableCheckpointing(3000);
88-
env.setRestartStrategy(RestartStrategies.noRestart());
89+
env.configure(
90+
new org.apache.flink.configuration.Configuration()
91+
.set(RestartStrategyOptions.RESTART_STRATEGY, "none"));
8992
}
9093

9194
@BeforeEach
@@ -393,6 +396,7 @@ void testDorisRenameColumn(boolean batchMode) throws Exception {
393396

394397
@ParameterizedTest(name = "batchMode: {0}")
395398
@ValueSource(booleans = {true, false})
399+
@DisabledIfSystemProperty(named = "flink.profile", matches = "flink2")
396400
void testDorisAlterColumnType(boolean batchMode) throws Exception {
397401
TableId tableId =
398402
TableId.tableId(
@@ -413,6 +417,7 @@ void testDorisAlterColumnType(boolean batchMode) throws Exception {
413417

414418
@ParameterizedTest(name = "batchMode: {0}")
415419
@ValueSource(booleans = {true, false})
420+
@DisabledIfSystemProperty(named = "flink.profile", matches = "flink2")
416421
void testDorisAlterColumnTypeWithDefaultValue(boolean batchMode) throws Exception {
417422
TableId tableId =
418423
TableId.tableId(

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisPipelineITCase.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.flink.cdc.connectors.doris.sink;
1919

20-
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
2120
import org.apache.flink.api.common.typeinfo.TypeInformation;
2221
import org.apache.flink.api.connector.sink2.Sink;
2322
import org.apache.flink.cdc.common.configuration.Configuration;
@@ -35,6 +34,7 @@
3534
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer;
3635
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase;
3736
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
37+
import org.apache.flink.configuration.RestartStrategyOptions;
3838
import org.apache.flink.streaming.api.datastream.DataStream;
3939
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
4040

@@ -69,7 +69,9 @@ class DorisPipelineITCase extends DorisSinkTestBase {
6969
public static void before() {
7070
env.setParallelism(DEFAULT_PARALLELISM);
7171
env.enableCheckpointing(3000);
72-
env.setRestartStrategy(RestartStrategies.noRestart());
72+
env.configure(
73+
new org.apache.flink.configuration.Configuration()
74+
.set(RestartStrategyOptions.RESTART_STRATEGY, "none"));
7375
}
7476

7577
@BeforeEach

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch6DataSinkITCaseTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.apache.flink.cdc.connectors.elasticsearch.sink;
2121

22-
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
2322
import org.apache.flink.api.common.typeinfo.TypeInformation;
2423
import org.apache.flink.api.connector.sink2.Sink;
2524
import org.apache.flink.cdc.common.event.Event;
@@ -29,6 +28,8 @@
2928
import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchContainer;
3029
import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchTestUtils;
3130
import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig;
31+
import org.apache.flink.configuration.Configuration;
32+
import org.apache.flink.configuration.RestartStrategyOptions;
3233
import org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost;
3334
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.get.GetRequest;
3435
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.get.GetResponse;
@@ -188,7 +189,7 @@ private StreamExecutionEnvironment createStreamExecutionEnvironment() {
188189
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
189190
env.setParallelism(1);
190191
env.enableCheckpointing(3000);
191-
env.setRestartStrategy(RestartStrategies.noRestart());
192+
env.configure(new Configuration().set(RestartStrategyOptions.RESTART_STRATEGY, "none"));
192193
return env;
193194
}
194195

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch7DataSinkITCaseTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.apache.flink.cdc.connectors.elasticsearch.sink;
2121

22-
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
2322
import org.apache.flink.api.common.typeinfo.TypeInformation;
2423
import org.apache.flink.api.connector.sink2.Sink;
2524
import org.apache.flink.cdc.common.event.Event;
@@ -29,6 +28,8 @@
2928
import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchContainer;
3029
import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchTestUtils;
3130
import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig;
31+
import org.apache.flink.configuration.Configuration;
32+
import org.apache.flink.configuration.RestartStrategyOptions;
3233
import org.apache.flink.elasticsearch7.shaded.org.apache.http.HttpHost;
3334
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.get.GetRequest;
3435
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.get.GetResponse;
@@ -193,7 +194,7 @@ private StreamExecutionEnvironment createStreamExecutionEnvironment() {
193194
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
194195
env.setParallelism(1);
195196
env.enableCheckpointing(3000);
196-
env.setRestartStrategy(RestartStrategies.noRestart());
197+
env.configure(new Configuration().set(RestartStrategyOptions.RESTART_STRATEGY, "none"));
197198
return env;
198199
}
199200

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.flink.cdc.connectors.elasticsearch.sink;
1919

20-
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
2120
import org.apache.flink.api.common.typeinfo.TypeInformation;
2221
import org.apache.flink.api.connector.sink2.Sink;
2322
import org.apache.flink.cdc.common.event.Event;
@@ -27,6 +26,8 @@
2726
import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchContainer;
2827
import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchTestUtils;
2928
import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig;
29+
import org.apache.flink.configuration.Configuration;
30+
import org.apache.flink.configuration.RestartStrategyOptions;
3031
import org.apache.flink.streaming.api.datastream.DataStream;
3132
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3233

@@ -231,7 +232,7 @@ private StreamExecutionEnvironment createStreamExecutionEnvironment() {
231232
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
232233
env.setParallelism(1);
233234
env.enableCheckpointing(3000);
234-
env.setRestartStrategy(RestartStrategies.noRestart());
235+
env.configure(new Configuration().set(RestartStrategyOptions.RESTART_STRATEGY, "none"));
235236
return env;
236237
}
237238

0 commit comments

Comments (0)