Skip to content
This repository was archived by the owner on Dec 12, 2022. It is now read-only.

Commit 253b6cb

Browse files
committed
Allow slices to obtain ownership when parent is closed
Motivation: It is kind of a weird internal and hidden state, that slices were special. For instance, slices could not be sent, and they could never obtain ownership. This means buffers from slices behaved differently from allocated buffers. In doing so, they violated both the principle that magic should stay hidden, and the principle of consistent behaviour. Modification: - The special reference-counting drop implementation that was added to support bifurcation, has been renamed to ArcDrop (for atomic reference counting). - The ArcDrop is then used throughout the MemSegBuffer implementation to account for every instance where multiple buffers reference the same memory, e.g. slices and the like. - Borrows of a buffer is then the sum of borrows from the buffer itself, and its ArcDrop. - Ownership is thus tied to both the buffer itself being owned, and the ArcDrop being in an owned state. - SizeClassedMemoryPool is changed to pool recoverable memory instead of sends, because the sends could come from slices. - We also take care to keep around a "base" memory segment, so that we don't return memory segment slices to the memory pool (doing so would leak the memory from the parent segment that is not part of the slice). - CleanerPooledDrop now keeps a weak reference to itself, rather than the buffer, which is more correct anyway, but now also required because we cannot rely on the buffer reference the cleaner was created with. - The CleanerPooledDrop now takes care to drop the buffer that is actually passed to it, rather than what it was referencing from some earlier point. - MemoryManager can now disclose the size of recoverable memory, so that SizeClassedMemoryPool can pick the correct size pool to return memory to. It cannot rely on the passed down buffer instance for this, because that buffer might have been a slice. Result: It is now possible for slices to obtain ownership when their parent buffer is closed.
1 parent 374b052 commit 253b6cb

File tree

8 files changed

+202
-93
lines changed

8 files changed

+202
-93
lines changed

src/main/java/io/netty/buffer/api/CleanerPooledDrop.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public void drop(Buffer buf) {
4545
GatedCleanable c = (GatedCleanable) CLEANABLE.getAndSet(this, null);
4646
if (c != null) {
4747
c.clean();
48+
delegate.drop(buf);
4849
}
4950
}
5051

@@ -57,24 +58,38 @@ public void attach(Buffer buf) {
5758
c.clean();
5859
}
5960

60-
var pool = this.pool;
6161
var mem = manager.unwrapRecoverableMemory(buf);
62-
var delegate = this.delegate;
63-
WeakReference<Buffer> ref = new WeakReference<>(buf);
62+
WeakReference<CleanerPooledDrop> ref = new WeakReference<>(this);
6463
AtomicBoolean gate = new AtomicBoolean(true);
65-
cleanable = new GatedCleanable(gate, CLEANER.register(this, () -> {
64+
cleanable = new GatedCleanable(gate, CLEANER.register(this, new CleanAction(pool, mem, ref, gate)));
65+
}
66+
67+
private static final class CleanAction implements Runnable {
68+
private final SizeClassedMemoryPool pool;
69+
private final Object mem;
70+
private final WeakReference<CleanerPooledDrop> ref;
71+
private final AtomicBoolean gate;
72+
73+
private CleanAction(SizeClassedMemoryPool pool, Object mem, WeakReference<CleanerPooledDrop> ref,
74+
AtomicBoolean gate) {
75+
this.pool = pool;
76+
this.mem = mem;
77+
this.ref = ref;
78+
this.gate = gate;
79+
}
80+
81+
@Override
82+
public void run() {
6683
if (gate.getAndSet(false)) {
67-
Buffer b = ref.get();
68-
if (b == null) {
84+
var monitored = ref.get();
85+
if (monitored == null) {
6986
pool.recoverMemory(mem);
70-
} else {
71-
delegate.drop(b);
7287
}
7388
}
74-
}));
89+
}
7590
}
7691

