Skip to content

Commit fb28015

Browse files
committed
[FLINK-34554] Adding listing abort strategy
Use ListTransactionAPI to exactly query the lingering transactions and abort them. The writer needs to keep transactions that are to be committed in state to not accidentally also cancel them.
1 parent d43fcbd commit fb28015

20 files changed

+892
-133
lines changed

flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ Class <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$
33
Class <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumerator.java:0)
44
Constructor <org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumerator.<init>(org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSubscriber, org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService, org.apache.flink.api.connector.source.SplitEnumeratorContext, org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer, org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer, java.util.Properties, org.apache.flink.api.connector.source.Boundedness, org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumState, org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy$StoppableKafkaEnumContextProxyFactory)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (DynamicKafkaSourceEnumerator.java:0)
55
Constructor <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.<init>(org.apache.flink.api.connector.source.SourceReaderContext, org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema, java.util.Properties)> calls constructor <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.<init>(int)> in (DynamicKafkaSourceReader.java:114)
6+
Constructor <org.apache.flink.connector.kafka.sink.KafkaWriterState.<init>(java.lang.String, int, int, org.apache.flink.connector.kafka.sink.internal.TransactionOwnership, java.util.Collection)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaWriterState.java:0)
67
Constructor <org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.<init>(java.util.List, java.util.regex.Pattern, org.apache.flink.connector.kafka.sink.KafkaPartitioner, org.apache.flink.api.common.serialization.SerializationSchema, org.apache.flink.api.common.serialization.SerializationSchema, [Lorg.apache.flink.table.data.RowData$FieldGetter;, [Lorg.apache.flink.table.data.RowData$FieldGetter;, boolean, [I, boolean)> has parameter of type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (DynamicKafkaRecordSerializationSchema.java:0)
78
Field <org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroupManager.metricGroups> has generic type <java.util.Map<java.lang.String, org.apache.flink.runtime.metrics.groups.AbstractMetricGroup>> with type argument depending on <org.apache.flink.runtime.metrics.groups.AbstractMetricGroup> in (KafkaClusterMetricGroupManager.java:0)
89
Field <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.availabilityHelper> has type <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper> in (DynamicKafkaSourceReader.java:0)
@@ -25,14 +26,14 @@ Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getProducer
2526
Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getTransactionalIdPrefix()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
2627
Method <org.apache.flink.connector.kafka.sink.KafkaCommitter.getBackchannel()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
2728
Method <org.apache.flink.connector.kafka.sink.KafkaCommitter.getCommittingProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
28-
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:170)
29-
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:173)
30-
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:169)
31-
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:172)
32-
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:169)
29+
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:177)
30+
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:180)
31+
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:176)
32+
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:179)
33+
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:176)
3334
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> has generic parameter type <org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.kafka.sink.KafkaCommittable>>> with type argument depending on <org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in (KafkaSink.java:0)
3435
Method <org.apache.flink.connector.kafka.sink.KafkaSink.getKafkaProducerConfig()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSink.java:0)
35-
Method <org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (KafkaSinkBuilder.java:152)
36+
Method <org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (KafkaSinkBuilder.java:153)
3637
Method <org.apache.flink.connector.kafka.sink.KafkaWriter.getCurrentProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaWriter.java:0)
3738
Method <org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl.getProducers()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ProducerPoolImpl.java:0)
3839
Method <org.apache.flink.connector.kafka.source.KafkaSource.createReader(org.apache.flink.api.connector.source.SourceReaderContext, java.util.function.Consumer)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
@@ -48,8 +49,8 @@ Method <org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader
4849
Method <org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.getNumAliveFetchers()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceReader.java:0)
4950
Method <org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.getOffsetsToCommit()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceReader.java:0)
5051
Method <org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow(org.apache.flink.table.data.RowData, org.apache.flink.types.RowKind, [Lorg.apache.flink.table.data.RowData$FieldGetter;)> has parameter of type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (DynamicKafkaRecordSerializationSchema.java:0)
51-
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldNames(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:525)
52-
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:569)
52+
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldNames(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:520)
53+
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:564)
5354
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createSerialization(org.apache.flink.table.connector.sink.DynamicTableSink$Context, org.apache.flink.table.connector.format.EncodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSink.java:401)
5455
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.getFieldGetters(java.util.List, [I)> has return type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (KafkaDynamicSink.java:0)
55-
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(org.apache.flink.table.connector.source.DynamicTableSource$Context, org.apache.flink.table.connector.format.DecodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSource.java:580)
56+
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(org.apache.flink.table.connector.source.DynamicTableSource$Context, org.apache.flink.table.connector.format.DecodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSource.java:574)

0 commit comments

Comments
 (0)