Skip to content

Commit 98b5c5f

Browse files
committed
Add Flink2 support for pipeline source connector.
1 parent 147d7c1 commit 98b5c5f

File tree

2 files changed

+38
-28
lines changed

2 files changed

+38
-28
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,7 @@ void testConsumingAllEvents(boolean parallelismSnapshot)
154154
+ " PRIMARY KEY (name) NOT ENFORCED"
155155
+ ") WITH ("
156156
+ " 'connector' = 'values',"
157-
+ " 'sink-insert-only' = 'false',"
158-
+ " 'sink-expected-messages-num' = '20'"
157+
+ " 'sink-insert-only' = 'false'"
159158
+ ")";
160159
tEnv.executeSql(sourceDDL);
161160
tEnv.executeSql(sinkDDL);
@@ -187,7 +186,20 @@ void testConsumingAllEvents(boolean parallelismSnapshot)
187186
statement.execute("DELETE FROM inventory.products WHERE id=111;");
188187
}
189188

190-
waitForSinkSize("sink", 20);
189+
// Wait for the final aggregated result
190+
// Note: We use waitForSinkResult instead of waitForSinkSize because Flink 2.x
191+
// optimizations may reduce the number of intermediate changelog messages
192+
String[] expected =
193+
new String[] {
194+
"scooter,3.140",
195+
"car battery,8.100",
196+
"12-pack drill bits,0.800",
197+
"hammer,2.625",
198+
"rocks,5.100",
199+
"jacket,0.600",
200+
"spare tire,22.200"
201+
};
202+
waitForSinkResult("sink", Arrays.asList(expected));
191203

192204
/*
193205
* <pre>
@@ -215,17 +227,6 @@ void testConsumingAllEvents(boolean parallelismSnapshot)
215227
* </pre>
216228
*/
217229

218-
String[] expected =
219-
new String[] {
220-
"scooter,3.140",
221-
"car battery,8.100",
222-
"12-pack drill bits,0.800",
223-
"hammer,2.625",
224-
"rocks,5.100",
225-
"jacket,0.600",
226-
"spare tire,22.200"
227-
};
228-
229230
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
230231
Assertions.assertThat(actual).containsExactlyInAnyOrder(expected);
231232

@@ -891,8 +892,7 @@ void testUpsertMode(boolean parallelismSnapshot) throws Exception {
891892
+ " PRIMARY KEY (name) NOT ENFORCED"
892893
+ ") WITH ("
893894
+ " 'connector' = 'values',"
894-
+ " 'sink-insert-only' = 'false',"
895-
+ " 'sink-expected-messages-num' = '20'"
895+
+ " 'sink-insert-only' = 'false'"
896896
+ ")";
897897
tEnv.executeSql(sourceDDL);
898898
tEnv.executeSql(sinkDDL);
@@ -923,7 +923,20 @@ void testUpsertMode(boolean parallelismSnapshot) throws Exception {
923923
statement.execute("DELETE FROM inventory.products WHERE id=111;");
924924
}
925925

926-
waitForSinkSize("sink", 20);
926+
// Wait for the final aggregated result
927+
// Note: We use waitForSinkResult instead of waitForSinkSize because Flink 2.x
928+
// optimizations may reduce the number of intermediate changelog messages
929+
String[] expected =
930+
new String[] {
931+
"scooter,3.140",
932+
"car battery,8.100",
933+
"12-pack drill bits,0.800",
934+
"hammer,2.625",
935+
"rocks,5.100",
936+
"jacket,0.600",
937+
"spare tire,22.200"
938+
};
939+
waitForSinkResult("sink", Arrays.asList(expected));
927940

928941
/*
929942
* <pre>
@@ -951,17 +964,6 @@ void testUpsertMode(boolean parallelismSnapshot) throws Exception {
951964
* </pre>
952965
*/
953966

954-
String[] expected =
955-
new String[] {
956-
"scooter,3.140",
957-
"car battery,8.100",
958-
"12-pack drill bits,0.800",
959-
"hammer,2.625",
960-
"rocks,5.100",
961-
"jacket,0.600",
962-
"spare tire,22.200"
963-
};
964-
965967
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
966968
Assertions.assertThat(actual).containsExactlyInAnyOrder(expected);
967969

flink-cdc-flink2-compat/src/main/java/org/apache/flink/api/connector/sink2/Sink.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,5 +73,13 @@ public interface InitContext extends org.apache.flink.api.connector.sink2.InitCo
7373
default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
7474
return Optional.empty();
7575
}
76+
77+
default int getSubtaskId() {
78+
return this.getTaskInfo().getIndexOfThisSubtask();
79+
}
80+
81+
default int getNumberOfParallelSubtasks() {
82+
return this.getTaskInfo().getNumberOfParallelSubtasks();
83+
}
7684
}
7785
}

0 commit comments

Comments
 (0)