Skip to content
This repository was archived by the owner on May 4, 2019. It is now read-only.

Commit cbc1eb1

Browse files
committed
Merge branch 'release/1.6.0'
2 parents dc6f699 + e767a68 commit cbc1eb1

File tree

2 files changed

+4
-2
lines changed

2 files changed

+4
-2
lines changed

proteus-client/src/main/java/io/netifi/proteus/DefaultProteusBrokerService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.netty.buffer.ByteBuf;
3333
import io.netty.buffer.ByteBufAllocator;
3434
import io.netty.buffer.Unpooled;
35+
import io.netty.util.ReferenceCountUtil;
3536
import io.opentracing.Tracer;
3637
import io.rsocket.Payload;
3738
import io.rsocket.RSocket;

proteus-client/src/main/java/io/netifi/proteus/rsocket/WeightedReconnectingRSocket.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,11 +198,13 @@ private ClientRSocketFactory getClientFactory() {
198198
if (keepalive) {
199199
connect =
200200
connect
201+
.keepAlive()
201202
.keepAliveTickPeriod(Duration.ofSeconds(tickPeriodSeconds))
202203
.keepAliveAckTimeout(Duration.ofSeconds(ackTimeoutSeconds))
203204
.keepAliveMissedAcks(missedAcks);
204205
} else {
205206
connect
207+
.keepAlive()
206208
.keepAliveAckTimeout(Duration.ofSeconds(0))
207209
.keepAliveAckTimeout(Duration.ofSeconds(0))
208210
.keepAliveMissedAcks(missedAcks);
@@ -276,8 +278,7 @@ void connect() {
276278
})
277279
.subscribe();
278280
setRSocket(rSocket);
279-
})
280-
.doFinally(signalType -> setupPayload.release());
281+
});
281282
}))
282283
.doOnError(t -> logger.error("error trying to broker", t))
283284
.retry()

0 commit comments

Comments
 (0)