Skip to content

Commit 19f4845

Browse files
committed
MPP: withTimeout, delay in int number of milliseconds
1 parent 9d5abcd commit 19f4845

File tree

8 files changed

+206
-26
lines changed

8 files changed

+206
-26
lines changed

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonDelay.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ package kotlinx.coroutines.experimental
22

33
public expect interface Delay
44

5-
public expect suspend fun delay(time: Long)
5+
public expect suspend fun delay(time: Int)
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,7 @@
11
package kotlinx.coroutines.experimental
22

3+
public expect suspend fun <T> withTimeout(time: Int, block: suspend CoroutineScope.() -> T): T
4+
5+
public expect suspend fun <T> withTimeoutOrNull(time: Int, block: suspend CoroutineScope.() -> T): T?
6+
7+
public expect class TimeoutCancellationException public constructor(message: String)

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,8 @@ public actual interface Delay {
8484
*
8585
* @param time time in milliseconds.
8686
*/
87-
public actual suspend fun delay(time: Long) {
88-
kotlin.require(time >= 0) { "Delay time $time cannot be negative" }
89-
if (time <= 0) return // don't delay
90-
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
91-
cont.context.delay.scheduleResumeAfterDelay(time, TimeUnit.MILLISECONDS, cont)
92-
}
93-
}
87+
public actual suspend fun delay(time: Int) =
88+
delay(time.toLong(), TimeUnit.MILLISECONDS)
9489

9590
/**
9691
* Delays coroutine for a given time without blocking a thread and resumes it after a specified time.

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,26 @@ import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
2525
import kotlin.coroutines.experimental.intrinsics.startCoroutineUninterceptedOrReturn
2626
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
2727

28+
/**
29+
* Runs a given suspending [block] of code inside a coroutine with a specified timeout and throws
30+
* [TimeoutCancellationException] if timeout was exceeded.
31+
*
32+
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
33+
* cancellable suspending function inside the block throws [TimeoutCancellationException].
34+
* Even if the code in the block suppresses [TimeoutCancellationException], it
35+
* is still thrown by `withTimeout` invocation.
36+
*
37+
* The sibling function that does not throw exception on timeout is [withTimeoutOrNull].
38+
* Note, that timeout action can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
39+
*
40+
* This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
41+
* implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
42+
*
43+
* @param time timeout time in milliseconds.
44+
*/
45+
public actual suspend fun <T> withTimeout(time: Int, block: suspend CoroutineScope.() -> T): T =
46+
withTimeout(time.toLong(), TimeUnit.MILLISECONDS, block)
47+
2848
/**
2949
* Runs a given suspending [block] of code inside a coroutine with a specified timeout and throws
3050
* [TimeoutCancellationException] if timeout was exceeded.
@@ -107,6 +127,26 @@ private open class TimeoutCoroutine<U, in T: U>(
107127
"${super.nameString()}($time $unit)"
108128
}
109129

130+
/**
131+
* Runs a given suspending block of code inside a coroutine with a specified timeout and returns
132+
* `null` if this timeout was exceeded.
133+
*
134+
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
135+
* cancellable suspending function inside the block throws [TimeoutCancellationException].
136+
* Even if the code in the block suppresses [TimeoutCancellationException], this
137+
* invocation of `withTimeoutOrNull` still returns `null`.
138+
*
139+
* The sibling function that throws exception on timeout is [withTimeout].
140+
* Note, that timeout action can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
141+
*
142+
* This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
143+
* implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
144+
*
145+
* @param time timeout time in milliseconds.
146+
*/
147+
public actual suspend fun <T> withTimeoutOrNull(time: Int, block: suspend CoroutineScope.() -> T): T? =
148+
withTimeoutOrNull(time.toLong(), TimeUnit.MILLISECONDS, block)
149+
110150
/**
111151
* Runs a given suspending block of code inside a coroutine with a specified timeout and returns
112152
* `null` if this timeout was exceeded.
@@ -161,14 +201,14 @@ private class TimeoutOrNullCoroutine<T>(
161201
* This exception is thrown by [withTimeout] to indicate timeout.
162202
*/
163203
@Suppress("DEPRECATION")
164-
public class TimeoutCancellationException internal constructor(
204+
public actual class TimeoutCancellationException internal constructor(
165205
message: String,
166206
@JvmField internal val coroutine: Job?
167207
) : TimeoutException(message) {
168208
/**
169209
* Creates timeout exception with a given message.
170210
*/
171-
public constructor(message: String) : this(message, null)
211+
public actual constructor(message: String) : this(message, null)
172212
}
173213

174214
/**

js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,12 @@ internal object DefaultExecutor : CoroutineDispatcher(), Delay {
4444
window.setTimeout({ block.run() }, 0)
4545
}
4646

47-
override fun scheduleResumeAfterDelay(time: Double, continuation: CancellableContinuation<Unit>) {
48-
window.setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, time.timeToInt())
47+
override fun scheduleResumeAfterDelay(time: Int, continuation: CancellableContinuation<Unit>) {
48+
window.setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, time.coerceAtLeast(0))
4949
}
5050

51-
override fun invokeOnTimeout(time: Double, block: Runnable): DisposableHandle {
52-
val handle = window.setTimeout({ block.run() }, time.timeToInt())
51+
override fun invokeOnTimeout(time: Int, block: Runnable): DisposableHandle {
52+
val handle = window.setTimeout({ block.run() }, time.coerceAtLeast(0))
5353
return object : DisposableHandle {
5454
override fun dispose() {
5555
window.clearTimeout(handle)

js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,14 @@ public actual interface Delay {
4242
* with(continuation) { resumeUndispatched(Unit) }
4343
* ```
4444
*/
45-
fun scheduleResumeAfterDelay(time: Double, continuation: CancellableContinuation<Unit>)
45+
fun scheduleResumeAfterDelay(time: Int, continuation: CancellableContinuation<Unit>)
4646

4747
/**
4848
* Schedules invocation of a specified [block] after a specified delay [time].
4949
* The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] of this invocation
5050
* request if it is not needed anymore.
5151
*/
52-
fun invokeOnTimeout(time: Double, block: Runnable): DisposableHandle
52+
fun invokeOnTimeout(time: Int, block: Runnable): DisposableHandle
5353
}
5454

