Skip to content

Commit 617ec35

Browse files
committed
Update after MonoProcessor deprecation in Reactor
This commit adapts the usage of `MonoProcessor` after deprecations introduced in reactor/reactor-core#1053
1 parent 922f945 commit 617ec35

File tree

13 files changed

+29
-15
lines changed

13 files changed

+29
-15
lines changed

spring-core/src/main/java/org/springframework/core/codec/Decoder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import reactor.core.publisher.Flux;
2424
import reactor.core.publisher.Mono;
2525
import reactor.core.publisher.MonoProcessor;
26+
import reactor.core.publisher.Sinks;
2627

2728
import org.springframework.core.ResolvableType;
2829
import org.springframework.core.io.buffer.DataBuffer;
@@ -92,7 +93,7 @@ Mono<T> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementTy
9293
default T decode(DataBuffer buffer, ResolvableType targetType,
9394
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) throws DecodingException {
9495

95-
MonoProcessor<T> processor = MonoProcessor.create();
96+
MonoProcessor<T> processor = MonoProcessor.fromSink(Sinks.one());
9697
decodeToMono(Mono.just(buffer), targetType, mimeType, hints).subscribeWith(processor);
9798

9899
Assert.state(processor.isTerminated(), "DataBuffer decoding should have completed.");

spring-core/src/main/java/org/springframework/util/concurrent/MonoToListenableFutureAdapter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
* @since 5.1
3434
* @param <T> the object type
3535
*/
36+
@SuppressWarnings("deprecation")
3637
public class MonoToListenableFutureAdapter<T> implements ListenableFuture<T> {
3738

3839
private final MonoProcessor<T> processor;

spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/MessagingRSocket.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import reactor.core.publisher.Flux;
2929
import reactor.core.publisher.Mono;
3030
import reactor.core.publisher.MonoProcessor;
31+
import reactor.core.publisher.Sinks;
3132

3233
import org.springframework.core.io.buffer.DataBuffer;
3334
import org.springframework.core.io.buffer.DataBufferUtils;
@@ -162,7 +163,7 @@ private int refCount(DataBuffer dataBuffer) {
162163
}
163164

164165
private Flux<Payload> handleAndReply(Payload firstPayload, FrameType frameType, Flux<Payload> payloads) {
165-
MonoProcessor<Flux<Payload>> replyMono = MonoProcessor.create();
166+
MonoProcessor<Flux<Payload>> replyMono = MonoProcessor.fromSink(Sinks.one());
166167
MessageHeaders headers = createHeaders(firstPayload, frameType, replyMono);
167168

168169
AtomicBoolean read = new AtomicBoolean();

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.reactivestreams.Publisher;
3535
import reactor.core.publisher.Mono;
3636
import reactor.core.publisher.MonoProcessor;
37+
import reactor.core.publisher.Sinks;
3738
import reactor.core.scheduler.Scheduler;
3839
import reactor.core.scheduler.Schedulers;
3940
import reactor.netty.Connection;
@@ -204,7 +205,7 @@ public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, Reconnect
204205
}
205206

206207
// Report first connect to the ListenableFuture
207-
MonoProcessor<Void> connectMono = MonoProcessor.create();
208+
MonoProcessor<Void> connectMono = MonoProcessor.fromSink(Sinks.one());
208209

209210
this.tcpClient
210211
.handle(new ReactorNettyHandler(handler))
@@ -315,7 +316,7 @@ public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
315316
logger.debug("Connected to " + conn.address());
316317
}
317318
});
318-
MonoProcessor<Void> completion = MonoProcessor.create();
319+
MonoProcessor<Void> completion = MonoProcessor.fromSink(Sinks.one());
319320
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
320321
scheduler.schedule(() -> this.connectionHandler.afterConnected(connection));
321322

spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import reactor.core.publisher.Flux;
2727
import reactor.core.publisher.Mono;
2828
import reactor.core.publisher.MonoProcessor;
29+
import reactor.core.publisher.Sinks;
2930

