Skip to content

Commit 4ea3cb3

Browse files
author
Sergey Mashkov
committed
IO: make buffer release guaranteed if channel closed with error
1 parent eab5c25 commit 4ea3cb3

File tree

2 files changed

+48
-22
lines changed

2 files changed

+48
-22
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ internal class ByteBufferChannel(
142142
}
143143
}
144144

145+
joining?.let { restoreStateAfterWrite(); tryCompleteJoining(it); return null }
145146
if (closed != null) {
146147
restoreStateAfterWrite()
147148
tryTerminate()
@@ -247,34 +248,44 @@ internal class ByteBufferChannel(
247248
}
248249

249250
private fun tryCompleteJoining(joined: JoiningState): Boolean {
250-
updateState { state ->
251-
when {
252-
state === ReadWriteBufferState.Terminated -> state
253-
state === ReadWriteBufferState.IdleEmpty -> ReadWriteBufferState.Terminated
254-
// we don't handle IdleNonEmpty as it should be switched to IdleEmpty in restoreStateAfterRead
255-
else -> return false
256-
}
257-
}
258-
251+
if (!tryReleaseBuffer()) return false
259252
ensureClosedJoined(joined)
260253

261-
ReadOp.getAndSet(this, null)?.resumeWithException(IllegalStateException("Joining is in progress"))
262-
WriteOp.getAndSet(this, null)?.resume(Unit)
254+
resumeReadOp(IllegalStateException("Joining is in progress"))
255+
resumeWriteOp() // here we don't resume it with exception because it should resume and delegate writing
263256

264257
return true
265258
}
266259

267260
private fun tryTerminate(): Boolean {
268-
val closed = closed ?: return false
261+
if (closed == null) return false
269262

263+
if (!tryReleaseBuffer()) return false
264+
265+
joining?.let { ensureClosedJoined(it) }
266+
267+
resumeReadOp()
268+
resumeWriteOp()
269+
270+
return true
271+
}
272+
273+
private fun tryReleaseBuffer(): Boolean {
270274
var toRelease: ReadWriteBufferState.Initial? = null
271275

272276
updateState { state ->
277+
toRelease?.let { buffer ->
278+
toRelease = null
279+
buffer.capacity.resetForWrite()
280+
resumeWriteOp()
281+
}
282+
val closed = closed
283+
273284
when {
274285
state === ReadWriteBufferState.Terminated -> return true
275286
state === ReadWriteBufferState.IdleEmpty -> ReadWriteBufferState.Terminated
276-
closed.cause != null && state is ReadWriteBufferState.IdleNonEmpty -> {
277-
// here we don't need to tryLockForRelease as we already have closed state
287+
closed != null && state is ReadWriteBufferState.IdleNonEmpty && (state.capacity.tryLockForRelease() || closed.cause != null) -> {
288+
if (closed.cause != null) state.capacity.forceLockForRelease()
278289
toRelease = state.initial
279290
ReadWriteBufferState.Terminated
280291
}
@@ -288,13 +299,6 @@ internal class ByteBufferChannel(
288299
}
289300
}
290301

291-
joining?.let { ensureClosedJoined(it) }
292-
293-
WriteOp.getAndSet(this, null)?.resumeWithException(closed.sendException)
294-
ReadOp.getAndSet(this, null)?.apply {
295-
if (closed.cause != null) resumeWithException(closed.cause) else resume(false)
296-
}
297-
298302
return true
299303
}
300304

@@ -1795,7 +1799,17 @@ internal class ByteBufferChannel(
17951799
suspend override fun <A : Appendable> readUTF8LineTo(out: A, limit: Int) = readUTF8LineToAscii(out, limit)
17961800

17971801
private fun resumeReadOp() {
1798-
ReadOp.getAndSet(this, null)?.resume(true)
1802+
ReadOp.getAndSet(this, null)?.apply {
1803+
val closedCause = closed?.cause
1804+
when {
1805+
closedCause != null -> resumeWithException(closedCause)
1806+
else -> resume(true)
1807+
}
1808+
}
1809+
}
1810+
1811+
private fun resumeReadOp(result: Throwable) {
1812+
ReadOp.getAndSet(this, null)?.resumeWithException(result)
17991813
}
18001814

18011815
private fun resumeWriteOp() {
@@ -1805,6 +1819,10 @@ internal class ByteBufferChannel(
18051819
}
18061820
}
18071821

1822+
private fun resumeWriteOp(cause: Throwable) {
1823+
WriteOp.getAndSet(this, null)?.resumeWithException(cause)
1824+
}
1825+
18081826
private fun resumeClosed(cause: Throwable?) {
18091827
ReadOp.getAndSet(this, null)?.let { c ->
18101828
if (cause != null)

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/RingBufferCapacity.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,14 @@ internal class RingBufferCapacity(private val totalCapacity: Int) {
105105
}
106106
}
107107

108+
/**
109+
* Make all writers to fail to write any more bytes
110+
* Use only during failure termination
111+
*/
112+
fun forceLockForRelease() {
113+
AvailableForWrite.getAndSet(this, 0)
114+
}
115+
108116
fun isEmpty(): Boolean = availableForWrite == totalCapacity
109117
fun isFull(): Boolean = availableForWrite == 0
110118

0 commit comments

Comments
 (0)