@@ -142,7 +142,6 @@ internal class ByteBufferChannel(
142
142
}
143
143
}
144
144
145
- joining?.let { restoreStateAfterWrite(); tryCompleteJoining(it); return null }
146
145
if (closed != null ) {
147
146
restoreStateAfterWrite()
148
147
tryTerminate()
@@ -248,44 +247,34 @@ internal class ByteBufferChannel(
248
247
}
249
248
250
249
private fun tryCompleteJoining (joined : JoiningState ): Boolean {
251
- if (! tryReleaseBuffer()) return false
250
+ updateState { state ->
251
+ when {
252
+ state == = ReadWriteBufferState .Terminated -> state
253
+ state == = ReadWriteBufferState .IdleEmpty -> ReadWriteBufferState .Terminated
254
+ // we don't handle IdleNonEmpty as it should be switched to IdleEmpty in restoreStateAfterRead
255
+ else -> return false
256
+ }
257
+ }
258
+
252
259
ensureClosedJoined(joined)
253
260
254
- resumeReadOp (IllegalStateException (" Joining is in progress" ))
255
- resumeWriteOp() // here we don't resume it with exception because it should resume and delegate writing
261
+ ReadOp .getAndSet( this , null )?.resumeWithException (IllegalStateException (" Joining is in progress" ))
262
+ WriteOp .getAndSet( this , null )?. resume( Unit )
256
263
257
264
return true
258
265
}
259
266
260
267
private fun tryTerminate (): Boolean {
261
- if ( closed == null ) return false
268
+ val closed = closed ? : return false
262
269
263
- if (! tryReleaseBuffer()) return false
264
-
265
- joining?.let { ensureClosedJoined(it) }
266
-
267
- resumeReadOp()
268
- resumeWriteOp()
269
-
270
- return true
271
- }
272
-
273
- private fun tryReleaseBuffer (): Boolean {
274
270
var toRelease: ReadWriteBufferState .Initial ? = null
275
271
276
272
updateState { state ->
277
- toRelease?.let { buffer ->
278
- toRelease = null
279
- buffer.capacity.resetForWrite()
280
- resumeWriteOp()
281
- }
282
- val closed = closed
283
-
284
273
when {
285
274
state == = ReadWriteBufferState .Terminated -> return true
286
275
state == = ReadWriteBufferState .IdleEmpty -> ReadWriteBufferState .Terminated
287
- closed != null && state is ReadWriteBufferState .IdleNonEmpty && (state.capacity.tryLockForRelease() || closed.cause != null ) -> {
288
- if (closed.cause != null ) state.capacity.forceLockForRelease()
276
+ closed.cause != null && state is ReadWriteBufferState .IdleNonEmpty -> {
277
+ // here we don't need to tryLockForRelease as we already have closed state
289
278
toRelease = state.initial
290
279
ReadWriteBufferState .Terminated
291
280
}
@@ -299,6 +288,13 @@ internal class ByteBufferChannel(
299
288
}
300
289
}
301
290
291
+ joining?.let { ensureClosedJoined(it) }
292
+
293
+ WriteOp .getAndSet(this , null )?.resumeWithException(closed.sendException)
294
+ ReadOp .getAndSet(this , null )?.apply {
295
+ if (closed.cause != null ) resumeWithException(closed.cause) else resume(false )
296
+ }
297
+
302
298
return true
303
299
}
304
300
@@ -1799,17 +1795,7 @@ internal class ByteBufferChannel(
1799
1795
suspend override fun <A : Appendable > readUTF8LineTo (out : A , limit : Int ) = readUTF8LineToAscii(out , limit)
1800
1796
1801
1797
private fun resumeReadOp () {
1802
- ReadOp .getAndSet(this , null )?.apply {
1803
- val closedCause = closed?.cause
1804
- when {
1805
- closedCause != null -> resumeWithException(closedCause)
1806
- else -> resume(true )
1807
- }
1808
- }
1809
- }
1810
-
1811
- private fun resumeReadOp (result : Throwable ) {
1812
- ReadOp .getAndSet(this , null )?.resumeWithException(result)
1798
+ ReadOp .getAndSet(this , null )?.resume(true )
1813
1799
}
1814
1800
1815
1801
private fun resumeWriteOp () {
@@ -1819,10 +1805,6 @@ internal class ByteBufferChannel(
1819
1805
}
1820
1806
}
1821
1807
1822
- private fun resumeWriteOp (cause : Throwable ) {
1823
- WriteOp .getAndSet(this , null )?.resumeWithException(cause)
1824
- }
1825
-
1826
1808
private fun resumeClosed (cause : Throwable ? ) {
1827
1809
ReadOp .getAndSet(this , null )?.let { c ->
1828
1810
if (cause != null )
0 commit comments