Skip to content
This repository was archived by the owner on Dec 12, 2022. It is now read-only.

Commit d382017

Browse files
committed
Add support for iterating underlying buffer components
Motivation: It's desirable to be able to access the contents of a Buf via an array or a ByteBuffer. However, we would also like to have a unified API that works for both composite and non-composite buffers. Even for nested composite buffers. Modification: Add a forEachReadable method, which uses internal iteration to process all buffer components. The internal iteration allows us to hide any nesting of composite buffers. The consumer in the internal iteration is presented with a Component object, which exposes the contents in various ways. The data is exposed from the Component via methods, such that anything that is expensive to create, will not have to be paid for unless it is used. This mechanism also let us avoid any allocation unnecessary allocation; the ByteBuffers and arrays will necessarily have to be allocated, but the consumer may or may not need allocation depending on how it's implemented, and the component objects do not need to be allocated, because the non-composite buffers can directly implement the Component interface. Result: It's now possible to access the contents of Buf instances as arrays or ByteBuffers, without having to copy the data.
1 parent 8cdcfd5 commit d382017

File tree

6 files changed

+924
-108
lines changed

6 files changed

+924
-108
lines changed

src/main/java/io/netty/buffer/api/Buf.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.nio.ByteBuffer;
1919
import java.nio.ByteOrder;
20+
import java.util.function.Consumer;
2021

2122
/**
2223
* A reference counted buffer of memory, with separate reader and writer offsets.
@@ -187,7 +188,7 @@ default int writableBytes() {
187188
* Give the native memory address backing this buffer, or return 0 if this buffer has no native memory address.
188189
* @return The native memory address, if any, otherwise 0.
189190
*/
190-
long getNativeAddress();
191+
long nativeAddress();
191192

192193
/**
193194
* Set the read-only state of this buffer.
@@ -482,4 +483,26 @@ default void ensureWritable(int size) {
482483
* or is {@linkplain #readOnly() read-only}.
483484
*/
484485
void compact();
486+
487+
/**
488+
* Get the number of "components" in this buffer. For composite buffers, this is the number of transitive
489+
* constituent buffers, while non-composite buffers only have one component.
490+
*
491+
* @return The number of components in this buffer.
492+
*/
493+
int componentCount();
494+
495+
/**
496+
* Process all readable components of this buffer, and return the number of components consumed.
497+
* <p>
498+
* The number of components consumed may be less than the {@linkplain #componentCount() component count} if not all
499+
* of them have readable data.
500+
*
501+
* <strong>Note</strong> that the {@link Component} instance passed to the consumer could be reused for multiple
502+
* calls, so the data must be extracted from the component in the context of the iteration.
503+
*
504+
* @param consumer The consumer that will be used to process the buffer components.
505+
* @return The number of readable components processed, which may be less than {@link #componentCount()}.
506+
*/
507+
int forEachReadable(Consumer<Component> consumer);
485508
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2020 The Netty Project
3+
*
4+
* The Netty Project licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.netty.buffer.api;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.function.Consumer;
20+
21+
/**
22+
* A view onto the buffer component being processed in a given iteration of {@link Buf#forEachReadable(Consumer)}.
23+
* <p>
24+
* Instances of this interface are allowed to be mutable behind the scenes, and the data is only guaranteed to be
25+
* consistent within the given iteration.
26+
*/
27+
public interface Component {
28+
29+
/**
30+
* Check if this component is backed by a cached byte array than can be accessed cheaply.
31+
*
32+
* @return {@code true} if {@link #array()} is a cheap operation, otherwise {@code false}.
33+
*/
34+
boolean hasCachedArray();
35+
36+
/**
37+
* Get a byte array of the contents of this component.
38+
* <p>
39+
* <strong>Note</strong> that the array is meant to be read-only. It may either be a direct reference to the
40+
* concrete array instance that is backing this component, or it is a fresh copy.
41+
*
42+
* @return A byte array of the contents of this component.
43+
*/
44+
byte[] array();
45+
46+
/**
47+
* Give the native memory address backing this buffer, or return 0 if this buffer has no native memory address.
48+
* @return The native memory address, if any, otherwise 0.
49+
*/
50+
long nativeAddress();
51+
52+
/**
53+
* Build a {@link ByteBuffer} instance for this memory component.
54+
* @return A new {@link ByteBuffer} for this memory component.
55+
*/
56+
ByteBuffer byteBuffer();
57+
}

