Skip to content

Commit 407888e

Browse files
committed
Rework the timeout handling to move the gRPC context outside of the vertx request/response API, update the implementation of gRPC context storage to comply agains the context requirement to attach/detach in the same method.
1 parent f4cd381 commit 407888e

File tree

24 files changed

+575
-165
lines changed

24 files changed

+575
-165
lines changed

vertx-grpc-client/src/main/java/io/vertx/grpc/client/GrpcClientChannel.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,11 @@ public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(MethodDescr
3838
compressor = null;
3939
}
4040
Executor exec = callOptions.getExecutor();
41+
Context ctx = Context.current();
4142
Deadline deadline = callOptions.getDeadline();
42-
if (deadline == null) {
43-
Context ctx = Context.current();
44-
deadline = ctx.getDeadline();
43+
Deadline contextDeadline = ctx.getDeadline();
44+
if (contextDeadline != null && (deadline == null || contextDeadline.isBefore(deadline))) {
45+
deadline = contextDeadline;
4546
}
4647
return new VertxClientCall<>(client, server, exec, methodDescriptor, encoding, compressor, deadline);
4748
}
@@ -50,5 +51,4 @@ public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(MethodDescr
5051
public String authority() {
5152
return null;
5253
}
53-
5454
}

vertx-grpc-client/src/main/java/io/vertx/grpc/client/GrpcClientRequest.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,8 @@
1616
import io.vertx.codegen.annotations.VertxGen;
1717
import io.vertx.core.Future;
1818
import io.vertx.core.Handler;
19-
import io.vertx.core.MultiMap;
20-
import io.vertx.core.buffer.Buffer;
21-
import io.vertx.core.http.HttpClientResponse;
19+
import io.vertx.core.Timer;
2220
import io.vertx.core.http.HttpConnection;
23-
import io.vertx.core.http.HttpHeaders;
2421
import io.vertx.core.streams.ReadStream;
2522
import io.vertx.grpc.common.GrpcWriteStream;
2623
import io.vertx.grpc.common.ServiceName;
@@ -100,6 +97,11 @@ public interface GrpcClientRequest<Req, Resp> extends GrpcWriteStream<Req> {
10097
@Fluent
10198
GrpcClientRequest<Req, Resp> timeout(long timeout, TimeUnit unit);
10299

100+
/**
101+
* Schedule a deadline when sending this request
102+
*/
103+
Timer scheduleDeadline();
104+
103105
/**
104106
* Sets the amount of time after which, if the request does not return any data within the timeout period,
105107
* the request/response is cancelled and the related futures.

vertx-grpc-client/src/main/java/io/vertx/grpc/client/VertxClientCall.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,14 @@
44
import io.vertx.core.Future;
55
import io.vertx.core.http.StreamResetException;
66
import io.vertx.core.net.SocketAddress;
7+
import io.vertx.grpc.client.impl.GrpcClientRequestImpl;
8+
import io.vertx.grpc.client.impl.GrpcClientResponseImpl;
79
import io.vertx.grpc.common.GrpcError;
8-
import io.vertx.grpc.common.impl.BridgeMessageDecoder;
9-
import io.vertx.grpc.common.impl.BridgeMessageEncoder;
10-
import io.vertx.grpc.common.impl.ReadStreamAdapter;
11-
import io.vertx.grpc.common.impl.Utils;
12-
import io.vertx.grpc.common.impl.WriteStreamAdapter;
10+
import io.vertx.grpc.common.impl.*;
1311

1412
import javax.annotation.Nullable;
1513
import java.util.concurrent.Executor;
14+
import java.util.concurrent.ScheduledFuture;
1615
import java.util.concurrent.TimeUnit;
1716

1817
class VertxClientCall<RequestT, ResponseT> extends ClientCall<RequestT, ResponseT> {
@@ -76,9 +75,18 @@ public void start(Listener<ResponseT> responseListener, Metadata headers) {
7675
if (ar1.succeeded()) {
7776
request = ar1.result();
7877
Utils.writeMetadata(headers, request.headers());
78+
ScheduledFuture<?> sf;
7979
if (deadline != null) {
8080
long timeout = deadline.timeRemaining(TimeUnit.MILLISECONDS);
8181
request.timeout(timeout, TimeUnit.MILLISECONDS);
82+
sf = deadline.runOnExpiration(new Runnable() {
83+
@Override
84+
public void run() {
85+
request.cancel();
86+
}
87+
}, new VertxScheduledExecutorService(((GrpcClientRequestImpl)request).context()));
88+
} else {
89+
sf = null;
8290
}
8391
if (encoding != null) {
8492
request.encoding(encoding);
@@ -89,6 +97,12 @@ public void start(Listener<ResponseT> responseListener, Metadata headers) {
8997

9098
grpcResponse = ar2.result();
9199

100+
if (sf != null) {
101+
grpcResponse.end().onComplete(ar -> {
102+
sf.cancel(false);
103+
});
104+
}
105+
92106
String respEncoding = grpcResponse.encoding();
93107
Decompressor decompressor = DecompressorRegistry.getDefaultInstance().lookupDecompressor(respEncoding);
94108

vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientRequestImpl.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@
1010
*/
1111
package io.vertx.grpc.client.impl;
1212

13-
import io.grpc.Context;
1413
import io.vertx.core.Future;
1514
import io.vertx.core.Handler;
1615
import io.vertx.core.MultiMap;
16+
import io.vertx.core.Timer;
1717
import io.vertx.core.buffer.Buffer;
1818
import io.vertx.core.http.HttpClientRequest;
1919

@@ -33,7 +33,6 @@
3333
import io.vertx.grpc.common.GrpcMessageEncoder;
3434
import io.vertx.grpc.common.ServiceName;
3535
import io.vertx.grpc.common.impl.GrpcMessageImpl;
36-
import io.vertx.grpc.common.impl.VertxScheduledExecutorService;
3736

3837
/**
3938
* @author <a href="mailto:[email protected]">Julien Viet</a>
@@ -53,7 +52,7 @@ public class GrpcClientRequestImpl<Req, Resp> implements GrpcClientRequest<Req,
5352
private MultiMap headers;
5453
private long timeout;
5554
private TimeUnit timeoutUnit;
56-
private io.grpc.Context grpcContext;
55+
private Timer timeoutTimer;
5756

5857
public GrpcClientRequestImpl(HttpClientRequest httpRequest, GrpcMessageEncoder<Req> messageEncoder, GrpcMessageDecoder<Resp> messageDecoder) {
5958
this.context = ((PromiseInternal<?>)httpRequest.response()).context();
@@ -62,12 +61,16 @@ public GrpcClientRequestImpl(HttpClientRequest httpRequest, GrpcMessageEncoder<R
6261
this.response = httpRequest
6362
.response()
6463
.map(httpResponse -> {
65-
GrpcClientResponseImpl<Req, Resp> grpcResponse = new GrpcClientResponseImpl<>(context, grpcContext, this, httpResponse, messageDecoder);
64+
GrpcClientResponseImpl<Req, Resp> grpcResponse = new GrpcClientResponseImpl<>(context, this, httpResponse, messageDecoder);
6665
grpcResponse.init();
6766
return grpcResponse;
6867
});
6968
}
7069

70+
public ContextInternal context() {
71+
return context;
72+
}
73+
7174
@Override
7275
public MultiMap headers() {
7376
if (headersSent) {
@@ -147,6 +150,19 @@ public GrpcClientRequest<Req, Resp> timeout(long timeout, TimeUnit unit) {
147150
return this;
148151
}
149152

153+
@Override
154+
public Timer scheduleDeadline() {
155+
if (timeout > 0L && timeoutTimer ==null) {
156+
Timer timer = context.timer(timeout, TimeUnit.MILLISECONDS);
157+
timeoutTimer = timer;
158+
timer.onSuccess(v -> {
159+
cancel();
160+
});
161+
return timer;
162+
}
163+
throw new IllegalStateException();
164+
}
165+
150166
@Override
151167
public GrpcClientRequest<Req, Resp> idleTimeout(long timeout) {
152168
httpRequest.idleTimeout(timeout);
@@ -233,15 +249,6 @@ private Future<Void> writeMessage(GrpcMessage message, boolean end) {
233249
httpRequest.setURI(uri);
234250
headersSent = true;
235251
}
236-
if (timeout > 0L) {
237-
Context.CancellableContext deadlineContext = Context.current().withDeadlineAfter(timeout, timeoutUnit, new VertxScheduledExecutorService(context));
238-
deadlineContext.addListener(ctx_ -> {
239-
cancel();
240-
}, Runnable::run);
241-
grpcContext = deadlineContext;
242-
} else {
243-
grpcContext = Context.current();
244-
}
245252
if (end) {
246253
trailersSent = true;
247254
return httpRequest.end(GrpcMessageImpl.encode(message));
@@ -250,6 +257,13 @@ private Future<Void> writeMessage(GrpcMessage message, boolean end) {
250257
}
251258
}
252259

260+
void cancelTimeout() {
261+
Timer timer = timeoutTimer;
262+
if (timer != null && timer.cancel()) {
263+
timeoutTimer = null;
264+
}
265+
}
266+
253267
@Override
254268
public Future<Void> write(Req message) {
255269
return writeMessage(messageEncoder.encode(message));

vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientResponseImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,9 @@ public class GrpcClientResponseImpl<Req, Resp> extends GrpcReadStreamBase<GrpcCl
4040
private String encoding;
4141

4242
public GrpcClientResponseImpl(ContextInternal context,
43-
io.grpc.Context grpcContext,
4443
GrpcClientRequestImpl<Req, Resp> request,
4544
HttpClientResponse httpResponse, GrpcMessageDecoder<Resp> messageDecoder) {
46-
super(context, grpcContext, httpResponse, httpResponse.headers().get("grpc-encoding"), messageDecoder);
45+
super(context, httpResponse, httpResponse.headers().get("grpc-encoding"), messageDecoder);
4746
this.request = request;
4847
this.encoding = httpResponse.headers().get("grpc-encoding");
4948
this.httpResponse = httpResponse;
@@ -76,9 +75,10 @@ public MultiMap trailers() {
7675
}
7776

7877
protected void handleEnd() {
79-
if (grpcContext instanceof Context.CancellableContext) {
80-
((Context.CancellableContext)grpcContext).close();
81-
}
78+
// if (grpcContext instanceof Context.CancellableContext) {
79+
// ((Context.CancellableContext)grpcContext).close();
80+
// }
81+
request.cancelTimeout();
8282
String responseStatus = httpResponse.getTrailer("grpc-status");
8383
if (responseStatus != null) {
8484
status = GrpcStatus.valueOf(Integer.parseInt(responseStatus));

vertx-grpc-client/src/test/java/io/vertx/grpc/client/ClientRequestTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,9 @@ public void testTimeoutOnClient(TestContext should) throws Exception {
536536
client = GrpcClient.client(vertx);
537537
client.request(SocketAddress.inetSocketAddress(port, "localhost"), StreamingGrpc.getSinkMethod())
538538
.onComplete(should.asyncAssertSuccess(callRequest -> {
539-
callRequest.timeout(1, TimeUnit.SECONDS);
539+
callRequest
540+
.timeout(1, TimeUnit.SECONDS)
541+
.scheduleDeadline();
540542
callRequest.write(Item.getDefaultInstance());
541543
callRequest.response().onComplete(should.asyncAssertFailure(err -> {
542544
should.assertTrue(err instanceof StreamResetException);
@@ -554,7 +556,9 @@ public void testTimeoutPropagationToServer(TestContext should) throws Exception
554556
client = GrpcClient.client(vertx);
555557
client.request(SocketAddress.inetSocketAddress(port, "localhost"), GreeterGrpc.getSayHelloMethod())
556558
.onComplete(should.asyncAssertSuccess(callRequest -> {
557-
callRequest.timeout(10, TimeUnit.SECONDS);
559+
callRequest
560+
.timeout(10, TimeUnit.SECONDS)
561+
.scheduleDeadline();
558562
callRequest.end(HelloRequest.newBuilder().setName("Julien").build());
559563
callRequest.response().onComplete(should.asyncAssertSuccess(e -> {
560564
long timeRemaining = cf.getNow(-1L);

vertx-grpc-client/src/test/java/io/vertx/grpc/client/ClientTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,8 @@ public void testTimeoutOnClient(TestContext should) throws Exception {
379379
HttpServer server = vertx.createHttpServer();
380380
server
381381
.requestHandler(request -> {
382+
String timeout = request.getHeader("grpc-timeout");
383+
should.assertNotNull(timeout);
382384
request.response().exceptionHandler(err -> {
383385
should.assertEquals(StreamResetException.class, err.getClass());
384386
StreamResetException reset = (StreamResetException) err;

vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcReadStreamBase.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ public Buffer payload() {
4747
};
4848

4949
protected final ContextInternal context;
50-
protected final io.grpc.Context grpcContext;
5150
private final String encoding;
5251
private final ReadStream<Buffer> stream;
5352
private final InboundBuffer<GrpcMessage> queue;
@@ -60,9 +59,8 @@ public Buffer payload() {
6059
private final GrpcMessageDecoder<T> messageDecoder;
6160
private final Promise<Void> end;
6261

63-
protected GrpcReadStreamBase(Context context, io.grpc.Context grpcContext, ReadStream<Buffer> stream, String encoding, GrpcMessageDecoder<T> messageDecoder) {
62+
protected GrpcReadStreamBase(Context context, ReadStream<Buffer> stream, String encoding, GrpcMessageDecoder<T> messageDecoder) {
6463
this.context = (ContextInternal) context;
65-
this.grpcContext = grpcContext;
6664
this.encoding = encoding;
6765
this.stream = stream;
6866
this.queue = new InboundBuffer<>(context);
@@ -187,23 +185,23 @@ protected void handleException(Throwable err) {
187185
end.tryFail(err);
188186
Handler<Throwable> handler = exceptionHandler;
189187
if (handler != null) {
190-
grpcContext.run(() -> handler.handle(err));
188+
context.dispatch(err, handler);
191189
}
192190
}
193191

194192
protected void handleEnd() {
195193
end.tryComplete();
196194
Handler<Void> handler = endHandler;
197195
if (handler != null) {
198-
grpcContext.run(() -> handler.handle(null));
196+
context.dispatch(handler);
199197
}
200198
}
201199

202200
protected void handleMessage(GrpcMessage msg) {
203201
last = msg;
204202
Handler<GrpcMessage> handler = messageHandler;
205203
if (handler != null) {
206-
grpcContext.run(() -> handler.handle(msg));
204+
context.dispatch(msg, messageHandler);
207205
}
208206
}
209207

vertx-grpc-context-storage/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
<dependency>
4343
<groupId>io.vertx</groupId>
4444
<artifactId>vertx-grpc-server</artifactId>
45-
<scope>test</scope>
45+
<!-- <scope>test</scope>-->
4646
</dependency>
4747
<dependency>
4848
<groupId>io.grpc</groupId>

0 commit comments

Comments
 (0)