Skip to content

Commit 19b4b3c

Browse files
committed
Add Flink2 support for pipeline source connector.
1 parent e9d2ba4 commit 19b4b3c

File tree

5 files changed

+134
-6
lines changed

5 files changed

+134
-6
lines changed

.github/workflows/modules.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@
5050
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values",
5151
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres",
5252
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg",
53-
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute"
53+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute"
54+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris"
5455
]
5556

5657
MODULES_MYSQL_SOURCE = [

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ limitations under the License.
2727
<name>flink-cdc-pipeline-connector-doris</name>
2828

2929
<properties>
30-
<doris.connector.version>25.1.0</doris.connector.version>
30+
<doris.connector.version>26.0.0</doris.connector.version>
3131
<mysql.connector.version>8.0.26</mysql.connector.version>
3232
</properties>
3333

@@ -154,4 +154,70 @@ limitations under the License.
154154
</plugin>
155155
</plugins>
156156
</build>
157+
158+
<profiles>
159+
<profile>
160+
<id>flink2</id>
161+
<properties>
162+
<flink.version>${flink.2.x.version}</flink.version>
163+
</properties>
164+
<dependencies>
165+
<dependency>
166+
<groupId>org.apache.flink</groupId>
167+
<artifactId>flink-streaming-java</artifactId>
168+
<version>${flink.2.x.version}</version>
169+
<scope>provided</scope>
170+
</dependency>
171+
<dependency>
172+
<groupId>org.apache.flink</groupId>
173+
<artifactId>flink-cdc-flink2-compat</artifactId>
174+
<version>${project.version}</version>
175+
<scope>provided</scope>
176+
</dependency>
177+
</dependencies>
178+
<build>
179+
<plugins>
180+
<plugin>
181+
<groupId>org.apache.maven.plugins</groupId>
182+
<artifactId>maven-dependency-plugin</artifactId>
183+
<executions>
184+
<execution>
185+
<id>copy-flink2-extra-libs</id>
186+
<phase>process-test-resources</phase>
187+
<goals>
188+
<goal>copy</goal>
189+
</goals>
190+
<configuration>
191+
<artifactItems>
192+
<artifactItem>
193+
<groupId>org.apache.flink</groupId>
194+
<artifactId>flink-cdc-flink2-compat</artifactId>
195+
<version>${project.version}</version>
196+
<outputDirectory>${project.build.directory}/flink2-extra-libs</outputDirectory>
197+
</artifactItem>
198+
<artifactItem>
199+
<groupId>org.apache.flink</groupId>
200+
<artifactId>flink-shaded-guava</artifactId>
201+
<version>${flink.2.x.shaded.guava.version}</version>
202+
<outputDirectory>${project.build.directory}/flink2-extra-libs</outputDirectory>
203+
</artifactItem>
204+
</artifactItems>
205+
</configuration>
206+
</execution>
207+
</executions>
208+
</plugin>
209+
<plugin>
210+
<groupId>org.apache.maven.plugins</groupId>
211+
<artifactId>maven-surefire-plugin</artifactId>
212+
<configuration>
213+
<additionalClasspathElements>
214+
<additionalClasspathElement>${project.build.directory}/flink2-extra-libs/flink-cdc-flink2-compat-${project.version}.jar</additionalClasspathElement>
215+
<additionalClasspathElement>${project.build.directory}/flink2-extra-libs/flink-shaded-guava-${flink.2.x.shaded.guava.version}.jar</additionalClasspathElement>
216+
</additionalClasspathElements>
217+
</configuration>
218+
</plugin>
219+
</plugins>
220+
</build>
221+
</profile>
222+
</profiles>
157223
</project>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.flink.cdc.connectors.doris.sink;
1919

20-
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
2120
import org.apache.flink.api.common.typeinfo.TypeInformation;
2221
import org.apache.flink.cdc.common.configuration.Configuration;
2322
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
@@ -48,6 +47,7 @@
4847
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase;
4948
import org.apache.flink.cdc.connectors.doris.utils.DorisSchemaUtils;
5049
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
50+
import org.apache.flink.configuration.RestartStrategyOptions;
5151
import org.apache.flink.runtime.client.JobExecutionException;
5252
import org.apache.flink.streaming.api.datastream.DataStream;
5353
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -85,7 +85,9 @@ class DorisMetadataApplierITCase extends DorisSinkTestBase {
8585
public static void before() {
8686
env.setParallelism(DEFAULT_PARALLELISM);
8787
env.enableCheckpointing(3000);
88-
env.setRestartStrategy(RestartStrategies.noRestart());
88+
env.configure(
89+
new org.apache.flink.configuration.Configuration()
90+
.set(RestartStrategyOptions.RESTART_STRATEGY, "none"));
8991
}
9092

9193
@BeforeEach

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisPipelineITCase.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.flink.cdc.connectors.doris.sink;
1919

20-
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
2120
import org.apache.flink.api.common.typeinfo.TypeInformation;
2221
import org.apache.flink.api.connector.sink2.Sink;
2322
import org.apache.flink.cdc.common.configuration.Configuration;
@@ -35,6 +34,7 @@
3534
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer;
3635
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase;
3736
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
37+
import org.apache.flink.configuration.RestartStrategyOptions;
3838
import org.apache.flink.streaming.api.datastream.DataStream;
3939
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
4040

@@ -69,7 +69,9 @@ class DorisPipelineITCase extends DorisSinkTestBase {
6969
public static void before() {
7070
env.setParallelism(DEFAULT_PARALLELISM);
7171
env.enableCheckpointing(3000);
72-
env.setRestartStrategy(RestartStrategies.noRestart());
72+
env.configure(
73+
new org.apache.flink.configuration.Configuration()
74+
.set(RestartStrategyOptions.RESTART_STRATEGY, "none"));
7375
}
7476

7577
@BeforeEach
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.api.connector.sink2;
19+
20+
import org.apache.flink.annotation.Internal;
21+
22+
import java.io.IOException;
23+
import java.util.Collection;
24+
25+
/**
26+
* Compatibility adapter for Flink 2.2. This class is part of the multi-version compatibility layer
27+
* that allows Flink CDC to work across different Flink versions.
28+
*
29+
* <p>In Flink 1.x, StatefulSink was a specific interface for sinks with state. In Flink 2.x, this
30+
* concept is replaced by SupportsWriterState directly. This adapter provides backward
31+
* compatibility.
32+
*/
33+
@Internal
34+
public interface StatefulSink<InputT, WriterStateT>
35+
extends Sink<InputT>, SupportsWriterState<InputT, WriterStateT> {
36+
37+
/**
38+
* @deprecated Use {@link #restoreWriter(WriterInitContext, Collection)} instead.
39+
*/
40+
@Deprecated
41+
default StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
42+
InitContext context, Collection<WriterStateT> retrievedState) throws IOException {
43+
return null;
44+
}
45+
46+
/**
47+
* Restore a writer from state.
48+
*
49+
* @param context the init context
50+
* @param retrievedState the retrieved state
51+
* @return the restored writer
52+
*/
53+
default StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
54+
WriterInitContext context, Collection<WriterStateT> retrievedState) throws IOException {
55+
return null;
56+
}
57+
}

0 commit comments

Comments
 (0)