Skip to content

Commit 6d54802

Browse files
committed
JS: Use window.setMessage instead of setTimeout for coroutines inside the browser
Fixes #194
1 parent 9483301 commit 6d54802

File tree

3 files changed

+232
-28
lines changed

3 files changed

+232
-28
lines changed

js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -42,27 +42,6 @@ public actual object Unconfined : CoroutineDispatcher() {
4242
@Suppress("PropertyName")
4343
public actual val DefaultDispatcher: CoroutineDispatcher = JSDispatcher
4444

45-
internal object JSDispatcher : CoroutineDispatcher(), Delay {
46-
override fun dispatch(context: CoroutineContext, block: Runnable) {
47-
setTimeout({ block.run() }, 0)
48-
}
49-
50-
override fun scheduleResumeAfterDelay(time: Int, continuation: CancellableContinuation<Unit>) {
51-
setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, time.coerceAtLeast(0))
52-
}
53-
54-
override fun invokeOnTimeout(time: Int, block: Runnable): DisposableHandle {
55-
val handle = setTimeout({ block.run() }, time.coerceAtLeast(0))
56-
return object : DisposableHandle {
57-
override fun dispose() {
58-
clearTimeout(handle)
59-
}
60-
}
61-
}
62-
63-
private fun Double.timeToInt(): Int = coerceIn(0.0..Int.MAX_VALUE.toDouble()).toInt()
64-
}
65-
6645
/**
6746
* Creates context for the new coroutine. It installs [DefaultDispatcher] when no other dispatcher nor
6847
* [ContinuationInterceptor] is specified, and adds optional support for debugging facilities (when turned on).
@@ -73,13 +52,6 @@ public fun newCoroutineContext(context: CoroutineContext, parent: Job? = null):
7352
wp + DefaultDispatcher else wp
7453
}
7554

76-
7755
// No debugging facilities on JS
7856
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T = block()
7957
internal actual fun Continuation<*>.toDebugString(): String = toString()
80-
81-
// We need to reference global setTimeout and clearTimeout so that it works on Node.JS as opposed to
82-
// using them via "window" (which only works in browser)
83-
84-
private external fun setTimeout(handler: dynamic, timeout: Int = definedExternally): Int
85-
private external fun clearTimeout(handle: Int = definedExternally)
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import kotlin.browser.*
20+
import kotlin.coroutines.experimental.*
21+
22+
internal object JSDispatcher : CoroutineDispatcher(), Delay {
23+
// Check if we are in the browser and must use postMessage to avoid setTimeout throttling
24+
private val messageQueue =
25+
if (jsTypeOf(window) != "undefined") MessageQueue().apply { register() } else null
26+
27+
override fun dispatch(context: CoroutineContext, block: Runnable) {
28+
if (messageQueue != null) {
29+
messageQueue.enqueue(block)
30+
} else {
31+
setTimeout({ block.run() }, 0)
32+
}
33+
}
34+
35+
override fun scheduleResumeAfterDelay(time: Int, continuation: CancellableContinuation<Unit>) {
36+
setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, time.coerceAtLeast(0))
37+
}
38+
39+
override fun invokeOnTimeout(time: Int, block: Runnable): DisposableHandle {
40+
val handle = setTimeout({ block.run() }, time.coerceAtLeast(0))
41+
return object : DisposableHandle {
42+
override fun dispose() {
43+
clearTimeout(handle)
44+
}
45+
}
46+
}
47+
}
48+
49+
// it is open for tests
50+
internal open class MessageQueue {
51+
val yieldEvery = 16 // yield to JS event loop after this many processed messages
52+
53+
private val messageName = "JSDispatcher.dispatch"
54+
private var scheduled = false
55+
56+
private var queue = arrayOfNulls<Runnable>(8)
57+
private var head = 0
58+
private var tail = 0
59+
60+
fun register() {
61+
window.addEventListener("message", { event: dynamic ->
62+
if (event.source == window && event.data == messageName) {
63+
event.stopPropagation()
64+
process()
65+
}
66+
}, true)
67+
}
68+
69+
// it is open for tests
70+
open fun schedule() {
71+
window.postMessage(messageName, "*")
72+
}
73+
74+
val isEmpty get() = head == tail
75+
76+
fun poll(): Runnable? {
77+
if (isEmpty) return null
78+
val result = queue[head]!!
79+
queue[head] = null
80+
head = head.next()
81+
return result
82+
}
83+
84+
tailrec fun enqueue(block: Runnable) {
85+
val newTail = tail.next()
86+
if (newTail == head) {
87+
resize()
88+
enqueue(block) // retry with larger size
89+
return
90+
}
91+
queue[tail] = block
92+
tail = newTail
93+
if (!scheduled) {
94+
scheduled = true
95+
schedule()
96+
}
97+
}
98+
99+
fun resize() {
100+
var i = head
101+
var j = 0
102+
val a = arrayOfNulls<Runnable>(queue.size * 2)
103+
while (i != tail) {
104+
a[j++] = queue[i]
105+
i = i.next()
106+
}
107+
queue = a
108+
head = 0
109+
tail = j
110+
}
111+
112+
private fun Int.next(): Int {
113+
val j = this + 1
114+
return if (j == queue.size) 0 else j
115+
}
116+
117+
fun process() {
118+
try {
119+
// limit number of processed messages
120+
repeat(yieldEvery) {
121+
val block = poll() ?: return@process
122+
block.run()
123+
}
124+
} finally {
125+
if (isEmpty) {
126+
scheduled = false
127+
} else {
128+
schedule()
129+
}
130+
}
131+
}
132+
}
133+
134+
// We need to reference global setTimeout and clearTimeout so that it works on Node.JS as opposed to
135+
// using them via "window" (which only works in browser)
136+
private external fun setTimeout(handler: dynamic, timeout: Int = definedExternally): Int
137+
private external fun clearTimeout(handle: Int = definedExternally)
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import kotlin.test.*
20+
21+
class MessageQueueTest {
22+
private var scheduled = false
23+
private val processed = mutableListOf<Int>()
24+
25+
private val queue = object : MessageQueue() {
26+
override fun schedule() {
27+
assertFalse(scheduled)
28+
scheduled = true
29+
}
30+
}
31+
32+
inner class Box(val i: Int): Runnable {
33+
override fun run() {
34+
processed += i
35+
}
36+
}
37+
38+
inner class ReBox(val i: Int): Runnable {
39+
override fun run() {
40+
processed += i
41+
queue.enqueue(Box(10 + i))
42+
}
43+
}
44+
45+
@Test
46+
fun testBasic() {
47+
assertTrue(queue.isEmpty)
48+
queue.enqueue(Box(1))
49+
assertFalse(queue.isEmpty)
50+
assertTrue(scheduled)
51+
queue.enqueue(Box(2))
52+
assertFalse(queue.isEmpty)
53+
scheduled = false
54+
queue.process()
55+
assertEquals(listOf(1, 2), processed)
56+
assertFalse(scheduled)
57+
assertTrue(queue.isEmpty)
58+
}
59+
60+
@Test fun testRescheduleFromProcess() {
61+
assertTrue(queue.isEmpty)
62+
queue.enqueue(ReBox(1))
63+
assertFalse(queue.isEmpty)
64+
assertTrue(scheduled)
65+
queue.enqueue(ReBox(2))
66+
assertFalse(queue.isEmpty)
67+
scheduled = false
68+
queue.process()
69+
assertEquals(listOf(1, 2, 11, 12), processed)
70+
assertFalse(scheduled)
71+
assertTrue(queue.isEmpty)
72+
}
73+
74+
@Test
75+
fun testResizeAndWrap() {
76+
repeat(10) { phase ->
77+
val n = 10 * (phase + 1)
78+
assertTrue(queue.isEmpty)
79+
repeat(n) {
80+
queue.enqueue(Box(it))
81+
assertFalse(queue.isEmpty)
82+
assertTrue(scheduled)
83+
}
84+
var countYields = 0
85+
while (scheduled) {
86+
scheduled = false
87+
queue.process()
88+
countYields++
89+
}
90+
assertEquals(List(n) { it }, processed)
91+
assertEquals((n + queue.yieldEvery - 1) / queue.yieldEvery, countYields)
92+
processed.clear()
93+
}
94+
}
95+
}

0 commit comments

Comments
 (0)