Skip to content

Commit ff35062

Browse files
garyrussellartembilan
authored andcommitted
GH-1189: Support Out-of-Order Manual Commits
Preparation for #1189 Defer committing out of order offsets until the gaps are filled. Pause the consumer until all acks are acknowledged.
1 parent c1854a2 commit ff35062

File tree

7 files changed

+504
-119
lines changed

7 files changed

+504
-119
lines changed

spring-kafka-docs/src/main/asciidoc/changes-since-1.0.adoc

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,115 @@
11
[[migration]]
2+
=== Changes between 2.6 and 2.7
3+
4+
[[x27-kafka-client]]
5+
==== Kafka Client Version
6+
7+
This version requires the 2.7.0 `kafka-clients`.
8+
It is also compatible with the 2.8.0 clients, since version 2.7.1; see <<update-deps>>.
9+
10+
[[x-27-nonblock-retry]]
11+
==== Non-Blocking Delayed Retries Using Topics
12+
13+
This significant new feature is added in this release.
14+
When strict ordering is not important, failed deliveries can be sent to another topic to be consumed later.
15+
A series of such retry topics can be configured, with increasing delays.
16+
See <<retry-topic>> for more information.
17+
18+
[[x27-container]]
19+
==== Listener Container Changes
20+
21+
The `onlyLogRecordMetadata` container property is now `true` by default.
22+
23+
A new container property `stopImmediate` is now available.
24+
25+
See <<container-props>> for more information.
26+
27+
Error handlers that use a `BackOff` between delivery attempts (e.g. `SeekToCurrentErrorHandler` and `DefaultAfterRollbackProcessor`) will now exit the back off interval soon after the container is stopped, rather than delaying the stop.
28+
See <<after-rollback>> and <<seek-to-current>> for more information.
29+
30+
Error handlers and after rollback processors that extend `FailedRecordProcessor` can now be configured with one or more `RetryListener` s to receive information about retry and recovery progress.
31+
32+
See See <<after-rollback>>, <<seek-to-current>>, and <<recovering-batch-eh>> for more information.
33+
34+
The `RecordInterceptor` now has additional methods called after the listener returns (normally, or by throwing an exception).
35+
It also has a sub-interface `ConsumerAwareRecordInterceptor`.
36+
In addition, there is now a `BatchInterceptor` for batch listeners.
37+
See <<message-listener-container>> for more information.
38+
39+
[[x27-listener]]
40+
==== `@KafkaListener` Changes
41+
42+
You can now validate the payload parameter of `@KafkaHandler` methods (class-level listeners).
43+
See <<kafka-validation>> for more information.
44+
45+
You can now set the `rawRecordHeader` property on the `MessagingMessageConverter` and `BatchMessagingMessageConverter` which causes the raw `ConsumerRecord` to be added to the converted `Message<?>`.
46+
This is useful, for example, if you wish to use a `DeadLetterPublishingRecoverer` in a listener error handler.
47+
See <<listener-error-handlers>> for more information.
48+
49+
You can now modify `@KafkaListener` annotations during application initialization.
50+
See <<kafkalistener-attrs>> for more information.
51+
52+
[[x27-dlt]]
53+
==== `DeadLetterPublishingRecover` Changes
54+
55+
Now, if both the key and value fail deserialization, the original values are published to the DLT.
56+
Previously, the value was populated but the key `DeserializationException` remained in the headers.
57+
There is a breaking API change, if you subclassed the recoverer and overrode the `createProducerRecord` method.
58+
59+
In addition, the recoverer verifies that the partition selected by the destination resolver actually exists before publishing to it.
60+
61+
See <<dead-letters>> for more information.
62+
63+
[[x27-CKTM]]
64+
==== `ChainedKafkaTransactionManager` is Deprecated
65+
66+
See <<transactions>> for more information.
67+
68+
[[x27-RKT]]
69+
==== `ReplyingKafkaTemplate` Changes
70+
71+
There is now a mechanism to examine a reply and fail the future exceptionally if some condition exists.
72+
73+
Support for sending and receiving `spring-messaging` `Message<?>` s has been added.
74+
75+
See <<replying-template>> for more information.
76+
77+
[[x27-streams]]
78+
==== Kafka Streams Changes
79+
80+
By default, the `StreamsBuilderFactoryBean` is now configured to not clean up local state.
81+
See <<streams-config>> for more information.
82+
83+
[[x27-admin]]
84+
==== `KafkaAdmin` Changes
85+
86+
New methods `createOrModifyTopics` and `describeTopics` have been added.
87+
`KafkaAdmin.NewTopics` has been added to facilitate configuring multiple topics in a single bean.
88+
See <<configuring-topics>> for more information.
89+
90+
[[x27-conv]]
91+
==== `MessageConverter` Changes
92+
93+
It is now possible to add a `spring-messaging` `SmartMessageConverter` to the `MessagingMessageConverter`, allowing content negotiation based on the `contentType` header.
94+
See <<messaging-message-conversion>> for more information.
95+
96+
[[x27-sequencing]]
97+
==== Sequencing `@KafkaListener` s
98+
99+
See <<sequencing>> for more information.
100+
101+
[[x27-exp-backoff]]
102+
==== `ExponentialBackOffWithMaxRetries`
103+
104+
A new `BackOff` implementation is provided, making it more convenient to configure the max retries.
105+
See <<exp-backoff>> for more information.
106+
107+
[[x27-delegating-eh]]
108+
==== Conditional Delegating Error Handlers
109+
110+
These new error handlers can be configured to delegate to different error handlers, depending on the exception type.
111+
See <<cond-eh>> for more information.
112+
2113
=== Changes between 2.5 and 2.6
3114

