Skip to content
This repository was archived by the owner on May 4, 2019. It is now read-only.

Commit c678c06

Browse files
committed
Ensure payloads are released in DefaultNetifiSocket
1 parent cd4cc25 commit c678c06

File tree

2 files changed

+145
-173
lines changed

2 files changed

+145
-173
lines changed

client/src/main/java/io/netifi/proteus/rs/DefaultNetifiSocket.java

Lines changed: 144 additions & 172 deletions
Original file line numberDiff line numberDiff line change
@@ -69,225 +69,197 @@ public ByteBuf getRoute() {
6969
@Override
7070
public Mono<Void> fireAndForget(Payload payload) {
7171
try {
72-
ByteBuf metadataToWrap = payload.sliceMetadata();
73-
ByteBuf data = payload.sliceData();
7472
ByteBuf route = getRoute();
75-
76-
int length = RoutingFlyweight.computeLength(true, fromDestination, route, metadataToWrap);
73+
ByteBuf data = payload.sliceData();
74+
ByteBuf metadataToWrap = payload.sliceMetadata();
7775

7876
SecureRSocket secureRSocket = rSocketSupplier.get();
79-
return secureRSocket
80-
.getCurrentSessionCounter()
81-
.flatMap(
82-
counter -> {
83-
long count = counter.incrementAndGet();
84-
85-
return secureRSocket
86-
.getCurrentSessionToken()
87-
.flatMap(
88-
key -> {
89-
byte[] currentRequestToken =
90-
sessionUtil.generateSessionToken(key, data, count);
91-
int requestToken =
92-
sessionUtil.generateRequestToken(currentRequestToken, data, count);
93-
ByteBuf metadata = ByteBufAllocator.DEFAULT.directBuffer(length);
94-
RoutingFlyweight.encode(
95-
metadata,
96-
true,
97-
requestToken,
98-
accessKey,
99-
fromDestination,
100-
generator.nextId(),
101-
route,
102-
metadataToWrap);
103-
104-
return secureRSocket.fireAndForget(
105-
ByteBufPayload.create(payload.sliceData(), metadata));
106-
});
107-
});
108-
77+
return secureRSocket.getCurrentSessionCounter()
78+
.zipWith(secureRSocket.getCurrentSessionToken(), (counter, key) -> {
79+
long count = counter.incrementAndGet();
80+
byte[] currentRequestToken = sessionUtil.generateSessionToken(key, data, count);
81+
return sessionUtil.generateRequestToken(currentRequestToken, data, count);
82+
})
83+
.flatMap(requestToken -> {
84+
int length = RoutingFlyweight.computeLength(true, fromDestination, route, metadataToWrap);
85+
ByteBuf metadata = ByteBufAllocator.DEFAULT.directBuffer(length);
86+
RoutingFlyweight.encode(
87+
metadata,
88+
true,
89+
requestToken,
90+
accessKey,
91+
fromDestination,
92+
generator.nextId(),
93+
route,
94+
metadataToWrap);
95+
96+
Payload wrappedPayload = ByteBufPayload.create(data.retain(), metadata);
97+
payload.release();
98+
99+
return secureRSocket.fireAndForget(wrappedPayload);
100+
});
109101
} catch (Throwable t) {
102+
payload.release();
110103
return Mono.error(t);
111104
}
112105
}
113106

114107
@Override
115108
public Mono<Payload> requestResponse(Payload payload) {
116109
try {
117-
ByteBuf metadataToWrap = payload.sliceMetadata();
118110
ByteBuf route = getRoute();
119111
ByteBuf data = payload.sliceData();
120-
int length = RoutingFlyweight.computeLength(true, fromDestination, route, metadataToWrap);
112+
ByteBuf metadataToWrap = payload.sliceMetadata();
121113

122114
SecureRSocket secureRSocket = rSocketSupplier.get();
123-
return secureRSocket
124-
.getCurrentSessionCounter()
125-
.flatMap(
126-
counter -> {
127-
long count = counter.incrementAndGet();
128-
129-
return secureRSocket
130-
.getCurrentSessionToken()
131-
.flatMap(
132-
key -> {
133-
byte[] currentRequestToken =
134-
sessionUtil.generateSessionToken(key, data, count);
135-
int requestToken =
136-
sessionUtil.generateRequestToken(currentRequestToken, data, count);
137-
ByteBuf metadata = ByteBufAllocator.DEFAULT.directBuffer(length);
138-
RoutingFlyweight.encode(
139-
metadata,
140-
true,
141-
requestToken,
142-
accessKey,
143-
fromDestination,
144-
generator.nextId(),
145-
route,
146-
metadataToWrap);
147-
148-
return secureRSocket.requestResponse(
149-
ByteBufPayload.create(payload.sliceData(), metadata));
150-
});
151-
});
115+
return secureRSocket.getCurrentSessionCounter()
116+
.zipWith(secureRSocket.getCurrentSessionToken(), (counter, key) -> {
117+
long count = counter.incrementAndGet();
118+
byte[] currentRequestToken = sessionUtil.generateSessionToken(key, data, count);
119+
return sessionUtil.generateRequestToken(currentRequestToken, data, count);
120+
})
121+
.flatMap(requestToken -> {
122+
int length = RoutingFlyweight.computeLength(true, fromDestination, route, metadataToWrap);
123+
ByteBuf metadata = ByteBufAllocator.DEFAULT.directBuffer(length);
124+
RoutingFlyweight.encode(
125+
metadata,
126+
true,
127+
requestToken,
128+
accessKey,
129+
fromDestination,
130+
generator.nextId(),
131+
route,
132+
metadataToWrap);
133+
134+
Payload wrappedPayload = ByteBufPayload.create(data.retain(), metadata);
135+
payload.release();
136+
137+
return secureRSocket.requestResponse(wrappedPayload);
138+
});
152139
} catch (Throwable t) {
140+
payload.release();
153141
return Mono.error(t);
154142
}
155143
}
156144

157145
@Override
158146
public Flux<Payload> requestStream(Payload payload) {
159147
try {
160-
ByteBuf metadataToWrap = payload.sliceMetadata();
161148
ByteBuf route = getRoute();
162149
ByteBuf data = payload.sliceData();
163-
164-
int length = RoutingFlyweight.computeLength(true, fromDestination, route, metadataToWrap);
150+
ByteBuf metadataToWrap = payload.sliceMetadata();
165151

166152
SecureRSocket secureRSocket = rSocketSupplier.get();
167-
return secureRSocket
168-
.getCurrentSessionCounter()
169-
.flatMapMany(
170-
counter -> {
171-
long count = counter.incrementAndGet();
172-
173-
return secureRSocket
174-
.getCurrentSessionToken()
175-
.flatMapMany(
176-
key -> {
177-
byte[] currentRequestToken =
178-
sessionUtil.generateSessionToken(key, data, count);
179-
int requestToken =
180-
sessionUtil.generateRequestToken(currentRequestToken, data, count);
181-
ByteBuf metadata = ByteBufAllocator.DEFAULT.directBuffer(length);
182-
RoutingFlyweight.encode(
183-
metadata,
184-
true,
185-
requestToken,
186-
accessKey,
187-
fromDestination,
188-
generator.nextId(),
189-
route,
190-
metadataToWrap);
191-
192-
return secureRSocket.requestStream(
193-
ByteBufPayload.create(payload.sliceData(), metadata));
194-
});
195-
});
196-
153+
return secureRSocket.getCurrentSessionCounter()
154+
.zipWith(secureRSocket.getCurrentSessionToken(), (counter, key) -> {
155+
long count = counter.incrementAndGet();
156+
byte[] currentRequestToken = sessionUtil.generateSessionToken(key, data, count);
157+
return sessionUtil.generateRequestToken(currentRequestToken, data, count);
158+
})
159+
.flatMapMany(requestToken -> {
160+
int length = RoutingFlyweight.computeLength(true, fromDestination, route, metadataToWrap);
161+
ByteBuf metadata = ByteBufAllocator.DEFAULT.directBuffer(length);
162+
RoutingFlyweight.encode(
163+
metadata,
164+
true,
165+
requestToken,
166+
accessKey,
167+
fromDestination,
168+
generator.nextId(),
169+
route,
170+
metadataToWrap);
171+
172+
Payload wrappedPayload = ByteBufPayload.create(data.retain(), metadata);
173+
payload.release();
174+
175+
return secureRSocket.requestStream(wrappedPayload);
176+
});
197177
} catch (Throwable t) {
178+
payload.release();
198179
return Flux.error(t);
199180
}
200181
}
201182

202183
@Override
203184
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
204-
SecureRSocket secureRSocket = rSocketSupplier.get();
205185
ByteBuf route = getRoute();
206-
Flux<Payload> payloadFlux =
186+
187+
SecureRSocket secureRSocket = rSocketSupplier.get();
188+
Flux<Payload> wrappedPayloads =
207189
Flux.from(payloads)
208-
.flatMap(
209-
payload -> {
210-
ByteBuf data = payload.sliceData();
211-
ByteBuf metadataToWrap = payload.sliceMetadata();
212-
int length =
213-
RoutingFlyweight.computeLength(true, fromDestination, route, metadataToWrap);
214-
215-
return secureRSocket
216-
.getCurrentSessionCounter()
217-
.flatMapMany(
218-
counter -> {
219-
long count = counter.incrementAndGet();
220-
221-
return secureRSocket
222-
.getCurrentSessionToken()
223-
.map(
224-
key -> {
225-
byte[] currentRequestToken =
226-
sessionUtil.generateSessionToken(key, data, count);
227-
int requestToken =
228-
sessionUtil.generateRequestToken(
229-
currentRequestToken, data, count);
230-
ByteBuf metadata =
231-
ByteBufAllocator.DEFAULT.directBuffer(length);
232-
RoutingFlyweight.encode(
233-
metadata,
234-
true,
235-
requestToken,
236-
accessKey,
237-
fromDestination,
238-
generator.nextId(),
239-
route,
240-
metadataToWrap);
241-
242-
return ByteBufPayload.create(payload.sliceData(), metadata);
243-
});
244-
});
245-
});
246-
247-
return secureRSocket.requestChannel(payloadFlux);
190+
.concatMap(payload -> {
191+
try {
192+
ByteBuf data = payload.sliceData();
193+
ByteBuf metadataToWrap = payload.sliceMetadata();
194+
195+
return secureRSocket.getCurrentSessionCounter()
196+
.zipWith(secureRSocket.getCurrentSessionToken(), (counter, key) -> {
197+
long count = counter.incrementAndGet();
198+
byte[] currentRequestToken = sessionUtil.generateSessionToken(key, data, count);
199+
return sessionUtil.generateRequestToken(currentRequestToken, data, count);
200+
})
201+
.map(requestToken -> {
202+
int length = RoutingFlyweight.computeLength(true, fromDestination, route, metadataToWrap);
203+
ByteBuf metadata = ByteBufAllocator.DEFAULT.directBuffer(length);
204+
RoutingFlyweight.encode(
205+
metadata,
206+
true,
207+
requestToken,
208+
accessKey,
209+
fromDestination,
210+
generator.nextId(),
211+
route,
212+
metadataToWrap);
213+
214+
Payload wrappedPayload = ByteBufPayload.create(data.retain(), metadata);
215+
payload.release();
216+
217+
return wrappedPayload;
218+
});
219+
} catch (Throwable t) {
220+
payload.release();
221+
return Flux.error(t);
222+
}
223+
});
224+
225+
return secureRSocket.requestChannel(wrappedPayloads);
248226
}
249227

