Skip to content

Commit 4557761

Browse files
authored
Fix realm scheduler leak (#1463)
1 parent 6b1fa2f commit 4557761

File tree

21 files changed

+260
-165
lines changed

21 files changed

+260
-165
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
### Fixed
1010
* Rare corruption causing 'Invalid streaming format cookie'-exception. Typically following compact, convert or copying to a new file. (Issue [#1440](https://github.com/realm/realm-kotlin/issues/1440))
1111
* Compiler error when using Kotlin 1.9.0 and backlinks. (Issue [#1469](https://github.com/realm/realm-kotlin/issues/1469))
12+
* Leaking `JVMScheduler` instances. In certain circumstances, it could lead to a JNI crash. (Issue [#1463](https://github.com/realm/realm-kotlin/pull/1463))
1213
* [Sync] Changing a subscriptions query type or query itself will now trigger the `WaitForSync.FIRST_TIME` behaviour, rather than only checking changes to the name. (Issues [#1466](https://github.com/realm/realm-kotlin/issues/1466))
1314

1415
### Compatibility

packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/NativePointer.kt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,14 @@ public interface NativePointer<T : CapiT> {
4343
*/
4444
public fun isReleased(): Boolean
4545
}
46+
47+
/**
48+
* Deletes the underlying pointer after executing the lambda block.
49+
*/
50+
fun <T : CapiT, R> NativePointer<T>.use(
51+
block: (NativePointer<T>) -> R,
52+
): R = try {
53+
block(this)
54+
} finally {
55+
release()
56+
}

packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ interface RealmQueryT : CapiT
7070
interface RealmCallbackTokenT : CapiT
7171
interface RealmNotificationTokenT : CapiT
7272
interface RealmChangesT : CapiT
73+
interface RealmSchedulerT : CapiT
7374

7475
// Public type aliases binding to internal verbose type safe type definitions. This should allow us
7576
// to easily change implementation details later on.
@@ -88,6 +89,7 @@ typealias RealmQueryPointer = NativePointer<RealmQueryT>
8889
typealias RealmCallbackTokenPointer = NativePointer<RealmCallbackTokenT>
8990
typealias RealmNotificationTokenPointer = NativePointer<RealmNotificationTokenT>
9091
typealias RealmChangesPointer = NativePointer<RealmChangesT>
92+
typealias RealmSchedulerPointer = NativePointer<RealmSchedulerT>
9193

9294
// Sync types
9395
// Pure marker interfaces corresponding to the C-API realm_x_t struct types
@@ -190,6 +192,8 @@ expect object RealmInterop {
190192
fun realm_config_set_in_memory(config: RealmConfigurationPointer, inMemory: Boolean)
191193
fun realm_schema_validate(schema: RealmSchemaPointer, mode: SchemaValidationMode): Boolean
192194

195+
fun realm_create_scheduler(): RealmSchedulerPointer
196+
fun realm_create_scheduler(dispatcher: CoroutineDispatcher): RealmSchedulerPointer
193197
/**
194198
* Open a realm on the current thread.
195199
*
@@ -211,7 +215,7 @@ expect object RealmInterop {
211215
*/
212216
// The dispatcher argument is only used on Native to build a core scheduler dispatching to the
213217
// dispatcher. The realm itself must also be opened on the same thread
214-
fun realm_open(config: RealmConfigurationPointer, dispatcher: CoroutineDispatcher? = null): Pair<LiveRealmPointer, Boolean>
218+
fun realm_open(config: RealmConfigurationPointer, scheduler: RealmSchedulerPointer): Pair<LiveRealmPointer, Boolean>
215219

216220
// Opening a Realm asynchronously. Only supported for synchronized realms.
217221
fun realm_open_synchronized(config: RealmConfigurationPointer): RealmAsyncOpenTaskPointer

packages/cinterop/src/jvm/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import io.realm.kotlin.internal.interop.sync.SyncSessionResyncMode
3333
import io.realm.kotlin.internal.interop.sync.SyncUserIdentity
3434
import kotlinx.coroutines.CoroutineDispatcher
3535
import kotlinx.coroutines.CoroutineScope
36-
import kotlinx.coroutines.CoroutineStart
3736
import kotlinx.coroutines.launch
3837
import org.mongodb.kbson.ObjectId
3938

@@ -180,22 +179,26 @@ actual object RealmInterop {
180179
realmc.realm_config_set_in_memory(config.cptr(), inMemory)
181180
}
182181

183-
actual fun realm_open(config: RealmConfigurationPointer, dispatcher: CoroutineDispatcher?): Pair<LiveRealmPointer, Boolean> {
182+
actual fun realm_create_scheduler(): RealmSchedulerPointer =
183+
LongPointerWrapper(realmc.realm_scheduler_make_default())
184+
185+
actual fun realm_create_scheduler(dispatcher: CoroutineDispatcher): RealmSchedulerPointer =
186+
LongPointerWrapper(realmc.realm_create_scheduler(JVMScheduler(dispatcher)))
187+
188+
actual fun realm_open(
189+
config: RealmConfigurationPointer,
190+
scheduler: RealmSchedulerPointer,
191+
): Pair<LiveRealmPointer, Boolean> {
184192
// Configure callback to track if the file was created as part of opening
185193
var fileCreated = false
186194
val callback = DataInitializationCallback {
187195
fileCreated = true
188196
}
189197
realm_config_set_data_initialization_function(config, callback)
190198

191-
// create a custom Scheduler for JVM if a Coroutine Dispatcher is provided other wise
192-
// pass null to use the generic one
193-
val realmPtr = LongPointerWrapper<LiveRealmT>(
194-
realmc.open_realm_with_scheduler(
195-
(config as LongPointerWrapper).ptr,
196-
if (dispatcher != null) JVMScheduler(dispatcher) else null
197-
)
198-
)
199+
realmc.realm_config_set_scheduler(config.cptr(), scheduler.cptr())
200+
val realmPtr = LongPointerWrapper<LiveRealmT>(realmc.realm_open(config.cptr()))
201+
199202
// Ensure that we can read version information, etc.
200203
realm_begin_read(realmPtr)
201204
return Pair(realmPtr, fileCreated)
@@ -2093,13 +2096,8 @@ private class JVMScheduler(dispatcher: CoroutineDispatcher) {
20932096
val scope: CoroutineScope = CoroutineScope(dispatcher)
20942097

20952098
fun notifyCore(schedulerPointer: Long) {
2096-
val function: suspend CoroutineScope.() -> Unit = {
2099+
scope.launch {
20972100
realmc.invoke_core_notify_callback(schedulerPointer)
20982101
}
2099-
scope.launch(
2100-
scope.coroutineContext,
2101-
CoroutineStart.DEFAULT,
2102-
function
2103-
)
21042102
}
21052103
}

packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt

Lines changed: 65 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package io.realm.kotlin.internal.interop
2020

2121
import io.realm.kotlin.internal.interop.Constants.ENCRYPTION_KEY_LENGTH
22-
import io.realm.kotlin.internal.interop.RealmInterop.safeKString
2322
import io.realm.kotlin.internal.interop.sync.ApiKeyWrapper
2423
import io.realm.kotlin.internal.interop.sync.AppError
2524
import io.realm.kotlin.internal.interop.sync.AuthProvider
@@ -80,7 +79,6 @@ import kotlinx.cinterop.usePinned
8079
import kotlinx.cinterop.value
8180
import kotlinx.coroutines.CoroutineDispatcher
8281
import kotlinx.coroutines.CoroutineScope
83-
import kotlinx.coroutines.CoroutineStart
8482
import kotlinx.coroutines.launch
8583
import org.mongodb.kbson.BsonObjectId
8684
import org.mongodb.kbson.ObjectId
@@ -494,11 +492,10 @@ actual object RealmInterop {
494492
)
495493
}
496494

497-
actual fun realm_open(config: RealmConfigurationPointer, dispatcher: CoroutineDispatcher?): Pair<LiveRealmPointer, Boolean> {
495+
actual fun realm_open(config: RealmConfigurationPointer, scheduler: RealmSchedulerPointer): Pair<LiveRealmPointer, Boolean> {
498496
val fileCreated = atomic(false)
499497
val callback = DataInitializationCallback {
500498
fileCreated.value = true
501-
true
502499
}
503500
realm_wrapper.realm_config_set_data_initialization_function(
504501
config.cptr(),
@@ -516,21 +513,75 @@ actual object RealmInterop {
516513
// val dispatcher = runBlocking { coroutineContext[CoroutineDispatcher.Key] }
517514
// but requires opting in for @ExperimentalStdlibApi, and have really gotten it to play
518515
// for default cases.
519-
if (dispatcher != null) {
520-
val scheduler = checkedPointerResult(createSingleThreadDispatcherScheduler(dispatcher))
521-
realm_wrapper.realm_config_set_scheduler(config.cptr(), scheduler)
522-
} else {
523-
// If there is no notification dispatcher use the default scheduler.
524-
// Re-verify if this is actually needed when notification scheduler is fully in place.
525-
val scheduler = checkedPointerResult(realm_wrapper.realm_scheduler_make_default())
526-
realm_wrapper.realm_config_set_scheduler(config.cptr(), scheduler)
527-
}
516+
realm_wrapper.realm_config_set_scheduler(config.cptr(), scheduler.cptr())
517+
528518
val realmPtr = CPointerWrapper<LiveRealmT>(realm_wrapper.realm_open(config.cptr()))
529519
// Ensure that we can read version information, etc.
530520
realm_begin_read(realmPtr)
531521
return Pair(realmPtr, fileCreated.value)
532522
}
533523

524+
actual fun realm_create_scheduler(): RealmSchedulerPointer {
525+
// If there is no notification dispatcher use the default scheduler.
526+
// Re-verify if this is actually needed when notification scheduler is fully in place.
527+
val scheduler = checkedPointerResult(realm_wrapper.realm_scheduler_make_default())
528+
return CPointerWrapper<RealmSchedulerT>(scheduler)
529+
}
530+
531+
actual fun realm_create_scheduler(dispatcher: CoroutineDispatcher): RealmSchedulerPointer {
532+
printlntid("createSingleThreadDispatcherScheduler")
533+
val scheduler = SingleThreadDispatcherScheduler(tid(), dispatcher)
534+
535+
val capi_scheduler: CPointer<realm_scheduler_t> = checkedPointerResult(
536+
realm_wrapper.realm_scheduler_new(
537+
// userdata: kotlinx.cinterop.CValuesRef<*>?,
538+
scheduler.ref,
539+
540+
// free: realm_wrapper.realm_free_userdata_func_t? /* = kotlinx.cinterop.CPointer<kotlinx.cinterop.CFunction<(kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */) -> kotlin.Unit>>? */,
541+
staticCFunction<COpaquePointer?, Unit> { userdata ->
542+
printlntid("free")
543+
userdata?.asStableRef<SingleThreadDispatcherScheduler>()?.dispose()
544+
},
545+
546+
// notify: realm_wrapper.realm_scheduler_notify_func_t? /* = kotlinx.cinterop.CPointer<kotlinx.cinterop.CFunction<(kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */) -> kotlin.Unit>>? */,
547+
staticCFunction<COpaquePointer?, Unit> { userdata ->
548+
// Must be thread safe
549+
val scheduler =
550+
userdata!!.asStableRef<SingleThreadDispatcherScheduler>().get()
551+
printlntid("$scheduler notify")
552+
try {
553+
scheduler.notify()
554+
} catch (e: Exception) {
555+
// Should never happen, but is included for development to get some indicators
556+
// on errors instead of silent crashes.
557+
e.printStackTrace()
558+
}
559+
},
560+
561+
// is_on_thread: realm_wrapper.realm_scheduler_is_on_thread_func_t? /* = kotlinx.cinterop.CPointer<kotlinx.cinterop.CFunction<(kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */) -> kotlin.Boolean>>? */,
562+
staticCFunction<COpaquePointer?, Boolean> { userdata ->
563+
// Must be thread safe
564+
val scheduler =
565+
userdata!!.asStableRef<SingleThreadDispatcherScheduler>().get()
566+
printlntid("is_on_thread[$scheduler] ${scheduler.threadId} " + tid())
567+
scheduler.threadId == tid()
568+
},
569+
570+
// is_same_as: realm_wrapper.realm_scheduler_is_same_as_func_t? /* = kotlinx.cinterop.CPointer<kotlinx.cinterop.CFunction<(kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */, kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */) -> kotlin.Boolean>>? */,
571+
staticCFunction<COpaquePointer?, COpaquePointer?, Boolean> { userdata, other ->
572+
userdata == other
573+
},
574+
575+
// can_deliver_notifications: realm_wrapper.realm_scheduler_can_deliver_notifications_func_t? /* = kotlinx.cinterop.CPointer<kotlinx.cinterop.CFunction<(kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */) -> kotlin.Boolean>>? */,
576+
staticCFunction<COpaquePointer?, Boolean> { _ -> true },
577+
)
578+
) ?: error("Couldn't create scheduler")
579+
580+
scheduler.set_scheduler(capi_scheduler)
581+
582+
return CPointerWrapper<RealmSchedulerT>(capi_scheduler)
583+
}
584+
534585
actual fun realm_open_synchronized(config: RealmConfigurationPointer): RealmAsyncOpenTaskPointer {
535586
return CPointerWrapper(realm_wrapper.realm_open_synchronized(config.cptr()))
536587
}
@@ -3241,61 +3292,6 @@ actual object RealmInterop {
32413292
?: throw NullPointerException(identifier?.let { "'$identifier' shouldn't be null." })
32423293
}
32433294

3244-
private fun createSingleThreadDispatcherScheduler(
3245-
dispatcher: CoroutineDispatcher
3246-
): CPointer<realm_scheduler_t> {
3247-
printlntid("createSingleThreadDispatcherScheduler")
3248-
val scheduler = SingleThreadDispatcherScheduler(tid(), dispatcher)
3249-
3250-
val capi_scheduler: CPointer<realm_scheduler_t> = checkedPointerResult(
3251-
realm_wrapper.realm_scheduler_new(
3252-
// userdata: kotlinx.cinterop.CValuesRef<*>?,
3253-
scheduler.ref,
3254-
3255-
// free: realm_wrapper.realm_free_userdata_func_t? /* = kotlinx.cinterop.CPointer<kotlinx.cinterop.CFunction<(kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */) -> kotlin.Unit>>? */,
3256-
staticCFunction<COpaquePointer?, Unit> { userdata ->
3257-
printlntid("free")
3258-
userdata?.asStableRef<SingleThreadDispatcherScheduler>()?.dispose()
3259-
},
3260-
3261-
// notify: realm_wrapper.realm_scheduler_notify_func_t? /* = kotlinx.cinterop.CPointer<kotlinx.cinterop.CFunction<(kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */) -> kotlin.Unit>>? */,
3262-
staticCFunction<COpaquePointer?, Unit> { userdata ->
3263-
// Must be thread safe
3264-
val scheduler =
3265-
userdata!!.asStableRef<SingleThreadDispatcherScheduler>().get()
3266-
printlntid("$scheduler notify")
3267-
try {
3268-
scheduler.notify()
3269-
} catch (e: Exception) {
3270-
// Should never happen, but is included for development to get some indicators
3271-
// on errors instead of silent crashes.
3272-
e.printStackTrace()
3273-
}
3274-
},
3275-
3276-
// is_on_thread: realm_wrapper.realm_scheduler_is_on_thread_func_t? /* = kotlinx.cinterop.CPointer<kotlinx.cinterop.CFunction<(kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */) -> kotlin.Boolean>>? */,
3277-
staticCFunction<COpaquePointer?, Boolean> { userdata ->
3278-
// Must be thread safe
3279-
val scheduler =
3280-
userdata!!.asStableRef<SingleThreadDispatcherScheduler>().get()
3281-
printlntid("is_on_thread[$scheduler] ${scheduler.threadId} " + tid())
3282-
scheduler.threadId == tid()
3283-
},
3284-
3285-
// is_same_as: realm_wrapper.realm_scheduler_is_same_as_func_t? /* = kotlinx.cinterop.CPointer<kotlinx.cinterop.CFunction<(kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */, kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */) -> kotlin.Boolean>>? */,
3286-
staticCFunction<COpaquePointer?, COpaquePointer?, Boolean> { userdata, other ->
3287-
userdata == other
3288-
},
3289-
3290-
// can_deliver_notifications: realm_wrapper.realm_scheduler_can_deliver_notifications_func_t? /* = kotlinx.cinterop.CPointer<kotlinx.cinterop.CFunction<(kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */) -> kotlin.Boolean>>? */,
3291-
staticCFunction<COpaquePointer?, Boolean> { userdata -> true },
3292-
)
3293-
) ?: error("Couldn't create scheduler")
3294-
scheduler.set_scheduler(capi_scheduler)
3295-
scheduler
3296-
return capi_scheduler
3297-
}
3298-
32993295
private fun <R> handleAppCallback(
33003296
userData: COpaquePointer?,
33013297
error: CPointer<realm_app_error_t>?,
@@ -3401,7 +3397,7 @@ actual object RealmInterop {
34013397
}
34023398

34033399
override fun notify() {
3404-
val function: suspend CoroutineScope.() -> Unit = {
3400+
scope.launch {
34053401
try {
34063402
printlntid("on dispatcher")
34073403
realm_wrapper.realm_scheduler_perform_work(scheduler)
@@ -3411,11 +3407,6 @@ actual object RealmInterop {
34113407
e.printStackTrace()
34123408
}
34133409
}
3414-
scope.launch(
3415-
scope.coroutineContext,
3416-
CoroutineStart.DEFAULT,
3417-
function
3418-
)
34193410
}
34203411
}
34213412
}

packages/cinterop/src/nativeDarwinTest/kotlin/io/realm/kotlin/test/CinteropTest.kt

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import io.realm.kotlin.internal.interop.SchemaMode
2828
import io.realm.kotlin.internal.interop.SchemaValidationMode
2929
import io.realm.kotlin.internal.interop.set
3030
import io.realm.kotlin.internal.interop.toKotlinString
31+
import io.realm.kotlin.internal.interop.use
3132
import kotlinx.cinterop.BooleanVar
3233
import kotlinx.cinterop.CPointer
3334
import kotlinx.cinterop.CPointerVarOf
@@ -178,9 +179,12 @@ class CinteropTest {
178179
SchemaMode.RLM_SCHEMA_MODE_AUTOMATIC
179180
)
180181
RealmInterop.realm_config_set_schema_version(nativeConfig, 1)
181-
182-
val (realm, fileCreated) = RealmInterop.realm_open(nativeConfig)
183-
assertEquals(1L, RealmInterop.realm_get_num_classes(realm))
182+
RealmInterop.realm_create_scheduler()
183+
.use { scheduler ->
184+
val (realm, fileCreated) = RealmInterop.realm_open(nativeConfig, scheduler)
185+
assertEquals(1L, RealmInterop.realm_get_num_classes(realm))
186+
RealmInterop.realm_close(realm)
187+
}
184188
}
185189
}
186190

packages/jni-swig-stub/realm.i

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ return $jnicall;
301301
realm_flx_sync_mutable_subscription_set_t*, realm_flx_sync_subscription_desc_t*,
302302
realm_set_t*, realm_async_open_task_t*, realm_dictionary_t*,
303303
realm_sync_session_connection_state_notification_token_t*,
304-
realm_dictionary_changes_t* };
304+
realm_dictionary_changes_t*, realm_scheduler_t* };
305305

306306
// For all functions returning a pointer or bool, check for null/false and throw an error if
307307
// realm_get_last_error returns true.

packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -313,8 +313,8 @@ void invoke_core_notify_callback(int64_t scheduler) {
313313
realm_scheduler_perform_work(reinterpret_cast<realm_scheduler_t *>(scheduler));
314314
}
315315

316-
realm_t *open_realm_with_scheduler(int64_t config_ptr, jobject dispatchScheduler) {
317-
auto config = reinterpret_cast<realm_config_t *>(config_ptr);
316+
realm_scheduler_t*
317+
realm_create_scheduler(jobject dispatchScheduler) {
318318
if (dispatchScheduler) {
319319
auto jvmScheduler = new CustomJVMScheduler(dispatchScheduler);
320320
auto scheduler = realm_scheduler_new(
@@ -326,13 +326,9 @@ realm_t *open_realm_with_scheduler(int64_t config_ptr, jobject dispatchScheduler
326326
[](void *userdata) { return static_cast<CustomJVMScheduler *>(userdata)->can_invoke(); }
327327
);
328328
jvmScheduler->set_scheduler(scheduler);
329-
realm_config_set_scheduler(config, scheduler);
330-
} else {
331-
// TODO refactor to use public C-API https://github.com/realm/realm-kotlin/issues/496
332-
auto scheduler = new realm_scheduler_t{realm::util::Scheduler::make_generic()};
333-
realm_config_set_scheduler(config, scheduler);
329+
return scheduler;
334330
}
335-
return realm_open(config);
331+
throw std::runtime_error("Null dispatchScheduler");
336332
}
337333

338334
jobject convert_to_jvm_app_error(JNIEnv* env, const realm_app_error_t* error) {

packages/jni-swig-stub/src/main/jni/realm_api_helpers.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ realm_network_transport_new(jobject network_transport);
4747
void
4848
set_log_callback(jint log_level, jobject log_callback);
4949

50-
realm_t*
51-
open_realm_with_scheduler(int64_t config_ptr, jobject dispatchScheduler);
50+
realm_scheduler_t*
51+
realm_create_scheduler(jobject dispatchScheduler);
5252

5353
bool
5454
realm_should_compact_callback(void* userdata, uint64_t total_bytes, uint64_t used_bytes);

0 commit comments

Comments
 (0)