1+ /*
2+ * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+ */
4+
5+ package kotlinx.rpc.grpc
6+
7+ import kotlinx.coroutines.flow.Flow
8+ import kotlinx.rpc.grpc.internal.GrpcCallOptions
9+ import kotlinx.rpc.grpc.internal.MethodDescriptor
10+
11+ /* *
12+ * The scope of a single outgoing gRPC client call observed by a [ClientInterceptor].
13+ *
14+ * An interceptor receives this scope instance for every call and can:
15+ * - Inspect the RPC [method] being invoked.
16+ * - Read or populate [requestHeaders] before the request is sent.
17+ * - Read [callOptions] that affect transport-level behavior.
18+ * - Register callbacks with [onHeaders] and [onClose] to observe response metadata and final status.
19+ * - Cancel the call early via [cancel].
20+ * - Continue the call by calling [proceed] with a (possibly transformed) request [Flow].
21+ * - Transform the response by modifying the returned [Flow].
22+ *
23+ * ```kt
24+ * val interceptor = object : ClientInterceptor {
25+ * override fun <Request, Response> ClientCallScope<Request, Response>.intercept(
26+ * request: Flow<Request>
27+ * ): Flow<Response> {
28+ * // Example: add a header before proceeding
29+ * requestHeaders[MyKeys.Authorization] = token
30+ *
31+ * // Example: observe response metadata
32+ * onHeaders { headers -> /* inspect headers */ }
33+ * onClose { status, trailers -> /* log status/trailers */ }
34+ *
35+ * // IMPORTANT: proceed forwards the call to the next interceptor/transport.
36+ * // If you do not call proceed, no request will be sent and the call is short-circuited.
37+ * return proceed(request)
38+ * }
39+ * }
40+ * ```
41+ *
42+ * @param Request the request message type of the RPC.
43+ * @param Response the response message type of the RPC.
44+ */
45+ public interface ClientCallScope <Request , Response > {
46+ /* * Descriptor of the RPC method (name, marshalling, type) being invoked. */
47+ public val method: MethodDescriptor <Request , Response >
48+
49+ /* *
50+ * Outgoing request headers for this call.
51+ *
52+ * Interceptors may read and mutate this metadata
53+ * before calling [proceed] so the headers are sent to the server. Headers added after
54+ * the call has already been proceeded may not be reflected on the wire.
55+ */
56+ public val requestHeaders: GrpcMetadata
57+
58+ /* *
59+ * Transport/engine options used for this call (deadlines, compression, etc.).
60+ * Modifying this object is only possible before the call is proceeded.
61+ */
62+ public val callOptions: GrpcCallOptions
63+
64+ /* *
65+ * Register a callback invoked when the initial response headers are received.
66+ * Typical gRPC semantics guarantee headers are delivered at most once per call
67+ * and before the first message is received.
68+ */
69+ public fun onHeaders (block : (responseHeaders: GrpcMetadata ) -> Unit )
70+
71+ /* *
72+ * Register a callback invoked when the call completes, successfully or not.
73+ * The final `status` and trailing `responseTrailers` are provided.
74+ */
75+ public fun onClose (block : (status: Status , responseTrailers: GrpcMetadata ) -> Unit )
76+
77+ /* *
78+ * Cancel the call locally, providing a human-readable [message] and an optional [cause].
79+ * This method won't return and abort all further processing.
80+ *
81+ * We made cancel throw a [StatusException] instead of returning, so control flow is explicit and
82+ * race conditions between interceptors and the transport layer are avoided.
83+ */
84+ public fun cancel (message : String , cause : Throwable ? = null): Nothing
85+
86+ /* *
87+ * Continue the invocation by forwarding it to the next interceptor or to the underlying transport.
88+ *
89+ * This function is the heart of an interceptor:
90+ * - It must be called to actually perform the RPC. If you never call [proceed], the request is not sent
91+ * and the call is effectively short-circuited by the interceptor.
92+ * - You may transform the [request] flow before passing it to [proceed] (e.g., logging, retry orchestration,
93+ * compression, metrics). The returned [Flow] yields response messages and can also be transformed
94+ * before being returned to the caller.
95+ * - Call [proceed] at most once per intercepted call. Calling it multiple times or after cancellation
96+ * is not supported.
97+ */
98+ public fun proceed (request : Flow <Request >): Flow <Response >
99+ }
100+
101+ /* *
102+ * Client-side interceptor for gRPC calls.
103+ *
104+ * Implementations can observe and modify client calls in a structured way. The primary entry point is the
105+ * [intercept] extension function on [ClientCallScope], which receives the inbound request [Flow] and must
106+ * call [ClientCallScope.proceed] to forward the call.
107+ *
108+ * Common use-cases include:
109+ * - Adding authentication or custom headers.
110+ * - Implementing logging/metrics.
111+ * - Observing headers/trailers and final status.
112+ * - Transforming request/response flows (e.g., mapping, buffering, throttling).
113+ */
114+ public interface ClientInterceptor {
115+ /* *
116+ * Intercept a client call.
117+ *
118+ * You can:
119+ * - Inspect [ClientCallScope.method] and [ClientCallScope.callOptions].
120+ * - Read or populate [ClientCallScope.requestHeaders].
121+ * - Register [ClientCallScope.onHeaders] and [ClientCallScope.onClose] callbacks.
122+ * - Transform the [request] flow or wrap the resulting response flow.
123+ *
124+ * IMPORTANT: [ClientCallScope.proceed] must eventually be called to actually execute the RPC and obtain
125+ * the response [Flow]. If [ClientCallScope.proceed] is omitted, the call will not reach the server.
126+ */
127+ public fun <Request , Response > ClientCallScope <Request , Response >.intercept (
128+ request : Flow <Request >,
129+ ): Flow <Response >
130+
131+ }
0 commit comments