Skip to content

Commit 3f8b700

Browse files
committed
GH-3054 Clear channel cache in StreamBridge during the refresh
Resolves #3054
1 parent 637636d commit 3f8b700

File tree

1 file changed

+13
-4
lines changed
  • core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function

1 file changed

+13
-4
lines changed

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@
5353
import org.springframework.cloud.stream.config.BindingProperties;
5454
import org.springframework.cloud.stream.config.BindingServiceProperties;
5555
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
56+
import org.springframework.context.ApplicationEvent;
57+
import org.springframework.context.ApplicationListener;
5658
import org.springframework.context.ConfigurableApplicationContext;
5759
import org.springframework.core.ResolvableType;
5860
import org.springframework.integration.channel.AbstractMessageChannel;
@@ -95,7 +97,7 @@
9597
*
9698
*/
9799
@SuppressWarnings("rawtypes")
98-
public final class StreamBridge implements StreamOperations, SmartInitializingSingleton, DisposableBean {
100+
public final class StreamBridge implements StreamOperations, SmartInitializingSingleton, DisposableBean, ApplicationListener<ApplicationEvent> {
99101

100102
private static final String STREAM_BRIDGE_FUNC_NAME = "streamBridge";
101103

@@ -135,6 +137,7 @@ public final class StreamBridge implements StreamOperations, SmartInitializingSi
135137
* @param bindingServiceProperties instance of {@link BindingServiceProperties}
136138
* @param applicationContext instance of {@link ConfigurableApplicationContext}
137139
*/
140+
@SuppressWarnings("serial")
138141
StreamBridge(FunctionCatalog functionCatalog, BindingServiceProperties bindingServiceProperties,
139142
ConfigurableApplicationContext applicationContext, @Nullable NewDestinationBindingCallback destinationBindingCallback, ObjectProvider<ObservationRegistry> observationRegistries) {
140143
this.executorService = Executors.newCachedThreadPool();
@@ -292,12 +295,9 @@ MessageChannel resolveDestination(String destinationName, ProducerProperties pro
292295
messageChannel = this.isAsync() ? new ExecutorChannel(this.executorService) : new DirectWithAttributesChannel();
293296
((AbstractSubscribableChannel) messageChannel).setApplicationContext(applicationContext);
294297
((AbstractSubscribableChannel) messageChannel).setComponentName(destinationName);
295-
//<<<<<<< HEAD
296298

297299
BinderWrapper binderWrapper = bindingService.createBinderWrapper(binderName, destinationName, messageChannel.getClass());
298-
//=======
299300
((AbstractSubscribableChannel) messageChannel).registerObservationRegistry(observationRegistry);
300-
//>>>>>>> a1418283c (GH-3033: Register ObservationRegistry for Dynamic MessageChannels)
301301
if (this.destinationBindingCallback != null) {
302302
Object extendedProducerProperties = this.bindingService
303303
.getExtendedProducerProperties(binderWrapper.binder(), destinationName);
@@ -369,6 +369,15 @@ public void setAsync(boolean async) {
369369
this.async = async;
370370
}
371371

372+
// see https://github.com/spring-cloud/spring-cloud-stream/issues/3054
373+
@Override
374+
public void onApplicationEvent(ApplicationEvent event) {
375+
// we need to do it by String to avoid cloud-bus and context dependencies
376+
if (event.getClass().getName().equals("org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent")) {
377+
this.channelCache.clear();
378+
}
379+
}
380+
372381
private static final class ContextPropagationHelper {
373382
static ExecutorService wrap(ExecutorService executorService) {
374383
return ContextExecutorService.wrap(executorService, () -> ContextSnapshotFactory.builder().build().captureAll());

0 commit comments

Comments
 (0)