Skip to content
This repository was archived by the owner on Dec 12, 2022. It is now read-only.

Commit 14d55c3

Browse files
committed
Avoid nesting composite buffers
Motivation: There is no reason that composite buffers should nest when composed. Instead, when composite buffers are used to compose or extend other composite buffers, we should unwrap them and copy the references to their constituent buffers. Modification: Composite buffers now always unwrap and flatten themselves when they participate in composition or extension of other composite buffers. Result: Composite buffers are now always guaranteed* to contain a single level of non-composed leaf buffers. *assuming no other unknown buffer-wrapping buffer type is in the mix.
1 parent 202dd54 commit 14d55c3

File tree

2 files changed

+125
-11
lines changed

2 files changed

+125
-11
lines changed

src/main/java/io/netty/buffer/api/CompositeBuf.java

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@
2121
import java.nio.ByteBuffer;
2222
import java.nio.ByteOrder;
2323
import java.util.Arrays;
24+
import java.util.Collections;
25+
import java.util.IdentityHashMap;
2426
import java.util.Objects;
27+
import java.util.Set;
28+
import java.util.stream.Stream;
2529

2630
final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
2731
/**
@@ -62,8 +66,25 @@ private static Buf[] filterExternalBufs(Buf[] bufs) {
6266
// This restriction guarantees that methods like countComponents, forEachReadable and forEachWritable,
6367
// will never overflow their component counts.
6468
// Allocating a new array unconditionally also prevents external modification of the array.
65-
// TODO if any buffer is itself a composite buffer, then we should unwrap its sub-buffers
66-
return Arrays.stream(bufs).filter(b -> b.capacity() > 0).toArray(Buf[]::new);
69+
bufs = Arrays.stream(bufs)
70+
.filter(b -> b.capacity() > 0)
71+
.flatMap(CompositeBuf::flattenBuffer)
72+
.toArray(Buf[]::new);
73+
// Make sure there are no duplicates among the buffers.
74+
Set<Buf> duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>());
75+
duplicatesCheck.addAll(Arrays.asList(bufs));
76+
if (duplicatesCheck.size() < bufs.length) {
77+
throw new IllegalArgumentException(
78+
"Cannot create composite buffer with duplicate constituent buffer components.");
79+
}
80+
return bufs;
81+
}
82+
83+
private static Stream<Buf> flattenBuffer(Buf buf) {
84+
if (buf instanceof CompositeBuf) {
85+
return Stream.of(((CompositeBuf) buf).bufs);
86+
}
87+
return Stream.of(buf);
6788
}
6889

6990
private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop) {
@@ -616,9 +637,6 @@ void extendWith(Buf extension) {
616637
if (!isOwned()) {
617638
throw new IllegalStateException("This buffer cannot be extended because it is not in an owned state.");
618639
}
619-
if (extension == this) {
620-
throw new IllegalArgumentException("This buffer cannot be extended with itself.");
621-
}
622640
if (bufs.length > 0 && extension.order() != order()) {
623641
throw new IllegalArgumentException(
624642
"This buffer uses " + order() + " byte order, and cannot be extended with " +
@@ -639,14 +657,38 @@ void extendWith(Buf extension) {
639657
// overflow in their component counters.
640658
return;
641659
}
642-
// TODO if extension is itself a composite buffer, then we should extend ourselves by all of the sub-buffers
643660

644661
long newSize = capacity() + extensionCapacity;
645662
Allocator.checkSize(newSize);
646663

647664
Buf[] restoreTemp = bufs; // We need this to restore our buffer array, in case offset computations fail.
648665
try {
649-
unsafeExtendWith(extension.acquire());
666+
if (extension instanceof CompositeBuf) {
667+
// If the extension is itself a composite buffer, then extend this one by all of the constituent
668+
// component buffers.
669+
CompositeBuf compositeExtension = (CompositeBuf) extension;
670+
Buf[] addedBuffers = compositeExtension.bufs;
671+
Set<Buf> duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>());
672+
duplicatesCheck.addAll(Arrays.asList(bufs));
673+
duplicatesCheck.addAll(Arrays.asList(addedBuffers));
674+
if (duplicatesCheck.size() < bufs.length + addedBuffers.length) {
675+
throw extensionDuplicatesException();
676+
}
677+
for (Buf addedBuffer : addedBuffers) {
678+
addedBuffer.acquire();
679+
}
680+
int extendAtIndex = bufs.length;
681+
bufs = Arrays.copyOf(bufs, extendAtIndex + addedBuffers.length);
682+
System.arraycopy(addedBuffers, 0, bufs, extendAtIndex, addedBuffers.length);
683+
computeBufferOffsets();
684+
} else {
685+
for (Buf buf : restoreTemp) {
686+
if (buf == extension) {
687+
throw extensionDuplicatesException();
688+
}
689+
}
690+
unsafeExtendWith(extension.acquire());
691+
}
650692
if (restoreTemp.length == 0) {
651693
order = extension.order();
652694
readOnly = extension.readOnly();
@@ -657,6 +699,12 @@ void extendWith(Buf extension) {
657699
}
658700
}
659701

702+
private static IllegalArgumentException extensionDuplicatesException() {
703+
return new IllegalArgumentException(
704+
"The composite buffer cannot be extended with the given extension," +
705+
" as it would cause the buffer to have duplicate constituent buffers.");
706+
}
707+
660708
private void unsafeExtendWith(Buf extension) {
661709
bufs = Arrays.copyOf(bufs, bufs.length + 1);
662710
bufs[bufs.length - 1] = extension;

src/test/java/io/netty/buffer/api/BufTest.java

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -879,7 +879,7 @@ void sliceWithZeroSizeMustBeAllowed(Fixture fixture) {
879879
}
880880

881881
@ParameterizedTest
882-
@MethodSource("allocators")
882+
@MethodSource("nonCompositeAllocators")
883883
public void acquireComposingAndSlicingMustIncrementBorrows(Fixture fixture) {
884884
try (Allocator allocator = fixture.createAllocator();
885885
Buf buf = allocator.allocate(8)) {
@@ -1628,6 +1628,53 @@ public void compositeBufferCanOnlyBeOwnedWhenAllConstituentBuffersAreOwned() {
16281628
}
16291629
}
16301630

1631+
@Test
1632+
public void compositeBuffersCannotHaveDuplicateComponents() {
1633+
try (Allocator allocator = Allocator.heap();
1634+
Buf a = allocator.allocate(4)) {
1635+
var e = assertThrows(IllegalArgumentException.class, () -> allocator.compose(a, a));
1636+
assertThat(e).hasMessageContaining("duplicate");
1637+
1638+
try (Buf composite = allocator.compose(a)) {
1639+
a.close();
1640+
try {
1641+
e = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, a));
1642+
assertThat(e).hasMessageContaining("duplicate");
1643+
} finally {
1644+
a.acquire();
1645+
}
1646+
}
1647+
}
1648+
}
1649+
1650+
@Test
1651+
public void compositeBufferMustNotBeAllowedToContainThemselves() {
1652+
try (Allocator allocator = Allocator.heap()) {
1653+
Buf a = allocator.allocate(4);
1654+
Buf buf = allocator.compose(a);
1655+
try (buf; a) {
1656+
a.close();
1657+
try {
1658+
assertThrows(IllegalArgumentException.class, () -> Allocator.extend(buf, buf));
1659+
assertTrue(buf.isOwned());
1660+
try (Buf composite = allocator.compose(buf)) {
1661+
// the composing increments the reference count of constituent buffers...
1662+
// counter-act this so it can be extended:
1663+
a.close(); // buf is now owned so it can be extended.
1664+
try {
1665+
assertThrows(IllegalArgumentException.class, () -> Allocator.extend(buf, composite));
1666+
} finally {
1667+
a.acquire(); // restore the reference count to align with our try-with-resources structure.
1668+
}
1669+
}
1670+
assertTrue(buf.isOwned());
1671+
} finally {
1672+
a.acquire();
1673+
}
1674+
}
1675+
}
1676+
}
1677+
16311678
@ParameterizedTest
16321679
@MethodSource("allocators")
16331680
public void ensureWritableMustThrowForBorrowedBuffers(Fixture fixture) {
@@ -1852,13 +1899,32 @@ public void extendingCompositeBufferWithItselfMustThrow() {
18521899
}
18531900
try (composite) {
18541901
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, composite));
1855-
assertThat(exc).hasMessageContaining("itself");
1902+
assertThat(exc).hasMessageContaining("cannot be extended");
18561903
}
18571904
}
1905+
}
1906+
1907+
@Test
1908+
public void extendingWithZeroCapacityBufferHasNoEffect() {
18581909
try (Allocator allocator = Allocator.heap();
18591910
Buf composite = allocator.compose()) {
1860-
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, composite));
1861-
assertThat(exc).hasMessageContaining("itself");
1911+
Allocator.extend(composite, composite);
1912+
assertThat(composite.capacity()).isZero();
1913+
assertThat(composite.countComponents()).isZero();
1914+
}
1915+
try (Allocator allocator = Allocator.heap()) {
1916+
Buf a = allocator.allocate(1);
1917+
Buf composite = allocator.compose(a);
1918+
a.close();
1919+
assertTrue(composite.isOwned());
1920+
assertThat(composite.capacity()).isOne();
1921+
assertThat(composite.countComponents()).isOne();
1922+
try (Buf b = allocator.compose()) {
1923+
Allocator.extend(composite, b);
1924+
}
1925+
assertTrue(composite.isOwned());
1926+
assertThat(composite.capacity()).isOne();
1927+
assertThat(composite.countComponents()).isOne();
18621928
}
18631929
}
18641930

0 commit comments

Comments
 (0)