Skip to content

Commit 7869fbe

Browse files
authored
Avoid cyclic exception in ExchangeSource (#121995) (#122054)
Since the introduction of the fail_fast option for remote sinks (see #117410), the ExchangeSource can propagate failures in a way that leads to circular references. The issue occurs as follows:
1. remote-sink-1 fails with exception e1, and the failure collector collects e1.
2. remote-sink-2 fails with exception e2, and the failure collector collects e2.
3. The listener of remote-sink-2 propagates e2 before the listener of remote-sink-1 propagates e1.
4. The failure collector in ExchangeSource sees [e1, e2] and adds e2 as a suppressed exception of e1. The upstream sees [e2, e1] and adds e1 as a suppressed exception of e2, leading to a circular reference.
With this change, we stop collecting failures in ExchangeSource. Labelled as a non-issue because the bug is unreleased. Relates #117410
1 parent 87b1068 commit 7869fbe

File tree

2 files changed

+52
-31
lines changed

2 files changed

+52
-31
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,17 @@
77

88
package org.elasticsearch.compute.operator.exchange;
99

10-
import org.elasticsearch.ExceptionsHelper;
1110
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.ActionRunnable;
1212
import org.elasticsearch.action.support.RefCountingRunnable;
1313
import org.elasticsearch.action.support.SubscribableListener;
1414
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
1515
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
1616
import org.elasticsearch.compute.EsqlRefCountingListener;
1717
import org.elasticsearch.compute.data.Page;
18-
import org.elasticsearch.compute.operator.FailureCollector;
1918
import org.elasticsearch.compute.operator.IsBlockedResult;
2019
import org.elasticsearch.core.Releasable;
20+
import org.elasticsearch.tasks.TaskCancelledException;
2121

2222
import java.util.List;
2323
import java.util.Map;
@@ -38,10 +38,9 @@ public final class ExchangeSourceHandler {
3838

3939
private final PendingInstances outstandingSinks;
4040
private final PendingInstances outstandingSources;
41-
// Collect failures that occur while fetching pages from the remote sink with `failFast=true`.
42-
// The exchange source will stop fetching and abort as soon as any failure is added to this failure collector.
43-
// The final failure collected will be notified to callers via the {@code completionListener}.
44-
private final FailureCollector failure = new FailureCollector();
41+
// Track if this exchange source should abort. There is no need to track the actual failure since the actual failure
42+
// should be notified via #addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener).
43+
private volatile boolean aborted = false;
4544

4645
private final AtomicInteger nextSinkId = new AtomicInteger();
4746
private final Map<Integer, RemoteSink> remoteSinks = ConcurrentCollections.newConcurrentMap();
@@ -52,7 +51,7 @@ public final class ExchangeSourceHandler {
5251
* @param maxBufferSize the maximum size of the exchange buffer. A larger buffer reduces ``pauses`` but uses more memory,
5352
* which could otherwise be allocated for other purposes.
5453
* @param fetchExecutor the executor used to fetch pages.
55-
* @param completionListener a listener that will be notified when the exchange source handler fails or completes
54+
* @param completionListener a listener that will be notified when the exchange source handler completes
5655
*/
5756
public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionListener<Void> completionListener) {
5857
this.buffer = new ExchangeBuffer(maxBufferSize);
@@ -63,14 +62,7 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi
6362
this.outstandingSources = new PendingInstances(() -> finishEarly(true, ActionListener.running(closingSinks::finishInstance)));
6463
buffer.addCompletionListener(ActionListener.running(() -> {
6564
final ActionListener<Void> listener = ActionListener.assertAtLeastOnce(completionListener);
66-
try (RefCountingRunnable refs = new RefCountingRunnable(() -> {
67-
final Exception e = failure.getFailure();
68-
if (e != null) {
69-
listener.onFailure(e);
70-
} else {
71-
listener.onResponse(null);
72-
}
73-
})) {
65+
try (RefCountingRunnable refs = new RefCountingRunnable(ActionRunnable.run(listener, this::checkFailure))) {
7466
closingSinks.completion.addListener(refs.acquireListener());
7567
for (PendingInstances pending : List.of(outstandingSinks, outstandingSources)) {
7668
// Create an outstanding instance and then finish to complete the completionListener
@@ -83,20 +75,19 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi
8375
}));
8476
}
8577

78+
private void checkFailure() {
79+
if (aborted) {
80+
throw new TaskCancelledException("remote sinks failed");
81+
}
82+
}
83+
8684
private class ExchangeSourceImpl implements ExchangeSource {
8785
private boolean finished;
8886

8987
ExchangeSourceImpl() {
9088
outstandingSources.trackNewInstance();
9189
}
9290

93-
private void checkFailure() {
94-
Exception e = failure.getFailure();
95-
if (e != null) {
96-
throw ExceptionsHelper.convertToRuntime(e);
97-
}
98-
}
99-
10091
@Override
10192
public Page pollPage() {
10293
checkFailure();
@@ -201,7 +192,7 @@ void fetchPage() {
201192
while (loopControl.isRunning()) {
202193
loopControl.exiting();
203194
// finish other sinks if one of them failed or source no longer need pages.
204-
boolean toFinishSinks = buffer.noMoreInputs() || failure.hasFailure();
195+
boolean toFinishSinks = buffer.noMoreInputs() || aborted;
205196
remoteSink.fetchPageAsync(toFinishSinks, ActionListener.wrap(resp -> {
206197
Page page = resp.takePage();
207198
if (page != null) {
@@ -231,7 +222,7 @@ void fetchPage() {
231222

232223
void onSinkFailed(Exception e) {
233224
if (failFast) {
234-
failure.unwrapAndCollect(e);
225+
aborted = true;
235226
}
236227
buffer.waitForReading().listener().onResponse(null); // resume the Driver if it is being blocked on reading
237228
if (finished == false) {
@@ -260,12 +251,12 @@ void onSinkComplete() {
260251
* - If {@code false}, failures from this remote sink will not cause the exchange source to abort.
261252
* Callers must handle these failures notified via {@code listener}.
262253
* - If {@code true}, failures from this remote sink will cause the exchange source to abort.
263-
* Callers can safely ignore failures notified via this listener, as they are collected and
264-
* reported by the exchange source.
254+
*
265255
* @param onPageFetched a callback that will be called when a page is fetched from the remote sink
266256
* @param instances the number of concurrent ``clients`` that this handler should use to fetch pages.
267257
* More clients reduce latency, but add overhead.
268-
* @param listener a listener that will be notified when the sink fails or completes
258+
* @param listener a listener that will be notified when the sink fails or completes. Callers must handle failures notified via
259+
* this listener.
269260
* @see ExchangeSinkHandler#fetchPageAsync(boolean, ActionListener)
270261
*/
271262
public void addRemoteSink(
@@ -284,7 +275,7 @@ public void addRemoteSink(
284275
@Override
285276
public void onFailure(Exception e) {
286277
if (failFast) {
287-
failure.unwrapAndCollect(e);
278+
aborted = true;
288279
}
289280
buffer.waitForReading().listener().onResponse(null); // resume the Driver if it is being blocked on reading
290281
remoteSink.close(ActionListener.running(() -> sinkListener.onFailure(e)));

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.compute.operator.exchange;
99

10+
import org.elasticsearch.ElasticsearchException;
1011
import org.elasticsearch.ExceptionsHelper;
1112
import org.elasticsearch.TransportVersion;
1213
import org.elasticsearch.action.ActionListener;
@@ -16,13 +17,15 @@
1617
import org.elasticsearch.cluster.node.VersionInformation;
1718
import org.elasticsearch.common.breaker.CircuitBreaker;
1819
import org.elasticsearch.common.breaker.CircuitBreakingException;
20+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
1921
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2022
import org.elasticsearch.common.settings.Settings;
2123
import org.elasticsearch.common.unit.ByteSizeValue;
2224
import org.elasticsearch.common.util.MockBigArrays;
2325
import org.elasticsearch.common.util.PageCacheRecycler;
2426
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2527
import org.elasticsearch.common.util.concurrent.EsExecutors;
28+
import org.elasticsearch.compute.EsqlRefCountingListener;
2629
import org.elasticsearch.compute.data.BlockFactory;
2730
import org.elasticsearch.compute.data.BlockWritables;
2831
import org.elasticsearch.compute.data.IntBlock;
@@ -37,6 +40,7 @@
3740
import org.elasticsearch.core.TimeValue;
3841
import org.elasticsearch.tasks.Task;
3942
import org.elasticsearch.tasks.TaskCancellationService;
43+
import org.elasticsearch.tasks.TaskCancelledException;
4044
import org.elasticsearch.test.ESTestCase;
4145
import org.elasticsearch.test.transport.MockTransportService;
4246
import org.elasticsearch.test.transport.StubbableTransport;
@@ -69,6 +73,7 @@
6973
import static org.hamcrest.Matchers.equalTo;
7074
import static org.hamcrest.Matchers.greaterThan;
7175
import static org.hamcrest.Matchers.hasSize;
76+
import static org.hamcrest.Matchers.instanceOf;
7277

7378
public class ExchangeServiceTests extends ESTestCase {
7479

@@ -621,14 +626,15 @@ public void sendResponse(TransportResponse transportResponse) {
621626
);
622627
ExchangeSinkHandler sinkHandler = exchange1.createSinkHandler(exchangeId, randomIntBetween(1, 128));
623628
Transport.Connection connection = node0.getConnection(node1.getLocalNode());
629+
PlainActionFuture<Void> remoteSinkFuture = new PlainActionFuture<>();
624630
sourceHandler.addRemoteSink(
625631
exchange0.newRemoteSink(task, exchangeId, node0, connection),
626632
true,
627633
() -> {},
628634
randomIntBetween(1, 5),
629-
ActionListener.noop()
635+
remoteSinkFuture
630636
);
631-
Exception err = expectThrows(
637+
Exception driverException = expectThrows(
632638
Exception.class,
633639
() -> runConcurrentTest(
634640
maxSeqNo,
@@ -637,7 +643,9 @@ public void sendResponse(TransportResponse transportResponse) {
637643
() -> sinkHandler.createExchangeSink(() -> {})
638644
)
639645
);
640-
Throwable cause = ExceptionsHelper.unwrap(err, IOException.class);
646+
assertThat(driverException, instanceOf(TaskCancelledException.class));
647+
var sinkException = expectThrows(Exception.class, remoteSinkFuture::actionGet);
648+
Throwable cause = ExceptionsHelper.unwrap(sinkException, IOException.class);
641649
assertNotNull(cause);
642650
assertThat(cause.getMessage(), equalTo("page is too large"));
643651
PlainActionFuture<Void> sinkCompletionFuture = new PlainActionFuture<>();
@@ -647,6 +655,28 @@ public void sendResponse(TransportResponse transportResponse) {
647655
}
648656
}
649657

658+
public void testNoCyclicException() throws Exception {
659+
PlainActionFuture<Void> future = new PlainActionFuture<>();
660+
try (EsqlRefCountingListener refs = new EsqlRefCountingListener(future)) {
661+
var exchangeSourceHandler = new ExchangeSourceHandler(between(10, 100), threadPool.generic(), refs.acquire());
662+
int numSinks = between(5, 10);
663+
for (int i = 0; i < numSinks; i++) {
664+
RemoteSink remoteSink = (allSourcesFinished, listener) -> threadPool.schedule(
665+
() -> listener.onFailure(new IOException("simulated")),
666+
TimeValue.timeValueMillis(1),
667+
threadPool.generic()
668+
);
669+
exchangeSourceHandler.addRemoteSink(remoteSink, randomBoolean(), () -> {}, between(1, 3), refs.acquire());
670+
}
671+
}
672+
Exception err = expectThrows(Exception.class, () -> future.actionGet(10, TimeUnit.SECONDS));
673+
assertThat(ExceptionsHelper.unwrap(err, IOException.class).getMessage(), equalTo("simulated"));
674+
try (BytesStreamOutput output = new BytesStreamOutput()) {
675+
// ensure no cyclic exception
676+
ElasticsearchException.writeException(err, output);
677+
}
678+
}
679+
650680
private MockTransportService newTransportService() {
651681
List<NamedWriteableRegistry.Entry> namedWriteables = new ArrayList<>(ClusterModule.getNamedWriteables());
652682
namedWriteables.addAll(BlockWritables.getNamedWriteables());

0 commit comments

Comments
 (0)