Skip to content

Commit 73162cd

Browse files
committed
fixes
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 0bc84d3 commit 73162cd

File tree

3 files changed

+12
-20
lines changed

3 files changed

+12
-20
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ private void tryShutdown() {
374374
}
375375

376376
private void terminate(Throwable e) {
377-
LOGGER.info("closing requester");
377+
LOGGER.info("closing requester " + getDuplexConnection());
378378
if (keepAliveFramesAcceptor != null) {
379379
keepAliveFramesAcceptor.dispose();
380380
}

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ public Mono<Void> onClose() {
186186
}
187187

188188
final void doOnDispose() {
189-
LOGGER.info("closing responder");
189+
LOGGER.info("closing responder " + getDuplexConnection());
190190
cleanUpSendingSubscriptions();
191191

192192
getDuplexConnection().dispose();

rsocket-test/src/main/java/io/rsocket/test/TransportTest.java

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,7 @@
2121
import io.netty.util.ReferenceCountUtil;
2222
import io.netty.util.ReferenceCounted;
2323
import io.netty.util.ResourceLeakDetector;
24-
import io.rsocket.Closeable;
25-
import io.rsocket.DuplexConnection;
26-
import io.rsocket.Payload;
27-
import io.rsocket.RSocket;
28-
import io.rsocket.RSocketErrorException;
24+
import io.rsocket.*;
2925
import io.rsocket.core.RSocketConnector;
3026
import io.rsocket.core.RSocketServer;
3127
import io.rsocket.core.Resume;
@@ -55,15 +51,11 @@
5551
import org.junit.jupiter.api.DisplayName;
5652
import org.junit.jupiter.api.Test;
5753
import org.reactivestreams.Subscription;
58-
import reactor.core.CoreSubscriber;
59-
import reactor.core.Disposable;
60-
import reactor.core.Disposables;
61-
import reactor.core.Exceptions;
62-
import reactor.core.Fuseable;
54+
import reactor.core.*;
6355
import reactor.core.publisher.Flux;
6456
import reactor.core.publisher.Mono;
65-
import reactor.core.publisher.MonoProcessor;
6657
import reactor.core.publisher.Operators;
58+
import reactor.core.publisher.Sinks;
6759
import reactor.core.scheduler.Scheduler;
6860
import reactor.core.scheduler.Schedulers;
6961
import reactor.test.StepVerifier;
@@ -625,8 +617,8 @@ public TransportPair(
625617

626618
@Override
627619
public void dispose() {
628-
server.dispose();
629620
client.dispose();
621+
server.dispose();
630622
}
631623

632624
RSocket getClient() {
@@ -794,12 +786,12 @@ private static class ByteBufReleaserOperator
794786
implements CoreSubscriber<ByteBuf>, Subscription, Fuseable.QueueSubscription<ByteBuf> {
795787

796788
CoreSubscriber<? super ByteBuf> actual;
797-
final MonoProcessor<Void> closeableMono;
789+
final Sinks.Empty<Void> closeableMonoSink;
798790

799791
Subscription s;
800792

801793
public ByteBufReleaserOperator() {
802-
this.closeableMono = MonoProcessor.create();
794+
this.closeableMonoSink = Sinks.unsafe().empty();
803795
}
804796

805797
@Override
@@ -820,19 +812,19 @@ public void onNext(ByteBuf buf) {
820812
}
821813

822814
Mono<Void> onClose() {
823-
return closeableMono;
815+
return closeableMonoSink.asMono();
824816
}
825817

826818
@Override
827819
public void onError(Throwable t) {
828820
actual.onError(t);
829-
closeableMono.onError(t);
821+
closeableMonoSink.tryEmitError(t);
830822
}
831823

832824
@Override
833825
public void onComplete() {
834826
actual.onComplete();
835-
closeableMono.onComplete();
827+
closeableMonoSink.tryEmitEmpty();
836828
}
837829

838830
@Override
@@ -843,7 +835,7 @@ public void request(long n) {
843835
@Override
844836
public void cancel() {
845837
s.cancel();
846-
closeableMono.onComplete();
838+
closeableMonoSink.tryEmitEmpty();
847839
}
848840

849841
@Override

0 commit comments

Comments
 (0)