Skip to content
This repository was archived by the owner on Dec 12, 2022. It is now read-only.

Commit 41b3c02

Browse files
committed
Remove thread-confinement of Buffers
Motivation: Thread-confinement ends up being too confusing to code for, and also prevents some legitimate use cases. Additionally, thread-confinement exposed implementation specific behavioural differences of buffers, where we would ideally like all buffers to always behave the same, regardless of implementation. Modification: All MemorySegment based buffers now always use shared segments. For heap-based segments, we avoid the overhead associated with the closing of shared segments, by just not closing them, and instead just leave the whole thing for the GC to deal with. Result: Buffers can now always be accessed from multiple different threads at the same time.
1 parent 9afad3a commit 41b3c02

File tree

4 files changed

+52
-6
lines changed

4 files changed

+52
-6
lines changed

src/main/java/io/netty/buffer/api/ManagedAllocator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,13 @@ class ManagedAllocator implements Allocator, AllocatorControl {
3131
@Override
3232
public Buf allocate(int size) {
3333
Allocator.checkSize(size);
34-
return manager.allocateConfined(this, size, manager.drop(), cleaner);
34+
return manager.allocateShared(this, size, manager.drop(), cleaner);
3535
}
3636

3737
@Override
3838
public Object allocateUntethered(Buf originator, int size) {
3939
Allocator.checkSize(size);
40-
var buf = manager.allocateConfined(this, size, NO_OP_DROP, null);
40+
var buf = manager.allocateShared(this, size, NO_OP_DROP, null);
4141
return manager.unwrapRecoverableMemory(buf);
4242
}
4343

src/main/java/io/netty/buffer/api/memseg/HeapMemorySegmentManager.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package io.netty.buffer.api.memseg;
1717

18+
import io.netty.buffer.api.Buf;
19+
import io.netty.buffer.api.Drop;
1820
import jdk.incubator.foreign.MemorySegment;
1921

2022
public class HeapMemorySegmentManager extends AbstractMemorySegmentManager {
@@ -27,4 +29,15 @@ public boolean isNative() {
2729
protected MemorySegment createSegment(long size) {
2830
return MemorySegment.ofArray(new byte[Math.toIntExact(size)]);
2931
}
32+
33+
@Override
34+
public Drop<Buf> drop() {
35+
return convert(buf -> buf.makeInaccessible());
36+
}
37+
38+
@SuppressWarnings({ "unchecked", "UnnecessaryLocalVariable" })
39+
private static Drop<Buf> convert(Drop<MemSegBuf> drop) {
40+
Drop<?> tmp = drop;
41+
return (Drop<Buf>) tmp;
42+
}
3043
}

src/test/java/io/netty/buffer/api/BufTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Arrays;
3434
import java.util.List;
3535
import java.util.concurrent.ArrayBlockingQueue;
36+
import java.util.concurrent.ExecutionException;
3637
import java.util.concurrent.ExecutorService;
3738
import java.util.concurrent.Executors;
3839
import java.util.concurrent.Future;
@@ -465,6 +466,22 @@ public void bufferShouldNotBeAccessibleAfterClose(Fixture fixture) {
465466
}
466467
}
467468

469+
@ParameterizedTest
470+
@MethodSource("allocators")
471+
public void bufferMustNotBeThreadConfined(Fixture fixture) throws Exception {
472+
try (Allocator allocator = fixture.createAllocator();
473+
Buf buf = allocator.allocate(8)) {
474+
buf.writeInt(42);
475+
Future<Integer> fut = executor.submit(() -> buf.readInt());
476+
assertEquals(42, fut.get());
477+
fut = executor.submit(() -> {
478+
buf.writeInt(32);
479+
return buf.readInt();
480+
});
481+
assertEquals(32, fut.get());
482+
}
483+
}
484+
468485
@ParameterizedTest
469486
@MethodSource("initialAllocators")
470487
void mustThrowWhenAllocatingZeroSizedBuffer(Fixture fixture) {

src/test/java/io/netty/buffer/api/benchmarks/MemorySegmentClosedByCleanerBenchmark.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@
3434

3535
import static java.util.concurrent.CompletableFuture.completedFuture;
3636

37-
@Warmup(iterations = 30, time = 1)
38-
@Measurement(iterations = 30, time = 1)
37+
@Warmup(iterations = 15, time = 1)
38+
@Measurement(iterations = 15, time = 1)
3939
@Fork(value = 5, jvmArgsAppend = { "-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints" })
4040
@BenchmarkMode(Mode.AverageTime)
4141
@OutputTimeUnit(TimeUnit.MICROSECONDS)
4242
@State(Scope.Benchmark)
4343
public class MemorySegmentClosedByCleanerBenchmark {
44+
private static final Allocator heap = Allocator.heap();
45+
private static final Allocator heapPooled = Allocator.pooledHeap();
4446
private static final Allocator direct = Allocator.direct();
4547
private static final Allocator directPooled = Allocator.pooledDirect();
4648

@@ -60,14 +62,28 @@ public void setUp() {
6062
}
6163

6264
@Benchmark
63-
public Buf explicitClose() throws Exception {
65+
public Buf explicitCloseHeap() throws Exception {
66+
try (Buf buf = process(heap.allocate(256))) {
67+
return buf;
68+
}
69+
}
70+
71+
@Benchmark
72+
public Buf explicitPooledCloseHeap() throws Exception {
73+
try (Buf buf = process(heapPooled.allocate(256))) {
74+
return buf;
75+
}
76+
}
77+
78+
@Benchmark
79+
public Buf explicitCloseDirect() throws Exception {
6480
try (Buf buf = process(direct.allocate(256))) {
6581
return buf;
6682
}
6783
}
6884

6985
@Benchmark
70-
public Buf explicitPooledClose() throws Exception {
86+
public Buf explicitPooledCloseDirect() throws Exception {
7187
try (Buf buf = process(directPooled.allocate(256))) {
7288
return buf;
7389
}

0 commit comments

Comments
 (0)