Skip to content

Commit dccf8ae

Browse files
omercelikcengolegz
authored andcommitted
Message Channel should be closed in specified cases : application shutdown, unbinding or exceeding cache size.
checkstyle fixes Resolves #2869 Resolves #3026
1 parent a9fe0c2 commit dccf8ae

File tree

7 files changed

+184
-54
lines changed

7 files changed

+184
-54
lines changed

core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/binding/BindingServiceTests.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,13 @@
9090
* @author Chris Bono
9191
* @author Artem Bilan
9292
* @author Kotaro Matsumoto
93+
* @author Omer Celik
9394
*/
9495
class BindingServiceTests {
9596

9697
@SuppressWarnings({ "unchecked", "rawtypes" })
9798
@Test
98-
void defaultGroup() throws Exception {
99+
void defaultGroup() {
99100
BindingServiceProperties properties = new BindingServiceProperties();
100101
Map<String, BindingProperties> bindingProperties = new HashMap<>();
101102
BindingProperties props = new BindingProperties();
@@ -175,7 +176,7 @@ void multipleConsumerBindings() {
175176

176177
@SuppressWarnings({ "unchecked", "rawtypes" })
177178
@Test
178-
void multipleConsumerBindingsFromIndexList() throws Exception {
179+
void multipleConsumerBindingsFromIndexList() {
179180
BindingServiceProperties properties = new BindingServiceProperties();
180181
Map<String, BindingProperties> bindingProperties = new HashMap<>();
181182
BindingProperties props = new BindingProperties();
@@ -236,7 +237,7 @@ void multipleConsumerBindingsFromIndexList() throws Exception {
236237

237238
@SuppressWarnings({ "unchecked", "rawtypes" })
238239
@Test
239-
void consumerBindingWhenMultiplexingIsEnabled() throws Exception {
240+
void consumerBindingWhenMultiplexingIsEnabled() {
240241
BindingServiceProperties properties = new BindingServiceProperties();
241242
Map<String, BindingProperties> bindingProperties = new HashMap<>();
242243
BindingProperties props = new BindingProperties();
@@ -282,7 +283,7 @@ void consumerBindingWhenMultiplexingIsEnabled() throws Exception {
282283

283284
@SuppressWarnings({ "unchecked", "rawtypes" })
284285
@Test
285-
void explicitGroup() throws Exception {
286+
void explicitGroup() {
286287
BindingServiceProperties properties = new BindingServiceProperties();
287288
Map<String, BindingProperties> bindingProperties = new HashMap<>();
288289
BindingProperties props = new BindingProperties();
@@ -528,7 +529,7 @@ void lateBindingProducer() throws Exception {
528529

529530
assertThat(service.getProducerBinding("output")).isSameAs(binding);
530531

531-
service.unbindProducers(outputChannelName);
532+
service.unbindProducers(null, outputChannelName);
532533
verify(binder, times(2)).bindProducer(eq("foo"), same(outputChannel),
533534
any(ProducerProperties.class));
534535
verify(delegate).unbind();
@@ -552,9 +553,8 @@ void bindingAutostartup() throws Exception {
552553
assertThat(inputBinding.isRunning()).isFalse();
553554
}
554555

555-
@SuppressWarnings("unchecked")
556556
@Test
557-
void bindingNameAsTopLevelProperty() throws Exception {
557+
void bindingNameAsTopLevelProperty() {
558558
ApplicationContext context = new SpringApplicationBuilder(BarConfiguration.class)
559559
.web(WebApplicationType.NONE).run();
560560

core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
*
8585
* @author Oleg Zhurakousky
8686
* @author Soby Chacko
87+
* @author Omer Celik
8788
*
8889
*/
8990
class StreamBridgeTests {
@@ -384,7 +385,7 @@ void withOutputContentTypeWildCardBindings() {
384385

385386
// See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream/issues/2805
386387
@Test
387-
void streamBridgeSendWithBinderNameAndCustomContentType() throws Exception {
388+
void streamBridgeSendWithBinderNameAndCustomContentType() {
388389
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
389390
.getCompleteConfiguration(ConsumerConfiguration.class, EmptyConfigurationWithCustomConverters.class))
390391
.web(WebApplicationType.NONE).run(
@@ -767,6 +768,45 @@ void dynamicDestinationDestroy() {
767768
assertThat(bindingService.getProducerBindingNames().length).isEqualTo(0);
768769
}
769770

771+
@Test
772+
void dynamicDestinationWithBinderNameDestroy() {
773+
BindingService bindingService;
774+
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
775+
.getCompleteConfiguration(InterceptorConfiguration.class))
776+
.web(WebApplicationType.NONE).run(
777+
"--spring.jmx.enabled=false",
778+
"--spring.cloud.stream.binders.kafka1.type=kafka",
779+
"--spring.cloud.stream.binders.anotherKafka.type=kafka"
780+
)) {
781+
StreamBridge bridge = context.getBean(StreamBridge.class);
782+
bridge.send("binding1", "kafka1", "Omer Celik");
783+
bridge.send("binding2", "anotherKafka", "Omer Celik");
784+
785+
bindingService = context.getBean(BindingService.class);
786+
}
787+
assertThat(bindingService.getProducerBindingNames().length).isEqualTo(0);
788+
}
789+
790+
@Test
791+
void dynamicDestinationWithBinderNameDestroyForCacheSize() {
792+
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
793+
.getCompleteConfiguration(InterceptorConfiguration.class))
794+
.web(WebApplicationType.NONE).run(
795+
"--spring.jmx.enabled=false",
796+
"--spring.cloud.stream.dynamic-destination-cache-size=1",
797+
"--spring.cloud.stream.binders.kafka1.type=kafka",
798+
"--spring.cloud.stream.binders.anotherKafka.type=kafka"
799+
)) {
800+
StreamBridge bridge = context.getBean(StreamBridge.class);
801+
bridge.send("binding1", "kafka1", "Omer Celik");
802+
bridge.send("binding2", "anotherKafka", "Omer Celik");
803+
804+
BindingService bindingService = context.getBean(BindingService.class);
805+
assertThat(bindingService.getProducerBindingNames().length).isEqualTo(1);
806+
assertThat(bindingService.getProducerBindingNames()[0]).isEqualTo("anotherKafka:binding2");
807+
}
808+
}
809+
770810
@Test
771811
void withIntegrationFlowBecauseMarcinSaidSo() {
772812
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2024-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.binder;
18+
19+
/**
20+
* Holds information about the binder and channel.
21+
*
22+
* @author Omer Celik
23+
*/
24+
public record BinderWrapper(Binder binder, String destinationName, String cacheKey) {
25+
}

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/AbstractBindableProxyFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public void unbindOutputs(BindingService bindingService) {
135135
for (Map.Entry<String, BoundTargetHolder> boundTargetHolderEntry : this.outputHolders
136136
.entrySet()) {
137137
if (boundTargetHolderEntry.getValue().bindable()) {
138-
bindingService.unbindProducers(boundTargetHolderEntry.getKey());
138+
bindingService.unbindProducers(null, boundTargetHolderEntry.getKey());
139139
}
140140
}
141141
}

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindingService.java

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.springframework.beans.BeanUtils;
3636
import org.springframework.cloud.stream.binder.Binder;
3737
import org.springframework.cloud.stream.binder.BinderFactory;
38+
import org.springframework.cloud.stream.binder.BinderWrapper;
3839
import org.springframework.cloud.stream.binder.Binding;
3940
import org.springframework.cloud.stream.binder.ConsumerProperties;
4041
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
@@ -50,6 +51,9 @@
5051
import org.springframework.util.StringUtils;
5152
import org.springframework.validation.DataBinder;
5253

54+
import static org.springframework.cloud.stream.utils.CacheKeyCreatorUtils.createChannelCacheKey;
55+
import static org.springframework.cloud.stream.utils.CacheKeyCreatorUtils.getBinderNameIfNeeded;
56+
5357
/**
5458
* Handles binding of input/output targets by delegating to an underlying {@link Binder}.
5559
*
@@ -270,56 +274,52 @@ public <T> void reschedulePollableConsumerBinding(final T input,
270274
}
271275

272276
@SuppressWarnings({ "unchecked", "rawtypes" })
273-
public <T> Binding<T> bindProducer(T output, String outputName, boolean cache, @Nullable Binder<T, ?, ProducerProperties> binder) {
274-
String bindingTarget = this.bindingServiceProperties.getBindingDestination(outputName);
275-
Class<?> outputClass = output.getClass();
276-
if (output instanceof Advised advisedOutput) {
277-
outputClass = Stream.of(advisedOutput.getProxiedInterfaces()).filter(c -> !c.getName().contains("org.springframework")).findFirst()
278-
.orElse(outputClass);
279-
}
280-
if (binder == null) {
281-
binder = (Binder<T, ?, ProducerProperties>) getBinder(outputName, outputClass);
282-
}
283-
277+
public <T> Binding<T> bindProducer(T output, boolean cache, BinderWrapper binderWrapper) {
284278
ProducerProperties producerProperties = this.bindingServiceProperties
285-
.getProducerProperties(outputName);
286-
if (binder instanceof ExtendedPropertiesBinder extendedPropertiesBinder) {
287-
Object extension = extendedPropertiesBinder.getExtendedProducerProperties(outputName);
279+
.getProducerProperties(binderWrapper.destinationName());
280+
if (binderWrapper.binder() instanceof ExtendedPropertiesBinder extendedPropertiesBinder) {
281+
Object extension = extendedPropertiesBinder.getExtendedProducerProperties(binderWrapper.destinationName());
288282
ExtendedProducerProperties extendedProducerProperties = new ExtendedProducerProperties<>(
289283
extension);
290284
BeanUtils.copyProperties(producerProperties, extendedProducerProperties);
291285

292286
producerProperties = extendedProducerProperties;
293287
}
294-
producerProperties.populateBindingName(outputName);
288+
producerProperties.populateBindingName(binderWrapper.destinationName());
295289
validate(producerProperties);
296-
Binding<T> binding = doBindProducer(output, bindingTarget, binder,
290+
String bindingTarget = this.bindingServiceProperties.getBindingDestination(binderWrapper.destinationName());
291+
Binding<T> binding = doBindProducer(output, bindingTarget, binderWrapper.binder(),
297292
producerProperties);
298293
// If the downstream binder modified the partition count in the extended producer properties
299294
// based on the higher number of partitions provisioned on the target middleware, update that
300295
// in the original producer properties.
301296
ProducerProperties originalProducerProperties = this.bindingServiceProperties
302-
.getProducerProperties(outputName);
297+
.getProducerProperties(binderWrapper.destinationName());
303298
if (originalProducerProperties.getPartitionCount() < producerProperties.getPartitionCount()) {
304299
originalProducerProperties.setPartitionCount(producerProperties.getPartitionCount());
305300
}
306301
if (cache) {
307-
this.producerBindings.put(outputName, binding);
302+
this.producerBindings.put(binderWrapper.cacheKey(), binding);
308303
}
309304
return binding;
310305
}
311306

312307
public <T> Binding<T> bindProducer(T output, String outputName, boolean cache) {
313-
return this.bindProducer(output, outputName, cache, null);
308+
Class<?> outputClass = output.getClass();
309+
if (output instanceof Advised advisedOutput) {
310+
outputClass = Stream.of(advisedOutput.getProxiedInterfaces()).filter(c -> !c.getName().contains("org.springframework")).findFirst()
311+
.orElse(outputClass);
312+
}
313+
BinderWrapper binderWrapper = createBinderWrapper(null, outputName, outputClass);
314+
return this.bindProducer(output, cache, binderWrapper);
314315
}
315316

316317
public <T> Binding<T> bindProducer(T output, String outputName) {
317318
return this.bindProducer(output, outputName, true);
318319
}
319320

320321
@SuppressWarnings("rawtypes")
321-
public Object getExtendedProducerProperties(Object output, String outputName) {
322-
Binder binder = getBinder(outputName, output.getClass());
322+
public Object getExtendedProducerProperties(Binder binder, String outputName) {
323323
if (binder instanceof ExtendedPropertiesBinder extendedPropertiesBinder) {
324324
return extendedPropertiesBinder.getExtendedProducerProperties(outputName);
325325
}
@@ -398,16 +398,21 @@ else if (this.log.isWarnEnabled()) {
398398
}
399399
}
400400

401-
public void unbindProducers(String outputName) {
402-
Binding<?> binding = this.producerBindings.remove(outputName);
401+
public void unbindProducers(@Nullable String binderName, String outputName) {
402+
String cacheKey = createChannelCacheKey(binderName, outputName, bindingServiceProperties);
403+
unbindProducers(cacheKey);
404+
}
405+
406+
public void unbindProducers(String cacheKey) {
407+
Binding<?> binding = this.producerBindings.remove(cacheKey);
403408

404409
if (binding != null) {
405410
binding.stop();
406411
//then
407412
binding.unbind();
408413
}
409414
else if (this.log.isWarnEnabled()) {
410-
this.log.warn("Trying to unbind '" + outputName + "', but no binding found.");
415+
this.log.warn("Trying to unbind '" + cacheKey + "', but no binding found.");
411416
}
412417
}
413418

@@ -443,6 +448,14 @@ private void assertNotIllegalException(RuntimeException exception)
443448
}
444449
}
445450

451+
public BinderWrapper createBinderWrapper(@Nullable String binderName, String destinationName, Class<?> outputClass) {
452+
binderName = getBinderNameIfNeeded(binderName, destinationName, bindingServiceProperties);
453+
Binder binder = binderFactory.getBinder(binderName, outputClass);
454+
String channelCacheKey = createChannelCacheKey(binderName, destinationName);
455+
return new BinderWrapper(binder, destinationName, channelCacheKey);
456+
}
457+
458+
446459
public static class LateBinding<T> implements Binding<T> {
447460

448461
private volatile Binding<T> delegate;

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.springframework.cloud.function.core.FunctionInvocationHelper;
4444
import org.springframework.cloud.stream.binder.Binder;
4545
import org.springframework.cloud.stream.binder.BinderFactory;
46+
import org.springframework.cloud.stream.binder.BinderWrapper;
4647
import org.springframework.cloud.stream.binder.ProducerProperties;
4748
import org.springframework.cloud.stream.binding.BindingService;
4849
import org.springframework.cloud.stream.binding.DefaultPartitioningInterceptor;
@@ -68,6 +69,8 @@
6869
import org.springframework.util.ObjectUtils;
6970
import org.springframework.util.StringUtils;
7071

72+
import static org.springframework.cloud.stream.utils.CacheKeyCreatorUtils.createChannelCacheKey;
73+
7174

7275
/**
7376
* A class which allows user to send data to an output binding.
@@ -85,6 +88,7 @@
8588
* @author Soby Chacko
8689
* @author Byungjun You
8790
* @author Michał Rowicki
91+
* @author Omer Celik
8892
* @since 3.0.3
8993
*
9094
*/
@@ -128,7 +132,6 @@ public final class StreamBridge implements StreamOperations, SmartInitializingSi
128132
* @param bindingServiceProperties instance of {@link BindingServiceProperties}
129133
* @param applicationContext instance of {@link ConfigurableApplicationContext}
130134
*/
131-
@SuppressWarnings("serial")
132135
StreamBridge(FunctionCatalog functionCatalog, BindingServiceProperties bindingServiceProperties,
133136
ConfigurableApplicationContext applicationContext, @Nullable NewDestinationBindingCallback destinationBindingCallback) {
134137
this.executorService = Executors.newCachedThreadPool();
@@ -268,13 +271,7 @@ public void afterSingletonsInstantiated() {
268271
MessageChannel resolveDestination(String destinationName, ProducerProperties producerProperties, String binderName) {
269272
lock.lock();
270273
try {
271-
MessageChannel messageChannel = null;
272-
if (StringUtils.hasText(binderName)) {
273-
messageChannel = this.channelCache.get(binderName + ":" + destinationName);
274-
}
275-
else {
276-
messageChannel = this.channelCache.get(destinationName);
277-
}
274+
MessageChannel messageChannel = this.channelCache.get(createChannelCacheKey(binderName, destinationName, bindingServiceProperties));
278275
if (messageChannel == null) {
279276
if (this.applicationContext.containsBean(destinationName)) {
280277
messageChannel = this.applicationContext.getBean(destinationName, MessageChannel.class);
@@ -291,28 +288,20 @@ MessageChannel resolveDestination(String destinationName, ProducerProperties pro
291288
messageChannel = this.isAsync() ? new ExecutorChannel(this.executorService) : new DirectWithAttributesChannel();
292289
((AbstractSubscribableChannel) messageChannel).setApplicationContext(applicationContext);
293290
((AbstractSubscribableChannel) messageChannel).setComponentName(destinationName);
291+
292+
BinderWrapper binderWrapper = bindingService.createBinderWrapper(binderName, destinationName, messageChannel.getClass());
294293
if (this.destinationBindingCallback != null) {
295294
Object extendedProducerProperties = this.bindingService
296-
.getExtendedProducerProperties(messageChannel, destinationName);
295+
.getExtendedProducerProperties(binderWrapper.binder(), destinationName);
297296
this.destinationBindingCallback.configure(destinationName, messageChannel,
298297
producerProperties, extendedProducerProperties);
299298
}
300299

301-
Binder binder = null;
302-
if (StringUtils.hasText(binderName)) {
303-
BinderFactory binderFactory = this.applicationContext.getBean(BinderFactory.class);
304-
binder = binderFactory.getBinder(binderName, messageChannel.getClass());
305-
}
306300
addPartitioningInterceptorIfNeedBe(producerProperties, destinationName, (AbstractMessageChannel) messageChannel);
307301
addGlobalChannelInterceptorProcessor((AbstractMessageChannel) messageChannel, destinationName);
308302

309-
this.bindingService.bindProducer(messageChannel, destinationName, true, binder);
310-
if (StringUtils.hasText(binderName)) {
311-
this.channelCache.put(binderName + ":" + destinationName, messageChannel);
312-
}
313-
else {
314-
this.channelCache.put(destinationName, messageChannel);
315-
}
303+
this.bindingService.bindProducer(messageChannel, true, binderWrapper);
304+
this.channelCache.put(binderWrapper.cacheKey(), messageChannel);
316305
}
317306
}
318307

0 commit comments

Comments
 (0)