Skip to content

Commit fadc752

Browse files
authored
Allow exchange source continue on failure (#117410)
Currently, when an exchange request fails, we stop fetching pages and abort the ExchangeSource. However, to support partial_results, we need to continue fetching pages from other remote sinks despite failures. This change introduces a failFast flag in ExchangeSource, which enables the process to continue in case of failures. By default, this flag is set to true but switches to false when allow_partial_results is enabled.
1 parent 0f5eb0c commit fadc752

File tree

7 files changed

+253
-89
lines changed

7 files changed

+253
-89
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
/**
4848
* {@link ExchangeService} is responsible for exchanging pages between exchange sinks and sources on the same or different nodes.
4949
* It holds a map of {@link ExchangeSinkHandler} instances for each node in the cluster to serve {@link ExchangeRequest}s
50-
* To connect exchange sources to exchange sinks, use the {@link ExchangeSourceHandler#addRemoteSink(RemoteSink, int)} method.
50+
* To connect exchange sources to exchange sinks, use {@link ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, int, ActionListener)}.
5151
*/
5252
public final class ExchangeService extends AbstractLifecycleComponent {
5353
// TODO: Make this a child action of the data node transport to ensure that exchanges
@@ -311,7 +311,7 @@ static final class TransportRemoteSink implements RemoteSink {
311311

312312
@Override
313313
public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener) {
314-
final long reservedBytes = estimatedPageSizeInBytes.get();
314+
final long reservedBytes = allSourcesFinished ? 0 : estimatedPageSizeInBytes.get();
315315
if (reservedBytes > 0) {
316316
// This doesn't fully protect ESQL from OOM, but reduces the likelihood.
317317
blockFactory.breaker().addEstimateBytesAndMaybeBreak(reservedBytes, "fetch page");

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public IsBlockedResult waitForWriting() {
9393
* @param sourceFinished if true, then this handler can finish as sources have enough pages.
9494
* @param listener the listener that will be notified when pages are ready or this handler is finished
9595
* @see RemoteSink
96-
* @see ExchangeSourceHandler#addRemoteSink(RemoteSink, int)
96+
* @see ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, int, ActionListener)
9797
*/
9898
public void fetchPageAsync(boolean sourceFinished, ActionListener<ExchangeResponse> listener) {
9999
if (sourceFinished) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 67 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,54 @@
2424
/**
2525
* An {@link ExchangeSourceHandler} asynchronously fetches pages and status from multiple {@link RemoteSink}s
2626
* and feeds them to its {@link ExchangeSource}, which are created using the {@link #createExchangeSource()}) method.
27-
* {@link RemoteSink}s are added using the {@link #addRemoteSink(RemoteSink, int)}) method.
27+
* {@link RemoteSink}s are added using the {@link #addRemoteSink(RemoteSink, boolean, int, ActionListener)}) method.
2828
*
2929
* @see #createExchangeSource()
30-
* @see #addRemoteSink(RemoteSink, int)
30+
* @see #addRemoteSink(RemoteSink, boolean, int, ActionListener)
3131
*/
3232
public final class ExchangeSourceHandler {
3333
private final ExchangeBuffer buffer;
3434
private final Executor fetchExecutor;
3535

3636
private final PendingInstances outstandingSinks;
3737
private final PendingInstances outstandingSources;
38+
// Collect failures that occur while fetching pages from the remote sink with `failFast=true`.
39+
// The exchange source will stop fetching and abort as soon as any failure is added to this failure collector.
40+
// The final failure collected will be notified to callers via the {@code completionListener}.
3841
private final FailureCollector failure = new FailureCollector();
3942

40-
public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor) {
43+
/**
44+
* Creates a new ExchangeSourceHandler.
45+
*
46+
* @param maxBufferSize the maximum size of the exchange buffer. A larger buffer reduces ``pauses`` but uses more memory,
47+
* which could otherwise be allocated for other purposes.
48+
* @param fetchExecutor the executor used to fetch pages.
49+
* @param completionListener a listener that will be notified when the exchange source handler fails or completes
50+
*/
51+
public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionListener<Void> completionListener) {
4152
this.buffer = new ExchangeBuffer(maxBufferSize);
4253
this.fetchExecutor = fetchExecutor;
4354
this.outstandingSinks = new PendingInstances(() -> buffer.finish(false));
4455
this.outstandingSources = new PendingInstances(() -> buffer.finish(true));
56+
buffer.addCompletionListener(ActionListener.running(() -> {
57+
final ActionListener<Void> listener = ActionListener.assertAtLeastOnce(completionListener).delegateFailure((l, unused) -> {
58+
final Exception e = failure.getFailure();
59+
if (e != null) {
60+
l.onFailure(e);
61+
} else {
62+
l.onResponse(null);
63+
}
64+
});
65+
try (RefCountingListener refs = new RefCountingListener(listener)) {
66+
for (PendingInstances pending : List.of(outstandingSinks, outstandingSources)) {
67+
// Create an outstanding instance and then finish to complete the completionListener
68+
// if we haven't registered any instances of exchange sinks or exchange sources before.
69+
pending.trackNewInstance();
70+
pending.completion.addListener(refs.acquire());
71+
pending.finishInstance();
72+
}
73+
}
74+
}));
4575
}
4676

4777
private class ExchangeSourceImpl implements ExchangeSource {
@@ -89,20 +119,6 @@ public int bufferSize() {
89119
}
90120
}
91121

92-
public void addCompletionListener(ActionListener<Void> listener) {
93-
buffer.addCompletionListener(ActionListener.running(() -> {
94-
try (RefCountingListener refs = new RefCountingListener(listener)) {
95-
for (PendingInstances pending : List.of(outstandingSinks, outstandingSources)) {
96-
// Create an outstanding instance and then finish to complete the completionListener
97-
// if we haven't registered any instances of exchange sinks or exchange sources before.
98-
pending.trackNewInstance();
99-
pending.completion.addListener(refs.acquire());
100-
pending.finishInstance();
101-
}
102-
}
103-
}));
104-
}
105-
106122
/**
107123
* Create a new {@link ExchangeSource} for exchanging data
108124
*
@@ -159,10 +175,14 @@ void exited() {
159175
private final class RemoteSinkFetcher {
160176
private volatile boolean finished = false;
161177
private final RemoteSink remoteSink;
178+
private final boolean failFast;
179+
private final ActionListener<Void> completionListener;
162180

163-
RemoteSinkFetcher(RemoteSink remoteSink) {
181+
RemoteSinkFetcher(RemoteSink remoteSink, boolean failFast, ActionListener<Void> completionListener) {
164182
outstandingSinks.trackNewInstance();
165183
this.remoteSink = remoteSink;
184+
this.failFast = failFast;
185+
this.completionListener = completionListener;
166186
}
167187

168188
void fetchPage() {
@@ -198,15 +218,22 @@ void fetchPage() {
198218
}
199219

200220
void onSinkFailed(Exception e) {
201-
failure.unwrapAndCollect(e);
221+
if (failFast) {
222+
failure.unwrapAndCollect(e);
223+
}
202224
buffer.waitForReading().listener().onResponse(null); // resume the Driver if it is being blocked on reading
203-
onSinkComplete();
225+
if (finished == false) {
226+
finished = true;
227+
outstandingSinks.finishInstance();
228+
completionListener.onFailure(e);
229+
}
204230
}
205231

206232
void onSinkComplete() {
207233
if (finished == false) {
208234
finished = true;
209235
outstandingSinks.finishInstance();
236+
completionListener.onResponse(null);
210237
}
211238
}
212239
}
@@ -215,23 +242,36 @@ void onSinkComplete() {
215242
* Add a remote sink as a new data source of this handler. The handler will start fetching data from this remote sink intermediately.
216243
*
217244
* @param remoteSink the remote sink
218-
* @param instances the number of concurrent ``clients`` that this handler should use to fetch pages. More clients reduce latency,
219-
* but add overhead.
245+
* @param failFast determines how failures in this remote sink are handled:
246+
* - If {@code false}, failures from this remote sink will not cause the exchange source to abort.
247+
* Callers must handle these failures notified via {@code listener}.
248+
* - If {@code true}, failures from this remote sink will cause the exchange source to abort.
249+
* Callers can safely ignore failures notified via this listener, as they are collected and
250+
* reported by the exchange source.
251+
* @param instances the number of concurrent ``clients`` that this handler should use to fetch pages.
252+
* More clients reduce latency, but add overhead.
253+
* @param listener a listener that will be notified when the sink fails or completes
220254
* @see ExchangeSinkHandler#fetchPageAsync(boolean, ActionListener)
221255
*/
222-
public void addRemoteSink(RemoteSink remoteSink, int instances) {
256+
public void addRemoteSink(RemoteSink remoteSink, boolean failFast, int instances, ActionListener<Void> listener) {
257+
final ActionListener<Void> sinkListener = ActionListener.assertAtLeastOnce(ActionListener.notifyOnce(listener));
223258
fetchExecutor.execute(new AbstractRunnable() {
224259
@Override
225260
public void onFailure(Exception e) {
226-
failure.unwrapAndCollect(e);
261+
if (failFast) {
262+
failure.unwrapAndCollect(e);
263+
}
227264
buffer.waitForReading().listener().onResponse(null); // resume the Driver if it is being blocked on reading
265+
sinkListener.onFailure(e);
228266
}
229267

230268
@Override
231269
protected void doRun() {
232-
for (int i = 0; i < instances; i++) {
233-
var fetcher = new RemoteSinkFetcher(remoteSink);
234-
fetcher.fetchPage();
270+
try (RefCountingListener refs = new RefCountingListener(sinkListener)) {
271+
for (int i = 0; i < instances; i++) {
272+
var fetcher = new RemoteSinkFetcher(remoteSink, failFast, refs.acquire());
273+
fetcher.fetchPage();
274+
}
235275
}
236276
}
237277
});

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,19 @@ List<Driver> createDriversForInput(List<Page> input, List<Page> results, boolean
209209
randomIntBetween(2, 10),
210210
threadPool.relativeTimeInMillisSupplier()
211211
);
212-
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(randomIntBetween(1, 4), threadPool.executor(ESQL_TEST_EXECUTOR));
213-
sourceExchanger.addRemoteSink(sinkExchanger::fetchPageAsync, 1);
212+
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(
213+
randomIntBetween(1, 4),
214+
threadPool.executor(ESQL_TEST_EXECUTOR),
215+
ActionListener.noop()
216+
);
217+
sourceExchanger.addRemoteSink(
218+
sinkExchanger::fetchPageAsync,
219+
randomBoolean(),
220+
1,
221+
ActionListener.<Void>noop().delegateResponse((l, e) -> {
222+
throw new AssertionError("unexpected failure", e);
223+
})
224+
);
214225

215226
Iterator<? extends Operator> intermediateOperatorItr;
216227
int itrSize = (splitInput.size() * 3) + 3; // 3 inter ops per initial source drivers, and 3 per final

0 commit comments

Comments
 (0)