Skip to content

Commit 5806906

Browse files
author
Alexey Bakhtin
committed
8317808: HTTP/2 stream cancelImpl may leave subscriber registered
Backport-of: 6273ab97dc1a0d3c1f51ba94694d9594dd7593d4
1 parent 951453e commit 5806906

File tree

4 files changed

+25
-7
lines changed

4 files changed

+25
-7
lines changed

src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -575,8 +575,9 @@ public boolean registerSubscriber(HttpBodySubscriberWrapper<?> subscriber) {
575575
if (debug.on()) {
576576
debug.log("body subscriber registered: " + count);
577577
}
578+
return true;
578579
}
579-
return true;
580+
return false;
580581
}
581582
} finally {
582583
selmgr.unlock();

src/java.net.http/share/classes/jdk/internal/net/http/Stream.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ private void schedule() {
191191
if (debug.on()) debug.log("subscribing user subscriber");
192192
subscriber.onSubscribe(userSubscription);
193193
}
194-
while (!inputQ.isEmpty()) {
194+
while (!inputQ.isEmpty() && errorRef.get() == null) {
195195
Http2Frame frame = inputQ.peek();
196196
if (frame instanceof ResetFrame rf) {
197197
inputQ.remove();
@@ -425,6 +425,10 @@ private void sendDataFrame(DataFrame frame) {
425425
// pushes entire response body into response subscriber
426426
// blocking when required by local or remote flow control
427427
CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
428+
// ensure that the body subscriber will be subscribed and onError() is
429+
// invoked
430+
pendingResponseSubscriber = bodySubscriber;
431+
428432
// We want to allow the subscriber's getBody() method to block so it
429433
// can work with InputStreams. So, we offload execution.
430434
responseBodyCF = ResponseSubscribers.getBodyAsync(executor, bodySubscriber,
@@ -435,9 +439,6 @@ CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor exec
435439
responseBodyCF.completeExceptionally(t);
436440
}
437441

438-
// ensure that the body subscriber will be subscribed and onError() is
439-
// invoked
440-
pendingResponseSubscriber = bodySubscriber;
441442
sched.runOrSchedule(); // in case data waiting already to be processed, or error
442443

443444
return responseBodyCF;

test/jdk/java/net/httpclient/AbstractThrowingSubscribers.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,7 @@ private <T,U> void testThrowing(String uri, boolean sameClient,
474474
if (response != null) {
475475
finisher.finish(where, response, thrower);
476476
}
477+
var tracker = TRACKER.getTracker(client);
477478
if (!sameClient) {
478479
// Wait for the client to be garbage collected.
479480
// we use the ReferenceTracker API rather than HttpClient::close here,
@@ -482,7 +483,6 @@ private <T,U> void testThrowing(String uri, boolean sameClient,
482483
// By using the ReferenceTracker, we will get some diagnosis about what
483484
// is keeping the client alive if it doesn't get GC'ed within the
484485
// expected time frame.
485-
var tracker = TRACKER.getTracker(client);
486486
client = null;
487487
System.gc();
488488
System.out.println(now() + "waiting for client to shutdown: " + tracker.getName());
@@ -491,6 +491,14 @@ private <T,U> void testThrowing(String uri, boolean sameClient,
491491
if (error != null) throw error;
492492
System.out.println(now() + "client shutdown normally: " + tracker.getName());
493493
System.err.println(now() + "client shutdown normally: " + tracker.getName());
494+
} else {
495+
System.out.println(now() + "waiting for operation to finish: " + tracker.getName());
496+
System.err.println(now() + "waiting for operation to finish: " + tracker.getName());
497+
var error = TRACKER.checkFinished(tracker, 10000);
498+
if (error != null) throw error;
499+
System.out.println(now() + "operation finished normally: " + tracker.getName());
500+
System.err.println(now() + "operation finished normally: " + tracker.getName());
501+
494502
}
495503
}
496504
}
@@ -800,7 +808,7 @@ public void teardown() throws Exception {
800808
sharedClient == null ? null : sharedClient.toString();
801809
sharedClient = null;
802810
Thread.sleep(100);
803-
AssertionError fail = TRACKER.check(500);
811+
AssertionError fail = TRACKER.check(5000);
804812
try {
805813
httpTestServer.stop();
806814
httpsTestServer.stop();

test/jdk/java/net/httpclient/ReferenceTracker.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,14 @@ public AssertionError check(Tracker tracker, long graceDelayMs) {
115115
"outstanding operations or unreleased resources", true);
116116
}
117117

118+
public AssertionError checkFinished(Tracker tracker, long graceDelayMs) {
119+
Predicate<Tracker> hasOperations = (t) -> t.getOutstandingOperations() > 0;
120+
Predicate<Tracker> hasSubscribers = (t) -> t.getOutstandingSubscribers() > 0;
121+
return check(tracker, graceDelayMs,
122+
hasOperations.or(hasSubscribers),
123+
"outstanding operations or unreleased resources", false);
124+
}
125+
118126
public AssertionError check(long graceDelayMs) {
119127
Predicate<Tracker> hasOperations = (t) -> t.getOutstandingOperations() > 0;
120128
Predicate<Tracker> hasSubscribers = (t) -> t.getOutstandingSubscribers() > 0;

0 commit comments

Comments
 (0)