@@ -395,15 +395,15 @@ internal class ByteBufferChannel(
395
395
else consumed + consumed0
396
396
}
397
397
398
- suspend override fun readFully (dst : ByteArray , offset : Int , length : Int ) {
398
+ final suspend override fun readFully (dst : ByteArray , offset : Int , length : Int ) {
399
399
val consumed = readAsMuchAsPossible(dst, offset, length)
400
400
401
401
if (consumed < length) {
402
- readFullySuspend(dst, offset + consumed, length - consumed)
402
+ return readFullySuspend(dst, offset + consumed, length - consumed)
403
403
}
404
404
}
405
405
406
- suspend override fun readFully (dst : ByteBuffer ): Int {
406
+ final suspend override fun readFully (dst : ByteBuffer ): Int {
407
407
val rc = readAsMuchAsPossible(dst)
408
408
if (! dst.hasRemaining()) return rc
409
409
@@ -491,7 +491,7 @@ internal class ByteBufferChannel(
491
491
return readAvailable(dst)
492
492
}
493
493
494
- suspend override fun readPacket (size : Int , headerSizeHint : Int ): ByteReadPacket {
494
+ final suspend override fun readPacket (size : Int , headerSizeHint : Int ): ByteReadPacket {
495
495
closed?.cause?.let { throw it }
496
496
497
497
if (size == 0 ) return ByteReadPacket .Empty
@@ -525,7 +525,7 @@ internal class ByteBufferChannel(
525
525
BufferPool .recycle(buffer)
526
526
builder.build()
527
527
} else {
528
- readPacketSuspend(remaining, builder, buffer)
528
+ return readPacketSuspend(remaining, builder, buffer)
529
529
}
530
530
}
531
531
@@ -557,7 +557,7 @@ internal class ByteBufferChannel(
557
557
}
558
558
}
559
559
560
- suspend override fun readByte (): Byte {
560
+ final suspend override fun readByte (): Byte {
561
561
var b: Byte = 0
562
562
563
563
val rc = reading {
@@ -568,10 +568,10 @@ internal class ByteBufferChannel(
568
568
} else false
569
569
}
570
570
571
- return if (rc) {
572
- b
571
+ if (rc) {
572
+ return b
573
573
} else {
574
- readByteSuspend()
574
+ return readByteSuspend()
575
575
}
576
576
}
577
577
@@ -580,7 +580,7 @@ internal class ByteBufferChannel(
580
580
return readByte()
581
581
}
582
582
583
- suspend override fun readBoolean (): Boolean {
583
+ final suspend override fun readBoolean (): Boolean {
584
584
var b = false
585
585
586
586
val rc = reading {
@@ -591,10 +591,10 @@ internal class ByteBufferChannel(
591
591
} else false
592
592
}
593
593
594
- return if (rc) {
595
- b
594
+ if (rc) {
595
+ return b
596
596
} else {
597
- readBooleanSuspend()
597
+ return readBooleanSuspend()
598
598
}
599
599
}
600
600
@@ -603,7 +603,7 @@ internal class ByteBufferChannel(
603
603
return readBoolean()
604
604
}
605
605
606
- suspend override fun readShort (): Short {
606
+ final suspend override fun readShort (): Short {
607
607
var sh: Short = 0
608
608
609
609
val rc = reading {
@@ -615,10 +615,10 @@ internal class ByteBufferChannel(
615
615
} else false
616
616
}
617
617
618
- return if (rc) {
619
- sh
618
+ if (rc) {
619
+ return sh
620
620
} else {
621
- readShortSuspend()
621
+ return readShortSuspend()
622
622
}
623
623
}
624
624
@@ -627,7 +627,7 @@ internal class ByteBufferChannel(
627
627
return readShort()
628
628
}
629
629
630
- suspend override fun readInt (): Int {
630
+ final suspend override fun readInt (): Int {
631
631
var i = 0
632
632
633
633
val rc = reading {
@@ -639,10 +639,10 @@ internal class ByteBufferChannel(
639
639
} else false
640
640
}
641
641
642
- return if (rc) {
643
- i
642
+ if (rc) {
643
+ return i
644
644
} else {
645
- readIntSuspend()
645
+ return readIntSuspend()
646
646
}
647
647
}
648
648
@@ -651,7 +651,7 @@ internal class ByteBufferChannel(
651
651
return readInt()
652
652
}
653
653
654
- suspend override fun readLong (): Long {
654
+ final suspend override fun readLong (): Long {
655
655
var i = 0L
656
656
657
657
val rc = reading {
@@ -663,10 +663,10 @@ internal class ByteBufferChannel(
663
663
} else false
664
664
}
665
665
666
- return if (rc) {
667
- i
666
+ if (rc) {
667
+ return i
668
668
} else {
669
- readLongSuspend()
669
+ return readLongSuspend()
670
670
}
671
671
}
672
672
@@ -675,7 +675,7 @@ internal class ByteBufferChannel(
675
675
return readLong()
676
676
}
677
677
678
- suspend override fun readDouble (): Double {
678
+ final suspend override fun readDouble (): Double {
679
679
var d = 0.0
680
680
681
681
val rc = reading {
@@ -687,10 +687,10 @@ internal class ByteBufferChannel(
687
687
} else false
688
688
}
689
689
690
- return if (rc) {
691
- d
690
+ if (rc) {
691
+ return d
692
692
} else {
693
- readDoubleSuspend()
693
+ return readDoubleSuspend()
694
694
}
695
695
}
696
696
@@ -699,7 +699,7 @@ internal class ByteBufferChannel(
699
699
return readDouble()
700
700
}
701
701
702
- suspend override fun readFloat (): Float {
702
+ final suspend override fun readFloat (): Float {
703
703
var f = 0.0f
704
704
705
705
val rc = reading {
@@ -711,10 +711,10 @@ internal class ByteBufferChannel(
711
711
} else false
712
712
}
713
713
714
- return if (rc) {
715
- f
714
+ if (rc) {
715
+ return f
716
716
} else {
717
- readFloatSuspend()
717
+ return readFloatSuspend()
718
718
}
719
719
}
720
720
@@ -902,15 +902,18 @@ internal class ByteBufferChannel(
902
902
}
903
903
904
904
suspend override fun writeInt (i : Int ) {
905
- val delegated = resolveDelegation(this )
906
- if (delegated != = this ) return delegated.writeInt(i)
907
-
908
- val buffer = setupStateForWrite() ? : return delegateInt(i)
905
+ val buffer = setupStateForWrite()
906
+ if (buffer == null ) {
907
+ val delegation = resolveDelegation(this )
908
+ if (delegation != = this ) return delegation.writeInt(i)
909
+ else return delegateSuspend(joining!! , { writeInt(i) })
910
+ }
909
911
val c = state.capacity
910
-
911
- if (! buffer.tryWriteInt(i, c)) {
912
- return buffer.writeIntSuspend(i, c)
912
+ //
913
+ if (buffer.tryWriteInt(i, c)) {
914
+ return
913
915
}
916
+ return buffer.writeIntSuspend(i, c)
914
917
}
915
918
916
919
private tailrec suspend fun ByteBuffer.writeIntSuspend (i : Int , c : RingBufferCapacity ) {
@@ -992,11 +995,11 @@ internal class ByteBufferChannel(
992
995
}
993
996
994
997
suspend override fun writeDouble (d : Double ) {
995
- writeLong(java.lang.Double .doubleToRawLongBits(d))
998
+ return writeLong(java.lang.Double .doubleToRawLongBits(d))
996
999
}
997
1000
998
1001
suspend override fun writeFloat (f : Float ) {
999
- writeInt(java.lang.Float .floatToRawIntBits(f))
1002
+ return writeInt(java.lang.Float .floatToRawIntBits(f))
1000
1003
}
1001
1004
1002
1005
suspend override fun writeAvailable (src : ByteBuffer ): Int {
@@ -1078,7 +1081,7 @@ internal class ByteBufferChannel(
1078
1081
val joined = joining
1079
1082
1080
1083
if (joined != null ) {
1081
- joined.awaitClose()
1084
+ return joined.awaitClose()
1082
1085
} else if (closed == null ) {
1083
1086
error(" Only works for joined" )
1084
1087
}
@@ -1091,6 +1094,10 @@ internal class ByteBufferChannel(
1091
1094
}
1092
1095
closed?.let { closed -> throw closed.sendException }
1093
1096
1097
+ return joinFromSuspend(src, delegateClose)
1098
+ }
1099
+
1100
+ private suspend fun joinFromSuspend (src : ByteBufferChannel , delegateClose : Boolean ) {
1094
1101
val joined = src.setupDelegateTo(this , delegateClose)
1095
1102
copyDirect(src, Long .MAX_VALUE , joined)
1096
1103
@@ -1316,7 +1323,7 @@ internal class ByteBufferChannel(
1316
1323
1317
1324
if (rem == 0 ) return
1318
1325
1319
- writeFullySuspend(src, off, rem)
1326
+ return writeFullySuspend(src, off, rem)
1320
1327
}
1321
1328
1322
1329
private tailrec suspend fun writeFullySuspend (src : ByteArray , offset : Int , length : Int ) {
@@ -1456,9 +1463,9 @@ internal class ByteBufferChannel(
1456
1463
* Never invokes [visitor] with empty buffer unless [last] = true. Invokes visitor with last = true at most once
1457
1464
* even if there are remaining bytes and visitor returned true.
1458
1465
*/
1459
- override suspend fun consumeEachBufferRange (visitor : (buffer: ByteBuffer , last: Boolean ) -> Boolean ) {
1466
+ final override suspend fun consumeEachBufferRange (visitor : (buffer: ByteBuffer , last: Boolean ) -> Boolean ) {
1460
1467
if (consumeEachBufferRangeFast(false , visitor)) return
1461
- consumeEachBufferRangeSuspend(visitor)
1468
+ return consumeEachBufferRangeSuspend(visitor)
1462
1469
}
1463
1470
1464
1471
override fun <R > lookAhead (visitor : LookAheadSession .() -> R ): R {
@@ -1511,7 +1518,16 @@ internal class ByteBufferChannel(
1511
1518
}
1512
1519
}
1513
1520
1514
- suspend override fun awaitAtLeast (n : Int ) {
1521
+ final suspend override fun awaitAtLeast (n : Int ) {
1522
+ if (state.capacity.availableForRead >= n) {
1523
+ if (state.idle) setupStateForRead()
1524
+ return
1525
+ }
1526
+
1527
+ return awaitAtLeastSuspend(n)
1528
+ }
1529
+
1530
+ private suspend fun awaitAtLeastSuspend (n : Int ) {
1515
1531
if (readSuspend(n) && state.idle) {
1516
1532
setupStateForRead()
1517
1533
}
@@ -1562,12 +1578,12 @@ internal class ByteBufferChannel(
1562
1578
return rc
1563
1579
}
1564
1580
1565
- private suspend fun consumeEachBufferRangeSuspend (visitor : (buffer: ByteBuffer , last: Boolean ) -> Boolean ): Boolean {
1581
+ private suspend fun consumeEachBufferRangeSuspend (visitor : (buffer: ByteBuffer , last: Boolean ) -> Boolean ) {
1566
1582
var last = false
1567
1583
1568
1584
do {
1569
- if (consumeEachBufferRangeFast(last, visitor)) return true
1570
- if (last) return false
1585
+ if (consumeEachBufferRangeFast(last, visitor)) return
1586
+ if (last) return
1571
1587
if (! readSuspend(1 )) {
1572
1588
last = true
1573
1589
}
0 commit comments