77-
private static class GatedCleanable implements Cleanable {
92+
private static final class GatedCleanable implements Cleanable {
7893
private final AtomicBoolean gate;
7994
private final Cleanable cleanable;
8095

src/main/java/io/netty/buffer/api/MemoryManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,6 @@ static MemoryManager getNativeMemoryManager() {
3434
Buffer allocateShared(AllocatorControl allo, long size, Drop<Buffer> drop, Cleaner cleaner);
3535
Drop<Buffer> drop();
3636
Object unwrapRecoverableMemory(Buffer buf);
37+
int capacityOfRecoverableMemory(Object memory);
3738
Buffer recoverMemory(Object recoverableMemory, Drop<Buffer> drop);
3839
}

src/main/java/io/netty/buffer/api/RcSupport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ protected RcSupport(Drop<T> drop) {
3737
@Override
3838
public final I acquire() {
3939
if (acquires < 0) {
40-
throw attachTrace(new IllegalStateException("Resource is closed."));
40+
throw attachTrace(new IllegalStateException("This resource is closed: " + this + '.'));
4141
}
4242
if (acquires == Integer.MAX_VALUE) {
4343
throw new IllegalStateException("Cannot acquire more references; counter would overflow.");

src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop<B
2828
private static final VarHandle CLOSE = Statics.findVarHandle(
2929
lookup(), SizeClassedMemoryPool.class, "closed", boolean.class);
3030
private final MemoryManager manager;
31-
private final ConcurrentHashMap<Integer, ConcurrentLinkedQueue<Send<Buffer>>> pool;
31+
private final ConcurrentHashMap<Integer, ConcurrentLinkedQueue<Object>> pool;
3232
@SuppressWarnings("unused")
3333
private volatile boolean closed;
3434

@@ -41,9 +41,9 @@ protected SizeClassedMemoryPool(MemoryManager manager) {
4141
public Buffer allocate(int size) {
4242
BufferAllocator.checkSize(size);
4343
var sizeClassPool = getSizeClassPool(size);
44-
Send<Buffer> send = sizeClassPool.poll();
45-
if (send != null) {
46-
return send.receive()
44+
Object memory = sizeClassPool.poll();
45+
if (memory != null) {
46+
return recoverMemoryIntoBuffer(memory)
4747
.reset()
4848
.readOnly(false)
4949
.fill((byte) 0)
@@ -71,10 +71,10 @@ public void close() {
7171
if (CLOSE.compareAndSet(this, false, true)) {
7272
var capturedExceptions = new ArrayList<Exception>(4);
7373
pool.forEach((k, v) -> {
74-
Send<Buffer> send;
75-
while ((send = v.poll()) != null) {
74+
Object memory;
75+
while ((memory = v.poll()) != null) {
7676
try {
77-
send.receive().close();
77+
dispose(recoverMemoryIntoBuffer(memory));
7878
} catch (Exception e) {
7979
capturedExceptions.add(e);
8080
}
@@ -94,40 +94,42 @@ public void drop(Buffer buf) {
9494
dispose(buf);
9595
return;
9696
}
97-
var sizeClassPool = getSizeClassPool(buf.capacity());
98-
sizeClassPool.offer(buf.send());
97+
Object mem = manager.unwrapRecoverableMemory(buf);
98+
var sizeClassPool = getSizeClassPool(manager.capacityOfRecoverableMemory(mem));
99+
sizeClassPool.offer(mem);
99100
if (closed) {
100-
Send<Buffer> send;
101-
while ((send = sizeClassPool.poll()) != null) {
102-
send.receive().close();
101+
Object memory;
102+
while ((memory = sizeClassPool.poll()) != null) {
103+
dispose(recoverMemoryIntoBuffer(memory));
103104
}
104105
}
105106
}
106107

107108
@Override
108109
public Object allocateUntethered(Buffer originator, int size) {
109110
var sizeClassPool = getSizeClassPool(size);
110-
Send<Buffer> send = sizeClassPool.poll();
111-
Buffer untetheredBuf;
112-
if (send != null) {
113-
var transfer = (TransferSend<Buffer, Buffer>) send;
114-
var owned = transfer.unsafeUnwrapOwned();
115-
untetheredBuf = owned.transferOwnership(NO_OP_DROP);
116-
} else {
117-
untetheredBuf = createBuf(size, NO_OP_DROP);
111+
Object memory = sizeClassPool.poll();
112+
if (memory == null) {
113+
Buffer untetheredBuf = createBuf(size, NO_OP_DROP);
114+
memory = manager.unwrapRecoverableMemory(untetheredBuf);
118115
}
119-
return manager.unwrapRecoverableMemory(untetheredBuf);
116+
return memory;
120117
}
121118

122119
@Override
123120
public void recoverMemory(Object memory) {
121+
Buffer buf = recoverMemoryIntoBuffer(memory);
122+
buf.close();
123+
}
124+
125+
private Buffer recoverMemoryIntoBuffer(Object memory) {
124126
var drop = getDrop();
125127
var buf = manager.recoverMemory(memory, drop);
126128
drop.attach(buf);
127-
buf.close();
129+
return buf;
128130
}
129131

130-
private ConcurrentLinkedQueue<Send<Buffer>> getSizeClassPool(int size) {
132+
private ConcurrentLinkedQueue<Object> getSizeClassPool(int size) {
131133
return pool.computeIfAbsent(size, k -> new ConcurrentLinkedQueue<>());
132134
}
133135

src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public Buffer allocateConfined(AllocatorControl alloc, long size, Drop<Buffer> d
3434
if (cleaner != null) {
3535
segment = segment.registerCleaner(cleaner);
3636
}
37-
return new MemSegBuffer(segment, convert(drop), alloc);
37+
return new MemSegBuffer(segment, segment, convert(drop), alloc);
3838
}
3939

4040
@Override
@@ -43,7 +43,7 @@ public Buffer allocateShared(AllocatorControl alloc, long size, Drop<Buffer> dro
4343
if (cleaner != null) {
4444
segment = segment.registerCleaner(cleaner);
4545
}
46-
return new MemSegBuffer(segment, convert(drop), alloc);
46+
return new MemSegBuffer(segment, segment, convert(drop), alloc);
4747
}
4848

4949
protected abstract MemorySegment createSegment(long size);
@@ -59,6 +59,11 @@ public Object unwrapRecoverableMemory(Buffer buf) {
5959
return b.recoverableMemory();
6060
}
6161

62+
@Override
63+
public int capacityOfRecoverableMemory(Object memory) {
64+
return ((RecoverableMemory) memory).capacity();
65+
}
66+
6267
@Override
6368
public Buffer recoverMemory(Object recoverableMemory, Drop<Buffer> drop) {
6469
var recovery = (RecoverableMemory) recoverableMemory;

src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java renamed to src/main/java/io/netty/buffer/api/memseg/ArcDrop.java

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,33 +20,47 @@
2020
import java.lang.invoke.MethodHandles;
2121
import java.lang.invoke.VarHandle;
2222

23-
class BifurcatedDrop implements Drop<MemSegBuffer> {
23+
final class ArcDrop implements Drop<MemSegBuffer> {
2424
private static final VarHandle COUNT;
2525
static {
2626
try {
27-
COUNT = MethodHandles.lookup().findVarHandle(BifurcatedDrop.class, "count", int.class);
27+
COUNT = MethodHandles.lookup().findVarHandle(ArcDrop.class, "count", int.class);
2828
} catch (Exception e) {
2929
throw new ExceptionInInitializerError(e);
3030
}
3131
}
3232

33-
private final MemSegBuffer originalBuf;
3433
private final Drop<MemSegBuffer> delegate;
3534
@SuppressWarnings("FieldMayBeFinal")
3635
private volatile int count;
3736

38-
BifurcatedDrop(MemSegBuffer originalBuf, Drop<MemSegBuffer> delegate) {
39-
this.originalBuf = originalBuf;
37+
ArcDrop(Drop<MemSegBuffer> delegate) {
4038
this.delegate = delegate;
41-
count = 2; // These are created by buffer bifurcation, so we initially have 2 references to this drop.
39+
count = 1;
4240
}
4341

44-
void increment() {
42+
static Drop<MemSegBuffer> wrap(Drop<MemSegBuffer> drop) {
43+
if (drop.getClass() == ArcDrop.class) {
44+
return drop;
45+
}
46+
return new ArcDrop(drop);
47+
}
48+
49+
static Drop<MemSegBuffer> acquire(Drop<MemSegBuffer> drop) {
50+
if (drop.getClass() == ArcDrop.class) {
51+
((ArcDrop) drop).increment();
52+
return drop;
53+
}
54+
return new ArcDrop(drop);
55+
}
56+
57+
ArcDrop increment() {
4558
int c;
4659
do {
4760
c = count;
4861
checkValidState(c);
4962
} while (!COUNT.compareAndSet(this, c, c + 1));
63+
return this;
5064
}
5165

5266
@Override
@@ -59,17 +73,23 @@ public void drop(MemSegBuffer buf) {
5973
checkValidState(c);
6074
} while (!COUNT.compareAndSet(this, c, n));
6175
if (n == 0) {
62-
delegate.attach(originalBuf);
63-
delegate.drop(originalBuf);
76+
delegate.drop(buf);
6477
}
65-
buf.makeInaccessible();
6678
}
6779

6880
@Override
6981
public void attach(MemSegBuffer obj) {
7082
delegate.attach(obj);
7183
}
7284

85+
boolean isOwned() {
86+
return count <= 1;
87+
}
88+
89+
int countBorrows() {
90+
return count - 1;
91+
}
92+
7393
Drop<MemSegBuffer> unwrap() {
7494
return delegate;
7595
}

0 commit comments

Comments
 (0)