Skip to content

Commit ec4074a

Browse files
authored
Merge pull request quarkusio#36466 from mkouba/issue-36430
Reactive routes: virtual threads support
2 parents 87379b2 + 8638dcb commit ec4074a

File tree

56 files changed

+350
-866
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+350
-866
lines changed

docs/src/main/asciidoc/reactive-routes.adoc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public void blocking(RoutingContext rc) {
138138
// ...
139139
}
140140
----
141-
When `@Blocking` is used, it ignores the `type` attribute of `@Route`.
141+
When `@Blocking` is used, the `type` attribute of the `@Route` is ignored.
142142
====
143143

144144
The `@Route` annotation is repeatable and so you can declare several routes for a single method:
@@ -164,6 +164,12 @@ String person() {
164164
----
165165
<1> If the `accept` header matches `text/html`, we set the content type automatically to `text/html`.
166166

167+
=== Executing route on a virtual thread
168+
169+
You can annotate a route method with `@io.smallrye.common.annotation.RunOnVirtualThread` in order to execute it on a virtual thread.
170+
However, keep in mind that not everything can run safely on virtual threads.
171+
You should read the xref:virtual-threads.adoc#run-code-on-virtual-threads-using-runonvirtualthread[Virtual thread support reference] carefully and get acquainted with all the details.
172+
167173
=== Handling conflicting routes
168174

169175
You may end up with multiple routes matching a given path.

docs/src/main/asciidoc/virtual-threads.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ In this scenario, it is worse than useless to have thousands of threads if we ha
7979
Even worse, when running a CPU-bound workload on a virtual thread, the virtual thread monopolizes the carrier thread on which it is mounted.
8080
It will either reduce the chance for the other virtual thread to run or will start creating new carrier threads, leading to high memory usage.
8181

82+
[[run-code-on-virtual-threads-using-runonvirtualthread]]
8283
== Run code on virtual threads using @RunOnVirtualThread
8384

8485
In Quarkus, the support of virtual thread is implemented using the link:{runonvthread}[@RunOnVirtualThread] annotation.

extensions/reactive-routes/deployment/src/main/java/io/quarkus/vertx/web/deployment/HandlerDescriptor.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import org.jboss.jandex.Type.Kind;
77

88
import io.quarkus.hibernate.validator.spi.BeanValidationAnnotationsBuildItem;
9-
import io.quarkus.vertx.http.runtime.HandlerType;
109

1110
/**
1211
* Describe a request handler.
@@ -15,15 +14,15 @@ class HandlerDescriptor {
1514

1615
private final MethodInfo method;
1716
private final BeanValidationAnnotationsBuildItem validationAnnotations;
18-
private final HandlerType handlerType;
17+
private final boolean failureHandler;
1918
private final Type payloadType;
2019
private final String[] contentTypes;
2120

22-
HandlerDescriptor(MethodInfo method, BeanValidationAnnotationsBuildItem bvAnnotations, HandlerType handlerType,
21+
HandlerDescriptor(MethodInfo method, BeanValidationAnnotationsBuildItem bvAnnotations, boolean failureHandler,
2322
String[] producedTypes) {
2423
this.method = method;
2524
this.validationAnnotations = bvAnnotations;
26-
this.handlerType = handlerType;
25+
this.failureHandler = failureHandler;
2726
Type returnType = method.returnType();
2827
if (returnType.kind() == Kind.VOID) {
2928
payloadType = null;
@@ -120,8 +119,8 @@ boolean isPayloadMutinyBuffer() {
120119
return type.name().equals(DotNames.MUTINY_BUFFER);
121120
}
122121

123-
HandlerType getHandlerType() {
124-
return handlerType;
122+
boolean isFailureHandler() {
123+
return failureHandler;
125124
}
126125

127126
}

extensions/reactive-routes/deployment/src/main/java/io/quarkus/vertx/web/deployment/ReactiveRoutesProcessor.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ public boolean test(String name) {
445445
if (routeHandler == null) {
446446
String handlerClass = generateHandler(
447447
new HandlerDescriptor(businessMethod.getMethod(), beanValidationAnnotations.orElse(null),
448-
handlerType, produces),
448+
handlerType == HandlerType.FAILURE, produces),
449449
businessMethod.getBean(), businessMethod.getMethod(), classOutput, transformedAnnotations,
450450
routeString, reflectiveHierarchy, produces.length > 0 ? produces[0] : null,
451451
validatorAvailable, index);
@@ -458,6 +458,13 @@ public boolean test(String name) {
458458
// Wrap the route handler if necessary
459459
// Note that route annotations with the same values share a single handler implementation
460460
routeHandler = recorder.compressRouteHandler(routeHandler, businessMethod.getCompression());
461+
if (businessMethod.getMethod().hasDeclaredAnnotation(DotNames.RUN_ON_VIRTUAL_THREAD)) {
462+
LOGGER.debugf("Route %s#%s() will be executed on a virtual thread",
463+
businessMethod.getMethod().declaringClass().name(), businessMethod.getMethod().name());
464+
routeHandler = recorder.runOnVirtualThread(routeHandler);
465+
// The handler must be executed on the event loop
466+
handlerType = HandlerType.NORMAL;
467+
}
461468

462469
RouteMatcher matcher = new RouteMatcher(path, regex, produces, consumes, methods, order);
463470
matchers.put(matcher, businessMethod.getMethod());
@@ -489,7 +496,7 @@ public boolean test(String name) {
489496

490497
for (AnnotatedRouteFilterBuildItem filterMethod : routeFilterBusinessMethods) {
491498
String handlerClass = generateHandler(
492-
new HandlerDescriptor(filterMethod.getMethod(), beanValidationAnnotations.orElse(null), HandlerType.NORMAL,
499+
new HandlerDescriptor(filterMethod.getMethod(), beanValidationAnnotations.orElse(null), false,
493500
new String[0]),
494501
filterMethod.getBean(), filterMethod.getMethod(), classOutput, transformedAnnotations,
495502
filterMethod.getRouteFilter().toString(true), reflectiveHierarchy, null, validatorAvailable, index);
@@ -785,7 +792,7 @@ void implementInvoke(HandlerDescriptor descriptor, BeanInfo bean, MethodInfo met
785792
defaultProduces == null ? invoke.loadNull() : invoke.load(defaultProduces));
786793

787794
// For failure handlers attempt to match the failure type
788-
if (descriptor.getHandlerType() == HandlerType.FAILURE) {
795+
if (descriptor.isFailureHandler()) {
789796
Type failureType = getFailureType(parameters, index);
790797
if (failureType != null) {
791798
ResultHandle failure = invoke.invokeInterfaceMethod(Methods.FAILURE, routingContext);

extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/VertxWebRecorder.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ public Handler<RoutingContext> createHandler(String handlerClassName) {
5151
}
5252
}
5353

54+
public Handler<RoutingContext> runOnVirtualThread(Handler<RoutingContext> routeHandler) {
55+
return new VirtualThreadsRouteHandler(routeHandler);
56+
}
57+
5458
public Handler<RoutingContext> compressRouteHandler(Handler<RoutingContext> routeHandler, HttpCompression compression) {
5559
if (httpBuildTimeConfig.enableCompression) {
5660
return new HttpCompressionHandler(routeHandler, compression,
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package io.quarkus.vertx.web.runtime;
2+
3+
import io.quarkus.vertx.core.runtime.VertxCoreRecorder;
4+
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
5+
import io.quarkus.virtual.threads.VirtualThreadsRecorder;
6+
import io.smallrye.common.vertx.VertxContext;
7+
import io.vertx.core.Context;
8+
import io.vertx.core.Handler;
9+
import io.vertx.ext.web.RoutingContext;
10+
11+
public class VirtualThreadsRouteHandler implements Handler<RoutingContext> {
12+
13+
private final Handler<RoutingContext> routeHandler;
14+
15+
public VirtualThreadsRouteHandler(Handler<RoutingContext> routeHandler) {
16+
this.routeHandler = routeHandler;
17+
}
18+
19+
@Override
20+
public void handle(RoutingContext context) {
21+
Context vertxContext = VertxContext.getOrCreateDuplicatedContext(VertxCoreRecorder.getVertx().get());
22+
VertxContextSafetyToggle.setContextSafe(vertxContext, true);
23+
vertxContext.runOnContext(new Handler<Void>() {
24+
@Override
25+
public void handle(Void event) {
26+
VirtualThreadsRecorder.getCurrent().execute(new Runnable() {
27+
@Override
28+
public void run() {
29+
routeHandler.handle(context);
30+
}
31+
});
32+
}
33+
});
34+
}
35+
36+
}

integration-tests/virtual-threads/amqp-virtual-threads/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,10 @@
5252
<artifactId>assertj-core</artifactId>
5353
<scope>test</scope>
5454
</dependency>
55+
<!-- Use the "compile" scope because we need to include the VirtualThreadsAssertions in the app -->
5556
<dependency>
5657
<groupId>io.quarkus</groupId>
5758
<artifactId>quarkus-test-vertx</artifactId>
58-
<scope>test</scope>
5959
</dependency>
6060
<dependency>
6161
<groupId>io.quarkus</groupId>

integration-tests/virtual-threads/amqp-virtual-threads/src/main/java/io/quarkus/it/vthreads/amqp/AssertHelper.java

Lines changed: 0 additions & 68 deletions
This file was deleted.

integration-tests/virtual-threads/amqp-virtual-threads/src/main/java/io/quarkus/it/vthreads/amqp/PriceConsumer.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
package io.quarkus.it.vthreads.amqp;
22

3-
import static io.quarkus.it.vthreads.amqp.AssertHelper.assertThatItRunsOnADuplicatedContext;
4-
import static io.quarkus.it.vthreads.amqp.AssertHelper.assertThatItRunsOnVirtualThread;
5-
63
import java.util.Random;
74
import java.util.concurrent.CompletionStage;
85
import java.util.concurrent.atomic.AtomicInteger;
@@ -14,6 +11,7 @@
1411
import org.eclipse.microprofile.reactive.messaging.Outgoing;
1512
import org.eclipse.microprofile.rest.client.inject.RestClient;
1613

14+
import io.quarkus.test.vertx.VirtualThreadsAssertions;
1715
import io.smallrye.common.annotation.RunOnVirtualThread;
1816

1917
@ApplicationScoped
@@ -25,12 +23,12 @@ public class PriceConsumer {
2523
@Incoming("prices")
2624
@RunOnVirtualThread
2725
public CompletionStage<Void> consume(Message<Double> msg) {
28-
assertThatItRunsOnVirtualThread();
29-
assertThatItRunsOnADuplicatedContext();
26+
VirtualThreadsAssertions.assertThatItRunsOnVirtualThread();
27+
VirtualThreadsAssertions.assertThatItRunsOnADuplicatedContext();
3028
double price = msg.getPayload();
3129
alertService.alertMessage(price);
3230
return msg.ack().thenAccept(x -> {
33-
assertThatItRunsOnADuplicatedContext();
31+
VirtualThreadsAssertions.assertThatItRunsOnADuplicatedContext();
3432
// While the ack always runs on event loop thread
3533
// the post-ack may run on the processing virtual-thread which executed the method.
3634
});
@@ -39,8 +37,8 @@ public CompletionStage<Void> consume(Message<Double> msg) {
3937
@Incoming("prices")
4038
@RunOnVirtualThread
4139
public void consume(double price) {
42-
assertThatItRunsOnVirtualThread();
43-
assertThatItRunsOnADuplicatedContext();
40+
VirtualThreadsAssertions.assertThatItRunsOnVirtualThread();
41+
VirtualThreadsAssertions.assertThatItRunsOnADuplicatedContext();
4442
alertService.alert(price);
4543
}
4644

@@ -50,7 +48,7 @@ public void consume(double price) {
5048
@Outgoing("prices-out")
5149
@RunOnVirtualThread
5250
public Message<Double> randomPriceGenerator() {
53-
assertThatItRunsOnVirtualThread();
51+
VirtualThreadsAssertions.assertThatItRunsOnVirtualThread();
5452
return Message.of(r.nextDouble() * 10 * i.incrementAndGet());
5553
}
5654

integration-tests/virtual-threads/grpc-virtual-threads/pom.xml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222
<groupId>io.quarkus</groupId>
2323
<artifactId>quarkus-grpc</artifactId>
2424
</dependency>
25-
25+
<!-- Use the "compile" scope because we need to include the VirtualThreadsAssertions in the app -->
26+
<dependency>
27+
<groupId>io.quarkus</groupId>
28+
<artifactId>quarkus-test-vertx</artifactId>
29+
</dependency>
2630
<dependency>
2731
<groupId>io.quarkus</groupId>
2832
<artifactId>quarkus-junit5</artifactId>

0 commit comments

Comments
 (0)