Skip to content
This repository was archived by the owner on Dec 12, 2022. It is now read-only.

Commit 1a741ba

Browse files
authored
Merge pull request #26 from netty/update-examples
Update examples
2 parents 6e24f5d + 2df000a commit 1a741ba

File tree

6 files changed

+292
-35
lines changed

6 files changed

+292
-35
lines changed

src/main/java/io/netty/buffer/api/Buf.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,7 @@ default void ensureWritable(int size) {
562562
* {@link ReadableComponentProcessor#process(int, ReadableComponent)} returned {@code false}.
563563
* In any case, the number of components processed may be less than {@link #countComponents()}.
564564
*/
565-
int forEachReadable(int initialIndex, ReadableComponentProcessor processor);
565+
<E extends Exception> int forEachReadable(int initialIndex, ReadableComponentProcessor<E> processor) throws E;
566566

567567
/**
568568
* Process all writable components of this buffer, and return the number of components processed.
@@ -601,5 +601,5 @@ default void ensureWritable(int size) {
601601
* {@link WritableComponentProcessor#process(int, WritableComponent)} returned {@code false}.
602602
* In any case, the number of components processed may be less than {@link #countComponents()}.
603603
*/
604-
int forEachWritable(int initialIndex, WritableComponentProcessor processor);
604+
<E extends Exception> int forEachWritable(int initialIndex, WritableComponentProcessor<E> processor) throws E;
605605
}

src/main/java/io/netty/buffer/api/ComponentProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public interface ComponentProcessor {
2626
* A processor of {@linkplain ReadableComponent readable components}.
2727
*/
2828
@FunctionalInterface
29-
interface ReadableComponentProcessor extends ComponentProcessor {
29+
interface ReadableComponentProcessor<E extends Exception> extends ComponentProcessor {
3030
/**
3131
* Process the given component at the given index in the
3232
* {@link Buf#forEachReadable(int, ReadableComponentProcessor) iteration}.
@@ -41,14 +41,14 @@ interface ReadableComponentProcessor extends ComponentProcessor {
4141
* @return {@code true} if the iteration should continue and more components should be processed, otherwise
4242
* {@code false} to stop the iteration early.
4343
*/
44-
boolean process(int index, ReadableComponent component);
44+
boolean process(int index, ReadableComponent component) throws E;
4545
}
4646

4747
/**
4848
* A processor of {@linkplain WritableComponent writable components}.
4949
*/
5050
@FunctionalInterface
51-
interface WritableComponentProcessor extends ComponentProcessor {
51+
interface WritableComponentProcessor<E extends Exception> extends ComponentProcessor {
5252
/**
5353
* Process the given component at the given index in the
5454
* {@link Buf#forEachWritable(int, WritableComponentProcessor)} iteration}.
@@ -63,7 +63,7 @@ interface WritableComponentProcessor extends ComponentProcessor {
6363
* @return {@code true} if the iteration should continue and more components should be processed, otherwise
6464
* {@code false} to stop the iteration early.
6565
*/
66-
boolean process(int index, WritableComponent component);
66+
boolean process(int index, WritableComponent component) throws E;
6767
}
6868

6969
/**

src/main/java/io/netty/buffer/api/CompositeBuf.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -801,7 +801,8 @@ public int countWritableComponents() {
801801
}
802802

803803
@Override
804-
public int forEachReadable(int initialIndex, ReadableComponentProcessor processor) {
804+
public <E extends Exception> int forEachReadable(int initialIndex, ReadableComponentProcessor<E> processor)
805+
throws E {
805806
checkReadBounds(readerOffset(), Math.max(1, readableBytes()));
806807
int visited = 0;
807808
for (Buf buf : bufs) {
@@ -819,7 +820,8 @@ public int forEachReadable(int initialIndex, ReadableComponentProcessor processo
819820
}
820821

821822
@Override
822-
public int forEachWritable(int initialIndex, WritableComponentProcessor processor) {
823+
public <E extends Exception> int forEachWritable(int initialIndex, WritableComponentProcessor<E> processor)
824+
throws E {
823825
checkWriteBounds(writerOffset(), Math.max(1, writableBytes()));
824826
int visited = 0;
825827
for (Buf buf : bufs) {

src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -548,13 +548,15 @@ public int countWritableComponents() {
548548
}
549549

550550
@Override
551-
public int forEachReadable(int initialIndex, ReadableComponentProcessor processor) {
551+
public <E extends Exception> int forEachReadable(int initialIndex, ReadableComponentProcessor<E> processor)
552+
throws E {
552553
checkRead(readerOffset(), Math.max(1, readableBytes()));
553554
return processor.process(initialIndex, this)? 1 : -1;
554555
}
555556

556557
@Override
557-
public int forEachWritable(int initialIndex, WritableComponentProcessor processor) {
558+
public <E extends Exception> int forEachWritable(int initialIndex, WritableComponentProcessor<E> processor)
559+
throws E {
558560
checkWrite(writerOffset(), Math.max(1, writableBytes()));
559561
return processor.process(initialIndex, this)? 1 : -1;
560562
}

src/test/java/io/netty/buffer/api/examples/FileCopyExample.java

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.netty.buffer.api.Buf;
2020
import io.netty.buffer.api.Send;
2121

22-
import java.nio.ByteBuffer;
2322
import java.nio.channels.FileChannel;
2423
import java.nio.file.Path;
2524
import java.util.concurrent.ArrayBlockingQueue;
@@ -34,27 +33,23 @@
3433
public final class FileCopyExample {
3534
public static void main(String[] args) throws Exception {
3635
ExecutorService executor = Executors.newFixedThreadPool(2);
37-
ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(8);
38-
Object done = new Object();
36+
ArrayBlockingQueue<Send<Buf>> queue = new ArrayBlockingQueue<>(8);
3937
try (Allocator allocator = Allocator.pooledDirect();
4038
var input = FileChannel.open(Path.of("/dev/urandom"), READ);
4139
var output = FileChannel.open(Path.of("random.bin"), CREATE, TRUNCATE_EXISTING, WRITE)) {
40+
Send<Buf> done = allocator.compose().send();
4241

4342
var reader = executor.submit(() -> {
44-
var buf = ByteBuffer.allocateDirect(1024);
4543
for (int i = 0; i < 1024; i++) {
46-
buf.clear();
47-
while (buf.hasRemaining()) {
48-
int r = input.read(buf);
49-
System.out.println("r = " + r);
50-
System.out.println("buf = " + buf);
51-
}
52-
buf.flip();
5344
try (Buf in = allocator.allocate(1024)) {
5445
System.out.println("in = " + in);
55-
while (buf.hasRemaining()) {
56-
in.writeByte(buf.get());
57-
}
46+
in.forEachWritable(0, (index, component) -> {
47+
var bb = component.writableBuffer();
48+
while (bb.hasRemaining()) {
49+
input.read(bb);
50+
}
51+
return true;
52+
});
5853
System.out.println("Sending " + in.readableBytes() + " bytes.");
5954
queue.put(in.send());
6055
}
@@ -64,19 +59,17 @@ public static void main(String[] args) throws Exception {
6459
});
6560

6661
var writer = executor.submit(() -> {
67-
var buf = ByteBuffer.allocateDirect(1024);
68-
Object msg;
69-
while ((msg = queue.take()) != done) {
70-
buf.clear();
71-
@SuppressWarnings("unchecked")
72-
Send<Buf> send = (Send<Buf>) msg;
62+
Send<Buf> send;
63+
while ((send = queue.take()) != done) {
7364
try (Buf out = send.receive()) {
7465
System.out.println("Received " + out.readableBytes() + " bytes.");
75-
out.copyInto(0, buf, 0, out.readableBytes());
76-
buf.position(0).limit(out.readableBytes());
77-
}
78-
while (buf.hasRemaining()) {
79-
output.write(buf);
66+
out.forEachReadable(0, (index, component) -> {
67+
var bb = component.readableBuffer();
68+
while (bb.hasRemaining()) {
69+
output.write(bb);
70+
}
71+
return true;
72+
});
8073
}
8174
}
8275
output.force(true);

0 commit comments

Comments
 (0)