Skip to content

Commit 41d3803

Browse files
authored
Merge pull request #47094 from alesj/grpc_reqctx1
Allow for RoutingContext to be injected into gRPC service
2 parents 77ebed3 + 9b94275 commit 41d3803

File tree

6 files changed

+134
-2
lines changed

6 files changed

+134
-2
lines changed

docs/src/main/asciidoc/grpc-service-implementation.adoc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,21 @@ public class MyInterceptor implements ServerInterceptor, Prioritized {
331331
Interceptors with the highest priority are called first.
332332
The default priority, used if the interceptor does not implement the `Prioritized` interface, is `0`.
333333

334+
There is also a support to inject Vert.x `RoutingContext` instance into your gRPC service, if / when needed.
335+
Quarkus doesn't do that by default, you will need to add `RoutingContextGrpcInterceptor` to your gRPC service.
336+
337+
[source, java]
338+
----
339+
@GrpcService
340+
@RegisterInterceptor(RoutingContextGrpcInterceptor.class)
341+
public class HelloWorldService extends GreeterGrpc.GreeterImplBase {
342+
343+
@Inject
344+
RoutingContext context;
345+
346+
// ...
347+
}
348+
----
334349

335350
== Testing your services
336351

extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import io.quarkus.grpc.runtime.health.GrpcHealthStorage;
8787
import io.quarkus.grpc.runtime.supports.context.GrpcDuplicatedContextGrpcInterceptor;
8888
import io.quarkus.grpc.runtime.supports.context.GrpcRequestContextGrpcInterceptor;
89+
import io.quarkus.grpc.runtime.supports.context.RoutingContextGrpcInterceptor;
8990
import io.quarkus.grpc.runtime.supports.exc.DefaultExceptionHandlerProvider;
9091
import io.quarkus.grpc.runtime.supports.exc.ExceptionInterceptor;
9192
import io.quarkus.kubernetes.spi.KubernetesPortBuildItem;
@@ -565,6 +566,7 @@ void registerBeans(BuildProducer<AdditionalBeanBuildItem> beans,
565566
// Global interceptors are invoked before any of the per-service interceptors
566567
beans.produce(AdditionalBeanBuildItem.unremovableOf(GrpcRequestContextGrpcInterceptor.class));
567568
beans.produce(AdditionalBeanBuildItem.unremovableOf(GrpcDuplicatedContextGrpcInterceptor.class));
569+
beans.produce(AdditionalBeanBuildItem.unremovableOf(RoutingContextGrpcInterceptor.class));
568570
features.produce(new FeatureBuildItem(GRPC_SERVER));
569571

570572
if (capabilities.isPresent(Capability.SECURITY)) {

extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,20 +269,30 @@ public void handle(Void unused) {
269269
capturedVertxContext.runOnContext(new Handler<Void>() {
270270
@Override
271271
public void handle(Void unused) {
272-
server.handle(ctx.request());
272+
routingContextAware(server, ctx);
273273
}
274274
});
275275
return;
276276
}
277277
}
278-
server.handle(ctx.request());
278+
routingContextAware(server, ctx);
279279
}
280280
});
281281
shutdown.addShutdownTask(route::remove); // remove this route at shutdown, this should reset it
282282

283283
initHealthStorage();
284284
}
285285

