Skip to content

Commit 060f36a

Browse files
author
Sergey Mashkov
committed
IO: fix too expensive joinTo fast-path
1 parent 98c8739 commit 060f36a

File tree

2 files changed

+125
-7
lines changed

2 files changed

+125
-7
lines changed

benchmarks/src/jmh/kotlin/benchmarks/ChannelCopyBenchmark.kt

Lines changed: 122 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ ChannelCopyBenchmark.runBlockingAndLaunch avgt 5 833,390 ± 14,96
3333
@State(Scope.Benchmark)
3434
@Fork(1)
3535
open class ChannelCopyBenchmark {
36+
private val HelloWorld = "Hello, World!".toByteArray()
3637
private val ABC = "ABC".repeat(100).toByteArray()
3738
private val buffer = ByteArray(4096)
3839
private val ioe = IOException()
@@ -76,6 +77,26 @@ open class ChannelCopyBenchmark {
7677
read
7778
}
7879

80+
@Benchmark
81+
fun cioChannelCopyHW() = runBlocking {
82+
val pIn = ByteChannel(true)
83+
val pOut = ByteChannel(true)
84+
85+
pOut.writeFully(HelloWorld)
86+
pOut.close()
87+
88+
pOut.copyAndClose(pIn)
89+
90+
var read = 0
91+
while (read < HelloWorld.size) {
92+
val rc = pIn.readAvailable(buffer)
93+
if (rc == -1) break
94+
read += rc
95+
}
96+
97+
read
98+
}
99+
79100
@Benchmark
80101
fun cioJoinToClosed() = runBlocking {
81102
val pIn = ByteChannel(true)
@@ -96,6 +117,53 @@ open class ChannelCopyBenchmark {
96117
read
97118
}
98119

120+
@Benchmark
121+
fun cioJoinToClosedHW() = runBlocking {
122+
val pIn = ByteChannel(true)
123+
val pOut = ByteChannel(true)
124+
125+
pOut.writeFully(HelloWorld)
126+
pOut.close()
127+
128+
pOut.joinTo(pIn, true)
129+
130+
var read = 0
131+
while (read < HelloWorld.size) {
132+
val rc = pIn.readAvailable(buffer)
133+
if (rc == -1) break
134+
read += rc
135+
}
136+
137+
read
138+
}
139+
140+
141+
@Benchmark
142+
fun cioCopyFromEmpty() = runCoroutineFast {
143+
val from = ByteChannel(true)
144+
val to = ByteChannel(true)
145+
146+
from.close()
147+
from.copyAndClose(to)
148+
}
149+
150+
@Benchmark
151+
fun cioJoinFromEmpty() = runCoroutineFast {
152+
val from = ByteChannel(true)
153+
val to = ByteChannel(true)
154+
155+
from.close()
156+
from.joinTo(to, true)
157+
}
158+
159+
@Benchmark
160+
fun cioJoinFromEmptyNonClosed() = runCoroutineFast(allowSuspend = true) {
161+
val from = ByteChannel(true)
162+
val to = ByteChannel(true)
163+
164+
from.joinTo(to, true) // should setup joining and suspend
165+
}
166+
99167
@Benchmark
100168
fun cioJoinToBeforeWrite() = runBlocking {
101169
val pIn = ByteChannel(true)
@@ -120,6 +188,30 @@ open class ChannelCopyBenchmark {
120188
read
121189
}
122190

191+
@Benchmark
192+
fun cioJoinToHWBeforeWrite() = runBlocking {
193+
val pIn = ByteChannel(true)
194+
val pOut = ByteChannel(true)
195+
196+
launch(coroutineContext) {
197+
pOut.joinTo(pIn, true)
198+
}
199+
200+
yield()
201+
202+
pOut.writeFully(HelloWorld)
203+
pOut.close()
204+
205+
var read = 0
206+
while (read < HelloWorld.size) {
207+
val rc = pIn.readAvailable(buffer)
208+
if (rc == -1) break
209+
read += rc
210+
}
211+
212+
read
213+
}
214+
123215
@Benchmark
124216
fun cioCopyToInLaunch() = runBlocking {
125217
val pIn = ByteChannel(true)
@@ -145,6 +237,31 @@ open class ChannelCopyBenchmark {
145237
read
146238
}
147239

240+
@Benchmark
241+
fun cioCopyToHWInLaunch() = runBlocking {
242+
val pIn = ByteChannel(true)
243+
val pOut = ByteChannel(true)
244+
245+
launch(coroutineContext) {
246+
pOut.copyTo(pIn)
247+
pIn.close()
248+
}
249+
250+
yield()
251+
252+
pOut.writeFully(HelloWorld)
253+
pOut.close()
254+
255+
var read = 0
256+
while (read < HelloWorld.size) {
257+
val rc = pIn.readAvailable(buffer)
258+
if (rc == -1) break
259+
read += rc
260+
}
261+
262+
read
263+
}
264+
148265
@Benchmark
149266
fun cioJustWrite() = runBlocking {
150267
val c = ByteChannel()
@@ -153,7 +270,7 @@ open class ChannelCopyBenchmark {
153270
}
154271

155272
@Benchmark
156-
fun cioJustWriteUnintercepted() = runForSureNoSuspend {
273+
fun cioJustWriteUnintercepted() = runCoroutineFast {
157274
val c = ByteChannel()
158275
c.writeFully(ABC)
159276
c.close(ioe)
@@ -168,7 +285,7 @@ open class ChannelCopyBenchmark {
168285
}
169286

170287
@Benchmark
171-
fun cioReadAndWriteUnintercepted() = runForSureNoSuspend {
288+
fun cioReadAndWriteUnintercepted() = runCoroutineFast {
172289
val c = ByteChannel(true)
173290
c.writeFully(ABC)
174291
c.readAvailable(buffer)
@@ -188,9 +305,10 @@ open class ChannelCopyBenchmark {
188305
yield()
189306
}
190307

191-
private fun runForSureNoSuspend(block: suspend () -> Unit) {
308+
private fun runCoroutineFast(allowSuspend: Boolean = false, block: suspend () -> Unit) {
192309
if (block.startCoroutineUninterceptedOrReturn(EmptyContinuation) === COROUTINE_SUSPENDED) {
193-
throw IllegalStateException("Unexpected suspend")
310+
if (!allowSuspend)
311+
throw IllegalStateException("Unexpected suspend")
194312
}
195313
}
196314

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ internal class ByteBufferChannel(
261261
if (!tryReleaseBuffer()) return false
262262
ensureClosedJoined(joined)
263263

264-
resumeReadOp(IllegalStateException("Joining is in progress"))
264+
resumeReadOp { IllegalStateException("Joining is in progress") }
265265
resumeWriteOp() // here we don't resume it with exception because it should resume and delegate writing
266266

267267
return true
@@ -1818,8 +1818,8 @@ internal class ByteBufferChannel(
18181818
}
18191819
}
18201820

1821-
private fun resumeReadOp(result: Throwable) {
1822-
ReadOp.getAndSet(this, null)?.resumeWithException(result)
1821+
private inline fun resumeReadOp(exception: () -> Throwable) {
1822+
ReadOp.getAndSet(this, null)?.resumeWithException(exception())
18231823
}
18241824

18251825
private fun resumeWriteOp() {

0 commit comments

Comments
 (0)