Skip to content

Commit aed9d84

Browse files
authored
Refine Netty4Utils.toByteBuf (#138627)
1 parent aedf9e3 commit aed9d84

File tree

2 files changed

+89
-27
lines changed

2 files changed

+89
-27
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java

Lines changed: 77 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
package org.elasticsearch.transport.netty4;
1111

1212
import io.netty.buffer.ByteBuf;
13-
import io.netty.buffer.CompositeByteBuf;
1413
import io.netty.buffer.Unpooled;
1514
import io.netty.channel.Channel;
1615
import io.netty.channel.ChannelFuture;
@@ -40,7 +39,6 @@
4039
import java.io.IOException;
4140
import java.nio.ByteBuffer;
4241
import java.util.ArrayList;
43-
import java.util.List;
4442
import java.util.Locale;
4543
import java.util.concurrent.RejectedExecutionException;
4644
import java.util.concurrent.atomic.AtomicBoolean;
@@ -91,29 +89,7 @@ public static ByteBuf toByteBuf(final BytesReference reference) {
9189
if (reference.hasArray()) {
9290
return Unpooled.wrappedBuffer(reference.array(), reference.arrayOffset(), reference.length());
9391
}
94-
return compositeReferenceToByteBuf(reference);
95-
}
96-
97-
private static ByteBuf compositeReferenceToByteBuf(BytesReference reference) {
98-
final BytesRefIterator iterator = reference.iterator();
99-
// usually we have one, two, or three components from the header, the message, and a buffer
100-
final List<ByteBuf> buffers = new ArrayList<>(3);
101-
try {
102-
BytesRef slice;
103-
while ((slice = iterator.next()) != null) {
104-
buffers.add(Unpooled.wrappedBuffer(slice.bytes, slice.offset, slice.length));
105-
}
106-
107-
if (buffers.size() == 1) {
108-
return buffers.get(0);
109-
} else {
110-
CompositeByteBuf composite = Unpooled.compositeBuffer(buffers.size());
111-
composite.addComponents(true, buffers);
112-
return composite;
113-
}
114-
} catch (IOException ex) {
115-
throw new AssertionError("no IO happens here", ex);
116-
}
92+
return fromByteRefIterator(reference.iterator());
11793
}
11894

11995
/**
@@ -223,6 +199,82 @@ private static boolean assertCorrectPromiseListenerThreading(ChannelPromise prom
223199
return true;
224200
}
225201

202+
static ByteBuf unpooledFrom(BytesRef ref) {
203+
return Unpooled.wrappedBuffer(ref.bytes, ref.offset, ref.length);
204+
}
205+
206+
static ByteBuf compositeOf2(ByteBuf buf1, ByteBuf buf2) {
207+
final var c = Unpooled.compositeBuffer(2);
208+
c.addComponent(true, buf1);
209+
c.addComponent(true, buf2);
210+
return c;
211+
}
212+
213+
static ByteBuf compositeOf3(ByteBuf buf1, ByteBuf buf2, ByteBuf buf3) {
214+
final var c = Unpooled.compositeBuffer(3);
215+
c.addComponent(true, buf1);
216+
c.addComponent(true, buf2);
217+
c.addComponent(true, buf3);
218+
return c;
219+
}
220+
221+
static ByteBuf compositeOf4OrMore(ByteBuf buf1, ByteBuf buf2, ByteBuf buf3, ByteBuf buf4, BytesRefIterator remaining)
222+
throws IOException {
223+
final var parts = new ArrayList<ByteBuf>(8) {
224+
{
225+
add(buf1);
226+
add(buf2);
227+
add(buf3);
228+
add(buf4);
229+
}
230+
};
231+
BytesRef slice;
232+
while ((slice = remaining.next()) != null) {
233+
parts.add(unpooledFrom(slice));
234+
}
235+
final var composite = Unpooled.compositeBuffer(parts.size());
236+
for (var part : parts) {
237+
composite.addComponent(true, part);
238+
}
239+
return composite;
240+
}
241+
242+
static ByteBuf fromByteRefIterator(BytesRefIterator iter) {
243+
try {
244+
// usually we have one, two, or three components from the header, the message, and a buffer
245+
// it is a hot-path and cases of 0,1,2,3 do not allocate extra list to collect all parts from iterator
246+
// CompositeByteBuf is fixed size and collecting parts from iterator is necessary
247+
248+
var slice = iter.next(); // slice might be reused based on iterator API
249+
if (slice == null) {
250+
return Unpooled.EMPTY_BUFFER;
251+
}
252+
final var buf1 = unpooledFrom(slice);
253+
254+
slice = iter.next();
255+
if (slice == null) {
256+
return buf1;
257+
}
258+
final var buf2 = unpooledFrom(slice);
259+
260+
slice = iter.next();
261+
if (slice == null) {
262+
return compositeOf2(buf1, buf2);
263+
}
264+
final var buf3 = unpooledFrom(slice);
265+
266+
slice = iter.next();
267+
if (slice == null) {
268+
return compositeOf3(buf1, buf2, buf3);
269+
}
270+
final var buf4 = unpooledFrom(slice);
271+
272+
return compositeOf4OrMore(buf1, buf2, buf3, buf4, iter);
273+
} catch (IOException e) {
274+
throw new AssertionError("no IO happens here", e);
275+
}
276+
}
277+
226278
/**
227279
* Subscribes the given {@link ActionListener} to the given {@link Future}.
228280
*/

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.common.bytes.AbstractBytesReferenceTestCase;
2020
import org.elasticsearch.common.bytes.BytesArray;
2121
import org.elasticsearch.common.bytes.BytesReference;
22+
import org.elasticsearch.common.bytes.ZeroBytesReference;
2223
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
2324
import org.elasticsearch.common.util.BigArrays;
2425
import org.elasticsearch.common.util.PageCacheRecycler;
@@ -32,8 +33,17 @@ public class Netty4UtilsTests extends ESTestCase {
3233
private static final int PAGE_SIZE = PageCacheRecycler.BYTE_PAGE_SIZE;
3334
private final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), CircuitBreaker.REQUEST);
3435

35-
public void testToChannelBufferWithEmptyRef() throws IOException {
36-
ByteBuf buffer = Netty4Utils.toByteBuf(getRandomizedBytesReference(0));
36+
public void testToByteBufEmptyArray() throws IOException {
37+
var arrayBackedRef = getRandomizedBytesReference(0);
38+
assertTrue(arrayBackedRef.hasArray());
39+
var buffer = Netty4Utils.toByteBuf(arrayBackedRef);
40+
assertSame(Unpooled.EMPTY_BUFFER, buffer);
41+
}
42+
43+
public void testToByteBufEmptyIterator() {
44+
var iterBackedRef = new ZeroBytesReference(0);
45+
assertFalse(iterBackedRef.hasArray());
46+
var buffer = Netty4Utils.toByteBuf(iterBackedRef);
3747
assertSame(Unpooled.EMPTY_BUFFER, buffer);
3848
}
3949

0 commit comments

Comments
 (0)