Skip to content

Commit f459f88

Browse files
committed
Add Flink2 support for pipeline source connector.
1 parent fef9f2d commit f459f88

File tree

15 files changed

+1733
-0
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java

Lines changed: 902 additions & 0 deletions
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.utils;
19+
20+
import org.apache.flink.annotation.Internal;
21+
import org.apache.flink.configuration.Configuration;
22+
import org.apache.flink.configuration.RestartStrategyOptions;
23+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
24+
25+
import java.time.Duration;
26+
27+
/**
28+
* Compatibility adapter for Flink 1.20. This class is part of the multi-version compatibility layer
29+
* that allows Flink CDC to work across different Flink versions.
30+
*/
31+
@Internal
32+
public class RestartStrategyUtils {
33+
34+
/**
35+
* Disables the restart strategy for the given StreamExecutionEnvironment.
36+
*
37+
* @param env the StreamExecutionEnvironment to configure
38+
*/
39+
public static void configureNoRestartStrategy(StreamExecutionEnvironment env) {
40+
env.configure(new Configuration().set(RestartStrategyOptions.RESTART_STRATEGY, "none"));
41+
}
42+
43+
/**
44+
* Sets a fixed-delay restart strategy for the given StreamExecutionEnvironment.
45+
*
46+
* @param env the StreamExecutionEnvironment to configure
47+
* @param restartAttempts the number of restart attempts
48+
* @param delayBetweenAttempts the delay between restart attempts in milliseconds
49+
*/
50+
public static void configureFixedDelayRestartStrategy(
51+
StreamExecutionEnvironment env, int restartAttempts, long delayBetweenAttempts) {
52+
configureFixedDelayRestartStrategy(
53+
env, restartAttempts, Duration.ofMillis(delayBetweenAttempts));
54+
}
55+
56+
/**
57+
* Sets a fixed-delay restart strategy for the given StreamExecutionEnvironment.
58+
*
59+
* @param env the StreamExecutionEnvironment to configure
60+
* @param restartAttempts the number of restart attempts
61+
* @param delayBetweenAttempts the delay between restart attempts
62+
*/
63+
public static void configureFixedDelayRestartStrategy(
64+
StreamExecutionEnvironment env, int restartAttempts, Duration delayBetweenAttempts) {
65+
Configuration configuration = new Configuration();
66+
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
67+
configuration.set(
68+
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, restartAttempts);
69+
configuration.set(
70+
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, delayBetweenAttempts);
71+
72+
env.configure(configuration);
73+
}
74+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.streaming.api.operators.collect;
19+
20+
import org.apache.flink.annotation.Internal;
21+
import org.apache.flink.api.common.typeutils.TypeSerializer;
22+
import org.apache.flink.runtime.jobgraph.OperatorID;
23+
import org.apache.flink.streaming.api.environment.CheckpointConfig;
24+
25+
import java.util.concurrent.CompletableFuture;
26+
27+
/**
28+
* Compatibility adapter for Flink 1.20. This class is part of the multi-version compatibility layer
29+
* that allows Flink CDC to work across different Flink versions.
30+
*/
31+
@Internal
32+
public class CollectResultIteratorAdapter<T> extends CollectResultIterator<T> {
33+
34+
public CollectResultIteratorAdapter(
35+
CollectSinkOperator<T> collectSinkOperator,
36+
TypeSerializer<T> serializer,
37+
String accumulatorName,
38+
CheckpointConfig checkpointConfig,
39+
long resultFetchTimeout) {
40+
super(
41+
collectSinkOperator.getOperatorIdFuture(),
42+
serializer,
43+
accumulatorName,
44+
checkpointConfig,
45+
resultFetchTimeout);
46+
}
47+
48+
/**
49+
* Creates a CollectResultIteratorAdapter with the given operatorUid.
50+
*
51+
* <p>This constructor accepts a String operatorUid (typically from {@code
52+
* sink.getTransformation().getId()}) and converts it to a CompletableFuture&lt;OperatorID&gt;
53+
* for compatibility with Flink 1.x.
54+
*
55+
* @param operatorUid the operator ID string
56+
* @param serializer the type serializer
57+
* @param accumulatorName the accumulator name
58+
* @param checkpointConfig the checkpoint config
59+
* @param resultFetchTimeout the result fetch timeout
60+
*/
61+
public CollectResultIteratorAdapter(
62+
String operatorUid,
63+
TypeSerializer<T> serializer,
64+
String accumulatorName,
65+
CheckpointConfig checkpointConfig,
66+
long resultFetchTimeout) {
67+
super(
68+
toOperatorIdFuture(operatorUid),
69+
serializer,
70+
accumulatorName,
71+
checkpointConfig,
72+
resultFetchTimeout);
73+
}
74+
75+
public CollectResultIteratorAdapter(
76+
AbstractCollectResultBuffer<T> buffer,
77+
CollectSinkOperator<T> collectSinkOperator,
78+
String accumulatorName,
79+
int retryMillis) {
80+
super(buffer, collectSinkOperator.getOperatorIdFuture(), accumulatorName, retryMillis);
81+
}
82+
83+
/**
84+
* Creates a CollectResultIteratorAdapter with the given operatorUid and buffer.
85+
*
86+
* @param buffer the collect result buffer
87+
* @param operatorUid the operator ID string
88+
* @param accumulatorName the accumulator name
89+
* @param retryMillis the retry interval in milliseconds
90+
*/
91+
public CollectResultIteratorAdapter(
92+
AbstractCollectResultBuffer<T> buffer,
93+
String operatorUid,
94+
String accumulatorName,
95+
int retryMillis) {
96+
super(buffer, toOperatorIdFuture(operatorUid), accumulatorName, retryMillis);
97+
}
98+
99+
private static CompletableFuture<OperatorID> toOperatorIdFuture(String operatorUid) {
100+
// Generate OperatorID from the string's hashCode
101+
// This handles the case where operatorUid is from transformation.getId()
102+
OperatorID operatorID = new OperatorID(operatorUid.hashCode(), operatorUid.hashCode());
103+
return CompletableFuture.completedFuture(operatorID);
104+
}
105+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.table.catalog;
19+
20+
import org.apache.flink.annotation.Internal;
21+
import org.apache.flink.table.api.Schema;
22+
23+
import java.util.List;
24+
import java.util.Map;
25+
26+
/**
27+
* Compatibility adapter for {@link CatalogTable} in Flink 1.20.
28+
*
29+
* <p>This adapter provides a factory method to create CatalogTable instances compatible with Flink
30+
* 1.x.
31+
*/
32+
@Internal
33+
public class CatalogTableAdapter {
34+
35+
/**
36+
* Creates a CatalogTable using the Flink 1.x API.
37+
*
38+
* @param schema the table schema
39+
* @param comment the table comment
40+
* @param partitionKeys the partition keys
41+
* @param options the table options
42+
* @return a new CatalogTable instance
43+
*/
44+
public static CatalogTable of(
45+
Schema schema,
46+
String comment,
47+
List<String> partitionKeys,
48+
Map<String, String> options) {
49+
return CatalogTable.of(schema, comment, partitionKeys, options);
50+
}
51+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.table.factories;
19+
20+
import org.apache.flink.annotation.Internal;
21+
import org.apache.flink.configuration.Configuration;
22+
import org.apache.flink.table.catalog.ObjectIdentifier;
23+
import org.apache.flink.table.catalog.ResolvedCatalogTable;
24+
import org.apache.flink.table.connector.source.DynamicTableSource;
25+
26+
/**
 * Compatibility adapter for {@link FactoryUtil} in Flink 1.20.
 *
 * <p>This adapter provides a factory method to create DynamicTableSource instances compatible with
 * Flink 1.x.
 */
@Internal
public class FactoryUtilAdapter {

