Skip to content

Commit a94d849

Browse files
committed
grpc: Address PR comments
Signed-off-by: Johannes Zottele <[email protected]>
1 parent ebd7a64 commit a94d849

File tree

7 files changed

+96
-23
lines changed

7 files changed

+96
-23
lines changed

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ClientInterceptor.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ public interface ClientCallScope<Request, Response> {
7777
/**
7878
* Cancel the call locally, providing a human-readable [message] and an optional [cause].
7979
* This method won't return and abort all further processing.
80+
*
81+
* We made cancel throw a [StatusException] instead of returning, so control flow is explicit and
82+
* race conditions between interceptors and the transport layer are avoided.
8083
*/
8184
public fun cancel(message: String, cause: Throwable? = null): Nothing
8285

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.kt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,6 @@ public interface ManagedChannel {
6868
* Builder class for [ManagedChannel].
6969
*/
7070
public expect abstract class ManagedChannelBuilder<T : ManagedChannelBuilder<T>> {
71-
72-
// TODO: Not used anymore
73-
public fun usePlaintext(): T
74-
7571
public abstract fun overrideAuthority(authority: String): T
7672
}
7773

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ServerInterceptor.kt

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public interface ServerCallScope<Request, Response> {
5151
* It can be used by the interceptor to provide call-scoped information about
5252
* the current call, such as the identity of the caller or the current authentication state.
5353
*/
54-
public val grpcContext: GrpcContext
54+
public val context: GrpcContext
5555

5656
/**
5757
* Register a callback invoked when the call is closed (successfully or exceptionally).
@@ -64,6 +64,9 @@ public interface ServerCallScope<Request, Response> {
6464
*
6565
* This method does not return (declared as [Nothing]). After calling it, no further messages will be processed
6666
* or sent. Prefer setting [responseHeaders]/[responseTrailers] before closing if you need to include metadata.
67+
*
68+
* We made close throw a [StatusException] instead of returning, so control flow is explicit and race conditions
69+
* between interceptors and the service implementation are avoided.
6770
*/
6871
public fun close(status: Status, trailers: GrpcMetadata = GrpcMetadata()): Nothing
6972

@@ -83,8 +86,22 @@ public interface ServerCallScope<Request, Response> {
8386
/**
8487
* Convenience for flow builders: proceeds with [request] and emits the resulting response elements into this
8588
* [FlowCollector]. Useful inside `flow {}` blocks within interceptors.
89+
*
90+
* ```
91+
* val myAuthInterceptor = object : ServerInterceptor {
92+
* override fun <Request, Response> ServerCallScope<Request, Response>.intercept(request: Flow<Request>): Flow<Response> =
93+
* flow {
94+
* val authorized = mySuspendAuth(requestHeaders)
95+
* if (!authorized) {
96+
* close(Status(StatusCode.UNAUTHENTICATED, "Not authorized"))
97+
* }
98+
*
99+
* proceedUnmodified(request)
100+
* }
101+
* }
102+
* ```
86103
*/
87-
public suspend fun FlowCollector<Response>.proceedFlow(request: Flow<Request>) {
104+
public suspend fun FlowCollector<Response>.proceedUnmodified(request: Flow<Request>) {
88105
proceed(request).collect {
89106
emit(it)
90107
}
@@ -115,7 +132,7 @@ public interface ServerInterceptor {
115132
* - Read [ServerCallScope.requestHeaders] and populate [ServerCallScope.responseHeaders]/[ServerCallScope.responseTrailers].
116133
* - Register [ServerCallScope.onClose] callbacks.
117134
* - Transform the [request] flow or wrap the resulting response flow.
118-
* - Append information to the [ServerCallScope.grpcContext].
135+
* - Append information to the [ServerCallScope.context].
119136
*
120137
* IMPORTANT: You must eventually call [ServerCallScope.proceed] to actually invoke the service logic and produce
121138
* the response [Flow]. If [ServerCallScope.proceed] is omitted, the call will never reach the service.

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/suspendServerCalls.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ private fun <Request, Response> CoroutineScope.serverCallListenerImpl(
164164
implementation = implementation,
165165
requestHeaders = requestHeaders,
166166
serverCall = handler,
167-
grpcContext = context.grpcContext,
167+
context = context.grpcContext,
168168
)
169169

170170
val rpcJob = launch(context) {
@@ -275,7 +275,7 @@ private class ServerCallScopeImpl<Request, Response>(
275275
val implementation: (Flow<Request>) -> Flow<Response>,
276276
override val requestHeaders: GrpcMetadata,
277277
val serverCall: ServerCall<Request, Response>,
278-
override val grpcContext: GrpcContext,
278+
override val context: GrpcContext,
279279
) : ServerCallScope<Request, Response> {
280280

281281
override val responseHeaders: GrpcMetadata = GrpcMetadata()

grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import kotlinx.rpc.grpc.ManagedChannelBuilder
1515
import kotlinx.rpc.grpc.Status
1616
import kotlinx.rpc.grpc.StatusCode
1717
import kotlinx.rpc.grpc.buildChannel
18+
import kotlinx.rpc.grpc.createInsecureClientCredentials
1819
import kotlinx.rpc.grpc.internal.ClientCall
1920
import kotlinx.rpc.grpc.internal.GrpcDefaultCallOptions
2021
import kotlinx.rpc.grpc.internal.MethodDescriptor
@@ -52,9 +53,10 @@ class GrpcCoreClientTest {
5253
private fun ManagedChannel.newHelloCall(fullName: String = "kotlinx.rpc.grpc.test.GreeterService/SayHello"): ClientCall<HelloRequest, HelloReply> =
5354
platformApi.newCall(descriptorFor(fullName), GrpcDefaultCallOptions)
5455

55-
private fun createChannel(): ManagedChannel = ManagedChannelBuilder("localhost:$PORT")
56-
.usePlaintext()
57-
.buildChannel()
56+
private fun createChannel(): ManagedChannel = ManagedChannelBuilder(
57+
target = "localhost:$PORT",
58+
credentials = createInsecureClientCredentials()
59+
).buildChannel()
5860

5961

6062
private fun helloReq(timeout: UInt = 0u): HelloRequest = HelloRequest {

grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/ServerInterceptorTest.kt

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
package kotlinx.rpc.grpc.test.proto
66

77
import kotlinx.coroutines.flow.Flow
8+
import kotlinx.coroutines.flow.flow
89
import kotlinx.coroutines.flow.flowOf
910
import kotlinx.coroutines.flow.map
11+
import kotlinx.coroutines.flow.toList
1012
import kotlinx.rpc.RpcServer
1113
import kotlinx.rpc.grpc.GrpcClient
1214
import kotlinx.rpc.grpc.GrpcMetadata
@@ -184,13 +186,61 @@ class ServerInterceptorTest : GrpcProtoTest() {
184186
@Test
185187
fun `proceedFlow - should succeed on client`() {
186188
val interceptor = interceptor {
187-
kotlinx.coroutines.flow.flow {
188-
proceedFlow(it)
189+
flow {
190+
proceedUnmodified(it)
189191
}
190192
}
191193
runGrpcTest(serverInterceptors = interceptor, test = ::unaryCall)
192194
}
193195

196+
@Test
197+
fun `test exact order of interceptor execution`() {
198+
val order = mutableListOf<Int>()
199+
val interceptor1 = interceptor { request ->
200+
flow {
201+
order.add(1)
202+
var i1 = 0
203+
val ids = listOf(3, 7)
204+
val req = request.map { order.add(ids[i1++]); it }
205+
206+
var i2 = 0
207+
val respIds = listOf(6, 10)
208+
proceed(req).collect {
209+
order.add(respIds[i2++])
210+
emit(it)
211+
}
212+
213+
order.add(12)
214+
}
215+
}
216+
217+
val interceptor2 = interceptor { request ->
218+
flow {
219+
order.add(2)
220+
var i1 = 0
221+
val reqIds = listOf(4, 8)
222+
val req = request.map { order.add(reqIds[i1++]); it }
223+
224+
var i2 = 0
225+
val respIds = listOf(5, 9)
226+
proceed(req).collect {
227+
order.add(respIds[i2++])
228+
emit(it)
229+
}
230+
231+
order.add(11)
232+
}
233+
}
234+
val both = interceptor1 + interceptor2
235+
236+
runGrpcTest(serverInterceptors = both) { bidiStream(it, 2) }
237+
238+
assertEquals(
239+
listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12),
240+
order
241+
)
242+
}
243+
194244
@Test
195245
fun `method descriptor - full method name is exposed`() {
196246
var methodName: String? = null
@@ -207,6 +257,20 @@ class ServerInterceptorTest : GrpcProtoTest() {
207257
val response = service.UnaryEcho(EchoRequest { message = "Hello" })
208258
assertEquals("Hello", response.message)
209259
}
260+
261+
private suspend fun bidiStream(grpcClient: GrpcClient, count: Int = 5) {
262+
val service = grpcClient.withService<EchoService>()
263+
val responses = service.BidirectionalStreamingEcho(flow {
264+
repeat(count) {
265+
emit(EchoRequest { message = "Echo-$it" })
266+
}
267+
}).toList()
268+
assertEquals(count, responses.size)
269+
repeat(count) {
270+
assertEquals("Echo-$it", responses[it].message)
271+
}
272+
}
273+
210274
}
211275

212276

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,6 @@ public actual abstract class ManagedChannelPlatform : GrpcChannel()
1919
* Builder class for [ManagedChannel].
2020
*/
2121
public actual abstract class ManagedChannelBuilder<T : ManagedChannelBuilder<T>> {
22-
public actual open fun usePlaintext(): T {
23-
error("Builder does not support usePlaintext()")
24-
}
25-
2622
public actual abstract fun overrideAuthority(authority: String): T
2723
}
2824

@@ -33,11 +29,6 @@ internal class NativeManagedChannelBuilder(
3329

3430
private var authority: String? = null
3531

36-
override fun usePlaintext(): NativeManagedChannelBuilder {
37-
credentials = lazy { createInsecureClientCredentials() }
38-
return this
39-
}
40-
4132
override fun overrideAuthority(authority: String): NativeManagedChannelBuilder {
4233
this.authority = authority
4334
return this

0 commit comments

Comments
 (0)