Skip to content
This repository was archived by the owner on May 4, 2019. It is now read-only.

Commit d99ff12

Browse files
committed
Merge branch 'hotfix/1.5.4'
2 parents fc80afc + 79b0e6c commit d99ff12

File tree

6 files changed

+18
-24
lines changed

6 files changed

+18
-24
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
group=io.netifi.proteus
2-
version=1.5.3
2+
version=1.5.4

proteus-client/src/main/java/io/netifi/proteus/DefaultProteusBrokerService.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,12 @@
1717
import io.netty.buffer.ByteBuf;
1818
import io.netty.buffer.ByteBufAllocator;
1919
import io.netty.buffer.Unpooled;
20-
import io.netty.util.ReferenceCountUtil;
2120
import io.opentracing.Tracer;
2221
import io.rsocket.Payload;
2322
import io.rsocket.RSocket;
2423
import io.rsocket.rpc.rsocket.RequestHandlingRSocket;
2524
import io.rsocket.transport.ClientTransport;
2625
import io.rsocket.util.ByteBufPayload;
27-
import io.rsocket.util.DefaultPayload;
2826
import java.net.InetSocketAddress;
2927
import java.net.SocketAddress;
3028
import java.time.Duration;
@@ -104,8 +102,8 @@ public DefaultProteusBrokerService(
104102
this.requestHandlingRSocket = new UnwrappingRSocket(requestHandlingRSocket);
105103
this.group = group;
106104
this.destinationNameFactory = destinationNameFactory;
107-
this.members = new ArrayList<>();
108-
this.suppliers = new ArrayList<>();
105+
this.members = Collections.synchronizedList(new ArrayList<>());
106+
this.suppliers = Collections.synchronizedList(new ArrayList<>());
109107
this.clientTransportFactory = clientTransportFactory;
110108
this.poolSize = poolSize;
111109
this.selectRefresh = poolSize / 2;
@@ -137,15 +135,10 @@ static Payload getSetupPayload(
137135
String group,
138136
long accessKey,
139137
ByteBuf accessToken) {
140-
ByteBuf metadata = null;
141-
try {
142-
metadata =
143-
DestinationSetupFlyweight.encode(
144-
alloc, computedFromDestination, group, accessKey, accessToken);
145-
return DefaultPayload.create(Unpooled.EMPTY_BUFFER, metadata);
146-
} finally {
147-
ReferenceCountUtil.safeRelease(metadata);
148-
}
138+
ByteBuf metadata =
139+
DestinationSetupFlyweight.encode(
140+
alloc, computedFromDestination, group, accessKey, accessToken);
141+
return ByteBufPayload.create(Unpooled.EMPTY_BUFFER, metadata);
149142
}
150143

151144
private synchronized void reconcileSuppliers(Set<Broker> incomingBrokers) {

proteus-client/src/main/java/io/netifi/proteus/rsocket/NamedRSocketClientWrapper.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import io.netty.buffer.ByteBufAllocator;
55
import io.rsocket.Payload;
66
import io.rsocket.RSocket;
7-
import io.rsocket.rpc.RSocketRpcService;
87
import io.rsocket.rpc.frames.Metadata;
98
import io.rsocket.util.ByteBufPayload;
109
import io.rsocket.util.RSocketProxy;
@@ -22,7 +21,7 @@ private NamedRSocketClientWrapper(String name, RSocket source) {
2221
}
2322

2423
/**
25-
* Wraps an RSocket with {@link RSocketProxy} and {@link RSocketRpcService}
24+
* Wraps an RSocket with {@link RSocketProxy} and RSocketRpcService
2625
*
2726
* @param name what you want your RSocket to be found as
2827
* @param source the raw socket to handle to wrap
@@ -63,6 +62,6 @@ private Payload wrap(Payload payload) {
6362
ByteBuf metadata =
6463
Metadata.encode(ByteBufAllocator.DEFAULT, name, name, payload.sliceMetadata());
6564

66-
return ByteBufPayload.create(payload.sliceMetadata().retain(), metadata);
65+
return ByteBufPayload.create(payload.sliceData().retain(), metadata);
6766
}
6867
}

proteus-client/src/main/java/io/netifi/proteus/rsocket/NamedRSocketServiceWrapper.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ public String getService() {
4848
}
4949

5050
@Override
51-
public final Flux<Payload> requestChannel(Payload payload, Flux<Payload> publisher) {
52-
return reactor.core.publisher.Flux.error(
53-
new UnsupportedOperationException("Request-Channel not implemented."));
51+
public Flux<Payload> requestChannel(Payload payload, Flux<Payload> publisher) {
52+
return super.requestChannel(publisher);
5453
}
5554
}

proteus-client/src/main/java/io/netifi/proteus/rsocket/WeightedReconnectingRSocket.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ synchronized void resetStatistics() {
183183
errorPercentage.reset(1.0);
184184
}
185185

186-
private RSocketFactory.ClientRSocketFactory getClientFactory(String destination) {
186+
private RSocketFactory.ClientRSocketFactory getClientFactory(Payload setupPayload) {
187187
RSocketFactory.ClientRSocketFactory connect =
188188
RSocketFactory.connect().frameDecoder(Frame::retain);
189189

@@ -203,7 +203,7 @@ private RSocketFactory.ClientRSocketFactory getClientFactory(String destination)
203203
}
204204

205205
return connect
206-
.setupPayload(setupPayloadSupplier.apply(destination))
206+
.setupPayload(setupPayload)
207207
.keepAliveAckTimeout(Duration.ofSeconds(0))
208208
.keepAliveTickPeriod(Duration.ofSeconds(0));
209209
}
@@ -228,9 +228,10 @@ void connect() {
228228
WeightedClientTransportSupplier weighedClientTransportSupplier =
229229
transportSupplier.get();
230230
String destination = destinationNameFactory.get();
231+
Payload setupPayload = setupPayloadSupplier.apply(destination);
231232

232233
long start = System.nanoTime();
233-
return getClientFactory(destination)
234+
return getClientFactory(setupPayload)
234235
.errorConsumer(
235236
throwable ->
236237
logger.error(
@@ -276,7 +277,8 @@ void connect() {
276277
})
277278
.subscribe();
278279
setRSocket(rSocket);
279-
});
280+
})
281+
.doFinally(signalType -> setupPayload.release());
280282
}))
281283
.doOnError(t -> logger.error("error trying to broker", t))
282284
.retry()

proteus-client/src/test/java/io/netifi/proteus/ProteusBrokerServiceTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public void setupPayloadLeakTest() {
1818
for (int i = 0; i < 100000; i++) {
1919
Payload payload =
2020
DefaultProteusBrokerService.getSetupPayload(alloc, "foo", "bar", 123L, token);
21+
payload.release();
2122
}
2223
Assert.assertEquals(0, directBuffersCount(alloc));
2324
Assert.assertEquals(0, heapBuffersCount(alloc));

0 commit comments

Comments
 (0)