    /**
     * Creates a DynamicTableSource using the Flink 1.x API.
     *
     * @param context the context; NOTE(review): this argument is currently ignored — a literal
     *     {@code null} is always forwarded to {@code FactoryUtil.createTableSource}. Presumably the
     *     parameter exists only so callers can share a signature with the Flink 2.x adapter —
     *     confirm against that counterpart.
     * @param objectIdentifier the object identifier
     * @param resolvedCatalogTable the resolved catalog table
     * @param configuration the configuration
     * @param classLoader the class loader
     * @param isStreaming whether it is streaming
     * @return a new DynamicTableSource instance
     */
    public static DynamicTableSource createTableSource(
            Object context,
            ObjectIdentifier objectIdentifier,
            ResolvedCatalogTable resolvedCatalogTable,
            Configuration configuration,
            ClassLoader classLoader,
            boolean isStreaming) {
        return FactoryUtil.createTableSource(
                // Intentionally null: the `context` parameter is not used on the Flink 1.x path.
                null,
                objectIdentifier,
                resolvedCatalogTable,
                configuration,
                classLoader,
                isStreaming);
    }
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.runtime.compat;
19+
20+
import org.apache.flink.annotation.Internal;
21+
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
22+
23+
/**
24+
* Compatibility adapter for {@link MockStreamingRuntimeContext} in Flink 1.20.
25+
*
26+
* <p>In Flink 1.x, MockStreamingRuntimeContext constructor signature is:
27+
* MockStreamingRuntimeContext(boolean isCheckpointingEnabled, int numberOfParallelSubtasks, int
28+
* subtaskIndex)
29+
*
30+
* <p>This adapter provides a factory method to create MockStreamingRuntimeContext with the Flink
31+
* 1.x constructor signature.
32+
*/
33+
@Internal
34+
public class MockStreamingRuntimeContextAdapter {
35+
36+
/**
37+
* Creates a MockStreamingRuntimeContext with the standard Flink 1.x constructor signature.
38+
*
39+
* @param isCheckpointingEnabled whether checkpointing is enabled
40+
* @param numberOfParallelSubtasks the total number of parallel subtasks
41+
* @param subtaskIndex the index of this subtask
42+
* @return a new MockStreamingRuntimeContext instance
43+
*/
44+
public static MockStreamingRuntimeContext create(
45+
boolean isCheckpointingEnabled, int numberOfParallelSubtasks, int subtaskIndex) {
46+
return new MockStreamingRuntimeContext(
47+
isCheckpointingEnabled, numberOfParallelSubtasks, subtaskIndex);
48+
}
49+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.runtime.compat;
19+
20+
import org.apache.flink.annotation.Internal;
21+
import org.apache.flink.api.common.state.OperatorStateStore;
22+
23+
/**
 * Compatibility adapter for {@link OperatorStateStore} in Flink 1.20.
 *
 * <p>In Flink 1.x, OperatorStateStore interface does not have the v2 state descriptor methods.
 *
 * <p>This adapter provides a no-op interface that extends OperatorStateStore, allowing test code to
 * implement only the Flink 1.x methods. It deliberately declares no members of its own: on this
 * branch, {@link OperatorStateStore} already exposes exactly the Flink 1.x surface, so the adapter
 * exists purely to give cross-version code a single type name to implement.
 */
@Internal
public interface OperatorStateStoreAdapter extends OperatorStateStore {}

0 commit comments

Comments
 (0)