Skip to content

Commit 2319098

Browse files
committed
grpc: Add client interceptor support
Signed-off-by: Johannes Zottele <[email protected]>
1 parent a284423 commit 2319098

File tree

16 files changed

+485
-168
lines changed

16 files changed

+485
-168
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.rpc.grpc
6+
7+
import kotlinx.coroutines.flow.Flow
8+
import kotlinx.rpc.grpc.internal.GrpcCallOptions
9+
import kotlinx.rpc.grpc.internal.MethodDescriptor
10+
11+
/**
12+
* Represents a client call scope within a coroutine context, providing access to properties and
13+
* functions required to manage the lifecycle and behavior of a client-side remote procedure call
14+
* (RPC) in a coroutine-based environment.
15+
*
16+
* @param Request the type of the request message sent to the gRPC server.
17+
* @param Response the type of the response message received from the gRPC server.
18+
*/
19+
public interface ClientCallScope<Request, Response> {
20+
public val method: MethodDescriptor<Request, Response>
21+
public val metadata: GrpcTrailers
22+
public val callOptions: GrpcCallOptions
23+
public fun onHeaders(block: (GrpcTrailers) -> Unit)
24+
public fun onClose(block: (Status, GrpcTrailers) -> Unit)
25+
public fun cancel(message: String, cause: Throwable? = null)
26+
public fun proceed(request: Flow<Request>): Flow<Response>
27+
}
28+
29+
public interface ClientInterceptor {
30+
31+
/**
32+
* Intercepts and transforms the flow of requests and responses in a client call.
33+
* An interceptor can throw an exception at any time to cancel the call.
34+
*
35+
* @param scope The scope of the client call, providing context and methods for managing
36+
* the call lifecycle and metadata.
37+
* @param request A flow of requests to be sent to the server.
38+
* @return A flow of responses received from the server.
39+
*/
40+
public fun <Request, Response> intercept(
41+
scope: ClientCallScope<Request, Response>,
42+
request: Flow<Request>,
43+
): Flow<Response>
44+
45+
}

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcClient.kt

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ private typealias RequestClient = Any
3232
* @field channel The [ManagedChannel] used to communicate with remote gRPC services.
3333
*/
3434
public class GrpcClient internal constructor(
35-
private val channel: ManagedChannel,
35+
internal val channel: ManagedChannel,
3636
messageCodecResolver: MessageCodecResolver = EmptyMessageCodecResolver,
37+
internal val interceptors: List<ClientInterceptor>,
3738
) : RpcClient {
3839
private val delegates = RpcInternalConcurrentHashMap<String, GrpcServiceDelegate>()
3940
private val messageCodecResolver = messageCodecResolver + ThrowingMessageCodecResolver
@@ -58,15 +59,13 @@ public class GrpcClient internal constructor(
5859

5960
return when (methodDescriptor.type) {
6061
MethodType.UNARY -> unaryRpc(
61-
channel = channel.platformApi,
6262
descriptor = methodDescriptor,
6363
request = request,
6464
callOptions = callOptions,
6565
trailers = trailers,
6666
)
6767

6868
MethodType.CLIENT_STREAMING -> @Suppress("UNCHECKED_CAST") clientStreamingRpc(
69-
channel = channel.platformApi,
7069
descriptor = methodDescriptor,
7170
requests = request as Flow<RequestClient>,
7271
callOptions = callOptions,
@@ -83,15 +82,13 @@ public class GrpcClient internal constructor(
8382

8483
when (methodDescriptor.type) {
8584
MethodType.SERVER_STREAMING -> serverStreamingRpc(
86-
channel = channel.platformApi,
8785
descriptor = methodDescriptor,
8886
request = request,
8987
callOptions = callOptions,
9088
trailers = trailers,
9189
)
9290

9391
MethodType.BIDI_STREAMING -> @Suppress("UNCHECKED_CAST") bidirectionalStreamingRpc(
94-
channel = channel.platformApi,
9592
descriptor = methodDescriptor,
9693
requests = request as Flow<RequestClient>,
9794
callOptions = callOptions,
@@ -131,23 +128,57 @@ public class GrpcClient internal constructor(
131128
public fun GrpcClient(
132129
hostname: String,
133130
port: Int,
134-
credentials: ClientCredentials? = null,
135-
messageCodecResolver: MessageCodecResolver = EmptyMessageCodecResolver,
136-
configure: ManagedChannelBuilder<*>.() -> Unit = {},
131+
configure: GrpcClientConfiguration.() -> Unit = {},
137132
): GrpcClient {
138-
val channel = ManagedChannelBuilder(hostname, port, credentials).apply(configure).buildChannel()
139-
return GrpcClient(channel, messageCodecResolver)
133+
val config = GrpcClientConfiguration().apply(configure)
134+
return GrpcClient(ManagedChannelBuilder(hostname, port, config.credentials), config)
140135
}
141136

142137
/**
143138
* Constructor function for the [GrpcClient] class.
144139
*/
145140
public fun GrpcClient(
146141
target: String,
147-
credentials: ClientCredentials? = null,
148-
messageCodecResolver: MessageCodecResolver = EmptyMessageCodecResolver,
149-
configure: ManagedChannelBuilder<*>.() -> Unit = {},
142+
configure: GrpcClientConfiguration.() -> Unit = {},
143+
): GrpcClient {
144+
val config = GrpcClientConfiguration().apply(configure)
145+
return GrpcClient(ManagedChannelBuilder(target, config.credentials), config)
146+
}
147+
148+
private fun GrpcClient(
149+
builder: ManagedChannelBuilder<*>,
150+
config: GrpcClientConfiguration,
150151
): GrpcClient {
151-
val channel = ManagedChannelBuilder(target, credentials).apply(configure).buildChannel()
152-
return GrpcClient(channel, messageCodecResolver)
152+
val channel = builder.apply {
153+
config.overrideAuthority?.let { overrideAuthority(it) }
154+
}.buildChannel()
155+
return GrpcClient(channel, config.messageCodecResolver, config.interceptors)
153156
}
157+
158+
public class GrpcClientConfiguration internal constructor() {
159+
internal var messageCodecResolver: MessageCodecResolver = EmptyMessageCodecResolver
160+
internal var credentials: ClientCredentials? = null
161+
internal var overrideAuthority: String? = null
162+
internal val interceptors: MutableList<ClientInterceptor> = mutableListOf()
163+
164+
public fun usePlaintext() {
165+
credentials = createInsecureClientCredentials()
166+
}
167+
168+
public fun useCredentials(credentials: ClientCredentials) {
169+
this@GrpcClientConfiguration.credentials = credentials
170+
}
171+
172+
public fun overrideAuthority(authority: String) {
173+
overrideAuthority = authority
174+
}
175+
176+
public fun useMessageCodecResolver(messageCodecResolver: MessageCodecResolver) {
177+
this.messageCodecResolver = messageCodecResolver
178+
}
179+
180+
public fun intercept(vararg interceptors: ClientInterceptor) {
181+
this.interceptors.addAll(interceptors)
182+
}
183+
184+
}

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ public interface ManagedChannel {
6868
* Builder class for [ManagedChannel].
6969
*/
7070
public expect abstract class ManagedChannelBuilder<T : ManagedChannelBuilder<T>> {
71+
72+
// TODO: Not used anymore
7173
public fun usePlaintext(): T
7274

7375
public abstract fun overrideAuthority(authority: String): T

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/credentials.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ public expect abstract class ServerCredentials
1010
public expect class InsecureClientCredentials : ClientCredentials
1111
public expect class InsecureServerCredentials : ServerCredentials
1212

13+
internal expect fun createInsecureClientCredentials(): ClientCredentials
14+
1315
public expect class TlsClientCredentials : ClientCredentials
1416
public expect class TlsServerCredentials : ServerCredentials
1517

File renamed without changes.

0 commit comments

Comments
 (0)