Skip to content

Commit 7391f9b

Browse files
committed
Fix Reactor Core DirectProcessor deprecation
As of reactor/reactor-core#2188, `DirectProcessor` variants are deprecated. This commit replaces them with the new `FluxIdentityProcessor` variant. See gh-25085
1 parent 34cb489 commit 7391f9b

File tree

2 files changed

+6
-5
lines changed

2 files changed

+6
-5
lines changed

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,10 @@
3232
import org.apache.commons.logging.Log;
3333
import org.apache.commons.logging.LogFactory;
3434
import org.reactivestreams.Publisher;
35-
import reactor.core.publisher.DirectProcessor;
35+
import reactor.core.publisher.FluxIdentityProcessor;
3636
import reactor.core.publisher.Mono;
3737
import reactor.core.publisher.MonoProcessor;
38+
import reactor.core.publisher.Processors;
3839
import reactor.core.scheduler.Scheduler;
3940
import reactor.core.scheduler.Schedulers;
4041
import reactor.netty.Connection;
@@ -316,7 +317,7 @@ public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
316317
logger.debug("Connected to " + conn.address());
317318
}
318319
});
319-
DirectProcessor<Void> completion = DirectProcessor.create();
320+
FluxIdentityProcessor<Void> completion = Processors.more().multicastNoBackpressure();
320321
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
321322
scheduler.schedule(() -> this.connectionHandler.afterConnected(connection));
322323

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package org.springframework.messaging.tcp.reactor;
1818

1919
import io.netty.buffer.ByteBuf;
20-
import reactor.core.publisher.DirectProcessor;
20+
import reactor.core.publisher.FluxIdentityProcessor;
2121
import reactor.core.publisher.Mono;
2222
import reactor.netty.NettyInbound;
2323
import reactor.netty.NettyOutbound;
@@ -42,11 +42,11 @@ public class ReactorNettyTcpConnection<P> implements TcpConnection<P> {
4242

4343
private final ReactorNettyCodec<P> codec;
4444

45-
private final DirectProcessor<Void> closeProcessor;
45+
private final FluxIdentityProcessor<Void> closeProcessor;
4646

4747

4848
public ReactorNettyTcpConnection(NettyInbound inbound, NettyOutbound outbound,
49-
ReactorNettyCodec<P> codec, DirectProcessor<Void> closeProcessor) {
49+
ReactorNettyCodec<P> codec, FluxIdentityProcessor<Void> closeProcessor) {
5050

5151
this.inbound = inbound;
5252
this.outbound = outbound;

0 commit comments

Comments
 (0)