Skip to content

Commit bbbe85c

Browse files
committed
8316580: HttpClient with StructuredTaskScope does not close when a task fails
Reviewed-by: abakhtin, phh Backport-of: d8291f593762ab270bf05643b87c57578d716242
1 parent 4b4cfe7 commit bbbe85c

File tree

4 files changed

+494
-24
lines changed

4 files changed

+494
-24
lines changed

src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -279,23 +279,25 @@ public String toString() {
279279
}
280280
}
281281

282-
static void registerPending(PendingRequest pending) {
282+
static <T> CompletableFuture<T> registerPending(PendingRequest pending, CompletableFuture<T> res) {
283283
// shortcut if cf is already completed: no need to go through the trouble of
284284
// registering it
285-
if (pending.cf.isDone()) return;
285+
if (pending.cf.isDone()) return res;
286286

287287
var client = pending.client;
288288
var cf = pending.cf;
289289
var id = pending.id;
290290
boolean added = client.pendingRequests.add(pending);
291291
// this may immediately remove `pending` from the set is the cf is already completed
292-
pending.ref = cf.whenComplete((r,t) -> client.pendingRequests.remove(pending));
292+
var ref = res.whenComplete((r,t) -> client.pendingRequests.remove(pending));
293+
pending.ref = ref;
293294
assert added : "request %d was already added".formatted(id);
294295
// should not happen, unless the selector manager has already
295296
// exited abnormally
296297
if (client.selmgr.isClosed()) {
297298
pending.abort(client.selmgr.selectorClosedException());
298299
}
300+
return ref;
299301
}
300302

301303
static void abortPendingRequests(HttpClientImpl client, Throwable reason) {
@@ -875,8 +877,9 @@ private void debugCompleted(String tag, long startNanos, HttpRequest req) {
875877
cf = sendAsync(req, responseHandler, null, null);
876878
return cf.get();
877879
} catch (InterruptedException ie) {
878-
if (cf != null )
880+
if (cf != null) {
879881
cf.cancel(true);
882+
}
880883
throw ie;
881884
} catch (ExecutionException e) {
882885
final Throwable throwable = e.getCause();
@@ -991,19 +994,23 @@ private void debugCompleted(String tag, long startNanos, HttpRequest req) {
991994
(b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
992995
}
993996

994-
// makes sure that any dependent actions happen in the CF default
995-
// executor. This is only needed for sendAsync(...), when
996-
// exchangeExecutor is non-null.
997-
if (exchangeExecutor != null) {
998-
res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
999-
}
1000-
1001997
// The mexCf is the Cf we need to abort if the SelectorManager thread
1002998
// is aborted.
1003999
PendingRequest pending = new PendingRequest(id, requestImpl, mexCf, mex, this);
1004-
registerPending(pending);
1005-
return res;
1006-
} catch(Throwable t) {
1000+
res = registerPending(pending, res);
1001+
1002+
if (exchangeExecutor != null) {
1003+
// makes sure that any dependent actions happen in the CF default
1004+
// executor. This is only needed for sendAsync(...), when
1005+
// exchangeExecutor is non-null.
1006+
return res.isDone() ? res
1007+
: res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
1008+
} else {
1009+
// make a defensive copy that can be safely canceled
1010+
// by the caller
1011+
return res.isDone() ? res : res.copy();
1012+
}
1013+
} catch (Throwable t) {
10071014
requestUnreference();
10081015
debugCompleted("ClientImpl (async)", start, userRequest);
10091016
throw t;

src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class MultiExchange<T> implements Cancelable {
9191
Exchange<T> previous;
9292
volatile Throwable retryCause;
9393
volatile boolean expiredOnce;
94-
volatile HttpResponse<T> response = null;
94+
volatile HttpResponse<T> response;
9595

9696
// Maximum number of times a request will be retried/redirected
9797
// for any reason
@@ -274,11 +274,19 @@ public void cancel(IOException cause) {
274274
@Override
275275
public boolean cancel(boolean mayInterruptIfRunning) {
276276
boolean cancelled = this.cancelled;
277+
boolean firstCancel = false;
277278
if (!cancelled && mayInterruptIfRunning) {
278279
if (interrupted.get() == null) {
279-
interrupted.compareAndSet(null,
280+
firstCancel = interrupted.compareAndSet(null,
280281
new CancellationException("Request cancelled"));
281282
}
283+
if (debug.on()) {
284+
if (firstCancel) {
285+
debug.log("multi exchange recording: " + interrupted.get());
286+
} else {
287+
debug.log("multi exchange recorded: " + interrupted.get());
288+
}
289+
}
282290
this.cancelled = true;
283291
var exchange = getExchange();
284292
if (exchange != null) {
@@ -360,17 +368,30 @@ private CompletableFuture<HttpResponse<T>> handleNoBody(Response r, Exchange<T>
360368
}).exceptionallyCompose(this::whenCancelled);
361369
}
362370

371+
// returns a CancellationExcpetion that wraps the given cause
372+
// if cancel(boolean) was called, the given cause otherwise
373+
private Throwable wrapIfCancelled(Throwable cause) {
374+
CancellationException interrupt = interrupted.get();
375+
if (interrupt == null) return cause;
376+
377+
var cancel = new CancellationException(interrupt.getMessage());
378+
// preserve the stack trace of the original exception to
379+
// show where the call to cancel(boolean) came from
380+
cancel.setStackTrace(interrupt.getStackTrace());
381+
cancel.initCause(Utils.getCancelCause(cause));
382+
return cancel;
383+
}
384+
385+
// if the request failed because the multi exchange was cancelled,
386+
// make sure the reported exception is wrapped in CancellationException
363387
private CompletableFuture<HttpResponse<T>> whenCancelled(Throwable t) {
364-
CancellationException x = interrupted.get();
365-
if (x != null) {
366-
// make sure to fail with CancellationException if cancel(true)
367-
// was called.
368-
t = x.initCause(Utils.getCancelCause(t));
388+
var x = wrapIfCancelled(t);
389+
if (x instanceof CancellationException) {
369390
if (debug.on()) {
370-
debug.log("MultiExchange interrupted with: " + t.getCause());
391+
debug.log("MultiExchange interrupted with: " + x.getCause());
371392
}
372393
}
373-
return MinimalFuture.failedFuture(t);
394+
return MinimalFuture.failedFuture(x);
374395
}
375396

376397
static class NullSubscription implements Flow.Subscription {

0 commit comments

Comments
 (0)