@@ -630,6 +630,76 @@ class ByteBufferChannelTest : TestBase() {
630
630
assertEquals(" abc" , ch.readASCIILine())
631
631
}
632
632
633
+ @Test
634
+ fun testReadAndWriteLarge () = runTest {
635
+ val count = 128L * 1024 * stressTestMultiplier // * 8192 -> 1G * M
636
+ val data = ByteBuffer .allocate(8192 )!!
637
+ Random ().nextBytes(data.array())
638
+
639
+ launch(" writer" ) {
640
+ repeat(count.toInt()) {
641
+ data.clear()
642
+ ch.writeFully(data)
643
+ }
644
+ ch.close()
645
+ }
646
+
647
+ launch(" reader" ) {
648
+ val buffer = ByteBuffer .allocate(8192 )!!
649
+ var read = 0L
650
+ val total = count * 8192
651
+
652
+ while (read < total) {
653
+ buffer.clear()
654
+ val rc = ch.readFully(buffer)
655
+ if (rc == - 1 ) break
656
+ read + = rc
657
+ }
658
+
659
+ assertEquals(total, read)
660
+
661
+ buffer.clear()
662
+ assertEquals(- 1 , ch.readAvailable(buffer))
663
+ }
664
+ }
665
+
666
+ @Test
667
+ fun testReadAndWriteLargeViaLookAheadSession () = runTest {
668
+ val count = 128L * 1024 * stressTestMultiplier // * 8192 -> 1G * M
669
+ val data = ByteBuffer .allocate(8192 )!!
670
+ Random ().nextBytes(data.array())
671
+
672
+ launch(" writer" ) {
673
+ repeat(count.toInt()) {
674
+ data.clear()
675
+ ch.writeFully(data)
676
+ }
677
+ ch.close()
678
+ }
679
+
680
+ launch(" reader" ) {
681
+ var read = 0L
682
+ val total = count * 8192
683
+
684
+ ch.lookAheadSuspend {
685
+ while (read < total) {
686
+ val bb = request(0 , 1 )
687
+ if (bb == null ) {
688
+ if (! awaitAtLeast(1 )) break
689
+ continue
690
+ }
691
+ val rc = bb.remaining()
692
+ bb.position(bb.limit())
693
+ read + = rc
694
+ consumed(rc)
695
+ }
696
+ }
697
+
698
+ assertEquals(total, read)
699
+ assertEquals(- 1 , ch.readAvailable(ByteBuffer .allocate(8192 )))
700
+ }
701
+ }
702
+
633
703
@Test
634
704
fun testCopyLarge () {
635
705
val count = 100 * 256 * stressTestMultiplier // * 8192
@@ -752,6 +822,16 @@ class ByteBufferChannelTest : TestBase() {
752
822
latch.await()
753
823
}
754
824
825
+ private fun CoroutineScope.launch (name : String = "child", block : suspend () -> Unit ): Job {
826
+ return launch(context = DefaultDispatcher + CoroutineName (name), parent = coroutineContext[Job ]) {
827
+ block()
828
+ }.apply {
829
+ invokeOnCompletion(true ) { t ->
830
+ if (t != null ) ch.cancel(t)
831
+ }
832
+ }
833
+ }
834
+
755
835
private fun launch (block : suspend () -> Unit ): Job {
756
836
return launch(DefaultDispatcher ) {
757
837
try {
0 commit comments