11/*
2- * Copyright (c) 2018, 2023 , Oracle and/or its affiliates. All rights reserved.
2+ * Copyright (c) 2018, 2025 , Oracle and/or its affiliates. All rights reserved.
33 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44 *
55 * This code is free software; you can redistribute it and/or modify it
2121 * questions.
2222 */
2323
24- /**
24+ /*
2525 * @test id=default
2626 * @bug 8284161
2727 * @summary Test virtual threads doing blocking I/O on NIO channels
2828 * @library /test/lib
2929 * @run junit BlockingChannelOps
3030 */
3131
32- /**
32+ /*
3333 * @test id=poller-modes
3434 * @requires (os.family == "linux") | (os.family == "mac")
3535 * @library /test/lib
3636 * @run junit/othervm -Djdk.pollerMode=1 BlockingChannelOps
3737 * @run junit/othervm -Djdk.pollerMode=2 BlockingChannelOps
3838 */
3939
40- /**
40+ /*
4141 * @test id=no-vmcontinuations
4242 * @requires vm.continuations
4343 * @library /test/lib
6262import java .nio .channels .ServerSocketChannel ;
6363import java .nio .channels .SocketChannel ;
6464import java .nio .channels .WritableByteChannel ;
65+ import java .util .concurrent .locks .LockSupport ;
6566
6667import jdk .test .lib .thread .VThreadRunner ;
6768import org .junit .jupiter .api .Test ;
@@ -161,6 +162,22 @@ void testSocketChannelReadAsyncClose() throws Exception {
161162 });
162163 }
163164
165+ /**
166+ * SocketChannel shutdownInput while virtual thread blocked in read.
167+ */
168+ @ Test
169+ void testSocketChannelReadAsyncShutdownInput () throws Exception {
170+ VThreadRunner .run (() -> {
171+ try (var connection = new Connection ()) {
172+ SocketChannel sc = connection .channel1 ();
173+ runAfterParkedAsync (sc ::shutdownInput );
174+ int n = sc .read (ByteBuffer .allocate (100 ));
175+ assertEquals (-1 , n );
176+ assertTrue (sc .isOpen ());
177+ }
178+ });
179+ }
180+
164181 /**
165182 * Virtual thread interrupted while blocked in SocketChannel read.
166183 */
@@ -190,13 +207,15 @@ void testSocketChannelReadInterrupt() throws Exception {
190207 @ Test
191208 void testSocketChannelWriteAsyncClose () throws Exception {
192209 VThreadRunner .run (() -> {
193- boolean retry = true ;
194- while (retry ) {
210+ boolean done = false ;
211+ while (! done ) {
195212 try (var connection = new Connection ()) {
196213 SocketChannel sc = connection .channel1 ();
197214
198215 // close sc when current thread blocks in write
199- runAfterParkedAsync (sc ::close );
216+ runAfterParkedAsync (sc ::close , true );
217+
218+ // write until channel is closed
200219 try {
201220 ByteBuffer bb = ByteBuffer .allocate (100 *1024 );
202221 for (;;) {
@@ -206,11 +225,39 @@ void testSocketChannelWriteAsyncClose() throws Exception {
206225 }
207226 } catch (AsynchronousCloseException expected ) {
208227 // closed when blocked in write
209- retry = false ;
228+ done = true ;
210229 } catch (ClosedChannelException e ) {
211- // closed when not blocked in write, need to retry test
230+ // closed but not blocked in write, need to retry test
231+ System .err .format ("%s, need to retry!%n" , e );
232+ }
233+ }
234+ }
235+ });
236+ }
237+
238+
239+ /**
240+ * SocketChannel shutdownOutput while virtual thread blocked in write.
241+ */
242+ @ Test
243+ void testSocketChannelWriteAsyncShutdownOutput () throws Exception {
244+ VThreadRunner .run (() -> {
245+ try (var connection = new Connection ()) {
246+ SocketChannel sc = connection .channel1 ();
247+
248+ // shutdown output when current thread blocks in write
249+ runAfterParkedAsync (sc ::shutdownOutput );
250+ try {
251+ ByteBuffer bb = ByteBuffer .allocate (100 *1024 );
252+ for (;;) {
253+ int n = sc .write (bb );
254+ assertTrue (n > 0 );
255+ bb .clear ();
212256 }
257+ } catch (ClosedChannelException e ) {
258+ // expected
213259 }
260+ assertTrue (sc .isOpen ());
214261 }
215262 });
216263 }
@@ -221,15 +268,16 @@ void testSocketChannelWriteAsyncClose() throws Exception {
221268 @ Test
222269 void testSocketChannelWriteInterrupt () throws Exception {
223270 VThreadRunner .run (() -> {
224- boolean retry = true ;
225- while (retry ) {
271+ boolean done = false ;
272+ while (! done ) {
226273 try (var connection = new Connection ()) {
227274 SocketChannel sc = connection .channel1 ();
228275
229276 // interrupt current thread when it blocks in write
230277 Thread thisThread = Thread .currentThread ();
231- runAfterParkedAsync (thisThread ::interrupt );
278+ runAfterParkedAsync (thisThread ::interrupt , true );
232279
280+ // write until channel is closed
233281 try {
234282 ByteBuffer bb = ByteBuffer .allocate (100 *1024 );
235283 for (;;) {
@@ -240,9 +288,10 @@ void testSocketChannelWriteInterrupt() throws Exception {
240288 } catch (ClosedByInterruptException e ) {
241289 // closed when blocked in write
242290 assertTrue (Thread .interrupted ());
243- retry = false ;
291+ done = true ;
244292 } catch (ClosedChannelException e ) {
245- // closed when not blocked in write, need to retry test
293+ // closed but not blocked in write, need to retry test
294+ System .err .format ("%s, need to retry!%n" , e );
246295 }
247296 }
248297 }
@@ -734,14 +783,16 @@ void testPipeReadInterrupt() throws Exception {
734783 @ Test
735784 void testPipeWriteAsyncClose () throws Exception {
736785 VThreadRunner .run (() -> {
737- boolean retry = true ;
738- while (retry ) {
786+ boolean done = false ;
787+ while (! done ) {
739788 Pipe p = Pipe .open ();
740789 try (Pipe .SinkChannel sink = p .sink ();
741790 Pipe .SourceChannel source = p .source ()) {
742791
743792 // close sink when current thread blocks in write
744- runAfterParkedAsync (sink ::close );
793+ runAfterParkedAsync (sink ::close , true );
794+
795+ // write until channel is closed
745796 try {
746797 ByteBuffer bb = ByteBuffer .allocate (100 *1024 );
747798 for (;;) {
@@ -751,9 +802,10 @@ void testPipeWriteAsyncClose() throws Exception {
751802 }
752803 } catch (AsynchronousCloseException e ) {
753804 // closed when blocked in write
754- retry = false ;
805+ done = true ;
755806 } catch (ClosedChannelException e ) {
756- // closed when not blocked in write, need to retry test
807+ // closed but not blocked in write, need to retry test
808+ System .err .format ("%s, need to retry!%n" , e );
757809 }
758810 }
759811 }
@@ -766,16 +818,17 @@ void testPipeWriteAsyncClose() throws Exception {
766818 @ Test
767819 void testPipeWriteInterrupt () throws Exception {
768820 VThreadRunner .run (() -> {
769- boolean retry = true ;
770- while (retry ) {
821+ boolean done = false ;
822+ while (! done ) {
771823 Pipe p = Pipe .open ();
772824 try (Pipe .SinkChannel sink = p .sink ();
773825 Pipe .SourceChannel source = p .source ()) {
774826
775827 // interrupt current thread when it blocks in write
776828 Thread thisThread = Thread .currentThread ();
777- runAfterParkedAsync (thisThread ::interrupt );
829+ runAfterParkedAsync (thisThread ::interrupt , true );
778830
831+ // write until channel is closed
779832 try {
780833 ByteBuffer bb = ByteBuffer .allocate (100 *1024 );
781834 for (;;) {
@@ -786,9 +839,10 @@ void testPipeWriteInterrupt() throws Exception {
786839 } catch (ClosedByInterruptException expected ) {
787840 // closed when blocked in write
788841 assertTrue (Thread .interrupted ());
789- retry = false ;
842+ done = true ;
790843 } catch (ClosedChannelException e ) {
791- // closed when not blocked in write, need to retry test
844+ // closed but not blocked in write, need to retry test
845+ System .err .format ("%s, need to retry!%n" , e );
792846 }
793847 }
794848 }
@@ -848,26 +902,50 @@ interface ThrowingRunnable {
848902 }
849903
850904 /**
851- * Runs the given task asynchronously after the current virtual thread has parked.
905+ * Runs the given task asynchronously after the current virtual thread parks.
906+ * @param writing if the thread will block in write
852907 * @return the thread started to run the task
853908 */
854- static Thread runAfterParkedAsync (ThrowingRunnable task ) {
909+ private static Thread runAfterParkedAsync (ThrowingRunnable task , boolean writing ) {
855910 Thread target = Thread .currentThread ();
856911 if (!target .isVirtual ())
857912 throw new WrongThreadException ();
858913 return Thread .ofPlatform ().daemon ().start (() -> {
859914 try {
860- Thread .State state = target .getState ();
861- while (state != Thread .State .WAITING
862- && state != Thread .State .TIMED_WAITING ) {
915+ // wait for target thread to park
916+ while (!isWaiting (target )) {
863917 Thread .sleep (20 );
864- state = target .getState ();
865918 }
866- Thread .sleep (20 ); // give a bit more time to release carrier
919+
920+ // if the target thread is parked in write then we nudge it a few times
921+ // to avoid wakeup with some bytes written
922+ if (writing ) {
923+ for (int i = 0 ; i < 3 ; i ++) {
924+ LockSupport .unpark (target );
925+ while (!isWaiting (target )) {
926+ Thread .sleep (20 );
927+ }
928+ }
929+ }
930+
867931 task .run ();
932+
868933 } catch (Exception e ) {
869934 e .printStackTrace ();
870935 }
871936 });
872937 }
938+
939+ private static Thread runAfterParkedAsync (ThrowingRunnable task ) {
940+ return runAfterParkedAsync (task , false );
941+ }
942+
943+ /**
944+ * Return true if the given Thread is parked.
945+ */
946+ private static boolean isWaiting (Thread target ) {
947+ Thread .State state = target .getState ();
948+ assertNotEquals (Thread .State .TERMINATED , state );
949+ return (state == Thread .State .WAITING || state == Thread .State .TIMED_WAITING );
950+ }
873951}
0 commit comments