5555
/**
@@ -63,12 +63,11 @@ public actual interface Delay {
6363
*
6464
* @param time time in milliseconds.
6565
*/
66-
public actual suspend fun delay(time: Long) {
67-
val dt = time.toDouble()
68-
kotlin.require(dt >= 0) { "Delay time $time cannot be negative" }
69-
if (dt <= 0) return // don't delay
66+
public actual suspend fun delay(time: Int) {
67+
kotlin.require(time >= 0) { "Delay time $time cannot be negative" }
68+
if (time <= 0) return // don't delay
7069
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
71-
cont.context.delay.scheduleResumeAfterDelay(dt, cont)
70+
cont.context.delay.scheduleResumeAfterDelay(time, cont)
7271
}
7372
}
7473

js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
5858
override fun dispatch(context: CoroutineContext, block: Runnable) =
5959
enqueue(block.toQueuedTask())
6060

61-
override fun scheduleResumeAfterDelay(time: Double, continuation: CancellableContinuation<Unit>) =
61+
override fun scheduleResumeAfterDelay(time: Int, continuation: CancellableContinuation<Unit>) =
6262
schedule(DelayedResumeTask(time, continuation))
6363

64-
override fun invokeOnTimeout(time: Double, block: Runnable): DisposableHandle =
64+
override fun invokeOnTimeout(time: Int, block: Runnable): DisposableHandle =
6565
DelayedRunnableTask(time, block).also { schedule(it) }
6666

6767
override fun processNextEvent(): Double {
@@ -129,7 +129,7 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
129129
}
130130

131131
internal abstract inner class DelayedTask(
132-
delay: Double
132+
delay: Int
133133
) : QueuedTask(), Comparable<DelayedTask>, DisposableHandle, HeapNode {
134134
override var index: Int = -1
135135
var state = DELAYED
@@ -169,7 +169,7 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
169169
}
170170

