Skip to content

Commit f4670e5

Browse files
authored
GH-10163: Fallback to default errorChannel in async mode
Fixes: #10163 If RabbitStreamMessageHandler was used in async mode, error messages were dropped unless a sendFailureChannel or sendFailureChannelName were provided. This adds the "errorChannel" name as a fallback. * Add "errorChannel" fallback behaviour to KafkaProducerMessageHandler running in async mode so that errors are not lost. * Use NullChannel for test to bypass need for BeanFactory for getChannelResolver() Signed-off-by: Ryan Riley <[email protected]> **Auto-cherry-pick to `6.5.x` & `6.4.x`**
1 parent 93c7661 commit f4670e5

File tree

4 files changed

+82
-16
lines changed

4 files changed

+82
-16
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandler.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
3030
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
3131
import org.springframework.integration.amqp.support.MappingUtils;
32+
import org.springframework.integration.context.IntegrationContextUtils;
3233
import org.springframework.integration.core.MessagingTemplate;
3334
import org.springframework.integration.handler.AbstractMessageHandler;
3435
import org.springframework.messaging.Message;
@@ -45,6 +46,7 @@
4546
*
4647
* @author Gary Russell
4748
* @author Chris Bono
49+
* @author Ryan Riley
4850
* @since 6.0
4951
*
5052
*/
@@ -173,14 +175,14 @@ public RabbitStreamOperations getStreamOperations() {
173175
}
174176

