Skip to content

Commit b4d01bd

Browse files
SophieGuo410Sophie Guo
andauthored
Fix memory leak for batch delete (#3161)
* Fix memory leak for batch delete * Address comments --------- Co-authored-by: Sophie Guo <sopguo@sopguo-mn2.linkedin.biz>
1 parent 7b5f74b commit b4d01bd

File tree

3 files changed

+105
-3
lines changed

3 files changed

+105
-3
lines changed

ambry-commons/src/main/java/com/github/ambry/commons/RetainingAsyncWritableChannel.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,26 @@ public ByteBuf consumeContentAsByteBuf() {
170170
}
171171
}
172172

173+
/**
174+
* Consume the collected content buffer and return its contents as a byte array.
175+
*
176+
* @return a byte array containing all data received up to this point.
177+
* @throws IllegalStateException if the content has already been consumed or the channel was closed.
178+
*/
179+
public byte[] consumeContentAsBytes() {
180+
ByteBuf buf = null;
181+
try {
182+
buf = consumeContentAsByteBuf();
183+
byte[] data = new byte[buf.readableBytes()];
184+
buf.readBytes(data);
185+
return data;
186+
} finally {
187+
if (buf != null) {
188+
buf.release();
189+
}
190+
}
191+
}
192+
173193
/**
174194
* @return an {@link InputStream} that contains the data from the chunks received up to this point in time.
175195
*/

ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3BatchDeleteHandler.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,7 @@ private Callback<Long> parseRequestBodyAndDeleteCallback(RetainingAsyncWritableC
188188
public S3MessagePayload.S3BatchDeleteObjects deserializeRequest(RetainingAsyncWritableChannel channel)
189189
throws RestServiceException {
190190
try {
191-
ByteBuf byteBuffer = channel.consumeContentAsByteBuf();
192-
byte[] byteArray = new byte[byteBuffer.readableBytes()];
193-
byteBuffer.readBytes(byteArray);
191+
byte[] byteArray = channel.consumeContentAsBytes();
194192
return new XmlMapper().readValue(byteArray, S3MessagePayload.S3BatchDeleteObjects.class);
195193
} catch (Exception e) {
196194
logger.trace("s3batchdelete failed to deserialize request");

ambry-frontend/src/test/java/com/github/ambry/frontend/S3BatchDeleteHandlerTest.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
*/
1515
package com.github.ambry.frontend;
1616
import com.github.ambry.commons.InMemNamedBlobDbFactory;
17+
import com.github.ambry.commons.RetainingAsyncWritableChannel;
18+
import io.netty.buffer.ByteBuf;
19+
import io.netty.buffer.Unpooled;
1720
import java.nio.charset.StandardCharsets;
1821

1922
import com.codahale.metrics.MetricRegistry;
@@ -42,13 +45,16 @@
4245
import com.github.ambry.router.FutureResult;
4346
import com.github.ambry.router.InMemoryRouter;
4447
import com.github.ambry.router.ReadableStreamChannel;
48+
import com.github.ambry.utils.NettyByteBufLeakHelper;
4549
import com.github.ambry.utils.TestUtils;
4650
import java.nio.ByteBuffer;
4751
import java.util.Arrays;
4852
import java.util.Collections;
4953
import java.util.LinkedList;
5054
import java.util.Properties;
5155
import org.json.JSONObject;
56+
import org.junit.After;
57+
import org.junit.Before;
5258
import org.junit.Test;
5359

5460
import static com.github.ambry.frontend.s3.S3Constants.*;
@@ -67,6 +73,8 @@ public class S3BatchDeleteHandlerTest {
6773
private FrontendConfig frontendConfig;
6874
private NamedBlobPutHandler namedBlobPutHandler;
6975
private S3BatchDeleteHandler s3BatchDeleteHandler;
76+
private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper();
77+
7078

7179
public S3BatchDeleteHandlerTest() throws Exception {
7280
account = ACCOUNT_SERVICE.createAndAddRandomAccount();
@@ -83,6 +91,16 @@ public S3BatchDeleteHandlerTest() throws Exception {
8391
performPutOperation(KEY_NAME_3, KEY_NAME, container, account);
8492
}
8593

94+
@Before
95+
public void before() {
96+
nettyByteBufLeakHelper.beforeTest();
97+
}
98+
99+
@After
100+
public void after() {
101+
nettyByteBufLeakHelper.afterTest();
102+
}
103+
86104
@Test
87105
public void deleteObjectTest() throws Exception {
88106
String uri = String.format("/s3/%s/%s", account.getName(), container.getName());
@@ -123,6 +141,72 @@ public void deleteObjectTest() throws Exception {
123141
assertEquals("Mismatch on status", ResponseStatus.Ok, restResponseChannel.getStatus());
124142
}
125143

144+
@Test
145+
public void testConsumeContentAsBytesReleasesBuffer() {
146+
// Arrange: Create a ByteBuf with some data
147+
ByteBuf testBuf = Unpooled.buffer();
148+
String validXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
149+
"<Delete xmlns=\"http://s3.amazonaws.com/doc/2006-03-01\">" +
150+
"<Object>" +
151+
"<Key>key-success</Key>" +
152+
"</Object>" +
153+
"<Object>" +
154+
"<Key>key-error</Key>" +
155+
"</Object>" +
156+
"<Object>" +
157+
"<Key>key-error2</Key>" +
158+
"</Object>" +
159+
"<Object>" +
160+
"<Key>key-success-2</Key>" +
161+
"</Object>" +
162+
"</Delete>";
163+
byte[] xmlBytes = validXml.getBytes();
164+
testBuf.writeBytes(xmlBytes);
165+
// Wrap the consumeContentAsByteBuf method to return the test buffer
166+
RetainingAsyncWritableChannel channel = new RetainingAsyncWritableChannel() {
167+
@Override
168+
public ByteBuf consumeContentAsByteBuf() {
169+
return testBuf;
170+
}
171+
};
172+
173+
// Act: Call the method
174+
byte[] result = channel.consumeContentAsBytes();
175+
176+
// Assert: Verify the buffer was released and the data matches
177+
assertArrayEquals(xmlBytes, result);
178+
assertEquals("Buffer should be released", 0, testBuf.refCnt());
179+
}
180+
181+
@Test
182+
public void testDeserializeRequestWithRealChannel() throws Exception {
183+
// Arrange
184+
String validXml =
185+
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<Delete xmlns=\"http://s3.amazonaws.com/doc/2006-03-01\">"
186+
+ "<Object>" + "<Key>key-success</Key>" + "</Object>" + "<Object>" + "<Key>key-error</Key>" + "</Object>"
187+
+ "</Delete>";
188+
189+
ByteBuf byteBuf = Unpooled.wrappedBuffer(validXml.getBytes());
190+
RetainingAsyncWritableChannel channel = new RetainingAsyncWritableChannel();
191+
channel.write(byteBuf, (result, exception) -> {
192+
if (exception != null) {
193+
fail("Failed to write to channel: " + exception.getMessage());
194+
}
195+
});
196+
197+
S3BatchDeleteHandler handler = new S3BatchDeleteHandler(null, null);
198+
199+
// Act
200+
S3MessagePayload.S3BatchDeleteObjects result = handler.deserializeRequest(channel);
201+
202+
// Assert
203+
assertNotNull(result);
204+
assertEquals(2, result.getObjects().size());
205+
assertEquals("key-success", result.getObjects().get(0).getKey());
206+
assertEquals("key-error", result.getObjects().get(1).getKey());
207+
}
208+
209+
126210
@Test
127211
public void malformedXMLRequestTest() throws Exception {
128212
String uri = String.format("/s3/%s/%s", account.getName(), container.getName());

0 commit comments

Comments
 (0)