Skip to content

Commit 734f4a4

Browse files
committed
Replace of ReplayProcessor in RSocket tests
See gh-25085
1 parent 97efacc commit 734f4a4

File tree

4 files changed

+23
-23
lines changed

4 files changed

+23
-23
lines changed

spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import org.reactivestreams.Publisher;
4343
import reactor.core.publisher.Flux;
4444
import reactor.core.publisher.Mono;
45-
import reactor.core.publisher.ReplayProcessor;
45+
import reactor.core.publisher.Sinks;
4646
import reactor.test.StepVerifier;
4747

4848
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
@@ -246,8 +246,8 @@ private static class PayloadInterceptor implements RSocket, RSocketInterceptor {
246246
void checkForLeaks() {
247247
this.rsockets.stream().map(PayloadSavingDecorator::getPayloads)
248248
.forEach(payloadInfoProcessor -> {
249-
payloadInfoProcessor.onComplete();
250-
payloadInfoProcessor
249+
payloadInfoProcessor.complete();
250+
payloadInfoProcessor.asFlux()
251251
.doOnNext(this::checkForLeak)
252252
.blockLast();
253253
});
@@ -291,18 +291,18 @@ private static class PayloadSavingDecorator implements RSocket {
291291

292292
private final RSocket delegate;
293293

294-
private ReplayProcessor<PayloadLeakInfo> payloads = ReplayProcessor.create();
294+
private Sinks.StandaloneFluxSink<PayloadLeakInfo> payloads = Sinks.replayAll();
295295

296296
PayloadSavingDecorator(RSocket delegate) {
297297
this.delegate = delegate;
298298
}
299299

300-
ReplayProcessor<PayloadLeakInfo> getPayloads() {
300+
Sinks.StandaloneFluxSink<PayloadLeakInfo> getPayloads() {
301301
return this.payloads;
302302
}
303303

304304
void reset() {
305-
this.payloads = ReplayProcessor.create();
305+
this.payloads = Sinks.replayAll();
306306
}
307307

308308
@Override
@@ -328,7 +328,7 @@ public Flux<io.rsocket.Payload> requestChannel(Publisher<io.rsocket.Payload> pay
328328
}
329329

330330
private io.rsocket.Payload addPayload(io.rsocket.Payload payload) {
331-
this.payloads.onNext(new PayloadLeakInfo(payload));
331+
this.payloads.next(new PayloadLeakInfo(payload));
332332
return payload;
333333
}
334334

spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import org.reactivestreams.Publisher;
3535
import reactor.core.publisher.Flux;
3636
import reactor.core.publisher.Mono;
37-
import reactor.core.publisher.ReplayProcessor;
37+
import reactor.core.publisher.Sinks;
3838
import reactor.test.StepVerifier;
3939

4040
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
@@ -108,7 +108,7 @@ public void fireAndForget() {
108108
.concatMap(i -> requester.route("receive").data("Hello " + i).send())
109109
.blockLast();
110110

111-
StepVerifier.create(context.getBean(ServerController.class).fireForgetPayloads)
111+
StepVerifier.create(context.getBean(ServerController.class).fireForgetPayloads.asFlux())
112112
.expectNext("Hello 1")
113113
.expectNext("Hello 2")
114114
.expectNext("Hello 3")
@@ -171,7 +171,7 @@ public void metadataPush() {
171171
.concatMap(s -> requester.route("foo-updates").metadata(s, FOO_MIME_TYPE).sendMetadata())
172172
.blockLast();
173173

174-
StepVerifier.create(context.getBean(ServerController.class).metadataPushPayloads)
174+
StepVerifier.create(context.getBean(ServerController.class).metadataPushPayloads.asFlux())
175175
.expectNext("bar")
176176
.expectNext("baz")
177177
.thenAwait(Duration.ofMillis(50))
@@ -225,14 +225,14 @@ public void noMatchingRoute() {
225225
@Controller
226226
static class ServerController {
227227

228-
final ReplayProcessor<String> fireForgetPayloads = ReplayProcessor.create();
228+
final Sinks.StandaloneFluxSink<String> fireForgetPayloads = Sinks.replayAll();
229229

230-
final ReplayProcessor<String> metadataPushPayloads = ReplayProcessor.create();
230+
final Sinks.StandaloneFluxSink<String> metadataPushPayloads = Sinks.replayAll();
231231

232232

233233
@MessageMapping("receive")
234234
void receive(String payload) {
235-
this.fireForgetPayloads.onNext(payload);
235+
this.fireForgetPayloads.next(payload);
236236
}
237237

238238
@MessageMapping("echo")
@@ -274,7 +274,7 @@ Mono<Void> voidReturnValue(String payload) {
274274

275275
@ConnectMapping("foo-updates")
276276
public void handleMetadata(@Header("foo") String foo) {
277-
this.metadataPushPayloads.onNext(foo);
277+
this.metadataPushPayloads.next(foo);
278278
}
279279

280280
@MessageExceptionHandler

spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import reactor.core.publisher.Flux;
3030
import reactor.core.publisher.Mono;
3131
import reactor.core.publisher.MonoProcessor;
32-
import reactor.core.publisher.ReplayProcessor;
32+
import reactor.core.publisher.Sinks;
3333
import reactor.core.scheduler.Schedulers;
3434
import reactor.test.StepVerifier;
3535

@@ -212,11 +212,11 @@ private void runTest(Runnable testEcho) {
212212

213213
private static class ClientHandler {
214214

215-
final ReplayProcessor<String> fireForgetPayloads = ReplayProcessor.create();
215+
final Sinks.StandaloneFluxSink<String> fireForgetPayloads = Sinks.replayAll();
216216

217217
@MessageMapping("receive")
218218
void receive(String payload) {
219-
this.fireForgetPayloads.onNext(payload);
219+
this.fireForgetPayloads.next(payload);
220220
}
221221

222222
@MessageMapping("echo")

spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import org.springframework.messaging.handler.annotation.MessageMapping
3939
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler
4040
import org.springframework.stereotype.Controller
4141
import reactor.core.publisher.Flux
42-
import reactor.core.publisher.ReplayProcessor
42+
import reactor.core.publisher.Sinks
4343
import reactor.test.StepVerifier
4444
import java.time.Duration
4545

@@ -56,7 +56,7 @@ class RSocketClientToServerCoroutinesIntegrationTests {
5656
Flux.range(1, 3)
5757
.concatMap { requester.route("receive").data("Hello $it").send() }
5858
.blockLast()
59-
StepVerifier.create(context.getBean(ServerController::class.java).fireForgetPayloads)
59+
StepVerifier.create(context.getBean(ServerController::class.java).fireForgetPayloads.asFlux())
6060
.expectNext("Hello 1")
6161
.expectNext("Hello 2")
6262
.expectNext("Hello 3")
@@ -70,7 +70,7 @@ class RSocketClientToServerCoroutinesIntegrationTests {
7070
Flux.range(1, 3)
7171
.concatMap { i: Int -> requester.route("receive-async").data("Hello $i").send() }
7272
.blockLast()
73-
StepVerifier.create(context.getBean(ServerController::class.java).fireForgetPayloads)
73+
StepVerifier.create(context.getBean(ServerController::class.java).fireForgetPayloads.asFlux())
7474
.expectNext("Hello 1")
7575
.expectNext("Hello 2")
7676
.expectNext("Hello 3")
@@ -145,17 +145,17 @@ class RSocketClientToServerCoroutinesIntegrationTests {
145145
@Controller
146146
class ServerController {
147147

148-
val fireForgetPayloads = ReplayProcessor.create<String>()
148+
val fireForgetPayloads = Sinks.replayAll<String>()
149149

150150
@MessageMapping("receive")
151151
fun receive(payload: String) {
152-
fireForgetPayloads.onNext(payload)
152+
fireForgetPayloads.next(payload)
153153
}
154154

155155
@MessageMapping("receive-async")
156156
suspend fun receiveAsync(payload: String) {
157157
delay(10)
158-
fireForgetPayloads.onNext(payload)
158+
fireForgetPayloads.next(payload)
159159
}
160160

161161
@MessageMapping("echo-async")

0 commit comments

Comments
 (0)