Skip to content

Commit aec6379

Browse files
authored
grpc: Add interceptor API (#484)
* grpc: Add client interceptor support Signed-off-by: Johannes Zottele <[email protected]> * grpc: Add client interceptor support Signed-off-by: Johannes Zottele <[email protected]> * grpc: Add server interceptor support Signed-off-by: Johannes Zottele <[email protected]> * grpc: Refactor server scope API Signed-off-by: Johannes Zottele <[email protected]> * grpc: Refactor client scope API Signed-off-by: Johannes Zottele <[email protected]> * grpc: Add tests Signed-off-by: Johannes Zottele <[email protected]> * grpc: Rename GrpcTrailers to GrpcMetadata.kt Signed-off-by: Johannes Zottele <[email protected]> * grpc: Refactor cancel API in ClientCallScope to return Nothing Signed-off-by: Johannes Zottele <[email protected]> * grpc: Remove println Signed-off-by: Johannes Zottele <[email protected]> * grpc: Adjust metadata names Signed-off-by: Johannes Zottele <[email protected]> * grpc: Fix Ktor server constructor Signed-off-by: Johannes Zottele <[email protected]> * grpc: Add documentation Signed-off-by: Johannes Zottele <[email protected]> * grpc: Adjust client/server DSL and provide documentation Signed-off-by: Johannes Zottele <[email protected]> * grpc: Fix race condition bug Signed-off-by: Johannes Zottele <[email protected]> * grpc: Fix context not set Signed-off-by: Johannes Zottele <[email protected]> * grpc: Add multi interceptor tests Signed-off-by: Johannes Zottele <[email protected]> * grpc: Address PR comments Signed-off-by: Johannes Zottele <[email protected]> * grpc: Add client interceptor execution order test Signed-off-by: Johannes Zottele <[email protected]> * grpc: Address PR comments Signed-off-by: Johannes Zottele <[email protected]> * grpc: Fixing bug after rebase Signed-off-by: Johannes Zottele <[email protected]> * grpc: Fix default proto package in service name --------- Signed-off-by: Johannes Zottele <[email protected]>
1 parent 68887e1 commit aec6379

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1821
-400
lines changed

cinterop-c/MODULE.bazel.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

compiler-plugin/compiler-plugin-backend/src/main/kotlin/kotlinx/rpc/codegen/extension/RpcDeclarationScanner.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import org.jetbrains.kotlin.ir.expressions.IrExpression
1818
import org.jetbrains.kotlin.ir.util.dumpKotlinLike
1919
import org.jetbrains.kotlin.ir.util.getAnnotation
2020
import org.jetbrains.kotlin.ir.util.hasDefaultValue
21+
import org.jetbrains.kotlin.ir.util.packageFqName
2122

2223
/**
2324
* This class scans user declared RPC service
@@ -31,7 +32,9 @@ internal object RpcDeclarationScanner {
3132
var stubClass: IrClass? = null
3233

3334
val grpcAnnotation = service.getAnnotation(RpcClassId.grpcAnnotation.asSingleFqName())
34-
val protoPackage = grpcAnnotation?.arguments?.getOrNull(0)?.asConstString() ?: ""
35+
// if the protoPackage is not set by the annotation, we use the service kotlin package name
36+
val protoPackage = grpcAnnotation?.arguments?.getOrNull(0)?.asConstString()
37+
?: service.packageFqName?.asString() ?: ""
3538

3639
val declarations = service.declarations.memoryOptimizedMap { declaration ->
3740
when (declaration) {
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.rpc.grpc
6+
7+
import kotlinx.coroutines.flow.Flow
8+
import kotlinx.rpc.grpc.internal.GrpcCallOptions
9+
import kotlinx.rpc.grpc.internal.MethodDescriptor
10+
11+
/**
12+
* The scope of a single outgoing gRPC client call observed by a [ClientInterceptor].
13+
*
14+
* An interceptor receives this scope instance for every call and can:
15+
* - Inspect the RPC [method] being invoked.
16+
* - Read or populate [requestHeaders] before the request is sent.
17+
* - Read [callOptions] that affect transport-level behavior.
18+
* - Register callbacks with [onHeaders] and [onClose] to observe response metadata and final status.
19+
* - Cancel the call early via [cancel].
20+
* - Continue the call by calling [proceed] with a (possibly transformed) request [Flow].
21+
* - Transform the response by modifying the returned [Flow].
22+
*
23+
* ```kt
24+
* val interceptor = object : ClientInterceptor {
25+
* override fun <Request, Response> ClientCallScope<Request, Response>.intercept(
26+
* request: Flow<Request>
27+
* ): Flow<Response> {
28+
* // Example: add a header before proceeding
29+
* requestHeaders[MyKeys.Authorization] = token
30+
*
31+
* // Example: observe response metadata
32+
* onHeaders { headers -> /* inspect headers */ }
33+
* onClose { status, trailers -> /* log status/trailers */ }
34+
*
35+
* // IMPORTANT: proceed forwards the call to the next interceptor/transport.
36+
* // If you do not call proceed, no request will be sent and the call is short-circuited.
37+
* return proceed(request)
38+
* }
39+
* }
40+
* ```
41+
*
42+
* @param Request the request message type of the RPC.
43+
* @param Response the response message type of the RPC.
44+
*/
45+
public interface ClientCallScope<Request, Response> {
46+
/** Descriptor of the RPC method (name, marshalling, type) being invoked. */
47+
public val method: MethodDescriptor<Request, Response>
48+
49+
/**
50+
* Outgoing request headers for this call.
51+
*
52+
* Interceptors may read and mutate this metadata
53+
* before calling [proceed] so the headers are sent to the server. Headers added after
54+
* the call has already been proceeded may not be reflected on the wire.
55+
*/
56+
public val requestHeaders: GrpcMetadata
57+
58+
/**
59+
* Transport/engine options used for this call (deadlines, compression, etc.).
60+
* Modifying this object is only possible before the call is proceeded.
61+
*/
62+
public val callOptions: GrpcCallOptions
63+
64+
/**
65+
* Register a callback invoked when the initial response headers are received.
66+
* Typical gRPC semantics guarantee headers are delivered at most once per call
67+
* and before the first message is received.
68+
*/
69+
public fun onHeaders(block: (responseHeaders: GrpcMetadata) -> Unit)
70+
71+
/**
72+
* Register a callback invoked when the call completes, successfully or not.
73+
* The final `status` and trailing `responseTrailers` are provided.
74+
*/
75+
public fun onClose(block: (status: Status, responseTrailers: GrpcMetadata) -> Unit)
76+
77+
/**
78+
* Cancel the call locally, providing a human-readable [message] and an optional [cause].
79+
* This method won't return and abort all further processing.
80+
*
81+
* We made cancel throw a [StatusException] instead of returning, so control flow is explicit and
82+
* race conditions between interceptors and the transport layer are avoided.
83+
*/
84+
public fun cancel(message: String, cause: Throwable? = null): Nothing
85+
86+
/**
87+
* Continue the invocation by forwarding it to the next interceptor or to the underlying transport.
88+
*
89+
* This function is the heart of an interceptor:
90+
* - It must be called to actually perform the RPC. If you never call [proceed], the request is not sent
91+
* and the call is effectively short-circuited by the interceptor.
92+
* - You may transform the [request] flow before passing it to [proceed] (e.g., logging, retry orchestration,
93+
* compression, metrics). The returned [Flow] yields response messages and can also be transformed
94+
* before being returned to the caller.
95+
* - Call [proceed] at most once per intercepted call. Calling it multiple times or after cancellation
96+
* is not supported.
97+
*/
98+
public fun proceed(request: Flow<Request>): Flow<Response>
99+
}
100+
101+
/**
102+
* Client-side interceptor for gRPC calls.
103+
*
104+
* Implementations can observe and modify client calls in a structured way. The primary entry point is the
105+
* [intercept] extension function on [ClientCallScope], which receives the inbound request [Flow] and must
106+
* call [ClientCallScope.proceed] to forward the call.
107+
*
108+
* Common use-cases include:
109+
* - Adding authentication or custom headers.
110+
* - Implementing logging/metrics.
111+
* - Observing headers/trailers and final status.
112+
* - Transforming request/response flows (e.g., mapping, buffering, throttling).
113+
*/
114+
public interface ClientInterceptor {
115+
/**
116+
* Intercept a client call.
117+
*
118+
* You can:
119+
* - Inspect [ClientCallScope.method] and [ClientCallScope.callOptions].
120+
* - Read or populate [ClientCallScope.requestHeaders].
121+
* - Register [ClientCallScope.onHeaders] and [ClientCallScope.onClose] callbacks.
122+
* - Transform the [request] flow or wrap the resulting response flow.
123+
*
124+
* IMPORTANT: [ClientCallScope.proceed] must eventually be called to actually execute the RPC and obtain
125+
* the response [Flow]. If [ClientCallScope.proceed] is omitted, the call will not reach the server.
126+
*/
127+
public fun <Request, Response> ClientCallScope<Request, Response>.intercept(
128+
request: Flow<Request>,
129+
): Flow<Response>
130+
131+
}

0 commit comments

Comments
 (0)