src/main/java/io/netty/buffer/api/CompositeBuf.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.nio.ByteOrder;
2020
import java.util.Arrays;
2121
import java.util.Objects;
22+
import java.util.function.Consumer;
2223

2324
final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
2425
/**
@@ -203,7 +204,7 @@ public Buf fill(byte value) {
203204
}
204205

205206
@Override
206-
public long getNativeAddress() {
207+
public long nativeAddress() {
207208
return 0;
208209
}
209210

@@ -700,6 +701,27 @@ public void compact() {
700701
writerOffset(woff - distance);
701702
}
702703

704+
@Override
705+
public int componentCount() {
706+
int sum = 0;
707+
for (Buf buf : bufs) {
708+
sum += buf.componentCount();
709+
}
710+
return sum;
711+
}
712+
713+
@Override
714+
public int forEachReadable(Consumer<Component> consumer) {
715+
checkReadBounds(readerOffset(), Math.max(1, readableBytes()));
716+
int visited = 0;
717+
for (Buf buf : bufs) {
718+
if (buf.readableBytes() > 0) {
719+
visited += buf.forEachReadable(consumer);
720+
}
721+
}
722+
return visited;
723+
}
724+
703725
// <editor-fold defaultstate="collapsed" desc="Primitive accessors.">
704726
@Override
705727
public byte readByte() {

src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java

Lines changed: 67 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
import io.netty.buffer.api.AllocatorControl;
2020
import io.netty.buffer.api.Buf;
2121
import io.netty.buffer.api.ByteCursor;
22+
import io.netty.buffer.api.Component;
2223
import io.netty.buffer.api.Drop;
2324
import io.netty.buffer.api.Owned;
2425
import io.netty.buffer.api.RcSupport;
2526
import jdk.incubator.foreign.MemorySegment;
2627

2728
import java.nio.ByteBuffer;
2829
import java.nio.ByteOrder;
30+
import java.util.function.Consumer;
2931

3032
import static jdk.incubator.foreign.MemoryAccess.getByteAtOffset;
3133
import static jdk.incubator.foreign.MemoryAccess.getCharAtOffset;
@@ -42,7 +44,7 @@
4244
import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset;
4345
import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset;
4446

45-
class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
47+
class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf, Component {
4648
private static final MemorySegment CLOSED_SEGMENT;
4749
static final Drop<MemSegBuf> SEGMENT_CLOSE;
4850

@@ -58,22 +60,25 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
5860

5961
private final AllocatorControl alloc;
6062
private final boolean isSendable;
63+
private final int baseOffset; // TODO remove this when JDK bug is fixed (slices of heap buffers)
6164
private MemorySegment seg;
6265
private MemorySegment wseg;
6366
private ByteOrder order;
6467
private int roff;
6568
private int woff;
6669

6770
MemSegBuf(MemorySegment segmet, Drop<MemSegBuf> drop, AllocatorControl alloc) {
68-
this(segmet, drop, alloc, true);
71+
this(segmet, drop, alloc, true, 0);
6972
}
7073

71-
private MemSegBuf(MemorySegment segment, Drop<MemSegBuf> drop, AllocatorControl alloc, boolean isSendable) {
74+
private MemSegBuf(MemorySegment segment, Drop<MemSegBuf> drop, AllocatorControl alloc, boolean isSendable,
75+
int baseOffset) {
7276
super(drop);
7377
this.alloc = alloc;
7478
seg = segment;
7579
wseg = segment;
7680
this.isSendable = isSendable;
81+
this.baseOffset = baseOffset;
7782
order = ByteOrder.nativeOrder();
7883
}
7984

@@ -130,14 +135,42 @@ public Buf fill(byte value) {
130135
}
131136

132137
@Override
133-
public long getNativeAddress() {
138+
public boolean hasCachedArray() {
139+
return false;
140+
}
141+
142+
@Override
143+
public byte[] array() {
144+
return seg.toByteArray();
145+
}
146+
147+
@Override
148+
public long nativeAddress() {
134149
try {
135150
return seg.address().toRawLongValue();
136151
} catch (UnsupportedOperationException e) {
137152
return 0; // This is a heap segment.
138153
}
139154
}
140155

156+
@Override
157+
public ByteBuffer byteBuffer() {
158+
var buffer = seg.asByteBuffer();
159+
int base = baseOffset;
160+
if (buffer.isDirect()) {
161+
// TODO Remove this when JDK bug is fixed.
162+
ByteBuffer tmp = ByteBuffer.allocateDirect(buffer.capacity());
163+
tmp.put(buffer);
164+
buffer = tmp.position(0);
165+
base = 0; // TODO native memory segments do not have the buffer-of-slice bug.
166+
}
167+
if (readOnly()) {
168+
buffer = buffer.asReadOnlyBuffer();
169+
}
170+
// TODO avoid slicing and just set position+limit when JDK bug is fixed.
171+
return buffer.slice(base + readerOffset(), readableBytes()).order(order);
172+
}
173+
141174
@Override
142175
public Buf readOnly(boolean readOnly) {
143176
wseg = readOnly? CLOSED_SEGMENT : seg;
@@ -161,7 +194,7 @@ public Buf slice(int offset, int length) {
161194
b.makeInaccessible();
162195
};
163196
var sendable = false; // Sending implies ownership change, which we can't do for slices.
164-
return new MemSegBuf(slice, drop, alloc, sendable)
197+
return new MemSegBuf(slice, drop, alloc, sendable, baseOffset + offset)
165198
.writerOffset(length)
166199
.order(order())
167200
.readOnly(readOnly());
@@ -458,6 +491,18 @@ public void compact() {
458491
woff -= distance;
459492
}
460493

494+
@Override
495+
public int componentCount() {
496+
return 1;
497+
}
498+
499+
@Override
500+
public int forEachReadable(Consumer<Component> consumer) {
501+
checkRead(readerOffset(), Math.max(1, readableBytes()));
502+
consumer.accept(this);
503+
return 1;
504+
}
505+
461506
// <editor-fold defaultstate="collapsed" desc="Primitive accessors implementation.">
462507
@Override
463508
public byte readByte() {
@@ -969,13 +1014,13 @@ public boolean isOwned() {
9691014

9701015
private void checkRead(int index, int size) {
9711016
if (index < 0 || woff < index + size) {
972-
throw accessCheckException(index);
1017+
throw readAccessCheckException(index);
9731018
}
9741019
}
9751020

9761021
private void checkWrite(int index, int size) {
9771022
if (index < 0 || wseg.byteSize() < index + size) {
978-
throw accessCheckException(index);
1023+
throw writeAccessCheckException(index);
9791024
}
9801025
}
9811026

@@ -989,16 +1034,21 @@ private RuntimeException checkWriteState(IndexOutOfBoundsException ioobe) {
9891034
return ioobe;
9901035
}
9911036

992-
private RuntimeException accessCheckException(int index) {
1037+
private RuntimeException readAccessCheckException(int index) {
1038+
if (seg == CLOSED_SEGMENT) {
1039+
throw bufferIsClosed();
1040+
}
1041+
return outOfBounds(index);
1042+
}
1043+
1044+
private RuntimeException writeAccessCheckException(int index) {
9931045
if (seg == CLOSED_SEGMENT) {
9941046
throw bufferIsClosed();
9951047
}
9961048
if (wseg != seg) {
9971049
return bufferIsReadOnly();
9981050
}
999-
return new IndexOutOfBoundsException(
1000-
"Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " +
1001-
(seg.byteSize() - 1) + "].");
1051+
return outOfBounds(index);
10021052
}
10031053

10041054
private static IllegalStateException bufferIsClosed() {
@@ -1009,6 +1059,12 @@ private static IllegalStateException bufferIsReadOnly() {
10091059
return new IllegalStateException("This buffer is read-only.");
10101060
}
10111061

1062+
private IndexOutOfBoundsException outOfBounds(int index) {
1063+
return new IndexOutOfBoundsException(
1064+
"Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " +
1065+
(seg.byteSize() - 1) + "].");
1066+
}
1067+
10121068
Object recoverableMemory() {
10131069
return new RecoverableMemory(seg, alloc);
10141070
}

0 commit comments

Comments
 (0)