Skip to content

evaluate cancelled but async jobs continue #109

@TelephoneTan

Description

@TelephoneTan

Currently, the evaluate can cooperate with coroutine cancel, it will return when the current coroutine is cancelled, this is good.

BUT the async jobs triggered by evaluate including their callbacks will last forever and continue to run after evaluate has been cancelled, this is bad. We can say that the async jobs are leaked.

The root cause is here:

private val coroutineScope = CoroutineScope(jobDispatcher + exceptionHandler)
// ...
internal actual fun invokeAsyncFunction(
        args: Array<Any?>,
        block: suspend (bindingArgs: Array<Any?>) -> Any?,
    ) {
        ensureNotClosed()
        val (resolveHandle, rejectHandle) = promiseHandlesFromArgs(args)
        val job = coroutineScope.launch {
            try {
                val result = block(args.sliceArray(2..<args.size))
                jsMutex.withLock {
                    // Call resolve() on JNI side
                    invokeJsFunction(
                        context = context,
                        globals = globals,
                        handle = resolveHandle,
                        args = arrayOf(result)
                    )
                }
            } catch (e: Throwable) {
                jsMutex.withLock {
                    // Call reject() on JNI side
                    invokeJsFunction(
                        context = context,
                        globals = globals,
                        handle = rejectHandle,
                        args = arrayOf(e)
                    )
                }
            }
            jsMutex.withLock {
                // The job is completed, see what we can do next:
                // - Execute subsequent Promises
                // - Cancel all jobs and fail, if rejected and JS didn't handle it
                do {
                    val executed = executePendingJob(context, globals)
                } while (executed)
            }
        }
        jobsMutex.withLockSync { asyncJobs += job }
        job.invokeOnCompletion {
            jobsMutex.withLockSync { asyncJobs -= job }
        }
    }

invokeAsyncFunction uses a independent CoroutineScope instance to launch async jobs, but this CoroutineScope instance doesn't has the coroutine Job of evaluate's current coroutine as its scope Job, so all async jobs escape out of the "structured concurrency" of the evaluate's current coroutine and won't get cancelled if evaluate is cancelled.

The solution is quite simple: inject the Job of evaluate that triggers async jobs to the coroutine scope of those async jobs, then every async job triggered by an evaluate will listen to this evaluate's Job and get cancelled automatically when this evaluate is cancelled:

internal actual fun invokeAsyncFunction(
        // ###################################################################
        // inject the Job either through parameters or through outside context
        evaluateJob: Job,
        // ###################################################################
        args: Array<Any?>,
        block: suspend (bindingArgs: Array<Any?>) -> Any?,
    ) {
        ensureNotClosed()
        val (resolveHandle, rejectHandle) = promiseHandlesFromArgs(args)
        // ###################################################################
        // use the evaluate Job as the parent Job of this async job
        val job = coroutineScope.launch(evaluateJob) {
        // ###################################################################
            // ............
        }
        jobsMutex.withLockSync { asyncJobs += job }
        job.invokeOnCompletion {
            jobsMutex.withLockSync { asyncJobs -= job }
        }
    }

Now our async jobs will get cancelled automatically, so we can wait for them in evaluate even if we are in cancelled state:

private suspend fun awaitAsyncJobs() {
        jsMutex.withLock {
            do {
                // Execute JS Promises, putting this in while(true) is unnecessary
                // since we have the same loop after every asyncFunction call
                val executed = executePendingJob(context, globals)
            } while (executed)
        }
        // ######################################################################
        // This is important.
        //
        // We have to wait for all async jobs to get completed (even if they are
        // cancelled) before we return, so the code here must continue to run
        // even if we are in cancelled state.
        withContext(NonCancellable){
        // ######################################################################
            while (true) {
                val jobs = jobsMutex.withLock { asyncJobs.filter { it.isActive } }
                if (jobs.isEmpty()) {
                    // No jobs to run
                    break
                }
                jobs.joinAll()
            }
        }
    }

