3030import io .netty .util .internal .logging .InternalLogger ;
3131import io .netty .util .internal .logging .InternalLoggerFactory ;
3232import io .r2dbc .spi .R2dbcException ;
33+ import org .reactivestreams .Subscriber ;
3334import org .reactivestreams .Subscription ;
3435import reactor .core .CoreSubscriber ;
3536import reactor .core .Disposable ;
3637import reactor .core .publisher .Flux ;
3738import reactor .core .publisher .Mono ;
3839import reactor .core .publisher .Sinks ;
40+ import reactor .core .publisher .Sinks .EmitFailureHandler ;
3941import reactor .core .publisher .SynchronousSink ;
4042import reactor .netty .Connection ;
4143import reactor .netty .FutureMono ;
4244import reactor .util .context .Context ;
45+ import reactor .util .context .ContextView ;
4346
4447import java .util .concurrent .atomic .AtomicBoolean ;
4548import java .util .function .BiConsumer ;
4649import java .util .function .Consumer ;
4750import java .util .function .Function ;
4851
52+ import static io .asyncer .r2dbc .mysql .internal .util .AssertUtils .require ;
4953import static io .asyncer .r2dbc .mysql .internal .util .AssertUtils .requireNonNull ;
5054
5155/**
@@ -67,12 +71,8 @@ final class ReactorNettyClient implements Client {
6771
6872 private final Sinks .Many <ClientMessage > requests = Sinks .many ().unicast ().onBackpressureBuffer ();
6973
70- /**
71- * TODO: use new API.
72- */
73- @ SuppressWarnings ("deprecation" )
74- private final reactor .core .publisher .EmitterProcessor <ServerMessage > responseProcessor =
75- reactor .core .publisher .EmitterProcessor .create (false );
74+ private final Sinks .Many <ServerMessage > responseProcessor =
75+ Sinks .many ().multicast ().onBackpressureBuffer (512 , false );
7676
7777 private final RequestQueue requestQueue = new RequestQueue ();
7878
@@ -82,6 +82,8 @@ final class ReactorNettyClient implements Client {
8282 requireNonNull (connection , "connection must not be null" );
8383 requireNonNull (context , "context must not be null" );
8484 requireNonNull (ssl , "ssl must not be null" );
85+ require (responseProcessor .asFlux () instanceof Subscriber ,
86+ "responseProcessor(" + responseProcessor + ") must be a Subscriber" );
8587
8688 this .connection = connection ;
8789 this .context = context ;
@@ -146,11 +148,12 @@ public <T> Flux<T> exchange(ClientMessage request,
146148 return ;
147149 }
148150
149- Flux <T > responses = OperatorUtils .discardOnCancel (responseProcessor
150- .doOnSubscribe (ignored -> emitNextRequest (request ))
151- .handle (handler )
152- .doOnTerminate (requestQueue ))
153- .doOnDiscard (ReferenceCounted .class , RELEASE );
151+ Flux <T > responses = OperatorUtils .discardOnCancel (
152+ responseProcessor .asFlux ()
153+ .doOnSubscribe (ignored -> emitNextRequest (request ))
154+ .handle (handler )
155+ .doOnTerminate (requestQueue ))
156+ .doOnDiscard (ReferenceCounted .class , RELEASE );
154157
155158 requestQueue .submit (RequestTask .wrap (request , sink , responses ));
156159 }).flatMapMany (identity ());
@@ -172,13 +175,16 @@ public <T> Flux<T> exchange(FluxExchangeable<T> exchangeable) {
172175 }
173176
174177 Flux <T > responses = responseProcessor
175- .doOnSubscribe (ignored -> exchangeable .subscribe (this ::emitNextRequest ,
176- e -> requests .emitError (e , Sinks .EmitFailureHandler .FAIL_FAST )))
177- .handle (exchangeable )
178- .doOnTerminate (() -> {
179- exchangeable .dispose ();
180- requestQueue .run ();
181- });
178+ .asFlux ()
179+ .doOnSubscribe (ignored -> exchangeable .subscribe (
180+ this ::emitNextRequest ,
181+ e -> requests .emitError (e , Sinks .EmitFailureHandler .FAIL_FAST ))
182+ )
183+ .handle (exchangeable )
184+ .doOnTerminate (() -> {
185+ exchangeable .dispose ();
186+ requestQueue .run ();
187+ });
182188
183189 requestQueue .submit (RequestTask .wrap (exchangeable , sink , OperatorUtils .discardOnCancel (responses )
184190 .doOnDiscard (ReferenceCounted .class , RELEASE )
@@ -268,7 +274,7 @@ private <T> Mono<T> resumeError(Throwable e) {
268274
269275 private void drainError (R2dbcException e ) {
270276 this .requestQueue .dispose ();
271- this . responseProcessor .onError ( e );
277+ responseProcessor .emitError ( e , Sinks . EmitFailureHandler . FAIL_FAST );
272278 }
273279
274280 private void handleClose () {
@@ -293,14 +299,9 @@ private ResponseSubscriber(ResponseSink sink) {
293299 this .sink = sink ;
294300 }
295301
296- @ Override
297- public Context currentContext () {
298- return ReactorNettyClient .this .responseProcessor .currentContext ();
299- }
300-
301302 @ Override
302303 public void onSubscribe (Subscription s ) {
303- ReactorNettyClient . this . responseProcessor .onSubscribe (s );
304+ (( Subscriber <?>) responseProcessor . asFlux ()) .onSubscribe (s );
304305 }
305306
306307 @ Override
@@ -326,15 +327,20 @@ public void complete() {
326327 throw new UnsupportedOperationException ();
327328 }
328329
330+ @ Deprecated
329331 @ Override
330- @ SuppressWarnings ("deprecation" )
331332 public Context currentContext () {
332- return ReactorNettyClient .this .responseProcessor .currentContext ();
333+ return Context .empty ();
334+ }
335+
336+ @ Override
337+ public ContextView contextView () {
338+ return Context .empty ();
333339 }
334340
335341 @ Override
336342 public void error (Throwable e ) {
337- ReactorNettyClient . this . responseProcessor .onError (ClientExceptions .wrap (e ));
343+ responseProcessor .emitError (ClientExceptions .wrap (e ), EmitFailureHandler . FAIL_FAST );
338344 }
339345
340346 @ Override
@@ -352,7 +358,7 @@ public void next(ServerMessage message) {
352358 logger .debug ("Response: {}" , message );
353359 }
354360
355- ReactorNettyClient . this . responseProcessor .onNext (message );
361+ responseProcessor .emitNext (message , EmitFailureHandler . FAIL_FAST );
356362 }
357363 }
358364
0 commit comments