Skip to content

Commit a180ca8

Browse files
committed
MessagingGW: replace MonoProcessor with Sinks.One
The `MonoProcessor.create()` is deprecated in the Reactor in favor of `Sinks.one()`
1 parent 0bae11c commit a180ca8

File tree

1 file changed

+6
-7
lines changed

1 file changed

+6
-7
lines changed

spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868

6969
import reactor.core.publisher.Mono;
7070
import reactor.core.publisher.MonoProcessor;
71+
import reactor.core.publisher.Sinks;
7172

7273
/**
7374
* A convenient base class for connecting application code to
@@ -82,7 +83,7 @@
8283
@IntegrationManagedResource
8384
public abstract class MessagingGatewaySupport extends AbstractEndpoint
8485
implements org.springframework.integration.support.management.TrackableComponent,
85-
IntegrationInboundManagement, IntegrationPattern {
86+
IntegrationInboundManagement, IntegrationPattern {
8687

8788
private static final long DEFAULT_TIMEOUT = 1000L;
8889

@@ -648,7 +649,7 @@ private Mono<Message<?>> doSendAndReceiveMessageReactive(MessageChannel requestC
648649

649650
sendMessageForReactiveFlow(requestChannel, messageToSend);
650651

651-
return buildReplyMono(requestMessage, replyChan.replyMono, error, originalReplyChannelHeader,
652+
return buildReplyMono(requestMessage, replyChan.replyMono.asMono(), error, originalReplyChannelHeader,
652653
originalErrorChannelHeader);
653654
})
654655
.onErrorResume(t -> error ? Mono.error(t) : handleSendError(requestMessage, t));
@@ -886,21 +887,19 @@ public Message<?> toMessage(Object object, @Nullable Map<String, Object> headers
886887

887888
private static class MonoReplyChannel implements MessageChannel, ReactiveStreamsSubscribableChannel {
888889

889-
private final MonoProcessor<Message<?>> replyMono = MonoProcessor.create();
890+
private final Sinks.One<Message<?>> replyMono = Sinks.one();
890891

891892
MonoReplyChannel() {
892893
}
893894

894895
@Override
895896
public boolean send(Message<?> message, long timeout) {
896-
this.replyMono.onNext(message);
897-
this.replyMono.onComplete();
898-
return true;
897+
return Boolean.TRUE.equals(this.replyMono.emitValue(message).hasEmitted());
899898
}
900899

901900
@Override
902901
public void subscribeTo(Publisher<? extends Message<?>> publisher) {
903-
publisher.subscribe(this.replyMono);
902+
publisher.subscribe(MonoProcessor.fromSink(replyMono));
904903
}
905904

906905
}

0 commit comments

Comments
 (0)