Skip to content

Commit 0648833

Browse files
committed
grpc: Refactor client scope API
Signed-off-by: Johannes Zottele <[email protected]>
1 parent 4a62ed2 commit 0648833

File tree

3 files changed

+56
-53
lines changed

3 files changed

+56
-53
lines changed

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ClientInterceptor.kt

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,6 @@ import kotlinx.coroutines.flow.Flow
88
import kotlinx.rpc.grpc.internal.GrpcCallOptions
99
import kotlinx.rpc.grpc.internal.MethodDescriptor
1010

11-
/**
12-
* Represents a client call scope within a coroutine context, providing access to properties and
13-
* functions required to manage the lifecycle and behavior of a client-side remote procedure call
14-
* (RPC) in a coroutine-based environment.
15-
*
16-
* @param Request the type of the request message sent to the gRPC server.
17-
* @param Response the type of the response message received from the gRPC server.
18-
*/
1911
public interface ClientCallScope<Request, Response> {
2012
public val method: MethodDescriptor<Request, Response>
2113
public val metadata: GrpcTrailers
@@ -32,13 +24,15 @@ public interface ClientInterceptor {
3224
* Intercepts and transforms the flow of requests and responses in a client call.
3325
* An interceptor can throw an exception at any time to cancel the call.
3426
*
35-
* @param scope The scope of the client call, providing context and methods for managing
27+
* The interceptor must ensure that it emits an expected number of values.
28+
* E.g. if the intercepted method is a unary call, the interceptor's returned flow must emit exactly one value.
29+
*
30+
* @param this The scope of the client call, providing context and methods for managing
3631
* the call lifecycle and metadata.
3732
* @param request A flow of requests to be sent to the server.
3833
* @return A flow of responses received from the server.
3934
*/
40-
public fun <Request, Response> intercept(
41-
scope: ClientCallScope<Request, Response>,
35+
public fun <Request, Response> ClientCallScope<Request, Response>.intercept(
4236
request: Flow<Request>,
4337
): Flow<Response>
4438

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/suspendClientCalls.kt

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,26 @@
44

55
package kotlinx.rpc.grpc.internal
66

7-
import kotlinx.coroutines.*
7+
import kotlinx.coroutines.CancellationException
8+
import kotlinx.coroutines.CoroutineName
9+
import kotlinx.coroutines.NonCancellable
10+
import kotlinx.coroutines.cancel
811
import kotlinx.coroutines.channels.Channel
912
import kotlinx.coroutines.channels.onFailure
13+
import kotlinx.coroutines.coroutineScope
1014
import kotlinx.coroutines.flow.Flow
1115
import kotlinx.coroutines.flow.flow
1216
import kotlinx.coroutines.flow.flowOf
1317
import kotlinx.coroutines.flow.single
14-
import kotlinx.rpc.grpc.*
18+
import kotlinx.coroutines.launch
19+
import kotlinx.coroutines.withContext
20+
import kotlinx.rpc.grpc.ClientCallScope
21+
import kotlinx.rpc.grpc.GrpcClient
22+
import kotlinx.rpc.grpc.GrpcTrailers
23+
import kotlinx.rpc.grpc.Status
24+
import kotlinx.rpc.grpc.StatusCode
25+
import kotlinx.rpc.grpc.StatusException
26+
import kotlinx.rpc.grpc.statusCode
1527
import kotlinx.rpc.internal.utils.InternalRpcApi
1628

1729
// heavily inspired by
@@ -218,8 +230,9 @@ private class ClientCallScopeImpl<Request, Response>(
218230

219231
override fun proceed(request: Flow<Request>): Flow<Response> {
220232
return if (interceptorIndex < interceptors.size) {
221-
interceptors[interceptorIndex++]
222-
.intercept(this, request)
233+
with(interceptors[interceptorIndex++]) {
234+
intercept(request)
235+
}
223236
} else {
224237
// if the interceptor chain is exhausted, we start the actual call
225238
doCall(request)

grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/ClientInterceptorTest.kt

Lines changed: 34 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,17 @@
44

55
package kotlinx.rpc.grpc.test.proto
66

7-
import kotlinx.coroutines.CompletableDeferred
8-
import kotlinx.coroutines.coroutineScope
9-
import kotlinx.coroutines.delay
107
import kotlinx.coroutines.flow.Flow
118
import kotlinx.coroutines.flow.channelFlow
12-
import kotlinx.coroutines.flow.collect
13-
import kotlinx.coroutines.flow.conflate
149
import kotlinx.coroutines.flow.flow
1510
import kotlinx.coroutines.flow.map
1611
import kotlinx.coroutines.flow.toList
17-
import kotlinx.coroutines.launch
1812
import kotlinx.rpc.RpcServer
1913
import kotlinx.rpc.grpc.ClientCallScope
2014
import kotlinx.rpc.grpc.ClientInterceptor
2115
import kotlinx.rpc.grpc.GrpcClient
2216
import kotlinx.rpc.grpc.StatusCode
2317
import kotlinx.rpc.grpc.StatusException
24-
import kotlinx.rpc.grpc.internal.bidirectionalStreamingRpc
2518
import kotlinx.rpc.grpc.statusCode
2619
import kotlinx.rpc.grpc.test.EchoRequest
2720
import kotlinx.rpc.grpc.test.EchoResponse
@@ -37,7 +30,7 @@ import kotlin.test.assertFailsWith
3730
import kotlin.test.assertIs
3831
import kotlin.test.assertTrue
3932

40-
class ClientInterceptorTest: GrpcProtoTest() {
33+
class ClientInterceptorTest : GrpcProtoTest() {
4134

4235
override fun RpcServer.registerServices() {
4336
registerService<EchoService> { EchoServiceImpl() }
@@ -46,7 +39,7 @@ class ClientInterceptorTest: GrpcProtoTest() {
4639
@Test
4740
fun `throw during intercept - should fail with thrown exception`() {
4841
val error = assertFailsWith<IllegalStateException> {
49-
val interceptor = interceptor { _, _ ->
42+
val interceptor = interceptor {
5043
throw IllegalStateException("Failing in interceptor")
5144
}
5245
runGrpcTest(clientInterceptors = interceptor, test = ::unaryCall)
@@ -58,11 +51,11 @@ class ClientInterceptorTest: GrpcProtoTest() {
5851
@Test
5952
fun `throw during onHeader - should fail with status exception containing the thrown exception`() {
6053
val error = assertFailsWith<StatusException> {
61-
val interceptor = interceptor { scope, req ->
62-
scope.onHeaders {
54+
val interceptor = interceptor {
55+
onHeaders {
6356
throw IllegalStateException("Failing in onHeader")
6457
}
65-
scope.proceed(req)
58+
proceed(it)
6659
}
6760
runGrpcTest(clientInterceptors = interceptor, test = ::unaryCall)
6861
}
@@ -75,11 +68,11 @@ class ClientInterceptorTest: GrpcProtoTest() {
7568
@Test
7669
fun `throw during onClose - should fail with status exception containing the thrown exception`() {
7770
val error = assertFailsWith<StatusException> {
78-
val interceptor = interceptor { scope, req ->
79-
scope.onClose { _, _ ->
71+
val interceptor = interceptor {
72+
onClose { _, _ ->
8073
throw IllegalStateException("Failing in onClose")
8174
}
82-
scope.proceed(req)
75+
proceed(it)
8376
}
8477
runGrpcTest(clientInterceptors = interceptor, test = ::unaryCall)
8578
}
@@ -92,9 +85,9 @@ class ClientInterceptorTest: GrpcProtoTest() {
9285
@Test
9386
fun `cancel in intercept - should fail with cancellation`() {
9487
val error = assertFailsWith<StatusException> {
95-
val interceptor = interceptor { scope, req ->
96-
scope.cancel("Canceling in interceptor", IllegalStateException("Cancellation cause"))
97-
scope.proceed(req)
88+
val interceptor = interceptor {
89+
cancel("Canceling in interceptor", IllegalStateException("Cancellation cause"))
90+
proceed(it)
9891
}
9992
runGrpcTest(clientInterceptors = interceptor, test = ::unaryCall)
10093
}
@@ -107,9 +100,9 @@ class ClientInterceptorTest: GrpcProtoTest() {
107100

108101
@Test
109102
fun `modify request message - should return modified message`() {
110-
val interceptor = interceptor { scope, req ->
111-
val modified = req.map { EchoRequest { message = "Modified" } }
112-
scope.proceed(modified)
103+
val interceptor = interceptor {
104+
val modified = it.map { EchoRequest { message = "Modified" } }
105+
proceed(modified)
113106
}
114107
runGrpcTest(clientInterceptors = interceptor) {
115108
val service = it.withService<EchoService>()
@@ -120,8 +113,8 @@ class ClientInterceptorTest: GrpcProtoTest() {
120113

121114
@Test
122115
fun `modify response message - should return modified message`() {
123-
val interceptor = interceptor { scope, req ->
124-
scope.proceed(req).map { EchoResponse { message = "Modified" } }
116+
val interceptor = interceptor {
117+
proceed(it).map { EchoResponse { message = "Modified" } }
125118
}
126119
runGrpcTest(clientInterceptors = interceptor) {
127120
val service = it.withService<EchoService>()
@@ -132,24 +125,26 @@ class ClientInterceptorTest: GrpcProtoTest() {
132125

133126
@Test
134127
fun `append a response message once closed`() {
135-
val interceptor = interceptor { scope, req -> channelFlow {
136-
scope.proceed(req).collect {
137-
trySend(it)
138-
}
139-
scope.onClose { status, _ ->
140-
trySend(EchoResponse { message = "Appended-after-close-with-${status.statusCode}" })
128+
val interceptor = interceptor {
129+
channelFlow {
130+
proceed(it).collect {
131+
trySend(it)
132+
}
133+
onClose { status, _ ->
134+
trySend(EchoResponse { message = "Appended-after-close-with-${status.statusCode}" })
135+
}
141136
}
142-
} }
137+
}
143138

144139
runGrpcTest(
145140
clientInterceptors = interceptor
146141
) { client ->
147142
val svc = client.withService<EchoService>()
148143
val responses = svc.BidirectionalStreamingEcho(flow {
149-
repeat(5) {
150-
emit(EchoRequest { message = "Eccchhooo" })
151-
}
152-
}).toList()
144+
repeat(5) {
145+
emit(EchoRequest { message = "Eccchhooo" })
146+
}
147+
}).toList()
153148
assertEquals(6, responses.size)
154149
assertTrue(responses.any { it.message == "Appended-after-close-with-OK" })
155150
}
@@ -164,15 +159,16 @@ class ClientInterceptorTest: GrpcProtoTest() {
164159
}
165160

166161
private fun interceptor(
167-
block: (ClientCallScope<Any, Any> , Flow<Any>) -> Flow<Any>
162+
block: ClientCallScope<Any, Any>.(Flow<Any>) -> Flow<Any>,
168163
): List<ClientInterceptor> {
169164
return listOf(object : ClientInterceptor {
170165
@Suppress("UNCHECKED_CAST")
171-
override fun <Req, Resp> intercept(
172-
scope: ClientCallScope<Req, Resp>,
166+
override fun <Req, Resp> ClientCallScope<Req, Resp>.intercept(
173167
request: Flow<Req>,
174168
): Flow<Resp> {
175-
return block(scope as ClientCallScope<Any, Any>, request as Flow<Any>) as Flow<Resp>
169+
with(this as ClientCallScope<Any, Any>) {
170+
return block(request as Flow<Any>) as Flow<Resp>
171+
}
176172
}
177173
})
178174
}

0 commit comments

Comments
 (0)