Skip to content

Commit 6c379d7

Browse files
artembilangaryrussell
authored andcommitted
Fix a logic in the IntFlowDef.toReactivePublisher (#2553)
* Fix a logic in the IntFlowDef.toReactivePublisher * We may consider to start a `Publisher<Message<?>>` just from one channel. So, and an implicit `bridge()` to meet and internal `IntegrationFlow` logic * The `MessageChannelReference` and `FixedSubscriberChannelPrototype` can't be converted to the reactive `Publisher`. So, an implicit `bridge()` in between them and target `FluxMessageChannel` NOTE: This maybe considered for back-port, but the workaround is simple: just add extra `bridge()` after the mentioned channels * * Allow `log()` before `toReactivePublisher()` and the same time fix the problem with not resetted `implicitChannel` flag in the `IntegrationFlowDefinition`
1 parent 54094da commit 6c379d7

File tree

2 files changed

+62
-3
lines changed

2 files changed

+62
-3
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ public B channel(MessageChannelSpec<?, ?> messageChannelSpec) {
210210
*/
211211
public B channel(MessageChannel messageChannel) {
212212
Assert.notNull(messageChannel, "'messageChannel' must not be null");
213+
this.implicitChannel = false;
213214
if (this.currentMessageChannel != null) {
214215
bridge();
215216
}
@@ -431,9 +432,9 @@ public B wireTap(MessageChannel wireTapChannel, Consumer<WireTapSpec> wireTapCon
431432
*/
432433
public B wireTap(WireTapSpec wireTapSpec) {
433434
WireTap interceptor = wireTapSpec.get();
434-
if (this.currentMessageChannel == null || !(this.currentMessageChannel instanceof ChannelInterceptorAware)) {
435-
this.implicitChannel = true;
435+
if (!(this.currentMessageChannel instanceof ChannelInterceptorAware)) {
436436
channel(new DirectChannel());
437+
this.implicitChannel = true;
437438
}
438439
addComponent(wireTapSpec);
439440
((ChannelInterceptorAware) this.currentMessageChannel).addInterceptor(interceptor);
@@ -2941,7 +2942,9 @@ public <T> Publisher<Message<T>> toReactivePublisher() {
29412942
publisher = (Publisher<Message<T>>) channelForPublisher;
29422943
}
29432944
else {
2944-
if (channelForPublisher != null) {
2945+
if (channelForPublisher != null && this.integrationComponents.size() > 1
2946+
&& !(channelForPublisher instanceof MessageChannelReference) &&
2947+
!(channelForPublisher instanceof FixedSubscriberChannelPrototype)) {
29452948
publisher = MessageChannelReactiveUtils.toPublisher(channelForPublisher);
29462949
}
29472950
else {
@@ -2951,6 +2954,8 @@ public <T> Publisher<Message<T>> toReactivePublisher() {
29512954
}
29522955
}
29532956

2957+
this.implicitChannel = false;
2958+
29542959
get();
29552960

29562961
return new PublisherIntegrationFlow<>(this.integrationComponents, publisher);

spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,18 @@ public class ReactiveStreamsTests {
8989
@Autowired
9090
private IntegrationFlowContext integrationFlowContext;
9191

92+
@Autowired
93+
private MessageChannel singleChannel;
94+
95+
@Autowired
96+
private Publisher<Message<String>> singleChannelFlow;
97+
98+
@Autowired
99+
private MessageChannel fixedSubscriberChannel;
100+
101+
@Autowired
102+
private Publisher<Message<String>> fixedSubscriberChannelFlow;
103+
92104
@Test
93105
public void testReactiveFlow() throws Exception {
94106
List<String> results = new ArrayList<>();
@@ -207,6 +219,30 @@ public void testFluxTransform() {
207219
integrationFlowRegistration.destroy();
208220
}
209221

222+
@Test
223+
public void singleChannelFlowTest() throws InterruptedException {
224+
CountDownLatch latch = new CountDownLatch(1);
225+
Flux.from(this.singleChannelFlow)
226+
.map(m -> m.getPayload().toUpperCase())
227+
.subscribe(p -> {
228+
latch.countDown();
229+
});
230+
this.singleChannel.send(new GenericMessage<>("foo"));
231+
assertTrue(latch.await(10, TimeUnit.SECONDS));
232+
}
233+
234+
@Test
235+
public void fixedSubscriberChannelFlowTest() throws InterruptedException {
236+
CountDownLatch latch = new CountDownLatch(1);
237+
Flux.from(this.fixedSubscriberChannelFlow)
238+
.map(m -> m.getPayload().toUpperCase())
239+
.subscribe(p -> {
240+
latch.countDown();
241+
});
242+
this.fixedSubscriberChannel.send(new GenericMessage<>("bar"));
243+
assertTrue(latch.await(10, TimeUnit.SECONDS));
244+
}
245+
210246
@Configuration
211247
@EnableIntegration
212248
public static class ContextConfiguration {
@@ -221,6 +257,7 @@ public Publisher<Message<String>> reactiveFlow() {
221257
.autoStartup(false)
222258
.id("reactiveStreamsMessageSource"))
223259
.split(String.class, p -> p.split(","))
260+
.log()
224261
.toReactivePublisher();
225262
}
226263

@@ -231,6 +268,23 @@ public Publisher<Message<Integer>> pollableReactiveFlow() {
231268
.split(s -> s.delimiters(","))
232269
.<String, Integer>transform(Integer::parseInt)
233270
.channel(MessageChannels.queue())
271+
.log()
272+
.toReactivePublisher();
273+
}
274+
275+
@Bean
276+
public Publisher<Message<String>> singleChannelFlow() {
277+
return IntegrationFlows
278+
.from(MessageChannels.direct("singleChannel"))
279+
.log()
280+
.toReactivePublisher();
281+
}
282+
283+
@Bean
284+
public Publisher<Message<String>> fixedSubscriberChannelFlow() {
285+
return IntegrationFlows
286+
.from("fixedSubscriberChannel", true)
287+
.log()
234288
.toReactivePublisher();
235289
}
236290

0 commit comments

Comments
 (0)