Skip to content

Commit 88c3271

Browse files
committed
Test early gRPC headers sending
1 parent b876c94 commit 88c3271

File tree

3 files changed

+139
-0
lines changed

3 files changed

+139
-0
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.quarkus.grpc.examples.interceptors;
2+
3+
import java.util.concurrent.CompletableFuture;
4+
5+
import io.grpc.CallOptions;
6+
import io.grpc.Channel;
7+
import io.grpc.ClientCall;
8+
import io.grpc.ClientInterceptor;
9+
import io.grpc.ForwardingClientCall;
10+
import io.grpc.ForwardingClientCallListener;
11+
import io.grpc.Metadata;
12+
import io.grpc.MethodDescriptor;
13+
import io.grpc.Status;
14+
15+
public class EarlyHeaderClientInterceptor implements ClientInterceptor {
16+
17+
private static final Metadata.Key<String> HEADER = Metadata.Key.of("xx-acme-header", Metadata.ASCII_STRING_MARSHALLER);
18+
private final CompletableFuture<String> headerFuture = new CompletableFuture<>();
19+
20+
public CompletableFuture<String> getHeaderFuture() {
21+
return headerFuture;
22+
}
23+
24+
@Override
25+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor,
26+
CallOptions callOptions, Channel next) {
27+
return new ForwardingClientCall.SimpleForwardingClientCall<>(next.newCall(methodDescriptor, callOptions)) {
28+
@Override
29+
public void start(Listener<RespT> responseListener, Metadata headers) {
30+
super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<>(responseListener) {
31+
@Override
32+
public void onHeaders(Metadata headers) {
33+
headerFuture.complete(headers.get(HEADER));
34+
super.onHeaders(headers);
35+
}
36+
37+
@Override
38+
public void onClose(Status status, Metadata trailers) {
39+
if (!headerFuture.isDone()) {
40+
headerFuture.completeExceptionally(new RuntimeException("Call closed before receiving headers"));
41+
}
42+
super.onClose(status, trailers);
43+
}
44+
}, headers);
45+
}
46+
};
47+
}
48+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package io.quarkus.grpc.examples.interceptors;
2+
3+
import jakarta.enterprise.context.ApplicationScoped;
4+
5+
import io.grpc.ForwardingServerCall;
6+
import io.grpc.Metadata;
7+
import io.grpc.ServerCall;
8+
import io.grpc.ServerCallHandler;
9+
import io.grpc.ServerInterceptor;
10+
import io.grpc.Status;
11+
import io.quarkus.grpc.GlobalInterceptor;
12+
13+
@GlobalInterceptor
14+
@ApplicationScoped
15+
public class EarlyHeaderServerInterceptor implements ServerInterceptor {
16+
17+
private static final Metadata.Key<String> HEADER = Metadata.Key.of("xx-acme-header", Metadata.ASCII_STRING_MARSHALLER);
18+
19+
@Override
20+
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
21+
ServerCallHandler<ReqT, RespT> next) {
22+
ServerCallDiscardingHeaders<ReqT, RespT> wrappedServerCall = new ServerCallDiscardingHeaders<>(call);
23+
ServerCall.Listener<ReqT> serverCallListener = next.startCall(wrappedServerCall, headers);
24+
if (wrappedServerCall.isClosed()) {
25+
return new ServerCall.Listener<>() {
26+
};
27+
}
28+
29+
Metadata metadata = new Metadata();
30+
metadata.put(HEADER, "whatever");
31+
call.sendHeaders(metadata);
32+
33+
return serverCallListener;
34+
}
35+
36+
private static class ServerCallDiscardingHeaders<ReqT, RespT>
37+
extends ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {
38+
39+
private boolean closed;
40+
41+
protected ServerCallDiscardingHeaders(ServerCall<ReqT, RespT> delegate) {
42+
super(delegate);
43+
}
44+
45+
@Override
46+
public void sendHeaders(Metadata headers) {
47+
// headers have been sent already
48+
}
49+
50+
@Override
51+
public void sendMessage(RespT message) {
52+
super.sendMessage(message);
53+
}
54+
55+
@Override
56+
public void close(Status status, Metadata trailers) {
57+
closed = true;
58+
super.close(status, trailers);
59+
}
60+
61+
boolean isClosed() {
62+
return closed;
63+
}
64+
}
65+
}

integration-tests/grpc-interceptors/src/test/java/io/quarkus/grpc/example/interceptors/HelloWorldServiceTestBase.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import examples.MutinyCopycatGrpc;
1818
import io.grpc.Channel;
1919
import io.grpc.stub.StreamObserver;
20+
import io.quarkus.grpc.examples.interceptors.EarlyHeaderClientInterceptor;
2021
import io.quarkus.grpc.examples.interceptors.HelloExceptionHandlerProvider;
2122
import io.quarkus.grpc.test.utils.GRPCTestUtils;
2223
import io.smallrye.mutiny.Multi;
@@ -53,6 +54,31 @@ private static void assertException(Throwable t) {
5354
Assertions.assertTrue(HelloExceptionHandlerProvider.invoked);
5455
}
5556

57+
@Test
58+
void testEarlyHeaders() throws Exception {
59+
EarlyHeaderClientInterceptor earlyHeaderClientInterceptor = new EarlyHeaderClientInterceptor();
60+
GreeterGrpc.GreeterStub helloGrpcStub = GreeterGrpc.newStub(channel).withInterceptors(earlyHeaderClientInterceptor);
61+
helloGrpcStub.sayHello(HelloRequest.newBuilder().setName("gRPC").build(), new StreamObserver<>() {
62+
@Override
63+
public void onNext(HelloReply helloReply) {
64+
// ignore
65+
}
66+
67+
@Override
68+
public void onError(Throwable throwable) {
69+
throwable.printStackTrace();
70+
}
71+
72+
@Override
73+
public void onCompleted() {
74+
// ignore
75+
}
76+
});
77+
78+
String headerValue = earlyHeaderClientInterceptor.getHeaderFuture().get(5, TimeUnit.SECONDS);
79+
Assertions.assertEquals(headerValue, "whatever");
80+
}
81+
5682
@Test
5783
public void testExceptionHandlerObserver() throws Exception {
5884
HelloExceptionHandlerProvider.invoked = false;

0 commit comments

Comments
 (0)