Skip to content

Commit 98c8739

Browse files
author
Sergey Mashkov
committed
IO: make buffer release guaranteed for cases
Make byte channel release if it has been closed with error or if write preparation loop has allocated a buffer but failed to start writing (for example, joining started so we have to cancel wriging preparation). It is important to release allocated/borrowed buffer during write operation preparation and if CAS loop was not successful and no write state was set due to some reason (for example, joining).
1 parent 7549dad commit 98c8739

File tree

3 files changed

+103
-31
lines changed

3 files changed

+103
-31
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -131,17 +131,28 @@ internal class ByteBufferChannel(
131131
var _allocated: ReadWriteBufferState.Initial? = null
132132
val (old, newState) = updateState { state ->
133133
when {
134-
joining != null -> return null
135-
closed != null -> throw closed!!.sendException
134+
joining != null -> {
135+
_allocated?.let { releaseBuffer(it) }
136+
return null
137+
}
138+
closed != null -> {
139+
_allocated?.let { releaseBuffer(it) }
140+
throw closed!!.sendException
141+
}
136142
state === ReadWriteBufferState.IdleEmpty -> {
137143
val allocated = _allocated ?: newBuffer().also { _allocated = it }
138144
allocated.startWriting()
139145
}
140-
state === ReadWriteBufferState.Terminated -> throw closed!!.sendException
146+
state === ReadWriteBufferState.Terminated -> {
147+
_allocated?.let { releaseBuffer(it) }
148+
if (joining != null) return null
149+
throw closed!!.sendException
150+
}
141151
else -> state.startWriting()
142152
}
143153
}
144154

155+
// joining?.let { restoreStateAfterWrite(); return null }
145156
if (closed != null) {
146157
restoreStateAfterWrite()
147158
tryTerminate()
@@ -247,34 +258,44 @@ internal class ByteBufferChannel(
247258
}
248259

249260
private fun tryCompleteJoining(joined: JoiningState): Boolean {
250-
updateState { state ->
251-
when {
252-
state === ReadWriteBufferState.Terminated -> state
253-
state === ReadWriteBufferState.IdleEmpty -> ReadWriteBufferState.Terminated
254-
// we don't handle IdleNonEmpty as it should be switched to IdleEmpty in restoreStateAfterRead
255-
else -> return false
256-
}
257-
}
258-
261+
if (!tryReleaseBuffer()) return false
259262
ensureClosedJoined(joined)
260263

261-
ReadOp.getAndSet(this, null)?.resumeWithException(IllegalStateException("Joining is in progress"))
262-
WriteOp.getAndSet(this, null)?.resume(Unit)
264+
resumeReadOp(IllegalStateException("Joining is in progress"))
265+
resumeWriteOp() // here we don't resume it with exception because it should resume and delegate writing
263266

264267
return true
265268
}
266269

267270
private fun tryTerminate(): Boolean {
268-
val closed = closed ?: return false
271+
if (closed == null) return false
272+
273+
if (!tryReleaseBuffer()) return false
274+
275+
joining?.let { ensureClosedJoined(it) }
276+
277+
resumeReadOp()
278+
resumeWriteOp()
279+
280+
return true
281+
}
269282

283+
private fun tryReleaseBuffer(): Boolean {
270284
var toRelease: ReadWriteBufferState.Initial? = null
271285

272286
updateState { state ->
287+
toRelease?.let { buffer ->
288+
toRelease = null
289+
buffer.capacity.resetForWrite()
290+
resumeWriteOp()
291+
}
292+
val closed = closed
293+
273294
when {
274295
state === ReadWriteBufferState.Terminated -> return true
275296
state === ReadWriteBufferState.IdleEmpty -> ReadWriteBufferState.Terminated
276-
closed.cause != null && state is ReadWriteBufferState.IdleNonEmpty -> {
277-
// here we don't need to tryLockForRelease as we already have closed state
297+
closed != null && state is ReadWriteBufferState.IdleNonEmpty && (state.capacity.tryLockForRelease() || closed.cause != null) -> {
298+
if (closed.cause != null) state.capacity.forceLockForRelease()
278299
toRelease = state.initial
279300
ReadWriteBufferState.Terminated
280301
}
@@ -288,13 +309,6 @@ internal class ByteBufferChannel(
288309
}
289310
}
290311

291-
joining?.let { ensureClosedJoined(it) }
292-
293-
WriteOp.getAndSet(this, null)?.resumeWithException(closed.sendException)
294-
ReadOp.getAndSet(this, null)?.apply {
295-
if (closed.cause != null) resumeWithException(closed.cause) else resume(false)
296-
}
297-
298312
return true
299313
}
300314

@@ -1795,7 +1809,17 @@ internal class ByteBufferChannel(
17951809
suspend override fun <A : Appendable> readUTF8LineTo(out: A, limit: Int) = readUTF8LineToAscii(out, limit)
17961810

17971811
private fun resumeReadOp() {
1798-
ReadOp.getAndSet(this, null)?.resume(true)
1812+
ReadOp.getAndSet(this, null)?.apply {
1813+
val closedCause = closed?.cause
1814+
when {
1815+
closedCause != null -> resumeWithException(closedCause)
1816+
else -> resume(true)
1817+
}
1818+
}
1819+
}
1820+
1821+
private fun resumeReadOp(result: Throwable) {
1822+
ReadOp.getAndSet(this, null)?.resumeWithException(result)
17991823
}
18001824

18011825
private fun resumeWriteOp() {
@@ -1805,6 +1829,10 @@ internal class ByteBufferChannel(
18051829
}
18061830
}
18071831

1832+
private fun resumeWriteOp(cause: Throwable) {
1833+
WriteOp.getAndSet(this, null)?.resumeWithException(cause)
1834+
}
1835+
18081836
private fun resumeClosed(cause: Throwable?) {
18091837
ReadOp.getAndSet(this, null)?.let { c ->
18101838
if (cause != null)

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/RingBufferCapacity.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,14 @@ internal class RingBufferCapacity(private val totalCapacity: Int) {
105105
}
106106
}
107107

108+
/**
109+
* Make all writers to fail to write any more bytes
110+
* Use only during failure termination
111+
*/
112+
fun forceLockForRelease() {
113+
AvailableForWrite.getAndSet(this, 0)
114+
}
115+
108116
fun isEmpty(): Boolean = availableForWrite == totalCapacity
109117
fun isFull(): Boolean = availableForWrite == 0
110118

core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelTest.kt

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -897,20 +897,56 @@ class ByteBufferChannelTest {
897897
}
898898

899899
@Test
900-
@Ignore
900+
fun writeThenReadStress() = runBlocking<Unit> {
901+
for (i in 1..500_000) {
902+
val a = ByteBufferChannel(false, pool)
903+
904+
val w = launch {
905+
a.writeLong(1)
906+
a.close()
907+
}
908+
val r = launch {
909+
a.readLong()
910+
}
911+
912+
w.join()
913+
r.join()
914+
}
915+
916+
ch.close()
917+
}
918+
919+
@Test
920+
fun joinToEmptyStress() = runBlocking<Unit> {
921+
for (i in 1..500_000) {
922+
val a = ByteBufferChannel(false, pool)
923+
924+
launch(coroutineContext) {
925+
a.joinTo(ch, true)
926+
}
927+
928+
yield()
929+
930+
a.close()
931+
}
932+
}
933+
934+
@Test
901935
fun testJoinToStress() = runBlocking<Unit> {
902-
for (i in 1..10) {
903-
println("Step $i")
936+
for (i in 1..100000) {
904937
val child = ByteBufferChannel(false, pool)
905938
val writer = launch {
906-
child.writeLong(999)
939+
child.writeLong(999 + i.toLong())
907940
child.close()
908941
}
909942

910943
child.joinTo(ch, false)
911-
assertEquals(999, ch.readLong())
912-
assertTrue { writer.isCompleted }
944+
assertEquals(999 + i.toLong(), ch.readLong())
945+
writer.join()
913946
}
947+
948+
assertEquals(0, ch.availableForRead)
949+
ch.close()
914950
}
915951

916952
@Test

0 commit comments

Comments
 (0)