Skip to content

Commit a18d7af

Browse files
committed
[FLINK-37818] Move KafkaCommitter to internal
There is no leaking of the specific class in any of the KafkaSink(Builder) signatures.
1 parent 5384ab2 commit a18d7af

File tree

7 files changed

+15
-15
lines changed

7 files changed

+15
-15
lines changed

flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourc
2323
Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.syncAvailabilityHelperWithReaders()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.anyOf(int, java.util.concurrent.CompletableFuture)> in (DynamicKafkaSourceReader.java:500)
2424
Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getProducerPool()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
2525
Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getTransactionalIdPrefix()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
26-
Method <org.apache.flink.connector.kafka.sink.KafkaCommitter.getBackchannel()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
27-
Method <org.apache.flink.connector.kafka.sink.KafkaCommitter.getCommittingProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
26+
Method <org.apache.flink.connector.kafka.sink.internal.KafkaCommitter.getBackchannel()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
27+
Method <org.apache.flink.connector.kafka.sink.internal.KafkaCommitter.getCommittingProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
2828
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:178)
2929
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:181)
3030
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:177)

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.flink.annotation.Internal;
2121
import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
22+
import org.apache.flink.connector.kafka.sink.internal.KafkaCommitter;
2223

2324
import javax.annotation.Nullable;
2425

@@ -30,7 +31,7 @@
3031
* to commit transactions in {@link KafkaCommitter}.
3132
*/
3233
@Internal
33-
class KafkaCommittable {
34+
public class KafkaCommittable {
3435

3536
private final long producerId;
3637
private final short epoch;

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
3232
import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
3333
import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
34+
import org.apache.flink.connector.kafka.sink.internal.KafkaCommitter;
3435
import org.apache.flink.core.io.SimpleVersionedSerializer;
3536
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
3637
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.connector.base.DeliveryGuarantee;
2626
import org.apache.flink.connector.kafka.MetricUtil;
2727
import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
28+
import org.apache.flink.connector.kafka.sink.internal.KafkaCommitter;
2829
import org.apache.flink.metrics.Counter;
2930
import org.apache.flink.metrics.MetricGroup;
3031
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java renamed to flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitter.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,13 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.flink.connector.kafka.sink;
18+
package org.apache.flink.connector.kafka.sink.internal;
1919

20+
import org.apache.flink.annotation.Internal;
2021
import org.apache.flink.annotation.VisibleForTesting;
2122
import org.apache.flink.api.connector.sink2.Committer;
22-
import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory;
23-
import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
24-
import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
25-
import org.apache.flink.connector.kafka.sink.internal.WritableBackchannel;
23+
import org.apache.flink.connector.kafka.sink.KafkaCommittable;
24+
import org.apache.flink.connector.kafka.sink.KafkaSink;
2625
import org.apache.flink.util.ExceptionUtils;
2726
import org.apache.flink.util.IOUtils;
2827

@@ -48,7 +47,8 @@
4847
*
4948
* <p>The committer is responsible to finalize the Kafka transactions by committing them.
5049
*/
51-
class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
50+
@Internal
51+
public class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
5252

5353
private static final Logger LOG = LoggerFactory.getLogger(KafkaCommitter.class);
5454
public static final String UNKNOWN_PRODUCER_ID_ERROR_MESSAGE =

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java renamed to flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitterTest.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,10 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.flink.connector.kafka.sink;
18+
package org.apache.flink.connector.kafka.sink.internal;
1919

2020
import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest;
21-
import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory;
22-
import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
23-
import org.apache.flink.connector.kafka.sink.internal.ReadableBackchannel;
24-
import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
21+
import org.apache.flink.connector.kafka.sink.KafkaCommittable;
2522
import org.apache.flink.util.TestLoggerExtension;
2623

2724
import org.apache.kafka.clients.CommonClientConfigs;

0 commit comments

Comments
 (0)