Skip to content

Commit 1400595

Browse files
committed
8317808: HTTP/2 stream cancelImpl may leave subscriber registered
Backport-of: 6273ab97dc1a0d3c1f51ba94694d9594dd7593d4
1 parent e236cad commit 1400595

File tree

4 files changed

+25
-7
lines changed

4 files changed

+25
-7
lines changed

src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,8 +560,9 @@ public boolean registerSubscriber(HttpBodySubscriberWrapper<?> subscriber) {
560560
if (debug.on()) {
561561
debug.log("body subscriber registered: " + count);
562562
}
563+
return true;
563564
}
564-
return true;
565+
return false;
565566
}
566567
}
567568
}

src/java.net.http/share/classes/jdk/internal/net/http/Stream.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ private void schedule() {
188188
if (debug.on()) debug.log("subscribing user subscriber");
189189
subscriber.onSubscribe(userSubscription);
190190
}
191-
while (!inputQ.isEmpty()) {
191+
while (!inputQ.isEmpty() && errorRef.get() == null) {
192192
Http2Frame frame = inputQ.peek();
193193
if (frame instanceof ResetFrame) {
194194
inputQ.remove();
@@ -416,6 +416,10 @@ private void sendDataFrame(DataFrame frame) {
416416
// pushes entire response body into response subscriber
417417
// blocking when required by local or remote flow control
418418
CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
419+
// ensure that the body subscriber will be subscribed and onError() is
420+
// invoked
421+
pendingResponseSubscriber = bodySubscriber;
422+
419423
// We want to allow the subscriber's getBody() method to block so it
420424
// can work with InputStreams. So, we offload execution.
421425
responseBodyCF = ResponseSubscribers.getBodyAsync(executor, bodySubscriber,
@@ -426,9 +430,6 @@ CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor exec
426430
responseBodyCF.completeExceptionally(t);
427431
}
428432

429-
// ensure that the body subscriber will be subscribed and onError() is
430-
// invoked
431-
pendingResponseSubscriber = bodySubscriber;
432433
sched.runOrSchedule(); // in case data waiting already to be processed, or error
433434

434435
return responseBodyCF;

test/jdk/java/net/httpclient/AbstractThrowingSubscribers.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,7 @@ private <T,U> void testThrowing(String uri, boolean sameClient,
474474
if (response != null) {
475475
finisher.finish(where, response, thrower);
476476
}
477+
var tracker = TRACKER.getTracker(client);
477478
if (!sameClient) {
478479
// Wait for the client to be garbage collected.
479480
// we use the ReferenceTracker API rather than HttpClient::close here,
@@ -482,7 +483,6 @@ private <T,U> void testThrowing(String uri, boolean sameClient,
482483
// By using the ReferenceTracker, we will get some diagnosis about what
483484
// is keeping the client alive if it doesn't get GC'ed within the
484485
// expected time frame.
485-
var tracker = TRACKER.getTracker(client);
486486
client = null;
487487
System.gc();
488488
System.out.println(now() + "waiting for client to shutdown: " + tracker.getName());
@@ -491,6 +491,14 @@ private <T,U> void testThrowing(String uri, boolean sameClient,
491491
if (error != null) throw error;
492492
System.out.println(now() + "client shutdown normally: " + tracker.getName());
493493
System.err.println(now() + "client shutdown normally: " + tracker.getName());
494+
} else {
495+
System.out.println(now() + "waiting for operation to finish: " + tracker.getName());
496+
System.err.println(now() + "waiting for operation to finish: " + tracker.getName());
497+
var error = TRACKER.checkFinished(tracker, 10000);
498+
if (error != null) throw error;
499+
System.out.println(now() + "operation finished normally: " + tracker.getName());
500+
System.err.println(now() + "operation finished normally: " + tracker.getName());
501+
494502
}
495503
}
496504
}
@@ -800,7 +808,7 @@ public void teardown() throws Exception {
800808
sharedClient == null ? null : sharedClient.toString();
801809
sharedClient = null;
802810
Thread.sleep(100);
803-
AssertionError fail = TRACKER.check(500);
811+
AssertionError fail = TRACKER.check(5000);
804812
try {
805813
httpTestServer.stop();
806814
httpsTestServer.stop();

test/jdk/java/net/httpclient/ReferenceTracker.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,14 @@ public AssertionError check(Tracker tracker, long graceDelayMs) {
115115
"outstanding operations or unreleased resources", true);
116116
}
117117

118+
public AssertionError checkFinished(Tracker tracker, long graceDelayMs) {
119+
Predicate<Tracker> hasOperations = (t) -> t.getOutstandingOperations() > 0;
120+
Predicate<Tracker> hasSubscribers = (t) -> t.getOutstandingSubscribers() > 0;
121+
return check(tracker, graceDelayMs,
122+
hasOperations.or(hasSubscribers),
123+
"outstanding operations or unreleased resources", false);
124+
}
125+
118126
public AssertionError check(long graceDelayMs) {
119127
Predicate<Tracker> hasOperations = (t) -> t.getOutstandingOperations() > 0;
120128
Predicate<Tracker> hasSubscribers = (t) -> t.getOutstandingSubscribers() > 0;

0 commit comments

Comments
 (0)