Skip to content

Commit 9b7be94

Browse files
committed
ReadStream can be adapted to a Java blocking stream.
Motivation: The Vert.x virtual stream programming model provides async/await to interact with asynchronous futures, yet the stream approach has been left naked. Changes: A new ReadStream blockingStream default method is added to convert a ReadStream to a java.util.stream.Stream. This blocking stream can be used from a Vert.x virtual thread or a non Vert.x thread. When used from a Vert.x virtual thread, each blocking interaction with the stream suspends the virtual thread and release the context ownership to let other tasks process tasks, allowing the context execution to happen and handle pending context tasks.
1 parent c85baca commit 9b7be94

File tree

6 files changed

+194
-22
lines changed

6 files changed

+194
-22
lines changed

vertx-core/src/main/asciidoc/virtualthreads.adoc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ You still write the traditional Vert.x code processing events, but you have the
66

77
=== Introduction
88

9-
The non-blocking nature of Vert.x leads to asynchronous APIs.
9+
The non-blocking nature of Vert.x leads to asynchronous APIs.
1010
Asynchronous APIs can take various forms including callback style, promises and reactive extensions.
1111

1212
In some cases, programming using asynchronous APIs can be more challenging than using a direct synchronous style, in particular if you have several operations that you want to do sequentially.
@@ -57,6 +57,13 @@ or on a JDK `CompletionStage`
5757
{@link examples.VirtualThreadExamples#awaitingFutures2}
5858
----
5959

60+
You can also transform a Vert.x `ReadStream` to a Java blocking stream:
61+
62+
[source,java]
63+
----
64+
{@link examples.VirtualThreadExamples#blockingStream}
65+
----
66+
6067
==== Field visibility
6168

6269
A virtual thread verticle can interact safely with fields before an `await` call. However, if you are reading a field before an `await` call and reusing the value after the call, you should keep in mind that this value might have changed.

vertx-core/src/main/java/examples/VirtualThreadExamples.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.vertx.docgen.Source;
1717

1818
import java.util.concurrent.CompletionStage;
19+
import java.util.stream.Stream;
1920

2021
@Source
2122
public class VirtualThreadExamples {
@@ -93,6 +94,18 @@ public void awaitingFutures2(HttpClientResponse response, CompletionStage<Buffer
9394
Buffer body = Future.fromCompletionStage(completionStage).await();
9495
}
9596

97+
public void blockingStream(HttpServer server) {
98+
server.requestHandler(request -> {
99+
Stream<Buffer> blockingStream = request.blockingStream();
100+
HttpServerResponse response = request.response();
101+
response.setChunked(true);
102+
blockingStream
103+
.map(buff -> "" + buff.length())
104+
.forEach(size -> response.write(size));
105+
response.end();
106+
});
107+
}
108+
96109
private Future<String> getRemoteString() {
97110
return null;
98111
}

vertx-core/src/main/java/io/vertx/core/internal/streams/ReadStreamIterator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public boolean hasNext() {
116116
return true;
117117
}
118118
if (ended != null) {
119-
return false;
119+
return ended != END_SENTINEL;
120120
}
121121
awaitProgress();
122122
}

vertx-core/src/main/java/io/vertx/core/streams/ReadStream.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,15 @@
1919
import io.vertx.core.Handler;
2020
import io.vertx.core.Promise;
2121
import io.vertx.core.internal.PromiseInternal;
22+
import io.vertx.core.internal.streams.ReadStreamIterator;
2223
import io.vertx.core.streams.impl.PipeImpl;
2324

25+
import java.util.Iterator;
26+
import java.util.Spliterator;
27+
import java.util.Spliterators;
2428
import java.util.function.BiConsumer;
29+
import java.util.stream.Stream;
30+
import java.util.stream.StreamSupport;
2531

2632
/**
2733
* Represents a stream of items that can be read from.
@@ -135,6 +141,18 @@ default <R, A> Future<R> collect(java.util.stream.Collector<T , A , R> collector
135141
return promise.future();
136142
}
137143

144+
/**
145+
* Adapt this {@code ReadStream} to a blocking sequential {@code Stream}, the return stream usage is restricted to
146+
* non vertx threads or vertx virtual threads.
147+
*
148+
* @return a blocking stream
149+
*/
150+
@GenIgnore
151+
default Stream<T> blockingStream() {
152+
Iterator<T> iterator = ReadStreamIterator.iterator(this);
153+
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
154+
}
155+
138156
/**
139157
* Pipe this {@code ReadStream} to the {@code WriteStream}.
140158
* <p>

vertx-core/src/test/java/io/vertx/tests/http/VirtualThreadHttpTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,15 @@
2121
import org.junit.Test;
2222

2323
import java.nio.charset.StandardCharsets;
24+
import java.util.ArrayDeque;
25+
import java.util.ArrayList;
26+
import java.util.Deque;
27+
import java.util.List;
2428
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicInteger;
30+
import java.util.stream.Collectors;
31+
import java.util.stream.IntStream;
32+
import java.util.stream.StreamSupport;
2533

2634
public class VirtualThreadHttpTest extends VertxTestBase {
2735

@@ -87,6 +95,43 @@ public void testHttpClient2() throws Exception {
8795
}
8896
}
8997

98+
@Test
99+
public void testHttpClient3() throws Exception {
100+
Assume.assumeTrue(isVirtualThreadAvailable());
101+
HttpServer server = vertx.createHttpServer();
102+
int numChunks = 10;
103+
List<String> expected = IntStream.range(0, numChunks).mapToObj(idx -> "chunk-" + idx).collect(Collectors.toList());
104+
server.requestHandler(req -> {
105+
HttpServerResponse response = req.response();
106+
response.setChunked(true);
107+
Deque<String> toSend = new ArrayDeque<>(expected);
108+
vertx.setPeriodic(10, id -> {
109+
String chunk = toSend.poll();
110+
if (chunk != null) {
111+
response.write(chunk);
112+
} else {
113+
vertx.cancelTimer(id);
114+
response.end();
115+
}
116+
});
117+
});
118+
server.listen(8088, "localhost").await(10, TimeUnit.SECONDS);
119+
vertx.createVirtualThreadContext().runOnContext(v -> {
120+
HttpClient client = vertx.createHttpClient();
121+
for (int i = 0; i < 10; ++i) {
122+
HttpClientRequest req = client.request(HttpMethod.GET, 8088, "localhost", "/").await();
123+
HttpClientResponse resp = req.send().await();
124+
List<String> chunks = new ArrayList<>();
125+
resp.blockingStream().forEach(chunk -> {
126+
chunks.add(chunk.toString());
127+
});
128+
assertEquals(expected, chunks);
129+
}
130+
testComplete();
131+
});
132+
await();
133+
}
134+
90135
@Test
91136
public void testHttpClientTimeout() throws Exception {
92137
Assume.assumeTrue(isVirtualThreadAvailable());

vertx-core/src/test/java/io/vertx/tests/streams/IteratorTest.java

Lines changed: 109 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,23 @@
1616
import io.vertx.core.internal.VertxInternal;
1717
import io.vertx.core.internal.streams.ReadStreamIterator;
1818
import io.vertx.core.streams.ReadStream;
19-
import io.vertx.test.core.AsyncTestBase;
2019
import io.vertx.test.core.Repeat;
21-
import io.vertx.test.core.RepeatRule;
20+
import io.vertx.test.core.VertxTestBase;
2221
import io.vertx.test.fakestream.FakeStream;
23-
import org.junit.Rule;
2422
import org.junit.Assume;
2523
import org.junit.Test;
2624

27-
import java.util.ArrayList;
28-
import java.util.Iterator;
29-
import java.util.List;
30-
import java.util.NoSuchElementException;
31-
import java.util.concurrent.CountDownLatch;
25+
import java.util.*;
3226
import java.util.concurrent.CyclicBarrier;
27+
import java.util.concurrent.Executor;
3328
import java.util.concurrent.atomic.AtomicInteger;
3429
import java.util.concurrent.locks.Condition;
3530
import java.util.concurrent.locks.Lock;
3631
import java.util.concurrent.locks.ReentrantLock;
32+
import java.util.stream.Collectors;
33+
import java.util.stream.Stream;
3734

38-
public class IteratorTest extends AsyncTestBase {
35+
public class IteratorTest extends VertxTestBase {
3936

4037
@Test
4138
public void testIteratorResuming() {
@@ -87,7 +84,7 @@ public void testFail() {
8784
assertTrue(iterator.hasNext());
8885
iterator.next();
8986
}
90-
assertFalse(iterator.hasNext());
87+
assertTrue(iterator.hasNext());
9188
try {
9289
iterator.next();
9390
fail();
@@ -114,9 +111,6 @@ public void testHasNextSignal() throws Exception {
114111
}
115112
}
116113

117-
@Rule
118-
public RepeatRule rule = new RepeatRule();
119-
120114
@Repeat(times = 100)
121115
@Test
122116
public void testConcurrentReads() throws Exception {
@@ -286,13 +280,9 @@ public void run() {
286280

287281
@Test
288282
public void testVirtualThread() {
289-
VertxInternal vertx = (VertxInternal) Vertx.vertx();
290-
try {
291-
Assume.assumeTrue(vertx.isVirtualThreadAvailable());
292-
doTestVirtualThread(vertx);
293-
} finally {
294-
vertx.close();
295-
}
283+
VertxInternal vertx = (VertxInternal) this.vertx;
284+
Assume.assumeTrue(vertx.isVirtualThreadAvailable());
285+
doTestVirtualThread(vertx);
296286
}
297287

298288
private void doTestVirtualThread(VertxInternal vertx) {
@@ -312,4 +302,103 @@ private void doTestVirtualThread(VertxInternal vertx) {
312302
});
313303
await();
314304
}
305+
306+
@Test
307+
public void testBlockingStreamFromVirtualThread() {
308+
VertxInternal vertx = (VertxInternal) this.vertx;
309+
Assume.assumeTrue(vertx.isVirtualThreadAvailable());
310+
ContextInternal context = vertx.createVirtualThreadContext();
311+
testBlockingStream(task -> {
312+
context.runOnContext(v -> task.run());
313+
});
314+
}
315+
316+
@Test
317+
public void testBlockingStreamFromVanillaThread() {
318+
testBlockingStream(task -> {
319+
Thread thread = new Thread(task);
320+
thread.start();
321+
});
322+
}
323+
324+
@Test
325+
public void testBlockingStreamFromVertxThread() {
326+
VertxInternal vertx = (VertxInternal) this.vertx;
327+
FakeStream<Integer> readStream = new FakeStream<>();
328+
List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
329+
Stream<Integer> blockingStream = readStream.blockingStream();
330+
ContextInternal context = vertx.createEventLoopContext();
331+
context.exceptionHandler(errors::add);
332+
context.runOnContext(v -> {
333+
blockingStream.forEach(elt -> {
334+
fail();
335+
});
336+
});
337+
assertWaitUntil(() -> errors.size() == 1);
338+
assertEquals(IllegalStateException.class, errors.get(0).getClass());
339+
}
340+
341+
private void testBlockingStream(Executor runner) {
342+
int numItems = 4;
343+
FakeStream<Integer> stream = new FakeStream<>();
344+
Stream<Integer> blockingStream = stream.blockingStream();
345+
List<Integer> expected = new ArrayList<>();
346+
for (int i = 0;i < numItems;i++) {
347+
expected.add(i);
348+
}
349+
List<Integer> items = Collections.synchronizedList(new ArrayList<>());
350+
runner.execute(() -> items.addAll(blockingStream.collect(Collectors.toList())));
351+
new Thread(() -> {
352+
try {
353+
for (int elt : expected) {
354+
stream.write(elt);
355+
Thread.sleep(10);
356+
}
357+
stream.end();
358+
} catch (InterruptedException ignore) {
359+
}
360+
}).start();
361+
assertWaitUntil(() -> items.equals(expected));
362+
}
363+
364+
@Test
365+
public void testBlockingStreamInterleavingFromVirtualThread() {
366+
VertxInternal vertx = (VertxInternal) this.vertx;
367+
Assume.assumeTrue(vertx.isVirtualThreadAvailable());
368+
ContextInternal context = vertx.createVirtualThreadContext();
369+
FakeStream<Integer> stream = new FakeStream<>();
370+
Stream<Integer> blockingStream = stream.blockingStream();
371+
int num = 32;
372+
context.runOnContext(v -> {
373+
AtomicInteger count = new AtomicInteger();
374+
vertx.setPeriodic(5, id -> {
375+
assertSame(context, ((ContextInternal) Vertx.currentContext()).unwrap());
376+
int i = count.getAndIncrement();
377+
if (i == num) {
378+
vertx.cancelTimer(id);
379+
stream.end();
380+
} else {
381+
stream.write(i);
382+
}
383+
});
384+
List<Integer> collected = new ArrayList<>();
385+
blockingStream.forEach(collected::add);
386+
assertEquals(num, collected.size());
387+
testComplete();
388+
});
389+
await();
390+
}
391+
392+
@Test
393+
public void testStreamFailure() {
394+
RuntimeException expected = new RuntimeException();
395+
FakeStream<Integer> stream = new FakeStream<>();
396+
Stream<Integer> blockingStream = stream.blockingStream();
397+
stream.fail(expected);
398+
try {
399+
blockingStream.collect(Collectors.toList());
400+
} catch (Exception e) {
401+
assertSame(expected, e);
402+
}
403+
}
315404
}

0 commit comments

Comments
 (0)