Skip to content

Commit f4b171d

Browse files
rrileycaartembilan
authored andcommitted
GH-10163: Fallback to default errorChannel in async mode
Fixes: #10163 If RabbitStreamMessageHandler was used in async mode, error messages were dropped unless a sendFailureChannel or sendFailureChannelName were provided. This adds the "errorChannel" name as a fallback. * Add "errorChannel" fallback behaviour to KafkaProducerMessageHandler running in async mode so that errors are not lost. * Use NullChannel for test to bypass need for BeanFactory for getChannelResolver() Signed-off-by: Ryan Riley <[email protected]> **Auto-cherry-pick to `6.4.x`** # Conflicts: # spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandler.java
1 parent 9794a9f commit f4b171d

File tree

4 files changed

+82
-16
lines changed

4 files changed

+82
-16
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandler.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
2828
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
2929
import org.springframework.integration.amqp.support.MappingUtils;
30+
import org.springframework.integration.context.IntegrationContextUtils;
3031
import org.springframework.integration.core.MessagingTemplate;
3132
import org.springframework.integration.handler.AbstractMessageHandler;
3233
import org.springframework.messaging.Message;
@@ -43,6 +44,7 @@
4344
*
4445
* @author Gary Russell
4546
* @author Chris Bono
47+
* @author Ryan Riley
4648
* @since 6.0
4749
*
4850
*/
@@ -171,14 +173,14 @@ public RabbitStreamOperations getStreamOperations() {
171173
}
172174

