Skip to content

Commit 4df720f

Browse files
author
Sergey Mashkov
committed
IO: introduce simple ByteReadChannel.discard implementation
1 parent 1e40237 commit 4df720f

File tree

3 files changed

+128
-0
lines changed

3 files changed

+128
-0
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1477,6 +1477,43 @@ internal class ByteBufferChannel(
14771477
}
14781478
}
14791479

1480+
override suspend fun discard(max: Long): Long {
1481+
require(max >= 0) { "max shouldn't be negative: $max" }
1482+
1483+
var discarded = 0L
1484+
1485+
reading {
1486+
val n = it.tryReadAtMost(minOf(Int.MAX_VALUE.toLong(), max).toInt())
1487+
bytesRead(it, n)
1488+
discarded += n
1489+
true
1490+
}
1491+
1492+
if (discarded == max || isClosedForRead) return discarded
1493+
1494+
return discardSuspend(discarded, max)
1495+
}
1496+
1497+
private suspend fun discardSuspend(discarded0: Long, max: Long): Long {
1498+
var discarded = discarded0
1499+
1500+
while (discarded < max) {
1501+
val rc = reading {
1502+
val n = it.tryReadAtMost(minOf(Int.MAX_VALUE.toLong(), max - discarded).toInt())
1503+
bytesRead(it, n)
1504+
discarded += n
1505+
1506+
true
1507+
}
1508+
1509+
if (!rc) {
1510+
if (isClosedForRead || !readSuspend(1)) break
1511+
}
1512+
}
1513+
1514+
return discarded
1515+
}
1516+
14801517
private suspend fun readBlockSuspend(min: Int, block: (ByteBuffer) -> Unit) {
14811518
if (!readSuspend(min.coerceAtLeast(1))) {
14821519
if (min > 0)

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteReadChannel.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,13 @@ public interface ByteReadChannel {
156156
* @see ByteWriteChannel.close
157157
*/
158158
fun cancel(cause: Throwable? = null): Boolean
159+
160+
/**
161+
* Discard up to [max] bytes
162+
*
163+
* @return number of bytes were discarded
164+
*/
165+
suspend fun discard(max: Long = Long.MAX_VALUE): Long
159166
}
160167

161168
suspend fun ByteReadChannel.joinTo(dst: ByteWriteChannel, closeOnEnd: Boolean) {

core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelScenarioTest.kt

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,4 +469,88 @@ class ByteBufferChannelScenarioTest : TestBase() {
469469
finish(6)
470470
}
471471

472+
@Test
473+
fun testDiscardExisting() = runBlocking {
474+
launch(coroutineContext) {
475+
expect(1)
476+
ch.writeInt(1)
477+
ch.writeInt(2)
478+
expect(2)
479+
}
480+
481+
yield()
482+
expect(3)
483+
484+
assertEquals(4, ch.discard(4))
485+
assertEquals(2, ch.readInt())
486+
487+
finish(4)
488+
}
489+
490+
@Test
491+
fun testDiscardPartiallyExisting() = runBlocking {
492+
ch.writeInt(1)
493+
494+
launch(coroutineContext) {
495+
expect(1)
496+
assertEquals(8, ch.discard(8))
497+
expect(3)
498+
}
499+
500+
yield()
501+
expect(2)
502+
503+
ch.writeInt(2)
504+
yield()
505+
506+
expect(4)
507+
assertEquals(0, ch.availableForRead)
508+
finish(5)
509+
}
510+
511+
@Test
512+
fun testDiscardPartiallyExisting2() = runBlocking {
513+
launch(coroutineContext) {
514+
expect(1)
515+
assertEquals(8, ch.discard(8))
516+
expect(4)
517+
}
518+
519+
yield()
520+
521+
expect(2)
522+
ch.writeInt(1)
523+
yield()
524+
expect(3)
525+
assertEquals(0, ch.availableForRead)
526+
527+
ch.writeInt(2)
528+
yield()
529+
expect(5)
530+
assertEquals(0, ch.availableForRead)
531+
finish(6)
532+
}
533+
534+
@Test
535+
fun testDiscardClose() = runBlocking {
536+
launch(coroutineContext) {
537+
expect(1)
538+
assertEquals(8, ch.discard())
539+
expect(4)
540+
}
541+
542+
yield()
543+
544+
expect(2)
545+
ch.writeInt(1)
546+
yield()
547+
ch.writeInt(2)
548+
yield()
549+
550+
expect(3)
551+
ch.close()
552+
yield()
553+
554+
finish(5)
555+
}
472556
}

0 commit comments

Comments
 (0)