175177
protected @Nullable MessageChannel getSendFailureChannel() {
176-
if (this.sendFailureChannel != null) {
177-
return this.sendFailureChannel;
178-
}
179-
else if (this.sendFailureChannelName != null) {
180-
this.sendFailureChannel = getChannelResolver().resolveDestination(this.sendFailureChannelName);
181-
return this.sendFailureChannel;
178+
if (this.sendFailureChannel == null && (this.sendFailureChannelName != null || !this.sync)) {
179+
String sendFailureChannelNameToUse = this.sendFailureChannelName;
180+
if (sendFailureChannelNameToUse == null) {
181+
sendFailureChannelNameToUse = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME;
182+
}
183+
this.sendFailureChannel = getChannelResolver().resolveDestination(sendFailureChannelNameToUse);
182184
}
183-
return null;
185+
return this.sendFailureChannel;
184186
}
185187

186188
protected @Nullable MessageChannel getSendSuccessChannel() {

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandlerTests.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,37 @@
1616

1717
package org.springframework.integration.amqp.outbound;
1818

19+
import java.util.concurrent.CompletableFuture;
1920
import java.util.concurrent.CountDownLatch;
2021
import java.util.concurrent.TimeUnit;
2122
import java.util.concurrent.atomic.AtomicReference;
2223

2324
import com.rabbitmq.stream.Consumer;
2425
import com.rabbitmq.stream.Environment;
26+
import com.rabbitmq.stream.Message;
2527
import com.rabbitmq.stream.OffsetSpecification;
28+
import com.rabbitmq.stream.codec.SimpleCodec;
2629
import org.junit.jupiter.api.Test;
30+
import org.mockito.ArgumentMatchers;
31+
import org.mockito.Mockito;
2732

2833
import org.springframework.integration.amqp.dsl.RabbitStream;
2934
import org.springframework.integration.amqp.support.RabbitTestContainer;
35+
import org.springframework.integration.channel.QueueChannel;
3036
import org.springframework.integration.support.MessageBuilder;
37+
import org.springframework.messaging.MessageHandlingException;
38+
import org.springframework.messaging.support.ErrorMessage;
3139
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
40+
import org.springframework.rabbit.stream.producer.StreamSendException;
3241

3342
import static org.assertj.core.api.Assertions.assertThat;
43+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
3444

3545
/**
3646
* @author Gary Russell
3747
* @author Chris Bono
3848
* @author Artem Bilan
49+
* @author Ryan Riley
3950
*
4051
* @since 6.0
4152
*/
@@ -117,4 +128,52 @@ void sendNative() throws InterruptedException {
117128
streamTemplate.close();
118129
}
119130

131+
@Test
132+
void errorChanelAsync() {
133+
Environment env = Mockito.mock(Environment.class);
134+
RabbitStreamTemplate streamTemplate = new RabbitStreamTemplate(env, "stream.stream");
135+
RabbitStreamTemplate spyStreamTemplate = Mockito.spy(streamTemplate);
136+
CompletableFuture<org.springframework.messaging.Message<?>> errorFuture = new CompletableFuture<>();
137+
Mockito.doReturn(errorFuture).when(spyStreamTemplate).send(ArgumentMatchers.any(Message.class));
138+
139+
QueueChannel errorChannel = new QueueChannel();
140+
RabbitStreamMessageHandler handler = RabbitStream.outboundStreamAdapter(spyStreamTemplate)
141+
.sync(false)
142+
.sendFailureChannel(errorChannel)
143+
.getObject();
144+
SimpleCodec codec = new SimpleCodec();
145+
org.springframework.messaging.Message<Message> testMessage = MessageBuilder.withPayload(codec.messageBuilder()
146+
.addData(new byte[1])
147+
.build())
148+
.build();
149+
handler.handleMessage(testMessage);
150+
StreamSendException streamException = new StreamSendException("Test Error Code", 99);
151+
errorFuture.completeExceptionally(streamException);
152+
ErrorMessage errorMessage = (ErrorMessage) errorChannel.receive(1000);
153+
assertThat(errorMessage).extracting(org.springframework.messaging.Message::getPayload).isEqualTo(streamException);
154+
}
155+
156+
@Test
157+
void errorChanelSync() {
158+
Environment env = Mockito.mock(Environment.class);
159+
RabbitStreamTemplate streamTemplate = new RabbitStreamTemplate(env, "stream.stream");
160+
RabbitStreamTemplate spyStreamTemplate = Mockito.spy(streamTemplate);
161+
CompletableFuture<org.springframework.messaging.Message<?>> errorFuture = new CompletableFuture<>();
162+
errorFuture.exceptionally(ErrorMessage::new);
163+
Mockito.doReturn(errorFuture).when(spyStreamTemplate).send(ArgumentMatchers.any(Message.class));
164+
165+
QueueChannel errorChannel = new QueueChannel();
166+
RabbitStreamMessageHandler handler = RabbitStream.outboundStreamAdapter(spyStreamTemplate)
167+
.sync(true)
168+
.sendFailureChannel(errorChannel)
169+
.getObject();
170+
SimpleCodec codec = new SimpleCodec();
171+
org.springframework.messaging.Message<Message> testMessage = MessageBuilder.withPayload(codec.messageBuilder()
172+
.addData(new byte[1])
173+
.build())
174+
.build();
175+
assertThatExceptionOfType(MessageHandlingException.class)
176+
.isThrownBy(() -> handler.handleMessage(testMessage));
177+
}
178+
120179
}

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.springframework.expression.EvaluationContext;
3939
import org.springframework.expression.Expression;
4040
import org.springframework.integration.MessageTimeoutException;
41+
import org.springframework.integration.context.IntegrationContextUtils;
4142
import org.springframework.integration.expression.ExpressionUtils;
4243
import org.springframework.integration.expression.FunctionExpression;
4344
import org.springframework.integration.expression.ValueExpression;
@@ -89,6 +90,7 @@
8990
* @author Marius Bogoevici
9091
* @author Biju Kunjummen
9192
* @author Tom van den Berge
93+
* @author Ryan Riley
9294
*
9395
* @since 5.4
9496
*/
@@ -434,16 +436,15 @@ public String getComponentType() {
434436
return this.isGateway ? "kafka:outbound-gateway" : "kafka:outbound-channel-adapter";
435437
}
436438

437-
@Nullable
438-
protected MessageChannel getSendFailureChannel() {
439-
if (this.sendFailureChannel != null) {
440-
return this.sendFailureChannel;
441-
}
442-
else if (this.sendFailureChannelName != null) {
443-
this.sendFailureChannel = getChannelResolver().resolveDestination(this.sendFailureChannelName);
444-
return this.sendFailureChannel;
439+
protected @Nullable MessageChannel getSendFailureChannel() {
440+
if (this.sendFailureChannel == null && (this.sendFailureChannelName != null || !this.sync)) {
441+
String sendFailureChannelNameToUse = this.sendFailureChannelName;
442+
if (sendFailureChannelNameToUse == null) {
443+
sendFailureChannelNameToUse = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME;
444+
}
445+
this.sendFailureChannel = getChannelResolver().resolveDestination(sendFailureChannelNameToUse);
445446
}
446-
return null;
447+
return this.sendFailureChannel;
447448
}
448449

449450
protected @Nullable MessageChannel getSendSuccessChannel() {

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.springframework.expression.common.LiteralExpression;
5353
import org.springframework.expression.spel.standard.SpelExpressionParser;
5454
import org.springframework.integration.channel.DirectChannel;
55+
import org.springframework.integration.channel.NullChannel;
5556
import org.springframework.integration.channel.QueueChannel;
5657
import org.springframework.integration.expression.FunctionExpression;
5758
import org.springframework.integration.expression.ValueExpression;
@@ -116,6 +117,7 @@
116117
* @author Biju Kunjummen
117118
* @author Artem Bilan
118119
* @author Tom van den Berge
120+
* @author Ryan Riley
119121
*
120122
* @since 5.4
121123
*/
@@ -544,6 +546,7 @@ void testConsumeAndProduceTransaction() throws Exception {
544546
DirectChannel channel = new DirectChannel();
545547
inbound.setOutputChannel(channel);
546548
KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler(template);
549+
handler.setSendFailureChannel(new NullChannel());
547550
handler.setMessageKeyExpression(new LiteralExpression("bar"));
548551
handler.setTopicExpression(new LiteralExpression("topic"));
549552
channel.subscribe(handler);
@@ -690,6 +693,7 @@ protected Producer createTransactionalProducer(String txIdPrefix) {
690693
DirectChannel channel = new DirectChannel();
691694
inbound.setOutputChannel(channel);
692695
KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler(template);
696+
handler.setSendFailureChannel(new NullChannel());
693697
handler.setMessageKeyExpression(new LiteralExpression("bar"));
694698
handler.setTopicExpression(new LiteralExpression("topic"));
695699
channel.subscribe(handler);

0 commit comments

Comments
 (0)