Skip to content

Commit 4a65e8f

Browse files
committed
Add Flink 2 support for the pipeline source connectors.
1 parent 323b2bc commit 4a65e8f

File tree

129 files changed

+2734
-454
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

129 files changed

+2734
-454
lines changed

.github/workflows/flink_cdc_ci.yml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,28 @@ jobs:
7777
with:
7878
java-versions: "[11]"
7979
modules: "['doris, elasticsearch, fluss, hudi, iceberg, kafka, maxcompute, mysql-pipeline, oceanbase-pipeline, paimon, postgres-pipeline, starrocks, oracle-pipeline']"
80+
pipeline-connectors-ut-2-x:
81+
name: Pipeline Connectors Unit Tests 2.x
82+
uses: ./.github/workflows/flink_cdc_base.yml
83+
with:
84+
java-versions: "[11]"
85+
flink-versions: "['2.2.0']"
86+
custom-maven-parameter: "-Pflink2"
87+
modules: "['pipeline_connectors_2_x']"
8088
source-ut:
8189
name: Source Unit Tests
8290
uses: ./.github/workflows/flink_cdc_base.yml
8391
with:
8492
java-versions: "[11]"
8593
modules: "['mysql-source', 'postgres-source, oceanbase-source, tidb, vitess', 'oracle, sqlserver', 'db2, mongodb']"
94+
source-ut-2-x:
95+
name: Source Unit Tests 2.x
96+
uses: ./.github/workflows/flink_cdc_base.yml
97+
with:
98+
java-versions: "[11]"
99+
flink-versions: "['2.2.0']"
100+
custom-maven-parameter: "-Pflink2"
101+
modules: "['mysql-source', 'postgres-source, oceanbase-source, tidb, vitess', 'oracle, sqlserver', 'db2, mongodb']"
86102
pipeline_e2e:
87103
strategy:
88104
max-parallel: 2

.github/workflows/modules.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,16 @@
4343
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values"
4444
]
4545

