Skip to content

Commit 56047dd

Browse files
committed
GH-2997 Fix support for producer's error-handler-definition
Resolves #2997
1 parent 5d881b2 commit 56047dd

File tree

3 files changed

+15
-10
lines changed

3 files changed

+15
-10
lines changed

binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/RabbitTestContainer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@
2828
*/
2929
public final class RabbitTestContainer {
3030

31-
private RabbitTestContainer() {
32-
}
33-
3431
private static final RabbitMQContainer RABBITMQ;
3532
static {
3633
String image = "rabbitmq:3.11-management";
@@ -46,6 +43,9 @@ private RabbitTestContainer() {
4643
RABBITMQ.start();
4744
}
4845

46+
private RabbitTestContainer() {
47+
48+
}
4949
/**
5050
* Should be called early by test that wants to ensure a shared {@link RabbitMQContainer} is up and running.
5151
*/

core/spring-cloud-stream-test-binder/src/test/java/org/springframework/cloud/stream/binder/test/TestChannelBinderTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void test() throws Exception {
4343
"--spring.cloud.stream.bindings.function-in-0.destination=input")) {
4444
TestChannelBinder binder = context.getBean(TestChannelBinder.class);
4545
Method registerErrorInfrastructure = ReflectionUtils
46-
.findMethod(TestChannelBinder.class, "registerErrorInfrastructure", ProducerDestination.class, String.class);
46+
.findMethod(TestChannelBinder.class, "registerErrorInfrastructure", ProducerDestination.class, String.class, boolean.class);
4747
registerErrorInfrastructure.setAccessible(true);
4848
ProducerDestination destination = new ProducerDestination() {
4949
@Override
@@ -56,7 +56,7 @@ public String getName() {
5656
return "sample";
5757
}
5858
};
59-
registerErrorInfrastructure.invoke(binder, destination, "function-in-0");
59+
registerErrorInfrastructure.invoke(binder, destination, "function-in-0", false);
6060
destination = new ProducerDestination() {
6161
@Override
6262
public String getNameForPartition(int partition) {
@@ -68,7 +68,7 @@ public String getName() {
6868
return "sample";
6969
}
7070
};
71-
registerErrorInfrastructure.invoke(binder, destination, "function-in-0");
71+
registerErrorInfrastructure.invoke(binder, destination, "function-in-0", false);
7272
}
7373
}
7474

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -305,8 +305,13 @@ public final Binding<MessageChannel> doBindProducer(final String destination,
305305
bp = bsp.getBindingProperties(bindingName);
306306
}
307307

308-
SubscribableChannel errorChannel = (bp != null && StringUtils.hasText(bp.getErrorHandlerDefinition())) || producerProperties.isErrorChannelEnabled()
309-
? registerErrorInfrastructure(producerDestination, producerProperties.getBindingName()) : null;
308+
boolean errorHandlerDefined = bp != null && StringUtils.hasText(bp.getErrorHandlerDefinition());
309+
SubscribableChannel errorChannel = errorHandlerDefined || producerProperties.isErrorChannelEnabled()
310+
? registerErrorInfrastructure(producerDestination, producerProperties.getBindingName(), errorHandlerDefined)
311+
: null;
312+
313+
String errorChannelName = errorsBaseName(producerDestination, producerProperties.getBindingName());
314+
this.subscribeFunctionErrorHandler(errorChannelName, producerProperties.getBindingName());
310315

311316
producerMessageHandler = createProducerMessageHandler(producerDestination,
312317
producerProperties, outputChannel, errorChannel);
@@ -727,7 +732,7 @@ protected void afterUnbindConsumer(ConsumerDestination destination, String group
727732
* @return the channel.
728733
*/
729734
private SubscribableChannel registerErrorInfrastructure(
730-
ProducerDestination destination, String bindingName) {
735+
ProducerDestination destination, String bindingName, boolean errorHandlerDefinitionAvailable) {
731736

732737
String errorChannelName = errorsBaseName(destination, bindingName);
733738
SubscribableChannel errorChannel = new PublishSubscribeChannel();
@@ -751,7 +756,7 @@ private SubscribableChannel registerErrorInfrastructure(
751756
}
752757

753758
MessageChannel defaultErrorChannel = null;
754-
if (getApplicationContext()
759+
if (!errorHandlerDefinitionAvailable && getApplicationContext()
755760
.containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)) {
756761
defaultErrorChannel = getApplicationContext().getBean(
757762
IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME,

0 commit comments

Comments
 (0)