Skip to content

Commit 36c9f72

Browse files
artembilangaryrussell
authored andcommitted
Use EmitterProcessor for Channels adaptation (#3100)
* Use `EmitterProcessor` for Channels adaptation Related spring-cloud/spring-cloud-stream#1835 To honor a back-pressure after `MessageChannel` adaptation it is better to use an `EmitterProcessor.create(1)` instead of `Flux.create()`. This way whenever an emitter buffer is full, we block upstream producer and don't allow it to produce more messages **Cherry-pick to 5.1.x** * * Wrap every new subscription into a `Flux.defer()` * Fix `ReactiveStreamsConsumerTests` to use a new `Subscription` after each `stop()/start()` on the `ReactiveStreamsConsumer` * * Remove unused imports
1 parent 69401c2 commit 36c9f72

File tree

4 files changed

+90
-110
lines changed

4 files changed

+90
-110
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/MessageChannelReactiveUtils.java

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.messaging.PollableChannel;
2626
import org.springframework.messaging.SubscribableChannel;
2727

28+
import reactor.core.publisher.EmitterProcessor;
2829
import reactor.core.publisher.Flux;
2930
import reactor.core.publisher.FluxSink;
3031
import reactor.core.scheduler.Schedulers;
@@ -60,37 +61,20 @@ else if (messageChannel instanceof PollableChannel) {
6061
}
6162

6263
private static <T> Publisher<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel inputChannel) {
63-
return new SubscribableChannelPublisherAdapter<>(inputChannel);
64+
return Flux.defer(() -> {
65+
EmitterProcessor<Message<T>> publisher = EmitterProcessor.create(1);
66+
@SuppressWarnings("unchecked")
67+
MessageHandler messageHandler = (message) -> publisher.onNext((Message<T>) message);
68+
inputChannel.subscribe(messageHandler);
69+
return publisher
70+
.doOnCancel(() -> inputChannel.unsubscribe(messageHandler));
71+
});
6472
}
6573

6674
private static <T> Publisher<Message<T>> adaptPollableChannelToPublisher(PollableChannel inputChannel) {
6775
return new PollableChannelPublisherAdapter<>(inputChannel);
6876
}
6977

