11/*
2- * Copyright (c) 2020, 2023 , Oracle and/or its affiliates. All rights reserved.
2+ * Copyright (c) 2020, 2025 , Oracle and/or its affiliates. All rights reserved.
33 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44 *
55 * This code is free software; you can redistribute it and/or modify it
3333 * @summary Tests HttpRequest.BodyPublishers::concat
3434 */
3535
36- import java .net .InetAddress ;
37- import java .net .InetSocketAddress ;
3836import java .net .URI ;
3937import java .net .http .HttpClient ;
4038import java .net .http .HttpRequest ;
5755import java .util .concurrent .Flow ;
5856import java .util .concurrent .Flow .Subscriber ;
5957import java .util .concurrent .Flow .Subscription ;
58+ import java .util .concurrent .Semaphore ;
6059import java .util .concurrent .TimeUnit ;
6160import java .util .concurrent .TimeoutException ;
6261import java .util .concurrent .atomic .AtomicLong ;
6766import java .util .stream .LongStream ;
6867import java .util .stream .Stream ;
6968import jdk .httpclient .test .lib .common .HttpServerAdapters ;
70- import jdk .httpclient .test .lib .http2 .Http2TestServer ;
7169import javax .net .ssl .SSLContext ;
7270
73- import com .sun .net .httpserver .HttpServer ;
74- import com .sun .net .httpserver .HttpsConfigurator ;
75- import com .sun .net .httpserver .HttpsServer ;
7671import jdk .test .lib .net .SimpleSSLContext ;
7772import org .testng .Assert ;
7873import org .testng .ITestContext ;
@@ -419,9 +414,11 @@ public void cancel() {
419414 }
420415
421416 static class RequestSubscriber implements Flow .Subscriber <ByteBuffer > {
422- CompletableFuture <Subscription > subscriptionCF = new CompletableFuture <>();
423- ConcurrentLinkedDeque <ByteBuffer > items = new ConcurrentLinkedDeque <>();
424- CompletableFuture <List <ByteBuffer >> resultCF = new CompletableFuture <>();
417+ final CompletableFuture <Subscription > subscriptionCF = new CompletableFuture <>();
418+ final ConcurrentLinkedDeque <ByteBuffer > items = new ConcurrentLinkedDeque <>();
419+ final CompletableFuture <List <ByteBuffer >> resultCF = new CompletableFuture <>();
420+
421+ final Semaphore semaphore = new Semaphore (0 );
425422
426423 @ Override
427424 public void onSubscribe (Subscription subscription ) {
@@ -431,6 +428,11 @@ public void onSubscribe(Subscription subscription) {
431428 @ Override
432429 public void onNext (ByteBuffer item ) {
433430 items .addLast (item );
431+ int available = semaphore .availablePermits ();
432+ if (available > Integer .MAX_VALUE - 8 ) {
433+ onError (new IllegalStateException ("too many buffers in queue: " + available ));
434+ }
435+ semaphore .release ();
434436 }
435437
436438 @ Override
@@ -443,6 +445,18 @@ public void onComplete() {
443445 resultCF .complete (items .stream ().collect (Collectors .toUnmodifiableList ()));
444446 }
445447
448+ public ByteBuffer take () {
449+ // it is not guaranteed that the buffer will be added to
450+ // the queue in the same thread that calls request(1).
451+ try {
452+ semaphore .acquire ();
453+ } catch (InterruptedException x ) {
454+ Thread .currentThread ().interrupt ();
455+ throw new CompletionException (x );
456+ }
457+ return items .pop ();
458+ }
459+
446460 CompletableFuture <List <ByteBuffer >> resultCF () { return resultCF ; }
447461 }
448462
@@ -628,8 +642,9 @@ public void testPositiveRequests() {
628642 publisher .subscribe (requestSubscriber1 );
629643 Subscription subscription1 = requestSubscriber1 .subscriptionCF .join ();
630644 subscription1 .request (16 );
631- assertTrue ( requestSubscriber1 . resultCF (). isDone ());
645+ // onNext() may not be called in the same thread than request()
632646 List <ByteBuffer > list1 = requestSubscriber1 .resultCF ().join ();
647+ assertTrue (requestSubscriber1 .resultCF ().isDone ());
633648 String result1 = stringFromBytes (list1 .stream ());
634649 assertEquals (result1 , "Lorem ipsum dolor sit amet, consectetur adipiscing elit." );
635650 System .out .println ("Got expected sentence with one request: \" %s\" " .formatted (result1 ));
@@ -646,8 +661,8 @@ public void testPositiveRequests() {
646661 subscription2 .request (4 );
647662 assertFalse (requestSubscriber2 .resultCF ().isDone ());
648663 subscription2 .request (1 );
649- assertTrue (requestSubscriber2 .resultCF ().isDone ());
650664 List <ByteBuffer > list2 = requestSubscriber2 .resultCF ().join ();
665+ assertTrue (requestSubscriber2 .resultCF ().isDone ());
651666 String result2 = stringFromBytes (list2 .stream ());
652667 assertEquals (result2 , "Lorem ipsum dolor sit amet, consectetur adipiscing elit." );
653668 System .out .println ("Got expected sentence with 4 requests: \" %s\" " .formatted (result1 ));
@@ -689,7 +704,7 @@ public void testCancel() {
689704 // receive half the data
690705 for (int i = 0 ; i < n ; i ++) {
691706 subscription .request (1 );
692- ByteBuffer buffer = subscriber .items . pop ();
707+ ByteBuffer buffer = subscriber .take ();
693708 }
694709
695710 // cancel subscription
@@ -789,7 +804,8 @@ public void testCancelSubscription() {
789804 @ Test (dataProvider = "variants" )
790805 public void test (String uri , boolean sameClient ) throws Exception {
791806 checkSkip ();
792- System .out .println ("Request to " + uri );
807+ System .out .printf ("Request to %s (sameClient: %s)%n" , uri , sameClient );
808+ System .err .printf ("Request to %s (sameClient: %s)%n" , uri , sameClient );
793809
794810 HttpClient client = newHttpClient (sameClient );
795811
@@ -802,7 +818,8 @@ public void test(String uri, boolean sameClient) throws Exception {
802818 .POST (publisher )
803819 .build ();
804820 for (int i = 0 ; i < ITERATION_COUNT ; i ++) {
805- System .out .println ("Iteration: " + i );
821+ System .out .println (uri + ": Iteration: " + i );
822+ System .err .println (uri + ": Iteration: " + i );
806823 HttpResponse <String > response = client .send (request , BodyHandlers .ofString ());
807824 int expectedResponse = RESPONSE_CODE ;
808825 if (response .statusCode () != expectedResponse )
0 commit comments