250228
@Override
251229
public Mono<Void> metadataPush(Payload payload) {
252230
try {
253231
ByteBuf route = getRoute();
254-
ByteBuf unwrappedMetadata = payload.sliceMetadata();
255232
ByteBuf data = payload.sliceData();
233+
ByteBuf metadataToWrap = payload.sliceMetadata();
256234

257-
int length = RoutingFlyweight.computeLength(true, fromDestination, route);
258235
SecureRSocket secureRSocket = rSocketSupplier.get();
259-
260-
return secureRSocket
261-
.getCurrentSessionCounter()
262-
.flatMap(
263-
counter -> {
264-
long count = counter.incrementAndGet();
265-
266-
return secureRSocket
267-
.getCurrentSessionToken()
268-
.flatMap(
269-
key -> {
270-
byte[] currentRequestToken =
271-
sessionUtil.generateSessionToken(key, data, count);
272-
int requestToken =
273-
sessionUtil.generateRequestToken(currentRequestToken, data, count);
274-
ByteBuf metadata = ByteBufAllocator.DEFAULT.directBuffer(length);
275-
RoutingFlyweight.encode(
276-
metadata,
277-
true,
278-
requestToken,
279-
accessKey,
280-
fromDestination,
281-
generator.nextId(),
282-
route,
283-
unwrappedMetadata);
284-
285-
return secureRSocket.metadataPush(
286-
ByteBufPayload.create(payload.sliceData(), metadata));
287-
});
288-
});
236+
return secureRSocket.getCurrentSessionCounter()
237+
.zipWith(secureRSocket.getCurrentSessionToken(), (counter, key) -> {
238+
long count = counter.incrementAndGet();
239+
byte[] currentRequestToken = sessionUtil.generateSessionToken(key, data, count);
240+
return sessionUtil.generateRequestToken(currentRequestToken, data, count);
241+
})
242+
.flatMap(requestToken -> {
243+
int length = RoutingFlyweight.computeLength(true, fromDestination, route, metadataToWrap);
244+
ByteBuf metadata = ByteBufAllocator.DEFAULT.directBuffer(length);
245+
RoutingFlyweight.encode(
246+
metadata,
247+
true,
248+
requestToken,
249+
accessKey,
250+
fromDestination,
251+
generator.nextId(),
252+
route,
253+
metadataToWrap);
254+
255+
Payload wrappedPayload = ByteBufPayload.create(data.retain(), metadata);
256+
payload.release();
257+
258+
return secureRSocket.metadataPush(wrappedPayload);
259+
});
289260

290261
} catch (Throwable t) {
262+
payload.release();
291263
return Mono.error(t);
292264
}
293265
}

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
group=io.netifi.proteus
2-
version=0.6.2
2+
version=0.6.3

0 commit comments

Comments
 (0)