Skip to content

Commit 38f698d

Browse files
authored
JAVA-2947: Release buffer after decoding multi-slice frame (#1554)
1 parent bb145eb commit 38f698d

File tree

4 files changed

+207
-17
lines changed

4 files changed

+207
-17
lines changed

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### 4.11.2 (in progress)
66

7+
- [bug] JAVA-2947: Release buffer after decoding multi-slice frame
78
- [bug] JAVA-2946: Make MapperResultProducerService instances be located with user-provided class loader
89
- [bug] JAVA-2942: GraphStatement.setConsistencyLevel() is not effective
910
- [bug] JAVA-2941: Cannot add a single static column with the alter table API

core/src/main/java/com/datastax/oss/driver/internal/core/protocol/SegmentToFrameDecoder.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,19 @@ protected void decode(
6969
private void decodeSelfContained(Segment<ByteBuf> segment, List<Object> out) {
7070
ByteBuf payload = segment.payload;
7171
int frameCount = 0;
72-
do {
73-
Frame frame = frameCodec.decode(payload);
74-
LOG.trace(
75-
"[{}] Decoded response frame {} from self-contained segment", logPrefix, frame.streamId);
76-
out.add(frame);
77-
frameCount += 1;
78-
} while (payload.isReadable());
79-
payload.release();
72+
try {
73+
do {
74+
Frame frame = frameCodec.decode(payload);
75+
LOG.trace(
76+
"[{}] Decoded response frame {} from self-contained segment",
77+
logPrefix,
78+
frame.streamId);
79+
out.add(frame);
80+
frameCount += 1;
81+
} while (payload.isReadable());
82+
} finally {
83+
payload.release();
84+
}
8085
LOG.trace("[{}] Done processing self-contained segment ({} frames)", logPrefix, frameCount);
8186
}
8287

@@ -89,28 +94,34 @@ private void decodeSlice(Segment<ByteBuf> segment, ByteBufAllocator allocator, L
8994
}
9095
accumulatedSlices.add(slice);
9196
accumulatedLength += slice.readableBytes();
97+
int accumulatedSlicesSize = accumulatedSlices.size();
9298
LOG.trace(
9399
"[{}] Decoded slice {}, {}/{} bytes",
94100
logPrefix,
95-
accumulatedSlices.size(),
101+
accumulatedSlicesSize,
96102
accumulatedLength,
97103
targetLength);
98104
assert accumulatedLength <= targetLength;
99105
if (accumulatedLength == targetLength) {
100106
// We've received enough data to reassemble the whole message
101-
CompositeByteBuf encodedFrame = allocator.compositeBuffer(accumulatedSlices.size());
107+
CompositeByteBuf encodedFrame = allocator.compositeBuffer(accumulatedSlicesSize);
102108
encodedFrame.addComponents(true, accumulatedSlices);
103-
Frame frame = frameCodec.decode(encodedFrame);
109+
Frame frame;
110+
try {
111+
frame = frameCodec.decode(encodedFrame);
112+
} finally {
113+
encodedFrame.release();
114+
// Reset our state
115+
targetLength = UNKNOWN_LENGTH;
116+
accumulatedSlices.clear();
117+
accumulatedLength = 0;
118+
}
104119
LOG.trace(
105120
"[{}] Decoded response frame {} from {} slices",
106121
logPrefix,
107122
frame.streamId,
108-
accumulatedSlices.size());
123+
accumulatedSlicesSize);
109124
out.add(frame);
110-
// Reset our state
111-
targetLength = UNKNOWN_LENGTH;
112-
accumulatedSlices.clear();
113-
accumulatedLength = 0;
114125
}
115126
}
116127
}

