77
88package org .elasticsearch .compute .operator .exchange ;
99
10- import org .elasticsearch .ExceptionsHelper ;
1110import org .elasticsearch .action .ActionListener ;
11+ import org .elasticsearch .action .ActionRunnable ;
1212import org .elasticsearch .action .support .RefCountingRunnable ;
1313import org .elasticsearch .action .support .SubscribableListener ;
1414import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
1515import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
1616import org .elasticsearch .compute .EsqlRefCountingListener ;
1717import org .elasticsearch .compute .data .Page ;
18- import org .elasticsearch .compute .operator .FailureCollector ;
1918import org .elasticsearch .compute .operator .IsBlockedResult ;
2019import org .elasticsearch .core .Releasable ;
20+ import org .elasticsearch .tasks .TaskCancelledException ;
2121
2222import java .util .List ;
2323import java .util .Map ;
@@ -38,10 +38,9 @@ public final class ExchangeSourceHandler {
3838
3939 private final PendingInstances outstandingSinks ;
4040 private final PendingInstances outstandingSources ;
41- // Collect failures that occur while fetching pages from the remote sink with `failFast=true`.
42- // The exchange source will stop fetching and abort as soon as any failure is added to this failure collector.
43- // The final failure collected will be notified to callers via the {@code completionListener}.
44- private final FailureCollector failure = new FailureCollector ();
41+ // Track if this exchange source should abort. There is no need to track the actual failure since the actual failure
42+ // should be notified via #addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener).
43+ private volatile boolean aborted = false ;
4544
4645 private final AtomicInteger nextSinkId = new AtomicInteger ();
4746 private final Map <Integer , RemoteSink > remoteSinks = ConcurrentCollections .newConcurrentMap ();
@@ -52,7 +51,7 @@ public final class ExchangeSourceHandler {
5251 * @param maxBufferSize the maximum size of the exchange buffer. A larger buffer reduces ``pauses`` but uses more memory,
5352 * which could otherwise be allocated for other purposes.
5453 * @param fetchExecutor the executor used to fetch pages.
55- * @param completionListener a listener that will be notified when the exchange source handler fails or completes
54+ * @param completionListener a listener that will be notified when the exchange source handler completes
5655 */
5756 public ExchangeSourceHandler (int maxBufferSize , Executor fetchExecutor , ActionListener <Void > completionListener ) {
5857 this .buffer = new ExchangeBuffer (maxBufferSize );
@@ -63,14 +62,7 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi
6362 this .outstandingSources = new PendingInstances (() -> finishEarly (true , ActionListener .running (closingSinks ::finishInstance )));
6463 buffer .addCompletionListener (ActionListener .running (() -> {
6564 final ActionListener <Void > listener = ActionListener .assertAtLeastOnce (completionListener );
66- try (RefCountingRunnable refs = new RefCountingRunnable (() -> {
67- final Exception e = failure .getFailure ();
68- if (e != null ) {
69- listener .onFailure (e );
70- } else {
71- listener .onResponse (null );
72- }
73- })) {
65+ try (RefCountingRunnable refs = new RefCountingRunnable (ActionRunnable .run (listener , this ::checkFailure ))) {
7466 closingSinks .completion .addListener (refs .acquireListener ());
7567 for (PendingInstances pending : List .of (outstandingSinks , outstandingSources )) {
7668 // Create an outstanding instance and then finish to complete the completionListener
@@ -83,20 +75,19 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi
8375 }));
8476 }
8577
78+ private void checkFailure () {
79+ if (aborted ) {
80+ throw new TaskCancelledException ("remote sinks failed" );
81+ }
82+ }
83+
8684 private class ExchangeSourceImpl implements ExchangeSource {
8785 private boolean finished ;
8886
8987 ExchangeSourceImpl () {
9088 outstandingSources .trackNewInstance ();
9189 }
9290
93- private void checkFailure () {
94- Exception e = failure .getFailure ();
95- if (e != null ) {
96- throw ExceptionsHelper .convertToRuntime (e );
97- }
98- }
99-
10091 @ Override
10192 public Page pollPage () {
10293 checkFailure ();
@@ -201,7 +192,7 @@ void fetchPage() {
201192 while (loopControl .isRunning ()) {
202193 loopControl .exiting ();
203194 // finish other sinks if one of them failed or source no longer need pages.
204- boolean toFinishSinks = buffer .noMoreInputs () || failure . hasFailure () ;
195+ boolean toFinishSinks = buffer .noMoreInputs () || aborted ;
205196 remoteSink .fetchPageAsync (toFinishSinks , ActionListener .wrap (resp -> {
206197 Page page = resp .takePage ();
207198 if (page != null ) {
@@ -231,7 +222,7 @@ void fetchPage() {
231222
232223 void onSinkFailed (Exception e ) {
233224 if (failFast ) {
234- failure . unwrapAndCollect ( e ) ;
225+ aborted = true ;
235226 }
236227 buffer .waitForReading ().listener ().onResponse (null ); // resume the Driver if it is being blocked on reading
237228 if (finished == false ) {
@@ -260,12 +251,12 @@ void onSinkComplete() {
260251 * - If {@code false}, failures from this remote sink will not cause the exchange source to abort.
261252 * Callers must handle these failures notified via {@code listener}.
262253 * - If {@code true}, failures from this remote sink will cause the exchange source to abort.
263- * Callers can safely ignore failures notified via this listener, as they are collected and
264- * reported by the exchange source.
254+ *
265255 * @param onPageFetched a callback that will be called when a page is fetched from the remote sink
266256 * @param instances the number of concurrent ``clients`` that this handler should use to fetch pages.
267257 * More clients reduce latency, but add overhead.
268- * @param listener a listener that will be notified when the sink fails or completes
258+ * @param listener a listener that will be notified when the sink fails or completes. Callers must handle failures notified via
259+ * this listener.
269260 * @see ExchangeSinkHandler#fetchPageAsync(boolean, ActionListener)
270261 */
271262 public void addRemoteSink (
@@ -284,7 +275,7 @@ public void addRemoteSink(
284275 @ Override
285276 public void onFailure (Exception e ) {
286277 if (failFast ) {
287- failure . unwrapAndCollect ( e ) ;
278+ aborted = true ;
288279 }
289280 buffer .waitForReading ().listener ().onResponse (null ); // resume the Driver if it is being blocked on reading
290281 remoteSink .close (ActionListener .running (() -> sinkListener .onFailure (e )));
0 commit comments