286+
private static void routingContextAware(GrpcServer server, RoutingContext context) {
287+
Context currentContext = Vertx.currentContext();
288+
currentContext.putLocal(RoutingContext.class.getName(), context);
289+
try {
290+
server.handle(context.request());
291+
} finally {
292+
currentContext.removeLocal(RoutingContext.class.getName());
293+
}
294+
}
295+
286296
// TODO -- handle Avro, plain text ... when supported / needed
287297
private static boolean isGrpc(RoutingContext rc) {
288298
HttpServerRequest request = rc.request();

extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/Interceptors.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public final class Interceptors {
1616

1717
public static final int DUPLICATE_CONTEXT = Integer.MAX_VALUE - 5;
1818
public static final int REQUEST_CONTEXT = Integer.MAX_VALUE - 50;
19+
public static final int ROUTING_CONTEXT = Integer.MAX_VALUE - 55;
1920
public static final int BLOCKING_HANDLER = Integer.MAX_VALUE - 60;
2021
public static final int EXCEPTION_HANDLER = Integer.MAX_VALUE - 70;
2122

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package io.quarkus.grpc.runtime.supports.context;
2+
3+
import jakarta.enterprise.context.ApplicationScoped;
4+
import jakarta.enterprise.inject.spi.Prioritized;
5+
6+
import io.grpc.ForwardingServerCallListener;
7+
import io.grpc.Metadata;
8+
import io.grpc.ServerCall;
9+
import io.grpc.ServerCallHandler;
10+
import io.grpc.ServerInterceptor;
11+
import io.quarkus.arc.Arc;
12+
import io.quarkus.arc.ArcContainer;
13+
import io.quarkus.arc.ManagedContext;
14+
import io.quarkus.grpc.runtime.Interceptors;
15+
import io.quarkus.vertx.http.runtime.CurrentVertxRequest;
16+
import io.vertx.core.Context;
17+
import io.vertx.core.Vertx;
18+
import io.vertx.ext.web.RoutingContext;
19+
20+
@ApplicationScoped
21+
public class RoutingContextGrpcInterceptor implements ServerInterceptor, Prioritized {
22+
23+
@Override
24+
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
25+
ServerCall<ReqT, RespT> call,
26+
Metadata headers,
27+
ServerCallHandler<ReqT, RespT> next) {
28+
29+
Context currentContext = Vertx.currentContext();
30+
RoutingContext routingContext = currentContext.getLocal(RoutingContext.class.getName());
31+
32+
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
33+
next.startCall(call, headers)) {
34+
35+
private void invoke(Runnable runnable) {
36+
ArcContainer container = Arc.container();
37+
ManagedContext reqContext = container.requestContext();
38+
CurrentVertxRequest cvr = null;
39+
if (reqContext.isActive()) {
40+
cvr = container.select(CurrentVertxRequest.class).get();
41+
cvr.setCurrent(routingContext);
42+
}
43+
try {
44+
runnable.run();
45+
} finally {
46+
// can be non-active, after onComplete + handleClose -> onHalfClose
47+
if (cvr != null && reqContext.isActive()) {
48+
cvr.setCurrent(null);
49+
}
50+
}
51+
}
52+
53+
@Override
54+
public void onMessage(ReqT message) {
55+
invoke(() -> super.onMessage(message));
56+
}
57+
58+
@Override
59+
public void onHalfClose() {
60+
invoke(super::onHalfClose);
61+
}
62+
63+
@Override
64+
public void onCancel() {
65+
invoke(super::onCancel);
66+
}
67+
68+
@Override
69+
public void onComplete() {
70+
invoke(super::onComplete);
71+
}
72+
73+
@Override
74+
public void onReady() {
75+
invoke(super::onReady);
76+
}
77+
};
78+
}
79+
80+
@Override
81+
public int getPriority() {
82+
return Interceptors.ROUTING_CONTEXT;
83+
}
84+
}

integration-tests/grpc-interceptors/src/main/java/io/quarkus/grpc/examples/interceptors/HelloWorldService.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,35 @@
11
package io.quarkus.grpc.examples.interceptors;
22

3+
import jakarta.inject.Inject;
4+
5+
import org.eclipse.microprofile.config.inject.ConfigProperty;
6+
37
import examples.GreeterGrpc;
48
import examples.HelloReply;
59
import examples.HelloRequest;
610
import io.grpc.stub.StreamObserver;
711
import io.quarkus.grpc.GrpcService;
12+
import io.quarkus.grpc.RegisterInterceptor;
13+
import io.quarkus.grpc.runtime.supports.context.RoutingContextGrpcInterceptor;
14+
import io.vertx.ext.web.RoutingContext;
815

916
@GrpcService
17+
@RegisterInterceptor(RoutingContextGrpcInterceptor.class)
1018
public class HelloWorldService extends GreeterGrpc.GreeterImplBase {
1119

20+
@Inject
21+
RoutingContext context;
22+
23+
@ConfigProperty(name = "quarkus.profile")
24+
String profile;
25+
1226
private HelloReply getReply(HelloRequest request) {
27+
// just poke for Vert.x based gRPC server-side
28+
if ("vertx".equals(profile) || "o2n".equals(profile)) {
29+
// just poke context, so we know it works
30+
context.request().getHeader("foobar");
31+
}
32+
1333
String name = request.getName();
1434
if (name.equals("Fail")) {
1535
throw new HelloException(name);

0 commit comments

Comments
 (0)