You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Resolves#1319
- change the release strategy to a `BiConsumer`
- call the strategy during normal message delivery as before
- now call the strategy when releasing a partial result after timeout
- allow the strategy to modify the list (change from `Collection` to `List`).
**cherry-pick to 2.3.x**
# Conflicts:
# src/reference/asciidoc/whats-new.adoc
Copy file name to clipboardExpand all lines: src/reference/asciidoc/kafka.adoc
+5-1Lines changed: 5 additions & 1 deletion
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -503,11 +503,15 @@ The template in <<replying-template>> is strictly for a single request/reply sce
503
503
For cases where multiple receivers of a single message return a reply, you can use the `AggregatingReplyingKafkaTemplate`.
504
504
This is an implementation of the client-side of the https://www.enterpriseintegrationpatterns.com/patterns/messaging/BroadcastAggregate.html[Scatter-Gather Enterprise Integration Pattern].
505
505
506
-
Like the `ReplyingKafkaTemplate`, the `AggregatingReplyingKafkaTemplate` constructor takes a producer factory and a listener container to receive the replies; it has a third parameter `Predicate<Collection<ConsumerRecord<K, R>>> releaseStrategy` which is consulted each time a reply is received; when the predicate returns `true`, the collection of `ConsumerRecord` s is used to complete the `Future` returned by the `sendAndReceive` method.
506
+
Like the `ReplyingKafkaTemplate`, the `AggregatingReplyingKafkaTemplate` constructor takes a producer factory and a listener container to receive the replies; it has a third parameter `BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy` which is consulted each time a reply is received; when the predicate returns `true`, the collection of `ConsumerRecord` s is used to complete the `Future` returned by the `sendAndReceive` method.
507
507
508
508
There is an additional property `returnPartialOnTimeout` (default false).
509
509
When this is set to `true`, instead of completing the future with a `KafkaReplyTimeoutException`, a partial result completes the future normally (as long as at least one reply record has been received).
510
510
511
+
Starting with version 2.3.5, the predicate is also called after a timeout (if `returnPartialOnTimeout` is `true`).
512
+
The first argument is the current list of records; the second is `true` if this call is due to a timeout.
0 commit comments