Skip to content

Commit 8506d8a

Browse files
committed
grpc: Refactor cancel API in ClientCallScope to return Nothing
Signed-off-by: Johannes Zottele <[email protected]>
1 parent 3d67b7f commit 8506d8a

File tree

5 files changed

+126
-5
lines changed

5 files changed

+126
-5
lines changed

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ClientInterceptor.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public interface ClientCallScope<Request, Response> {
1414
public val callOptions: GrpcCallOptions
1515
public fun onHeaders(block: (GrpcMetadata) -> Unit)
1616
public fun onClose(block: (Status, GrpcMetadata) -> Unit)
17-
public fun cancel(message: String, cause: Throwable? = null)
17+
public fun cancel(message: String, cause: Throwable? = null): Nothing
1818
public fun proceed(request: Flow<Request>): Flow<Response>
1919
}
2020

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/suspendClientCalls.kt

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,8 @@ private class ClientCallScopeImpl<Request, Response>(
224224
onCloseFuture.onComplete { block(it.first, it.second) }
225225
}
226226

227-
override fun cancel(message: String, cause: Throwable?) {
228-
call.cancel(message, cause)
227+
override fun cancel(message: String, cause: Throwable?): Nothing {
228+
throw StatusException(Status(StatusCode.CANCELLED, message, cause))
229229
}
230230

231231
override fun proceed(request: Flow<Request>): Flow<Response> {
@@ -302,7 +302,15 @@ private class ClientCallScopeImpl<Request, Response>(
302302
responses: Channel<Response>,
303303
ready: Ready,
304304
) = clientCallListener(
305-
onHeaders = { onHeadersFuture.complete(it) },
305+
onHeaders = {
306+
try {
307+
onHeadersFuture.complete(it)
308+
} catch (e: StatusException) {
309+
// if a client interceptor called cancel, we throw a StatusException.
310+
// as the JVM implementation treats them differently, we need to catch them here.
311+
call.cancel(e.message, e.cause)
312+
}
313+
},
306314
onMessage = { message: Response ->
307315
responses.trySend(message).onFailure { e ->
308316
throw e ?: AssertionError("onMessage should never be called until responses is ready")

grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/ClientInterceptorTest.kt

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ class ClientInterceptorTest : GrpcProtoTest() {
8787
val error = assertFailsWith<StatusException> {
8888
val interceptor = interceptor {
8989
cancel("Canceling in interceptor", IllegalStateException("Cancellation cause"))
90-
proceed(it)
9190
}
9291
runGrpcTest(clientInterceptors = interceptor, test = ::unaryCall)
9392
}
@@ -98,6 +97,85 @@ class ClientInterceptorTest : GrpcProtoTest() {
9897
assertEquals("Cancellation cause", error.cause?.message)
9998
}
10099

100+
@Test
101+
fun `cancel in request flow - should fail with cancellation`() {
102+
val error = assertFailsWith<StatusException> {
103+
val interceptor = interceptor {
104+
proceed(it.map {
105+
val msg = it as EchoRequest
106+
if (msg.message == "Echo-3") {
107+
cancel("Canceling in request flow", IllegalStateException("Cancellation cause"))
108+
}
109+
it
110+
})
111+
}
112+
runGrpcTest(clientInterceptors = interceptor, test = ::bidiStream)
113+
}
114+
115+
assertEquals(StatusCode.CANCELLED, error.getStatus().statusCode)
116+
assertContains(error.message!!, "Canceling in request flow")
117+
assertIs<IllegalStateException>(error.cause)
118+
assertEquals("Cancellation cause", error.cause?.message)
119+
}
120+
121+
@Test
122+
fun `cancel in response flow - should fail with cancellation`() {
123+
val error = assertFailsWith<StatusException> {
124+
val interceptor = interceptor {
125+
flow {
126+
proceed(it).collect { resp ->
127+
val msg = resp as EchoResponse
128+
if (msg.message == "Echo-3") {
129+
cancel("Canceling in response flow", IllegalStateException("Cancellation cause"))
130+
}
131+
emit(resp)
132+
}
133+
}
134+
}
135+
runGrpcTest(clientInterceptors = interceptor, test = ::bidiStream)
136+
}
137+
138+
assertEquals(StatusCode.CANCELLED, error.getStatus().statusCode)
139+
assertContains(error.message!!, "Canceling in response flow")
140+
assertIs<IllegalStateException>(error.cause)
141+
assertEquals("Cancellation cause", error.cause?.message)
142+
}
143+
144+
@Test
145+
fun `cancel onHeaders - should fail with cancellation`() {
146+
val error = assertFailsWith<StatusException> {
147+
val interceptor = interceptor {
148+
this.onHeaders {
149+
cancel("Canceling in headers", IllegalStateException("Cancellation cause"))
150+
}
151+
proceed(it)
152+
}
153+
runGrpcTest(clientInterceptors = interceptor, test = ::bidiStream)
154+
}
155+
156+
assertEquals(StatusCode.CANCELLED, error.getStatus().statusCode)
157+
assertContains(error.message!!, "Canceling in headers")
158+
assertIs<IllegalStateException>(error.cause)
159+
assertEquals("Cancellation cause", error.cause?.message)
160+
}
161+
162+
@Test
163+
fun `cancel onClose - should fail with cancellation`() {
164+
val error = assertFailsWith<StatusException> {
165+
val interceptor = interceptor {
166+
this.onClose { _, _ ->
167+
cancel("Canceling in onClose", IllegalStateException("Cancellation cause"))
168+
}
169+
proceed(it)
170+
}
171+
runGrpcTest(clientInterceptors = interceptor, test = ::bidiStream)
172+
}
173+
assertEquals(StatusCode.CANCELLED, error.getStatus().statusCode)
174+
assertContains(error.message!!, "Canceling in onClose")
175+
assertIs<IllegalStateException>(error.cause)
176+
assertEquals("Cancellation cause", error.cause?.message)
177+
}
178+
101179
@Test
102180
fun `modify request message - should return modified message`() {
103181
val interceptor = interceptor {
@@ -156,6 +234,19 @@ class ClientInterceptorTest : GrpcProtoTest() {
156234
assertEquals("Hello", response.message)
157235
}
158236

237+
private suspend fun bidiStream(grpcClient: GrpcClient) {
238+
val service = grpcClient.withService<EchoService>()
239+
val responses = service.BidirectionalStreamingEcho(flow {
240+
repeat(5) {
241+
emit(EchoRequest { message = "Echo-$it" })
242+
}
243+
}).toList()
244+
assertEquals(5, responses.size)
245+
repeat(5) {
246+
assertEquals("Echo-$it", responses[it].message)
247+
}
248+
}
249+
159250
}
160251

161252
private fun interceptor(

grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/ServerInterceptorTest.kt

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,27 @@ class ServerInterceptorTest : GrpcProtoTest() {
148148
}
149149
}
150150

151+
@Test
152+
fun `proceedFlow - should succeed on client`() {
153+
val interceptor = interceptor {
154+
kotlinx.coroutines.flow.flow {
155+
proceedFlow(it)
156+
}
157+
}
158+
runGrpcTest(serverInterceptors = interceptor, test = ::unaryCall)
159+
}
160+
161+
@Test
162+
fun `method descriptor - full method name is exposed`() {
163+
var methodName: String? = null
164+
val interceptor = interceptor {
165+
methodName = method.getFullMethodName()
166+
proceed(it)
167+
}
168+
runGrpcTest(serverInterceptors = interceptor, test = ::unaryCall)
169+
assertContains(methodName!!, "EchoService/UnaryEcho")
170+
}
171+
151172
private suspend fun unaryCall(grpcClient: GrpcClient) {
152173
val service = grpcClient.withService<EchoService>()
153174
val response = service.UnaryEcho(EchoRequest { message = "Hello" })

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ internal class NativeServerCall<Request, Response>(
101101
}
102102

103103
private fun initialize() {
104+
println("Initializing native server call")
104105
// finishes if the whole connection is closed.
105106
// this triggers onClose()/onCanceled() callback.
106107
val arena = Arena()

0 commit comments

Comments
 (0)