Skip to content

Commit a8c7b83

Browse files
damn1kk (Alexander Strelchenko)
authored and committed
fixed long rebalancing for AckMode.EXACTLY_ONCE (#371)
Don't increment uncommitted messages in CommitBatch for AckMode.EXACTLY_ONCE. Increasing the number of uncommitted messages in CommitBatch causes an infinite loop during rebalancing because they never decrease. Fixes #371.
1 parent 9b09225 commit a8c7b83

File tree

2 files changed

+79
-8
lines changed

2 files changed

+79
-8
lines changed

src/main/java/reactor/kafka/receiver/internals/ConsumerEventLoop.java

Lines changed: 10 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2020-2023 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2020-2024 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -172,7 +172,11 @@ private void onPartitionsRevoked(Collection<TopicPartition> partitions) {
172172
long end = maxDelayRebalance + System.currentTimeMillis();
173173
do {
174174
try {
175-
log.debug("Rebalancing; waiting for {} records in pipeline", inPipeline);
175+
log.debug(
176+
"Rebalancing; waiting for {} records in pipeline or awaitingTransaction: {}",
177+
inPipeline,
178+
this.awaitingTransaction.get()
179+
);
176180
Thread.sleep(interval);
177181
commitEvent.runIfRequired(true);
178182
} catch (InterruptedException e) {
@@ -374,7 +378,10 @@ public void run() {
374378
}
375379

376380
if (!records.isEmpty()) {
377-
this.commitBatch.addUncommitted(records);
381+
// Handled separately using transactional KafkaSender
382+
if (ackMode != AckMode.EXACTLY_ONCE) {
383+
this.commitBatch.addUncommitted(records);
384+
}
378385
r = Operators.produced(REQUESTED, ConsumerEventLoop.this, 1);
379386
log.debug("Emitting {} records, requested now {}", records.count(), r);
380387
sink.emitNext(records, ConsumerEventLoop.this);

src/test/java/reactor/kafka/receiver/internals/ConsumerEventLoopTest.java

Lines changed: 69 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,6 @@
2222
import org.apache.kafka.clients.consumer.ConsumerRecord;
2323
import org.apache.kafka.clients.consumer.ConsumerRecords;
2424
import org.apache.kafka.common.TopicPartition;
25-
import org.apache.kafka.common.record.TimestampType;
2625
import org.junit.Test;
2726
import org.mockito.ArgumentCaptor;
2827
import reactor.core.publisher.Sinks.Many;
@@ -39,6 +38,7 @@
3938
import java.util.Map;
4039
import java.util.Set;
4140
import java.util.concurrent.CountDownLatch;
41+
import java.util.concurrent.Executors;
4242
import java.util.concurrent.TimeUnit;
4343
import java.util.concurrent.atomic.AtomicBoolean;
4444

@@ -73,16 +73,13 @@ public void deferredCommitsWithRevoke() throws InterruptedException {
7373
t.printStackTrace();
7474
return null;
7575
}).given(sink).emitError(any(), any());
76-
ConsumerEventLoop loop = new ConsumerEventLoop<>(AckMode.MANUAL_ACK, null, opts,
77-
scheduler, consumer, t -> false, sink, new AtomicBoolean());
7876
Set<String> topics = new HashSet<>();
7977
topics.add("test");
8078
Collection<TopicPartition> partitions = new ArrayList<>();
8179
TopicPartition tp = new TopicPartition("test", 0);
8280
partitions.add(tp);
8381
Map<TopicPartition, List<ConsumerRecord>> record = new HashMap<>();
84-
record.put(tp, Collections.singletonList(
85-
new ConsumerRecord("test", 0, 0, 0, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, null)));
82+
record.put(tp, Collections.singletonList(new ConsumerRecord("test", 0, 0, null, null)));
8683
ConsumerRecords records = new ConsumerRecords(record);
8784
CountDownLatch latch = new CountDownLatch(2);
8885
AtomicBoolean paused = new AtomicBoolean();
@@ -102,6 +99,8 @@ public void deferredCommitsWithRevoke() throws InterruptedException {
10299
paused.set(false);
103100
return null;
104101
}).given(consumer).resume(any());
102+
ConsumerEventLoop loop = new ConsumerEventLoop<>(AckMode.MANUAL_ACK, null, opts,
103+
scheduler, consumer, t -> false, sink, new AtomicBoolean());
105104
loop.onRequest(1);
106105
loop.onRequest(1);
107106
CommittableBatch batch = loop.commitEvent.commitBatch;
@@ -115,4 +114,69 @@ public void deferredCommitsWithRevoke() throws InterruptedException {
115114
assertThat(batch.deferred).hasSize(0);
116115
}
117116

117+
@SuppressWarnings({ "rawtypes", "unchecked" })
118+
@Test
119+
public void revokePartitionsForExactlyOnce() throws InterruptedException {
120+
AtomicBoolean isPartitionRevokeFinished = new AtomicBoolean();
121+
ReceiverOptions opts = ReceiverOptions.create(
122+
Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "deferredCommitsWithRevoke"))
123+
.maxDelayRebalance(Duration.ofSeconds(30))
124+
.addRevokeListener(p -> isPartitionRevokeFinished.set(true))
125+
.subscription(Collections.singletonList("test"));
126+
Consumer consumer = mock(Consumer.class);
127+
Scheduler scheduler = KafkaSchedulers.newEvent(opts.groupId());
128+
Many sink = mock(Many.class);
129+
willAnswer(inv -> {
130+
Throwable t = inv.getArgument(0);
131+
t.printStackTrace();
132+
return null;
133+
}).given(sink).emitError(any(), any());
134+
Set<String> topics = new HashSet<>();
135+
topics.add("test");
136+
Collection<TopicPartition> partitions = new ArrayList<>();
137+
TopicPartition tp = new TopicPartition("test", 0);
138+
partitions.add(tp);
139+
Map<TopicPartition, List<ConsumerRecord>> record = new HashMap<>();
140+
record.put(tp, Collections.singletonList(new ConsumerRecord("test", 0, 0, null, null)));
141+
ConsumerRecords records = new ConsumerRecords(record);
142+
CountDownLatch latch = new CountDownLatch(2);
143+
AtomicBoolean paused = new AtomicBoolean();
144+
willAnswer(inv -> {
145+
Thread.sleep(10);
146+
latch.countDown();
147+
if (paused.get()) {
148+
return ConsumerRecords.empty();
149+
}
150+
return records;
151+
}).given(consumer).poll(any());
152+
willAnswer(inv -> {
153+
paused.set(true);
154+
return null;
155+
}).given(consumer).pause(any());
156+
willAnswer(inv -> {
157+
paused.set(false);
158+
return null;
159+
}).given(consumer).resume(any());
160+
161+
ConsumerEventLoop loop = new ConsumerEventLoop<>(AckMode.EXACTLY_ONCE, null, opts,
162+
scheduler, consumer, t -> false, sink, new AtomicBoolean());
163+
164+
loop.onRequest(1);
165+
loop.onRequest(1);
166+
167+
CommittableBatch batch = loop.commitEvent.commitBatch;
168+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
169+
assertThat(batch.uncommitted).hasSize(0);
170+
171+
loop.awaitingTransaction.set(true);
172+
ArgumentCaptor<ConsumerRebalanceListener> rebal = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
173+
verify(consumer).subscribe(any(Collection.class), rebal.capture());
174+
175+
Executors.newSingleThreadExecutor().execute(() -> rebal.getValue().onPartitionsRevoked(partitions));
176+
await().pollDelay(Duration.ofSeconds(5)).until(() -> true);
177+
assertThat(isPartitionRevokeFinished.get()).isFalse();
178+
loop.awaitingTransaction.set(false);
179+
await().atMost(Duration.ofSeconds(10)).until(isPartitionRevokeFinished::get);
180+
}
181+
118182
}

0 commit comments

Comments (0)