46+
MODULES_PIPELINE_CONNECTORS_2_X = [
47+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql",
48+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle",
49+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss",
50+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values",
51+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres",
52+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg",
53+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute"
54+
]
55+
4656
MODULES_MYSQL_SOURCE = [
4757
"flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc",
4858
"flink-cdc-connect/flink-cdc-source-connectors/flink-sql-connector-mysql-cdc"
@@ -152,6 +162,7 @@
152162
MODULES_CORE +
153163
MODULES_CORE_2_X +
154164
MODULES_PIPELINE_CONNECTORS +
165+
MODULES_PIPELINE_CONNECTORS_2_X +
155166
MODULES_MYSQL_SOURCE +
156167
MODULES_MYSQL_PIPELINE +
157168
MODULES_POSTGRES_SOURCE +

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@
4040
import org.apache.flink.runtime.jobgraph.OperatorID;
4141
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
4242
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
43+
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
4344
import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
4445
import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
45-
import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
4646
import org.apache.flink.streaming.api.datastream.DataStream;
4747
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
4848
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -129,8 +129,8 @@ void sinkTo(
129129
OperatorUidGenerator operatorUidGenerator) {
130130
DataStream<Event> stream = input;
131131
// Pre-write topology
132-
if (sink instanceof WithPreWriteTopology) {
133-
stream = ((WithPreWriteTopology<Event>) sink).addPreWriteTopology(stream);
132+
if (sink instanceof SupportsPreWriteTopology) {
133+
stream = ((SupportsPreWriteTopology<Event>) sink).addPreWriteTopology(stream);
134134
}
135135

136136
if (sink instanceof TwoPhaseCommittingSink) {
@@ -221,7 +221,7 @@ private static <CommT> SimpleVersionedSerializer<CommT> getCommittableSerializer
221221
// during Flink 1.18 to 1.19. Remove this when Flink 1.18 is no longer supported.
222222
try {
223223
return (SimpleVersionedSerializer<CommT>)
224-
sink.getClass().getDeclaredMethod("getCommittableSerializer").invoke(sink);
224+
sink.getClass().getMethod("getCommittableSerializer").invoke(sink);
225225
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
226226
throw new RuntimeException("Failed to get CommittableSerializer", e);
227227
}

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslatorTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717

1818
package org.apache.flink.cdc.composer.flink.translator;
1919

20+
import org.apache.flink.api.connector.sink2.Sink;
2021
import org.apache.flink.api.connector.sink2.SinkWriter;
2122
import org.apache.flink.api.connector.sink2.WriterInitContext;
2223
import org.apache.flink.api.dag.Transformation;
2324
import org.apache.flink.cdc.common.event.Event;
2425
import org.apache.flink.runtime.jobgraph.OperatorID;
25-
import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
26+
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
2627
import org.apache.flink.streaming.api.datastream.DataStream;
2728
import org.apache.flink.streaming.api.datastream.DataStreamSource;
2829
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -68,7 +69,8 @@ void testPreWriteWithoutCommitSink() {
6869

6970
private static class EmptyEvent implements Event {}
7071

71-
private static class MockPreWriteWithoutCommitSink implements WithPreWriteTopology<Event> {
72+
private static class MockPreWriteWithoutCommitSink
73+
implements Sink<Event>, SupportsPreWriteTopology<Event> {
7274

7375
private final String uid;
7476

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/testsource/source/DistributedSourceFunction.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,10 @@ private void sendFromTables(Consumer<TableId> tableIdConsumer) {
143143
}
144144

145145
@Override
146-
public void run(SourceContext<Event> context) throws InterruptedException {
146+
public void run(
147+
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Event>
148+
context)
149+
throws InterruptedException {
147150
Schema initialSchema =
148151
Schema.newBuilder()
149152
.physicalColumn("id", DataTypes.STRING())
@@ -265,7 +268,10 @@ private BinaryRecordData generateBinRec(Schema schema) {
265268
return generator.generate(rowObjects);
266269
}
267270

268-
private void collect(SourceContext<Event> sourceContext, Event event) {
271+
private void collect(
272+
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Event>
273+
sourceContext,
274+
Event event) {
269275
LOG.info("{}> Emitting event {}", subTaskId, event);
270276
sourceContext.collect(event);
271277
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/pom.xml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,4 +139,39 @@ limitations under the License.
139139
</plugins>
140140
</build>
141141

142+
<profiles>
143+
<profile>
144+
<id>flink2</id>
145+
<properties>
146+
<flink.version>${flink.2.x.version}</flink.version>
147+
</properties>
148+
<dependencies>
149+
<dependency>
150+
<groupId>org.apache.flink</groupId>
151+
<artifactId>flink-streaming-java</artifactId>
152+
<version>${flink.2.x.version}</version>
153+
<scope>provided</scope>
154+
</dependency>
155+
<dependency>
156+
<groupId>org.apache.flink</groupId>
157+
<artifactId>flink-cdc-flink2-compat</artifactId>
158+
<version>${project.version}</version>
159+
<scope>provided</scope>
160+
</dependency>
161+
</dependencies>
162+
<build>
163+
<plugins>
164+
<plugin>
165+
<groupId>org.apache.maven.plugins</groupId>
166+
<artifactId>maven-dependency-plugin</artifactId>
167+
</plugin>
168+
<plugin>
169+
<groupId>org.apache.maven.plugins</groupId>
170+
<artifactId>maven-surefire-plugin</artifactId>
171+
</plugin>
172+
</plugins>
173+
</build>
174+
</profile>
175+
</profiles>
176+
142177
</project>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/pom.xml

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,4 +319,71 @@ limitations under the License.
319319
</plugin>
320320
</plugins>
321321
</build>
322+
323+
<profiles>
324+
<profile>
325+
<id>flink2</id>
326+
<properties>
327+
<flink.version>${flink.2.x.version}</flink.version>
328+
<flink.major.version>2.1</flink.major.version>
329+
</properties>
330+
<dependencies>
331+
<dependency>
332+
<groupId>org.apache.flink</groupId>
333+
<artifactId>flink-streaming-java</artifactId>
334+
<version>${flink.2.x.version}</version>
335+
<scope>provided</scope>
336+
</dependency>
337+
<dependency>
338+
<groupId>org.apache.flink</groupId>
339+
<artifactId>flink-cdc-flink2-compat</artifactId>
340+
<version>${project.version}</version>
341+
<scope>provided</scope>
342+
</dependency>
343+
</dependencies>
344+
<build>
345+
<plugins>
346+
<plugin>
347+
<groupId>org.apache.maven.plugins</groupId>
348+
<artifactId>maven-dependency-plugin</artifactId>
349+
<executions>
350+
<execution>
351+
<id>copy-flink2-extra-libs</id>
352+
<phase>process-test-resources</phase>
353+
<goals>
354+
<goal>copy</goal>
355+
</goals>
356+
<configuration>
357+
<artifactItems>
358+
<artifactItem>
359+
<groupId>org.apache.flink</groupId>
360+
<artifactId>flink-cdc-flink2-compat</artifactId>
361+
<version>${project.version}</version>
362+
<outputDirectory>${project.build.directory}/flink2-extra-libs</outputDirectory>
363+
</artifactItem>
364+
<artifactItem>
365+
<groupId>org.apache.flink</groupId>
366+
<artifactId>flink-shaded-guava</artifactId>
367+
<version>${flink.2.x.shaded.guava.version}</version>
368+
<outputDirectory>${project.build.directory}/flink2-extra-libs</outputDirectory>
369+
</artifactItem>
370+
</artifactItems>
371+
</configuration>
372+
</execution>
373+
</executions>
374+
</plugin>
375+
<plugin>
376+
<groupId>org.apache.maven.plugins</groupId>
377+
<artifactId>maven-surefire-plugin</artifactId>
378+
<configuration>
379+
<additionalClasspathElements>
380+
<additionalClasspathElement>${project.build.directory}/flink2-extra-libs/flink-cdc-flink2-compat-${project.version}.jar</additionalClasspathElement>
381+
<additionalClasspathElement>${project.build.directory}/flink2-extra-libs/flink-shaded-guava-${flink.2.x.shaded.guava.version}.jar</additionalClasspathElement>
382+
</additionalClasspathElements>
383+
</configuration>
384+
</plugin>
385+
</plugins>
386+
</build>
387+
</profile>
388+
</profiles>
322389
</project>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ public DataStream<Event> addPreWriteTopology(DataStream<Event> dataStream) {
8686
return dataStream;
8787
}
8888

89-
@Override
9089
public Committer<WriteResultWrapper> createCommitter() {
9190
return new IcebergCommitter(catalogOptions);
9291
}
@@ -103,6 +102,7 @@ public SimpleVersionedSerializer<WriteResultWrapper> getCommittableSerializer()
103102
return new WriteResultWrapperSerializer();
104103
}
105104

105+
@Deprecated
106106
@Override
107107
public SinkWriter<Event> createWriter(InitContext context) {
108108
long lastCheckpointId =

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919

2020
import org.apache.flink.cdc.common.event.TableId;
2121
import org.apache.flink.cdc.connectors.iceberg.sink.v2.WriteResultWrapper;
22+
import org.apache.flink.cdc.runtime.operators.AbstractStreamOperatorAdapter;
2223
import org.apache.flink.runtime.state.StateSnapshotContext;
2324
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
2425
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
2526
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
26-
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
2727
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
2828
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
2929
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -46,7 +46,7 @@
4646

4747
/** An Operator to trigger file compaction conditionally. */
4848
public class CompactionOperator
49-
extends AbstractStreamOperator<CommittableMessage<WriteResultWrapper>>
49+
extends AbstractStreamOperatorAdapter<CommittableMessage<WriteResultWrapper>>
5050
implements OneInputStreamOperator<
5151
CommittableMessage<WriteResultWrapper>, CommittableMessage<WriteResultWrapper>> {
5252
protected static final Logger LOGGER = LoggerFactory.getLogger(CompactionOperator.class);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.iceberg.catalog.TableIdentifier;
4242
import org.assertj.core.api.Assertions;
4343
import org.junit.jupiter.api.Test;
44+
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
4445
import org.junit.jupiter.api.io.TempDir;
4546

4647
import java.io.File;
@@ -54,6 +55,7 @@
5455
import java.util.stream.Collectors;
5556

5657
/** Tests for {@link CompactionOperator}. */
58+
@DisabledIfSystemProperty(named = "flink.profile", matches = "flink2")
5759
public class CompactionOperatorTest {
5860

5961
@TempDir public static java.nio.file.Path temporaryFolder;

0 commit comments

Comments (0)