Skip to content

Commit 9d61b3e

Browse files
committed
Select expression onTimeout clause
1 parent c02ee11 commit 9d61b3e

File tree

7 files changed

+186
-1
lines changed

7 files changed

+186
-1
lines changed

kotlinx-coroutines-core/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ Top-level suspending functions:
4848
| [ReceiveChannel][kotlinx.coroutines.experimental.channels.ReceiveChannel] | [receive][kotlinx.coroutines.experimental.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.experimental.selects.SelectBuilder.onReceive] | [poll][kotlinx.coroutines.experimental.channels.ReceiveChannel.poll]
4949
| [ReceiveChannel][kotlinx.coroutines.experimental.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.experimental.channels.ReceiveChannel.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.experimental.selects.SelectBuilder.onReceiveOrNull] | [poll][kotlinx.coroutines.experimental.channels.ReceiveChannel.poll]
5050
| [Mutex][kotlinx.coroutines.experimental.sync.Mutex] | [lock][kotlinx.coroutines.experimental.sync.Mutex.lock] | [onLock][kotlinx.coroutines.experimental.selects.SelectBuilder.onLock] | [tryLock][kotlinx.coroutines.experimental.sync.Mutex.tryLock]
51+
| none | [delay] | [onTimeout][kotlinx.coroutines.experimental.selects.SelectBuilder.onTimeout] | none
5152

5253
Cancellation support for user-defined suspending functions is available with [suspendCancellableCoroutine]
5354
helper function. [NonCancellable] job object is provided to suppress cancellation with

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19+
import kotlinx.coroutines.experimental.selects.SelectBuilder
20+
import kotlinx.coroutines.experimental.selects.select
1921
import java.util.concurrent.Future
2022
import java.util.concurrent.TimeUnit
2123
import kotlin.coroutines.experimental.ContinuationInterceptor
@@ -74,6 +76,8 @@ public interface Delay {
7476
* If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
7577
* immediately resumes with [CancellationException].
7678
*
79+
* Note, that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
80+
*
7781
* This function delegates to [Delay.scheduleResumeAfterDelay] if the context [CoroutineDispatcher]
7882
* implements [Delay] interface, otherwise it resumes using a built-in single-threaded scheduled executor service.
7983
*/

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package kotlinx.coroutines.experimental
1818

1919
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
20+
import kotlinx.coroutines.experimental.selects.SelectBuilder
21+
import kotlinx.coroutines.experimental.selects.select
2022
import java.util.concurrent.ScheduledExecutorService
2123
import java.util.concurrent.ScheduledThreadPoolExecutor
2224
import java.util.concurrent.TimeUnit
@@ -66,8 +68,13 @@ internal fun scheduledExecutorShutdownNowAndRelease() {
6668
* Runs a given suspending block of code inside a coroutine with a specified timeout and throws
6769
* [CancellationException] if timeout was exceeded.
6870
*
71+
* Note, that timeout can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
72+
*
6973
* This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
7074
* implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
75+
*
76+
* @param time timeout time
77+
* @param unit timeout unit (milliseconds by default)
7178
*/
7279
public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> T): T {
7380
require(time >= 0) { "Timeout time $time cannot be negative" }

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@ import kotlinx.coroutines.experimental.channels.ClosedSendChannelException
2222
import kotlinx.coroutines.experimental.channels.ReceiveChannel
2323
import kotlinx.coroutines.experimental.channels.SendChannel
2424
import kotlinx.coroutines.experimental.internal.AtomicDesc
25-
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
25+
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
2626
import kotlinx.coroutines.experimental.sync.Mutex
27+
import java.util.concurrent.TimeUnit
2728
import kotlin.coroutines.experimental.Continuation
29+
import kotlin.coroutines.experimental.ContinuationInterceptor
2830
import kotlin.coroutines.experimental.CoroutineContext
2931
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
32+
import kotlin.coroutines.experimental.startCoroutine
3033

3134
/**
3235
* Scope for [select] invocation.
@@ -76,6 +79,14 @@ public interface SelectBuilder<in R> {
7679
* is already locked with the same token (same identity), this clause throws [IllegalStateException].
7780
*/
7881
public fun Mutex.onLock(owner: Any? = null, block: suspend () -> R)
82+
83+
/**
84+
* Clause that selects the given [block] after a specified timeout passes.
85+
*
86+
* @param time timeout time
87+
* @param unit timeout unit (milliseconds by default)
88+
*/
89+
public fun onTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> R)
7990
}
8091

8192
/**
@@ -145,6 +156,7 @@ public interface SelectInstance<in R> {
145156
* | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][SelectBuilder.onReceive] | [poll][ReceiveChannel.poll]
146157
* | [ReceiveChannel] | [receiveOrNull][ReceiveChannel.receiveOrNull] | [onReceiveOrNull][SelectBuilder.onReceiveOrNull] | [poll][ReceiveChannel.poll]
147158
* | [Mutex] | [lock][Mutex.lock] | [onLock][SelectBuilder.onLock] | [tryLock][Mutex.tryLock]
159+
* | none | [delay] | [onTimeout][SelectBuilder.onTimeout] | none
148160
*
149161
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
150162
* function is suspended, this function immediately resumes with [CancellationException].
@@ -240,6 +252,25 @@ internal class SelectBuilderImpl<in R>(
240252
registerSelectLock(this@SelectBuilderImpl, owner, block)
241253
}
242254

255+
override fun onTimeout(time: Long, unit: TimeUnit, block: suspend () -> R) {
256+
require(time >= 0) { "Timeout time $time cannot be negative" }
257+
if (time == 0L) {
258+
if (trySelect(idempotent = null))
259+
block.startCoroutineUndispatched(completion)
260+
return
261+
}
262+
val action = Runnable {
263+
// todo: we could have replaced startCoroutine with startCoroutineUndispatched
264+
// But we need a way to know that Delay.invokeOnTimeout had used the right thread
265+
if (trySelect(idempotent = null))
266+
block.startCoroutine(completion)
267+
}
268+
val delay = context[ContinuationInterceptor] as? Delay
269+
if (delay != null)
270+
disposeOnSelect(delay.invokeOnTimeout(time, unit, action)) else
271+
cancelFutureOnCompletion(scheduledExecutor.schedule(action, time, unit))
272+
}
273+
243274
override fun disposeOnSelect(handle: DisposableHandle) {
244275
invokeOnCompletion(DisposeOnCompletion(this, handle))
245276
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/SelectUnbiased.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import kotlinx.coroutines.experimental.channels.ReceiveChannel
2222
import kotlinx.coroutines.experimental.channels.SendChannel
2323
import kotlinx.coroutines.experimental.sync.Mutex
2424
import java.util.*
25+
import java.util.concurrent.TimeUnit
2526
import kotlin.coroutines.experimental.Continuation
2627
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
2728

@@ -91,4 +92,8 @@ internal class UnbiasedSelectBuilderImpl<in R>(cont: Continuation<R>) : SelectBu
9192
override fun Mutex.onLock(owner: Any?, block: suspend () -> R) {
9293
clauses += { registerSelectLock(instance, owner, block) }
9394
}
95+
96+
override fun onTimeout(time: Long, unit: TimeUnit, block: suspend () -> R) {
97+
clauses += { instance.onTimeout(time, unit, block) }
98+
}
9499
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import org.hamcrest.core.IsEqual
20+
import org.junit.After
21+
import org.junit.Assert
22+
import org.junit.Test
23+
import java.util.concurrent.ExecutorService
24+
import java.util.concurrent.Executors
25+
import java.util.concurrent.ThreadFactory
26+
import kotlin.coroutines.experimental.CoroutineContext
27+
28+
class WithTimeoutThreadDispatchTest : TestBase() {
29+
var executor: ExecutorService? = null
30+
31+
@After
32+
fun tearDown() {
33+
executor?.shutdown()
34+
}
35+
36+
@Test
37+
fun testCancellationDispatchScheduled() {
38+
checkCancellationDispatch {
39+
executor = Executors.newScheduledThreadPool(1, it)
40+
executor!!.asCoroutineDispatcher()
41+
}
42+
}
43+
44+
@Test
45+
fun testCancellationDispatchNonScheduled() {
46+
checkCancellationDispatch {
47+
executor = Executors.newSingleThreadExecutor(it)
48+
executor!!.asCoroutineDispatcher()
49+
}
50+
}
51+
52+
@Test
53+
fun testCancellationDispatchCustomNoDelay() {
54+
checkCancellationDispatch {
55+
executor = Executors.newSingleThreadExecutor(it)
56+
object : CoroutineDispatcher() {
57+
override fun dispatch(context: CoroutineContext, block: Runnable) {
58+
executor!!.execute(block)
59+
}
60+
}
61+
}
62+
}
63+
64+
private fun checkCancellationDispatch(factory: (ThreadFactory) -> CoroutineDispatcher) = runBlocking {
65+
expect(1)
66+
var thread: Thread? = null
67+
val dispatcher = factory(ThreadFactory { Thread(it).also { thread = it } })
68+
run(dispatcher) {
69+
expect(2)
70+
Assert.assertThat(Thread.currentThread(), IsEqual(thread))
71+
try {
72+
withTimeout(100) {
73+
try {
74+
expect(3)
75+
delay(1000)
76+
expectUnreached()
77+
} catch (e: CancellationException) {
78+
expect(4)
79+
Assert.assertThat(Thread.currentThread(), IsEqual(thread))
80+
}
81+
expect(5)
82+
}
83+
} catch (e: CancellationException) {
84+
expect(6)
85+
Assert.assertThat(Thread.currentThread(), IsEqual(thread))
86+
}
87+
expect(7)
88+
}
89+
finish(8)
90+
}
91+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.selects
18+
19+
import kotlinx.coroutines.experimental.TestBase
20+
import kotlinx.coroutines.experimental.runBlocking
21+
import org.hamcrest.MatcherAssert.assertThat
22+
import org.hamcrest.core.IsEqual
23+
import org.junit.Test
24+
25+
class SelectTimeoutTest : TestBase() {
26+
@Test
27+
fun testBasic() = runBlocking {
28+
expect(1)
29+
val result = select<String> {
30+
onTimeout(1000) {
31+
expectUnreached()
32+
"FAIL"
33+
}
34+
onTimeout(100) {
35+
expect(2)
36+
"OK"
37+
}
38+
onTimeout(500) {
39+
expectUnreached()
40+
"FAIL"
41+
}
42+
}
43+
assertThat(result, IsEqual("OK"))
44+
finish(3)
45+
}
46+
}

0 commit comments

Comments
 (0)