Skip to content

Commit 93507f0

Browse files
authored
Fix ByteBuf memory leak in PutOperation when operations are aborted (#3176)
* Fix ByteBuf memory leak in PutOperation when operations are aborted. When a PutOperation is aborted or fails before all data is processed by the ChunkFiller thread, channelReadBuf may still hold a reference to a ByteBuf that was read from the channel but not yet consumed. This causes a memory leak, as the buffer is never released. Changes: - Add channelReadBuf.release() in cleanupChunks() to ensure the buffer is properly released when the operation completes or fails. - Remove the synchronized modifier from PutChunk.fillFrom(), as it is not needed with the ChunkFiller's single-threaded model. - Add a test case, testPutOperationByteBufLeakOnAbort(), to verify that ByteBuf resources are properly released when operations are aborted mid-flight. The fix ensures that even if the ChunkFiller thread has not processed all data from the channel when an operation completes or fails, the ByteBuf holding unprocessed data is properly released, preventing memory leaks. * Go one step further and prevent a use-after-free race condition. After an even more extensive review, we identified that not only is there a case where the read buf is leaked, it is also possible for the read buf to be used after being freed, in rare cases where the memory is released by the network layer and reclaimed before it is retained by the ChunkFiller thread. Some of the memory leaks fixed in the previous commit were masking some of the use-after-free issues fixed in this commit. Given how intertwined they are, it would not be safe to merge these changes separately. * Protect the operations in fillChunks from router errors.
1 parent 691eb00 commit 93507f0

File tree

4 files changed

+302
-17
lines changed

4 files changed

+302
-17
lines changed

ambry-router/src/main/java/com/github/ambry/router/PutManager.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
import com.github.ambry.account.Account;
1717
import com.github.ambry.account.AccountService;
18-
import com.github.ambry.account.Container;
1918
import com.github.ambry.clustermap.ClusterMap;
2019
import com.github.ambry.clustermap.ClusterMapUtils;
2120
import com.github.ambry.commons.ByteBufferAsyncWritableChannel;

ambry-router/src/main/java/com/github/ambry/router/PutOperation.java

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import io.netty.buffer.Unpooled;
5858
import io.netty.util.ReferenceCountUtil;
5959
import java.nio.ByteBuffer;
60+
import java.nio.channels.ClosedChannelException;
6061
import java.security.GeneralSecurityException;
6162
import java.util.ArrayList;
6263
import java.util.Collections;
@@ -574,10 +575,23 @@ void setOperationCompleted() {
574575

575576
/**
576577
* Clean up the chunks to release any data buffer. This should be invoked when terminating the operation with
577-
* an exception.
578+
* an exception. This method also closes the chunkFillerChannel to fire any pending callbacks, ensuring the original
579+
* buffer from the ReadableStreamChannel is properly released. Synchronized for memory visibility on channelReadBuf.
580+
* The contract upheld by PutManager is that this method is called AT-MOST-ONCE.
578581
*/
579-
public void cleanupChunks() {
580-
releaseDataForAllChunks();
582+
public synchronized void cleanupChunks() {
583+
try {
584+
releaseDataForAllChunks();
585+
} finally {
586+
// Release the extra reference we retained when storing in channelReadBuf.
587+
if (channelReadBuf != null) {
588+
channelReadBuf.release();
589+
channelReadBuf = null;
590+
}
591+
// Close the chunkFillerChannel to fire any remaining callbacks in chunksAwaitingResolution.
592+
// This ensures the original buffer (owned by the callback) is released and not leaked.
593+
chunkFillerChannel.close();
594+
}
581595
}
582596

583597
/**
@@ -676,13 +690,18 @@ boolean isChunkFillingDone() {
676690
* chunkFillerChannel, if there is any.
677691
* @throws InterruptedException if the call to get a chunk from the chunkFillerChannel is interrupted.
678692
*/
679-
void fillChunks() {
693+
synchronized void fillChunks() {
680694
try {
681695
PutChunk chunkToFill;
682696
while (!isChunkFillingDone()) {
683697
// Attempt to fill a chunk
684698
if (channelReadBuf == null) {
685699
channelReadBuf = chunkFillerChannel.getNextByteBuf(0);
700+
if (channelReadBuf != null) {
701+
// Retain the buffer to protect against the channel callback releasing it
702+
// while we still hold a reference we're processing.
703+
channelReadBuf.retain();
704+
}
686705
}
687706
if (channelReadBuf != null) {
688707
if (channelReadBuf.readableBytes() > 0 && isChunkAwaitingResolution()) {
@@ -707,8 +726,13 @@ void fillChunks() {
707726
routerCallback.onPollReady();
708727
}
709728
if (!channelReadBuf.isReadable()) {
710-
chunkFillerChannel.resolveOldestChunk(null);
711-
channelReadBuf = null;
729+
try {
730+
chunkFillerChannel.resolveOldestChunk(null);
731+
} finally {
732+
// Release the reference we retained when storing getNextByteBuf, even if resolveOldestChunk throws.
733+
channelReadBuf.release();
734+
channelReadBuf = null;
735+
}
712736
}
713737
}
714738
} else {
@@ -1096,16 +1120,19 @@ boolean isStitchOperation() {
10961120
}
10971121

10981122
/**
1099-
* Set the exception associated with this operation.
1100-
* First, if current operationException is null, directly set operationException as exception;
1101-
* Second, if operationException exists, compare ErrorCodes of exception and existing operation Exception depending
1102-
* on precedence level. An ErrorCode with a smaller precedence level overrides an ErrorCode with a larger precedence
1103-
* level. Update the operationException if necessary.
1104-
* @param exception the {@link RouterException} to possibly set.
1123+
* Set the exception associated with this operation and mark it complete.
1124+
* For {@link RouterException}: uses precedence-based replacement where lower precedence
1125+
* levels override higher ones.
1126+
* For {@link java.nio.channels.ClosedChannelException}: only set if no other exception has
1127+
* been set to avoid overwriting meaningful errors.
1128+
* For all others simply set the exception as we don't know what they are or how to classify them.
1129+
* @param exception the {@link Exception} to possibly set.
11051130
*/
11061131
void setOperationExceptionAndComplete(Exception exception) {
11071132
if (exception instanceof RouterException) {
11081133
RouterUtils.replaceOperationException(operationException, (RouterException) exception, this::getPrecedenceLevel);
1134+
} else if (exception instanceof ClosedChannelException) {
1135+
operationException.compareAndSet(null, exception);
11091136
} else {
11101137
operationException.set(exception);
11111138
}
@@ -1642,7 +1669,7 @@ void onFillComplete(boolean updateMetric) {
16421669
* @param channelReadBuf the {@link ByteBuf} from which to read data.
16431670
* @return the number of bytes transferred in this operation.
16441671
*/
1645-
synchronized int fillFrom(ByteBuf channelReadBuf) {
1672+
int fillFrom(ByteBuf channelReadBuf) {
16461673
int toWrite;
16471674
ByteBuf slice;
16481675
if (buf == null) {

ambry-router/src/test/java/com/github/ambry/router/NonBlockingRouterTest.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.github.ambry.router;
1515

1616
import com.codahale.metrics.MetricRegistry;
17+
import com.github.ambry.utils.NettyByteBufLeakHelper;
1718
import com.github.ambry.account.Account;
1819
import com.github.ambry.account.Container;
1920
import com.github.ambry.clustermap.DataNodeId;
@@ -22,6 +23,7 @@
2223
import com.github.ambry.clustermap.PartitionId;
2324
import com.github.ambry.clustermap.ReplicaId;
2425
import com.github.ambry.commons.BlobId;
26+
import com.github.ambry.commons.ByteBufReadableStreamChannel;
2527
import com.github.ambry.commons.ByteBufferReadableStreamChannel;
2628
import com.github.ambry.commons.Callback;
2729
import com.github.ambry.commons.LoggingNotificationSystem;
@@ -37,7 +39,6 @@
3739
import com.github.ambry.frontend.Operations;
3840
import com.github.ambry.messageformat.BlobProperties;
3941
import com.github.ambry.messageformat.MessageFormatRecord;
40-
import com.github.ambry.named.NamedBlobRecord;
4142
import com.github.ambry.network.NetworkClient;
4243
import com.github.ambry.network.NetworkClientErrorCode;
4344
import com.github.ambry.network.NetworkClientFactory;
@@ -97,6 +98,8 @@
9798
import java.util.stream.Collectors;
9899
import java.util.stream.LongStream;
99100
import javax.sql.DataSource;
101+
import io.netty.buffer.ByteBuf;
102+
import io.netty.buffer.PooledByteBufAllocator;
100103
import org.json.JSONObject;
101104
import org.junit.AfterClass;
102105
import org.junit.Assert;
@@ -4550,4 +4553,52 @@ static void verifyRepairRequestRecordInDb(MysqlRepairRequestsDb db, BlobId blobI
45504553
assertEquals(expectedRecord.getExpirationTimeMs(), record.getExpirationTimeMs());
45514554
}
45524555
}
4556+
4557+
/**
4558+
* Test for bytebuf memory leaks in PutOperation when operations are aborted in the middle of a put operation.
4559+
* This test verifies that PutOperation properly releases bytebuf when the operation completes/fails, even if
4560+
* the ChunkFiller thread hasn't processed some data yet.
4561+
*/
4562+
@Test
4563+
public void testPutOperationByteBufLeakOnAbort() throws Exception {
4564+
NettyByteBufLeakHelper testLeakHelper = new NettyByteBufLeakHelper();
4565+
testLeakHelper.beforeTest();
4566+
4567+
Properties props = getNonBlockingRouterProperties(localDcName);
4568+
int chunkSize = 512;
4569+
props.setProperty("router.max.put.chunk.size.bytes", Integer.toString(chunkSize));
4570+
setRouter(props, mockServerLayout, new LoggingNotificationSystem());
4571+
4572+
// Configure servers to succeed for first few chunks, then fail
4573+
List<ServerErrorCode> serverErrorList = new ArrayList<>();
4574+
serverErrorList.add(ServerErrorCode.NoError);
4575+
serverErrorList.add(ServerErrorCode.NoError);
4576+
for (int i = 0; i < 100; i++) {
4577+
serverErrorList.add(ServerErrorCode.PartitionReadOnly);
4578+
}
4579+
mockServerLayout.getMockServers().forEach(server -> server.setServerErrors(serverErrorList));
4580+
4581+
// The first two will run normally, but 3+ will get ServerErrorCode.PartitionReadOnly
4582+
int blobSize = 100 * chunkSize;
4583+
byte[] blobData = new byte[blobSize];
4584+
ThreadLocalRandom.current().nextBytes(blobData);
4585+
ByteBuf pooledBuf = PooledByteBufAllocator.DEFAULT.buffer(blobSize);
4586+
pooledBuf.writeBytes(blobData);
4587+
ByteBufReadableStreamChannel channel = new ByteBufReadableStreamChannel(pooledBuf);
4588+
4589+
BlobProperties blobProperties = new BlobProperties(blobSize, "serviceId", "ownerId", "contentType",
4590+
false, Utils.Infinite_Time, Utils.getRandomShort(ThreadLocalRandom.current()),
4591+
Utils.getRandomShort(ThreadLocalRandom.current()), false, null, null, null);
4592+
4593+
try {
4594+
router.putBlob(blobProperties, new byte[10], channel, PutBlobOptions.DEFAULT).get();
4595+
} catch (ExecutionException e) {
4596+
// Expected for operations that hit error responses
4597+
}
4598+
// If there are leaks, it will be detected in NettyByteBufLeakHelper and fail the test.
4599+
// Should be called before router close as closing of the router shouldn't be required to prevent leaks.
4600+
testLeakHelper.afterTest();
4601+
router.close();
4602+
router = null;
4603+
}
45534604
}

0 commit comments

Comments (0)