Skip to content

Commit c765e80

Browse files
artembilangaryrussell
authored andcommitted
SO-62761903: Inject BF into Gateway's correlator
Related to https://stackoverflow.com/questions/62761903/spring-integration-reactive-streams-support-exception-in-creating-a-reactive The `MessagingGatewaySupport` creates an internal endpoint for consuming messages from the provided `replyChannel`. The endpoint type depends on the channel type. The `ReactiveStreamsConsumer` was missing a `BeanFactory` injection causing an error when `ReactiveStreamsConsumer` tries to extract a default `ErrorHandler` from `BeanFactory` * Refactor `MessagingGatewaySupport` to inject a `BeanFactory` to all the possible correlator endpoints. Also always call `afterPropertiesSet()` for all of them **Cherry-pick to 5.3.x & 5.2.x**
1 parent 463e3d4 commit c765e80

File tree

2 files changed

+24
-8
lines changed

2 files changed

+24
-8
lines changed

spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -787,23 +787,21 @@ protected void registerReplyMessageCorrelatorIfNecessary() {
787787
}
788788
else if (replyChan instanceof PollableChannel) {
789789
PollingConsumer endpoint = new PollingConsumer((PollableChannel) replyChan, handler);
790-
if (beanFactory != null) {
791-
endpoint.setBeanFactory(beanFactory);
792-
}
793790
endpoint.setReceiveTimeout(this.replyTimeout);
794-
endpoint.afterPropertiesSet();
795791
correlator = endpoint;
796792
}
797793
else if (replyChan instanceof ReactiveStreamsSubscribableChannel) {
798-
ReactiveStreamsConsumer endpoint =
799-
new ReactiveStreamsConsumer(replyChan, (Subscriber<Message<?>>) handler);
800-
endpoint.afterPropertiesSet();
801-
correlator = endpoint;
794+
correlator = new ReactiveStreamsConsumer(replyChan, (Subscriber<Message<?>>) handler);
802795
}
803796
else {
804797
throw new MessagingException("Unsupported 'replyChannel' type [" + replyChan.getClass() + "]."
805798
+ "SubscribableChannel or PollableChannel type are supported.");
806799
}
800+
801+
if (beanFactory != null) {
802+
correlator.setBeanFactory(beanFactory);
803+
}
804+
correlator.afterPropertiesSet();
807805
this.replyMessageCorrelator = correlator;
808806
}
809807
if (isRunning()) {

spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayProxyFactoryBeanTests.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.springframework.integration.annotation.Gateway;
4747
import org.springframework.integration.annotation.GatewayHeader;
4848
import org.springframework.integration.channel.DirectChannel;
49+
import org.springframework.integration.channel.FluxMessageChannel;
4950
import org.springframework.integration.channel.QueueChannel;
5051
import org.springframework.integration.endpoint.EventDrivenConsumer;
5152
import org.springframework.integration.support.utils.IntegrationUtils;
@@ -158,6 +159,23 @@ public void testReceiveMessage() {
158159
assertThat(message.getPayload()).isEqualTo("foo");
159160
}
160161

162+
@Test
163+
public void testReactiveReplyChannel() {
164+
QueueChannel requestChannel = new QueueChannel();
165+
startResponder(requestChannel);
166+
FluxMessageChannel replyChannel = new FluxMessageChannel();
167+
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean(TestService.class);
168+
proxyFactory.setDefaultRequestChannel(requestChannel);
169+
proxyFactory.setDefaultReplyChannel(replyChannel);
170+
171+
proxyFactory.setBeanFactory(mock(BeanFactory.class));
172+
proxyFactory.afterPropertiesSet();
173+
TestService service = (TestService) proxyFactory.getObject();
174+
175+
String result = service.requestReply("test");
176+
assertThat(result).isEqualTo("testbar");
177+
}
178+
161179
@Test
162180
public void testRequestReplyWithTypeConversion() {
163181
final QueueChannel requestChannel = new QueueChannel();

0 commit comments

Comments
 (0)