Skip to content
This repository was archived by the owner on May 4, 2019. It is now read-only.

Commit 534e099

Browse files
committed
Fix SetupPayload being released too early
1 parent fc80afc commit 534e099

File tree

4 files changed

+12
-16
lines changed

4 files changed

+12
-16
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
group=io.netifi.proteus
2-
version=1.5.3
2+
version=1.5.4

proteus-client/src/main/java/io/netifi/proteus/DefaultProteusBrokerService.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,12 @@
1717
import io.netty.buffer.ByteBuf;
1818
import io.netty.buffer.ByteBufAllocator;
1919
import io.netty.buffer.Unpooled;
20-
import io.netty.util.ReferenceCountUtil;
2120
import io.opentracing.Tracer;
2221
import io.rsocket.Payload;
2322
import io.rsocket.RSocket;
2423
import io.rsocket.rpc.rsocket.RequestHandlingRSocket;
2524
import io.rsocket.transport.ClientTransport;
2625
import io.rsocket.util.ByteBufPayload;
27-
import io.rsocket.util.DefaultPayload;
2826
import java.net.InetSocketAddress;
2927
import java.net.SocketAddress;
3028
import java.time.Duration;
@@ -137,15 +135,10 @@ static Payload getSetupPayload(
137135
String group,
138136
long accessKey,
139137
ByteBuf accessToken) {
140-
ByteBuf metadata = null;
141-
try {
142-
metadata =
143-
DestinationSetupFlyweight.encode(
144-
alloc, computedFromDestination, group, accessKey, accessToken);
145-
return DefaultPayload.create(Unpooled.EMPTY_BUFFER, metadata);
146-
} finally {
147-
ReferenceCountUtil.safeRelease(metadata);
148-
}
138+
ByteBuf metadata =
139+
DestinationSetupFlyweight.encode(
140+
alloc, computedFromDestination, group, accessKey, accessToken);
141+
return ByteBufPayload.create(Unpooled.EMPTY_BUFFER, metadata);
149142
}
150143

151144
private synchronized void reconcileSuppliers(Set<Broker> incomingBrokers) {

proteus-client/src/main/java/io/netifi/proteus/rsocket/WeightedReconnectingRSocket.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ synchronized void resetStatistics() {
183183
errorPercentage.reset(1.0);
184184
}
185185

186-
private RSocketFactory.ClientRSocketFactory getClientFactory(String destination) {
186+
private RSocketFactory.ClientRSocketFactory getClientFactory(Payload setupPayload) {
187187
RSocketFactory.ClientRSocketFactory connect =
188188
RSocketFactory.connect().frameDecoder(Frame::retain);
189189

@@ -203,7 +203,7 @@ private RSocketFactory.ClientRSocketFactory getClientFactory(String destination)
203203
}
204204

205205
return connect
206-
.setupPayload(setupPayloadSupplier.apply(destination))
206+
.setupPayload(setupPayload)
207207
.keepAliveAckTimeout(Duration.ofSeconds(0))
208208
.keepAliveTickPeriod(Duration.ofSeconds(0));
209209
}
@@ -228,9 +228,10 @@ void connect() {
228228
WeightedClientTransportSupplier weighedClientTransportSupplier =
229229
transportSupplier.get();
230230
String destination = destinationNameFactory.get();
231+
Payload setupPayload = setupPayloadSupplier.apply(destination);
231232

232233
long start = System.nanoTime();
233-
return getClientFactory(destination)
234+
return getClientFactory(setupPayload)
234235
.errorConsumer(
235236
throwable ->
236237
logger.error(
@@ -276,7 +277,8 @@ void connect() {
276277
})
277278
.subscribe();
278279
setRSocket(rSocket);
279-
});
280+
})
281+
.doFinally(signalType -> setupPayload.release());
280282
}))
281283
.doOnError(t -> logger.error("error trying to broker", t))
282284
.retry()

proteus-client/src/test/java/io/netifi/proteus/ProteusBrokerServiceTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public void setupPayloadLeakTest() {
1818
for (int i = 0; i < 100000; i++) {
1919
Payload payload =
2020
DefaultProteusBrokerService.getSetupPayload(alloc, "foo", "bar", 123L, token);
21+
payload.release();
2122
}
2223
Assert.assertEquals(0, directBuffersCount(alloc));
2324
Assert.assertEquals(0, heapBuffersCount(alloc));

0 commit comments

Comments
 (0)