Skip to content

Commit e9d54ab

Browse files
committed
grpc: Start getRequestMetadata with coroutineContext
1 parent 10c8f86 commit e9d54ab

File tree

11 files changed

+90
-42
lines changed

11 files changed

+90
-42
lines changed

grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44

55
package kotlinx.rpc.grpc.client.internal
66

7-
import kotlinx.coroutines.CoroutineScope
87
import kotlinx.rpc.grpc.client.GrpcCallOptions
98
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
109
import kotlinx.rpc.internal.utils.InternalRpcApi
10+
import kotlin.coroutines.CoroutineContext
1111

1212
@InternalRpcApi
1313
public expect abstract class GrpcChannel
@@ -16,4 +16,5 @@ public expect abstract class GrpcChannel
1616
public expect fun <RequestT, ResponseT> GrpcChannel.createCall(
1717
methodDescriptor: MethodDescriptor<RequestT, ResponseT>,
1818
callOptions: GrpcCallOptions,
19+
coroutineContext: CoroutineContext,
1920
): ClientCall<RequestT, ResponseT>

grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/suspendClientCalls.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kotlinx.rpc.grpc.client.internal
66

77
import kotlinx.coroutines.CancellationException
88
import kotlinx.coroutines.CoroutineName
9+
import kotlinx.coroutines.Job
910
import kotlinx.coroutines.NonCancellable
1011
import kotlinx.coroutines.cancel
1112
import kotlinx.coroutines.channels.Channel
@@ -22,8 +23,6 @@ import kotlinx.rpc.grpc.Status
2223
import kotlinx.rpc.grpc.StatusCode
2324
import kotlinx.rpc.grpc.StatusException
2425
import kotlinx.rpc.grpc.client.ClientCallScope
25-
import kotlinx.rpc.grpc.client.EmptyCallCredentials
26-
import kotlinx.rpc.grpc.client.GrpcCallCredentials
2726
import kotlinx.rpc.grpc.client.GrpcCallOptions
2827
import kotlinx.rpc.grpc.client.GrpcClient
2928
import kotlinx.rpc.grpc.client.plus
@@ -202,7 +201,10 @@ private class ClientCallScopeImpl<Request, Response>(
202201
coroutineScope {
203202
// attach all call credentials set in the client to the call option ones.
204203
callOptions.callCredentials += client.callCredentials
205-
val call = client.channel.platformApi.createCall(method, callOptions)
204+
// pass this scope's context, so the suspend functions of call credentials launch in the same context.
205+
// it will ensure that getRequestMetadata of the callCredential won't orphan if the call is cancelled
206+
// by the user or by grpc-java
207+
val call = client.channel.platformApi.createCall(method, callOptions, this.coroutineContext)
206208

207209
/*
208210
* We maintain a buffer of size 1, so onMessage never has to block: it only gets called after

grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/GrpcCallOptions.jvm.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,10 @@ package kotlinx.rpc.grpc.client
66

77
import io.grpc.CallOptions
88
import kotlinx.rpc.grpc.GrpcCompression
9-
import kotlinx.rpc.internal.utils.InternalRpcApi
109
import java.util.concurrent.TimeUnit
10+
import kotlin.coroutines.CoroutineContext
1111

12-
@InternalRpcApi
13-
public fun GrpcCallOptions.toJvm(): CallOptions {
12+
internal fun GrpcCallOptions.toJvm(coroutineContext: CoroutineContext): CallOptions {
1413
var default = CallOptions.DEFAULT
1514
if (timeout != null) {
1615
default = default.withDeadlineAfter(timeout!!.inWholeMilliseconds, TimeUnit.MILLISECONDS)
@@ -19,7 +18,7 @@ public fun GrpcCallOptions.toJvm(): CallOptions {
1918
default = default.withCompression(compression.name)
2019
}
2120
if (callCredentials !is EmptyCallCredentials) {
22-
default = default.withCallCredentials(callCredentials.toJvm())
21+
default = default.withCallCredentials(callCredentials.toJvm(coroutineContext))
2322
}
2423
return default
2524
}

grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/credentials.jvm.kt

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ import io.grpc.InsecureChannelCredentials
1010
import io.grpc.SecurityLevel
1111
import io.grpc.TlsChannelCredentials
1212
import kotlinx.coroutines.CoroutineScope
13-
import kotlinx.coroutines.asCoroutineDispatcher
1413
import kotlinx.coroutines.launch
1514
import kotlinx.rpc.grpc.Status
1615
import java.util.concurrent.Executor
16+
import kotlin.coroutines.CoroutineContext
1717
import kotlin.coroutines.cancellation.CancellationException
1818

1919
internal fun GrpcClientCredentials.toJvm(): ChannelCredentials {
@@ -46,32 +46,32 @@ private class JvmTlsCLientCredentialBuilder : GrpcTlsClientCredentialsBuilder {
4646
}
4747
}
4848

49-
internal fun GrpcCallCredentials.toJvm(): CallCredentials {
49+
internal fun GrpcCallCredentials.toJvm(coroutineContext: CoroutineContext): CallCredentials {
5050
return object : CallCredentials() {
5151
override fun applyRequestMetadata(
5252
requestInfo: RequestInfo,
5353
appExecutor: Executor,
5454
applier: MetadataApplier
5555
) {
56-
val dispatcher = appExecutor.asCoroutineDispatcher()
57-
CoroutineScope(dispatcher).launch {
58-
try {
59-
check(!requiresTransportSecurity || requestInfo.securityLevel != SecurityLevel.NONE) {
56+
CoroutineScope(coroutineContext).launch {
57+
if (requiresTransportSecurity && requestInfo.securityLevel == SecurityLevel.NONE) {
58+
applier.fail(Status.UNAUTHENTICATED.withDescription(
6059
"Established channel does not have a sufficient security level to transfer call credential."
61-
}
60+
))
61+
return@launch
62+
}
6263

64+
try {
6365
val context = GrpcCallCredentials.Context(requestInfo.authority, requestInfo.methodDescriptor.fullMethodName)
6466
val metadata = context.getRequestMetadata()
6567
applier.apply(metadata)
66-
} catch (e: Exception) {
68+
} catch (err: Throwable) {
6769
// we are not treating StatusExceptions separately, as currently there is no
6870
// clean way to support the same feature on native. So for the sake of similar behavior,
6971
// we always fail with Status.UNAVAILABLE. (KRPC-233)
70-
val description = "Getting metadata from call credentials failed with error: ${e.message}"
71-
applier.fail(Status.UNAVAILABLE.withDescription(description).withCause(e))
72-
if (e is CancellationException) {
73-
throw e
74-
}
72+
val description = "Getting metadata from call credentials failed with error: ${err.message}"
73+
applier.fail(Status.UNAVAILABLE.withDescription(description).withCause(err))
74+
if (err is CancellationException) throw err
7575
}
7676
}
7777
}

grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.jvm.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import kotlinx.rpc.grpc.client.GrpcCallOptions
99
import kotlinx.rpc.grpc.client.toJvm
1010
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
1111
import kotlinx.rpc.internal.utils.InternalRpcApi
12+
import kotlin.coroutines.CoroutineContext
1213

1314
@InternalRpcApi
1415
public actual typealias GrpcChannel = Channel
@@ -17,6 +18,7 @@ public actual typealias GrpcChannel = Channel
1718
public actual fun <RequestT, ResponseT> GrpcChannel.createCall(
1819
methodDescriptor: MethodDescriptor<RequestT, ResponseT>,
1920
callOptions: GrpcCallOptions,
21+
coroutineContext: CoroutineContext,
2022
): ClientCall<RequestT, ResponseT> {
21-
return this.newCall(methodDescriptor, callOptions.toJvm())
23+
return this.newCall(methodDescriptor, callOptions.toJvm(coroutineContext))
2224
}

grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/GrpcCallCredentials.native.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@ import kotlinx.rpc.grpc.internal.toRaw
2020
import kotlinx.rpc.grpc.statusCode
2121
import libkgrpc.*
2222
import platform.posix.size_tVar
23+
import kotlin.coroutines.CoroutineContext
24+
import kotlin.coroutines.coroutineContext
2325

2426
// Stable reference holder for Kotlin objects
2527
private class CredentialsPluginState(
2628
val kotlinCreds: GrpcCallCredentials,
27-
val parentJob: Job,
28-
val coroutineDispatcher: CoroutineDispatcher
29+
val coroutineContext: CoroutineContext,
2930
)
3031

3132
private fun getMetadataCallback(
@@ -62,7 +63,7 @@ private fun getMetadataCallback(
6263
}
6364
}
6465

65-
val scope = CoroutineScope(Job(pluginState.parentJob) + pluginState.coroutineDispatcher)
66+
val scope = CoroutineScope(pluginState.coroutineContext)
6667

6768
// Launch coroutine to call a suspend function asynchronously
6869
scope.launch {
@@ -120,11 +121,10 @@ private fun destroyCallback(state: COpaquePointer?) {
120121
}
121122

122123
internal fun GrpcCallCredentials.createRaw(
123-
parentJob: Job,
124-
coroutineDispatcher: CoroutineDispatcher
124+
coroutineContext: CoroutineContext,
125125
): CPointer<grpc_call_credentials>? = memScoped {
126126
// Create a stable reference to keep the Kotlin object alive
127-
val pluginState = CredentialsPluginState(this@createRaw, parentJob, coroutineDispatcher)
127+
val pluginState = CredentialsPluginState(this@createRaw, coroutineContext)
128128
val stableRef = StableRef.create(pluginState)
129129

130130
// Create plugin structure

grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.native.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,22 @@ import kotlinx.rpc.grpc.GrpcMetadata
99
import kotlinx.rpc.grpc.client.GrpcCallOptions
1010
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
1111
import kotlinx.rpc.internal.utils.InternalRpcApi
12+
import kotlin.coroutines.CoroutineContext
1213

1314
@InternalRpcApi
1415
public actual abstract class GrpcChannel {
1516
public abstract fun <RequestT, ResponseT> newCall(
1617
methodDescriptor: MethodDescriptor<RequestT, ResponseT>,
1718
callOptions: GrpcCallOptions,
19+
coroutineContext: CoroutineContext
1820
): ClientCall<RequestT, ResponseT>
1921
}
2022

2123
@InternalRpcApi
2224
public actual fun <RequestT, ResponseT> GrpcChannel.createCall(
2325
methodDescriptor: MethodDescriptor<RequestT, ResponseT>,
2426
callOptions: GrpcCallOptions,
27+
coroutineContext: CoroutineContext,
2528
): ClientCall<RequestT, ResponseT> {
26-
return this.newCall(methodDescriptor, callOptions)
29+
return this.newCall(methodDescriptor, callOptions, coroutineContext)
2730
}

grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeClientCall.kt

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,6 @@ import kotlinx.cinterop.value
2424
import kotlinx.coroutines.CancellationException
2525
import kotlinx.coroutines.CompletableJob
2626
import kotlinx.coroutines.CoroutineScope
27-
import kotlinx.coroutines.Dispatchers
28-
import kotlinx.coroutines.Job
29-
import kotlinx.coroutines.coroutineScope
30-
import kotlinx.coroutines.launch
3127
import kotlinx.rpc.grpc.GrpcMetadata
3228
import kotlinx.rpc.grpc.Status
3329
import kotlinx.rpc.grpc.StatusCode
@@ -43,14 +39,9 @@ import kotlinx.rpc.grpc.internal.toKotlin
4339
import kotlinx.rpc.protobuf.input.stream.asInputStream
4440
import kotlinx.rpc.protobuf.input.stream.asSource
4541
import kotlinx.rpc.grpc.GrpcCompression
46-
import kotlinx.rpc.grpc.StatusException
4742
import kotlinx.rpc.grpc.client.EmptyCallCredentials
48-
import kotlinx.rpc.grpc.client.GrpcCallCredentials
4943
import kotlinx.rpc.grpc.client.GrpcCallOptions
5044
import kotlinx.rpc.grpc.client.createRaw
51-
import kotlinx.rpc.grpc.internal.toRaw
52-
import kotlinx.rpc.grpc.merge
53-
import kotlinx.rpc.grpc.statusCode
5445
import libkgrpc.GRPC_OP_RECV_INITIAL_METADATA
5546
import libkgrpc.GRPC_OP_RECV_MESSAGE
5647
import libkgrpc.GRPC_OP_RECV_STATUS_ON_CLIENT
@@ -65,14 +56,14 @@ import libkgrpc.grpc_call_credentials_release
6556
import libkgrpc.grpc_call_error
6657
import libkgrpc.grpc_call_set_credentials
6758
import libkgrpc.grpc_call_unref
68-
import libkgrpc.grpc_channel_credentials_release
6959
import libkgrpc.grpc_metadata_array
7060
import libkgrpc.grpc_metadata_array_destroy
7161
import libkgrpc.grpc_metadata_array_init
7262
import libkgrpc.grpc_op
7363
import libkgrpc.grpc_slice
7464
import libkgrpc.grpc_slice_unref
7565
import libkgrpc.grpc_status_code
66+
import kotlin.coroutines.CoroutineContext
7667
import kotlin.experimental.ExperimentalNativeApi
7768
import kotlin.native.ref.createCleaner
7869

@@ -83,6 +74,7 @@ internal class NativeClientCall<Request, Response>(
8374
private val methodDescriptor: MethodDescriptor<Request, Response>,
8475
private val callOptions: GrpcCallOptions,
8576
private val callJob: CompletableJob,
77+
private val coroutineContext: CoroutineContext,
8678
) : ClientCall<Request, Response>() {
8779

8880
@Suppress("unused")
@@ -91,7 +83,7 @@ internal class NativeClientCall<Request, Response>(
9183
}
9284

9385
private val rawCallCredentials = callOptions.callCredentials.let {
94-
if (it is EmptyCallCredentials) null else it.createRaw(callJob, Dispatchers.Default)
86+
if (it is EmptyCallCredentials) null else it.createRaw(coroutineContext)
9587
}
9688

9789
@Suppress("unused")

grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeManagedChannel.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import libkgrpc.grpc_channel_create_call
4242
import libkgrpc.grpc_channel_credentials_release
4343
import libkgrpc.grpc_channel_destroy
4444
import libkgrpc.grpc_slice_unref
45+
import kotlin.coroutines.CoroutineContext
4546
import kotlin.coroutines.cancellation.CancellationException
4647
import kotlin.experimental.ExperimentalNativeApi
4748
import kotlin.native.ref.createCleaner
@@ -171,6 +172,7 @@ internal class NativeManagedChannel(
171172
override fun <RequestT, ResponseT> newCall(
172173
methodDescriptor: MethodDescriptor<RequestT, ResponseT>,
173174
callOptions: GrpcCallOptions,
175+
coroutineContext: CoroutineContext
174176
): ClientCall<RequestT, ResponseT> {
175177
check(!isShutdown) { internalError("Channel is shutdown") }
176178

@@ -198,6 +200,7 @@ internal class NativeManagedChannel(
198200
methodDescriptor =methodDescriptor,
199201
callOptions = callOptions,
200202
callJob = Job(callJobSupervisor),
203+
coroutineContext = coroutineContext,
201204
)
202205
}
203206

grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ package kotlinx.rpc.grpc.test
55

66
import kotlinx.coroutines.CompletableDeferred
77
import kotlinx.coroutines.CoroutineScope
8-
import kotlinx.coroutines.Job
8+
import kotlinx.coroutines.Dispatchers
99
import kotlinx.coroutines.delay
1010
import kotlinx.coroutines.runBlocking
1111
import kotlinx.coroutines.test.runTest
@@ -54,7 +54,7 @@ class GrpcCoreClientTest {
5454
)
5555

5656
private fun ManagedChannel.newHelloCall(fullName: String = "kotlinx.rpc.grpc.test.GreeterService/SayHello"): ClientCall<HelloRequest, HelloReply> =
57-
platformApi.createCall(descriptorFor(fullName), GrpcCallOptions())
57+
platformApi.createCall(descriptorFor(fullName), GrpcCallOptions(), Dispatchers.Default)
5858

5959
private fun createChannel(): ManagedChannel = ManagedChannelBuilder(
6060
target = "localhost:$PORT",

0 commit comments

Comments
 (0)