Skip to content

Commit 547a3a3

Browse files
garyrussellartembilan
authored andcommitted
GH-1728: ReplyingKT Hook for User Errors
Resolves #1728 Add a hook allowing detection of server errors and complete the future exceptionally. **cherry-pick to 2.6.x (minus whatsnew.adoc)** # Conflicts: # spring-kafka-docs/src/main/asciidoc/whats-new.adoc
1 parent c33b0b3 commit 547a3a3

File tree

3 files changed

+117
-9
lines changed

3 files changed

+117
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -104,6 +104,8 @@ public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implemen
104104

105105
private String replyPartitionHeaderName = KafkaHeaders.REPLY_PARTITION;
106106

107+
private Function<ConsumerRecord<?, ?>, Exception> replyErrorChecker = rec -> null;
108+
107109
private volatile boolean running;
108110

109111
private volatile boolean schedulerInitialized;
@@ -263,6 +265,16 @@ public void setReplyPartitionHeaderName(String replyPartitionHeaderName) {
263265
this.replyPartitionHeaderName = replyPartitionHeaderName;
264266
}
265267

268+
/**
269+
* Set a function to examine replies for an error returned by the server.
270+
* @param replyErrorChecker the error checker function.
271+
* @since 2.6.7
272+
*/
273+
public void setReplyErrorChecker(Function<ConsumerRecord<?, ?>, Exception> replyErrorChecker) {
274+
Assert.notNull(replyErrorChecker, "'replyErrorChecker' cannot be null");
275+
this.replyErrorChecker = replyErrorChecker;
276+
}
277+
266278
@Override
267279
public void afterPropertiesSet() {
268280
if (!this.schedulerSet && !this.schedulerInitialized) {
@@ -408,12 +420,10 @@ public void onMessage(List<ConsumerRecord<K, R>> data) {
408420
}
409421
else {
410422
boolean ok = true;
411-
if (record.value() == null) {
412-
DeserializationException de = checkDeserialization(record, this.logger);
413-
if (de != null) {
414-
ok = false;
415-
future.setException(de);
416-
}
423+
Exception exception = checkForErrors(record);
424+
if (exception != null) {
425+
ok = false;
426+
future.setException(exception);
417427
}
418428
if (ok) {
419429
this.logger.debug(() -> "Received: " + record + WITH_CORRELATION_ID + correlationKey);
@@ -424,6 +434,24 @@ public void onMessage(List<ConsumerRecord<K, R>> data) {
424434
});
425435
}
426436

437+
/**
438+
* Check for errors in a reply. The default implementation checks for {@link DeserializationException}s
439+
* and invokes the {@link #setReplyErrorChecker(Function) replyErrorChecker} function.
440+
* @param record the record.
441+
* @return the exception, or null if none.
442+
* @since 2.6.7
443+
*/
444+
@Nullable
445+
protected Exception checkForErrors(ConsumerRecord<K, R> record) {
446+
if (record.value() == null || record.key() == null) {
447+
DeserializationException de = checkDeserialization(record, this.logger);
448+
if (de != null) {
449+
return de;
450+
}
451+
}
452+
return this.replyErrorChecker.apply(record);
453+
}
454+
427455
/**
428456
* Return a {@link DeserializationException} if either the key or value failed
429457
* deserialization; null otherwise. If you need to determine whether it was the key or

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -110,7 +110,8 @@
110110
ReplyingKafkaTemplateTests.H_REPLY, ReplyingKafkaTemplateTests.H_REQUEST,
111111
ReplyingKafkaTemplateTests.I_REPLY, ReplyingKafkaTemplateTests.I_REQUEST,
112112
ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST,
113-
ReplyingKafkaTemplateTests.K_REPLY, ReplyingKafkaTemplateTests.K_REQUEST })
113+
ReplyingKafkaTemplateTests.K_REPLY, ReplyingKafkaTemplateTests.K_REQUEST,
114+
ReplyingKafkaTemplateTests.L_REPLY, ReplyingKafkaTemplateTests.L_REQUEST })
114115
public class ReplyingKafkaTemplateTests {
115116

116117
public static final String A_REPLY = "aReply";
@@ -157,6 +158,10 @@ public class ReplyingKafkaTemplateTests {
157158

158159
public static final String K_REQUEST = "kRequest";
159160

161+
public static final String L_REPLY = "lReply";
162+
163+
public static final String L_REQUEST = "lRequest";
164+
160165
@Autowired
161166
private EmbeddedKafkaBroker embeddedKafka;
162167

@@ -203,6 +208,32 @@ public void testGood() throws Exception {
203208
}
204209
}
205210

211+
@Test
212+
void userDefinedException() throws Exception {
213+
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(L_REPLY);
214+
try {
215+
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
216+
template.setReplyErrorChecker(record -> {
217+
org.apache.kafka.common.header.Header error = record.headers().lastHeader("serverSentAnError");
218+
if (error != null) {
219+
return new IllegalStateException(new String(error.value()));
220+
}
221+
else {
222+
return null;
223+
}
224+
});
225+
ProducerRecord<Integer, String> record = new ProducerRecord<>(L_REQUEST, null, null, null, "foo", null);
226+
assertThatExceptionOfType(ExecutionException.class)
227+
.isThrownBy(() -> template.sendAndReceive(record).get(10, TimeUnit.SECONDS))
228+
.withCauseExactlyInstanceOf(IllegalStateException.class)
229+
.withMessageContaining("user error");
230+
}
231+
finally {
232+
template.stop();
233+
template.destroy();
234+
}
235+
}
236+
206237
@Test
207238
void testConsumerRecord() throws Exception {
208239
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(K_REPLY);
@@ -728,6 +759,14 @@ public String handleK(ConsumerRecord<String, String> in) {
728759
return in.value().toUpperCase();
729760
}
730761

762+
@KafkaListener(id = L_REQUEST, topics = L_REQUEST)
763+
@SendTo // default REPLY_TOPIC header
764+
public Message<String> handleL(String in) throws InterruptedException {
765+
return MessageBuilder.withPayload(in.toUpperCase())
766+
.setHeader("serverSentAnError", "user error")
767+
.build();
768+
}
769+
731770
}
732771

733772
@KafkaListener(topics = C_REQUEST, groupId = C_REQUEST)

src/reference/asciidoc/kafka.adoc

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,46 @@ Note that we can use Boot's auto-configured container factory to create the repl
637637
If a non-trivial deserializer is being used for replies, consider using an <<error-handling-deserializer,`ErrorHandlingDeserializer`>> that delegates to your configured deserializer.
638638
When so configured, the `RequestReplyFuture` will be completed exceptionally and you can catch the `ExecutionException`, with the `DeserializationException` in its `cause` property.
639639

640+
Starting with version 2.6.7, in addition to detecting `DeserializationException` s, the template will call the `replyErrorChecker` function, if provided.
641+
If it returns an exception, the future will be completed exceptionally.
642+
643+
Here is an example:
644+
645+
====
646+
[source, java]
647+
----
648+
template.setReplyErrorChecker(record -> {
649+
Header error = record.headers().lastHeader("serverSentAnError");
650+
if (error != null) {
651+
return new MyException(new String(error.value()));
652+
}
653+
else {
654+
return null;
655+
}
656+
});
657+
658+
...
659+
660+
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
661+
try {
662+
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
663+
ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
664+
...
665+
}
666+
catch (InterruptedException e) {
667+
...
668+
}
669+
catch (ExecutionException e) {
670+
if (e.getCause instanceof MyException) {
671+
...
672+
}
673+
}
674+
catch (TimeoutException e) {
675+
...
676+
}
677+
----
678+
====
679+
640680
The template sets a header (named `KafkaHeaders.CORRELATION_ID` by default), which must be echoed back by the server side.
641681

642682
In this case, the following `@KafkaListener` application responds:
@@ -842,6 +882,7 @@ NOTE: If you use an <<error-handling-deserializer,`ErrorHandlingDeserializer`>>
842882
Instead, the record (with a `null` value) will be returned intact, with the deserialization exception(s) in headers.
843883
It is recommended that applications call the utility method `ReplyingKafkaTemplate.checkDeserialization()` method to determine if a deserialization exception occurred.
844884
See its javadocs for more information.
885+
The `replyErrorChecker` is also not called for this aggregating template; you should perform the checks on each element of the reply.
845886

846887
[[receiving-messages]]
847888
==== Receiving Messages

0 commit comments

Comments
 (0)