Skip to content

Commit 74a51d4

Browse files
garyrussellartembilan
authored andcommitted
GH-1353: Clarify AckMode with transactions
Resolves #1353 `AckMode` is not used with transactions - clarify the semantics in docs and javadocs. Also fix that offsets were not previously sent to the transaction with an incorrectly configured MANUAL ack mode.
1 parent e3868dc commit 74a51d4

File tree

3 files changed

+9
-3
lines changed

3 files changed

+9
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,11 @@ public void setMessageListener(Object messageListener) {
287287
* <li>MANUAL: Listener is responsible for acking - use a
288288
* {@link org.springframework.kafka.listener.AcknowledgingMessageListener}.
289289
* </ul>
290+
* Ignored when transactions are being used. Transactional consumers commit offsets
291+
* with semantics equivalent to {@code RECORD} or {@code BATCH}, depending on
292+
* the listener type.
290293
* @param ackMode the {@link AckMode}; default BATCH.
294+
* @see #setTransactionManager(PlatformTransactionManager)
291295
*/
292296
public void setAckMode(AckMode ackMode) {
293297
Assert.notNull(ackMode, "'ackMode' cannot be null");

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1451,7 +1451,7 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA
14511451
}
14521452
}
14531453
}
1454-
if (!this.isAnyManualAck && !this.autoCommit) {
1454+
if (producer != null || (!this.isAnyManualAck && !this.autoCommit)) {
14551455
for (ConsumerRecord<K, V> record : getHighestOffsetRecords(records)) {
14561456
this.acks.put(record);
14571457
}
@@ -1811,7 +1811,7 @@ public void ackCurrent(final ConsumerRecord<K, V> record,
18111811
this.acks.add(record);
18121812
}
18131813
}
1814-
else if (!this.isAnyManualAck && !this.autoCommit) {
1814+
else if (producer != null || (!this.isAnyManualAck && !this.autoCommit)) {
18151815
this.acks.add(record);
18161816
}
18171817
if (producer != null) {

src/reference/asciidoc/kafka.adoc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -829,7 +829,7 @@ Previously, the Kafka default (`true`) was used if the property was not set.
829829

830830
The consumer `poll()` method returns one or more `ConsumerRecords`.
831831
The `MessageListener` is called for each record.
832-
The following lists describes the action taken by the container for each `AckMode`:
832+
The following lists describes the action taken by the container for each `AckMode` (when transactions are not being used):
833833

834834
* `RECORD`: Commit the offset when the listener returns after processing the record.
835835
* `BATCH`: Commit the offset when all the records returned by the `poll()` have been processed.
@@ -840,6 +840,8 @@ The following lists describes the action taken by the container for each `AckMod
840840
After that, the same semantics as `BATCH` are applied.
841841
* `MANUAL_IMMEDIATE`: Commit the offset immediately when the `Acknowledgment.acknowledge()` method is called by the listener.
842842

843+
When using <<transactions, transactions>>, the offset(s) are sent to the transaction and the semantics are equivalent to `RECORD` or `BATCH`, depending on the listener type (record or batch).
844+
843845
NOTE: `MANUAL`, and `MANUAL_IMMEDIATE` require the listener to be an `AcknowledgingMessageListener` or a `BatchAcknowledgingMessageListener`.
844846
See <<message-listeners, Message Listeners>>.
845847

0 commit comments

Comments
 (0)