Skip to content
This repository was archived by the owner on Dec 12, 2022. It is now read-only.

Commit 4036dac

Browse files
committed
Add a flag to allow Buf.ensureWritable to compact buffer
Motivation: The main use case with Buf.compact is in conjunction with ensureWritable. It turns out we can get a simpler API, and faster methods, by combining those two operations, because it allows us to relax some guarantees and skip some steps in certain cases, which wouldn't be as neat or clean if they were two separate steps. Modification: Add a new Buf.ensureWritable method, which takes an allowCompaction argument. In MemSegBuf, we can just delegate to compact() when applicable. In CompositeBuf, we can sometimes get away with just reorganising the bufs array. Result: We can now do ensureWritable without allocating in some cases, and this can in particular make the operation faster for CompositeBuf.
1 parent 252ac38 commit 4036dac

File tree

4 files changed

+145
-35
lines changed

4 files changed

+145
-35
lines changed

src/main/java/io/netty/buffer/api/Buf.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,12 +367,53 @@ default ByteCursor openReverseCursor() {
367367
* If this buffer already has the necessary space, then this method returns immediately.
368368
* If this buffer does not already have the necessary space, then it will be expanded using the {@link Allocator}
369369
* the buffer was created with.
370+
* This method is the same as calling {@link #ensureWritable(int, boolean)} where {@code allowCompaction} is
371+
* {@code false}.
370372
*
371373
* @param size The requested number of bytes of space that should be available for writing.
372374
* @throws IllegalStateException if this buffer is not in an owned state.
373-
* That is, if {@link #countBorrows()} is not {@code 0}.
375+
* That is, if {@link #isOwned()} is {@code false}.
374376
*/
375-
void ensureWritable(int size);
377+
default void ensureWritable(int size) {
378+
ensureWritable(size, true);
379+
}
380+
381+
/**
382+
* Ensure that this buffer has {@linkplain #writableBytes() available space for writing} the given number of
383+
* bytes.
384+
* The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown.
385+
* If this buffer already has the necessary space, then this method returns immediately.
386+
* If this buffer does not already have the necessary space, then space will be made available in one or all of
387+
* the following available ways:
388+
*
389+
* <ul>
390+
* <li>
391+
* If {@code allowCompaction} is {@code true}, and sum of the read and writable bytes would be enough to
392+
* satisfy the request, and it (depending on the buffer implementation) seems faster and easier to compact
393+
* the existing buffer rather than allocation a new buffer, then the requested bytes will be made available
394+
* that way. The compaction will not necessarily work the same way as the {@link #compact()} method, as the
395+
* implementation may be able to make the requested bytes available with less effort than is strictly
396+
* mandated by the {@link #compact()} method.
397+
* </li>
398+
* <li>
399+
* Regardless of the value of the {@code allowCompaction}, the implementation may make more space available
400+
* by just allocating more or larger buffers. This allocation would use the same {@link Allocator} that this
401+
* buffer was created with.
402+
* </li>
403+
* <li>
404+
* If {@code allowCompaction} is {@code true}, then the implementation may choose to do a combination of
405+
* compaction and allocation.
406+
* </li>
407+
* </ul>
408+
*
409+
* @param size The requested number of bytes of space that should be available for writing.
410+
* @param allowCompaction {@code true} if the method is allowed to modify the
411+
* {@linkplain #readerOffset() reader offset} and
412+
* {@linkplain #writerOffset() writer offset}, otherwise {@code false}.
413+
* @throws IllegalStateException if this buffer is not in an owned state.
414+
* That is, if {@link #isOwned()} is {@code false}.
415+
*/
416+
void ensureWritable(int size, boolean allowCompaction);
376417

377418
/**
378419
* Split the buffer into two, at the {@linkplain #writerOffset() write offset} position.

src/main/java/io/netty/buffer/api/CompositeBuf.java

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -523,20 +523,54 @@ public int bytesLeft() {
523523
}
524524

525525
@Override
526-
public void ensureWritable(int size) {
526+
public void ensureWritable(int size, boolean allowCompaction) {
527527
if (!isOwned()) {
528528
throw new IllegalStateException("Buffer is not owned. Only owned buffers can call ensureWritable.");
529529
}
530530
if (size < 0) {
531531
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
532532
}
533-
if (writableBytes() < size) {
534-
long newSize = capacity() + (long) size;
535-
Allocator.checkSize(newSize);
536-
int growth = size - writableBytes();
537-
Buf extension = bufs.length == 0? allocator.allocate(growth) : allocator.allocate(growth, order());
538-
unsafeExtendWith(extension);
533+
if (writableBytes() >= size) {
534+
// We already have enough space.
535+
return;
536+
}
537+
538+
if (allowCompaction && size <= roff) {
539+
// Let's see if we can solve some or all of the requested size with compaction.
540+
// We always compact as much as is possible, regardless of size. This amortizes our work.
541+
int compactableBuffers = 0;
542+
for (Buf buf : bufs) {
543+
if (buf.capacity() != buf.readerOffset()) {
544+
break;
545+
}
546+
compactableBuffers++;
547+
}
548+
if (compactableBuffers > 0) {
549+
Buf[] compactable;
550+
if (compactableBuffers < bufs.length) {
551+
compactable = new Buf[compactableBuffers];
552+
System.arraycopy(bufs, 0, compactable, 0, compactable.length);
553+
System.arraycopy(bufs, compactable.length, bufs, 0, bufs.length - compactable.length);
554+
System.arraycopy(compactable, 0, bufs, bufs.length - compactable.length, compactable.length);
555+
} else {
556+
compactable = bufs;
557+
}
558+
for (Buf buf : compactable) {
559+
buf.reset();
560+
}
561+
computeBufferOffsets();
562+
if (writableBytes() >= size) {
563+
// Now we have enough space.
564+
return;
565+
}
566+
}
539567
}
568+
569+
long newSize = capacity() + (long) size;
570+
Allocator.checkSize(newSize);
571+
int growth = size - writableBytes();
572+
Buf extension = bufs.length == 0? allocator.allocate(growth) : allocator.allocate(growth, order());
573+
unsafeExtendWith(extension);
540574
}
541575

542576
void extendWith(Buf extension) {

src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -323,38 +323,50 @@ public int bytesLeft() {
323323
}
324324

325325
@Override
326-
public void ensureWritable(int size) {
326+
public void ensureWritable(int size, boolean allowCompaction) {
327327
if (!isOwned()) {
328328
throw new IllegalStateException("Buffer is not owned. Only owned buffers can call ensureWritable.");
329329
}
330330
if (size < 0) {
331331
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
332332
}
333-
if (writableBytes() < size) {
334-
long newSize = capacity() + size - (long) writableBytes();
335-
Allocator.checkSize(newSize);
336-
RecoverableMemory recoverableMemory = (RecoverableMemory) alloc.allocateUntethered(this, (int) newSize);
337-
var newSegment = recoverableMemory.segment;
338-
newSegment.copyFrom(seg);
339-
340-
// Release old memory segment:
341-
var drop = unsafeGetDrop();
342-
if (drop instanceof BifurcatedDrop) {
343-
// Disconnect from the bifurcated drop, since we'll get our own fresh memory segment.
344-
int roff = this.roff;
345-
int woff = this.woff;
346-
drop.drop(this);
347-
drop = ((BifurcatedDrop) drop).unwrap();
348-
unsafeSetDrop(drop);
349-
this.roff = roff;
350-
this.woff = woff;
351-
} else {
352-
alloc.recoverMemory(recoverableMemory());
353-
}
333+
if (writableBytes() >= size) {
334+
// We already have enough space.
335+
return;
336+
}
354337

355-
seg = newSegment;
356-
drop.attach(this);
338+
if (allowCompaction && writableBytes() + readerOffset() >= size) {
339+
// We can solve this with compaction.
340+
compact();
341+
return;
342+
}
343+
344+
// Allocate a bigger buffer.
345+
long newSize = capacity() + size - (long) writableBytes();
346+
Allocator.checkSize(newSize);
347+
RecoverableMemory recoverableMemory = (RecoverableMemory) alloc.allocateUntethered(this, (int) newSize);
348+
var newSegment = recoverableMemory.segment;
349+
350+
// Copy contents.
351+
newSegment.copyFrom(seg);
352+
353+
// Release old memory segment:
354+
var drop = unsafeGetDrop();
355+
if (drop instanceof BifurcatedDrop) {
356+
// Disconnect from the bifurcated drop, since we'll get our own fresh memory segment.
357+
int roff = this.roff;
358+
int woff = this.woff;
359+
drop.drop(this);
360+
drop = ((BifurcatedDrop) drop).unwrap();
361+
unsafeSetDrop(drop);
362+
this.roff = roff;
363+
this.woff = woff;
364+
} else {
365+
alloc.recoverMemory(recoverableMemory());
357366
}
367+
368+
seg = newSegment;
369+
drop.attach(this);
358370
}
359371

360372
@Override

src/test/java/io/netty/buffer/api/BufTest.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ public void originalBufferMustNotBeAccessibleAfterSend(Fixture fixture) {
383383
}
384384
}
385385

386-
private void verifyInaccessible(Buf buf) {
386+
private static void verifyInaccessible(Buf buf) {
387387
assertThrows(IllegalStateException.class, () -> buf.readByte());
388388
assertThrows(IllegalStateException.class, () -> buf.readUnsignedByte());
389389
assertThrows(IllegalStateException.class, () -> buf.readChar());
@@ -1689,6 +1689,29 @@ public void ensureWritableOnCompositeBuffersMustRespectExistingLittleEndianByteO
16891689
}
16901690
}
16911691

1692+
@ParameterizedTest
1693+
@MethodSource("nonSliceAllocators")
1694+
public void ensureWritableWithCompactionMustNotAllocateIfCompactionIsEnough(Fixture fixture) {
1695+
try (Allocator allocator = fixture.createAllocator();
1696+
Buf buf = allocator.allocate(64)) {
1697+
while (buf.writableBytes() > 0) {
1698+
buf.writeByte((byte) 42);
1699+
}
1700+
while (buf.readableBytes() > 0) {
1701+
buf.readByte();
1702+
}
1703+
buf.ensureWritable(4, true);
1704+
buf.writeInt(42);
1705+
assertThat(buf.capacity()).isEqualTo(64);
1706+
1707+
buf.writerOffset(60).readerOffset(60);
1708+
buf.ensureWritable(8, true);
1709+
buf.writeLong(42);
1710+
// Don't assert the capacity on this one, because single-component
1711+
// composite buffers may choose to allocate rather than compact.
1712+
}
1713+
}
1714+
16921715
@ParameterizedTest
16931716
@MethodSource("allocators")
16941717
public void pooledBuffersMustResetStateBeforeReuse(Fixture fixture) {
@@ -2139,7 +2162,7 @@ public void bifurcateOnEmptyLittleEndianCompositeBuffer() {
21392162
}
21402163
}
21412164

2142-
private void verifyBifurcateEmptyCompositeBuffer(Buf buf) {
2165+
private static void verifyBifurcateEmptyCompositeBuffer(Buf buf) {
21432166
try (Buf a = buf.bifurcate()) {
21442167
a.ensureWritable(4);
21452168
buf.ensureWritable(4);

0 commit comments

Comments
 (0)