Skip to content

Commit b8ce3f0

Browse files
garyrussellartembilan
authored andcommitted
GH-1189: asyncAcks Improvement
When pending acks are released due to a gap being "filled", wake the consumer if it is currently paused.
1 parent 07b6240 commit b8ce3f0

File tree

1 file changed

+12
-6
lines changed

1 file changed

+12
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1190,9 +1190,6 @@ public void run() {
11901190
try {
11911191
pollAndInvoke();
11921192
}
1193-
catch (@SuppressWarnings(UNUSED) WakeupException e) {
1194-
// Ignore, we're stopping or applying immediate foreign acks
1195-
}
11961193
catch (NoOffsetForPartitionException nofpe) {
11971194
this.fatalError = true;
11981195
ListenerConsumer.this.logger.error(nofpe, "No offset and no reset policy");
@@ -1425,7 +1422,7 @@ private ConsumerRecords<K, V> doPoll() {
14251422
ConsumerRecords<K, V> records;
14261423
if (this.isBatchListener && this.subBatchPerPartition) {
14271424
if (this.batchIterator == null) {
1428-
this.lastBatch = this.consumer.poll(this.pollTimeout);
1425+
this.lastBatch = pollConsumer();
14291426
captureOffsets(this.lastBatch);
14301427
if (this.lastBatch.count() == 0) {
14311428
return this.lastBatch;
@@ -1442,13 +1439,22 @@ private ConsumerRecords<K, V> doPoll() {
14421439
}
14431440
}
14441441
else {
1445-
records = this.consumer.poll(this.pollTimeout);
1442+
records = pollConsumer();
14461443
captureOffsets(records);
14471444
checkRebalanceCommits();
14481445
}
14491446
return records;
14501447
}
14511448

1449+
private ConsumerRecords<K, V> pollConsumer() {
1450+
try {
1451+
return this.consumer.poll(this.pollTimeout);
1452+
}
1453+
catch (WakeupException ex) {
1454+
return ConsumerRecords.empty();
1455+
}
1456+
}
1457+
14521458
private synchronized void captureOffsets(ConsumerRecords<K, V> records) {
14531459
if (this.offsetsInThisBatch != null && records.count() > 0) {
14541460
this.offsetsInThisBatch.clear();
@@ -1733,7 +1739,7 @@ private void processAck(ConsumerRecord<K, V> record) {
17331739
if (!Thread.currentThread().equals(this.consumerThread)) {
17341740
try {
17351741
this.acks.put(record);
1736-
if (this.isManualImmediateAck) {
1742+
if (this.isManualImmediateAck || this.pausedForAsyncAcks) { // NOSONAR (sync)
17371743
this.consumer.wakeup();
17381744
}
17391745
}

0 commit comments

Comments
 (0)