|
16 | 16 |
|
17 | 17 | package kotlinx.coroutines.experimental
|
18 | 18 |
|
| 19 | +import kotlinx.coroutines.experimental.intrinsics.startUndispatchedCoroutine |
19 | 20 | import java.util.concurrent.ScheduledExecutorService
|
20 | 21 | import java.util.concurrent.ScheduledThreadPoolExecutor
|
21 | 22 | import java.util.concurrent.TimeUnit
|
| 23 | +import kotlin.coroutines.experimental.Continuation |
22 | 24 | import kotlin.coroutines.experimental.ContinuationInterceptor
|
23 |
| -import kotlin.coroutines.experimental.startCoroutine |
| 25 | +import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn |
24 | 26 |
|
25 | 27 | private val KEEP_ALIVE = java.lang.Long.getLong("kotlinx.coroutines.ScheduledExecutor.keepAlive", 1000L)
|
26 | 28 |
|
@@ -70,25 +72,25 @@ internal fun scheduledExecutorShutdownNowAndRelease() {
|
70 | 72 | public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> T): T {
|
71 | 73 | require(time >= 0) { "Timeout time $time cannot be negative" }
|
72 | 74 | if (time <= 0L) throw CancellationException("Timed out immediately")
|
73 |
| - return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<T> -> |
| 75 | + return suspendCoroutineOrReturn sc@ { delegate: Continuation<T> -> |
74 | 76 | // schedule cancellation of this continuation on time
|
75 |
| - val runnable = CancelTimedOutContinuationRunnable(time, unit, cont) |
| 77 | + val cont = TimeoutContinuation(time, unit, delegate) |
76 | 78 | val delay = cont.context[ContinuationInterceptor] as? Delay
|
77 | 79 | if (delay != null)
|
78 |
| - cont.disposeOnCompletion(delay.invokeOnTimeout(time, unit, runnable)) else |
79 |
| - cont.cancelFutureOnCompletion(scheduledExecutor.schedule(runnable, time, unit)) |
80 |
| - // restart block in a separate coroutine using cancellable context of this continuation, |
81 |
| - block.startCoroutine(cont) |
| 80 | + cont.disposeOnCompletion(delay.invokeOnTimeout(time, unit, cont)) else |
| 81 | + cont.cancelFutureOnCompletion(scheduledExecutor.schedule(cont, time, unit)) |
| 82 | + // restart block using cancellable context of this continuation, |
| 83 | + // however start it as undispatched coroutine, because we are already in the proper context |
| 84 | + block.startUndispatchedCoroutine(cont) |
| 85 | + cont.getResult() |
82 | 86 | }
|
83 | 87 | }
|
84 | 88 |
|
85 |
| -private class CancelTimedOutContinuationRunnable( |
| 89 | +private class TimeoutContinuation<T>( |
86 | 90 | private val time: Long,
|
87 | 91 | private val unit: TimeUnit,
|
88 |
| - private val cont: CancellableContinuation<*> |
89 |
| -): Runnable { |
90 |
| - override fun run() { |
91 |
| - cont.cancel(CancellationException("Timed out waiting for $time $unit")) |
92 |
| - } |
93 |
| - override fun toString(): String = "CancelTimedOutContinuationRunnable[$time,$unit,$cont]" |
| 92 | + delegate: Continuation<T> |
| 93 | +) : CancellableContinuationImpl<T>(delegate, active = true), Runnable { |
| 94 | + override fun defaultResumeMode(): Int = MODE_DIRECT |
| 95 | + override fun run() { cancel(CancellationException("Timed out waiting for $time $unit")) } |
94 | 96 | }
|
0 commit comments