Skip to content

Commit 63dcbb0

Browse files
authored
By pass cancellation when closing sinks (#117797) (#117871)
> **java.lang.AssertionError: Leftover exchanges ExchangeService{sinks=[veZSyrPATq2Sg83dtgK3Jg:700/3]} on node node_s4** I looked into the test failure described in #117253. The reason we don't clean up the exchange sink quickly is that, once a failure occurs, we cancel the request along with all its child requests. These exchange sinks will be cleaned up only after they become inactive, which by default takes 5 minutes. We could override the `esql.exchange.sink_inactive_interval` setting in the test to remove these exchange sinks faster. However, I think we should allow exchange requests that close exchange sinks to bypass cancellation, enabling quicker resource cleanup than the default inactive interval. Closes #117253
1 parent 0a14f27 commit 63dcbb0

File tree

6 files changed

+78
-27
lines changed

6 files changed

+78
-27
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeRequest.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,17 @@ public void writeTo(StreamOutput out) throws IOException {
4040
out.writeBoolean(sourcesFinished);
4141
}
4242

43+
@Override
44+
public TaskId getParentTask() {
45+
// Exchange requests with `sourcesFinished=true` complete the remote sink and return without blocking.
46+
// Masking the parent task allows these requests to bypass task cancellation, ensuring cleanup of the remote sink.
47+
// TODO: Maybe add a separate action/request for closing exchange sinks?
48+
if (sourcesFinished) {
49+
return TaskId.EMPTY_TASK_ID;
50+
}
51+
return super.getParentTask();
52+
}
53+
4354
/**
4455
* True if the {@link ExchangeSourceHandler} has enough input.
4556
* The corresponding {@link ExchangeSinkHandler} can drain pages and finish itself.
@@ -70,9 +81,9 @@ public int hashCode() {
7081

7182
@Override
7283
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
73-
if (parentTaskId.isSet() == false) {
74-
assert false : "ExchangeRequest must have a parent task";
75-
throw new IllegalStateException("ExchangeRequest must have a parent task");
84+
if (sourcesFinished == false && parentTaskId.isSet() == false) {
85+
assert false : "ExchangeRequest with sourcesFinished=false must have a parent task";
86+
throw new IllegalStateException("ExchangeRequest with sourcesFinished=false must have a parent task");
7687
}
7788
return new CancellableTask(id, type, action, "", parentTaskId, headers) {
7889
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -314,28 +314,20 @@ static final class TransportRemoteSink implements RemoteSink {
314314
@Override
315315
public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener) {
316316
if (allSourcesFinished) {
317-
if (finished.compareAndSet(false, true)) {
318-
doFetchPageAsync(true, listener);
319-
} else {
320-
// already finished or promised
321-
listener.onResponse(new ExchangeResponse(blockFactory, null, true));
322-
}
323-
} else {
324-
// already finished
325-
if (finished.get()) {
326-
listener.onResponse(new ExchangeResponse(blockFactory, null, true));
327-
return;
328-
}
329-
doFetchPageAsync(false, ActionListener.wrap(r -> {
330-
if (r.finished()) {
331-
finished.set(true);
332-
}
333-
listener.onResponse(r);
334-
}, e -> {
335-
finished.set(true);
336-
listener.onFailure(e);
337-
}));
317+
close(listener.map(unused -> new ExchangeResponse(blockFactory, null, true)));
318+
return;
319+
}
320+
// already finished
321+
if (finished.get()) {
322+
listener.onResponse(new ExchangeResponse(blockFactory, null, true));
323+
return;
338324
}
325+
doFetchPageAsync(false, ActionListener.wrap(r -> {
326+
if (r.finished()) {
327+
finished.set(true);
328+
}
329+
listener.onResponse(r);
330+
}, e -> close(ActionListener.running(() -> listener.onFailure(e)))));
339331
}
340332

341333
private void doFetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener) {
@@ -361,6 +353,15 @@ private void doFetchPageAsync(boolean allSourcesFinished, ActionListener<Exchang
361353
}, responseExecutor)
362354
);
363355
}
356+
357+
@Override
358+
public void close(ActionListener<Void> listener) {
359+
if (finished.compareAndSet(false, true)) {
360+
doFetchPageAsync(true, listener.delegateFailure((l, unused) -> l.onResponse(null)));
361+
} else {
362+
listener.onResponse(null);
363+
}
364+
}
364365
}
365366

366367
// For testing

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,10 @@ void fetchPage() {
200200
void onSinkFailed(Exception e) {
201201
failure.unwrapAndCollect(e);
202202
buffer.waitForReading().listener().onResponse(null); // resume the Driver if it is being blocked on reading
203-
onSinkComplete();
203+
if (finished == false) {
204+
finished = true;
205+
remoteSink.close(ActionListener.running(outstandingSinks::finishInstance));
206+
}
204207
}
205208

206209
void onSinkComplete() {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/RemoteSink.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,14 @@
1212
public interface RemoteSink {
1313

1414
void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener);
15+
16+
default void close(ActionListener<Void> listener) {
17+
fetchPageAsync(true, listener.delegateFailure((l, r) -> {
18+
try {
19+
r.close();
20+
} finally {
21+
l.onResponse(null);
22+
}
23+
}));
24+
}
1525
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator.exchange;
9+
10+
import org.elasticsearch.tasks.TaskId;
11+
import org.elasticsearch.test.ESTestCase;
12+
13+
import static org.hamcrest.Matchers.equalTo;
14+
15+
public class ExchangeRequestTests extends ESTestCase {
16+
17+
public void testParentTask() {
18+
ExchangeRequest r1 = new ExchangeRequest("1", true);
19+
r1.setParentTask(new TaskId("node-1", 1));
20+
assertSame(TaskId.EMPTY_TASK_ID, r1.getParentTask());
21+
22+
ExchangeRequest r2 = new ExchangeRequest("1", false);
23+
r2.setParentTask(new TaskId("node-2", 2));
24+
assertTrue(r2.getParentTask().isSet());
25+
assertThat(r2.getParentTask(), equalTo((new TaskId("node-2", 2))));
26+
}
27+
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ public void testConcurrentWithTransportActions() {
400400
}
401401
}
402402

403-
public void testFailToRespondPage() {
403+
public void testFailToRespondPage() throws Exception {
404404
Settings settings = Settings.builder().build();
405405
MockTransportService node0 = newTransportService();
406406
ExchangeService exchange0 = new ExchangeService(settings, threadPool, ESQL_TEST_EXECUTOR, blockFactory());
@@ -459,7 +459,6 @@ public void sendResponse(TransportResponse transportResponse) {
459459
Throwable cause = ExceptionsHelper.unwrap(err, IOException.class);
460460
assertNotNull(cause);
461461
assertThat(cause.getMessage(), equalTo("page is too large"));
462-
sinkHandler.onFailure(new RuntimeException(cause));
463462
sourceCompletionFuture.actionGet(10, TimeUnit.SECONDS);
464463
}
465464
}

0 commit comments

Comments
 (0)