Skip to content

Commit dd914d2

Browse files
garyrussellartembilan
authored andcommitted
GH-1319: Agg. Replying Template improvement
Resolves #1319 - change the release strategy to a `BiConsumer` - call the strategy during normal message delivery as before - now call the strategy when releasing a partial result after timeout - allow the strategy to modify the list (change from `Collection` to `List`). **cherry-pick to 2.3.x**
1 parent b2d6518 commit dd914d2

File tree

4 files changed

+57
-15
lines changed

4 files changed

+57
-15
lines changed

spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.List;
2525
import java.util.Map;
2626
import java.util.Set;
27+
import java.util.function.BiPredicate;
2728
import java.util.function.Predicate;
2829
import java.util.stream.Collectors;
2930

@@ -74,7 +75,7 @@ public class AggregatingReplyingKafkaTemplate<K, V, R>
7475

7576
private final Map<TopicPartition, Long> offsets = new HashMap<>();
7677

77-
private final Predicate<Collection<ConsumerRecord<K, R>>> releaseStrategy;
78+
private final BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy;
7879

7980
private Duration commitTimeout = Duration.ofSeconds(DEFAULT_COMMIT_TIMEOUT);
8081

@@ -86,11 +87,32 @@ public class AggregatingReplyingKafkaTemplate<K, V, R>
8687
* @param producerFactory the producer factory.
8788
* @param replyContainer the reply container.
8889
* @param releaseStrategy the release strategy.
90+
* @deprecated in favor of
91+
* {@link #AggregatingReplyingKafkaTemplate(ProducerFactory, GenericMessageListenerContainer, BiPredicate)}
8992
*/
93+
@Deprecated
9094
public AggregatingReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory,
9195
GenericMessageListenerContainer<K, Collection<ConsumerRecord<K, R>>> replyContainer,
9296
Predicate<Collection<ConsumerRecord<K, R>>> releaseStrategy) {
9397

98+
this(producerFactory, replyContainer, (records, timeout) -> timeout || releaseStrategy.test(records));
99+
}
100+
101+
/**
102+
* Construct an instance using the provided parameter arguments. The releaseStrategy
103+
* is consulted to determine when a collection is "complete".
104+
* @param producerFactory the producer factory.
105+
* @param replyContainer the reply container.
106+
* @param releaseStrategy the release strategy which is a {@link BiPredicate} which is
107+
* passed the current list and a boolean to indicate if this is for a normal delivery
108+
* or a timeout (when {@link #setReturnPartialOnTimeout(boolean)} is true. The
109+
* predicate may modify the list of records.
110+
* @since 2.3.5
111+
*/
112+
public AggregatingReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory,
113+
GenericMessageListenerContainer<K, Collection<ConsumerRecord<K, R>>> replyContainer,
114+
BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy) {
115+
94116
super(producerFactory, replyContainer);
95117
Assert.notNull(releaseStrategy, "'releaseStrategy' cannot be null");
96118
AckMode ackMode = replyContainer.getContainerProperties().getAckMode();
@@ -133,7 +155,7 @@ public void onMessage(List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>>
133155
List<ConsumerRecord<K, R>> list = addToCollection(record, correlationId).stream()
134156
.map(RecordHolder::getRecord)
135157
.collect(Collectors.toList());
136-
if (this.releaseStrategy.test(list)) {
158+
if (this.releaseStrategy.test(list, false)) {
137159
ConsumerRecord<K, Collection<ConsumerRecord<K, R>>> done =
138160
new ConsumerRecord<>(AGGREGATED_RESULTS_TOPIC, 0, 0L, null, list);
139161
done.headers()
@@ -164,12 +186,12 @@ protected synchronized boolean handleTimeout(CorrelationKey correlationId,
164186
List<ConsumerRecord<K, R>> list = removed.stream()
165187
.map(RecordHolder::getRecord)
166188
.collect(Collectors.toList());
167-
future.set(new ConsumerRecord<>(PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC, 0, 0L, null, list));
168-
return true;
169-
}
170-
else {
171-
return false;
189+
if (this.releaseStrategy.test(list, true)) {
190+
future.set(new ConsumerRecord<>(PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC, 0, 0L, null, list));
191+
return true;
192+
}
172193
}
194+
return false;
173195
}
174196

175197
private void checkOffsetsAndCommitIfNecessary(List<ConsumerRecord<K, R>> list, Consumer<?, ?> consumer) {

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.concurrent.CountDownLatch;
3939
import java.util.concurrent.ExecutionException;
4040
import java.util.concurrent.TimeUnit;
41+
import java.util.concurrent.atomic.AtomicInteger;
4142
import java.util.concurrent.atomic.AtomicReference;
4243

4344
import org.apache.kafka.clients.consumer.Consumer;
@@ -295,7 +296,7 @@ public void testGoodWithSimpleMapper() throws Exception {
295296
@Test
296297
public void testAggregateNormal() throws Exception {
297298
AggregatingReplyingKafkaTemplate<Integer, String, String> template = aggregatingTemplate(
298-
new TopicPartitionOffset(D_REPLY, 0), 2);
299+
new TopicPartitionOffset(D_REPLY, 0), 2, new AtomicInteger());
299300
try {
300301
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
301302
ProducerRecord<Integer, String> record = new ProducerRecord<>(D_REQUEST, null, null, null, "foo");
@@ -324,7 +325,7 @@ public void testAggregateNormal() throws Exception {
324325
@Disabled("time sensitive")
325326
public void testAggregateTimeout() throws Exception {
326327
AggregatingReplyingKafkaTemplate<Integer, String, String> template = aggregatingTemplate(
327-
new TopicPartitionOffset(E_REPLY, 0), 3);
328+
new TopicPartitionOffset(E_REPLY, 0), 3, new AtomicInteger());
328329
try {
329330
template.setDefaultReplyTimeout(Duration.ofSeconds(5));
330331
ProducerRecord<Integer, String> record = new ProducerRecord<>(E_REQUEST, null, null, null, "foo");
@@ -357,8 +358,9 @@ public void testAggregateTimeout() throws Exception {
357358
@Test
358359
@Disabled("time sensitive")
359360
public void testAggregateTimeoutPartial() throws Exception {
361+
AtomicInteger releaseCount = new AtomicInteger();
360362
AggregatingReplyingKafkaTemplate<Integer, String, String> template = aggregatingTemplate(
361-
new TopicPartitionOffset(F_REPLY, 0), 3);
363+
new TopicPartitionOffset(F_REPLY, 0), 3, releaseCount);
362364
template.setReturnPartialOnTimeout(true);
363365
try {
364366
template.setDefaultReplyTimeout(Duration.ofSeconds(5));
@@ -377,6 +379,7 @@ public void testAggregateTimeoutPartial() throws Exception {
377379
assertThat(value2).isNotSameAs(value1);
378380
assertThat(consumerRecord.topic())
379381
.isEqualTo(AggregatingReplyingKafkaTemplate.PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC);
382+
assertThat(releaseCount.get()).isEqualTo(2);
380383
}
381384
finally {
382385
template.stop();
@@ -400,7 +403,8 @@ public void testAggregateOrphansNotStored() throws Exception {
400403
correlation.set(rec.headers().lastHeader(KafkaHeaders.CORRELATION_ID).value());
401404
return null;
402405
}).given(producer).send(any(), any());
403-
AggregatingReplyingKafkaTemplate template = new AggregatingReplyingKafkaTemplate(pf, container, coll -> true);
406+
AggregatingReplyingKafkaTemplate template = new AggregatingReplyingKafkaTemplate(pf, container,
407+
(list, timeout) -> true);
404408
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
405409
template.start();
406410
List<ConsumerRecord> records = new ArrayList<>();
@@ -475,7 +479,7 @@ public ReplyingKafkaTemplate<Integer, String, String> createTemplate(TopicPartit
475479
}
476480

477481
public AggregatingReplyingKafkaTemplate<Integer, String, String> aggregatingTemplate(
478-
TopicPartitionOffset topic, int releaseSize) {
482+
TopicPartitionOffset topic, int releaseSize, AtomicInteger releaseCount) {
479483

480484
ContainerProperties containerProperties = new ContainerProperties(topic);
481485
containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
@@ -488,7 +492,11 @@ public AggregatingReplyingKafkaTemplate<Integer, String, String> aggregatingTemp
488492
new KafkaMessageListenerContainer<>(cf, containerProperties);
489493
container.setBeanName(this.testName);
490494
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
491-
new AggregatingReplyingKafkaTemplate<>(this.config.pf(), container, coll -> coll.size() == releaseSize);
495+
new AggregatingReplyingKafkaTemplate<>(this.config.pf(), container,
496+
(list, timeout) -> {
497+
releaseCount.incrementAndGet();
498+
return list.size() == releaseSize;
499+
});
492500
template.setSharedReplyTopic(true);
493501
template.start();
494502
assertThat(template.getAssignedReplyTopicPartitions()).hasSize(1);

src/reference/asciidoc/kafka.adoc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -503,11 +503,15 @@ The template in <<replying-template>> is strictly for a single request/reply sce
503503
For cases where multiple receivers of a single message return a reply, you can use the `AggregatingReplyingKafkaTemplate`.
504504
This is an implementation of the client-side of the https://www.enterpriseintegrationpatterns.com/patterns/messaging/BroadcastAggregate.html[Scatter-Gather Enterprise Integration Pattern].
505505

506-
Like the `ReplyingKafkaTemplate`, the `AggregatingReplyingKafkaTemplate` constructor takes a producer factory and a listener container to receive the replies; it has a third parameter `Predicate<Collection<ConsumerRecord<K, R>>> releaseStrategy` which is consulted each time a reply is received; when the predicate returns `true`, the collection of `ConsumerRecord` s is used to complete the `Future` returned by the `sendAndReceive` method.
506+
Like the `ReplyingKafkaTemplate`, the `AggregatingReplyingKafkaTemplate` constructor takes a producer factory and a listener container to receive the replies; it has a third parameter `BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy` which is consulted each time a reply is received; when the predicate returns `true`, the collection of `ConsumerRecord` s is used to complete the `Future` returned by the `sendAndReceive` method.
507507

508508
There is an additional property `returnPartialOnTimeout` (default false).
509509
When this is set to `true`, instead of completing the future with a `KafkaReplyTimeoutException`, a partial result completes the future normally (as long as at least one reply record has been received).
510510

511+
Starting with version 2.3.5, the predicate is also called after a timeout (if `returnPartialOnTimeout` is `true`).
512+
The first argument is the current list of records; the second is `true` if this call is due to a timeout.
513+
The predicate can modify the list of records.
514+
511515
====
512516
[source, java]
513517
----

src/reference/asciidoc/whats-new.adoc

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ Also see <<new-in-sik>>.
77
[[kafka-client-2.4]]
88
==== Kafka Client Version
99

10-
This version requires the 2.4.0 `kafka-clients` or higher.
10+
This version requires the 2.4.0 `kafka-clients` or higher and supports the new incremental rebalancing feature.
1111

1212
[[x24-carl]]
1313
==== ConsumerAwareRabalanceListener
@@ -25,6 +25,14 @@ See the IMPORTANT note at the end of <<rebalance-listeners>> for more informatio
2525

2626
The `isAckAfterHandle()` default implementation now returns true by default.
2727

28+
[[x24-agg]]
29+
==== AggregatingReplyingKafkaTemplate
30+
31+
The `releastStrategy` is now a `BiConsumer`.
32+
It is now called after a timeout (as well as when records arrive); the second parameter is `true` in the case of a call after a timeout.
33+
34+
See <<aggregating-request-reply>> for more information.
35+
2836
=== Migration Guide
2937

3038
* This release is essentially the same as the 2.3.x line, except it has been compiled against the 2.4 `kafka-clients` jar, due to a binary incompatibility.

0 commit comments

Comments
 (0)