3031
import org.springframework.core.io.buffer.DataBuffer;
3132
import org.springframework.core.io.buffer.DataBufferFactory;
@@ -64,7 +65,7 @@ public MockServerHttpResponse(DataBufferFactory dataBufferFactory) {
6465
super(dataBufferFactory);
6566
this.writeHandler = body -> {
6667
// Avoid .then() which causes data buffers to be released
67-
MonoProcessor<Void> completion = MonoProcessor.create();
68+
MonoProcessor<Void> completion = MonoProcessor.fromSink(Sinks.one());
6869
this.body = body.doOnComplete(completion::onComplete).doOnError(completion::onError).cache();
6970
this.body.subscribe();
7071
return completion;

spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import reactor.core.publisher.Flux;
2626
import reactor.core.publisher.Mono;
2727
import reactor.core.publisher.MonoProcessor;
28+
import reactor.core.publisher.Sinks;
2829
import reactor.core.scheduler.Schedulers;
2930

3031
import org.springframework.core.io.buffer.DataBuffer;
@@ -83,8 +84,8 @@ public Mono<ClientHttpResponse> connect(HttpMethod httpMethod, URI uri,
8384
private Mono<ClientHttpResponse> doConnect(
8485
HttpMethod httpMethod, URI uri, Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
8586

86-
MonoProcessor<Void> requestWriteCompletion = MonoProcessor.create();
87-
MonoProcessor<Void> handlerCompletion = MonoProcessor.create();
87+
MonoProcessor<Void> requestWriteCompletion = MonoProcessor.fromSink(Sinks.one());
88+
MonoProcessor<Void> handlerCompletion = MonoProcessor.fromSink(Sinks.one());
8889
ClientHttpResponse[] savedResponse = new ClientHttpResponse[1];
8990

9091
MockClientHttpRequest mockClientRequest = new MockClientHttpRequest(httpMethod, uri);

spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import reactor.core.publisher.Flux;
2828
import reactor.core.publisher.Mono;
2929
import reactor.core.publisher.MonoProcessor;
30+
import reactor.core.publisher.Sinks;
3031

3132
import org.springframework.core.io.buffer.DataBuffer;
3233
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
@@ -132,7 +133,7 @@ final static class WiretapRecorder {
132133

133134
private final DataBuffer buffer = DefaultDataBufferFactory.sharedInstance.allocateBuffer();
134135

135-
private final MonoProcessor<byte[]> content = MonoProcessor.create();
136+
private final MonoProcessor<byte[]> content = MonoProcessor.fromSink(Sinks.one());
136137

137138
private boolean hasContentConsumer;
138139

spring-webflux/src/main/java/org/springframework/web/reactive/result/method/SyncInvocableHandlerMethod.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.stream.Collectors;
2323

2424
import reactor.core.publisher.MonoProcessor;
25+
import reactor.core.publisher.Sinks;
2526

2627
import org.springframework.core.DefaultParameterNameDiscoverer;
2728
import org.springframework.core.ParameterNameDiscoverer;
@@ -102,7 +103,7 @@ public ParameterNameDiscoverer getParameterNameDiscoverer() {
102103
public HandlerResult invokeForHandlerResult(ServerWebExchange exchange,
103104
BindingContext bindingContext, Object... providedArgs) {
104105

105-
MonoProcessor<HandlerResult> processor = MonoProcessor.create();
106+
MonoProcessor<HandlerResult> processor = MonoProcessor.fromSink(Sinks.one());
106107
this.delegate.invoke(exchange, bindingContext, providedArgs).subscribeWith(processor);
107108

108109
if (processor.isTerminated()) {

spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import reactor.core.publisher.Mono;
2727
import reactor.core.publisher.MonoProcessor;
28+
import reactor.core.publisher.Sinks;
2829

2930
import org.springframework.beans.BeanUtils;
3031
import org.springframework.core.DefaultParameterNameDiscoverer;
@@ -116,7 +117,7 @@ public Mono<Object> resolveArgument(
116117
Mono<?> valueMono = prepareAttributeMono(name, valueType, context, exchange);
117118

118119
Map<String, Object> model = context.getModel().asMap();
119-
MonoProcessor<BindingResult> bindingResultMono = MonoProcessor.create();
120+
MonoProcessor<BindingResult> bindingResultMono = MonoProcessor.fromSink(Sinks.one());
120121
model.put(BindingResult.MODEL_KEY_PREFIX + name, bindingResultMono);
121122

122123
return valueMono.flatMap(value -> {

spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import reactor.core.publisher.Flux;
2727
import reactor.core.publisher.Mono;
2828
import reactor.core.publisher.MonoProcessor;
29+
import reactor.core.publisher.Sinks;
2930
import reactor.util.concurrent.Queues;
3031

3132
import org.springframework.core.io.buffer.DataBufferFactory;
@@ -73,7 +74,7 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
7374

7475
private final AtomicBoolean sendCalled = new AtomicBoolean();
7576

76-
private final MonoProcessor<CloseStatus> closeStatusProcessor = MonoProcessor.create();
77+
private final MonoProcessor<CloseStatus> closeStatusProcessor = MonoProcessor.fromSink(Sinks.one());
7778

7879

7980
/**

0 commit comments

Comments
 (0)