4115
[[x26-kafka-client]]

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1267,6 +1267,13 @@ The containers are started in a late phase (`Integer.MAX-VALUE - 100`).
12671267
Other components that implement `SmartLifecycle`, to handle data from listeners, should be started in an earlier phase.
12681268
The `- 100` leaves room for later phases to enable components to be auto-started after the containers.
12691269

1270+
[[ooo-commits]]
1271+
===== Manually Committing Offsets
1272+
1273+
Normally, when using `AckMode.MANUAL` or `AckMode.MANUAL_IMMEDIATE`, the acknowledgments must be acknowledged in order, because Kafka does not maintain state for each record, only a committed offset for each group/partition.
1274+
Starting with version 2.8, you can now set the container property `asyncAcks`, which allows the acknowledgments for records returned by the poll to be acknowledged in any order.
1275+
The listener container will defer the out-of-order commits until the missing acknowledgments are received.
1276+
The consumer will be paused (no new records delivered) until all the offsets for the previous poll have been committed.
12701277

12711278
[[kafka-listener-annotation]]
12721279
===== `@KafkaListener` Annotation

spring-kafka-docs/src/main/asciidoc/quick-tour.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ However, the quickest way to get started is to use https://start.spring.io[start
5151

5252
This quick tour works with the following versions:
5353

54-
* Apache Kafka Clients 2.7.0 or 2.8.0 (see <<update-deps>>).
54+
* Apache Kafka Clients 2.8.0
5555
* Spring Framework 5.3.x
5656
* Minimum Java version: 8
5757

Lines changed: 9 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -1,113 +1,16 @@
1-
=== What's New in 2.7 Since 2.6
1+
=== What's New in 2.8 Since 2.7
22

3-
This section covers the changes made from version 2.6 to version 2.7.
3+
This section covers the changes made from version 2.7 to version 2.8.
44
For changes in earlier version, see <<history>>.
55

6-
[[x27-kafka-client]]
6+
[[x28-kafka-client]]
77
==== Kafka Client Version
88

9-
This version requires the 2.7.0 `kafka-clients`.
10-
It is also compatible with the 2.8.0 clients, since version 2.7.1; see <<update-deps>>.
9+
This version requires the 2.8.0 `kafka-clients`.
1110

12-
[[x-27-nonblock-retry]]
13-
==== Non-Blocking Delayed Retries Using Topics
11+
[[x28-ooo-commits]]
12+
==== Out of Order Manual Commits
1413

