Skip to content

Commit 507f5d4

Browse files
committed
Implemented withTimeoutOrNull function
1 parent 9d61b3e commit 507f5d4

File tree

8 files changed

+241
-19
lines changed

8 files changed

+241
-19
lines changed

kotlinx-coroutines-core/README.md

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,13 @@ Synchronization primitives for coroutines:
3131

3232
Top-level suspending functions:
3333

34-
| **Name** | **Description**
35-
| ------------- | ---------------
36-
| [delay] | Non-blocking sleep
37-
| [yield] | Yields thread in single-threaded dispatchers
38-
| [run] | Switches to a different context
39-
| [withTimeout] | Set execution time-limit (deadline)
34+
| **Name** | **Description**
35+
| ------------------- | ---------------
36+
| [delay] | Non-blocking sleep
37+
| [yield] | Yields thread in single-threaded dispatchers
38+
| [run] | Switches to a different context
39+
| [withTimeout] | Set execution time-limit with exception on timeout
40+
| [withTimeoutOrNull] | Set execution time-limit will null result on timeout
4041

4142
[Select][kotlinx.coroutines.experimental.selects.select] expression waits for the result of multiple suspending functions simultaneously:
4243

@@ -93,6 +94,7 @@ Select expression to perform multiple suspending operations simultaneously until
9394
[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/yield.html
9495
[run]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run.html
9596
[withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html
97+
[withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout-or-null.html
9698
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/join.html
9799
[Job.isCompleted]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/is-completed.html
98100
[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/await.html
@@ -126,4 +128,5 @@ Select expression to perform multiple suspending operations simultaneously until
126128
[kotlinx.coroutines.experimental.selects.SelectBuilder.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-receive.html
127129
[kotlinx.coroutines.experimental.selects.SelectBuilder.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-receive-or-null.html
128130
[kotlinx.coroutines.experimental.selects.SelectBuilder.onLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-lock.html
131+
[kotlinx.coroutines.experimental.selects.SelectBuilder.onTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-timeout.html
129132
<!--- END -->

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ internal open class CancellableContinuationImpl<in T>(
229229
}
230230

231231
override fun completeResume(token: Any) {
232-
completeUpdateState(token, state, defaultResumeMode())
232+
completeUpdateState(token, state, defaultResumeMode)
233233
}
234234

235235
override fun afterCompletion(state: Any?, mode: Int) {

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,22 +67,29 @@ public abstract class AbstractCoroutine<in T>(
6767

6868
protected open fun createContext() = parentContext + this
6969

70-
protected open fun defaultResumeMode(): Int = MODE_DISPATCHED
70+
protected open val defaultResumeMode: Int get() = MODE_DISPATCHED
7171

72-
final override fun resume(value: T) = resume(value, defaultResumeMode())
72+
protected open val ignoreRepeatedResume: Boolean get() = false
73+
74+
final override fun resume(value: T) = resume(value, defaultResumeMode)
7375

7476
protected fun resume(value: T, mode: Int) {
7577
while (true) { // lock-free loop on state
7678
val state = this.state // atomic read
7779
when (state) {
7880
is Incomplete -> if (updateState(state, value, mode)) return
7981
is Cancelled -> return // ignore resumes on cancelled continuation
80-
else -> throw IllegalStateException("Already resumed, but got value $value")
82+
else -> {
83+
if (ignoreRepeatedResume) {
84+
return
85+
} else
86+
throw IllegalStateException("Already resumed, but got value $value")
87+
}
8188
}
8289
}
8390
}
8491

85-
final override fun resumeWithException(exception: Throwable) = resumeWithException(exception, defaultResumeMode())
92+
final override fun resumeWithException(exception: Throwable) = resumeWithException(exception, defaultResumeMode)
8693

8794
protected fun resumeWithException(exception: Throwable, mode: Int) {
8895
while (true) { // lock-free loop on state
@@ -96,7 +103,13 @@ public abstract class AbstractCoroutine<in T>(
96103
if (exception != state.exception) handleCoroutineException(context, exception)
97104
return
98105
}
99-
else -> throw IllegalStateException("Already resumed, but got exception $exception", exception)
106+
else -> {
107+
if (ignoreRepeatedResume) {
108+
handleCoroutineException(context, exception)
109+
return
110+
} else
111+
throw IllegalStateException("Already resumed, but got exception $exception", exception)
112+
}
100113
}
101114
}
102115
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,15 @@ internal fun scheduledExecutorShutdownNowAndRelease() {
6565
}
6666

6767
/**
68-
* Runs a given suspending block of code inside a coroutine with a specified timeout and throws
68+
* Runs a given suspending [block] of code inside a coroutine with a specified timeout and throws
6969
* [CancellationException] if timeout was exceeded.
7070
*
71-
* Note, that timeout can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
71+
* The code that is executing inside the [block] is cancelled on timeout and throws [CancellationException]
72+
* exception inside of it, too. However, even the code in the block suppresses the exception,
73+
* this `withTimeout` function invocation still throws [CancellationException].
74+
*
75+
* The sibling function that does not throw exception on timeout is [withTimeoutOrNull].
76+
* Note, that timeout action can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
7277
*
7378
* This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
7479
* implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
@@ -81,7 +86,41 @@ public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISE
8186
if (time <= 0L) throw CancellationException("Timed out immediately")
8287
return suspendCoroutineOrReturn sc@ { delegate: Continuation<T> ->
8388
// schedule cancellation of this continuation on time
84-
val cont = TimeoutContinuation(time, unit, delegate)
89+
val cont = TimeoutExceptionContinuation(time, unit, delegate)
90+
val delay = cont.context[ContinuationInterceptor] as? Delay
91+
if (delay != null)
92+
cont.disposeOnCompletion(delay.invokeOnTimeout(time, unit, cont)) else
93+
cont.cancelFutureOnCompletion(scheduledExecutor.schedule(cont, time, unit))
94+
// restart block using cancellable context of this continuation,
95+
// however start it as undispatched coroutine, because we are already in the proper context
96+
block.startCoroutineUndispatched(cont)
97+
cont.getResult()
98+
}
99+
}
100+
101+
/**
102+
* Runs a given suspending block of code inside a coroutine with a specified timeout and returns
103+
* `null` if timeout was exceeded.
104+
*
105+
* The code that is executing inside the [block] is cancelled on timeout and throws [CancellationException]
106+
* exception inside of it. However, even the code in the block does not catch the cancellation exception,
107+
* this `withTimeoutOrNull` function invocation still returns `null` on timeout.
108+
*
109+
* The sibling function that throws exception on timeout is [withTimeout].
110+
* Note, that timeout action can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
111+
*
112+
* This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
113+
* implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
114+
*
115+
* @param time timeout time
116+
* @param unit timeout unit (milliseconds by default)
117+
*/
118+
public suspend fun <T> withTimeoutOrNull(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> T): T? {
119+
require(time >= 0) { "Timeout time $time cannot be negative" }
120+
if (time <= 0L) return null
121+
return suspendCoroutineOrReturn sc@ { delegate: Continuation<T?> ->
122+
// schedule cancellation of this continuation on time
123+
val cont = TimeoutNullContinuation<T>(delegate)
85124
val delay = cont.context[ContinuationInterceptor] as? Delay
86125
if (delay != null)
87126
cont.disposeOnCompletion(delay.invokeOnTimeout(time, unit, cont)) else
@@ -93,11 +132,19 @@ public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISE
93132
}
94133
}
95134

96-
private class TimeoutContinuation<T>(
135+
private class TimeoutExceptionContinuation<in T>(
97136
private val time: Long,
98137
private val unit: TimeUnit,
99138
delegate: Continuation<T>
100139
) : CancellableContinuationImpl<T>(delegate, active = true), Runnable {
101-
override fun defaultResumeMode(): Int = MODE_DIRECT
140+
override val defaultResumeMode get() = MODE_DIRECT
102141
override fun run() { cancel(CancellationException("Timed out waiting for $time $unit")) }
103142
}
143+
144+
private class TimeoutNullContinuation<in T>(
145+
delegate: Continuation<T?>
146+
) : CancellableContinuationImpl<T?>(delegate, active = true), Runnable {
147+
override val defaultResumeMode get() = MODE_DIRECT
148+
override val ignoreRepeatedResume: Boolean get() = true
149+
override fun run() { resume(null, mode = 0) /* dispatch resume */ }
150+
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ internal class SelectBuilderImpl<in R>(
216216
cancel(cause)
217217
}
218218

219-
override fun defaultResumeMode(): Int = MODE_DIRECT // all resumes through completion are dispatched directly
219+
override val defaultResumeMode get() = MODE_DIRECT // all resumes through completion are dispatched directly
220220

221221
override val completion: Continuation<R> get() {
222222
check(isSelected) { "Must be selected first" }
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import org.hamcrest.core.IsEqual
20+
import org.hamcrest.core.IsNull
21+
import org.junit.Assert.assertThat
22+
import org.junit.Test
23+
24+
class WithTimeoutOrNullTest : TestBase() {
25+
/**
26+
* Tests property dispatching of `withTimeoutOrNull` blocks
27+
*/
28+
@Test
29+
fun testDispatch() = runBlocking {
30+
expect(1)
31+
launch(context) {
32+
expect(4)
33+
yield() // back to main
34+
expect(7)
35+
}
36+
expect(2)
37+
// test that it does not yield to the above job when started
38+
val result = withTimeoutOrNull(1000) {
39+
expect(3)
40+
yield() // yield only now
41+
expect(5)
42+
"OK"
43+
}
44+
assertThat(result, IsEqual("OK"))
45+
expect(6)
46+
yield() // back to launch
47+
finish(8)
48+
}
49+
50+
/**
51+
* Tests that a 100% CPU-consuming loop will react on timeout if it has yields.
52+
*/
53+
@Test
54+
fun testYieldBlockingWithTimeout() = runBlocking {
55+
expect(1)
56+
val result = withTimeoutOrNull(100) {
57+
while (true) {
58+
yield()
59+
}
60+
}
61+
assertThat(result, IsNull())
62+
finish(2)
63+
}
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import org.hamcrest.core.IsEqual
20+
import org.hamcrest.core.IsNull
21+
import org.junit.After
22+
import org.junit.Assert
23+
import org.junit.Assert.assertThat
24+
import org.junit.Test
25+
import java.util.concurrent.ExecutorService
26+
import java.util.concurrent.Executors
27+
import java.util.concurrent.ThreadFactory
28+
import kotlin.coroutines.experimental.CoroutineContext
29+
30+
class WithTimeoutOrNullThreadDispatchTest : TestBase() {
31+
var executor: ExecutorService? = null
32+
33+
@After
34+
fun tearDown() {
35+
executor?.shutdown()
36+
}
37+
38+
@Test
39+
fun testCancellationDispatchScheduled() {
40+
checkCancellationDispatch {
41+
executor = Executors.newScheduledThreadPool(1, it)
42+
executor!!.asCoroutineDispatcher()
43+
}
44+
}
45+
46+
@Test
47+
fun testCancellationDispatchNonScheduled() {
48+
checkCancellationDispatch {
49+
executor = Executors.newSingleThreadExecutor(it)
50+
executor!!.asCoroutineDispatcher()
51+
}
52+
}
53+
54+
@Test
55+
fun testCancellationDispatchCustomNoDelay() {
56+
checkCancellationDispatch {
57+
executor = Executors.newSingleThreadExecutor(it)
58+
object : CoroutineDispatcher() {
59+
override fun dispatch(context: CoroutineContext, block: Runnable) {
60+
executor!!.execute(block)
61+
}
62+
}
63+
}
64+
}
65+
66+
private fun checkCancellationDispatch(factory: (ThreadFactory) -> CoroutineDispatcher) = runBlocking {
67+
expect(1)
68+
var thread: Thread? = null
69+
val dispatcher = factory(ThreadFactory { Thread(it).also { thread = it } })
70+
run(dispatcher) {
71+
expect(2)
72+
Assert.assertThat(Thread.currentThread(), IsEqual(thread))
73+
val result =
74+
withTimeoutOrNull(100) {
75+
try {
76+
expect(3)
77+
delay(1000)
78+
expectUnreached()
79+
} catch (e: CancellationException) {
80+
expect(4)
81+
Assert.assertThat(Thread.currentThread(), IsEqual(thread))
82+
}
83+
expect(5)
84+
"FAIL"
85+
}
86+
assertThat(result, IsNull())
87+
expect(6)
88+
}
89+
finish(7)
90+
}
91+
}

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutTest.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19+
import org.hamcrest.core.IsEqual
20+
import org.junit.Assert.assertThat
1921
import org.junit.Test
2022

2123
class WithTimeoutTest : TestBase() {
@@ -32,11 +34,13 @@ class WithTimeoutTest : TestBase() {
3234
}
3335
expect(2)
3436
// test that it does not yield to the above job when started
35-
withTimeout(1000) {
37+
val result = withTimeout(1000) {
3638
expect(3)
3739
yield() // yield only now
3840
expect(5)
41+
"OK"
3942
}
43+
assertThat(result, IsEqual("OK"))
4044
expect(6)
4145
yield() // back to launch
4246
finish(8)

0 commit comments

Comments
 (0)