171171
private inner class DelayedResumeTask(
172-
time: Double,
172+
time: Int,
173173
private val cont: CancellableContinuation<Unit>
174174
) : DelayedTask(time) {
175175
override fun run() {
@@ -178,7 +178,7 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
178178
}
179179

180180
private inner class DelayedRunnableTask(
181-
time: Double,
181+
time: Int,
182182
private val block: Runnable
183183
) : DelayedTask(time) {
184184
override fun run() { block.run() }
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,143 @@
11
package kotlinx.coroutines.experimental
22

3+
import kotlin.coroutines.experimental.Continuation
4+
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
5+
import kotlin.coroutines.experimental.intrinsics.startCoroutineUninterceptedOrReturn
6+
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
7+
8+
/**
9+
* Runs a given suspending [block] of code inside a coroutine with a specified timeout and throws
10+
* [TimeoutCancellationException] if timeout was exceeded.
11+
*
12+
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
13+
* cancellable suspending function inside the block throws [TimeoutCancellationException].
14+
* Even if the code in the block suppresses [TimeoutCancellationException], it
15+
* is still thrown by `withTimeout` invocation.
16+
*
17+
* The sibling function that does not throw exception on timeout is [withTimeoutOrNull].
18+
*
19+
* This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
20+
* implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
21+
*
22+
* @param time timeout time in milliseconds.
23+
*/
24+
public actual suspend fun <T> withTimeout(time: Int, block: suspend CoroutineScope.() -> T): T {
25+
require(time >= 0) { "Timeout time $time cannot be negative" }
26+
if (time <= 0L) throw CancellationException("Timed out immediately")
27+
return suspendCoroutineOrReturn { cont: Continuation<T> ->
28+
setupTimeout(TimeoutCoroutine(time, cont), block)
29+
}
30+
}
31+
32+
private fun <U, T: U> setupTimeout(
33+
coroutine: TimeoutCoroutine<U, T>,
34+
block: suspend CoroutineScope.() -> T
35+
): Any? {
36+
// schedule cancellation of this coroutine on time
37+
val cont = coroutine.cont
38+
val context = cont.context
39+
coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine))
40+
coroutine.initParentJob(context[Job])
41+
// restart block using new coroutine with new job,
42+
// however start it as undispatched coroutine, because we are already in the proper context
43+
val result = try {
44+
block.startCoroutineUninterceptedOrReturn(receiver = coroutine, completion = coroutine)
45+
} catch (e: Throwable) {
46+
JobSupport.CompletedExceptionally(e)
47+
}
48+
return when {
49+
result == COROUTINE_SUSPENDED -> COROUTINE_SUSPENDED
50+
coroutine.makeCompleting(result, MODE_IGNORE) -> {
51+
if (result is JobSupport.CompletedExceptionally) throw result.exception else result
52+
}
53+
else -> COROUTINE_SUSPENDED
54+
}
55+
}
56+
57+
private open class TimeoutCoroutine<U, in T: U>(
58+
val time: Int,
59+
val cont: Continuation<U>
60+
) : AbstractCoroutine<T>(cont.context, active = true), Runnable, Continuation<T> {
61+
override val defaultResumeMode: Int get() = MODE_DIRECT
62+
63+
@Suppress("LeakingThis")
64+
override fun run() {
65+
cancel(TimeoutCancellationException(time, this))
66+
}
67+
68+
@Suppress("UNCHECKED_CAST")
69+
override fun afterCompletion(state: Any?, mode: Int) {
70+
if (state is CompletedExceptionally)
71+
cont.resumeWithExceptionMode(state.exception, mode)
72+
else
73+
cont.resumeMode(state as T, mode)
74+
}
75+
76+
override fun toString(): String =
77+
"TimeoutCoroutine($time)"
78+
}
79+
80+
/**
81+
* Runs a given suspending block of code inside a coroutine with a specified timeout and returns
82+
* `null` if this timeout was exceeded.
83+
*
84+
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
85+
* cancellable suspending function inside the block throws [TimeoutCancellationException].
86+
* Even if the code in the block suppresses [TimeoutCancellationException], this
87+
* invocation of `withTimeoutOrNull` still returns `null`.
88+
*
89+
* The sibling function that throws exception on timeout is [withTimeout].
90+
*
91+
* This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
92+
* implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
93+
*
94+
* @param time timeout time in milliseconds.
95+
*/
96+
public actual suspend fun <T> withTimeoutOrNull(time: Int, block: suspend CoroutineScope.() -> T): T? {
97+
require(time >= 0) { "Timeout time $time cannot be negative" }
98+
if (time <= 0L) return null
99+
return suspendCoroutineOrReturn { cont: Continuation<T?> ->
100+
setupTimeout(TimeoutOrNullCoroutine(time, cont), block)
101+
}
102+
}
103+
104+
private class TimeoutOrNullCoroutine<T>(
105+
time: Int,
106+
cont: Continuation<T?>
107+
) : TimeoutCoroutine<T?, T>(time, cont) {
108+
@Suppress("UNCHECKED_CAST")
109+
override fun afterCompletion(state: Any?, mode: Int) {
110+
if (state is CompletedExceptionally) {
111+
val exception = state.exception
112+
if (exception is TimeoutCancellationException && exception.coroutine === this)
113+
cont.resumeMode(null, mode) else
114+
cont.resumeWithExceptionMode(exception, mode)
115+
} else
116+
cont.resumeMode(state as T, mode)
117+
}
118+
119+
override fun toString(): String =
120+
"TimeoutOrNullCoroutine($time)"
121+
}
122+
123+
/**
124+
* This exception is thrown by [withTimeout] to indicate timeout.
125+
*/
126+
@Suppress("DEPRECATION")
127+
public actual class TimeoutCancellationException internal constructor(
128+
message: String,
129+
internal val coroutine: Job?
130+
) : CancellationException(message) {
131+
/**
132+
* Creates timeout exception with a given message.
133+
*/
134+
public actual constructor(message: String) : this(message, null)
135+
}
136+
137+
@Suppress("FunctionName")
138+
private fun TimeoutCancellationException(
139+
time: Int,
140+
coroutine: Job
141+
) : TimeoutCancellationException = TimeoutCancellationException("Timed out waiting for $time", coroutine)
142+
143+

0 commit comments

Comments
 (0)