Skip to content

Commit 097f5b5

Browse files
committed
Use Sink instead of EmitterProcessor (#75)
Motivation: EmitterProcessor is deprecated Modifications: Used Sink to replace deprecated EmitterProcessor Results: Up-to-date with Reactor
1 parent fffddcb commit 097f5b5

File tree

1 file changed

+35
-30
lines changed

1 file changed

+35
-30
lines changed

src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -27,26 +27,29 @@
2727
import io.netty.handler.logging.LogLevel;
2828
import io.netty.handler.logging.LoggingHandler;
2929
import io.netty.util.ReferenceCounted;
30-
import io.netty.util.internal.logging.InternalLoggerFactory;
3130
import io.r2dbc.spi.R2dbcException;
31+
import org.reactivestreams.Subscriber;
3232
import org.reactivestreams.Subscription;
3333
import reactor.core.CoreSubscriber;
3434
import reactor.core.Disposable;
3535
import reactor.core.publisher.Flux;
3636
import reactor.core.publisher.Mono;
3737
import reactor.core.publisher.Sinks;
38+
import reactor.core.publisher.Sinks.EmitFailureHandler;
3839
import reactor.core.publisher.SynchronousSink;
3940
import reactor.netty.Connection;
4041
import reactor.netty.FutureMono;
4142
import reactor.util.Logger;
4243
import reactor.util.Loggers;
4344
import reactor.util.context.Context;
45+
import reactor.util.context.ContextView;
4446

4547
import java.util.concurrent.atomic.AtomicBoolean;
4648
import java.util.function.BiConsumer;
4749
import java.util.function.Consumer;
4850
import java.util.function.Function;
4951

52+
import static io.asyncer.r2dbc.mysql.util.AssertUtils.require;
5053
import static io.asyncer.r2dbc.mysql.util.AssertUtils.requireNonNull;
5154

5255
/**
@@ -68,12 +71,8 @@ final class ReactorNettyClient implements Client {
6871

6972
private final Sinks.Many<ClientMessage> requests = Sinks.many().unicast().onBackpressureBuffer();
7073

71-
/**
72-
* TODO: use new API.
73-
*/
74-
@SuppressWarnings("deprecation")
75-
private final reactor.core.publisher.EmitterProcessor<ServerMessage> responseProcessor =
76-
reactor.core.publisher.EmitterProcessor.create(false);
74+
private final Sinks.Many<ServerMessage> responseProcessor =
75+
Sinks.many().multicast().onBackpressureBuffer(512, false);
7776

7877
private final RequestQueue requestQueue = new RequestQueue();
7978

@@ -83,6 +82,8 @@ final class ReactorNettyClient implements Client {
8382
requireNonNull(connection, "connection must not be null");
8483
requireNonNull(context, "context must not be null");
8584
requireNonNull(ssl, "ssl must not be null");
85+
require(responseProcessor.asFlux() instanceof Subscriber,
86+
"responseProcessor(" + responseProcessor + ") must be a Subscriber");
8687

8788
this.connection = connection;
8889
this.context = context;
@@ -147,11 +148,12 @@ public <T> Flux<T> exchange(ClientMessage request,
147148
return;
148149
}
149150

150-
Flux<T> responses = OperatorUtils.discardOnCancel(responseProcessor
151-
.doOnSubscribe(ignored -> emitNextRequest(request))
152-
.handle(handler)
153-
.doOnTerminate(requestQueue))
154-
.doOnDiscard(ReferenceCounted.class, RELEASE);
151+
Flux<T> responses = OperatorUtils.discardOnCancel(
152+
responseProcessor.asFlux()
153+
.doOnSubscribe(ignored -> emitNextRequest(request))
154+
.handle(handler)
155+
.doOnTerminate(requestQueue))
156+
.doOnDiscard(ReferenceCounted.class, RELEASE);
155157

156158
requestQueue.submit(RequestTask.wrap(request, sink, responses));
157159
}).flatMapMany(identity());
@@ -173,13 +175,16 @@ public <T> Flux<T> exchange(FluxExchangeable<T> exchangeable) {
173175
}
174176

175177
Flux<T> responses = responseProcessor
176-
.doOnSubscribe(ignored -> exchangeable.subscribe(this::emitNextRequest,
177-
e -> requests.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST)))
178-
.handle(exchangeable)
179-
.doOnTerminate(() -> {
180-
exchangeable.dispose();
181-
requestQueue.run();
182-
});
178+
.asFlux()
179+
.doOnSubscribe(ignored -> exchangeable.subscribe(
180+
this::emitNextRequest,
181+
e -> requests.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST))
182+
)
183+
.handle(exchangeable)
184+
.doOnTerminate(() -> {
185+
exchangeable.dispose();
186+
requestQueue.run();
187+
});
183188

184189
requestQueue.submit(RequestTask.wrap(exchangeable, sink, OperatorUtils.discardOnCancel(responses)
185190
.doOnDiscard(ReferenceCounted.class, RELEASE)
@@ -269,7 +274,7 @@ private <T> Mono<T> resumeError(Throwable e) {
269274

270275
private void drainError(R2dbcException e) {
271276
this.requestQueue.dispose();
272-
this.responseProcessor.onError(e);
277+
responseProcessor.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST);
273278
}
274279

275280
private void handleClose() {
@@ -294,14 +299,9 @@ private ResponseSubscriber(ResponseSink sink) {
294299
this.sink = sink;
295300
}
296301

297-
@Override
298-
public Context currentContext() {
299-
return ReactorNettyClient.this.responseProcessor.currentContext();
300-
}
301-
302302
@Override
303303
public void onSubscribe(Subscription s) {
304-
ReactorNettyClient.this.responseProcessor.onSubscribe(s);
304+
((Subscriber<?>) responseProcessor.asFlux()).onSubscribe(s);
305305
}
306306

307307
@Override
@@ -327,15 +327,20 @@ public void complete() {
327327
throw new UnsupportedOperationException();
328328
}
329329

330+
@Deprecated
330331
@Override
331-
@SuppressWarnings("deprecation")
332332
public Context currentContext() {
333-
return ReactorNettyClient.this.responseProcessor.currentContext();
333+
return Context.empty();
334+
}
335+
336+
@Override
337+
public ContextView contextView() {
338+
return Context.empty();
334339
}
335340

336341
@Override
337342
public void error(Throwable e) {
338-
ReactorNettyClient.this.responseProcessor.onError(ClientExceptions.wrap(e));
343+
responseProcessor.emitError(ClientExceptions.wrap(e), EmitFailureHandler.FAIL_FAST);
339344
}
340345

341346
@Override
@@ -353,7 +358,7 @@ public void next(ServerMessage message) {
353358
logger.debug("Response: {}", message);
354359
}
355360

356-
ReactorNettyClient.this.responseProcessor.onNext(message);
361+
responseProcessor.emitNext(message, EmitFailureHandler.FAIL_FAST);
357362
}
358363
}
359364

0 commit comments

Comments
 (0)