core/src/test/java/com/datastax/oss/driver/internal/core/protocol/SegmentToFrameDecoderTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ public void should_decode_sequence_of_slices() {
7373
encodeFrame(new AuthResponse(Bytes.fromHexString("0x" + Strings.repeat("aa", 1011))));
7474
int sliceLength = 100;
7575
do {
76-
ByteBuf payload = encodedFrame.readSlice(Math.min(sliceLength, encodedFrame.readableBytes()));
76+
ByteBuf payload =
77+
encodedFrame.readRetainedSlice(Math.min(sliceLength, encodedFrame.readableBytes()));
7778
channel.writeInbound(new Segment<>(payload, false));
7879
} while (encodedFrame.isReadable());
7980

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.driver.core.connection;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.mockito.ArgumentMatchers.any;
20+
import static org.mockito.Mockito.never;
21+
import static org.mockito.Mockito.verify;
22+
23+
import ch.qos.logback.classic.Logger;
24+
import ch.qos.logback.classic.spi.ILoggingEvent;
25+
import ch.qos.logback.core.Appender;
26+
import com.datastax.oss.driver.api.core.CqlSession;
27+
import com.datastax.oss.driver.api.core.Version;
28+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
29+
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
30+
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
31+
import com.datastax.oss.driver.api.core.cql.Row;
32+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
33+
import com.datastax.oss.driver.api.testinfra.ccm.CustomCcmRule;
34+
import com.datastax.oss.driver.api.testinfra.session.SessionRule;
35+
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
36+
import com.datastax.oss.driver.categories.IsolatedTests;
37+
import com.datastax.oss.driver.shaded.guava.common.base.Strings;
38+
import com.datastax.oss.protocol.internal.Segment;
39+
import com.datastax.oss.protocol.internal.util.Bytes;
40+
import io.netty.util.ResourceLeakDetector;
41+
import io.netty.util.ResourceLeakDetector.Level;
42+
import java.nio.ByteBuffer;
43+
import java.util.List;
44+
import org.junit.After;
45+
import org.junit.Assume;
46+
import org.junit.Before;
47+
import org.junit.BeforeClass;
48+
import org.junit.ClassRule;
49+
import org.junit.Test;
50+
import org.junit.experimental.categories.Category;
51+
import org.junit.rules.RuleChain;
52+
import org.junit.rules.TestRule;
53+
import org.junit.runner.RunWith;
54+
import org.mockito.Mock;
55+
import org.mockito.junit.MockitoJUnitRunner;
56+
import org.slf4j.LoggerFactory;
57+
58+
@Category(IsolatedTests.class)
59+
@RunWith(MockitoJUnitRunner.class)
60+
public class NettyResourceLeakDetectionIT {
61+
62+
static {
63+
ResourceLeakDetector.setLevel(Level.PARANOID);
64+
}
65+
66+
private static final CustomCcmRule CCM_RULE = CustomCcmRule.builder().build();
67+
68+
private static final SessionRule<CqlSession> SESSION_RULE = SessionRule.builder(CCM_RULE).build();
69+
70+
@ClassRule
71+
public static final TestRule CHAIN = RuleChain.outerRule(CCM_RULE).around(SESSION_RULE);
72+
73+
private static final ByteBuffer LARGE_PAYLOAD =
74+
Bytes.fromHexString("0x" + Strings.repeat("ab", Segment.MAX_PAYLOAD_LENGTH + 100));
75+
76+
@Mock private Appender<ILoggingEvent> appender;
77+
78+
@BeforeClass
79+
public static void createTables() {
80+
CqlSession session = SESSION_RULE.session();
81+
DriverExecutionProfile slowProfile = SessionUtils.slowProfile(session);
82+
session.execute(
83+
SimpleStatement.newInstance(
84+
"CREATE TABLE IF NOT EXISTS leak_test_small (key int PRIMARY KEY, value int)")
85+
.setExecutionProfile(slowProfile));
86+
session.execute(
87+
SimpleStatement.newInstance(
88+
"CREATE TABLE IF NOT EXISTS leak_test_large (key int PRIMARY KEY, value blob)")
89+
.setExecutionProfile(slowProfile));
90+
}
91+
92+
@Before
93+
public void setupLogger() {
94+
Logger logger = (Logger) LoggerFactory.getLogger(ResourceLeakDetector.class);
95+
logger.setLevel(ch.qos.logback.classic.Level.ERROR);
96+
logger.addAppender(appender);
97+
}
98+
99+
@After
100+
public void resetLogger() {
101+
Logger logger = (Logger) LoggerFactory.getLogger(ResourceLeakDetector.class);
102+
logger.detachAppender(appender);
103+
}
104+
105+
@Test
106+
public void should_not_leak_uncompressed() {
107+
doLeakDetectionTest(SESSION_RULE.session());
108+
}
109+
110+
@Test
111+
public void should_not_leak_compressed_lz4() {
112+
DriverConfigLoader loader =
113+
SessionUtils.configLoaderBuilder()
114+
.withString(DefaultDriverOption.PROTOCOL_COMPRESSION, "lz4")
115+
.build();
116+
try (CqlSession session = SessionUtils.newSession(CCM_RULE, SESSION_RULE.keyspace(), loader)) {
117+
doLeakDetectionTest(session);
118+
}
119+
}
120+
121+
@Test
122+
public void should_not_leak_compressed_snappy() {
123+
Assume.assumeTrue(
124+
"Snappy is not supported in OSS C* 4.0+ with protocol v5",
125+
CCM_RULE.getDseVersion().isPresent()
126+
|| CCM_RULE.getCassandraVersion().nextStable().compareTo(Version.V4_0_0) < 0);
127+
DriverConfigLoader loader =
128+
SessionUtils.configLoaderBuilder()
129+
.withString(DefaultDriverOption.PROTOCOL_COMPRESSION, "snappy")
130+
.build();
131+
try (CqlSession session = SessionUtils.newSession(CCM_RULE, SESSION_RULE.keyspace(), loader)) {
132+
doLeakDetectionTest(session);
133+
}
134+
}
135+
136+
private void doLeakDetectionTest(CqlSession session) {
137+
for (int i = 0; i < 10; i++) {
138+
testSmallMessages(session);
139+
verify(appender, never()).doAppend(any());
140+
System.gc();
141+
testLargeMessages(session);
142+
verify(appender, never()).doAppend(any());
143+
System.gc();
144+
}
145+
}
146+
147+
private void testSmallMessages(CqlSession session) {
148+
// trigger some activity using small requests and responses; in v5, these messages should fit in
149+
// one single, self-contained segment
150+
for (int i = 0; i < 1000; i++) {
151+
session.execute("INSERT INTO leak_test_small (key, value) VALUES (?,?)", i, i);
152+
}
153+
List<Row> rows = session.execute("SELECT value FROM leak_test_small").all();
154+
assertThat(rows).hasSize(1000);
155+
for (Row row : rows) {
156+
assertThat(row).isNotNull();
157+
int actual = row.getInt(0);
158+
assertThat(actual).isGreaterThanOrEqualTo(0).isLessThan(1000);
159+
}
160+
}
161+
162+
private void testLargeMessages(CqlSession session) {
163+
// trigger some activity using large requests and responses; in v5, these messages are likely to
164+
// be split in multiple segments
165+
for (int i = 0; i < 100; i++) {
166+
session.execute(
167+
"INSERT INTO leak_test_large (key, value) VALUES (?,?)", i, LARGE_PAYLOAD.duplicate());
168+
}
169+
List<Row> rows = session.execute("SELECT value FROM leak_test_large").all();
170+
assertThat(rows).hasSize(100);
171+
for (Row row : rows) {
172+
assertThat(row).isNotNull();
173+
ByteBuffer actual = row.getByteBuffer(0);
174+
assertThat(actual).isEqualTo(LARGE_PAYLOAD.duplicate());
175+
}
176+
}
177+
}

0 commit comments

Comments
 (0)