Skip to content

Commit 4ee9cfb

Browse files
author
Sergey Mashkov
committed
IO: add inline rendezvous swap implementation
1 parent 39a8854 commit 4ee9cfb

File tree

2 files changed

+239
-0
lines changed

2 files changed

+239
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package kotlinx.coroutines.experimental.io.internal
2+
3+
import kotlinx.atomicfu.*
4+
import kotlinx.coroutines.experimental.*
5+
import java.util.concurrent.CancellationException
6+
import kotlin.coroutines.experimental.*
7+
import kotlin.coroutines.experimental.intrinsics.*
8+
9+
/**
10+
* This is completely similar to [kotlinx.coroutines.experimental.channels.RendezvousChannel] except:
11+
* - non-cancellable
12+
* - all functions are inline and always tail-call suspend
13+
* - no poll/offer
14+
*/
15+
internal class InlineRendezvousSwap<T : Any> {
16+
private val senderCont = DelegatedContinuation<Unit>()
17+
private val receiverCont = DelegatedContinuation<T>()
18+
19+
@Suppress("NOTHING_TO_INLINE")
20+
suspend inline fun send(e: T) = suspendCoroutineOrReturn<Unit> { c ->
21+
val result = try {
22+
senderCont.swap(c)
23+
} catch (t: Throwable) {
24+
receiverCont.resumeWithException(t)
25+
throw t
26+
}
27+
28+
receiverCont.resume(e)
29+
result
30+
}
31+
32+
@Suppress("NOTHING_TO_INLINE")
33+
suspend inline fun receive(): T = suspendCoroutineOrReturn { c ->
34+
val result = try {
35+
receiverCont.swap(c)
36+
} catch (t: Throwable) {
37+
senderCont.resumeWithException(t)
38+
throw t
39+
}
40+
41+
senderCont.resume(Unit)
42+
result
43+
}
44+
45+
internal class DelegatedContinuation<T : Any> : Continuation<T> {
46+
private val delegate = atomic<Continuation<T>?>(null)
47+
private var result: Any? = null
48+
private var e: Throwable? = null
49+
50+
private fun reset() {
51+
delegate.getAndSet(null)?.takeUnless { it === this }?.resumeWithException(Cancellation)
52+
result = null
53+
e = null
54+
}
55+
56+
fun swap(actual: Continuation<T>): Any? {
57+
require(actual !== this)
58+
59+
delegate.update { c ->
60+
when {
61+
c === this -> return result().also { reset() }
62+
else -> actual
63+
}
64+
}
65+
66+
return COROUTINE_SUSPENDED
67+
}
68+
69+
private fun result(): Any? {
70+
e?.let { reset(); throw it }
71+
return result
72+
}
73+
74+
override val context: CoroutineContext
75+
get() = delegate.value?.takeUnless { it === this }?.context ?: EmptyCoroutineContext
76+
77+
override fun resumeWithException(exception: Throwable) {
78+
e = exception
79+
val c = delegate.getAndUpdate { c ->
80+
when {
81+
c === this -> return
82+
c == null -> this
83+
else -> null
84+
}
85+
}
86+
87+
if (c != null) {
88+
reset()
89+
c.resumeWithException(exception)
90+
}
91+
}
92+
93+
override fun resume(value: T) {
94+
result = value
95+
val c = delegate.getAndUpdate { c ->
96+
when {
97+
c === this -> return
98+
c == null -> this
99+
else -> null
100+
}
101+
}
102+
103+
if (c != null) {
104+
reset()
105+
c.resume(value)
106+
}
107+
}
108+
}
109+
110+
companion object {
111+
private val Cancellation = CancellationException()
112+
}
113+
}
114+
115+
fun main(args: Array<String>) {
116+
val ch = InlineRendezvousSwap<String>()
117+
runBlocking {
118+
launch(coroutineContext) {
119+
repeat(2) {
120+
val e = ch.receive()
121+
println(e)
122+
}
123+
}
124+
launch(coroutineContext) {
125+
ch.send("1")
126+
ch.send("2")
127+
}
128+
}
129+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package kotlinx.coroutines.experimental.io
2+
3+
import kotlinx.coroutines.experimental.*
4+
import kotlinx.coroutines.experimental.channels.*
5+
import kotlinx.coroutines.experimental.io.internal.*
6+
import org.junit.Test
7+
import kotlin.test.*
8+
9+
class InlineRendezvousSwapTest : TestBase() {
10+
@Test
11+
fun smokeTest1() = runTest {
12+
val swap = InlineRendezvousSwap<String>()
13+
14+
launch(coroutineContext) {
15+
assertEquals("1", swap.receive())
16+
}
17+
18+
launch(coroutineContext) {
19+
swap.send("1")
20+
}
21+
}
22+
23+
@Test
24+
fun smokeTest2() = runTest {
25+
val swap = InlineRendezvousSwap<String>()
26+
27+
launch(coroutineContext) {
28+
swap.send("1")
29+
}
30+
31+
launch(coroutineContext) {
32+
assertEquals("1", swap.receive())
33+
}
34+
}
35+
36+
@Test
37+
fun testLoop1() = runTest {
38+
val swap = InlineRendezvousSwap<String>()
39+
val received = Channel<String>(1)
40+
41+
launch(coroutineContext) {
42+
while (true) {
43+
val s = swap.receive()
44+
if (s.isEmpty()) break
45+
received.send(s)
46+
}
47+
received.close()
48+
}
49+
50+
launch(coroutineContext) {
51+
for (i in 1..10) {
52+
swap.send(i.toString())
53+
}
54+
swap.send("")
55+
}
56+
57+
assertEquals((1..10).map { it.toString() }, received.toList())
58+
}
59+
60+
@Test
61+
fun testLoop2() = runTest {
62+
val swap = InlineRendezvousSwap<String>()
63+
val received = Channel<String>(1)
64+
65+
launch(coroutineContext) {
66+
while (true) {
67+
val s = swap.receive()
68+
if (s.isEmpty()) break
69+
received.send(s)
70+
}
71+
received.close()
72+
}
73+
74+
launch(coroutineContext) {
75+
for (i in 1..10) {
76+
yield()
77+
swap.send(i.toString())
78+
}
79+
swap.send("")
80+
}
81+
82+
assertEquals((1..10).map { it.toString() }, received.toList())
83+
}
84+
85+
@Test
86+
fun testLoop3() = runTest {
87+
val swap = InlineRendezvousSwap<String>()
88+
val received = Channel<String>(1)
89+
90+
launch(coroutineContext) {
91+
while (true) {
92+
yield()
93+
val s = swap.receive()
94+
if (s.isEmpty()) break
95+
received.send(s)
96+
}
97+
received.close()
98+
}
99+
100+
launch(coroutineContext) {
101+
for (i in 1..10) {
102+
swap.send(i.toString())
103+
}
104+
swap.send("")
105+
}
106+
107+
assertEquals((1..10).map { it.toString() }, received.toList())
108+
}
109+
110+
}

0 commit comments

Comments
 (0)