Skip to content
This repository was archived by the owner on Dec 12, 2022. It is now read-only.

Commit 6697840

Browse files
authored
Merge pull request #18 from netty/compact
Buffer Compaction
2 parents cc685c0 + 4036dac commit 6697840

File tree

4 files changed

+235
-35
lines changed

4 files changed

+235
-35
lines changed

src/main/java/io/netty/buffer/api/Buf.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,12 +367,53 @@ default ByteCursor openReverseCursor() {
367367
* If this buffer already has the necessary space, then this method returns immediately.
368368
* If this buffer does not already have the necessary space, then it will be expanded using the {@link Allocator}
369369
* the buffer was created with.
370+
* This method is the same as calling {@link #ensureWritable(int, boolean)} where {@code allowCompaction} is
371+
* {@code false}.
370372
*
371373
* @param size The requested number of bytes of space that should be available for writing.
372374
* @throws IllegalStateException if this buffer is not in an owned state.
373-
* That is, if {@link #countBorrows()} is not {@code 0}.
375+
* That is, if {@link #isOwned()} is {@code false}.
374376
*/
375-
void ensureWritable(int size);
377+
default void ensureWritable(int size) {
378+
ensureWritable(size, true);
379+
}
380+
381+
/**
382+
* Ensure that this buffer has {@linkplain #writableBytes() available space for writing} the given number of
383+
* bytes.
384+
* The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown.
385+
* If this buffer already has the necessary space, then this method returns immediately.
386+
* If this buffer does not already have the necessary space, then space will be made available in one or all of
387+
* the following available ways:
388+
*
389+
* <ul>
390+
* <li>
391+
* If {@code allowCompaction} is {@code true}, and sum of the read and writable bytes would be enough to
392+
* satisfy the request, and it (depending on the buffer implementation) seems faster and easier to compact
393+
* the existing buffer rather than allocation a new buffer, then the requested bytes will be made available
394+
* that way. The compaction will not necessarily work the same way as the {@link #compact()} method, as the
395+
* implementation may be able to make the requested bytes available with less effort than is strictly
396+
* mandated by the {@link #compact()} method.
397+
* </li>
398+
* <li>
399+
* Regardless of the value of the {@code allowCompaction}, the implementation may make more space available
400+
* by just allocating more or larger buffers. This allocation would use the same {@link Allocator} that this
401+
* buffer was created with.
402+
* </li>
403+
* <li>
404+
* If {@code allowCompaction} is {@code true}, then the implementation may choose to do a combination of
405+
* compaction and allocation.
406+
* </li>
407+
* </ul>
408+
*
409+
* @param size The requested number of bytes of space that should be available for writing.
410+
* @param allowCompaction {@code true} if the method is allowed to modify the
411+
* {@linkplain #readerOffset() reader offset} and
412+
* {@linkplain #writerOffset() writer offset}, otherwise {@code false}.
413+
* @throws IllegalStateException if this buffer is not in an owned state.
414+
* That is, if {@link #isOwned()} is {@code false}.
415+
*/
416+
void ensureWritable(int size, boolean allowCompaction);
376417

377418
/**
378419
* Split the buffer into two, at the {@linkplain #writerOffset() write offset} position.
@@ -417,4 +458,11 @@ default ByteCursor openReverseCursor() {
417458
* @return A new buffer with independent and exclusive ownership over the read and readable bytes from this buffer.
418459
*/
419460
Buf bifurcate();
461+
462+
/**
463+
* Discards the read bytes, and moves the buffer contents to the beginning of the buffer.
464+
*
465+
* The buffer must be {@linkplain #isOwned() owned}, or an exception will be thrown.
466+
*/
467+
void compact();
420468
}

src/main/java/io/netty/buffer/api/CompositeBuf.java

Lines changed: 73 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import java.util.Arrays;
2121
import java.util.Objects;
2222

23+
import static jdk.incubator.foreign.MemoryAccess.setByteAtOffset;
24+
import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset;
25+
2326
final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
2427
/**
2528
* The max array size is JVM implementation dependant, but most seem to settle on {@code Integer.MAX_VALUE - 8}.
@@ -520,20 +523,54 @@ public int bytesLeft() {
520523
}
521524

522525
@Override
523-
public void ensureWritable(int size) {
526+
public void ensureWritable(int size, boolean allowCompaction) {
524527
if (!isOwned()) {
525528
throw new IllegalStateException("Buffer is not owned. Only owned buffers can call ensureWritable.");
526529
}
527530
if (size < 0) {
528531
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
529532
}
530-
if (writableBytes() < size) {
531-
long newSize = capacity() + (long) size;
532-
Allocator.checkSize(newSize);
533-
int growth = size - writableBytes();
534-
Buf extension = bufs.length == 0? allocator.allocate(growth) : allocator.allocate(growth, order());
535-
unsafeExtendWith(extension);
533+
if (writableBytes() >= size) {
534+
// We already have enough space.
535+
return;
536536
}
537+
538+
if (allowCompaction && size <= roff) {
539+
// Let's see if we can solve some or all of the requested size with compaction.
540+
// We always compact as much as is possible, regardless of size. This amortizes our work.
541+
int compactableBuffers = 0;
542+
for (Buf buf : bufs) {
543+
if (buf.capacity() != buf.readerOffset()) {
544+
break;
545+
}
546+
compactableBuffers++;
547+
}
548+
if (compactableBuffers > 0) {
549+
Buf[] compactable;
550+
if (compactableBuffers < bufs.length) {
551+
compactable = new Buf[compactableBuffers];
552+
System.arraycopy(bufs, 0, compactable, 0, compactable.length);
553+
System.arraycopy(bufs, compactable.length, bufs, 0, bufs.length - compactable.length);
554+
System.arraycopy(compactable, 0, bufs, bufs.length - compactable.length, compactable.length);
555+
} else {
556+
compactable = bufs;
557+
}
558+
for (Buf buf : compactable) {
559+
buf.reset();
560+
}
561+
computeBufferOffsets();
562+
if (writableBytes() >= size) {
563+
// Now we have enough space.
564+
return;
565+
}
566+
}
567+
}
568+
569+
long newSize = capacity() + (long) size;
570+
Allocator.checkSize(newSize);
571+
int growth = size - writableBytes();
572+
Buf extension = bufs.length == 0? allocator.allocate(growth) : allocator.allocate(growth, order());
573+
unsafeExtendWith(extension);
537574
}
538575

539576
void extendWith(Buf extension) {
@@ -600,6 +637,35 @@ public Buf bifurcate() {
600637
}
601638
}
602639

640+
@Override
641+
public void compact() {
642+
if (!isOwned()) {
643+
throw new IllegalStateException("Buffer must be owned in order to compact.");
644+
}
645+
int distance = roff;
646+
if (distance == 0) {
647+
return;
648+
}
649+
int pos = 0;
650+
var oldOrder = order;
651+
order = ByteOrder.BIG_ENDIAN;
652+
try {
653+
var cursor = openCursor();
654+
while (cursor.readLong()) {
655+
setLong(pos, cursor.getLong());
656+
pos += Long.BYTES;
657+
}
658+
while (cursor.readByte()) {
659+
setByte(pos, cursor.getByte());
660+
pos++;
661+
}
662+
} finally {
663+
order = oldOrder;
664+
}
665+
readerOffset(0);
666+
writerOffset(woff - distance);
667+
}
668+
603669
// <editor-fold defaultstate="collapsed" desc="Primitive accessors.">
604670
@Override
605671
public byte readByte() {

src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -323,38 +323,50 @@ public int bytesLeft() {
323323
}
324324

325325
@Override
326-
public void ensureWritable(int size) {
326+
public void ensureWritable(int size, boolean allowCompaction) {
327327
if (!isOwned()) {
328328
throw new IllegalStateException("Buffer is not owned. Only owned buffers can call ensureWritable.");
329329
}
330330
if (size < 0) {
331331
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
332332
}
333-
if (writableBytes() < size) {
334-
long newSize = capacity() + size - (long) writableBytes();
335-
Allocator.checkSize(newSize);
336-
RecoverableMemory recoverableMemory = (RecoverableMemory) alloc.allocateUntethered(this, (int) newSize);
337-
var newSegment = recoverableMemory.segment;
338-
newSegment.copyFrom(seg);
339-
340-
// Release old memory segment:
341-
var drop = unsafeGetDrop();
342-
if (drop instanceof BifurcatedDrop) {
343-
// Disconnect from the bifurcated drop, since we'll get our own fresh memory segment.
344-
int roff = this.roff;
345-
int woff = this.woff;
346-
drop.drop(this);
347-
drop = ((BifurcatedDrop) drop).unwrap();
348-
unsafeSetDrop(drop);
349-
this.roff = roff;
350-
this.woff = woff;
351-
} else {
352-
alloc.recoverMemory(recoverableMemory());
353-
}
333+
if (writableBytes() >= size) {
334+
// We already have enough space.
335+
return;
336+
}
354337

355-
seg = newSegment;
356-
drop.attach(this);
338+
if (allowCompaction && writableBytes() + readerOffset() >= size) {
339+
// We can solve this with compaction.
340+
compact();
341+
return;
342+
}
343+
344+
// Allocate a bigger buffer.
345+
long newSize = capacity() + size - (long) writableBytes();
346+
Allocator.checkSize(newSize);
347+
RecoverableMemory recoverableMemory = (RecoverableMemory) alloc.allocateUntethered(this, (int) newSize);
348+
var newSegment = recoverableMemory.segment;
349+
350+
// Copy contents.
351+
newSegment.copyFrom(seg);
352+
353+
// Release old memory segment:
354+
var drop = unsafeGetDrop();
355+
if (drop instanceof BifurcatedDrop) {
356+
// Disconnect from the bifurcated drop, since we'll get our own fresh memory segment.
357+
int roff = this.roff;
358+
int woff = this.woff;
359+
drop.drop(this);
360+
drop = ((BifurcatedDrop) drop).unwrap();
361+
unsafeSetDrop(drop);
362+
this.roff = roff;
363+
this.woff = woff;
364+
} else {
365+
alloc.recoverMemory(recoverableMemory());
357366
}
367+
368+
seg = newSegment;
369+
drop.attach(this);
358370
}
359371

360372
@Override
@@ -384,6 +396,20 @@ public Buf bifurcate() {
384396
return bifurcatedBuf;
385397
}
386398

399+
@Override
400+
public void compact() {
401+
if (!isOwned()) {
402+
throw new IllegalStateException("Buffer must be owned in order to compact.");
403+
}
404+
int distance = roff;
405+
if (distance == 0) {
406+
return;
407+
}
408+
seg.copyFrom(seg.asSlice(roff, woff - roff));
409+
roff -= distance;
410+
woff -= distance;
411+
}
412+
387413
// <editor-fold defaultstate="collapsed" desc="Primitive accessors implementation.">
388414
@Override
389415
public byte readByte() {

src/test/java/io/netty/buffer/api/BufTest.java

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ public void originalBufferMustNotBeAccessibleAfterSend(Fixture fixture) {
383383
}
384384
}
385385

386-
private void verifyInaccessible(Buf buf) {
386+
private static void verifyInaccessible(Buf buf) {
387387
assertThrows(IllegalStateException.class, () -> buf.readByte());
388388
assertThrows(IllegalStateException.class, () -> buf.readUnsignedByte());
389389
assertThrows(IllegalStateException.class, () -> buf.readChar());
@@ -1689,6 +1689,29 @@ public void ensureWritableOnCompositeBuffersMustRespectExistingLittleEndianByteO
16891689
}
16901690
}
16911691

1692+
@ParameterizedTest
1693+
@MethodSource("nonSliceAllocators")
1694+
public void ensureWritableWithCompactionMustNotAllocateIfCompactionIsEnough(Fixture fixture) {
1695+
try (Allocator allocator = fixture.createAllocator();
1696+
Buf buf = allocator.allocate(64)) {
1697+
while (buf.writableBytes() > 0) {
1698+
buf.writeByte((byte) 42);
1699+
}
1700+
while (buf.readableBytes() > 0) {
1701+
buf.readByte();
1702+
}
1703+
buf.ensureWritable(4, true);
1704+
buf.writeInt(42);
1705+
assertThat(buf.capacity()).isEqualTo(64);
1706+
1707+
buf.writerOffset(60).readerOffset(60);
1708+
buf.ensureWritable(8, true);
1709+
buf.writeLong(42);
1710+
// Don't assert the capacity on this one, because single-component
1711+
// composite buffers may choose to allocate rather than compact.
1712+
}
1713+
}
1714+
16921715
@ParameterizedTest
16931716
@MethodSource("allocators")
16941717
public void pooledBuffersMustResetStateBeforeReuse(Fixture fixture) {
@@ -2139,7 +2162,7 @@ public void bifurcateOnEmptyLittleEndianCompositeBuffer() {
21392162
}
21402163
}
21412164

2142-
private void verifyBifurcateEmptyCompositeBuffer(Buf buf) {
2165+
private static void verifyBifurcateEmptyCompositeBuffer(Buf buf) {
21432166
try (Buf a = buf.bifurcate()) {
21442167
a.ensureWritable(4);
21452168
buf.ensureWritable(4);
@@ -2195,6 +2218,43 @@ public void sendMustNotMakeBifurcatedBuffersInaccessible(Fixture fixture) throws
21952218
}
21962219
}
21972220

2221+
@ParameterizedTest
2222+
@MethodSource("nonSliceAllocators")
2223+
public void compactMustDiscardReadBytes(Fixture fixture) {
2224+
try (Allocator allocator = fixture.createAllocator();
2225+
Buf buf = allocator.allocate(16, ByteOrder.BIG_ENDIAN)) {
2226+
buf.writeLong(0x0102030405060708L).writeInt(0x090A0B0C);
2227+
assertEquals(0x01020304, buf.readInt());
2228+
assertEquals(12, buf.writerOffset());
2229+
assertEquals(4, buf.readerOffset());
2230+
assertEquals(4, buf.writableBytes());
2231+
assertEquals(8, buf.readableBytes());
2232+
assertEquals(16, buf.capacity());
2233+
buf.compact();
2234+
assertEquals(8, buf.writerOffset());
2235+
assertEquals(0, buf.readerOffset());
2236+
assertEquals(8, buf.writableBytes());
2237+
assertEquals(8, buf.readableBytes());
2238+
assertEquals(16, buf.capacity());
2239+
assertEquals(0x05060708090A0B0CL, buf.readLong());
2240+
}
2241+
}
2242+
2243+
@ParameterizedTest
2244+
@MethodSource("nonSliceAllocators")
2245+
public void compactMustThrowForUnownedBuffer(Fixture fixture) {
2246+
try (Allocator allocator = fixture.createAllocator();
2247+
Buf buf = allocator.allocate(8, ByteOrder.BIG_ENDIAN)) {
2248+
buf.writeLong(0x0102030405060708L);
2249+
assertEquals((byte) 0x01, buf.readByte());
2250+
try (Buf ignore = buf.acquire()) {
2251+
assertThrows(IllegalStateException.class, () -> buf.compact());
2252+
assertEquals(1, buf.readerOffset());
2253+
}
2254+
assertEquals((byte) 0x02, buf.readByte());
2255+
}
2256+
}
2257+
21982258
// <editor-fold defaultstate="collapsed" desc="Primitive accessors tests.">
21992259
@ParameterizedTest
22002260
@MethodSource("allocators")

0 commit comments

Comments
 (0)