Skip to content

Commit 22bf62d

Browse files
committed
Replace remaining use of deprecated Processors in tests
See gh-25085
1 parent 0e83aaa commit 22bf62d

File tree

5 files changed

+75
-75
lines changed

5 files changed

+75
-75
lines changed

spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,11 +32,10 @@
3232
import org.mockito.Captor;
3333
import org.mockito.Mock;
3434
import org.mockito.junit.jupiter.MockitoExtension;
35-
import reactor.core.publisher.EmitterProcessor;
3635
import reactor.core.publisher.Flux;
37-
import reactor.core.publisher.FluxProcessor;
3836
import reactor.core.publisher.Mono;
3937
import reactor.core.publisher.MonoProcessor;
38+
import reactor.core.publisher.Sinks;
4039

4140
import org.springframework.context.support.StaticApplicationContext;
4241
import org.springframework.lang.Nullable;
@@ -343,8 +342,8 @@ public void monoSuccess() {
343342
Message<?> message = createMessage("/app1/mono");
344343
this.messageHandler.handleMessage(message);
345344

346-
assertThat(controller.mono).isNotNull();
347-
controller.mono.onNext("foo");
345+
assertThat(controller.monoProcessor).isNotNull();
346+
controller.monoProcessor.onNext("foo");
348347
verify(this.converter).toMessage(this.payloadCaptor.capture(), any(MessageHeaders.class));
349348
assertThat(this.payloadCaptor.getValue()).isEqualTo("foo");
350349
}
@@ -358,7 +357,7 @@ public void monoFailure() {
358357
Message<?> message = createMessage("/app1/mono");
359358
this.messageHandler.handleMessage(message);
360359

361-
controller.mono.onError(new IllegalStateException());
360+
controller.monoProcessor.onError(new IllegalStateException());
362361
assertThat(controller.exceptionCaught).isTrue();
363362
}
364363

@@ -371,14 +370,14 @@ public void fluxNotHandled() {
371370
Message<?> message = createMessage("/app1/flux");
372371
this.messageHandler.handleMessage(message);
373372

374-
assertThat(controller.flux).isNotNull();
375-
controller.flux.onNext("foo");
373+
assertThat(controller.fluxSink).isNotNull();
374+
controller.fluxSink.next("foo");
376375

377376
verify(this.converter, never()).toMessage(any(), any(MessageHeaders.class));
378377
}
379378

