Skip to content
This repository was archived by the owner on May 4, 2019. It is now read-only.

Commit c8e3bdf

Browse files
authored
Merge pull request #24 from netifi-proteus/hotfix/1.5.2
Hotfix/1.5.2
2 parents 96cea91 + 64bb5dc commit c8e3bdf

File tree

3 files changed

+90
-5
lines changed

3 files changed

+90
-5
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
group=io.netifi.proteus
2-
version=1.5.1
2+
version=1.5.2

proteus-client/src/main/java/io/netifi/proteus/DefaultProteusBrokerService.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.netty.buffer.ByteBuf;
1717
import io.netty.buffer.ByteBufAllocator;
1818
import io.netty.buffer.Unpooled;
19+
import io.netty.util.ReferenceCountUtil;
1920
import io.opentracing.Tracer;
2021
import io.rsocket.Payload;
2122
import io.rsocket.RSocket;
@@ -24,6 +25,7 @@
2425
import io.rsocket.rpc.stats.Quantile;
2526
import io.rsocket.transport.ClientTransport;
2627
import io.rsocket.util.ByteBufPayload;
28+
import io.rsocket.util.DefaultPayload;
2729
import java.net.InetSocketAddress;
2830
import java.net.SocketAddress;
2931
import java.time.Duration;
@@ -264,10 +266,25 @@ WeightedReconnectingRSocket createWeightedReconnectingRSocket() {
264266
}
265267

266268
Payload getSetupPayload(String computedFromDestination) {
267-
ByteBuf metadata =
268-
DestinationSetupFlyweight.encode(
269-
ByteBufAllocator.DEFAULT, computedFromDestination, group, accessKey, accessToken);
270-
return ByteBufPayload.create(Unpooled.EMPTY_BUFFER, metadata);
269+
return getSetupPayload(
270+
ByteBufAllocator.DEFAULT, computedFromDestination, group, accessKey, accessToken);
271+
}
272+
273+
static Payload getSetupPayload(
274+
ByteBufAllocator alloc,
275+
String computedFromDestination,
276+
String group,
277+
long accessKey,
278+
ByteBuf accessToken) {
279+
ByteBuf metadata = null;
280+
try {
281+
metadata =
282+
DestinationSetupFlyweight.encode(
283+
alloc, computedFromDestination, group, accessKey, accessToken);
284+
return DefaultPayload.create(Unpooled.EMPTY_BUFFER, metadata);
285+
} finally {
286+
ReferenceCountUtil.safeRelease(metadata);
287+
}
271288
}
272289

273290
private ProteusSocket unwrappedDestination(String destination, String group) {
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package io.netifi.proteus;
2+
3+
import io.netifi.proteus.frames.DestinationSetupFlyweight;
4+
import io.netty.buffer.*;
5+
import io.rsocket.Payload;
6+
import java.nio.charset.Charset;
7+
import java.util.Collection;
8+
import java.util.function.Function;
9+
import org.junit.*;
10+
11+
public class ProteusBrokerServiceTest {
12+
13+
@Test
14+
public void setupPayloadLeakTest() {
15+
ByteBuf token = Unpooled.wrappedBuffer("token".getBytes(Charset.defaultCharset()));
16+
PooledByteBufAllocator alloc = nonCachingAllocator();
17+
18+
for (int i = 0; i < 100000; i++) {
19+
Payload payload =
20+
DefaultProteusBrokerService.getSetupPayload(alloc, "foo", "bar", 123L, token);
21+
}
22+
Assert.assertEquals(0, directBuffersCount(alloc));
23+
Assert.assertEquals(0, heapBuffersCount(alloc));
24+
}
25+
26+
@Test
27+
public void setupDecodeTest() {
28+
String expectedToken = "token";
29+
String expectedDest = "foo";
30+
String expectedGroup = "bar";
31+
long expectedKey = 123L;
32+
33+
ByteBuf token = Unpooled.wrappedBuffer(expectedToken.getBytes(Charset.defaultCharset()));
34+
35+
Payload payload =
36+
DefaultProteusBrokerService.getSetupPayload(
37+
ByteBufAllocator.DEFAULT, expectedDest, expectedGroup, expectedKey, token);
38+
ByteBuf metadata = payload.sliceMetadata();
39+
String actualDest = DestinationSetupFlyweight.destination(metadata);
40+
String actualGroup = DestinationSetupFlyweight.group(metadata);
41+
long actualAccessKey = DestinationSetupFlyweight.accessKey(metadata);
42+
String actualAccessToken =
43+
DestinationSetupFlyweight.accessToken(metadata).toString(Charset.defaultCharset());
44+
45+
Assert.assertEquals(expectedToken, actualAccessToken);
46+
Assert.assertEquals(expectedDest, actualDest);
47+
Assert.assertEquals(expectedGroup, actualGroup);
48+
Assert.assertEquals(expectedKey, actualAccessKey);
49+
}
50+
51+
private static PooledByteBufAllocator nonCachingAllocator() {
52+
return new PooledByteBufAllocator(true, 1, 1, 8192, 11, 0, 0, 0);
53+
}
54+
55+
private static long directBuffersCount(PooledByteBufAllocator alloc) {
56+
return count(alloc, PooledByteBufAllocator::directArenas);
57+
}
58+
59+
private static long heapBuffersCount(PooledByteBufAllocator alloc) {
60+
return count(alloc, PooledByteBufAllocator::heapArenas);
61+
}
62+
63+
private static long count(
64+
PooledByteBufAllocator alloc,
65+
Function<PooledByteBufAllocator, Collection<PoolArenaMetric>> f) {
66+
return f.apply(alloc).stream().mapToLong(PoolArenaMetric::numActiveAllocations).sum();
67+
}
68+
}

0 commit comments

Comments
 (0)