Skip to content

Commit af59dcd

Browse files
authored
Limit thread queue during init in ExchangeSource (#117273) (#117286)
ES|QL doesn't work well with 500 clusters or clusters with 500 nodes. The reason is that we enqueue three tasks to the thread pool queue, which has a limit of 1000, during the initialization of the exchange for each target (cluster or node). This simple PR reduces it to one task. I'm considering using AsyncProcessor for these requests, but that will be a follow-up issue for later.
1 parent fe86e2c commit af59dcd

File tree

1 file changed

+12
-11
lines changed

1 file changed

+12
-11
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -220,20 +220,21 @@ void onSinkComplete() {
220220
* @see ExchangeSinkHandler#fetchPageAsync(boolean, ActionListener)
221221
*/
222222
public void addRemoteSink(RemoteSink remoteSink, int instances) {
223-
for (int i = 0; i < instances; i++) {
224-
var fetcher = new RemoteSinkFetcher(remoteSink);
225-
fetchExecutor.execute(new AbstractRunnable() {
226-
@Override
227-
public void onFailure(Exception e) {
228-
fetcher.onSinkFailed(e);
229-
}
223+
fetchExecutor.execute(new AbstractRunnable() {
224+
@Override
225+
public void onFailure(Exception e) {
226+
failure.unwrapAndCollect(e);
227+
buffer.waitForReading().listener().onResponse(null); // resume the Driver if it is being blocked on reading
228+
}
230229

231-
@Override
232-
protected void doRun() {
230+
@Override
231+
protected void doRun() {
232+
for (int i = 0; i < instances; i++) {
233+
var fetcher = new RemoteSinkFetcher(remoteSink);
233234
fetcher.fetchPage();
234235
}
235-
});
236-
}
236+
}
237+
});
237238
}
238239

239240
/**

0 commit comments

Comments
 (0)