Skip to content

Commit 6aeb313

Browse files
committed
Avoid cyclic exception in ExchangeSource
1 parent 144ff0c commit 6aeb313

File tree

2 files changed

+44
-28
lines changed

2 files changed

+44
-28
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,17 @@
77

88
package org.elasticsearch.compute.operator.exchange;
99

10-
import org.elasticsearch.ExceptionsHelper;
1110
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.ActionRunnable;
1212
import org.elasticsearch.action.support.RefCountingRunnable;
1313
import org.elasticsearch.action.support.SubscribableListener;
1414
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
1515
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
1616
import org.elasticsearch.compute.EsqlRefCountingListener;
1717
import org.elasticsearch.compute.data.Page;
18-
import org.elasticsearch.compute.operator.FailureCollector;
1918
import org.elasticsearch.compute.operator.IsBlockedResult;
2019
import org.elasticsearch.core.Releasable;
20+
import org.elasticsearch.tasks.TaskCancelledException;
2121

2222
import java.util.List;
2323
import java.util.Map;
@@ -38,10 +38,9 @@ public final class ExchangeSourceHandler {
3838

3939
private final PendingInstances outstandingSinks;
4040
private final PendingInstances outstandingSources;
41-
// Collect failures that occur while fetching pages from the remote sink with `failFast=true`.
42-
// The exchange source will stop fetching and abort as soon as any failure is added to this failure collector.
43-
// The final failure collected will be notified to callers via the {@code completionListener}.
44-
private final FailureCollector failure = new FailureCollector();
41+
// Track if this exchange source should abort. There is no need to track the actual failure since the actual failure
42+
// should be notified via #addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener).
43+
private volatile boolean aborted = false;
4544

4645
private final AtomicInteger nextSinkId = new AtomicInteger();
4746
private final Map<Integer, RemoteSink> remoteSinks = ConcurrentCollections.newConcurrentMap();
@@ -52,7 +51,7 @@ public final class ExchangeSourceHandler {
5251
* @param maxBufferSize the maximum size of the exchange buffer. A larger buffer reduces ``pauses`` but uses more memory,
5352
* which could otherwise be allocated for other purposes.
5453
* @param fetchExecutor the executor used to fetch pages.
55-
* @param completionListener a listener that will be notified when the exchange source handler fails or completes
54+
* @param completionListener a listener that will be notified when the exchange source handler completes
5655
*/
5756
public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionListener<Void> completionListener) {
5857
this.buffer = new ExchangeBuffer(maxBufferSize);
@@ -63,14 +62,7 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi
6362
this.outstandingSources = new PendingInstances(() -> finishEarly(true, ActionListener.running(closingSinks::finishInstance)));
6463
buffer.addCompletionListener(ActionListener.running(() -> {
6564
final ActionListener<Void> listener = ActionListener.assertAtLeastOnce(completionListener);
66-
try (RefCountingRunnable refs = new RefCountingRunnable(() -> {
67-
final Exception e = failure.getFailure();
68-
if (e != null) {
69-
listener.onFailure(e);
70-
} else {
71-
listener.onResponse(null);
72-
}
73-
})) {
65+
try (RefCountingRunnable refs = new RefCountingRunnable(ActionRunnable.run(listener, this::checkFailure))) {
7466
closingSinks.completion.addListener(refs.acquireListener());
7567
for (PendingInstances pending : List.of(outstandingSinks, outstandingSources)) {
7668
// Create an outstanding instance and then finish to complete the completionListener
@@ -83,20 +75,19 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi
8375
}));
8476
}
8577

78+
private void checkFailure() {
79+
if (aborted) {
80+
throw new TaskCancelledException("remote sinks failed");
81+
}
82+
}
83+
8684
private class ExchangeSourceImpl implements ExchangeSource {
8785
private boolean finished;
8886

8987
ExchangeSourceImpl() {
9088
outstandingSources.trackNewInstance();
9189
}
9290

93-
private void checkFailure() {
94-
Exception e = failure.getFailure();
95-
if (e != null) {
96-
throw ExceptionsHelper.convertToRuntime(e);
97-
}
98-
}
99-
10091
@Override
10192
public Page pollPage() {
10293
checkFailure();
@@ -201,7 +192,7 @@ void fetchPage() {
201192
while (loopControl.isRunning()) {
202193
loopControl.exiting();
203194
// finish other sinks if one of them failed or source no longer need pages.
204-
boolean toFinishSinks = buffer.noMoreInputs() || failure.hasFailure();
195+
boolean toFinishSinks = buffer.noMoreInputs() || aborted;
205196
remoteSink.fetchPageAsync(toFinishSinks, ActionListener.wrap(resp -> {
206197
Page page = resp.takePage();
207198
if (page != null) {
@@ -231,7 +222,7 @@ void fetchPage() {
231222

232223
void onSinkFailed(Exception e) {
233224
if (failFast) {
234-
failure.unwrapAndCollect(e);
225+
aborted = true;
235226
}
236227
buffer.waitForReading().listener().onResponse(null); // resume the Driver if it is being blocked on reading
237228
if (finished == false) {
@@ -260,12 +251,12 @@ void onSinkComplete() {
260251
* - If {@code false}, failures from this remote sink will not cause the exchange source to abort.
261252
* Callers must handle these failures notified via {@code listener}.
262253
* - If {@code true}, failures from this remote sink will cause the exchange source to abort.
263-
* Callers can safely ignore failures notified via this listener, as they are collected and
264-
* reported by the exchange source.
254+
*
265255
* @param onPageFetched a callback that will be called when a page is fetched from the remote sink
266256
* @param instances the number of concurrent ``clients`` that this handler should use to fetch pages.
267257
* More clients reduce latency, but add overhead.
268-
* @param listener a listener that will be notified when the sink fails or completes
258+
* @param listener a listener that will be notified when the sink fails or completes. Callers must handle failures notified via
259+
* this listener.
269260
* @see ExchangeSinkHandler#fetchPageAsync(boolean, ActionListener)
270261
*/
271262
public void addRemoteSink(
@@ -284,7 +275,7 @@ public void addRemoteSink(
284275
@Override
285276
public void onFailure(Exception e) {
286277
if (failFast) {
287-
failure.unwrapAndCollect(e);
278+
aborted = true;
288279
}
289280
buffer.waitForReading().listener().onResponse(null); // resume the Driver if it is being blocked on reading
290281
remoteSink.close(ActionListener.running(() -> sinkListener.onFailure(e)));

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.compute.operator.exchange;
99

10+
import org.elasticsearch.ElasticsearchException;
1011
import org.elasticsearch.ExceptionsHelper;
1112
import org.elasticsearch.TransportVersion;
1213
import org.elasticsearch.action.ActionListener;
@@ -16,13 +17,15 @@
1617
import org.elasticsearch.cluster.node.VersionInformation;
1718
import org.elasticsearch.common.breaker.CircuitBreaker;
1819
import org.elasticsearch.common.breaker.CircuitBreakingException;
20+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
1921
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2022
import org.elasticsearch.common.settings.Settings;
2123
import org.elasticsearch.common.unit.ByteSizeValue;
2224
import org.elasticsearch.common.util.MockBigArrays;
2325
import org.elasticsearch.common.util.PageCacheRecycler;
2426
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2527
import org.elasticsearch.common.util.concurrent.EsExecutors;
28+
import org.elasticsearch.compute.EsqlRefCountingListener;
2629
import org.elasticsearch.compute.data.BlockFactory;
2730
import org.elasticsearch.compute.data.BlockWritables;
2831
import org.elasticsearch.compute.data.IntBlock;
@@ -649,6 +652,28 @@ public void sendResponse(TransportResponse transportResponse) {
649652
}
650653
}
651654

655+
public void testNoCyclicException() throws Exception {
656+
PlainActionFuture<Void> future = new PlainActionFuture<>();
657+
try (EsqlRefCountingListener refs = new EsqlRefCountingListener(future)) {
658+
var exchangeSourceHandler = new ExchangeSourceHandler(between(10, 100), threadPool.generic(), refs.acquire());
659+
int numSinks = between(5, 10);
660+
for (int i = 0; i < numSinks; i++) {
661+
RemoteSink remoteSink = (allSourcesFinished, listener) -> threadPool.schedule(
662+
() -> listener.onFailure(new IOException("simulated")),
663+
TimeValue.timeValueMillis(1),
664+
threadPool.generic()
665+
);
666+
exchangeSourceHandler.addRemoteSink(remoteSink, randomBoolean(), () -> {}, between(1, 3), refs.acquire());
667+
}
668+
}
669+
Exception err = expectThrows(Exception.class, () -> future.actionGet(10, TimeUnit.SECONDS));
670+
assertThat(ExceptionsHelper.unwrap(err, IOException.class).getMessage(), equalTo("simulated"));
671+
try (BytesStreamOutput output = new BytesStreamOutput()) {
672+
// ensure no cyclic exception
673+
ElasticsearchException.writeException(err, output);
674+
}
675+
}
676+
652677
private MockTransportService newTransportService() {
653678
List<NamedWriteableRegistry.Entry> namedWriteables = new ArrayList<>(ClusterModule.getNamedWriteables());
654679
namedWriteables.addAll(BlockWritables.getNamedWriteables());

0 commit comments

Comments
 (0)