Skip to content

Commit e4d3ce7

Browse files
author
Sergey Mashkov
committed
IO: fix byte channel readX operations return -1 during close+flush
Read operation should try flush before return -1 if closed because the close operation setting close status first then do flush so reader could see closed status but not-yet-flushed byte counters
1 parent 10be56e commit e4d3ce7

File tree

1 file changed

+22
-10
lines changed

1 file changed

+22
-10
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,13 @@ internal class ByteBufferChannel(
328328
val consumed = readAsMuchAsPossible(dst, offset, length)
329329

330330
return when {
331-
consumed == 0 && closed != null -> -1
331+
consumed == 0 && closed != null -> {
332+
if (state.capacity.flush()) {
333+
return readAsMuchAsPossible(dst, offset, length)
334+
} else {
335+
-1
336+
}
337+
}
332338
consumed > 0 || length == 0 -> consumed
333339
else -> readAvailableSuspend(dst, offset, length)
334340
}
@@ -338,7 +344,13 @@ internal class ByteBufferChannel(
338344
val consumed = readAsMuchAsPossible(dst)
339345

340346
return when {
341-
consumed == 0 && closed != null -> -1
347+
consumed == 0 && closed != null -> {
348+
if (state.capacity.flush()) {
349+
return readAsMuchAsPossible(dst)
350+
} else {
351+
-1
352+
}
353+
}
342354
consumed > 0 || !dst.hasRemaining() -> consumed
343355
else -> readAvailableSuspend(dst)
344356
}
@@ -1268,11 +1280,12 @@ internal class ByteBufferChannel(
12681280
}
12691281

12701282
private tailrec suspend fun readSuspend(size: Int): Boolean {
1271-
if (state.capacity.availableForRead >= size) return true
1283+
val capacity = state.capacity
1284+
if (capacity.availableForRead >= size) return true
12721285

12731286
closed?.let { c ->
1274-
if (c.cause == null) return false
1275-
throw c.cause
1287+
if (c.cause != null) throw c.cause
1288+
return capacity.flush() && capacity.availableForRead >= size
12761289
}
12771290

12781291
if (!readSuspendImpl(size)) return false
@@ -1291,13 +1304,12 @@ internal class ByteBufferChannel(
12911304
}
12921305

12931306
closed?.let {
1294-
if (it.cause == null && state.capacity.availableForRead == 0) {
1295-
c.resume(false)
1296-
return@suspendCancellableCoroutine
1297-
} else if (it.cause != null) {
1307+
if (it.cause != null) {
12981308
c.resumeWithException(it.cause)
1299-
return@suspendCancellableCoroutine
1309+
} else {
1310+
c.resume(state.capacity.flush() && state.capacity.availableForRead >= size)
13001311
}
1312+
return@suspendCancellableCoroutine
13011313
}
13021314
} while (!setContinuation({ readOp }, ReadOp, c, { closed == null && state.capacity.availableForRead < size }))
13031315
}

0 commit comments

Comments
 (0)