Skip to content

Commit 9147428

Browse files
authored
Use underlying ByteBuf refCount for ReleasableBytesReference (#116211)
1 parent 583cf23 commit 9147428

File tree

4 files changed

+89
-36
lines changed

4 files changed

+89
-36
lines changed

docs/changelog/116211.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 116211
2+
summary: Use underlying `ByteBuf` `refCount` for `ReleasableBytesReference`
3+
area: Network
4+
type: bug
5+
issues: []

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageInboundHandler.java

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@
1414

1515
import org.elasticsearch.ElasticsearchException;
1616
import org.elasticsearch.ExceptionsHelper;
17-
import org.elasticsearch.common.bytes.BytesReference;
1817
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1918
import org.elasticsearch.common.network.ThreadWatchdog;
20-
import org.elasticsearch.core.RefCounted;
2119
import org.elasticsearch.core.Releasables;
2220
import org.elasticsearch.transport.InboundPipeline;
2321
import org.elasticsearch.transport.Transports;
@@ -52,9 +50,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
5250

5351
final ByteBuf buffer = (ByteBuf) msg;
5452
Netty4TcpChannel channel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get();
55-
final BytesReference wrapped = Netty4Utils.toBytesReference(buffer);
5653
activityTracker.startActivity();
57-
try (ReleasableBytesReference reference = new ReleasableBytesReference(wrapped, new ByteBufRefCounted(buffer))) {
54+
try (ReleasableBytesReference reference = Netty4Utils.toReleasableBytesReference(buffer)) {
5855
pipeline.handleBytes(channel, reference);
5956
} finally {
6057
activityTracker.stopActivity();
@@ -81,35 +78,4 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
8178
super.channelInactive(ctx);
8279
}
8380

84-
private record ByteBufRefCounted(ByteBuf buffer) implements RefCounted {
85-
86-
@Override
87-
public void incRef() {
88-
buffer.retain();
89-
}
90-
91-
@Override
92-
public boolean tryIncRef() {
93-
if (hasReferences() == false) {
94-
return false;
95-
}
96-
try {
97-
buffer.retain();
98-
} catch (RuntimeException e) {
99-
assert hasReferences() == false;
100-
return false;
101-
}
102-
return true;
103-
}
104-
105-
@Override
106-
public boolean decRef() {
107-
return buffer.release();
108-
}
109-
110-
@Override
111-
public boolean hasReferences() {
112-
return buffer.refCnt() > 0;
113-
}
114-
}
11581
}

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.common.settings.Settings;
3333
import org.elasticsearch.common.util.concurrent.EsExecutors;
3434
import org.elasticsearch.core.Booleans;
35+
import org.elasticsearch.core.RefCounted;
3536
import org.elasticsearch.core.SuppressForbidden;
3637
import org.elasticsearch.http.HttpBody;
3738
import org.elasticsearch.transport.TransportException;
@@ -130,8 +131,51 @@ public static BytesReference toBytesReference(final ByteBuf buffer) {
130131
}
131132
}
132133

134+
/**
135+
* Wrap Netty's {@link ByteBuf} into {@link ReleasableBytesReference} and delegating reference count to ByteBuf.
136+
*/
133137
public static ReleasableBytesReference toReleasableBytesReference(final ByteBuf buffer) {
134-
return new ReleasableBytesReference(toBytesReference(buffer), buffer::release);
138+
return new ReleasableBytesReference(toBytesReference(buffer), toRefCounted(buffer));
139+
}
140+
141+
static ByteBufRefCounted toRefCounted(final ByteBuf buf) {
142+
return new ByteBufRefCounted(buf);
143+
}
144+
145+
record ByteBufRefCounted(ByteBuf buffer) implements RefCounted {
146+
147+
public int refCnt() {
148+
return buffer.refCnt();
149+
}
150+
151+
@Override
152+
public void incRef() {
153+
buffer.retain();
154+
}
155+
156+
@Override
157+
public boolean tryIncRef() {
158+
if (hasReferences() == false) {
159+
return false;
160+
}
161+
try {
162+
buffer.retain();
163+
} catch (RuntimeException e) {
164+
assert hasReferences() == false;
165+
return false;
166+
}
167+
return true;
168+
}
169+
170+
@Override
171+
public boolean decRef() {
172+
return buffer.release();
173+
}
174+
175+
@Override
176+
public boolean hasReferences() {
177+
return buffer.refCnt() > 0;
178+
}
135179
}
136180

137181
public static HttpBody.Full fullHttpBodyFrom(final ByteBuf buf) {

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import io.netty.buffer.ByteBuf;
1313
import io.netty.buffer.CompositeByteBuf;
14+
import io.netty.buffer.PooledByteBufAllocator;
1415
import io.netty.buffer.Unpooled;
1516

1617
import org.apache.lucene.util.BytesRef;
@@ -68,6 +69,43 @@ public void testToChannelBuffer() throws IOException {
6869
assertArrayEquals(BytesReference.toBytes(ref), BytesReference.toBytes(bytesReference));
6970
}
7071

72+
/**
73+
* Test that wrapped reference counted object from netty reflects correct counts in ES RefCounted
74+
*/
75+
public void testToRefCounted() {
76+
var buf = PooledByteBufAllocator.DEFAULT.buffer(1);
77+
assertEquals(1, buf.refCnt());
78+
79+
var refCounted = Netty4Utils.toRefCounted(buf);
80+
assertEquals(1, refCounted.refCnt());
81+
82+
buf.retain();
83+
assertEquals(2, refCounted.refCnt());
84+
85+
refCounted.incRef();
86+
assertEquals(3, refCounted.refCnt());
87+
assertEquals(buf.refCnt(), refCounted.refCnt());
88+
89+
refCounted.decRef();
90+
assertEquals(2, refCounted.refCnt());
91+
assertEquals(buf.refCnt(), refCounted.refCnt());
92+
assertTrue(refCounted.hasReferences());
93+
94+
refCounted.decRef();
95+
refCounted.decRef();
96+
assertFalse(refCounted.hasReferences());
97+
}
98+
99+
/**
100+
* Ensures that released ByteBuf cannot be accessed from ReleasableBytesReference
101+
*/
102+
public void testToReleasableBytesReferenceThrowOnByteBufRelease() {
103+
var buf = PooledByteBufAllocator.DEFAULT.buffer(1);
104+
var relBytes = Netty4Utils.toReleasableBytesReference(buf);
105+
buf.release();
106+
assertThrows(AssertionError.class, () -> relBytes.get(0));
107+
}
108+
71109
private BytesReference getRandomizedBytesReference(int length) throws IOException {
72110
// we know bytes stream output always creates a paged bytes reference, we use it to create randomized content
73111
ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(length, bigarrays);

0 commit comments

Comments
 (0)