Additionally, even when evaluate is cancelled, the sync JS code must continue to run to push the JS code state to the correct final state so we can make sure no pending jobs are leaked, and JS code is not in a strange middle state that could probably causes strange bugs. All the clean-up steps must run as usual too:

internal actual fun invokeAsyncFunction(
        // ###################################################################
        // inject the Job either through parameters or through outside context
        evaluateJob: Job,
        // ###################################################################
        args: Array<Any?>,
        block: suspend (bindingArgs: Array<Any?>) -> Any?,
    ) {
        ensureNotClosed()
        val (resolveHandle, rejectHandle) = promiseHandlesFromArgs(args)
        // ###################################################################
        // use the evaluate Job as the parent Job of this async job
        val job = coroutineScope.launch(evaluateJob) {
        // ###################################################################
            try {
                val result = block(args.sliceArray(2..<args.size))
                // ###################################################################
                // JS sync code
                withContext(NonCancellable){
                // ###################################################################
                    jsMutex.withLock {
                        // Call resolve() on JNI side
                        invokeJsFunction(
                            context = context,
                            globals = globals,
                            handle = resolveHandle,
                            args = arrayOf(result)
                        )
                    }
                }
            } catch (e: Throwable) {
                // ###################################################################
                // JS sync code
                withContext(NonCancellable) {
                // ###################################################################
                    jsMutex.withLock {
                        // Call reject() on JNI side
                        invokeJsFunction(
                            context = context,
                            globals = globals,
                            handle = rejectHandle,
                            args = arrayOf(e)
                        )
                    }
                }
            }
            // ###################################################################
            // JS sync code
            withContext(NonCancellable) {
            // ###################################################################
                jsMutex.withLock {
                    // The job is completed, see what we can do next:
                    // - Execute subsequent Promises
                    // - Cancel all jobs and fail, if rejected and JS didn't handle it
                    do {
                        val executed = executePendingJob(context, globals)
                    } while (executed)
                }
            }
        }
        jobsMutex.withLockSync { asyncJobs += job }
        job.invokeOnCompletion {
            jobsMutex.withLockSync { asyncJobs -= job }
        }
    }

private suspend fun awaitAsyncJobs() {
        // ###################################################################
        // JS sync code
        withContext(NonCancellable) {
        // ###################################################################
            jsMutex.withLock {
                do {
                    // Execute JS Promises, putting this in while(true) is unnecessary
                    // since we have the same loop after every asyncFunction call
                    val executed = executePendingJob(context, globals)
                } while (executed)
            }
        }
        // ######################################################################
        // This is important.
        //
        // We have to wait for all async jobs to get completed (even if they are
        // cancelled) before we return, so the code here must continue to run
        // even if we are in cancelled state.
        withContext(NonCancellable){
        // ######################################################################
            while (true) {
                val jobs = jobsMutex.withLock { asyncJobs.filter { it.isActive } }
                if (jobs.isEmpty()) {
                    // No jobs to run
                    break
                }
                jobs.joinAll()
            }
        }
    }

private suspend fun evalAndAwait(evalBlock: suspend () -> Any?): Any? {
        ensureNotClosed()
        evalException = null
        // ###################################################################
        // NonCancellable to avoid loadModules() stops at a middle state
        withContext(NonCancellable) {
        // ###################################################################
            loadModules()
        }
        val result = jsResultMutex.withLock {
            jsMutex.withLock {
                // ###################################################################
                // NonCancellable to avoid evalBlock() stops at a middle state
                withContext(NonCancellable) {
                // ###################################################################
                    evalBlock()
                }
            }
            // ###################################################################
            // JS sync code
            withContext(NonCancellable) {
            // ###################################################################
                awaitAsyncJobs()
            }
            // ###################################################################
            // Clean-up run as usual
            withContext(NonCancellable) {
            // ###################################################################
                jsMutex.withLock { getEvaluateResult(context, globals) }
            }
        }
        handleException()
        return result
    }

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions