1
+ /*
2
+ * Copyright 2016-2017 JetBrains s.r.o.
3
+ *
4
+ * Licensed under the Apache License, Version 2.0 (the "License");
5
+ * you may not use this file except in compliance with the License.
6
+ * You may obtain a copy of the License at
7
+ *
8
+ * http://www.apache.org/licenses/LICENSE-2.0
9
+ *
10
+ * Unless required by applicable law or agreed to in writing, software
11
+ * distributed under the License is distributed on an "AS IS" BASIS,
12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ * See the License for the specific language governing permissions and
14
+ * limitations under the License.
15
+ */
16
+
17
+ package kotlinx.coroutines.experimental.selects
18
+
19
+ import kotlinx.coroutines.experimental.Deferred
20
+ import kotlinx.coroutines.experimental.Job
21
+ import kotlinx.coroutines.experimental.channels.ReceiveChannel
22
+ import kotlinx.coroutines.experimental.channels.SendChannel
23
+ import java.util.*
24
+ import kotlin.coroutines.experimental.Continuation
25
+ import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
26
+
27
+ /* *
28
+ * Waits for the result of multiple suspending functions simultaneously like [select], but in an _unbiased_
29
+ * way when multiple clauses are selectable at the same time.
30
+ *
31
+ * This unbiased implementation of `select` expression randomly shuffles the clauses before checking
32
+ * if they are selectable, thus ensuring that there is no statistical bias to the selection of the first
33
+ * clauses.
34
+ *
35
+ * See [select] function description for all the other details.
36
+ */
37
+ public inline suspend fun <R > selectUnbiased (crossinline builder : SelectBuilder <R >.() -> Unit ): R =
38
+ suspendCoroutineOrReturn { cont ->
39
+ val scope = UnbiasedSelectBuilderImpl (cont)
40
+ try {
41
+ builder(scope)
42
+ } catch (e: Throwable ) {
43
+ scope.handleBuilderException(e)
44
+ }
45
+ scope.initSelectResult()
46
+ }
47
+
48
+
49
+ @PublishedApi
50
+ internal class UnbiasedSelectBuilderImpl <in R >(cont : Continuation <R >) : SelectBuilder<R> {
51
+ val instance = SelectBuilderImpl (cont)
52
+ val clauses = arrayListOf< () -> Unit > ()
53
+
54
+ @PublishedApi
55
+ internal fun handleBuilderException (e : Throwable ) = instance.handleBuilderException(e)
56
+
57
+ @PublishedApi
58
+ internal fun initSelectResult (): Any? {
59
+ if (! instance.isSelected) {
60
+ try {
61
+ Collections .shuffle(clauses)
62
+ clauses.forEach { it.invoke() }
63
+ } catch (e: Throwable ) {
64
+ instance.handleBuilderException(e)
65
+ }
66
+ }
67
+ return instance.initSelectResult()
68
+ }
69
+
70
+ override fun Job.onJoin (block : suspend () -> R ) {
71
+ clauses + = { registerSelectJoin(instance, block) }
72
+ }
73
+
74
+ override fun <T > Deferred<T>.onAwait (block : suspend (T ) -> R ) {
75
+ clauses + = { registerSelectAwait(instance, block) }
76
+ }
77
+
78
+ override fun <E > SendChannel<E>.onSend (element : E , block : suspend () -> R ) {
79
+ clauses + = { registerSelectSend(instance, element, block) }
80
+ }
81
+
82
+ override fun <E > ReceiveChannel<E>.onReceive (block : suspend (E ) -> R ) {
83
+ clauses + = { registerSelectReceive(instance, block) }
84
+ }
85
+
86
+ override fun <E > ReceiveChannel<E>.onReceiveOrNull (block : suspend (E ? ) -> R ) {
87
+ clauses + = { registerSelectReceiveOrNull(instance, block) }
88
+ }
89
+ }
0 commit comments