Skip to content

Commit 72990e3

Browse files
authored
Merge pull request #569 from alex268/master
Added validation of grpc context deadline before call execution
2 parents 8782c7e + a7d2cf6 commit 72990e3

File tree

1 file changed

+38
-37
lines changed

1 file changed

+38
-37
lines changed

core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java

Lines changed: 38 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
import io.grpc.CallOptions;
99
import io.grpc.ClientCall;
10+
import io.grpc.Context;
11+
import io.grpc.Deadline;
1012
import io.grpc.Metadata;
1113
import io.grpc.MethodDescriptor;
1214
import org.slf4j.Logger;
@@ -64,32 +66,46 @@ public void close() {
6466
}
6567
}
6668

67-
@Override
68-
public <ReqT, RespT> CompletableFuture<Result<RespT>> unaryCall(
69-
MethodDescriptor<ReqT, RespT> method,
70-
GrpcRequestSettings settings,
71-
ReqT request
72-
) {
73-
if (isClosed.get()) {
74-
return CompletableFuture.completedFuture(SHUTDOWN_RESULT.map(o -> null));
75-
}
76-
77-
String traceId = settings.getTraceId();
69+
private CallOptions prepareCallOptions(GrpcRequestSettings settings) {
7870
CallOptions options = getAuthCallOptions().getGrpcCallOptions();
7971
if (settings.getDeadlineAfter() != 0) {
8072
final long now = System.nanoTime();
8173
if (now >= settings.getDeadlineAfter()) {
82-
return CompletableFuture.completedFuture(deadlineExpiredResult(method, settings));
74+
return null; // DEADLINE
8375
}
8476
options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS);
8577
}
8678
if (settings.isDeadlineDisabled()) {
8779
options = options.withDeadline(null);
8880
}
8981

82+
Deadline deadline = Context.current().getDeadline();
83+
if (deadline != null && deadline.isExpired()) {
84+
return null; // DEADLINE
85+
}
86+
87+
return options;
88+
}
89+
90+
@Override
91+
public <ReqT, RespT> CompletableFuture<Result<RespT>> unaryCall(
92+
MethodDescriptor<ReqT, RespT> method,
93+
GrpcRequestSettings settings,
94+
ReqT request
95+
) {
96+
if (isClosed.get()) {
97+
return CompletableFuture.completedFuture(SHUTDOWN_RESULT.map(o -> null));
98+
}
99+
100+
String traceId = settings.getTraceId();
90101
try {
91102
GrpcChannel channel = getChannel(settings);
92103
String endpoint = channel.getEndpoint().getHostAndPort();
104+
CallOptions options = prepareCallOptions(settings);
105+
if (options == null) {
106+
return CompletableFuture.completedFuture(deadlineExpiredResult(method, settings));
107+
}
108+
93109
ClientCall<ReqT, RespT> call = channel.getReadyChannel().newCall(method, options);
94110
ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings);
95111

@@ -121,21 +137,14 @@ public <ReqT, RespT> GrpcReadStream<RespT> readStreamCall(
121137
}
122138

123139
String traceId = settings.getTraceId();
124-
CallOptions options = getAuthCallOptions().getGrpcCallOptions();
125-
if (settings.getDeadlineAfter() != 0) {
126-
final long now = System.nanoTime();
127-
if (now >= settings.getDeadlineAfter()) {
128-
return new EmptyStream<>(deadlineExpiredStatus(method, settings));
129-
}
130-
options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS);
131-
}
132-
if (settings.isDeadlineDisabled()) {
133-
options = options.withDeadline(null);
134-
}
135-
136140
try {
137141
GrpcChannel channel = getChannel(settings);
138142
String endpoint = channel.getEndpoint().getHostAndPort();
143+
CallOptions options = prepareCallOptions(settings);
144+
if (options == null) {
145+
return new EmptyStream<>(deadlineExpiredStatus(method, settings));
146+
}
147+
139148
ClientCall<ReqT, RespT> call = channel.getReadyChannel().newCall(method, options);
140149
ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings);
141150

@@ -159,7 +168,6 @@ public <ReqT, RespT> GrpcReadStream<RespT> readStreamCall(
159168
}
160169
}
161170

162-
163171
@Override
164172
public <ReqT, RespT> GrpcReadWriteStream<RespT, ReqT> readWriteStreamCall(
165173
MethodDescriptor<ReqT, RespT> method,
@@ -170,21 +178,14 @@ public <ReqT, RespT> GrpcReadWriteStream<RespT, ReqT> readWriteStreamCall(
170178
}
171179

172180
String traceId = settings.getTraceId();
173-
CallOptions options = getAuthCallOptions().getGrpcCallOptions();
174-
if (settings.getDeadlineAfter() != 0) {
175-
final long now = System.nanoTime();
176-
if (now >= settings.getDeadlineAfter()) {
177-
return new EmptyStream<>(deadlineExpiredStatus(method, settings));
178-
}
179-
options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS);
180-
}
181-
if (settings.isDeadlineDisabled()) {
182-
options = options.withDeadline(null);
183-
}
184-
185181
try {
186182
GrpcChannel channel = getChannel(settings);
187183
String endpoint = channel.getEndpoint().getHostAndPort();
184+
CallOptions options = prepareCallOptions(settings);
185+
if (options == null) {
186+
return new EmptyStream<>(deadlineExpiredStatus(method, settings));
187+
}
188+
188189
ClientCall<ReqT, RespT> call = channel.getReadyChannel().newCall(method, options);
189190
ChannelStatusHandler hdlr = new ChannelStatusHandler(channel, settings);
190191

0 commit comments

Comments
 (0)