77
88import io .grpc .CallOptions ;
99import io .grpc .ClientCall ;
10+ import io .grpc .Context ;
11+ import io .grpc .Deadline ;
1012import io .grpc .Metadata ;
1113import io .grpc .MethodDescriptor ;
1214import org .slf4j .Logger ;
@@ -64,32 +66,46 @@ public void close() {
6466 }
6567 }
6668
67- @ Override
68- public <ReqT , RespT > CompletableFuture <Result <RespT >> unaryCall (
69- MethodDescriptor <ReqT , RespT > method ,
70- GrpcRequestSettings settings ,
71- ReqT request
72- ) {
73- if (isClosed .get ()) {
74- return CompletableFuture .completedFuture (SHUTDOWN_RESULT .map (o -> null ));
75- }
76-
77- String traceId = settings .getTraceId ();
69+ private CallOptions prepareCallOptions (GrpcRequestSettings settings ) {
7870 CallOptions options = getAuthCallOptions ().getGrpcCallOptions ();
7971 if (settings .getDeadlineAfter () != 0 ) {
8072 final long now = System .nanoTime ();
8173 if (now >= settings .getDeadlineAfter ()) {
82- return CompletableFuture . completedFuture ( deadlineExpiredResult ( method , settings ));
74+ return null ; // DEADLINE
8375 }
8476 options = options .withDeadlineAfter (settings .getDeadlineAfter () - now , TimeUnit .NANOSECONDS );
8577 }
8678 if (settings .isDeadlineDisabled ()) {
8779 options = options .withDeadline (null );
8880 }
8981
82+ Deadline deadline = Context .current ().getDeadline ();
83+ if (deadline != null && deadline .isExpired ()) {
84+ return null ; // DEADLINE
85+ }
86+
87+ return options ;
88+ }
89+
90+ @ Override
91+ public <ReqT , RespT > CompletableFuture <Result <RespT >> unaryCall (
92+ MethodDescriptor <ReqT , RespT > method ,
93+ GrpcRequestSettings settings ,
94+ ReqT request
95+ ) {
96+ if (isClosed .get ()) {
97+ return CompletableFuture .completedFuture (SHUTDOWN_RESULT .map (o -> null ));
98+ }
99+
100+ String traceId = settings .getTraceId ();
90101 try {
91102 GrpcChannel channel = getChannel (settings );
92103 String endpoint = channel .getEndpoint ().getHostAndPort ();
104+ CallOptions options = prepareCallOptions (settings );
105+ if (options == null ) {
106+ return CompletableFuture .completedFuture (deadlineExpiredResult (method , settings ));
107+ }
108+
93109 ClientCall <ReqT , RespT > call = channel .getReadyChannel ().newCall (method , options );
94110 ChannelStatusHandler handler = new ChannelStatusHandler (channel , settings );
95111
@@ -121,21 +137,14 @@ public <ReqT, RespT> GrpcReadStream<RespT> readStreamCall(
121137 }
122138
123139 String traceId = settings .getTraceId ();
124- CallOptions options = getAuthCallOptions ().getGrpcCallOptions ();
125- if (settings .getDeadlineAfter () != 0 ) {
126- final long now = System .nanoTime ();
127- if (now >= settings .getDeadlineAfter ()) {
128- return new EmptyStream <>(deadlineExpiredStatus (method , settings ));
129- }
130- options = options .withDeadlineAfter (settings .getDeadlineAfter () - now , TimeUnit .NANOSECONDS );
131- }
132- if (settings .isDeadlineDisabled ()) {
133- options = options .withDeadline (null );
134- }
135-
136140 try {
137141 GrpcChannel channel = getChannel (settings );
138142 String endpoint = channel .getEndpoint ().getHostAndPort ();
143+ CallOptions options = prepareCallOptions (settings );
144+ if (options == null ) {
145+ return new EmptyStream <>(deadlineExpiredStatus (method , settings ));
146+ }
147+
139148 ClientCall <ReqT , RespT > call = channel .getReadyChannel ().newCall (method , options );
140149 ChannelStatusHandler handler = new ChannelStatusHandler (channel , settings );
141150
@@ -159,7 +168,6 @@ public <ReqT, RespT> GrpcReadStream<RespT> readStreamCall(
159168 }
160169 }
161170
162-
163171 @ Override
164172 public <ReqT , RespT > GrpcReadWriteStream <RespT , ReqT > readWriteStreamCall (
165173 MethodDescriptor <ReqT , RespT > method ,
@@ -170,21 +178,14 @@ public <ReqT, RespT> GrpcReadWriteStream<RespT, ReqT> readWriteStreamCall(
170178 }
171179
172180 String traceId = settings .getTraceId ();
173- CallOptions options = getAuthCallOptions ().getGrpcCallOptions ();
174- if (settings .getDeadlineAfter () != 0 ) {
175- final long now = System .nanoTime ();
176- if (now >= settings .getDeadlineAfter ()) {
177- return new EmptyStream <>(deadlineExpiredStatus (method , settings ));
178- }
179- options = options .withDeadlineAfter (settings .getDeadlineAfter () - now , TimeUnit .NANOSECONDS );
180- }
181- if (settings .isDeadlineDisabled ()) {
182- options = options .withDeadline (null );
183- }
184-
185181 try {
186182 GrpcChannel channel = getChannel (settings );
187183 String endpoint = channel .getEndpoint ().getHostAndPort ();
184+ CallOptions options = prepareCallOptions (settings );
185+ if (options == null ) {
186+ return new EmptyStream <>(deadlineExpiredStatus (method , settings ));
187+ }
188+
188189 ClientCall <ReqT , RespT > call = channel .getReadyChannel ().newCall (method , options );
189190 ChannelStatusHandler hdlr = new ChannelStatusHandler (channel , settings );
190191
0 commit comments