Skip to content

Commit a64a6e5

Browse files
committed
Avoid duplicate error logging when multiple destinations per binding
1 parent 64dd18c commit a64a6e5

File tree

2 files changed

+34
-5
lines changed

2 files changed

+34
-5
lines changed

core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/binder/ErrorBindingTests.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.function.Consumer;
2020
import java.util.function.Function;
2121

22+
import org.assertj.core.api.InstanceOfAssertFactories;
2223
import org.junit.jupiter.api.Test;
2324

2425
import org.springframework.boot.WebApplicationType;
@@ -29,6 +30,9 @@
2930
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
3031
import org.springframework.context.ApplicationContext;
3132
import org.springframework.context.annotation.Bean;
33+
import org.springframework.integration.channel.AbstractMessageChannel;
34+
import org.springframework.integration.context.IntegrationContextUtils;
35+
import org.springframework.integration.handler.BridgeHandler;
3236
import org.springframework.messaging.support.GenericMessage;
3337

3438
import static org.assertj.core.api.Assertions.assertThat;
@@ -99,7 +103,7 @@ void configurationWithoutBinderSpecificErrorHandler() {
99103

100104
@Test
101105
void errorBindingWithMultipleDestinationPerBinding() {
102-
new SpringApplicationBuilder(
106+
ApplicationContext context = new SpringApplicationBuilder(
103107
TestChannelBinderConfiguration.getCompleteConfiguration(NoErrorHandler.class))
104108
.web(WebApplicationType.NONE)
105109
.run("--spring.cloud.stream.bindings.process-in-0.consumer.max-attempts=1",
@@ -108,6 +112,27 @@ void errorBindingWithMultipleDestinationPerBinding() {
108112
"--spring.jmx.enabled=false");
109113

110114
// must not fail GH-2599
115+
InputDestination source = context.getBean(InputDestination.class);
116+
source.send(new GenericMessage<byte[]>("Hello".getBytes()));
117+
// We validate that error is logged only once : BridgeHandler to bean 'errorChannel' subscribed only once
118+
BinderErrorChannel binderErrorChannel = context.getBean(BinderErrorChannel.class);
119+
assertThat(binderErrorChannel.getSubscriberCount())
120+
.isEqualTo(2); // binderProvidedErrorHandler and BridgeHandler to bean 'errorChannel'
121+
// The BridgeHandler bean associated with this binding bridges to 'errorChannel' bean
122+
assertThat(
123+
context.getBeansOfType(BridgeHandler.class)
124+
.entrySet()
125+
.stream()
126+
.filter(entry -> entry.getKey().endsWith("process-in-0.errors.bridge"))
127+
.findAny())
128+
.isPresent()
129+
.get()
130+
.satisfies(bridgeHandler -> assertThat(bridgeHandler.getValue().getOutputChannel())
131+
.isNotNull()
132+
.asInstanceOf(InstanceOfAssertFactories.type(AbstractMessageChannel.class))
133+
.extracting(AbstractMessageChannel::getBeanName)
134+
.isEqualTo(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
135+
);
111136
}
112137

113138
@EnableAutoConfiguration

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -886,10 +886,14 @@ protected final ErrorInfrastructure registerErrorInfrastructure(
886886

887887
// Setup a bridge to global errorChannel to ensure logging of errors could be controlled via standard SI way
888888
if (this.getApplicationContext().containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) && this.isSubscribable(binderErrorChannel)) {
889-
SubscribableChannel globalErrorChannel = this.getApplicationContext().getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, SubscribableChannel.class);
890-
BridgeHandler bridge = new BridgeHandler();
891-
bridge.setOutputChannel(globalErrorChannel);
892-
binderErrorChannel.subscribe(bridge);
889+
String errorBridgeHandlerName = getErrorBridgeName(destination, group, consumerProperties);
890+
if (!getApplicationContext().containsBean(errorBridgeHandlerName)) {
891+
SubscribableChannel globalErrorChannel = this.getApplicationContext().getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, SubscribableChannel.class);
892+
BridgeHandler bridge = new BridgeHandler();
893+
bridge.setOutputChannel(globalErrorChannel);
894+
binderErrorChannel.subscribe(bridge);
895+
((GenericApplicationContext) getApplicationContext()).registerBean(errorBridgeHandlerName, BridgeHandler.class, () -> bridge);
896+
}
893897
}
894898
return new ErrorInfrastructure(binderErrorChannel, recoverer, binderProvidedErrorHandler);
895899
}

0 commit comments

Comments
 (0)