Skip to content

Commit 796d4eb

Browse files
committed
grpc: Add documentation
Signed-off-by: Johannes Zottele <[email protected]>
1 parent 7dc4db6 commit 796d4eb

File tree

6 files changed

+206
-17
lines changed

6 files changed

+206
-17
lines changed

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ClientInterceptor.kt

Lines changed: 99 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,29 +8,118 @@ import kotlinx.coroutines.flow.Flow
88
import kotlinx.rpc.grpc.internal.GrpcCallOptions
99
import kotlinx.rpc.grpc.internal.MethodDescriptor
1010

11+
/**
12+
* The scope of a single outgoing gRPC client call observed by a [ClientInterceptor].
13+
*
14+
* An interceptor receives this scope instance for every call and can:
15+
* - Inspect the RPC [method] being invoked.
16+
* - Read or populate [requestHeaders] before the request is sent.
17+
* - Read [callOptions] that affect transport-level behavior.
18+
* - Register callbacks with [onHeaders] and [onClose] to observe response metadata and final status.
19+
* - Cancel the call early via [cancel].
20+
* - Continue the call by calling [proceed] with a (possibly transformed) request [Flow].
21+
* - Transform the response by modifying the returned [Flow].
22+
*
23+
* ```kt
24+
* val interceptor = object : ClientInterceptor {
25+
* override fun <Request, Response> ClientCallScope<Request, Response>.intercept(
26+
* request: Flow<Request>
27+
* ): Flow<Response> {
28+
* // Example: add a header before proceeding
29+
* requestHeaders[MyKeys.Authorization] = token
30+
*
31+
* // Example: observe response metadata
32+
* onHeaders { headers -> /* inspect headers */ }
33+
* onClose { status, trailers -> /* log status/trailers */ }
34+
*
35+
* // IMPORTANT: proceed forwards the call to the next interceptor/transport.
36+
* // If you do not call proceed, no request will be sent and the call is short-circuited.
37+
* return proceed(request)
38+
* }
39+
* }
40+
* ```
41+
*
42+
* @param Request the request message type of the RPC.
43+
* @param Response the response message type of the RPC.
44+
*/
1145
public interface ClientCallScope<Request, Response> {
46+
/** Descriptor of the RPC method (name, marshalling, type) being invoked. */
1247
public val method: MethodDescriptor<Request, Response>
48+
49+
/**
50+
* Outgoing request headers for this call.
51+
*
52+
* Interceptors may read and mutate this metadata
53+
* before calling [proceed] so the headers are sent to the server. Headers added after
54+
* the call has already been proceeded may not be reflected on the wire.
55+
*/
1356
public val requestHeaders: GrpcMetadata
57+
58+
/**
59+
* Transport/engine options used for this call (deadlines, compression, etc.).
60+
* Modifying this object is only possible before the call is proceeded.
61+
*/
1462
public val callOptions: GrpcCallOptions
63+
64+
/**
65+
* Register a callback invoked when the initial response headers are received.
66+
* Typical gRPC semantics guarantee headers are delivered at most once per call
67+
* and before the first message is received.
68+
*/
1569
public fun onHeaders(block: (responseHeaders: GrpcMetadata) -> Unit)
16-
public fun onClose(block: (closeStatus: Status, responseTrailers: GrpcMetadata) -> Unit)
70+
71+
/**
72+
* Register a callback invoked when the call completes, successfully or not.
73+
* The final `status` and trailing `responseTrailers` are provided.
74+
*/
75+
public fun onClose(block: (status: Status, responseTrailers: GrpcMetadata) -> Unit)
76+
77+
/**
78+
* Cancel the call locally, providing a human-readable [message] and an optional [cause].
79+
* This method won't return and abort all further processing.
80+
*/
1781
public fun cancel(message: String, cause: Throwable? = null): Nothing
82+
83+
/**
84+
* Continue the invocation by forwarding it to the next interceptor or to the underlying transport.
85+
*
86+
* This function is the heart of an interceptor:
87+
* - It must be called to actually perform the RPC. If you never call [proceed], the request is not sent
88+
* and the call is effectively short-circuited by the interceptor.
89+
* - You may transform the [request] flow before passing it to [proceed] (e.g., logging, retry orchestration,
90+
* compression, metrics). The returned [Flow] yields response messages and can also be transformed
91+
* before being returned to the caller.
92+
* - Call [proceed] at most once per intercepted call. Calling it multiple times or after cancellation
93+
* is not supported.
94+
*/
1895
public fun proceed(request: Flow<Request>): Flow<Response>
1996
}
2097

98+
/**
99+
* Client-side interceptor for gRPC calls.
100+
*
101+
* Implementations can observe and modify client calls in a structured way. The primary entry point is the
102+
* [intercept] extension function on [ClientCallScope], which receives the inbound request [Flow] and must
103+
* call [ClientCallScope.proceed] to forward the call.
104+
*
105+
* Common use-cases include:
106+
* - Adding authentication or custom headers.
107+
* - Implementing logging/metrics.
108+
* - Observing headers/trailers and final status.
109+
* - Transforming request/response flows (e.g., mapping, buffering, throttling).
110+
*/
21111
public interface ClientInterceptor {
22-
23112
/**
24-
* Intercepts and transforms the flow of requests and responses in a client call.
25-
* An interceptor can throw an exception at any time to cancel the call.
113+
* Intercept a client call.
26114
*
27-
* The interceptor must ensure that it emits an expected number of values.
28-
* E.g. if the intercepted method is a unary call, the interceptor's returned flow must emit exactly one value.
115+
* You can:
116+
* - Inspect [ClientCallScope.method] and [ClientCallScope.callOptions].
117+
* - Read or populate [ClientCallScope.requestHeaders].
118+
* - Register [ClientCallScope.onHeaders] and [ClientCallScope.onClose] callbacks.
119+
* - Transform the [request] flow or wrap the resulting response flow.
29120
*
30-
* @param this The scope of the client call, providing context and methods for managing
31-
* the call lifecycle and metadata.
32-
* @param request A flow of requests to be sent to the server.
33-
* @return A flow of responses received from the server.
121+
* IMPORTANT: [ClientCallScope.proceed] must eventually be called to actually execute the RPC and obtain
122+
* the response [Flow]. If [ClientCallScope.proceed] is omitted, the call will not reach the server.
34123
*/
35124
public fun <Request, Response> ClientCallScope<Request, Response>.intercept(
36125
request: Flow<Request>,

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ServerInterceptor.kt

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,120 @@ package kotlinx.rpc.grpc
66

77
import kotlinx.coroutines.flow.Flow
88
import kotlinx.coroutines.flow.FlowCollector
9+
import kotlinx.rpc.grpc.internal.GrpcContext
910
import kotlinx.rpc.grpc.internal.MethodDescriptor
1011

12+
/**
13+
* Th scope of a single incoming gRPC server call observed by a [ServerInterceptor].
14+
*
15+
* An interceptor receives this scope instance for every RPC invocation arriving to the server and can:
16+
* - Inspect the target RPC [method].
17+
* - Read client-provided [requestHeaders].
18+
* - Populate [responseHeaders] (sent before the first response message) and [responseTrailers]
19+
* (sent when the call completes).
20+
* - Register a completion callback with [onClose].
21+
* - Abort the call early with [close].
22+
* - Continue handling by calling [proceed] with the inbound request [Flow] and optionally transform
23+
* the returned response [Flow].
24+
*
25+
* @param Request the request message type of the RPC.
26+
* @param Response the response message type of the RPC.
27+
*/
1128
public interface ServerCallScope<Request, Response> {
29+
/** Descriptor of the RPC method (name, marshalling, type) being executed. */
1230
public val method: MethodDescriptor<Request, Response>
31+
32+
/** Metadata received from the client with the initial request headers. Read-only from the server perspective. */
1333
public val requestHeaders: GrpcMetadata
34+
35+
/**
36+
* Initial response headers to be sent to the client.
37+
* Interceptors and handlers may add entries before the first response element is emitted
38+
* (i.e., before proceeding or before producing output), otherwise headers might have already been sent.
39+
*/
1440
public val responseHeaders: GrpcMetadata
41+
42+
/**
43+
* Trailing metadata to be sent with the final status when the call completes.
44+
* Interceptors can add diagnostics or custom metadata here.
45+
*/
1546
public val responseTrailers: GrpcMetadata
1647

48+
/**
49+
* The [GrpcContext] associated with this call.
50+
*
51+
* It can be used by the interceptor to provide call-scoped information about
52+
* the current call, such as the identity of the caller or the current authentication state.
53+
*/
54+
public val grpcContext: GrpcContext
55+
56+
/**
57+
* Register a callback invoked when the call is closed (successfully or exceptionally).
58+
* Provides the final [Status] and the sent [GrpcMetadata] trailers.
59+
*/
1760
public fun onClose(block: (Status, GrpcMetadata) -> Unit)
61+
62+
/**
63+
* Immediately terminate the call with the given [status] and optional [trailers].
64+
*
65+
* This method does not return (declared as [Nothing]). After calling it, no further messages will be processed
66+
* or sent. Prefer setting [responseHeaders]/[responseTrailers] before closing if you need to include metadata.
67+
*/
1868
public fun close(status: Status, trailers: GrpcMetadata = GrpcMetadata()): Nothing
69+
70+
/**
71+
* Continue processing by forwarding the request to the next interceptor or the actual service implementation.
72+
*
73+
* IMPORTANT:
74+
* - You must call [proceed] exactly once to actually handle the RPC; otherwise, the call will be short-circuited
75+
* and the service method will not be invoked.
76+
* - You may transform the incoming [request] flow (e.g., validation, logging, metering) before passing it to
77+
* [proceed]. You may also transform the resulting response [Flow] before returning it to the framework.
78+
* - The interceptor must ensure to provide and return a valid number of messages, depending on the method type.
79+
* - The interceptor must not throw an exception. Use [close] to terminate the call with an error.
80+
*/
1981
public fun proceed(request: Flow<Request>): Flow<Response>
2082

83+
/**
84+
* Convenience for flow builders: proceeds with [request] and emits the resulting response elements into this
85+
* [FlowCollector]. Useful inside `flow {}` blocks within interceptors.
86+
*/
2187
public suspend fun FlowCollector<Response>.proceedFlow(request: Flow<Request>) {
2288
proceed(request).collect {
2389
emit(it)
2490
}
2591
}
2692
}
2793

94+
/**
95+
* Server-side interceptor for gRPC calls.
96+
*
97+
* Implementations can observe and modify server handling in a structured way. The entry point is the
98+
* [intercept] extension function on [ServerCallScope], which receives the inbound request [Flow] and must
99+
* call [ServerCallScope.proceed] to forward the call to the next interceptor or the target service method.
100+
*
101+
* Common use-cases include:
102+
* - Authentication/authorization checks and context propagation.
103+
* - Setting response headers and trailers.
104+
* - Structured logging and metrics.
105+
* - Transforming request/response flows (e.g., validation, mapping, throttling).
106+
*
107+
* See ServerInterceptorTest for practical usage patterns.
108+
*/
28109
public interface ServerInterceptor {
110+
/**
111+
* Intercept a server call.
112+
*
113+
* You can:
114+
* - Inspect [ServerCallScope.method].
115+
* - Read [ServerCallScope.requestHeaders] and populate [ServerCallScope.responseHeaders]/[ServerCallScope.responseTrailers].
116+
* - Register [ServerCallScope.onClose] callbacks.
117+
* - Transform the [request] flow or wrap the resulting response flow.
118+
* - Append information to the [ServerCallScope.grpcContext].
119+
*
120+
* IMPORTANT: You must eventually call [ServerCallScope.proceed] to actually invoke the service logic and produce
121+
* the response [Flow]. If [ServerCallScope.proceed] is omitted, the call will never reach the service.
122+
*/
29123
public fun <Request, Response> ServerCallScope<Request, Response>.intercept(
30124
request: Flow<Request>,
31125
): Flow<Response>

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/GrpcContext.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,13 @@ package kotlinx.rpc.grpc.internal
66

77
import kotlin.coroutines.CoroutineContext
88

9-
internal expect class GrpcContext
9+
public expect class GrpcContext
10+
1011
internal expect val CurrentGrpcContext: GrpcContext
1112

1213
internal expect class GrpcContextElement : CoroutineContext.Element {
14+
val grpcContext: GrpcContext
15+
1316
companion object Key : CoroutineContext.Key<GrpcContextElement> {
1417
fun current(): GrpcContextElement
1518
}

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/suspendServerCalls.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,15 +157,17 @@ private fun <Request, Response> CoroutineScope.serverCallListenerImpl(
157157
}
158158
}
159159

160+
val context = GrpcContextElement.current()
160161
val serverCallScope = ServerCallScopeImpl(
161162
method = descriptor,
162163
interceptors = interceptors,
163164
implementation = implementation,
164165
requestHeaders = requestHeaders,
165166
serverCall = handler,
167+
grpcContext = context.grpcContext,
166168
)
167169

168-
val rpcJob = launch(GrpcContextElement.current()) {
170+
val rpcJob = launch() {
169171
val mutex = Mutex()
170172
val headersSent = AtomicBoolean(false) // enforces only sending headers once
171173
val failure = runCatching {
@@ -273,6 +275,7 @@ private class ServerCallScopeImpl<Request, Response>(
273275
val implementation: (Flow<Request>) -> Flow<Response>,
274276
override val requestHeaders: GrpcMetadata,
275277
val serverCall: ServerCall<Request, Response>,
278+
override val grpcContext: GrpcContext,
276279
) : ServerCallScope<Request, Response> {
277280

278281
override val responseHeaders: GrpcMetadata = GrpcMetadata()

grpc/grpc-core/src/jvmMain/kotlin/kotlinx/rpc/grpc/internal/GrpcContext.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ import io.grpc.Context
88
import kotlinx.coroutines.ThreadContextElement
99
import kotlin.coroutines.CoroutineContext
1010

11-
internal actual typealias GrpcContext = Context
11+
public actual typealias GrpcContext = Context
1212

1313
internal actual val CurrentGrpcContext: GrpcContext
1414
get() = GrpcContext.current()
1515

16-
internal actual class GrpcContextElement(private val grpcContext: GrpcContext) : ThreadContextElement<GrpcContext> {
16+
internal actual class GrpcContextElement(actual val grpcContext: GrpcContext) : ThreadContextElement<GrpcContext> {
1717
actual companion object Key : CoroutineContext.Key<GrpcContextElement> {
1818
actual fun current(): GrpcContextElement = GrpcContextElement(CurrentGrpcContext)
1919
}

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/GrpcContext.native.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,20 @@ package kotlinx.rpc.grpc.internal
66

77
import kotlin.coroutines.CoroutineContext
88

9-
internal actual class GrpcContext
9+
public actual class GrpcContext
1010

1111
private val currentGrpcContext = GrpcContext()
1212

1313
internal actual val CurrentGrpcContext: GrpcContext
1414
get() = currentGrpcContext
1515

16-
internal actual class GrpcContextElement : CoroutineContext.Element {
16+
internal actual class GrpcContextElement(actual val grpcContext: GrpcContext) : CoroutineContext.Element {
1717
actual override val key: CoroutineContext.Key<GrpcContextElement>
1818
get() = Key
1919

2020
actual companion object Key : CoroutineContext.Key<GrpcContextElement> {
2121
actual fun current(): GrpcContextElement {
22-
return GrpcContextElement()
22+
return GrpcContextElement(currentGrpcContext)
2323
}
2424
}
2525
}

0 commit comments

Comments
 (0)