Skip to content

Commit 9996019

Browse files
lvyanquan authored and leonardBang committed
[FLINK-36648][Connectors/Kafka] Bump Flink version to 2.0.0
This closes #140
1 parent 2fdb66d commit 9996019

File tree

44 files changed

+389
-329
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+389
-329
lines changed

.github/workflows/push_pr.yml

Lines changed: 2 additions & 9 deletions
Original file line number · Diff line number · Diff line change
@@ -28,16 +28,9 @@ jobs:
2828
compile_and_test:
2929
strategy:
3030
matrix:
31-
flink: [ 1.20.0 ]
32-
jdk: [ '8, 11, 17, 21' ]
31+
flink: [ 2.0.0 ]
32+
jdk: [ '17, 21' ]
3333
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
3434
with:
3535
flink_version: ${{ matrix.flink }}
3636
jdk_version: ${{ matrix.jdk }}
37-
python_test:
38-
strategy:
39-
matrix:
40-
flink: [ 1.20.0 ]
41-
uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
42-
with:
43-
flink_version: ${{ matrix.flink }}

.github/workflows/weekly.yml

Lines changed: 2 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -30,7 +30,7 @@ jobs:
3030
strategy:
3131
matrix:
3232
flink_branches: [{
33-
flink: 1.20-SNAPSHOT,
33+
flink: 2.1-SNAPSHOT,
3434
branch: main
3535
}, {
3636
flink: 1.19.1,
@@ -46,5 +46,5 @@ jobs:
4646
with:
4747
flink_version: ${{ matrix.flink_branches.flink }}
4848
connector_branch: ${{ matrix.flink_branches.branch }}
49-
jdk_version: ${{ matrix.flink_branches.jdk || '8, 11, 17, 21' }}
49+
jdk_version: ${{ matrix.flink_branches.jdk || '17, 21' }}
5050
run_dependency_convergence: false

flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java

Lines changed: 10 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -19,12 +19,12 @@
1919
package org.apache.flink.tests.util.kafka;
2020

2121
import org.apache.flink.configuration.Configuration;
22+
import org.apache.flink.configuration.MemorySize;
2223
import org.apache.flink.configuration.TaskManagerOptions;
2324
import org.apache.flink.connector.kafka.testutils.KafkaUtil;
2425
import org.apache.flink.connector.testframe.container.FlinkContainers;
2526
import org.apache.flink.connector.testframe.container.FlinkContainersSettings;
2627
import org.apache.flink.connector.testframe.container.TestcontainersSettings;
27-
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
2828
import org.apache.flink.test.resources.ResourceTestUtils;
2929
import org.apache.flink.test.util.JobSubmission;
3030
import org.apache.flink.util.TestLoggerExtension;
@@ -60,6 +60,7 @@
6060
import java.util.UUID;
6161
import java.util.stream.Collectors;
6262

63+
import static org.apache.flink.configuration.CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH;
6364
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
6465
import static org.assertj.core.api.Assertions.assertThat;
6566

@@ -99,8 +100,14 @@ class SmokeKafkaITCase {
99100
private static Configuration getConfiguration() {
100101
// modify configuration to have enough slots
101102
final Configuration flinkConfig = new Configuration();
102-
flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
103-
flinkConfig.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
103+
flinkConfig.set(TaskManagerOptions.NUM_TASK_SLOTS, 3);
104+
flinkConfig.set(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
105+
flinkConfig.set(
106+
org.apache.flink.configuration.JobManagerOptions.TOTAL_PROCESS_MEMORY,
107+
MemorySize.ofMebiBytes(1024));
108+
flinkConfig.set(
109+
org.apache.flink.configuration.TaskManagerOptions.TOTAL_PROCESS_MEMORY,
110+
MemorySize.ofMebiBytes(1024));
104111
// Workaround for FLINK-36454 ; default config is entirely overwritten
105112
flinkConfig.setString(
106113
"env.java.opts.all",

flink-connector-kafka-e2e-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java

Lines changed: 13 additions & 4 deletions
Original file line number · Diff line number · Diff line change
@@ -17,9 +17,12 @@
1717

1818
package org.apache.flink.streaming.kafka.test.base;
1919

20-
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
21-
import org.apache.flink.api.java.utils.ParameterTool;
20+
import org.apache.flink.configuration.Configuration;
21+
import org.apache.flink.configuration.RestartStrategyOptions;
2222
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
23+
import org.apache.flink.util.ParameterTool;
24+
25+
import java.time.Duration;
2326

2427
/** The util class for kafka example. */
2528
public class KafkaExampleUtil {
@@ -40,8 +43,14 @@ public static StreamExecutionEnvironment prepareExecutionEnv(ParameterTool param
4043
+ "--group.id <some id>");
4144
}
4245

43-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
44-
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
46+
Configuration configuration = new Configuration();
47+
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
48+
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 4);
49+
configuration.set(
50+
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY,
51+
Duration.ofMillis(10000));
52+
StreamExecutionEnvironment env =
53+
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
4554
env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
4655
env.getConfig()
4756
.setGlobalJobParameters(

flink-connector-kafka-e2e-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -18,7 +18,6 @@
1818
package org.apache.flink.streaming.kafka.test;
1919

2020
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
21-
import org.apache.flink.api.java.utils.ParameterTool;
2221
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
2322
import org.apache.flink.connector.kafka.sink.KafkaSink;
2423
import org.apache.flink.connector.kafka.source.KafkaSource;
@@ -27,6 +26,7 @@
2726
import org.apache.flink.streaming.api.datastream.DataStream;
2827
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2928
import org.apache.flink.streaming.kafka.test.base.KafkaExampleUtil;
29+
import org.apache.flink.util.ParameterTool;
3030

3131
import org.apache.kafka.clients.producer.ProducerConfig;
3232
import org.apache.kafka.common.serialization.IntegerDeserializer;

flink-connector-kafka/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -246,6 +246,12 @@ under the License.
246246
<version>${flink.version}</version>
247247
<scope>test</scope>
248248
</dependency>
249+
<dependency>
250+
<groupId>org.apache.flink</groupId>
251+
<artifactId>flink-statebackend-forst</artifactId>
252+
<version>${flink.version}</version>
253+
<scope>test</scope>
254+
</dependency>
249255

250256
<!-- ArchUnit test dependencies -->
251257

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java

Lines changed: 2 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -19,7 +19,7 @@
1919

2020
import org.apache.flink.annotation.VisibleForTesting;
2121
import org.apache.flink.api.common.serialization.SerializationSchema;
22-
import org.apache.flink.api.connector.sink2.Sink;
22+
import org.apache.flink.api.connector.sink2.WriterInitContext;
2323
import org.apache.flink.connector.base.DeliveryGuarantee;
2424
import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory;
2525
import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
@@ -91,7 +91,7 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
9191
DeliveryGuarantee deliveryGuarantee,
9292
Properties kafkaProducerConfig,
9393
String transactionalIdPrefix,
94-
Sink.InitContext sinkInitContext,
94+
WriterInitContext sinkInitContext,
9595
KafkaRecordSerializationSchema<IN> recordSerializer,
9696
SerializationSchema.InitializationContext schemaContext,
9797
Collection<KafkaWriterState> recoveredStates) {

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java

Lines changed: 4 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.annotation.VisibleForTesting;
2323
import org.apache.flink.api.connector.sink2.Committer;
2424
import org.apache.flink.api.connector.sink2.CommitterInitContext;
25+
import org.apache.flink.api.connector.sink2.WriterInitContext;
2526
import org.apache.flink.api.dag.Transformation;
2627
import org.apache.flink.connector.base.DeliveryGuarantee;
2728
import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
@@ -102,6 +103,7 @@ public static <IN> KafkaSinkBuilder<IN> builder() {
102103
return new KafkaSinkBuilder<>();
103104
}
104105

106+
@Internal
105107
@Override
106108
public Committer<KafkaCommittable> createCommitter(CommitterInitContext context) {
107109
return new KafkaCommitter(
@@ -119,14 +121,14 @@ public SimpleVersionedSerializer<KafkaCommittable> getCommittableSerializer() {
119121

120122
@Internal
121123
@Override
122-
public KafkaWriter<IN> createWriter(InitContext context) throws IOException {
124+
public KafkaWriter<IN> createWriter(WriterInitContext context) throws IOException {
123125
return restoreWriter(context, Collections.emptyList());
124126
}
125127

126128
@Internal
127129
@Override
128130
public KafkaWriter<IN> restoreWriter(
129-
InitContext context, Collection<KafkaWriterState> recoveredState) {
131+
WriterInitContext context, Collection<KafkaWriterState> recoveredState) {
130132
KafkaWriter<IN> writer;
131133
if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
132134
writer =

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java

Lines changed: 4 additions & 4 deletions
Original file line number · Diff line number · Diff line change
@@ -21,7 +21,7 @@
2121
import org.apache.flink.api.common.operators.MailboxExecutor;
2222
import org.apache.flink.api.common.operators.ProcessingTimeService;
2323
import org.apache.flink.api.common.serialization.SerializationSchema;
24-
import org.apache.flink.api.connector.sink2.Sink;
24+
import org.apache.flink.api.connector.sink2.WriterInitContext;
2525
import org.apache.flink.connector.base.DeliveryGuarantee;
2626
import org.apache.flink.connector.kafka.MetricUtil;
2727
import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
@@ -110,7 +110,7 @@ class KafkaWriter<IN>
110110
KafkaWriter(
111111
DeliveryGuarantee deliveryGuarantee,
112112
Properties kafkaProducerConfig,
113-
Sink.InitContext sinkInitContext,
113+
WriterInitContext sinkInitContext,
114114
KafkaRecordSerializationSchema<IN> recordSerializer,
115115
SerializationSchema.InitializationContext schemaContext) {
116116
this.deliveryGuarantee = checkNotNull(deliveryGuarantee, "deliveryGuarantee");
@@ -135,8 +135,8 @@ class KafkaWriter<IN>
135135
this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
136136
this.kafkaSinkContext =
137137
new DefaultKafkaSinkContext(
138-
sinkInitContext.getSubtaskId(),
139-
sinkInitContext.getNumberOfParallelSubtasks(),
138+
sinkInitContext.getTaskInfo().getIndexOfThisSubtask(),
139+
sinkInitContext.getTaskInfo().getNumberOfParallelSubtasks(),
140140
kafkaProducerConfig);
141141
try {
142142
recordSerializer.open(schemaContext, kafkaSinkContext);
Lines changed: 13 additions & 10 deletions
Original file line number · Diff line number · Diff line change
@@ -1,14 +1,18 @@
11
package org.apache.flink.connector.kafka.sink;
22

33
import org.apache.flink.annotation.Internal;
4-
import org.apache.flink.api.connector.sink2.StatefulSink;
5-
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
4+
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
5+
import org.apache.flink.api.connector.sink2.Sink;
6+
import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
7+
import org.apache.flink.api.connector.sink2.SupportsCommitter;
8+
import org.apache.flink.api.connector.sink2.SupportsWriterState;
9+
import org.apache.flink.api.connector.sink2.WriterInitContext;
610

711
import java.io.IOException;
812
import java.util.Collection;
913

1014
/**
11-
* A combination of {@link TwoPhaseCommittingSink} and {@link StatefulSink}.
15+
* A combination of {@link SupportsCommitter} and {@link SupportsWriterState}.
1216
*
1317
* <p>The purpose of this interface is to be able to pass an interface rather than a {@link
1418
* KafkaSink} implementation into the reducing sink which simplifies unit testing.
@@ -19,16 +23,15 @@
1923
*/
2024
@Internal
2125
public interface TwoPhaseCommittingStatefulSink<InputT, WriterStateT, CommT>
22-
extends TwoPhaseCommittingSink<InputT, CommT>, StatefulSink<InputT, WriterStateT> {
26+
extends SupportsCommitter<CommT>, SupportsWriterState<InputT, WriterStateT>, Sink<InputT> {
2327

24-
PrecommittingStatefulSinkWriter<InputT, WriterStateT, CommT> createWriter(InitContext context)
25-
throws IOException;
28+
PrecommittingStatefulSinkWriter<InputT, WriterStateT, CommT> createWriter(
29+
WriterInitContext context) throws IOException;
2630

2731
PrecommittingStatefulSinkWriter<InputT, WriterStateT, CommT> restoreWriter(
28-
InitContext context, Collection<WriterStateT> recoveredState) throws IOException;
32+
WriterInitContext context, Collection<WriterStateT> recoveredState) throws IOException;
2933

30-
/** A combination of {@link PrecommittingSinkWriter} and {@link StatefulSinkWriter}. */
34+
/** A combination of {@link StatefulSinkWriter}. */
3135
interface PrecommittingStatefulSinkWriter<InputT, WriterStateT, CommT>
32-
extends PrecommittingSinkWriter<InputT, CommT>,
33-
StatefulSinkWriter<InputT, WriterStateT> {}
36+
extends StatefulSinkWriter<InputT, WriterStateT>, CommittingSinkWriter<InputT, CommT> {}
3437
}

0 commit comments

Comments (0)