Skip to content
This repository was archived by the owner on Dec 12, 2022. It is now read-only.

Commit 5f1f0ba

Browse files
authored
Merge pull request #27 from netty/send-deref
Introduce Deref abstraction
2 parents 1a741ba + 492977d commit 5f1f0ba

File tree

13 files changed

+250
-48
lines changed

13 files changed

+250
-48
lines changed

src/main/java/io/netty/buffer/api/Allocator.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ default Buf allocate(int size, ByteOrder order) {
107107
* @return A buffer composed of, and backed by, the given buffers.
108108
* @throws IllegalArgumentException if the given buffers have an inconsistent {@linkplain Buf#order() byte order}.
109109
*/
110-
default Buf compose(Buf... bufs) {
110+
default Buf compose(Deref<Buf>... bufs) {
111111
return new CompositeBuf(this, bufs);
112112
}
113113

@@ -117,8 +117,8 @@ default Buf compose(Buf... bufs) {
117117
* the composite buffer was created.
118118
* The composite buffer is modified in-place.
119119
*
120-
* @see #compose(Buf...)
121-
* @param composite The composite buffer (from a prior {@link #compose(Buf...)} call) to extend with the given
120+
* @see #compose(Deref...)
121+
* @param composite The composite buffer (from a prior {@link #compose(Deref...)} call) to extend with the given
122122
* extension buffer.
123123
* @param extension The buffer to extend the composite buffer with.
124124
*/
@@ -133,9 +133,9 @@ static void extend(Buf composite, Buf extension) {
133133
}
134134

135135
/**
136-
* Check if the given buffer is a {@linkplain #compose(Buf...) composite} buffer or not.
136+
* Check if the given buffer is a {@linkplain #compose(Deref...) composite} buffer or not.
137137
* @param composite The buffer to check.
138-
* @return {@code true} if the given buffer was created with {@link #compose(Buf...)}, {@code false} otherwise.
138+
* @return {@code true} if the given buffer was created with {@link #compose(Deref...)}, {@code false} otherwise.
139139
*/
140140
static boolean isComposite(Buf composite) {
141141
return composite.getClass() == CompositeBuf.class;

src/main/java/io/netty/buffer/api/Buf.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@
7070
* To send a buffer to another thread, the buffer must not have any outstanding borrows.
7171
* That is to say, all {@linkplain #acquire() acquires} must have been paired with a {@link #close()};
7272
* all {@linkplain #slice() slices} must have been closed.
73-
* And if this buffer is a constituent of a {@linkplain Allocator#compose(Buf...) composite buffer},
73+
* And if this buffer is a constituent of a {@linkplain Allocator#compose(Deref...) composite buffer},
7474
* then that composite buffer must be closed.
7575
* And if this buffer is itself a composite buffer, then it must own all of its constituent buffers.
7676
* The {@link #isOwned()} method can be used on any buffer to check if it can be sent or not.
@@ -439,6 +439,8 @@ default void ensureWritable(int size) {
439439
/**
440440
* Split the buffer into two, at the {@linkplain #writerOffset() write offset} position.
441441
* <p>
442+
* The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown.
443+
* <p>
442444
* The region of this buffer that contain the read and readable bytes, will be captured and returned in a new
443445
* buffer, that will hold its own ownership of that region. This allows the returned buffer to be indepentently
444446
* {@linkplain #send() sent} to other threads.

src/main/java/io/netty/buffer/api/BufHolder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,10 @@ public int countBorrows() {
8383
return buf.countBorrows();
8484
}
8585

86+
@SuppressWarnings("unchecked")
8687
@Override
8788
public Send<T> send() {
88-
var send = buf.send();
89-
return () -> receive(send.receive());
89+
return buf.send().map((Class<T>) getClass(), this::receive);
9090
}
9191

9292
/**

src/main/java/io/netty/buffer/api/ComponentProcessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ interface ReadableComponent {
121121
* @return A new {@link ByteBuffer}, with its own position and limit, for this memory component.
122122
*/
123123
ByteBuffer readableBuffer();
124+
// todo for Unsafe-based impl, DBB.attachment needs to keep underlying memory alive
124125
}
125126

126127
/**

src/main/java/io/netty/buffer/api/CompositeBuf.java

Lines changed: 64 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -54,68 +54,102 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
5454
private boolean closed;
5555
private boolean readOnly;
5656

57-
CompositeBuf(Allocator allocator, Buf[] bufs) {
58-
this(allocator, true, filterExternalBufs(bufs), COMPOSITE_DROP);
57+
CompositeBuf(Allocator allocator, Deref<Buf>[] refs) {
58+
this(allocator, true, filterExternalBufs(refs), COMPOSITE_DROP, false);
5959
}
6060

61-
private static Buf[] filterExternalBufs(Buf[] bufs) {
61+
private static Buf[] filterExternalBufs(Deref<Buf>[] refs) {
6262
// We filter out all zero-capacity buffers because they wouldn't contribute to the composite buffer anyway,
6363
// and also, by ensuring that all constituent buffers contribute to the size of the composite buffer,
6464
// we make sure that the number of composite buffers will never become greater than the number of bytes in
6565
// the composite buffer.
6666
// This restriction guarantees that methods like countComponents, forEachReadable and forEachWritable,
6767
// will never overflow their component counts.
6868
// Allocating a new array unconditionally also prevents external modification of the array.
69-
bufs = Arrays.stream(bufs)
70-
.filter(b -> b.capacity() > 0)
69+
Buf[] bufs = Arrays.stream(refs)
70+
.map(r -> r.get()) // Increments reference counts.
71+
.filter(CompositeBuf::discardEmpty)
7172
.flatMap(CompositeBuf::flattenBuffer)
7273
.toArray(Buf[]::new);
7374
// Make sure there are no duplicates among the buffers.
7475
Set<Buf> duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>());
7576
duplicatesCheck.addAll(Arrays.asList(bufs));
7677
if (duplicatesCheck.size() < bufs.length) {
78+
for (Buf buf : bufs) {
79+
buf.close(); // Undo the increment we did with Deref.get().
80+
}
7781
throw new IllegalArgumentException(
7882
"Cannot create composite buffer with duplicate constituent buffer components.");
7983
}
8084
return bufs;
8185
}
8286

87+
private static boolean discardEmpty(Buf buf) {
88+
if (buf.capacity() > 0) {
89+
return true;
90+
} else {
91+
// If we filter a buffer out, then we must make sure to close it since we incremented the reference count
92+
// with Deref.get() earlier.
93+
buf.close();
94+
return false;
95+
}
96+
}
97+
8398
private static Stream<Buf> flattenBuffer(Buf buf) {
8499
if (buf instanceof CompositeBuf) {
85-
return Stream.of(((CompositeBuf) buf).bufs);
100+
// Extract components and move our reference count from the composite onto the components.
101+
var composite = (CompositeBuf) buf;
102+
var bufs = composite.bufs;
103+
for (Buf b : bufs) {
104+
b.acquire();
105+
}
106+
buf.close(); // Important: acquire on components *before* closing composite.
107+
return Stream.of(bufs);
86108
}
87109
return Stream.of(buf);
88110
}
89111

90-
private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop) {
112+
private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop,
113+
boolean acquireBufs) {
91114
super(drop);
92115
this.allocator = allocator;
93116
this.isSendable = isSendable;
94-
for (Buf buf : bufs) {
95-
buf.acquire();
96-
}
97-
if (bufs.length > 0) {
98-
ByteOrder targetOrder = bufs[0].order();
117+
if (acquireBufs) {
99118
for (Buf buf : bufs) {
100-
if (buf.order() != targetOrder) {
101-
throw new IllegalArgumentException("Constituent buffers have inconsistent byte order.");
102-
}
119+
buf.acquire();
103120
}
104-
order = bufs[0].order();
121+
}
122+
try {
123+
if (bufs.length > 0) {
124+
ByteOrder targetOrder = bufs[0].order();
125+
for (Buf buf : bufs) {
126+
if (buf.order() != targetOrder) {
127+
throw new IllegalArgumentException("Constituent buffers have inconsistent byte order.");
128+
}
129+
}
130+
order = bufs[0].order();
105131

106-
boolean targetReadOnly = bufs[0].readOnly();
107-
for (Buf buf : bufs) {
108-
if (buf.readOnly() != targetReadOnly) {
109-
throw new IllegalArgumentException("Constituent buffers have inconsistent read-only state.");
132+
boolean targetReadOnly = bufs[0].readOnly();
133+
for (Buf buf : bufs) {
134+
if (buf.readOnly() != targetReadOnly) {
135+
throw new IllegalArgumentException("Constituent buffers have inconsistent read-only state.");
136+
}
110137
}
138+
readOnly = targetReadOnly;
139+
} else {
140+
order = ByteOrder.nativeOrder();
111141
}
112-
readOnly = targetReadOnly;
113-
} else {
114-
order = ByteOrder.nativeOrder();
142+
this.bufs = bufs;
143+
computeBufferOffsets();
144+
tornBufAccessors = new TornBufAccessors(this);
145+
} catch (Exception e) {
146+
// Always close bufs on exception, regardless of acquireBufs value.
147+
// If acquireBufs is false, it just means the ref count increments happened prior to this constructor call.
148+
for (Buf buf : bufs) {
149+
buf.close();
150+
}
151+
throw e;
115152
}
116-
this.bufs = bufs;
117-
computeBufferOffsets();
118-
tornBufAccessors = new TornBufAccessors(this);
119153
}
120154

121155
private void computeBufferOffsets() {
@@ -292,7 +326,7 @@ public Buf slice(int offset, int length) {
292326
slices = new Buf[] { choice.slice(subOffset, 0) };
293327
}
294328

295-
return new CompositeBuf(allocator, false, slices, drop);
329+
return new CompositeBuf(allocator, false, slices, drop, true);
296330
} catch (Throwable throwable) {
297331
// We called acquire prior to the try-clause. We need to undo that if we're not creating a composite buffer:
298332
close();
@@ -718,7 +752,7 @@ public Buf bifurcate() {
718752
}
719753
if (bufs.length == 0) {
720754
// Bifurcating a zero-length buffer is trivial.
721-
return new CompositeBuf(allocator, true, bufs, unsafeGetDrop()).order(order);
755+
return new CompositeBuf(allocator, true, bufs, unsafeGetDrop(), true).order(order);
722756
}
723757

724758
int i = searchOffsets(woff);
@@ -730,7 +764,7 @@ public Buf bifurcate() {
730764
}
731765
computeBufferOffsets();
732766
try {
733-
var compositeBuf = new CompositeBuf(allocator, true, bifs, unsafeGetDrop());
767+
var compositeBuf = new CompositeBuf(allocator, true, bifs, unsafeGetDrop(), true);
734768
compositeBuf.order = order; // Preserve byte order even if bifs array is empty.
735769
return compositeBuf;
736770
} finally {
@@ -1133,7 +1167,7 @@ public CompositeBuf transferOwnership(Drop<CompositeBuf> drop) {
11331167
for (int i = 0; i < sends.length; i++) {
11341168
received[i] = sends[i].receive();
11351169
}
1136-
var composite = new CompositeBuf(allocator, true, received, drop);
1170+
var composite = new CompositeBuf(allocator, true, received, drop, true);
11371171
composite.readOnly = readOnly;
11381172
drop.attach(composite);
11391173
return composite;
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2020 The Netty Project
3+
*
4+
* The Netty Project licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.netty.buffer.api;
17+
18+
import java.util.function.Supplier;
19+
20+
/**
21+
* A Deref provides the capability to acquire a reference to a {@linkplain Rc reference counted} object.
22+
* <p>
23+
* <strong>Note:</strong> Callers must ensure that they close any references they obtain.
24+
* <p>
25+
* Deref itself does not specify if a reference can be obtained more than once.
26+
* For instance, any {@link Send} object is also a {@code Deref}, but the reference can only be acquired once.
27+
* Meanwhile, {@link Rc} objects are themselves their own {@code Derefs}, and permit references to be acquired multiple
28+
* times.
29+
*
30+
* @param <T> The concrete type of reference counted object that can be obtained.
31+
*/
32+
public interface Deref<T extends Rc<T>> extends Supplier<T> {
33+
/**
34+
* Acquire a reference to the reference counted object.
35+
* <p>
36+
* <strong>Note:</strong> This call increments the reference count of the acquired object, and must be paired with
37+
* a {@link Rc#close()} call.
38+
* Using a try-with-resources clause is the easiest way to ensure this.
39+
*
40+
* @return A reference to the reference counted object.
41+
*/
42+
@Override
43+
T get();
44+
45+
/**
46+
* Determine if the object in this {@code Deref} is an instance of the given class.
47+
*
48+
* @param cls The type to check.
49+
* @return {@code true} if the object in this {@code Deref} can be assigned fields or variables of the given type.
50+
*/
51+
boolean isInstanceOf(Class<?> cls);
52+
}

src/main/java/io/netty/buffer/api/Rc.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
*
2727
* @param <I> The concrete subtype.
2828
*/
29-
public interface Rc<I extends Rc<I>> extends AutoCloseable {
29+
public interface Rc<I extends Rc<I>> extends AutoCloseable, Deref<I> {
3030
/**
3131
* Increment the reference count.
3232
* <p>
@@ -36,6 +36,16 @@ public interface Rc<I extends Rc<I>> extends AutoCloseable {
3636
*/
3737
I acquire();
3838

39+
@Override
40+
default I get() {
41+
return acquire();
42+
}
43+
44+
@Override
45+
default boolean isInstanceOf(Class<?> cls) {
46+
return cls.isInstance(this);
47+
}
48+
3949
/**
4050
* Decrement the reference count, and despose of the resource if the last reference is closed.
4151
* <p>

src/main/java/io/netty/buffer/api/RcSupport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public final Send<I> send() {
7979
}
8080
var owned = prepareSend();
8181
acquires = -2; // Close without dropping. This also ignore future double-free attempts.
82-
return new TransferSend<I, T>(owned, drop);
82+
return new TransferSend<I, T>(owned, drop, getClass());
8383
}
8484

8585
/**

0 commit comments

Comments
 (0)