Skip to content
This repository was archived by the owner on Dec 12, 2022. It is now read-only.

Commit 46ed145

Browse files
committed
Add Buf.forEachWritable
Pass iteration indexes through.
1 parent d382017 commit 46ed145

File tree

6 files changed

+552
-57
lines changed

6 files changed

+552
-57
lines changed

src/main/java/io/netty/buffer/api/Buf.java

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import java.nio.ByteBuffer;
1919
import java.nio.ByteOrder;
20-
import java.util.function.Consumer;
2120

2221
/**
2322
* A reference counted buffer of memory, with separate reader and writer offsets.
@@ -490,19 +489,74 @@ default void ensureWritable(int size) {
490489
*
491490
* @return The number of components in this buffer.
492491
*/
493-
int componentCount();
492+
int countComponents();
494493

495494
/**
496-
* Process all readable components of this buffer, and return the number of components consumed.
495+
* Get the number of "components" in this buffer, that are readable. These are the components that would be
496+
* processed by {@link #forEachReadable(int, ComponentProcessor)}. For composite buffers, this is the number of
497+
* transitive constituent buffers that are readable, while non-composite buffers only have at most one readable
498+
* component.
497499
* <p>
498-
* The number of components consumed may be less than the {@linkplain #componentCount() component count} if not all
499-
* of them have readable data.
500+
* The number of readable components may be less than the {@link #countComponents() component count}, if not all of
501+
* them have readable data.
500502
*
503+
* @return The number of readable components in this buffer.
504+
*/
505+
int countReadableComponents();
506+
507+
/**
508+
* Get the number of "components" in this buffer, that are writable. These are the components that would be
509+
* processed by {@link #forEachWritable(int, ComponentProcessor)}. For composite buffers, this is the number of
510+
* transitive constituent buffers that are writable, while non-composite buffers only have at most one writable
511+
* component.
512+
* <p>
513+
* The number of writable components may be less than the {@link #countComponents() component count}, if not all of
514+
* them have space for writing.
515+
*
516+
* @return The number of writable components in this buffer.
517+
*/
518+
int countWritableComponents();
519+
520+
/**
521+
* Process all readable components of this buffer, and return the number of components processed.
522+
* <p>
523+
* The given {@linkplain ComponentProcessor processor} is called for each component in this buffer, and passed a
524+
* component index, for the given component in the iteration, and a {@link Component} object for accessing the data
525+
* within the given component.
526+
* <p>
527+
* The component index is specific to the particular invokation of this method, and may change. The first call to
528+
* the consumer will be passed the given initial index, and the next call will be passed the initial index plus one,
529+
* and so on.
530+
* <p>
531+
* The {@link ComponentProcessor} may stop the iteration at any time by returning {@code false}. This may cause the
532+
* number of components processed to be returned as a negative number (to signal early return), and the number of
533+
* components processed may then be less than the {@linkplain #countReadableComponents() readable component count}.
534+
* <p>
501535
* <strong>Note</strong> that the {@link Component} instance passed to the consumer could be reused for multiple
502536
* calls, so the data must be extracted from the component in the context of the iteration.
537+
* <p>
538+
* The {@link ByteBuffer} instances obtained from the component, share life time with that internal component.
539+
* This means they can be accessed as long as the internal memory store remain unchanged. Methods that may cause
540+
* such changes, are any method that requires the buffer to be {@linkplain #isOwned() owned}.
541+
* <p>
542+
* The best way to ensure this doesn't cause any trouble, is to use the buffers directly as part of the iteration,
543+
* or immediately after the iteration.
544+
* <p>
545+
* <strong>Note</strong> that the arrays, memory addresses, and byte buffers exposed as components by this method,
546+
* should not be used for changing the buffer contents. Doing so may cause undefined behaviour.
547+
* <p>
548+
* Changes to position and limit of the byte buffers exposed via the processed components, are not reflected back to
549+
* this buffer instance.
503550
*
504-
* @param consumer The consumer that will be used to process the buffer components.
505-
* @return The number of readable components processed, which may be less than {@link #componentCount()}.
551+
* @param initialIndex The initial index of the iteration, and the index that will be passed to the first call to
552+
* the {@linkplain ComponentProcessor#process(int, Component) processor}.
553+
* @param processor The processor that will be used to process the buffer components.
554+
* @return The number of readable components processed, as a positive number of all readable components were
555+
* processed, or as a negative number if the iteration was stopped because
556+
* {@link ComponentProcessor#process(int, Component)} returned {@code false}.
557+
* In any case, the number of components processed may be less than {@link #countComponents()}.
506558
*/
507-
int forEachReadable(Consumer<Component> consumer);
559+
int forEachReadable(int initialIndex, ComponentProcessor processor);
560+
561+
int forEachWritable(int initialIndex, ComponentProcessor processor);
508562
}

src/main/java/io/netty/buffer/api/Component.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
package io.netty.buffer.api;
1717

1818
import java.nio.ByteBuffer;
19-
import java.util.function.Consumer;
2019

2120
/**
22-
* A view onto the buffer component being processed in a given iteration of {@link Buf#forEachReadable(Consumer)}.
21+
* A view onto the buffer component being processed in a given iteration of
22+
* {@link Buf#forEachReadable(int, ComponentProcessor)}, or {@link Buf#forEachWritable(int, ComponentProcessor)}.
2323
* <p>
2424
* Instances of this interface are allowed to be mutable behind the scenes, and the data is only guaranteed to be
2525
* consistent within the given iteration.
@@ -28,30 +28,44 @@ public interface Component {
2828

2929
/**
3030
* Check if this component is backed by a cached byte array than can be accessed cheaply.
31+
* <p>
32+
* <strong>Note</strong> that regardless of what this method returns, the array should not be used to modify the
33+
* contents of this buffer component.
3134
*
3235
* @return {@code true} if {@link #array()} is a cheap operation, otherwise {@code false}.
3336
*/
34-
boolean hasCachedArray();
37+
boolean hasArray();
3538

