Skip to content

Commit dd091a9

Browse files
authored
IGNITE-26719 Java client: fix async continuation executor (#6862)
Fix `completeAsync` logic to always complete outer future using the `asyncContinuationExecutor`. Before that, we relied on `thenCompose` which could keep us in the Netty thread in some cases - `testDefaultAsyncContinuationExecutorIsForkJoinPool` was flaky (reproducible locally).
1 parent 17a3357 commit dd091a9

File tree

1 file changed

+46
-23
lines changed

1 file changed

+46
-23
lines changed

modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java

Lines changed: 46 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static java.util.concurrent.CompletableFuture.failedFuture;
2222
import static org.apache.ignite.internal.util.ExceptionUtils.copyExceptionWithCause;
2323
import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
24+
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapRootCause;
2425
import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
2526
import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
2627
import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
@@ -424,11 +425,14 @@ private <T> CompletableFuture<T> send(
424425
}
425426

426427
// Handle the response in the async continuation pool with completeAsync.
427-
return fut
428-
.thenCompose(unpacker -> completeAsync(payloadReader, notificationFut, unpacker))
429-
.exceptionally(err -> {
430-
throw sneakyThrow(ViewUtils.ensurePublicException(err));
431-
});
428+
CompletableFuture<T> resFut = new CompletableFuture<>();
429+
430+
fut.handle((unpacker, err) -> {
431+
completeAsync(payloadReader, notificationFut, unpacker, err, resFut);
432+
return null;
433+
});
434+
435+
return resFut;
432436
} catch (Throwable t) {
433437
log.warn("Failed to send request [id=" + id + ", op=" + opCode + ", remoteAddress=" + cfg.getAddress() + "]: "
434438
+ t.getMessage(), t);
@@ -443,14 +447,28 @@ private <T> CompletableFuture<T> send(
443447
}
444448
}
445449

446-
private <T> CompletableFuture<T> completeAsync(
450+
private <T> void completeAsync(
447451
@Nullable PayloadReader<T> payloadReader,
448452
@Nullable CompletableFuture<PayloadInputChannel> notificationFut,
449-
ClientMessageUnpacker unpacker
453+
ClientMessageUnpacker unpacker,
454+
@Nullable Throwable err,
455+
CompletableFuture<T> resFut
450456
) {
451-
try {
452-
CompletableFuture<T> resFut = new CompletableFuture<>();
457+
if (err != null) {
458+
assert unpacker == null : "unpacker must be null if err is not null";
459+
460+
try {
461+
asyncContinuationExecutor.execute(() -> resFut.completeExceptionally(ViewUtils.ensurePublicException(err)));
462+
} catch (Throwable execError) {
463+
// Executor error, complete directly.
464+
execError.addSuppressed(err);
465+
resFut.completeExceptionally(ViewUtils.ensurePublicException(execError));
466+
}
467+
468+
return;
469+
}
453470

471+
try {
454472
// Use asyncContinuationExecutor explicitly to close unpacker if the executor throws.
455473
// With handleAsync et al we can't close the unpacker in that case.
456474
asyncContinuationExecutor.execute(() -> {
@@ -460,11 +478,11 @@ private <T> CompletableFuture<T> completeAsync(
460478
resFut.completeExceptionally(ViewUtils.ensurePublicException(t));
461479
}
462480
});
463-
464-
return resFut;
465-
} catch (Throwable t) {
481+
} catch (Throwable execErr) {
466482
unpacker.close();
467-
return failedFuture(ViewUtils.ensurePublicException(t));
483+
484+
// Executor error, complete directly.
485+
resFut.completeExceptionally(ViewUtils.ensurePublicException(execErr));
468486
}
469487
}
470488

@@ -669,16 +687,21 @@ private CompletableFuture<Object> handshakeAsync(ProtocolVersion ver) throws Ign
669687
}
670688
});
671689

672-
return fut
673-
.thenCompose(unpacker -> completeAsync(r -> handshakeRes(r.in()), null, unpacker))
674-
.exceptionally(err -> {
675-
if (err instanceof TimeoutException || err.getCause() instanceof TimeoutException) {
676-
metrics.handshakesFailedTimeoutIncrement();
677-
throw new IgniteClientConnectionException(CONNECTION_ERR, "Handshake timeout", endpoint(), err);
678-
}
679-
metrics.handshakesFailedIncrement();
680-
throw new IgniteClientConnectionException(CONNECTION_ERR, "Handshake error", endpoint(), err);
681-
});
690+
CompletableFuture<Object> resFut = new CompletableFuture<>();
691+
692+
fut.handle((unpacker, err) -> {
693+
completeAsync(r -> handshakeRes(r.in()), null, unpacker, err, resFut);
694+
return null;
695+
});
696+
697+
return resFut.exceptionally(err -> {
698+
if (unwrapRootCause(err) instanceof TimeoutException) {
699+
metrics.handshakesFailedTimeoutIncrement();
700+
throw new IgniteClientConnectionException(CONNECTION_ERR, "Handshake timeout", endpoint(), err);
701+
}
702+
metrics.handshakesFailedIncrement();
703+
throw new IgniteClientConnectionException(CONNECTION_ERR, "Handshake error", endpoint(), err);
704+
});
682705
}
683706

684707
/**

0 commit comments

Comments
 (0)