70-
71-
private static final class SubscribableChannelPublisherAdapter<T> implements Publisher<Message<T>> {
72-
73-
private final SubscribableChannel channel;
74-
75-
SubscribableChannelPublisherAdapter(SubscribableChannel channel) {
76-
this.channel = channel;
77-
}
78-
79-
@Override
80-
@SuppressWarnings("unchecked")
81-
public void subscribe(Subscriber<? super Message<T>> subscriber) {
82-
Flux.
83-
<Message<?>>create(emitter -> {
84-
MessageHandler messageHandler = emitter::next;
85-
this.channel.subscribe(messageHandler);
86-
emitter.onCancel(() -> this.channel.unsubscribe(messageHandler));
87-
},
88-
FluxSink.OverflowStrategy.IGNORE)
89-
.subscribe((Subscriber<? super Message<?>>) subscriber);
90-
}
91-
92-
}
93-
9478
private static final class PollableChannelPublisherAdapter<T> implements Publisher<Message<T>> {
9579

9680
private final PollableChannel channel;

spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,8 @@ private final class DelegatingSubscriber extends BaseSubscriber<Message<?>> {
155155

156156
@Override
157157
public void hookOnSubscribe(Subscription s) {
158-
this.delegate.onSubscribe(s);
159158
ReactiveStreamsConsumer.this.subscription = s;
159+
this.delegate.onSubscribe(s);
160160
}
161161

162162
@Override
Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,9 +20,8 @@
2020

2121
import java.time.Duration;
2222

23-
import org.junit.Test;
23+
import org.junit.jupiter.api.Test;
2424

25-
import org.springframework.messaging.SubscribableChannel;
2625
import org.springframework.messaging.support.GenericMessage;
2726

2827
import reactor.core.Disposable;
@@ -31,14 +30,19 @@
3130
import reactor.test.StepVerifier;
3231
import reactor.util.concurrent.Queues;
3332

34-
public class MessageChannelReactiveUtilsTest {
33+
/**
34+
* @author Sergei Egorov
35+
* @author Artem Bilan
36+
*
37+
* @since 5.1.9
38+
*/
39+
class MessageChannelReactiveUtilsTests {
3540

3641
@Test
37-
public void testBackpressureWithSubscribableChannel() {
42+
void testBackpressureWithSubscribableChannel() {
3843
Disposable.Composite compositeDisposable = Disposables.composite();
3944
try {
4045
DirectChannel channel = new DirectChannel();
41-
assertThat(channel).isInstanceOf(SubscribableChannel.class);
4246
int initialRequest = 10;
4347
StepVerifier.create(MessageChannelReactiveUtils.toPublisher(channel), initialRequest)
4448
.expectSubscription()
@@ -64,27 +68,25 @@ public void testBackpressureWithSubscribableChannel() {
6468
}
6569

6670
@Test
67-
public void testOverproducingWithSubscribableChannel() {
71+
void testOverproducingWithSubscribableChannel() {
6872
DirectChannel channel = new DirectChannel();
6973
channel.setCountsEnabled(true);
70-
assertThat(channel).isInstanceOf(SubscribableChannel.class);
7174

7275
Disposable.Composite compositeDisposable = Disposables.composite();
7376
try {
7477
int initialRequest = 10;
7578
StepVerifier.create(MessageChannelReactiveUtils.toPublisher(channel), initialRequest)
7679
.expectSubscription()
77-
.then(() -> {
78-
compositeDisposable.add(
79-
Schedulers.boundedElastic().schedule(() -> {
80-
while (true) {
81-
if (channel.getSubscriberCount() > 0) {
82-
channel.send(new GenericMessage<>("foo"));
80+
.then(() ->
81+
compositeDisposable.add(
82+
Schedulers.boundedElastic().schedule(() -> {
83+
while (true) {
84+
if (channel.getSubscriberCount() > 0) {
85+
channel.send(new GenericMessage<>("foo"));
86+
}
8387
}
84-
}
85-
})
86-
);
87-
})
88+
})
89+
))
8890
.expectNextCount(initialRequest)
8991
.thenAwait(Duration.ofMillis(100))
9092
.thenCancel()
@@ -98,4 +100,5 @@ public void testOverproducingWithSubscribableChannel() {
98100
.as("produced")
99101
.isLessThanOrEqualTo(Queues.SMALL_BUFFER_SIZE);
100102
}
103+
101104
}

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java

Lines changed: 59 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,10 @@
1717
package org.springframework.integration.channel.reactive;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20-
import static org.assertj.core.api.Assertions.fail;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2121
import static org.mockito.ArgumentMatchers.any;
22-
import static org.mockito.BDDMockito.willAnswer;
2322
import static org.mockito.Mockito.mock;
2423
import static org.mockito.Mockito.never;
25-
import static org.mockito.Mockito.times;
2624
import static org.mockito.Mockito.verify;
2725

2826
import java.util.LinkedList;
@@ -33,7 +31,6 @@
3331
import java.util.concurrent.TimeUnit;
3432

3533
import org.junit.Test;
36-
import org.mockito.ArgumentCaptor;
3734
import org.mockito.Mockito;
3835
import org.reactivestreams.Subscriber;
3936
import org.reactivestreams.Subscription;
@@ -82,14 +79,10 @@ public void testReactiveStreamsConsumerFluxMessageChannel() throws InterruptedEx
8279

8380
reactiveConsumer.stop();
8481

85-
try {
86-
testChannel.send(testMessage);
87-
}
88-
catch (Exception e) {
89-
assertThat(e).isInstanceOf(MessageDeliveryException.class);
90-
assertThat(e.getCause()).isInstanceOf(IllegalStateException.class);
91-
assertThat(e.getMessage()).contains("doesn't have subscribers to accept messages");
92-
}
82+
assertThatExceptionOfType(MessageDeliveryException.class)
83+
.isThrownBy(() -> testChannel.send(testMessage))
84+
.withCauseInstanceOf(IllegalStateException.class)
85+
.withMessageContaining("doesn't have subscribers to accept messages");
9386

9487
reactiveConsumer.start();
9588

@@ -102,54 +95,53 @@ public void testReactiveStreamsConsumerFluxMessageChannel() throws InterruptedEx
10295

10396

10497
@Test
105-
@SuppressWarnings("unchecked")
10698
public void testReactiveStreamsConsumerDirectChannel() throws InterruptedException {
10799
DirectChannel testChannel = new DirectChannel();
108100

109-
Subscriber<Message<?>> testSubscriber = (Subscriber<Message<?>>) Mockito.mock(Subscriber.class);
110-
111101
BlockingQueue<Message<?>> messages = new LinkedBlockingQueue<>();
112102

113-
willAnswer(i -> {
114-
messages.put(i.getArgument(0));
115-
return null;
116-
})
117-
.given(testSubscriber)
118-
.onNext(any(Message.class));
103+
Subscriber<Message<?>> testSubscriber = Mockito.spy(new Subscriber<Message<?>>() {
104+
105+
@Override
106+
public void onSubscribe(Subscription subscription) {
107+
subscription.request(1);
108+
}
109+
110+
@Override
111+
public void onNext(Message<?> message) {
112+
messages.offer(message);
113+
}
114+
115+
@Override
116+
public void onError(Throwable t) {
117+
118+
}
119+
120+
@Override
121+
public void onComplete() {
122+
123+
}
124+
125+
});
119126

120127
ReactiveStreamsConsumer reactiveConsumer = new ReactiveStreamsConsumer(testChannel, testSubscriber);
121128
reactiveConsumer.setBeanFactory(mock(BeanFactory.class));
122129
reactiveConsumer.afterPropertiesSet();
123130
reactiveConsumer.start();
124131

125-
Message<?> testMessage = new GenericMessage<>("test");
132+
final Message<?> testMessage = new GenericMessage<>("test");
126133
testChannel.send(testMessage);
127134

128-
ArgumentCaptor<Subscription> subscriptionArgumentCaptor = ArgumentCaptor.forClass(Subscription.class);
129-
verify(testSubscriber).onSubscribe(subscriptionArgumentCaptor.capture());
130-
Subscription subscription = subscriptionArgumentCaptor.getValue();
131-
132-
subscription.request(1);
133-
134135
Message<?> message = messages.poll(10, TimeUnit.SECONDS);
135136
assertThat(message).isSameAs(testMessage);
136137

137138
reactiveConsumer.stop();
138139

139-
try {
140-
testChannel.send(testMessage);
141-
fail("MessageDeliveryException");
142-
}
143-
catch (Exception e) {
144-
assertThat(e).isInstanceOf(MessageDeliveryException.class);
145-
}
140+
assertThatExceptionOfType(MessageDeliveryException.class)
141+
.isThrownBy(() -> testChannel.send(testMessage));
146142

147143
reactiveConsumer.start();
148144

149-
subscription.request(1);
150-
151-
testMessage = new GenericMessage<>("test2");
152-
153145
testChannel.send(testMessage);
154146

155147
message = messages.poll(10, TimeUnit.SECONDS);
@@ -159,24 +151,40 @@ public void testReactiveStreamsConsumerDirectChannel() throws InterruptedExcepti
159151
verify(testSubscriber, never()).onComplete();
160152

161153
assertThat(messages.isEmpty()).isTrue();
154+
155+
reactiveConsumer.stop();
162156
}
163157

164158
@Test
165159
@SuppressWarnings("unchecked")
166160
public void testReactiveStreamsConsumerPollableChannel() throws InterruptedException {
167161
QueueChannel testChannel = new QueueChannel();
168162

169-
Subscriber<Message<?>> testSubscriber = (Subscriber<Message<?>>) Mockito.mock(Subscriber.class);
170-
171163
BlockingQueue<Message<?>> messages = new LinkedBlockingQueue<>();
172164

173-
willAnswer(i -> {
174-
messages.put(i.getArgument(0));
175-
return null;
176-
})
177-
.given(testSubscriber)
178-
.onNext(any(Message.class));
165+
Subscriber<Message<?>> testSubscriber = Mockito.spy(new Subscriber<Message<?>>() {
166+
167+
@Override
168+
public void onSubscribe(Subscription subscription) {
169+
subscription.request(2);
170+
}
171+
172+
@Override
173+
public void onNext(Message<?> message) {
174+
messages.offer(message);
175+
}
176+
177+
@Override
178+
public void onError(Throwable t) {
179+
180+
}
179181

182+
@Override
183+
public void onComplete() {
184+
185+
}
186+
187+
});
180188
ReactiveStreamsConsumer reactiveConsumer = new ReactiveStreamsConsumer(testChannel, testSubscriber);
181189
reactiveConsumer.setBeanFactory(mock(BeanFactory.class));
182190
reactiveConsumer.afterPropertiesSet();
@@ -185,12 +193,6 @@ public void testReactiveStreamsConsumerPollableChannel() throws InterruptedExcep
185193
Message<?> testMessage = new GenericMessage<>("test");
186194
testChannel.send(testMessage);
187195

188-
ArgumentCaptor<Subscription> subscriptionArgumentCaptor = ArgumentCaptor.forClass(Subscription.class);
189-
verify(testSubscriber).onSubscribe(subscriptionArgumentCaptor.capture());
190-
Subscription subscription = subscriptionArgumentCaptor.getValue();
191-
192-
subscription.request(1);
193-
194196
Message<?> message = messages.poll(10, TimeUnit.SECONDS);
195197
assertThat(message).isSameAs(testMessage);
196198

@@ -201,11 +203,6 @@ public void testReactiveStreamsConsumerPollableChannel() throws InterruptedExcep
201203

202204
reactiveConsumer.start();
203205

204-
verify(testSubscriber, times(2)).onSubscribe(subscriptionArgumentCaptor.capture());
205-
subscription = subscriptionArgumentCaptor.getValue();
206-
207-
subscription.request(2);
208-
209206
Message<?> testMessage2 = new GenericMessage<>("test2");
210207

211208
testChannel.send(testMessage2);
@@ -247,14 +244,10 @@ public void testReactiveStreamsConsumerViaConsumerEndpointFactoryBean() throws E
247244

248245
endpointFactoryBean.stop();
249246

250-
try {
251-
testChannel.send(testMessage);
252-
}
253-
catch (Exception e) {
254-
assertThat(e).isInstanceOf(MessageDeliveryException.class);
255-
assertThat(e.getCause()).isInstanceOf(IllegalStateException.class);
256-
assertThat(e.getMessage()).contains("doesn't have subscribers to accept messages");
257-
}
247+
assertThatExceptionOfType(MessageDeliveryException.class)
248+
.isThrownBy(() -> testChannel.send(testMessage))
249+
.withCauseInstanceOf(IllegalStateException.class)
250+
.withMessageContaining("doesn't have subscribers to accept messages");
258251

259252
endpointFactoryBean.start();
260253

0 commit comments

Comments
 (0)