Skip to content

Commit f583112

Browse files
committed
grpc-native: Fixing memory leaks
Signed-off-by: Johannes Zottele <[email protected]>
1 parent a76724c commit f583112

File tree

5 files changed

+81
-61
lines changed

5 files changed

+81
-61
lines changed

grpc/grpc-core/build.gradle.kts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,17 @@ kotlin {
100100
extraOpts("-libraryPath", "$cLibOutDir")
101101
}
102102
}
103+
104+
targets.withType<org.jetbrains.kotlin.gradle.plugin.mpp.KotlinNativeTarget>().configureEach {
105+
binaries {
106+
// Ensure test binaries are created for both debug and release
107+
test(
108+
buildTypes = listOf(
109+
org.jetbrains.kotlin.gradle.plugin.mpp.NativeBuildType.RELEASE
110+
)
111+
)
112+
}
113+
}
103114
}
104115

105116
configureLocalProtocGenDevelopmentDependency()

grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ package kotlinx.rpc.grpc.test
55

66
import kotlinx.coroutines.CompletableDeferred
77
import kotlinx.coroutines.delay
8+
import kotlinx.coroutines.launch
89
import kotlinx.coroutines.runBlocking
9-
import kotlinx.coroutines.test.runTest
1010
import kotlinx.coroutines.withTimeout
1111
import kotlinx.rpc.grpc.GrpcServer
1212
import kotlinx.rpc.grpc.GrpcTrailers
@@ -27,7 +27,6 @@ import kotlin.test.Test
2727
import kotlin.test.assertEquals
2828
import kotlin.test.assertFails
2929
import kotlin.test.assertFailsWith
30-
import kotlin.time.Duration
3130

3231
private const val PORT = 50051
3332

