Skip to content

Commit c1442ea

Browse files
committed
[FLINK-37644] Remove mockito
1 parent d44f574 commit c1442ea

File tree

4 files changed

+40
-38
lines changed

4 files changed

+40
-38
lines changed

flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,27 +13,26 @@ Method <org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSource.getKa
1313
Method <org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroupManager.close()> calls method <org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.close()> in (KafkaClusterMetricGroupManager.java:73)
1414
Method <org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroupManager.close(java.lang.String)> calls method <org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.close()> in (KafkaClusterMetricGroupManager.java:62)
1515
Method <org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroupManager.register(java.lang.String, org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroup)> checks instanceof <org.apache.flink.runtime.metrics.groups.AbstractMetricGroup> in (KafkaClusterMetricGroupManager.java:42)
16-
Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.completeAndResetAvailabilityHelper()> calls constructor <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.<init>(int)> in (DynamicKafkaSourceReader.java:475)
17-
Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.completeAndResetAvailabilityHelper()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.getAvailableFuture()> in (DynamicKafkaSourceReader.java:474)
18-
Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.completeAndResetAvailabilityHelper()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.getAvailableFuture()> in (DynamicKafkaSourceReader.java:485)
19-
Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.getAvailabilityHelper()> has return type <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper> in (DynamicKafkaSourceReader.java:0)
20-
Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.getAvailabilityHelper()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (DynamicKafkaSourceReader.java:0)
16+
Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.completeAndResetAvailabilityHelper()> calls constructor <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.<init>(int)> in (DynamicKafkaSourceReader.java:479)
17+
Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.completeAndResetAvailabilityHelper()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.getAvailableFuture()> in (DynamicKafkaSourceReader.java:476)
18+
Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.completeAndResetAvailabilityHelper()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.getAvailableFuture()> in (DynamicKafkaSourceReader.java:489)
19+
Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.getAvailabilityHelperSize()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (DynamicKafkaSourceReader.java:0)
2120
Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.isActivelyConsumingSplits()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (DynamicKafkaSourceReader.java:0)
22-
Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.isAvailable()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.getAvailableFuture()> in (DynamicKafkaSourceReader.java:383)
23-
Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.isAvailable()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.resetToUnAvailable()> in (DynamicKafkaSourceReader.java:381)
24-
Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.syncAvailabilityHelperWithReaders()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.anyOf(int, java.util.concurrent.CompletableFuture)> in (DynamicKafkaSourceReader.java:496)
21+
Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.isAvailable()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.getAvailableFuture()> in (DynamicKafkaSourceReader.java:385)
22+
Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.isAvailable()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.resetToUnAvailable()> in (DynamicKafkaSourceReader.java:383)
23+
Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.syncAvailabilityHelperWithReaders()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.anyOf(int, java.util.concurrent.CompletableFuture)> in (DynamicKafkaSourceReader.java:500)
2524
Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getProducerPool()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
2625
Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getTransactionalIdPrefix()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
2726
Method <org.apache.flink.connector.kafka.sink.KafkaCommitter.getBackchannel()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
2827
Method <org.apache.flink.connector.kafka.sink.KafkaCommitter.getCommittingProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
29-
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:177)
30-
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:180)
31-
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:176)
32-
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:179)
33-
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:176)
28+
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:178)
29+
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:181)
30+
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:177)
31+
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:180)
32+
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:177)
3433
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> has generic parameter type <org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.kafka.sink.KafkaCommittable>>> with type argument depending on <org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in (KafkaSink.java:0)
3534
Method <org.apache.flink.connector.kafka.sink.KafkaSink.getKafkaProducerConfig()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSink.java:0)
36-
Method <org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (KafkaSinkBuilder.java:153)
35+
Method <org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (KafkaSinkBuilder.java:154)
3736
Method <org.apache.flink.connector.kafka.sink.KafkaWriter.getCurrentProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaWriter.java:0)
3837
Method <org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl.getProducers()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ProducerPoolImpl.java:0)
3938
Method <org.apache.flink.connector.kafka.source.KafkaSource.createReader(org.apache.flink.api.connector.source.SourceReaderContext, java.util.function.Consumer)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
@@ -51,6 +50,6 @@ Method <org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.getOffs
5150
Method <org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow(org.apache.flink.table.data.RowData, org.apache.flink.types.RowKind, [Lorg.apache.flink.table.data.RowData$FieldGetter;)> has parameter of type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (DynamicKafkaRecordSerializationSchema.java:0)
5251
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldNames(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:520)
5352
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:564)
54-
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createSerialization(org.apache.flink.table.connector.sink.DynamicTableSink$Context, org.apache.flink.table.connector.format.EncodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSink.java:401)
53+
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createSerialization(org.apache.flink.table.connector.sink.DynamicTableSink$Context, org.apache.flink.table.connector.format.EncodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSink.java:408)
5554
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.getFieldGetters(java.util.List, [I)> has return type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (KafkaDynamicSink.java:0)
5655
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(org.apache.flink.table.connector.source.DynamicTableSource$Context, org.apache.flink.table.connector.format.DecodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSource.java:574)

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
9191
private final List<DynamicKafkaSourceSplit> pendingSplits;
9292

9393
private MultipleFuturesAvailabilityHelper availabilityHelper;
94+
private int availabilityHelperSize;
9495
private boolean isActivelyConsumingSplits;
9596
private boolean isNoMoreSplits;
9697
private AtomicBoolean restartingReaders;
@@ -110,7 +111,8 @@ public DynamicKafkaSourceReader(
110111
.addGroup(KafkaClusterMetricGroup.DYNAMIC_KAFKA_SOURCE_METRIC_GROUP);
111112
this.kafkaClusterMetricGroupManager = new KafkaClusterMetricGroupManager();
112113
this.pendingSplits = new ArrayList<>();
113-
this.availabilityHelper = new MultipleFuturesAvailabilityHelper(0);
114+
this.availabilityHelper =
115+
new MultipleFuturesAvailabilityHelper(this.availabilityHelperSize = 0);
114116
this.isNoMoreSplits = false;
115117
this.isActivelyConsumingSplits = false;
116118
this.restartingReaders = new AtomicBoolean();
@@ -472,7 +474,9 @@ public UserCodeClassLoader getUserCodeClassLoader() {
472474
*/
473475
private void completeAndResetAvailabilityHelper() {
474476
CompletableFuture<?> cachedPreviousFuture = availabilityHelper.getAvailableFuture();
475-
availabilityHelper = new MultipleFuturesAvailabilityHelper(clusterReaderMap.size());
477+
availabilityHelper =
478+
new MultipleFuturesAvailabilityHelper(
479+
this.availabilityHelperSize = clusterReaderMap.size());
476480
syncAvailabilityHelperWithReaders();
477481

478482
// We cannot immediately complete the previous future here. We must complete it only when
@@ -538,8 +542,8 @@ private InputStatus logAndReturnInputStatus(InputStatus inputStatus) {
538542
}
539543

540544
@VisibleForTesting
541-
public MultipleFuturesAvailabilityHelper getAvailabilityHelper() {
542-
return availabilityHelper;
545+
public int getAvailabilityHelperSize() {
546+
return availabilityHelperSize;
543547
}
544548

545549
@VisibleForTesting

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.junit.jupiter.api.AfterAll;
4040
import org.junit.jupiter.api.BeforeAll;
4141
import org.junit.jupiter.api.Test;
42-
import org.powermock.reflect.Whitebox;
4342

4443
import java.util.ArrayList;
4544
import java.util.Collections;
@@ -175,7 +174,7 @@ void testAvailabilityFutureUpdates() throws Exception {
175174
assertThat(futureAtInit)
176175
.as("future is not complete at fresh startup since no readers are created")
177176
.isNotDone();
178-
assertThat(getAvailabilityHelperSize(reader)).isZero();
177+
assertThat(reader.getAvailabilityHelperSize()).isZero();
179178

180179
reader.start();
181180
MetadataUpdateEvent metadata =
@@ -193,7 +192,7 @@ void testAvailabilityFutureUpdates() throws Exception {
193192
.as(
194193
"New future should have been produced since metadata triggers reader creation")
195194
.isNotSameAs(futureAfterSplitAssignment);
196-
assertThat(getAvailabilityHelperSize(reader)).isEqualTo(2);
195+
assertThat(reader.getAvailabilityHelperSize()).isEqualTo(2);
197196

198197
// remove cluster 0
199198
KafkaStream kafkaStream = DynamicKafkaSourceTestHelper.getKafkaStream(TOPIC);
@@ -204,17 +203,10 @@ void testAvailabilityFutureUpdates() throws Exception {
204203
assertThat(futureAfterRemovingCluster0)
205204
.as("There should new future since the metadata has changed")
206205
.isNotSameAs(futureAfterSplitAssignment);
207-
assertThat(getAvailabilityHelperSize(reader)).isEqualTo(1);
206+
assertThat(reader.getAvailabilityHelperSize()).isEqualTo(1);
208207
}
209208
}
210209

211-
private int getAvailabilityHelperSize(DynamicKafkaSourceReader<?> reader) {
212-
return ((CompletableFuture<?>[])
213-
Whitebox.getInternalState(
214-
reader.getAvailabilityHelper(), "futuresToCombine"))
215-
.length;
216-
}
217-
218210
@Test
219211
void testReaderMetadataChangeWhenOneTopicChanges() throws Exception {
220212
try (DynamicKafkaSourceReader<Integer> reader =

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import org.junit.jupiter.api.AfterAll;
5555
import org.junit.jupiter.api.BeforeAll;
5656
import org.junit.jupiter.api.Test;
57-
import org.mockito.Mockito;
5857

5958
import java.time.Duration;
6059
import java.util.ArrayList;
@@ -68,6 +67,7 @@
6867
import java.util.Optional;
6968
import java.util.Properties;
7069
import java.util.Set;
70+
import java.util.concurrent.atomic.AtomicBoolean;
7171
import java.util.function.Consumer;
7272
import java.util.function.Supplier;
7373

@@ -82,8 +82,6 @@
8282
import static org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv.NUM_PARTITIONS;
8383
import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
8484
import static org.assertj.core.api.Assertions.assertThat;
85-
import static org.mockito.Mockito.never;
86-
import static org.mockito.Mockito.verify;
8785

8886
/** Unit tests for {@link KafkaSourceReader}. */
8987
public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSplit> {
@@ -508,7 +506,12 @@ public void testSupportsPausingOrResumingSplits() throws Exception {
508506

509507
@Test
510508
public void testThatReaderDoesNotCallRackIdSupplierOnInit() throws Exception {
511-
SerializableSupplier<String> rackIdSupplier = Mockito.mock(SerializableSupplier.class);
509+
AtomicBoolean called = new AtomicBoolean();
510+
SerializableSupplier<String> rackIdSupplier =
511+
() -> {
512+
called.set(true);
513+
return "dummy";
514+
};
512515

513516
try (KafkaSourceReader<Integer> reader =
514517
(KafkaSourceReader<Integer>)
@@ -521,13 +524,17 @@ public void testThatReaderDoesNotCallRackIdSupplierOnInit() throws Exception {
521524
// Do nothing here
522525
}
523526

524-
verify(rackIdSupplier, never()).get();
527+
assertThat(called).isFalse();
525528
}
526529

527530
@Test
528531
public void testThatReaderDoesCallRackIdSupplierOnSplitAssignment() throws Exception {
529-
SerializableSupplier<String> rackIdSupplier = Mockito.mock(SerializableSupplier.class);
530-
Mockito.when(rackIdSupplier.get()).thenReturn("use1-az1");
532+
AtomicBoolean called = new AtomicBoolean();
533+
SerializableSupplier<String> rackIdSupplier =
534+
() -> {
535+
called.set(true);
536+
return "use1-az1";
537+
};
531538

532539
try (KafkaSourceReader<Integer> reader =
533540
(KafkaSourceReader<Integer>)
@@ -542,7 +549,7 @@ public void testThatReaderDoesCallRackIdSupplierOnSplitAssignment() throws Excep
542549
new KafkaPartitionSplit(new TopicPartition(TOPIC, 1), 1L)));
543550
}
544551

545-
verify(rackIdSupplier).get();
552+
assertThat(called).isTrue();
546553
}
547554

548555
// ------------------------------------------

0 commit comments

Comments
 (0)