173175
protected MessageChannel getSendFailureChannel() {
174-
if (this.sendFailureChannel != null) {
175-
return this.sendFailureChannel;
176-
}
177-
else if (this.sendFailureChannelName != null) {
178-
this.sendFailureChannel = getChannelResolver().resolveDestination(this.sendFailureChannelName);
179-
return this.sendFailureChannel;
176+
if (this.sendFailureChannel == null && (this.sendFailureChannelName != null || !this.sync)) {
177+
String sendFailureChannelNameToUse = this.sendFailureChannelName;
178+
if (sendFailureChannelNameToUse == null) {
179+
sendFailureChannelNameToUse = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME;
180+
}
181+
this.sendFailureChannel = getChannelResolver().resolveDestination(sendFailureChannelNameToUse);
180182
}
181-
return null;
183+
return this.sendFailureChannel;
182184
}
183185

184186
protected MessageChannel getSendSuccessChannel() {

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandlerTests.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,37 @@
1616

1717
package org.springframework.integration.amqp.outbound;
1818

19+
import java.util.concurrent.CompletableFuture;
1920
import java.util.concurrent.CountDownLatch;
2021
import java.util.concurrent.TimeUnit;
2122
import java.util.concurrent.atomic.AtomicReference;
2223

2324
import com.rabbitmq.stream.Consumer;
2425
import com.rabbitmq.stream.Environment;
26+
import com.rabbitmq.stream.Message;
2527
import com.rabbitmq.stream.OffsetSpecification;
28+
import com.rabbitmq.stream.codec.SimpleCodec;
2629
import org.junit.jupiter.api.Test;
30+
import org.mockito.ArgumentMatchers;
31+
import org.mockito.Mockito;
2732

2833
import org.springframework.integration.amqp.dsl.RabbitStream;
2934
import org.springframework.integration.amqp.support.RabbitTestContainer;
35+
import org.springframework.integration.channel.QueueChannel;
3036
import org.springframework.integration.support.MessageBuilder;
37+
import org.springframework.messaging.MessageHandlingException;
38+
import org.springframework.messaging.support.ErrorMessage;
3139
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
40+
import org.springframework.rabbit.stream.producer.StreamSendException;
3241

3342
import static org.assertj.core.api.Assertions.assertThat;
43+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
3444

3545
/**
3646
* @author Gary Russell
3747
* @author Chris Bono
3848
* @author Artem Bilan
49+
* @author Ryan Riley
3950
*
4051
* @since 6.0
4152
*/
@@ -117,4 +128,52 @@ void sendNative() throws InterruptedException {
117128
streamTemplate.close();
118129
}
119130

131+
@Test
132+
void errorChanelAsync() {
133+
Environment env = Mockito.mock(Environment.class);
134+
RabbitStreamTemplate streamTemplate = new RabbitStreamTemplate(env, "stream.stream");
135+
RabbitStreamTemplate spyStreamTemplate = Mockito.spy(streamTemplate);
136+
CompletableFuture<org.springframework.messaging.Message<?>> errorFuture = new CompletableFuture<>();
137+
Mockito.doReturn(errorFuture).when(spyStreamTemplate).send(ArgumentMatchers.any(Message.class));
138+
139+
QueueChannel errorChannel = new QueueChannel();
140+
RabbitStreamMessageHandler handler = RabbitStream.outboundStreamAdapter(spyStreamTemplate)
141+
.sync(false)
142+
.sendFailureChannel(errorChannel)
143+
.getObject();
144+
SimpleCodec codec = new SimpleCodec();
145+
org.springframework.messaging.Message<Message> testMessage = MessageBuilder.withPayload(codec.messageBuilder()
146+
.addData(new byte[1])
147+
.build())
148+
.build();
149+
handler.handleMessage(testMessage);
150+
StreamSendException streamException = new StreamSendException("Test Error Code", 99);
151+
errorFuture.completeExceptionally(streamException);
152+
ErrorMessage errorMessage = (ErrorMessage) errorChannel.receive(1000);
153+
assertThat(errorMessage).extracting(org.springframework.messaging.Message::getPayload).isEqualTo(streamException);
154+
}
155+
156+
@Test
157+
void errorChanelSync() {
158+
Environment env = Mockito.mock(Environment.class);
159+
RabbitStreamTemplate streamTemplate = new RabbitStreamTemplate(env, "stream.stream");
160+
RabbitStreamTemplate spyStreamTemplate = Mockito.spy(streamTemplate);
161+
CompletableFuture<org.springframework.messaging.Message<?>> errorFuture = new CompletableFuture<>();
162+
errorFuture.exceptionally(ErrorMessage::new);
163+
Mockito.doReturn(errorFuture).when(spyStreamTemplate).send(ArgumentMatchers.any(Message.class));
164+
165+
QueueChannel errorChannel = new QueueChannel();
166+
RabbitStreamMessageHandler handler = RabbitStream.outboundStreamAdapter(spyStreamTemplate)
167+
.sync(true)
168+
.sendFailureChannel(errorChannel)
169+
.getObject();
170+
SimpleCodec codec = new SimpleCodec();
171+
org.springframework.messaging.Message<Message> testMessage = MessageBuilder.withPayload(codec.messageBuilder()
172+
.addData(new byte[1])
173+
.build())
174+
.build();
175+
assertThatExceptionOfType(MessageHandlingException.class)
176+
.isThrownBy(() -> handler.handleMessage(testMessage));
177+
}
178+
120179
}

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.springframework.expression.EvaluationContext;
3838
import org.springframework.expression.Expression;
3939
import org.springframework.integration.MessageTimeoutException;
40+
import org.springframework.integration.context.IntegrationContextUtils;
4041
import org.springframework.integration.expression.ExpressionUtils;
4142
import org.springframework.integration.expression.FunctionExpression;
4243
import org.springframework.integration.expression.ValueExpression;
@@ -89,6 +90,7 @@
8990
* @author Marius Bogoevici
9091
* @author Biju Kunjummen
9192
* @author Tom van den Berge
93+
* @author Ryan Riley
9294
*
9395
* @since 5.4
9496
*/
@@ -433,16 +435,15 @@ public String getComponentType() {
433435
return this.isGateway ? "kafka:outbound-gateway" : "kafka:outbound-channel-adapter";
434436
}
435437

436-
@Nullable
437-
protected MessageChannel getSendFailureChannel() {
438-
if (this.sendFailureChannel != null) {
439-
return this.sendFailureChannel;
440-
}
441-
else if (this.sendFailureChannelName != null) {
442-
this.sendFailureChannel = getChannelResolver().resolveDestination(this.sendFailureChannelName);
443-
return this.sendFailureChannel;
438+
protected @Nullable MessageChannel getSendFailureChannel() {
439+
if (this.sendFailureChannel == null && (this.sendFailureChannelName != null || !this.sync)) {
440+
String sendFailureChannelNameToUse = this.sendFailureChannelName;
441+
if (sendFailureChannelNameToUse == null) {
442+
sendFailureChannelNameToUse = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME;
443+
}
444+
this.sendFailureChannel = getChannelResolver().resolveDestination(sendFailureChannelNameToUse);
444445
}
445-
return null;
446+
return this.sendFailureChannel;
446447
}
447448

448449
protected MessageChannel getSendSuccessChannel() {

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.springframework.expression.common.LiteralExpression;
5353
import org.springframework.expression.spel.standard.SpelExpressionParser;
5454
import org.springframework.integration.channel.DirectChannel;
55+
import org.springframework.integration.channel.NullChannel;
5556
import org.springframework.integration.channel.QueueChannel;
5657
import org.springframework.integration.expression.FunctionExpression;
5758
import org.springframework.integration.expression.ValueExpression;
@@ -116,6 +117,7 @@
116117
* @author Biju Kunjummen
117118
* @author Artem Bilan
118119
* @author Tom van den Berge
120+
* @author Ryan Riley
119121
*
120122
* @since 5.4
121123
*/
@@ -544,6 +546,7 @@ void testConsumeAndProduceTransaction() throws Exception {
544546
DirectChannel channel = new DirectChannel();
545547
inbound.setOutputChannel(channel);
546548
KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler(template);
549+
handler.setSendFailureChannel(new NullChannel());
547550
handler.setMessageKeyExpression(new LiteralExpression("bar"));
548551
handler.setTopicExpression(new LiteralExpression("topic"));
549552
channel.subscribe(handler);
@@ -689,6 +692,7 @@ protected Producer createTransactionalProducer(String txIdPrefix) {
689692
DirectChannel channel = new DirectChannel();
690693
inbound.setOutputChannel(channel);
691694
KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler(template);
695+
handler.setSendFailureChannel(new NullChannel());
692696
handler.setMessageKeyExpression(new LiteralExpression("bar"));
693697
handler.setTopicExpression(new LiteralExpression("topic"));
694698
channel.subscribe(handler);

0 commit comments

Comments
 (0)