Skip to content

Commit 1517882

Browse files
sy-leesvc-squareup-copybara
authored andcommitted
use a shadow header x-grpc-timeout-propagate in METRICS/PROPAGATE_ONLY
mode Notable changes: - In `METRICS_ONLY`, `PROPAGATE_ONLY` mode, do not set `grpc-timeout` header on the outgoing request as this causes the grpc client to actually timeout. This can be surprising to those that are only opting in for a propagation mode. Instead pass on a shadow header `x-grpc-timeout-propagate`, and the receiver can decide if they will continue to propagate or enforce it. - Choose the minimum of [requestDeadline, okHttpClient.callTimeout (only if it's non-zero)] when setting deadline on the outgoing request. Also record which was used in metrics. - Throw a custom `DeadlineExceededException` in the outbound interceptor, which if unhandled will bubble up as 504 or grpc-status=4 (DEADLINE_EXCEEDED). - Metrics: - Add more buckets to `deadline_duration_ms` metric distribution - Skip metrics for `livenesscheckaction` and `readinesscheckaction` GitOrigin-RevId: 756b1899a5a8b951de6db63e1a93894fb70785fa
1 parent 2ab73bd commit 1517882

File tree

9 files changed

+374
-116
lines changed

9 files changed

+374
-116
lines changed

misk/api/misk.api

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2516,6 +2516,13 @@ public final class misk/web/proxy/WebProxyEntryKt {
25162516
public static synthetic fun WebProxyEntry$default (Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Lmisk/web/proxy/WebProxyEntry;
25172517
}
25182518

2519+
public final class misk/web/requestdeadlines/DeadlineExceededException : java/lang/RuntimeException {
2520+
public fun <init> ()V
2521+
public fun <init> (Ljava/lang/String;)V
2522+
public fun <init> (Ljava/lang/String;Ljava/lang/Throwable;)V
2523+
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/Throwable;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
2524+
}
2525+
25192526
public final class misk/web/requestdeadlines/DeadlinePropagationModule : misk/inject/KAbstractModule {
25202527
public fun <init> ()V
25212528
}

misk/src/main/kotlin/misk/client/DeadlinePropagationInterceptor.kt

Lines changed: 85 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,24 @@
11
package misk.client
22

3+
import com.squareup.wire.GrpcMethod
34
import jakarta.inject.Inject
45
import jakarta.inject.Singleton
5-
import misk.exceptions.GatewayTimeoutException
66
import misk.grpc.GrpcTimeoutMarshaller
77
import misk.scope.ActionScoped
88
import misk.web.RequestDeadlineMode
99
import misk.web.RequestDeadlinesConfig
1010
import misk.web.WebConfig
1111
import misk.web.interceptors.RequestDeadlineInterceptor
12-
import misk.web.mediatype.MediaTypes
12+
import misk.web.interceptors.RequestDeadlineInterceptor.Companion.CUSTOM_GRPC_TIMEOUT_PROPAGATE_HEADER
13+
import misk.web.requestdeadlines.DeadlineExceededException
1314
import misk.web.requestdeadlines.RequestDeadline
1415
import misk.web.requestdeadlines.RequestDeadlineMetrics
16+
import misk.web.requestdeadlines.RequestDeadlineMetrics.SourceLabel.OKHTTP_TIMEOUT
17+
import misk.web.requestdeadlines.RequestDeadlineMetrics.SourceLabel.PROPAGATED_DEADLINE
1518
import okhttp3.Interceptor
1619
import okhttp3.Request
1720
import okhttp3.Response
18-
import java.util.concurrent.TimeUnit
21+
import java.time.Duration
1922

2023
internal class DeadlinePropagationInterceptor(
2124
private val clientAction: ClientAction,
@@ -26,107 +29,132 @@ internal class DeadlinePropagationInterceptor(
2629

2730
override fun intercept(chain: Interceptor.Chain): Response {
2831
val requestDeadline = requestDeadlineActionScope.getIfInScope()
32+
val isGrpc = chain.request().tag(GrpcMethod::class.java) != null
2933

30-
// Handle case when no deadline is in scope
34+
// Handle case when no deadline is in scope, like if requests were being made from executor or coroutine threads
35+
// where the ActionScope has not been explicitly passed along
3136
if (requestDeadline?.remaining() == null) {
32-
return handleNoDeadlineInScope(chain)
37+
return handleNoDeadlineInScope(chain, isGrpc)
3338
}
3439

3540
// Always check deadline and emit metrics based on mode
3641
// At this point we know requestDeadline is not null (null case handled above)
3742
return when (requestDeadlinesConfig.mode) {
38-
RequestDeadlineMode.METRICS_ONLY -> handleDisabledMode(requestDeadline, chain)
39-
RequestDeadlineMode.PROPAGATE_ONLY -> handlePropagateOnlyMode(requestDeadline, chain)
40-
RequestDeadlineMode.ENFORCE_INBOUND -> handlePropagateOnlyMode(requestDeadline, chain)
41-
RequestDeadlineMode.ENFORCE_OUTBOUND, RequestDeadlineMode.ENFORCE_ALL -> handleEnforceMode(requestDeadline, chain)
43+
RequestDeadlineMode.METRICS_ONLY -> handleDisabledMode(requestDeadline, chain, isGrpc)
44+
RequestDeadlineMode.PROPAGATE_ONLY, RequestDeadlineMode.ENFORCE_INBOUND -> handlePropagateOnlyMode(requestDeadline, chain, isGrpc)
45+
RequestDeadlineMode.ENFORCE_OUTBOUND, RequestDeadlineMode.ENFORCE_ALL -> handleEnforceMode(requestDeadline, chain, isGrpc)
4246
}
4347
}
4448

45-
private fun handleNoDeadlineInScope(chain: Interceptor.Chain): Response {
46-
return when (requestDeadlinesConfig.mode) {
47-
RequestDeadlineMode.METRICS_ONLY -> {
48-
chain.proceed(chain.request())
49-
}
50-
else -> {
51-
// No RequestDeadline found in ActionScope, so propagate client.readTimeoutMillis as deadline
52-
val fallbackDeadlineMs: Long = chain.readTimeoutMillis().toLong()
53-
metrics.recordOutboundDeadlinePropagated(clientAction, fallbackDeadlineMs, chain.request())
54-
val newRequestBuilder = setRequestDeadlineHeadersOnOutbound(chain.request(), fallbackDeadlineMs)
55-
chain.proceed(newRequestBuilder.build())
56-
}
49+
private fun handleNoDeadlineInScope(chain: Interceptor.Chain, isGrpc: Boolean): Response {
50+
metrics.recordNoDeadlineInScope(clientAction, isGrpc)
51+
52+
// For METRICS_ONLY mode or when no fallback deadline exists, proceed with original request
53+
if (requestDeadlinesConfig.mode == RequestDeadlineMode.METRICS_ONLY) {
54+
return chain.proceed(chain.request())
5755
}
56+
57+
val okhttpClientFallbackDeadline = maybeOkHttpClientCallTimeout(chain)
58+
?: return chain.proceed(chain.request())
59+
60+
val enforced = requestDeadlinesConfig.mode in setOf(RequestDeadlineMode.ENFORCE_OUTBOUND, RequestDeadlineMode.ENFORCE_ALL)
61+
metrics.recordOutboundDeadlinePropagated(clientAction, okhttpClientFallbackDeadline, isGrpc, OKHTTP_TIMEOUT)
62+
val newRequestBuilder = setRequestDeadlineHeadersOnOutbound(chain.request(), okhttpClientFallbackDeadline, isGrpc, enforced)
63+
return chain.proceed(newRequestBuilder.build())
5864
}
5965

60-
private fun handleDisabledMode(requestDeadline: RequestDeadline, chain: Interceptor.Chain): Response {
66+
private fun handleDisabledMode(requestDeadline: RequestDeadline, chain: Interceptor.Chain, isGrpc: Boolean): Response {
6167
// Emit metrics, but do not propagate deadline headers or enforce
6268
if (requestDeadline.expired()) {
6369
metrics.recordOutboundDeadlineExceeded(
64-
clientAction,
65-
enforced = false,
66-
chain.request(),
67-
requestDeadline.expiredDuration().toMillis()
68-
)
70+
clientAction,
71+
enforced = false,
72+
isGrpc,
73+
requestDeadline.expiredDuration())
6974
}
7075
return chain.proceed(chain.request())
7176
}
7277

73-
private fun handlePropagateOnlyMode(requestDeadline: RequestDeadline, chain: Interceptor.Chain): Response {
78+
private fun handlePropagateOnlyMode(requestDeadline: RequestDeadline, chain: Interceptor.Chain, isGrpc: Boolean): Response {
7479
// Always emit metrics, propagate deadline headers, but do not enforce
80+
val enforced = false
7581
if (requestDeadline.expired()) {
7682
metrics.recordOutboundDeadlineExceeded(
7783
clientAction,
78-
enforced = false,
79-
chain.request(),
80-
requestDeadline.expiredDuration().toMillis()
84+
enforced,
85+
isGrpc,
86+
requestDeadline.expiredDuration()
8187
)
8288
// Deadline has expired, but config mode specifies it cannot be enforced. For this special case, omit deadline
8389
// headers, otherwise it will be 0 or a negative number. Let the downstream fallback to a default in its server
8490
// interceptor.
8591
return chain.proceed(chain.request())
8692
} else {
87-
val remainingMs = requestDeadline.remaining()!!.toMillis()
88-
metrics.recordOutboundDeadlinePropagated(clientAction, remainingMs, chain.request())
89-
val requestBuilder = setRequestDeadlineHeadersOnOutbound(chain.request(), remainingMs)
93+
val (deadline, source) = determineEffectiveDeadline(requestDeadline, chain)
94+
metrics.recordOutboundDeadlinePropagated(clientAction, deadline, isGrpc, source)
95+
val requestBuilder = setRequestDeadlineHeadersOnOutbound(chain.request(), deadline, isGrpc, enforced)
9096
return chain.proceed(requestBuilder.build())
9197
}
9298
}
9399

94-
private fun handleEnforceMode(requestDeadline: RequestDeadline, chain: Interceptor.Chain): Response {
100+
private fun handleEnforceMode(requestDeadline: RequestDeadline, chain: Interceptor.Chain, isGrpc: Boolean): Response {
101+
val enforced = true
95102
if (requestDeadline.expired()) {
96103
metrics.recordOutboundDeadlineExceeded(
97104
clientAction,
98-
enforced = true,
99-
chain.request(),
100-
requestDeadline.expiredDuration().toMillis()
105+
enforced,
106+
isGrpc,
107+
requestDeadline.expiredDuration()
101108
)
102-
throw GatewayTimeoutException(
109+
throw DeadlineExceededException(
103110
"Deadline already expired, not initiating outbound call to ${chain.request().url}"
104111
)
105112
} else {
106-
val remainingMs = requestDeadline.remaining()!!.toMillis()
107-
metrics.recordOutboundDeadlinePropagated(clientAction, remainingMs, chain.request())
108-
val requestBuilder = setRequestDeadlineHeadersOnOutbound(chain.request(), remainingMs)
113+
val (deadline, source) = determineEffectiveDeadline(requestDeadline, chain)
114+
metrics.recordOutboundDeadlinePropagated(clientAction, deadline, isGrpc, source)
115+
val requestBuilder = setRequestDeadlineHeadersOnOutbound(chain.request(), deadline, isGrpc, enforced)
109116
return chain.proceed(requestBuilder.build())
110117
}
111118
}
112119

113-
private fun setRequestDeadlineHeadersOnOutbound(request: Request, deadlineMs: Long): Request.Builder {
114-
val builder = request.newBuilder()
120+
/**
121+
* Determines the effective deadline by taking the minimum of the request deadline and OkHttp client timeout.
122+
* For OkHttpClient timeout, prefer callTimeout if it exists and > 0.
123+
* @return Pair of (deadline, source) where deadline is a Duration and source indicates which timeout was used
124+
*/
125+
private fun determineEffectiveDeadline(requestDeadline: RequestDeadline, chain: Interceptor.Chain): Pair<Duration, String> {
126+
val propagatedDeadline = requestDeadline.remaining()!!
127+
val okHttpClientTimeout = maybeOkHttpClientCallTimeout(chain)
128+
129+
return if (okHttpClientTimeout == null || propagatedDeadline <= okHttpClientTimeout) {
130+
propagatedDeadline to PROPAGATED_DEADLINE
131+
} else {
132+
okHttpClientTimeout to OKHTTP_TIMEOUT
133+
}
134+
}
115135

116-
// nb: Content-Type header not available yet at this point to distinguish http from grpc, so use "te"
117-
val isGrpcRequest = request.header("te")?.equals("trailers") == true
118-
119-
if (isGrpcRequest) {
120-
// gRPC request - only add gRPC timeout header
121-
if (request.headers.get(GrpcTimeoutMarshaller.TIMEOUT_KEY).isNullOrEmpty()) {
122-
builder.header(
123-
GrpcTimeoutMarshaller.TIMEOUT_KEY,
124-
GrpcTimeoutMarshaller.toAsciiString(TimeUnit.MILLISECONDS.toNanos(deadlineMs)),
125-
)
126-
}
136+
private fun maybeOkHttpClientCallTimeout(chain: Interceptor.Chain): Duration? {
137+
val callTimeoutNanos = chain.call().timeout().timeoutNanos()
138+
return Duration.ofNanos(callTimeoutNanos).takeIf { callTimeoutNanos != 0L }
139+
}
140+
141+
private fun setRequestDeadlineHeadersOnOutbound(
142+
request: Request,
143+
deadline: Duration,
144+
isGrpc: Boolean,
145+
enforced: Boolean
146+
): Request.Builder {
147+
val builder = request.newBuilder()
148+
if (isGrpc) {
149+
// gRPC request - use real header `grpc-timeout` for enforcing modes, shadow header for non-enforcing modes
150+
val grpcTimeoutHeader = if (enforced) GrpcTimeoutMarshaller.TIMEOUT_KEY else CUSTOM_GRPC_TIMEOUT_PROPAGATE_HEADER
151+
builder.header(
152+
grpcTimeoutHeader,
153+
GrpcTimeoutMarshaller.toAsciiString(deadline.toNanos()),
154+
)
127155
} else {
128-
// HTTP request - only add HTTP deadline header
129-
builder.header(RequestDeadlineInterceptor.HTTP_HEADER_ENVOY_DEADLINE, deadlineMs.toString())
156+
// HTTP request - only add HTTP deadline header using ISO8601 duration format
157+
builder.header(RequestDeadlineInterceptor.HTTP_HEADER_X_REQUEST_DEADLINE, deadline.toString())
130158
}
131159

132160
return builder

misk/src/main/kotlin/misk/web/MiskWebModule.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import misk.web.actions.StatusAction
4141
import misk.web.concurrencylimits.ConcurrencyLimiterFactory
4242
import misk.web.concurrencylimits.ConcurrencyLimitsModule
4343
import misk.web.exceptions.ActionExceptionLogLevelConfig
44+
import misk.web.exceptions.DeadlineExceededExceptionMapper
4445
import misk.web.exceptions.EofExceptionMapper
4546
import misk.web.exceptions.ExceptionHandlingInterceptor
4647
import misk.web.exceptions.ExceptionMapperModule
@@ -53,6 +54,7 @@ import misk.web.extractors.PathParamFeatureBinding
5354
import misk.web.extractors.QueryParamFeatureBinding
5455
import misk.web.extractors.RequestBodyException
5556
import misk.web.extractors.RequestBodyFeatureBinding
57+
import misk.web.requestdeadlines.DeadlineExceededException
5658
import misk.web.extractors.RequestCookieFeatureBinding
5759
import misk.web.extractors.RequestCookiesFeatureBinding
5860
import misk.web.extractors.RequestHeaderFeatureBinding
@@ -266,6 +268,7 @@ class MiskWebModule @JvmOverloads constructor(
266268
install(ExceptionMapperModule.create<IOException, IOExceptionMapper>())
267269
install(ExceptionMapperModule.create<EofException, EofExceptionMapper>())
268270
install(ExceptionMapperModule.create<RequestBodyException, RequestBodyExceptionMapper>())
271+
install(ExceptionMapperModule.create<DeadlineExceededException, DeadlineExceededExceptionMapper>())
269272

270273
// Register built-in feature bindings.
271274
multibind<FeatureBinding.Factory>().toInstance(PathParamFeatureBinding.Factory)
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package misk.web.exceptions
2+
3+
import com.squareup.wire.GrpcStatus.Companion.DEADLINE_EXCEEDED
4+
import jakarta.inject.Inject
5+
import misk.client.HTTP_GATEWAY_TIMEOUT
6+
import misk.web.Response
7+
import misk.web.mediatype.MediaTypes
8+
import misk.web.requestdeadlines.DeadlineExceededException
9+
import misk.web.toResponseBody
10+
import okhttp3.Headers.Companion.headersOf
11+
import org.slf4j.event.Level
12+
13+
/**
14+
* Maps [DeadlineExceededException] to HTTP 504 Gateway Timeout responses.
15+
* This indicates the server was acting as a gateway and didn't receive a timely response.
16+
*/
17+
internal class DeadlineExceededExceptionMapper @Inject internal constructor() : ExceptionMapper<DeadlineExceededException> {
18+
override fun loggingLevel(th: DeadlineExceededException) = Level.WARN
19+
20+
override fun toResponse(th: DeadlineExceededException) = DEADLINE_EXCEEDED_RESPONSE
21+
22+
override fun toGrpcResponse(th: DeadlineExceededException): GrpcErrorResponse? {
23+
return GrpcErrorResponse(DEADLINE_EXCEEDED, th.message)
24+
}
25+
26+
companion object {
27+
val DEADLINE_EXCEEDED_RESPONSE = Response(
28+
"deadline exceeded".toResponseBody(),
29+
headersOf("Content-Type", MediaTypes.TEXT_PLAIN_UTF8),
30+
HTTP_GATEWAY_TIMEOUT
31+
)
32+
}
33+
}

misk/src/main/kotlin/misk/web/interceptors/RequestDeadlineInterceptor.kt

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,18 +102,24 @@ internal class RequestDeadlineInterceptor private constructor(
102102

103103
return when (action.dispatchMechanism) {
104104
DispatchMechanism.GRPC -> {
105-
val grpcTimeoutString = httpCall.requestHeaders[GrpcTimeoutMarshaller.TIMEOUT_KEY]
106-
val grpcTimeoutNanos = grpcTimeoutString?.let { GrpcTimeoutMarshaller.parseAsciiString(it) }?.takeIf { it > 0 }
105+
val grpcTimeoutValue = httpCall.requestHeaders[GrpcTimeoutMarshaller.TIMEOUT_KEY]
106+
val grpcTimeoutNanos = grpcTimeoutValue?.let { GrpcTimeoutMarshaller.parseAsciiString(it) }?.takeIf { it > 0 }
107107

108-
if (grpcTimeoutNanos != null) {
109-
Duration.ofNanos(grpcTimeoutNanos) to SourceLabel.GRPC_TIMEOUT
108+
// "shadow" grpc headers used in METRICS_ONLY/PROPAGATE_ONLY mode to prevent GrpcClient actually hanging up on grpc-timeout
109+
val shadowGrpcTimeoutValue = httpCall.requestHeaders[CUSTOM_GRPC_TIMEOUT_PROPAGATE_HEADER]
110+
val shadowGrpcTimeoutNanos = shadowGrpcTimeoutValue?.let { GrpcTimeoutMarshaller.parseAsciiString(it) }?.takeIf { it > 0 }
111+
112+
// Prefer actual to shadow value, though only one of them should be set.
113+
val effectiveGrpcTimeout = grpcTimeoutNanos ?: shadowGrpcTimeoutNanos
114+
if (effectiveGrpcTimeout != null) {
115+
Duration.ofNanos(effectiveGrpcTimeout) to SourceLabel.GRPC_TIMEOUT
110116
} else {
111117
fallbackTimeout to fallbackSource
112118
}
113119
}
114120
else -> {
115121
// HTTP timeout logic with source tracking
116-
val xRequestDeadlineTimeout = parseXRequestDeadlineHeader(httpCall.requestHeaders[HTTP_HEADER_X_REQUEST_DEADLINE_SECONDS])
122+
val xRequestDeadlineTimeout = parseXRequestDeadlineHeader(httpCall.requestHeaders[HTTP_HEADER_X_REQUEST_DEADLINE])
117123
val envoyTimeoutMs = httpCall.requestHeaders[HTTP_HEADER_ENVOY_DEADLINE]?.toLongOrNull()
118124

119125
val validTimeouts = mutableListOf<Pair<Duration, String>>()
@@ -162,8 +168,12 @@ internal class RequestDeadlineInterceptor private constructor(
162168
}
163169

164170
companion object {
165-
const val HTTP_HEADER_X_REQUEST_DEADLINE_SECONDS = "X-Request-Deadline"
171+
const val HTTP_HEADER_X_REQUEST_DEADLINE = "x-request-deadline"
166172
const val HTTP_HEADER_ENVOY_DEADLINE = "x-envoy-expected-rq-timeout-ms" // milliseconds
173+
// custom header used in METRICS_ONLY/PROPAGATE_ONLY mode to propagate grpc-timeout value, to prevent
174+
// GrpcClient actually hanging up
175+
const val CUSTOM_GRPC_TIMEOUT_PROPAGATE_HEADER = "x-grpc-timeout-propagate"
176+
167177
const val MISK_REQUEST_DEADLINE_HEADER = "Misk-Request-Deadline"
168178

169179
const val DEADLINE_EXCEEDED_MESSAGE = "deadline exceeded: queued for too long"
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package misk.web.requestdeadlines
2+
3+
/**
4+
* Exception thrown when a request deadline has been exceeded.
5+
* This is used internally by the deadline propagation system to enforce timeouts.
6+
*/
7+
class DeadlineExceededException @JvmOverloads constructor(
8+
message: String = "deadline exceeded",
9+
cause: Throwable? = null
10+
) : RuntimeException(message, cause)

0 commit comments

Comments
 (0)