Skip to content

Commit 24b6fc0

Browse files
authored
MINOR: Cleanup in ShareConsumerTest (#20940)
*What* PR fixes some typos/nit in `ShareConsumerTest` and fixes a test assertion in `ConsumerNetworkThreadTest`. Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 5ab7f0b commit 24b6fc0

File tree

2 files changed

+5
-7
lines changed

2 files changed

+5
-7
lines changed

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3163,7 +3163,7 @@ public void testDescribeShareGroupOffsetsForEmptySharePartition() {
31633163
try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
31643164
Admin adminClient = createAdminClient()) {
31653165
shareConsumer.subscribe(List.of(tp.topic()));
3166-
// Polling share consumer to make sure the share partition in created.
3166+
// Polling share consumer to make sure the share partition is created.
31673167
shareConsumer.poll(Duration.ofMillis(2000));
31683168
SharePartitionOffsetInfo sharePartitionOffsetInfo = sharePartitionOffsetInfo(adminClient, groupId, tp);
31693169
// Since the partition is empty, and no records have been consumed, the share partition startOffset will be
@@ -3185,7 +3185,7 @@ public void testSharePartitionLagForSingleShareConsumer() {
31853185
producer.send(record);
31863186
producer.flush();
31873187
shareConsumer.subscribe(List.of(tp.topic()));
3188-
// Polling share consumer to make sure the share partition in created and teh record is consumed.
3188+
// Polling share consumer to make sure the share partition is created and the record is consumed.
31893189
waitedPoll(shareConsumer, 2500L, 1);
31903190
// Acknowledge and commit the consumed record to update the share partition state.
31913191
shareConsumer.commitSync();
@@ -3212,10 +3212,9 @@ public void testSharePartitionLagForMultipleShareConsumers() {
32123212
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
32133213
producer.send(record);
32143214
producer.flush();
3215-
producer.flush();
32163215
shareConsumer1.subscribe(List.of(tp.topic()));
32173216
shareConsumer2.subscribe(List.of(tp.topic()));
3218-
// Polling share consumer 1 to make sure the share partition in created and the records are consumed.
3217+
// Polling share consumer 1 to make sure the share partition is created and the records are consumed.
32193218
waitedPoll(shareConsumer1, 2500L, 1);
32203219
// Acknowledge and commit the consumed records to update the share partition state.
32213220
shareConsumer1.commitSync();
@@ -3769,7 +3768,6 @@ private SharePartitionOffsetInfo sharePartitionOffsetInfo(Admin adminClient, Str
37693768
private void verifySharePartitionLag(Admin adminClient, String groupId, TopicPartition tp, long expectedLag) throws InterruptedException {
37703769
TestUtils.waitForCondition(() -> {
37713770
SharePartitionOffsetInfo sharePartitionOffsetInfo = sharePartitionOffsetInfo(adminClient, groupId, tp);
3772-
System.out.println("Current share partition description: " + sharePartitionOffsetInfo);
37733771
return sharePartitionOffsetInfo != null &&
37743772
sharePartitionOffsetInfo.lag().isPresent() &&
37753773
sharePartitionOffsetInfo.lag().get() == expectedLag;

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.junit.jupiter.params.ParameterizedTest;
3636
import org.junit.jupiter.params.provider.MethodSource;
3737
import org.junit.jupiter.params.provider.ValueSource;
38-
import org.mockito.ArgumentMatchers;
3938

4039
import java.time.Duration;
4140
import java.util.LinkedList;
@@ -52,6 +51,7 @@
5251
import static org.junit.jupiter.api.Assertions.assertTrue;
5352
import static org.mockito.ArgumentMatchers.any;
5453
import static org.mockito.ArgumentMatchers.anyLong;
54+
import static org.mockito.ArgumentMatchers.eq;
5555
import static org.mockito.Mockito.mock;
5656
import static org.mockito.Mockito.times;
5757
import static org.mockito.Mockito.verify;
@@ -204,7 +204,7 @@ public void testRunOnceInvokesReaper() {
204204
public void testSendUnsentRequests() {
205205
when(networkClientDelegate.hasAnyPendingRequests()).thenReturn(true).thenReturn(true).thenReturn(false);
206206
consumerNetworkThread.cleanup();
207-
verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong(), ArgumentMatchers.booleanThat(onClose -> onClose));
207+
verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong(), eq(true));
208208
}
209209

210210
@ParameterizedTest

0 commit comments

Comments
 (0)