Skip to content

Commit 117d03a

Browse files
authored
RATIS-2197. Clean remote stream to resolve direct memory leak (#1179)
1 parent accb612 commit 117d03a

File tree

2 files changed

+26
-12
lines changed

2 files changed

+26
-12
lines changed

ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,9 @@ private CompletableFuture<DataStreamReply> combineHeader(CompletableFuture<DataS
153153

154154
private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data, long length, Iterable<WriteOption> options) {
155155
if (isClosed()) {
156+
if (data instanceof ByteBuf) {
157+
((ByteBuf) data).release();
158+
}
156159
return JavaUtils.completeExceptionally(new AlreadyClosedException(
157160
clientId + ": stream already closed, request=" + header));
158161
}

ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,12 @@ private Set<RaftPeer> getSuccessors(RaftPeerId peerId) {
211211

212212
return Collections.emptySet();
213213
}
214+
215+
void cleanUp(ClientInvocationId invocationId) {
216+
getDivision().getDataStreamMap().remove(invocationId);
217+
getLocal().cleanUp();
218+
applyToRemotes(remote -> remote.out.closeAsync());
219+
}
214220
}
215221

216222
private final RaftServer server;
@@ -301,6 +307,9 @@ static long writeTo(ByteBuf buf, Iterable<WriteOption> options,
301307
final DataChannel channel = stream.getDataChannel();
302308
long byteWritten = 0;
303309
for (ByteBuffer buffer : buf.nioBuffers()) {
310+
if (buffer.remaining() == 0) {
311+
continue;
312+
}
304313
final ReferenceCountedObject<ByteBuffer> wrapped = ReferenceCountedObject.wrap(
305314
buffer, buf::retain, ignored -> buf.release());
306315
try(UncheckedAutoCloseable ignore = wrapped.retainAndReleaseOnClose()) {
@@ -389,9 +398,7 @@ static void sendDataStreamException(Throwable throwable, DataStreamRequestByteBu
389398

390399
void cleanUp(Set<ClientInvocationId> ids) {
391400
for (ClientInvocationId clientInvocationId : ids) {
392-
Optional.ofNullable(streams.remove(clientInvocationId))
393-
.map(StreamInfo::getLocal)
394-
.ifPresent(LocalStream::cleanUp);
401+
removeDataStream(clientInvocationId);
395402
}
396403
}
397404

@@ -411,19 +418,16 @@ void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
411418
readImpl(request, ctx, getStreams);
412419
} catch (Throwable t) {
413420
replyDataStreamException(t, request, ctx);
414-
removeDataStream(ClientInvocationId.valueOf(request.getClientId(), request.getStreamId()), null);
421+
removeDataStream(ClientInvocationId.valueOf(request.getClientId(), request.getStreamId()));
415422
}
416423
}
417424

418-
private void removeDataStream(ClientInvocationId invocationId, StreamInfo info) {
425+
private StreamInfo removeDataStream(ClientInvocationId invocationId) {
419426
final StreamInfo removed = streams.remove(invocationId);
420-
if (info == null) {
421-
info = removed;
422-
}
423-
if (info != null) {
424-
info.getDivision().getDataStreamMap().remove(invocationId);
425-
info.getLocal().cleanUp();
427+
if (removed != null) {
428+
removed.cleanUp(invocationId);
426429
}
430+
return removed;
427431
}
428432

429433
private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
@@ -479,7 +483,14 @@ private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ct
479483
try {
480484
if (exception != null) {
481485
replyDataStreamException(server, exception, info.getRequest(), request, ctx);
482-
removeDataStream(key, info);
486+
final StreamInfo removed = removeDataStream(key);
487+
if (removed != null) {
488+
Preconditions.assertSame(info, removed, "removed");
489+
} else {
490+
info.cleanUp(key);
491+
}
492+
} else if (close) {
493+
info.applyToRemotes(remote -> remote.out.closeAsync());
483494
}
484495
} finally {
485496
request.release();

0 commit comments

Comments
 (0)