380379
@Test
381-
public void placeholder() throws Exception {
380+
public void placeholder() {
382381
Message<?> message = createMessage("/pre/myValue");
383382
this.messageHandler.setEmbeddedValueResolver(value -> ("/${myProperty}".equals(value) ? "/myValue" : value));
384383
this.messageHandler.registerHandler(this.testController);
@@ -586,22 +585,22 @@ public void handleValidationException() {
586585
@Controller
587586
private static class ReactiveController {
588587

589-
private MonoProcessor<String> mono;
588+
private MonoProcessor<String> monoProcessor;
590589

591-
private FluxProcessor<String, String> flux;
590+
private Sinks.StandaloneFluxSink<String> fluxSink;
592591

593592
private boolean exceptionCaught = false;
594593

595594
@MessageMapping("mono")
596595
public Mono<String> handleMono() {
597-
this.mono = MonoProcessor.create();
598-
return this.mono;
596+
this.monoProcessor = MonoProcessor.create();
597+
return this.monoProcessor;
599598
}
600599

601600
@MessageMapping("flux")
602601
public Flux<String> handleFlux() {
603-
this.flux = EmitterProcessor.create();
604-
return this.flux;
602+
this.fluxSink = Sinks.unicast();
603+
return this.fluxSink.asFlux();
605604
}
606605

607606
@MessageExceptionHandler(IllegalStateException.class)

spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.junit.jupiter.api.Test;
2727
import reactor.core.publisher.Flux;
2828
import reactor.core.publisher.Mono;
29+
import reactor.core.publisher.Sinks;
2930
import reactor.core.publisher.UnicastProcessor;
3031

3132
import org.springframework.core.ResolvableType;
@@ -207,12 +208,12 @@ private String decodeToString(Part part) {
207208

208209
@Test // SPR-16402
209210
public void singleSubscriberWithResource() throws IOException {
210-
UnicastProcessor<Resource> processor = UnicastProcessor.create();
211+
Sinks.StandaloneFluxSink<Resource> sink = Sinks.unicast();
211212
Resource logo = new ClassPathResource("/org/springframework/http/converter/logo.jpg");
212-
Mono.just(logo).subscribe(processor);
213+
sink.next(logo);
213214

214215
MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder();
215-
bodyBuilder.asyncPart("logo", processor, Resource.class);
216+
bodyBuilder.asyncPart("logo", sink.asFlux(), Resource.class);
216217

217218
Mono<MultiValueMap<String, HttpEntity<?>>> result = Mono.just(bodyBuilder.build());
218219

spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import reactor.core.publisher.Flux;
2929
import reactor.core.publisher.Mono;
3030
import reactor.core.publisher.MonoProcessor;
31-
import reactor.core.publisher.ReplayProcessor;
3231
import reactor.util.retry.Retry;
3332

3433
import org.springframework.context.annotation.Bean;
@@ -81,15 +80,16 @@ void echo(WebSocketClient client, HttpServer server, Class<?> serverConfigClass)
8180
private void testEcho() {
8281
int count = 100;
8382
Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
84-
ReplayProcessor<Object> output = ReplayProcessor.create(count);
85-
this.client.execute(getUrl("/echo"), session -> session
86-
.send(input.map(session::textMessage))
87-
.thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
88-
.subscribeWith(output)
89-
.then())
83+
AtomicReference<List<String>> actualRef = new AtomicReference<>();
84+
this.client.execute(getUrl("/echo"), session ->
85+
session.send(input.map(session::textMessage))
86+
.thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
87+
.collectList()
88+
.doOnNext(actualRef::set)
89+
.then())
9090
.block(TIMEOUT);
91-
assertThat(output.isTerminated()).isTrue();
92-
assertThat(output.collectList().block()).isEqualTo(input.collectList().block());
91+
assertThat(actualRef.get()).isNotNull();
92+
assertThat(actualRef.get()).isEqualTo(input.collectList().block());
9393
}
9494

9595
@ParameterizedWebSocketTest

spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@
3030
import io.reactivex.rxjava3.core.SingleEmitter;
3131
import org.junit.jupiter.api.BeforeEach;
3232
import org.junit.jupiter.api.Test;
33-
import reactor.core.publisher.EmitterProcessor;
3433
import reactor.core.publisher.Flux;
3534
import reactor.core.publisher.Mono;
3635
import reactor.core.publisher.MonoProcessor;
36+
import reactor.core.publisher.Sinks;
3737

3838
import org.springframework.core.MethodParameter;
3939
import org.springframework.core.ReactiveAdapterRegistry;
@@ -138,11 +138,11 @@ public void deferredResultSubscriberWithMultipleValues() throws Exception {
138138
Bar bar1 = new Bar("foo");
139139
Bar bar2 = new Bar("bar");
140140

141-
EmitterProcessor<Bar> emitter = EmitterProcessor.create();
142-
testDeferredResultSubscriber(emitter, Flux.class, forClass(Bar.class), () -> {
143-
emitter.onNext(bar1);
144-
emitter.onNext(bar2);
145-
emitter.onComplete();
141+
Sinks.StandaloneFluxSink<Bar> sink = Sinks.unicast();
142+
testDeferredResultSubscriber(sink.asFlux(), Flux.class, forClass(Bar.class), () -> {
143+
sink.next(bar1);
144+
sink.next(bar2);
145+
sink.complete();
146146
}, Arrays.asList(bar1, bar2));
147147
}
148148

@@ -189,16 +189,16 @@ private void testSseResponse(boolean expectSseEmitter) throws Exception {
189189
public void writeServerSentEvents() throws Exception {
190190

191191
this.servletRequest.addHeader("Accept", "text/event-stream");
192-
EmitterProcessor<String> processor = EmitterProcessor.create();
193-
SseEmitter sseEmitter = (SseEmitter) handleValue(processor, Flux.class, forClass(String.class));
192+
Sinks.StandaloneFluxSink<String> sink = Sinks.unicast();
193+
SseEmitter sseEmitter = (SseEmitter) handleValue(sink.asFlux(), Flux.class, forClass(String.class));
194194

195195
EmitterHandler emitterHandler = new EmitterHandler();
196196
sseEmitter.initialize(emitterHandler);
197197

198-
processor.onNext("foo");
199-
processor.onNext("bar");
200-
processor.onNext("baz");
201-
processor.onComplete();
198+
sink.next("foo");
199+
sink.next("bar");
200+
sink.next("baz");
201+
sink.complete();
202202

203203
assertThat(emitterHandler.getValuesAsText()).isEqualTo("data:foo\n\ndata:bar\n\ndata:baz\n\n");
204204
}
@@ -208,16 +208,16 @@ public void writeServerSentEventsWithBuilder() throws Exception {
208208

209209
ResolvableType type = ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class);
210210

211-
EmitterProcessor<ServerSentEvent<?>> processor = EmitterProcessor.create();
212-
SseEmitter sseEmitter = (SseEmitter) handleValue(processor, Flux.class, type);
211+
Sinks.StandaloneFluxSink<ServerSentEvent<?>> sink = Sinks.unicast();
212+
SseEmitter sseEmitter = (SseEmitter) handleValue(sink.asFlux(), Flux.class, type);
213213

214214
EmitterHandler emitterHandler = new EmitterHandler();
215215
sseEmitter.initialize(emitterHandler);
216216

217-
processor.onNext(ServerSentEvent.builder("foo").id("1").build());
218-
processor.onNext(ServerSentEvent.builder("bar").id("2").build());
219-
processor.onNext(ServerSentEvent.builder("baz").id("3").build());
220-
processor.onComplete();
217+
sink.next(ServerSentEvent.builder("foo").id("1").build());
218+
sink.next(ServerSentEvent.builder("bar").id("2").build());
219+
sink.next(ServerSentEvent.builder("baz").id("3").build());
220+
sink.complete();
221221

222222
assertThat(emitterHandler.getValuesAsText()).isEqualTo("id:1\ndata:foo\n\nid:2\ndata:bar\n\nid:3\ndata:baz\n\n");
223223
}
@@ -227,8 +227,8 @@ public void writeStreamJson() throws Exception {
227227

228228
this.servletRequest.addHeader("Accept", "application/stream+json");
229229

230-
EmitterProcessor<Bar> processor = EmitterProcessor.create();
231-
ResponseBodyEmitter emitter = handleValue(processor, Flux.class, forClass(Bar.class));
230+
Sinks.StandaloneFluxSink<Bar> sink = Sinks.unicast();
231+
ResponseBodyEmitter emitter = handleValue(sink.asFlux(), Flux.class, forClass(Bar.class));
232232

233233
EmitterHandler emitterHandler = new EmitterHandler();
234234
emitter.initialize(emitterHandler);
@@ -239,9 +239,9 @@ public void writeStreamJson() throws Exception {
239239
Bar bar1 = new Bar("foo");
240240
Bar bar2 = new Bar("bar");
241241

242-
processor.onNext(bar1);
243-
processor.onNext(bar2);
244-
processor.onComplete();
242+
sink.next(bar1);
243+
sink.next(bar2);
244+
sink.complete();
245245

246246
assertThat(message.getHeaders().getContentType().toString()).isEqualTo("application/stream+json");
247247
assertThat(emitterHandler.getValues()).isEqualTo(Arrays.asList(bar1, "\n", bar2, "\n"));
@@ -250,16 +250,16 @@ public void writeStreamJson() throws Exception {
250250
@Test
251251
public void writeText() throws Exception {
252252

253-
EmitterProcessor<String> processor = EmitterProcessor.create();
254-
ResponseBodyEmitter emitter = handleValue(processor, Flux.class, forClass(String.class));
253+
Sinks.StandaloneFluxSink<String> sink = Sinks.unicast();
254+
ResponseBodyEmitter emitter = handleValue(sink.asFlux(), Flux.class, forClass(String.class));
255255

256256
EmitterHandler emitterHandler = new EmitterHandler();
257257
emitter.initialize(emitterHandler);
258258

259-
processor.onNext("The quick");
260-
processor.onNext(" brown fox jumps over ");
261-
processor.onNext("the lazy dog");
262-
processor.onComplete();
259+
sink.next("The quick");
260+
sink.next(" brown fox jumps over ");
261+
sink.next("the lazy dog");
262+
sink.complete();
263263

264264
assertThat(emitterHandler.getValuesAsText()).isEqualTo("The quick brown fox jumps over the lazy dog");
265265
}

spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323

2424
import org.junit.jupiter.api.BeforeEach;
2525
import org.junit.jupiter.api.Test;
26-
import reactor.core.publisher.EmitterProcessor;
2726
import reactor.core.publisher.Flux;
27+
import reactor.core.publisher.Sinks;
2828

2929
import org.springframework.core.MethodParameter;
3030
import org.springframework.core.ResolvableType;
@@ -227,16 +227,16 @@ public void responseBodyFlux() throws Exception {
227227
this.request.addHeader("Accept", "text/event-stream");
228228

229229
MethodParameter type = on(TestController.class).resolveReturnType(Flux.class, String.class);
230-
EmitterProcessor<String> processor = EmitterProcessor.create();
231-
this.handler.handleReturnValue(processor, type, this.mavContainer, this.webRequest);
230+
Sinks.StandaloneFluxSink<String> sink = Sinks.unicast();
231+
this.handler.handleReturnValue(sink.asFlux(), type, this.mavContainer, this.webRequest);
232232

233233
assertThat(this.request.isAsyncStarted()).isTrue();
234234
assertThat(this.response.getStatus()).isEqualTo(200);
235235

236-
processor.onNext("foo");
237-
processor.onNext("bar");
238-
processor.onNext("baz");
239-
processor.onComplete();
236+
sink.next("foo");
237+
sink.next("bar");
238+
sink.next("baz");
239+
sink.complete();
240240

241241
assertThat(this.response.getContentType()).isEqualTo("text/event-stream");
242242
assertThat(this.response.getContentAsString()).isEqualTo("data:foo\n\ndata:bar\n\ndata:baz\n\n");
@@ -248,14 +248,14 @@ public void responseBodyFluxWithError() throws Exception {
248248
this.request.addHeader("Accept", "text/event-stream");
249249

250250
MethodParameter type = on(TestController.class).resolveReturnType(Flux.class, String.class);
251-
EmitterProcessor<String> processor = EmitterProcessor.create();
252-
this.handler.handleReturnValue(processor, type, this.mavContainer, this.webRequest);
251+
Sinks.StandaloneFluxSink<String> sink = Sinks.unicast();
252+
this.handler.handleReturnValue(sink.asFlux(), type, this.mavContainer, this.webRequest);
253253

254254
assertThat(this.request.isAsyncStarted()).isTrue();
255255

256256
IllegalStateException ex = new IllegalStateException("wah wah");
257-
processor.onError(ex);
258-
processor.onComplete();
257+
sink.error(ex);
258+
sink.complete();
259259

260260
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(this.webRequest);
261261
assertThat(asyncManager.getConcurrentResult()).isSameAs(ex);
@@ -290,19 +290,19 @@ public void responseEntitySseNoContent() throws Exception {
290290
@Test
291291
public void responseEntityFlux() throws Exception {
292292

293-
EmitterProcessor<String> processor = EmitterProcessor.create();
294-
ResponseEntity<Flux<String>> entity = ResponseEntity.ok().body(processor);
293+
Sinks.StandaloneFluxSink<String> sink = Sinks.unicast();
294+
ResponseEntity<Flux<String>> entity = ResponseEntity.ok().body(sink.asFlux());
295295
ResolvableType bodyType = forClassWithGenerics(Flux.class, String.class);
296296
MethodParameter type = on(TestController.class).resolveReturnType(ResponseEntity.class, bodyType);
297297
this.handler.handleReturnValue(entity, type, this.mavContainer, this.webRequest);
298298

299299
assertThat(this.request.isAsyncStarted()).isTrue();
300300
assertThat(this.response.getStatus()).isEqualTo(200);
301301

302-
processor.onNext("foo");
303-
processor.onNext("bar");
304-
processor.onNext("baz");
305-
processor.onComplete();
302+
sink.next("foo");
303+
sink.next("bar");
304+
sink.next("baz");
305+
sink.complete();
306306

307307
assertThat(this.response.getContentType()).isEqualTo("text/plain");
308308
assertThat(this.response.getContentAsString()).isEqualTo("foobarbaz");
@@ -311,8 +311,8 @@ public void responseEntityFlux() throws Exception {
311311
@Test // SPR-17076
312312
public void responseEntityFluxWithCustomHeader() throws Exception {
313313

314-
EmitterProcessor<SimpleBean> processor = EmitterProcessor.create();
315-
ResponseEntity<Flux<SimpleBean>> entity = ResponseEntity.ok().header("x-foo", "bar").body(processor);
314+
Sinks.StandaloneFluxSink<SimpleBean> sink = Sinks.unicast();
315+
ResponseEntity<Flux<SimpleBean>> entity = ResponseEntity.ok().header("x-foo", "bar").body(sink.asFlux());
316316
ResolvableType bodyType = forClassWithGenerics(Flux.class, SimpleBean.class);
317317
MethodParameter type = on(TestController.class).resolveReturnType(ResponseEntity.class, bodyType);
318318
this.handler.handleReturnValue(entity, type, this.mavContainer, this.webRequest);

0 commit comments

Comments
 (0)