Skip to content
This repository was archived by the owner on Dec 12, 2022. It is now read-only.

Commit 492977d

Browse files
committed
Introduce Deref abstraction
Motivation: Sometimes, we wish to operate on both buffers and anything that can produce a buffer. For instance, when making a composite buffer, we could compose either buffers or sends. Modification: Introduce a Deref interface, which is extended by both Rc and Send. A Deref can be used to acquire an Rc instance, and in doing so will also acquire a reference to the Rc. That is, dereferencing increases the reference count. For Rc itself, this just delegates to Rc.acquire, while for Send it delegates to Send.receive, and can only be called once. The Allocator.compose method has been changed to take Derefs. This allows us to compose either Bufs or Sends of bufs. Or a mix. Extra care and caution has been added to the code, to make sure the reference counts are managed correctly when composing buffers, now that it's a more complicated operation. A handful of convenience methods for working with Sends have also been added to the Send interface. Result: We can now build a composite buffer out of sends of buffers.
1 parent 1a741ba commit 492977d

File tree

13 files changed

+250
-48
lines changed

13 files changed

+250
-48
lines changed

src/main/java/io/netty/buffer/api/Allocator.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ default Buf allocate(int size, ByteOrder order) {
107107
* @return A buffer composed of, and backed by, the given buffers.
108108
* @throws IllegalArgumentException if the given buffers have an inconsistent {@linkplain Buf#order() byte order}.
109109
*/
110-
default Buf compose(Buf... bufs) {
110+
default Buf compose(Deref<Buf>... bufs) {
111111
return new CompositeBuf(this, bufs);
112112
}
113113

@@ -117,8 +117,8 @@ default Buf compose(Buf... bufs) {
117117
* the composite buffer was created.
118118
* The composite buffer is modified in-place.
119119
*
120-
* @see #compose(Buf...)
121-
* @param composite The composite buffer (from a prior {@link #compose(Buf...)} call) to extend with the given
120+
* @see #compose(Deref...)
121+
* @param composite The composite buffer (from a prior {@link #compose(Deref...)} call) to extend with the given
122122
* extension buffer.
123123
* @param extension The buffer to extend the composite buffer with.
124124
*/
@@ -133,9 +133,9 @@ static void extend(Buf composite, Buf extension) {
133133
}
134134

135135
/**
136-
* Check if the given buffer is a {@linkplain #compose(Buf...) composite} buffer or not.
136+
* Check if the given buffer is a {@linkplain #compose(Deref...) composite} buffer or not.
137137
* @param composite The buffer to check.
138-
* @return {@code true} if the given buffer was created with {@link #compose(Buf...)}, {@code false} otherwise.
138+
* @return {@code true} if the given buffer was created with {@link #compose(Deref...)}, {@code false} otherwise.
139139
*/
140140
static boolean isComposite(Buf composite) {
141141
return composite.getClass() == CompositeBuf.class;

src/main/java/io/netty/buffer/api/Buf.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@
7070
* To send a buffer to another thread, the buffer must not have any outstanding borrows.
7171
* That is to say, all {@linkplain #acquire() acquires} must have been paired with a {@link #close()};
7272
* all {@linkplain #slice() slices} must have been closed.
73-
* And if this buffer is a constituent of a {@linkplain Allocator#compose(Buf...) composite buffer},
73+
* And if this buffer is a constituent of a {@linkplain Allocator#compose(Deref...) composite buffer},
7474
* then that composite buffer must be closed.
7575
* And if this buffer is itself a composite buffer, then it must own all of its constituent buffers.
7676
* The {@link #isOwned()} method can be used on any buffer to check if it can be sent or not.
@@ -439,6 +439,8 @@ default void ensureWritable(int size) {
439439
/**
440440
* Split the buffer into two, at the {@linkplain #writerOffset() write offset} position.
441441
* <p>
442+
* The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown.
443+
* <p>
442444
* The region of this buffer that contain the read and readable bytes, will be captured and returned in a new
443445
* buffer, that will hold its own ownership of that region. This allows the returned buffer to be indepentently
444446
* {@linkplain #send() sent} to other threads.

src/main/java/io/netty/buffer/api/BufHolder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,10 @@ public int countBorrows() {
8383
return buf.countBorrows();
8484
}
8585

86+
@SuppressWarnings("unchecked")
8687
@Override
8788
public Send<T> send() {
88-
var send = buf.send();
89-
return () -> receive(send.receive());
89+
return buf.send().map((Class<T>) getClass(), this::receive);
9090
}
9191

9292
/**

src/main/java/io/netty/buffer/api/ComponentProcessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ interface ReadableComponent {
121121
* @return A new {@link ByteBuffer}, with its own position and limit, for this memory component.
122122
*/
123123
ByteBuffer readableBuffer();
124+
// todo for Unsafe-based impl, DBB.attachment needs to keep underlying memory alive
124125
}
125126

126127
/**

src/main/java/io/netty/buffer/api/CompositeBuf.java

Lines changed: 64 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -54,68 +54,102 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
5454
private boolean closed;
5555
private boolean readOnly;
5656

57-
CompositeBuf(Allocator allocator, Buf[] bufs) {
58-
this(allocator, true, filterExternalBufs(bufs), COMPOSITE_DROP);
57+
CompositeBuf(Allocator allocator, Deref<Buf>[] refs) {
58+
this(allocator, true, filterExternalBufs(refs), COMPOSITE_DROP, false);
5959
}
6060

61-
private static Buf[] filterExternalBufs(Buf[] bufs) {
61+
private static Buf[] filterExternalBufs(Deref<Buf>[] refs) {
6262
// We filter out all zero-capacity buffers because they wouldn't contribute to the composite buffer anyway,
6363
// and also, by ensuring that all constituent buffers contribute to the size of the composite buffer,
6464
// we make sure that the number of composite buffers will never become greater than the number of bytes in
6565
// the composite buffer.
6666
// This restriction guarantees that methods like countComponents, forEachReadable and forEachWritable,
6767
// will never overflow their component counts.
6868
// Allocating a new array unconditionally also prevents external modification of the array.
69-
bufs = Arrays.stream(bufs)
70-
.filter(b -> b.capacity() > 0)
69+
Buf[] bufs = Arrays.stream(refs)
70+
.map(r -> r.get()) // Increments reference counts.
71+
.filter(CompositeBuf::discardEmpty)
7172
.flatMap(CompositeBuf::flattenBuffer)
7273
.toArray(Buf[]::new);
7374
// Make sure there are no duplicates among the buffers.
7475
Set<Buf> duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>());
7576
duplicatesCheck.addAll(Arrays.asList(bufs));
7677
if (duplicatesCheck.size() < bufs.length) {
78+
for (Buf buf : bufs) {
79+
buf.close(); // Undo the increment we did with Deref.get().
80+
}
7781
throw new IllegalArgumentException(
7882
"Cannot create composite buffer with duplicate constituent buffer components.");
7983
}
8084
return bufs;
8185
}
8286

87+
private static boolean discardEmpty(Buf buf) {
88+
if (buf.capacity() > 0) {
89+
return true;
90+
} else {
91+
// If we filter a buffer out, then we must make sure to close it since we incremented the reference count
92+
// with Deref.get() earlier.
93+
buf.close();
94+
return false;
95+
}
96+
}
97+
8398
private static Stream<Buf> flattenBuffer(Buf buf) {
8499
if (buf instanceof CompositeBuf) {
85-
return Stream.of(((CompositeBuf) buf).bufs);
100+
// Extract components and move our reference count from the composite onto the components.
101+
var composite = (CompositeBuf) buf;
102+
var bufs = composite.bufs;
103+
for (Buf b : bufs) {
104+
b.acquire();
105+
}
106+
buf.close(); // Important: acquire on components *before* closing composite.
107+
return Stream.of(bufs);
86108
}
87109
return Stream.of(buf);
88110
}
89111

90-
private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop) {
112+
private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop,
113+
boolean acquireBufs) {
91114
super(drop);
92115
this.allocator = allocator;
93116
this.isSendable = isSendable;
94-
for (Buf buf : bufs) {
95-
buf.acquire();
96-
}
97-
if (bufs.length > 0) {
98-
ByteOrder targetOrder = bufs[0].order();
117+
if (acquireBufs) {
99118
for (Buf buf : bufs) {
100-
if (buf.order() != targetOrder) {
101-
throw new IllegalArgumentException("Constituent buffers have inconsistent byte order.");
102-
}
119+
buf.acquire();
103120
}
104-
order = bufs[0].order();
121+
}
122+
try {
123+
if (bufs.length > 0) {
124+
ByteOrder targetOrder = bufs[0].order();
125+
for (Buf buf : bufs) {
126+
if (buf.order() != targetOrder) {
127+
throw new IllegalArgumentException("Constituent buffers have inconsistent byte order.");
128+
}
129+
}
130+
order = bufs[0].order();
105131

106-
boolean targetReadOnly = bufs[0].readOnly();
107-
for (Buf buf : bufs) {
108-
if (buf.readOnly() != targetReadOnly) {
109-
throw new IllegalArgumentException("Constituent buffers have inconsistent read-only state.");
132+
boolean targetReadOnly = bufs[0].readOnly();
133+
for (Buf buf : bufs) {
134+
if (buf.readOnly() != targetReadOnly) {
135+
throw new IllegalArgumentException("Constituent buffers have inconsistent read-only state.");
136+
}
110137
}
138+
readOnly = targetReadOnly;
139+
} else {
140+
order = ByteOrder.nativeOrder();
111141
}
112-
readOnly = targetReadOnly;
113-
} else {
114-
order = ByteOrder.nativeOrder();
142+
this.bufs = bufs;
143+
computeBufferOffsets();
144+
tornBufAccessors = new TornBufAccessors(this);
145+
} catch (Exception e) {
146+
// Always close bufs on exception, regardless of acquireBufs value.
147+
// If acquireBufs is false, it just means the ref count increments happened prior to this constructor call.
148+
for (Buf buf : bufs) {
149+
buf.close();
150+
}
151+
throw e;
115152
}
116-
this.bufs = bufs;
117-
computeBufferOffsets();
118-
tornBufAccessors = new TornBufAccessors(this);
119153
}
120154

121155
private void computeBufferOffsets() {
@@ -292,7 +326,7 @@ public Buf slice(int offset, int length) {
292326
slices = new Buf[] { choice.slice(subOffset, 0) };
293327
}
294328

295-
return new CompositeBuf(allocator, false, slices, drop);
329+
return new CompositeBuf(allocator, false, slices, drop, true);
296330
} catch (Throwable throwable) {
297331
// We called acquire prior to the try-clause. We need to undo that if we're not creating a composite buffer:
298332
close();
@@ -718,7 +752,7 @@ public Buf bifurcate() {
718752
}
719753
if (bufs.length == 0) {
720754
// Bifurcating a zero-length buffer is trivial.
721-
return new CompositeBuf(allocator, true, bufs, unsafeGetDrop()).order(order);
755+
return new CompositeBuf(allocator, true, bufs, unsafeGetDrop(), true).order(order);
722756
}
723757

724758
int i = searchOffsets(woff);
@@ -730,7 +764,7 @@ public Buf bifurcate() {
730764
}
731765
computeBufferOffsets();
732766
try {
733-
var compositeBuf = new CompositeBuf(allocator, true, bifs, unsafeGetDrop());
767+
var compositeBuf = new CompositeBuf(allocator, true, bifs, unsafeGetDrop(), true);
734768
compositeBuf.order = order; // Preserve byte order even if bifs array is empty.
735769
return compositeBuf;
736770
} finally {
@@ -1133,7 +1167,7 @@ public CompositeBuf transferOwnership(Drop<CompositeBuf> drop) {
11331167
for (int i = 0; i < sends.length; i++) {
11341168
received[i] = sends[i].receive();
11351169
}
1136-
var composite = new CompositeBuf(allocator, true, received, drop);
1170+
var composite = new CompositeBuf(allocator, true, received, drop, true);
11371171
composite.readOnly = readOnly;
11381172
drop.attach(composite);
11391173
return composite;
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2020 The Netty Project
3+
*
4+
* The Netty Project licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.netty.buffer.api;
17+
18+
import java.util.function.Supplier;
19+
20+
/**
21+
* A Deref provides the capability to acquire a reference to a {@linkplain Rc reference counted} object.
22+
* <p>
23+
* <strong>Note:</strong> Callers must ensure that they close any references they obtain.
24+
* <p>
25+
* Deref itself does not specify if a reference can be obtained more than once.
26+
* For instance, any {@link Send} object is also a {@code Deref}, but the reference can only be acquired once.
27+
* Meanwhile, {@link Rc} objects are themselves their own {@code Derefs}, and permit references to be acquired multiple
28+
* times.
29+
*
30+
* @param <T> The concrete type of reference counted object that can be obtained.
31+
*/
32+
public interface Deref<T extends Rc<T>> extends Supplier<T> {
33+
/**
34+
* Acquire a reference to the reference counted object.
35+
* <p>
36+
* <strong>Note:</strong> This call increments the reference count of the acquired object, and must be paired with
37+
* a {@link Rc#close()} call.
38+
* Using a try-with-resources clause is the easiest way to ensure this.
39+
*
40+
* @return A reference to the reference counted object.
41+
*/
42+
@Override
43+
T get();
44+
45+
/**
46+
* Determine if the object in this {@code Deref} is an instance of the given class.
47+
*
48+
* @param cls The type to check.
49+
* @return {@code true} if the object in this {@code Deref} can be assigned fields or variables of the given type.
50+
*/
51+
boolean isInstanceOf(Class<?> cls);
52+
}

src/main/java/io/netty/buffer/api/Rc.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
*
2727
* @param <I> The concrete subtype.
2828
*/
29-
public interface Rc<I extends Rc<I>> extends AutoCloseable {
29+
public interface Rc<I extends Rc<I>> extends AutoCloseable, Deref<I> {
3030
/**
3131
* Increment the reference count.
3232
* <p>
@@ -36,6 +36,16 @@ public interface Rc<I extends Rc<I>> extends AutoCloseable {
3636
*/
3737
I acquire();
3838

39+
@Override
40+
default I get() {
41+
return acquire();
42+
}
43+
44+
@Override
45+
default boolean isInstanceOf(Class<?> cls) {
46+
return cls.isInstance(this);
47+
}
48+
3949
/**
4050
* Decrement the reference count, and despose of the resource if the last reference is closed.
4151
* <p>

src/main/java/io/netty/buffer/api/RcSupport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public final Send<I> send() {
7979
}
8080
var owned = prepareSend();
8181
acquires = -2; // Close without dropping. This also ignore future double-free attempts.
82-
return new TransferSend<I, T>(owned, drop);
82+
return new TransferSend<I, T>(owned, drop, getClass());
8383
}
8484

8585
/**

0 commit comments

Comments
 (0)