@@ -271,20 +270,31 @@ class GreeterServiceImpl : GreeterService {
271270
* Run this on JVM before executing tests.
272271
*/
273272
@Test
274-
fun runServer() = runTest(timeout = Duration.INFINITE) {
275-
val server = GrpcServer(
276-
port = PORT,
277-
builder = { registerService<GreeterService> { GreeterServiceImpl() } }
278-
)
273+
fun runServer() {
274+
runBlocking {
275+
val server = GrpcServer(
276+
port = PORT,
277+
builder = { registerService<GreeterService> { GreeterServiceImpl() } }
278+
)
279+
280+
launch {
281+
println("Terminating in 10 seconds")
282+
delay(10000)
283+
server.shutdown()
284+
server.awaitTermination()
285+
}
279286

280-
try {
281-
server.start()
282-
println("Server started")
283-
server.awaitTermination()
284-
} finally {
285-
server.shutdown()
286-
server.awaitTermination()
287+
launch {
288+
server.start()
289+
println("Server started")
290+
server.awaitTermination()
291+
}
287292
}
293+
294+
// runBlocking {
295+
// println("Waiting, so GC is collecting stuff")
296+
// delay(20000)
297+
// }
288298
}
289299

290300
}

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package kotlinx.rpc.grpc.internal
88

99
import cnames.structs.grpc_server
1010
import cnames.structs.grpc_server_credentials
11+
import kotlinx.atomicfu.atomic
1112
import kotlinx.cinterop.COpaquePointer
1213
import kotlinx.cinterop.CPointer
1314
import kotlinx.cinterop.CValue
@@ -55,15 +56,9 @@ internal class GrpcServerCredentials(
5556
}
5657
}
5758

58-
internal typealias MethodTag = COpaquePointer
59-
60-
internal data class RegisteredMethod(
61-
val methodDescriptor: ServerMethodDefinition<*, *>,
62-
val tag: MethodTag,
63-
)
64-
6559
internal class NativeServer(
6660
override val port: Int,
61+
// we must reference them, otherwise the credentials are getting garbage collected
6762
@Suppress("Redundant")
6863
private val credentials: GrpcServerCredentials,
6964
) : Server {
@@ -76,30 +71,20 @@ internal class NativeServer(
7671

7772
val raw: CPointer<grpc_server> = grpc_server_create(null, null)!!
7873

79-
@Suppress("unused")
80-
private val rawCleaner = createCleaner(raw) {
81-
grpc_server_destroy(it)
82-
}
83-
8474
// holds all stable references to MethodAllocationCtx objects.
8575
// the stable references must eventually be disposed.
8676
private val methodAllocationCtxs = mutableSetOf<StableRef<MethodAllocationCtx>>()
8777

88-
@Suppress("unused")
89-
private val methodAllocationCtxCleaner = createCleaner(methodAllocationCtxs) { refs ->
90-
refs.forEach { it.dispose() }
91-
}
92-
9378
init {
9479
grpc_server_register_completion_queue(raw, cq.raw, null)
9580
grpc_server_add_http2_port(raw, "localhost:$port", credentials.raw)
9681
addUnknownService()
9782
}
9883

9984
private var started = false
100-
private var isShutdownInternal = false
85+
private var isShutdownInternal = atomic(false)
10186
override val isShutdown: Boolean
102-
get() = isShutdownInternal
87+
get() = isShutdownInternal.value
10388

10489
private val isTerminatedInternal = CompletableDeferred<Unit>()
10590
override val isTerminated: Boolean
@@ -112,6 +97,14 @@ internal class NativeServer(
11297
return this
11398
}
11499

100+
private fun dispose() {
101+
// disposed with completion of shutdown
102+
grpc_server_destroy(raw)
103+
methodAllocationCtxs.forEach { it.dispose() }
104+
// release the grpc runtime, so grpc is shutdown if no other grpc servers are running.
105+
rt.close()
106+
}
107+
115108
fun addService(service: ServerServiceDefinition) {
116109
check(!started) { internalError("Server already started") }
117110

@@ -137,7 +130,7 @@ internal class NativeServer(
137130
)
138131
)
139132
methodAllocationCtxs.add(ctx)
140-
133+
141134
kgrpc_server_set_register_method_allocator(
142135
server = raw,
143136
cq = cq.raw,
@@ -167,14 +160,14 @@ internal class NativeServer(
167160

168161

169162
override fun shutdown(): Server {
170-
if (isShutdownInternal) {
163+
if (!isShutdownInternal.compareAndSet(expect = false, update = true)) {
164+
// shutdown only once
171165
return this
172166
}
173-
isShutdownInternal = true
174167

175168
grpc_server_shutdown_and_notify(raw, cq.raw, CallbackTag.anonymous {
176169
cq.shutdown().onComplete {
177-
methodAllocationCtxs.forEach { it.dispose() }
170+
dispose()
178171
isTerminatedInternal.complete(Unit)
179172
}
180173
})

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import libkgrpc.GRPC_OP_SEND_STATUS_FROM_SERVER
3131
import libkgrpc.grpc_byte_buffer
3232
import libkgrpc.grpc_byte_buffer_destroy
3333
import libkgrpc.grpc_call_cancel_with_status
34-
import libkgrpc.grpc_call_ref
3534
import libkgrpc.grpc_call_unref
3635
import libkgrpc.grpc_op
3736
import libkgrpc.grpc_slice
@@ -42,13 +41,12 @@ import kotlin.experimental.ExperimentalNativeApi
4241
import kotlin.native.ref.createCleaner
4342

4443
internal class NativeServerCall<Request, Response>(
44+
// ownership is transferred to the call
4545
val raw: CPointer<grpc_call>,
46-
val request: ServerCallbackRequest<Request, Response>,
46+
val cq: CompletionQueue,
4747
val methodDescriptor: MethodDescriptor<Request, Response>,
4848
) : ServerCall<Request, Response>() {
4949

50-
private val cq = request.cq
51-
5250
@Suppress("unused")
5351
private val rawCleaner = createCleaner(raw) {
5452
grpc_call_unref(it)
@@ -64,8 +62,6 @@ internal class NativeServerCall<Request, Response>(
6462
private val ready = atomic(true)
6563

6664
init {
67-
// take ownership of the raw pointer
68-
grpc_call_ref(raw)
6965
initialize()
7066
}
7167

@@ -85,11 +81,14 @@ internal class NativeServerCall<Request, Response>(
8581
val result = cq.runBatch(raw, op.ptr, 1u)
8682
if (result !is BatchResult.Submitted) {
8783
// we couldn't submit the initialization batch, so nothing can be done.
84+
arena.clear()
8885
finalize(true)
8986
} else {
9087
initialized = true
9188
result.future.onComplete {
92-
finalize(cancelled.value == 1)
89+
val cancelled = cancelled.value == 1
90+
arena.clear()
91+
finalize(cancelled)
9392
}
9493
}
9594
}
@@ -99,7 +98,6 @@ internal class NativeServerCall<Request, Response>(
9998
*/
10099
private fun finalize(cancelled: Boolean) {
101100
if (finalized.compareAndSet(expect = false, update = true)) {
102-
request.dispose()
103101
if (cancelled) {
104102
this.cancelled = true
105103
callbackMutex.withLock {
@@ -179,7 +177,9 @@ internal class NativeServerCall<Request, Response>(
179177
data.recv_message.recv_message = recvPtr.ptr
180178
}
181179

182-
runBatch(op.ptr, 1u, cleanup = { arena.clear() }) {
180+
runBatch(op.ptr, 1u, cleanup = {
181+
arena.clear()
182+
}) {
183183
// if the call was successful, but no message was received, we reached the end-of-stream.
184184
val buf = recvPtr.value
185185
if (buf == null) {
@@ -189,6 +189,10 @@ internal class NativeServerCall<Request, Response>(
189189
} else {
190190
val msg = methodDescriptor.getRequestMarshaller()
191191
.parse(buf.toKotlin().asInputStream())
192+
193+
// destroy the buffer, we don't need it anymore
194+
grpc_byte_buffer_destroy(buf)
195+
192196
callbackMutex.withLock {
193197
listener.onMessage(msg)
194198
}
@@ -272,7 +276,6 @@ private class DeferredCallListener<T> : ServerCall.Listener<T>() {
272276
private val q = ArrayDeque<(ServerCall.Listener<T>) -> Unit>()
273277

274278
fun setDelegate(d: ServerCall.Listener<T>) {
275-
println("setting delegate...")
276279
mutex.withLock {
277280
if (delegate != null) return
278281
delegate = d

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerCallbackRequest.kt

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -56,22 +56,25 @@ internal class ServerCallbackRequest<Request, Response>(
5656
}
5757

5858
override fun run(ok: Boolean) {
59-
if (!ok) {
60-
// the call has been shutdown.
61-
// free up the request.
59+
try {
60+
if (!ok) {
61+
// the call has been shutdown.\
62+
return
63+
}
64+
65+
// create a NativeServerCall to control the underlying core call.
66+
// ownership of the core call is transferred to the NativeServerCall.
67+
val call = NativeServerCall(rawCall.value!!, cq, method.getMethodDescriptor())
68+
// TODO: Turn metadata into a kotlin GrpcTrailers.
69+
val trailers = GrpcTrailers()
70+
// start the actual call.
71+
val listener = method.getServerCallHandler().startCall(call, trailers)
72+
call.setListener(listener)
73+
} finally {
74+
// at this point, all return values have been transformed into kotlin ones,
75+
// so we can safely clear all resources.
6276
dispose()
63-
return
6477
}
65-
66-
// TODO: The NativeServerCall must call dispose() ones the call is completed.
67-
// create a NativeServerCall to control the underlying core call.
68-
// ownership of the core call is transferred to the NativeServerCall.
69-
val call = NativeServerCall(rawCall.value!!, this, method.getMethodDescriptor())
70-
// TODO: Implement trailers.
71-
val trailers = GrpcTrailers()
72-
// start the actual call.
73-
val listener = method.getServerCallHandler().startCall(call, trailers)
74-
call.setListener(listener)
7578
}
7679
}
7780

0 commit comments

Comments
 (0)