Skip to content

Commit 068dbe5

Browse files
steveguryNiteshKant
authored andcommitted
Fix subscription in DuplexConnection. (#98)
**Problem** A Publisher may not have been initialized when a DuplexConnection failed during the connection establishement. (e.g. connection reset by peer) **Solution** Do the subscription as soon as Netty complete, regarding of the result of the ConnectFuture.
1 parent 1c90af0 commit 068dbe5

File tree

2 files changed

+2
-2
lines changed

2 files changed

+2
-2
lines changed

reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,9 @@ protected void initChannel(SocketChannel ch) throws Exception {
7272
}).connect(address);
7373

7474
connect.addListener(connectFuture -> {
75+
s.onSubscribe(EmptySubscription.INSTANCE);
7576
if (connectFuture.isSuccess()) {
7677
Channel ch = connect.channel();
77-
s.onSubscribe(EmptySubscription.INSTANCE);
7878
s.onNext(new ClientTcpDuplexConnection(ch, subjects));
7979
s.onComplete();
8080
} else {

reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/ClientWebSocketDuplexConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
9191
clientHandler
9292
.getHandshakePromise()
9393
.addListener(handshakeFuture -> {
94+
s.onSubscribe(EmptySubscription.INSTANCE);
9495
if (handshakeFuture.isSuccess()) {
95-
s.onSubscribe(EmptySubscription.INSTANCE);
9696
s.onNext(new ClientWebSocketDuplexConnection(ch, subjects));
9797
s.onComplete();
9898
} else {

0 commit comments

Comments
 (0)