15-
This significant new feature is added in this release.
16-
When strict ordering is not important, failed deliveries can be sent to another topic to be consumed later.
17-
A series of such retry topics can be configured, with increasing delays.
18-
See <<retry-topic>> for more information.
19-
20-
[[x27-container]]
21-
==== Listener Container Changes
22-
23-
The `onlyLogRecordMetadata` container property is now `true` by default.
24-
25-
A new container property `stopImmediate` is now available.
26-
27-
See <<container-props>> for more information.
28-
29-
Error handlers that use a `BackOff` between delivery attempts (e.g. `SeekToCurrentErrorHandler` and `DefaultAfterRollbackProcessor`) will now exit the back off interval soon after the container is stopped, rather than delaying the stop.
30-
See <<after-rollback>> and <<seek-to-current>> for more information.
31-
32-
Error handlers and after rollback processors that extend `FailedRecordProcessor` can now be configured with one or more `RetryListener` s to receive information about retry and recovery progress.
33-
34-
See See <<after-rollback>>, <<seek-to-current>>, and <<recovering-batch-eh>> for more information.
35-
36-
The `RecordInterceptor` now has additional methods called after the listener returns (normally, or by throwing an exception).
37-
It also has a sub-interface `ConsumerAwareRecordInterceptor`.
38-
In addition, there is now a `BatchInterceptor` for batch listeners.
39-
See <<message-listener-container>> for more information.
40-
41-
[[x27-listener]]
42-
==== `@KafkaListener` Changes
43-
44-
You can now validate the payload parameter of `@KafkaHandler` methods (class-level listeners).
45-
See <<kafka-validation>> for more information.
46-
47-
You can now set the `rawRecordHeader` property on the `MessagingMessageConverter` and `BatchMessagingMessageConverter` which causes the raw `ConsumerRecord` to be added to the converted `Message<?>`.
48-
This is useful, for example, if you wish to use a `DeadLetterPublishingRecoverer` in a listener error handler.
49-
See <<listener-error-handlers>> for more information.
50-
51-
You can now modify `@KafkaListener` annotations during application initialization.
52-
See <<kafkalistener-attrs>> for more information.
53-
54-
[[x27-dlt]]
55-
==== `DeadLetterPublishingRecover` Changes
56-
57-
Now, if both the key and value fail deserialization, the original values are published to the DLT.
58-
Previously, the value was populated but the key `DeserializationException` remained in the headers.
59-
There is a breaking API change, if you subclassed the recoverer and overrode the `createProducerRecord` method.
60-
61-
In addition, the recoverer verifies that the partition selected by the destination resolver actually exists before publishing to it.
62-
63-
See <<dead-letters>> for more information.
64-
65-
[[x27-CKTM]]
66-
==== `ChainedKafkaTransactionManager` is Deprecated
67-
68-
See <<transactions>> for more information.
69-
70-
[[x27-RKT]]
71-
==== `ReplyingKafkaTemplate` Changes
72-
73-
There is now a mechanism to examine a reply and fail the future exceptionally if some condition exists.
74-
75-
Support for sending and receiving `spring-messaging` `Message<?>` s has been added.
76-
77-
See <<replying-template>> for more information.
78-
79-
[[x27-streams]]
80-
==== Kafka Streams Changes
81-
82-
By default, the `StreamsBuilderFactoryBean` is now configured to not clean up local state.
83-
See <<streams-config>> for more information.
84-
85-
[[x27-admin]]
86-
==== `KafkaAdmin` Changes
87-
88-
New methods `createOrModifyTopics` and `describeTopics` have been added.
89-
`KafkaAdmin.NewTopics` has been added to facilitate configuring multiple topics in a single bean.
90-
See <<configuring-topics>> for more information.
91-
92-
[[x27-conv]]
93-
==== `MessageConverter` Changes
94-
95-
It is now possible to add a `spring-messaging` `SmartMessageConverter` to the `MessagingMessageConverter`, allowing content negotiation based on the `contentType` header.
96-
See <<messaging-message-conversion>> for more information.
97-
98-
[[x27-sequencing]]
99-
==== Sequencing `@KafkaListener` s
100-
101-
See <<sequencing>> for more information.
102-
103-
[[x27-exp-backoff]]
104-
==== `ExponentialBackOffWithMaxRetries`
105-
106-
A new `BackOff` implementation is provided, making it more convenient to configure the max retries.
107-
See <<exp-backoff>> for more information.
108-
109-
[[x27-delegating-eh]]
110-
==== Conditional Delegating Error Handlers
111-
112-
These new error handlers can be configured to delegate to different error handlers, depending on the exception type.
113-
See <<cond-eh>> for more information.
14+
The listener container can now be configured to accept manual offset commits out of order (usually asynchronously).
15+
The container will defer the commit until the missing offset is acknowledged.
16+
See <<ooo-commits>> for more information.

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,8 @@ public enum EOSMode {
279279

280280
private boolean stopImmediate;
281281

282+
private boolean asyncAcks;
283+
282284
/**
283285
* Create properties for a container that will subscribe to the specified topics.
284286
* @param topics the topics.
@@ -830,6 +832,28 @@ public void setStopImmediate(boolean stopImmediate) {
830832
this.stopImmediate = stopImmediate;
831833
}
832834

835+
/**
836+
* When true, async manual acknowledgments are supported.
837+
* @return true for async ack support.
838+
* @since 2.8
839+
*/
840+
public boolean isAsyncAcks() {
841+
return this.asyncAcks;
842+
}
843+
844+
/**
845+
* Set to true to support asynchronous record acknowledgments. Only applies with
846+
* {@link AckMode#MANUAL} or {@link AckMode#MANUAL_IMMEDIATE}. Out of order offset
847+
* commits are deferred until all previous offsets in the partition have been
848+
* committed. The consumer is paused, if necessary, until all acks have been
849+
* completed.
850+
* @param asyncAcks true to use async acks.
851+
* @since 2.8
852+
*/
853+
public void setAsyncAcks(boolean asyncAcks) {
854+
this.asyncAcks = asyncAcks;
855+
}
856+
833857
private void adviseListenerIfNeeded() {
834858
if (!CollectionUtils.isEmpty(this.adviceChain)) {
835859
if (AopUtils.isAopProxy(this.messageListener)) {

0 commit comments

Comments
 (0)