3639
/**
3740
* Get a byte array of the contents of this component.
3841
* <p>
3942
* <strong>Note</strong> that the array is meant to be read-only. It may either be a direct reference to the
4043
* concrete array instance that is backing this component, or it is a fresh copy.
44+
* Writing to the array may produce undefined behaviour.
4145
*
4246
* @return A byte array of the contents of this component.
4347
*/
4448
byte[] array();
4549

50+
int arrayOffset();
51+
4652
/**
4753
* Give the native memory address backing this buffer, or return 0 if this buffer has no native memory address.
54+
* <p>
55+
* <strong>Note</strong> that the address should not be used for writing to the buffer memory, and doing so may
56+
* produce undefined behaviour.
57+
*
4858
* @return The native memory address, if any, otherwise 0.
4959
*/
5060
long nativeAddress();
5161

5262
/**
53-
* Build a {@link ByteBuffer} instance for this memory component.
63+
* Get a {@link ByteBuffer} instance for this memory component.
64+
* <p>
65+
* <strong>Note</strong> that the {@link ByteBuffer} is read-only, to prevent write accesses to the memory,
66+
* when the buffer component is obtained through {@link Buf#forEachReadable(int, ComponentProcessor)}.
67+
*
5468
* @return A new {@link ByteBuffer} for this memory component.
5569
*/
56-
ByteBuffer byteBuffer();
70+
ByteBuffer buffer();
5771
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2020 The Netty Project
3+
*
4+
* The Netty Project licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.netty.buffer.api;
17+
18+
@FunctionalInterface
19+
public interface ComponentProcessor {
20+
boolean process(int index, Component component);
21+
}

src/main/java/io/netty/buffer/api/CompositeBuf.java

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.nio.ByteOrder;
2020
import java.util.Arrays;
2121
import java.util.Objects;
22-
import java.util.function.Consumer;
2322

2423
final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
2524
/**
@@ -49,7 +48,12 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
4948
private boolean readOnly;
5049

5150
CompositeBuf(Allocator allocator, Buf[] bufs) {
52-
this(allocator, true, bufs.clone(), COMPOSITE_DROP); // Clone prevents external modification of array.
51+
this(allocator, true, filterExternalBufs(bufs), COMPOSITE_DROP);
52+
}
53+
54+
private static Buf[] filterExternalBufs(Buf[] bufs) {
55+
// Allocating a new array unconditionally also prevents external modification of the array.
56+
return Arrays.stream(bufs).filter(b -> b.capacity() > 0).toArray(Buf[]::new);
5357
}
5458

5559
private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop) {
@@ -617,7 +621,14 @@ void extendWith(Buf extension) {
617621
(extension.readOnly()? "read-only." : "writable."));
618622
}
619623

620-
long newSize = capacity() + (long) extension.capacity();
624+
long extensionCapacity = extension.capacity();
625+
if (extensionCapacity == 0) {
626+
// Extending by a zero-sized buffer makes no difference. Especially since it's not allowed to change the
627+
// capacity of buffers that are constiuents of composite buffers.
628+
return;
629+
}
630+
631+
long newSize = capacity() + extensionCapacity;
621632
Allocator.checkSize(newSize);
622633

623634
Buf[] restoreTemp = bufs; // We need this to restore our buffer array, in case offset computations fail.
@@ -702,21 +713,63 @@ public void compact() {
702713
}
703714

704715
@Override
705-
public int componentCount() {
716+
public int countComponents() {
706717
int sum = 0;
707718
for (Buf buf : bufs) {
708-
sum += buf.componentCount();
719+
sum += buf.countComponents();
709720
}
710721
return sum;
711722
}
712723

713724
@Override
714-
public int forEachReadable(Consumer<Component> consumer) {
725+
public int countReadableComponents() {
726+
int sum = 0;
727+
for (Buf buf : bufs) {
728+
sum += buf.countReadableComponents();
729+
}
730+
return sum;
731+
}
732+
733+
@Override
734+
public int countWritableComponents() {
735+
int sum = 0;
736+
for (Buf buf : bufs) {
737+
sum += buf.countWritableComponents();
738+
}
739+
return sum;
740+
}
741+
742+
@Override
743+
public int forEachReadable(int initialIndex, ComponentProcessor processor) {
715744
checkReadBounds(readerOffset(), Math.max(1, readableBytes()));
716745
int visited = 0;
717746
for (Buf buf : bufs) {
718747
if (buf.readableBytes() > 0) {
719-
visited += buf.forEachReadable(consumer);
748+
int count = buf.forEachReadable(visited + initialIndex, processor);
749+
if (count > 0) {
750+
visited += count;
751+
} else {
752+
visited = -visited + count;
753+
break;
754+
}
755+
}
756+
}
757+
return visited;
758+
}
759+
760+
@Override
761+
public int forEachWritable(int initialIndex, ComponentProcessor processor) {
762+
checkWriteBounds(writerOffset(), Math.max(1, writableBytes()));
763+
int visited = 0;
764+
for (Buf buf : bufs) {
765+
if (buf.writableBytes() > 0) {
766+
int count = buf.forEachWritable(visited + initialIndex, processor);
767+
if (count > 0) {
768+
visited += count;
769+
} else {
770+
visited = -visited + count;
771+
break;
772+
}
720773
}
721774
}
722775
return visited;

0 commit comments

Comments
 (0)