
Conversation

@idegtiarenko (Contributor)

This adds the possibility to limit the number of nodes a single query sends requests to at once.

This could be useful:

  • if we want to prevent a single query from utilizing the entire cluster's resources at once (e.g. make it slower to allow other things to happen in the meanwhile)
  • it could be a first step towards allowing certain queries to sample results from a subset of nodes and return quickly, rather than wait for all results at once

nodeToShardIds.computeIfAbsent(selectedNode, unused -> new ArrayList<>()).add(shard.shardId);

if (concurrentRequests == null || concurrentRequests.tryAcquire()) {
if (nodePermits.get(node).tryAcquire()) {
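For context, the permit check above can be sketched in isolation as follows. This is a hypothetical sketch (the class and method names are invented, not the actual DataNodeRequestSender code): a null semaphore models "no limit", and one permit is taken per in-flight node request.

```java
import java.util.concurrent.Semaphore;

// Hypothetical sketch: a null semaphore means "no limit", mirroring the
// `concurrentRequests == null || concurrentRequests.tryAcquire()` check above.
class NodeRequestLimiter {
    private final Semaphore concurrentRequests; // null when unlimited

    NodeRequestLimiter(int maxConcurrentNodes) {
        this.concurrentRequests = maxConcurrentNodes > 0 ? new Semaphore(maxConcurrentNodes) : null;
    }

    /** Try to start a request to one more node; returns false once the cap is reached. */
    boolean tryStartRequest() {
        return concurrentRequests == null || concurrentRequests.tryAcquire();
    }

    /** Release the permit once the node responded, letting the next pending node proceed. */
    void onRequestFinished() {
        if (concurrentRequests != null) {
            concurrentRequests.release();
        }
    }
}
```

With a limit of 2, the third `tryStartRequest()` fails until a prior request finishes and releases its permit.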
Contributor Author:

I wonder if instead we want to check all pending requests here before attempting a new one?
Or possibly implement a more complex strategy

@idegtiarenko idegtiarenko added >non-issue Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) :Analytics/ES|QL AKA ESQL labels Feb 19, 2025
@idegtiarenko idegtiarenko marked this pull request as ready for review February 19, 2025 10:48
@elasticsearchmachine (Collaborator)

Pinging @elastic/es-analytical-engine (Team:Analytics)

# Conflicts:
#	x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java
@nik9000 (Member) left a comment

Sounds neat. I'm not deep enough in the request sender to give a good response about how right it is. It'd take a ton of reading. @dnhatn, can you comment?

if (exchangeSource.isCompleted()) {
nodeListener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()));
return;
}
Contributor Author:

This part prevents us from sending a query to remaining data nodes if we collected enough results

Contributor:

Nit: There's one thing here: we'll "skip" it with onSkip(), but the Sender will still continue processing all shards. From what I see, it will continue calling this after every node finishes.

Should we instead pass something to the sender so it stops calling sendRequest()? I don't think it matters computationally, but it feels like we're doing "too much" when we could short-circuit instead (?)

Member:

It looks like we won't send more requests because we do:

                if (skipRemaining) {
                    DataNodeRequestSender.this.skipRemaining = true;
                }

So we'll only count the number of shards we skip and that's it. I think.

Contributor Author:

It looks like we won't send more requests because we do:

Correct, this was added today in: a5084ab

So we'll only count the number of shards we skip and that's it. I think.

The total skipped count consists of the shards we skipped already (skippedShards) plus the shards we have not processed yet (the ones remaining in pendingShardIds), please see
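A minimal sketch of that accounting (hypothetical class and method names; the real code tracks these counters inside DataNodeRequestSender):

```java
import java.util.Set;

// Hypothetical sketch: shards skipped explicitly plus shards that were never
// attempted (still pending when the query terminated early) both count as skipped.
class SkippedShardAccounting {
    static int totalSkipped(int skippedShards, Set<?> pendingShardIds) {
        return skippedShards + pendingShardIds.size();
    }
}
```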

Member:

Since these computations should not be expensive, I wonder if we should skip only here, without short-circuiting in other places. The reason is that we might need to be more careful not to short-circuit elsewhere when allow_partial_results=true.

Contributor Author:

Sounds good. It would also simplify the change. We can always add it back later if we see it is needed.

@dnhatn (Member) left a comment

Thanks @idegtiarenko. I have some minor comments. The early-termination part will be useful, but I couldn't find a use case for limiting concurrent nodes per cluster, except for testing.

}

public boolean isCompleted() {
return completed;
Member:

Can we use buffer.isFinished here instead of adding a new variable?

this.esqlExecutor = esqlExecutor;
this.rootTask = rootTask;
this.allowPartialResults = allowPartialResults;
this.concurrentRequests = concurrentRequests > 0 ? new Semaphore(concurrentRequests) : null;
Member:

Should we initialize the Semaphore for the -1 case with new Semaphore(Integer.MAX_VALUE)?

Contributor Author:

I made it a special case so we do not keep the semaphore at all when there is no limit (the most common case). But I can do that as well.
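The two options discussed here behave the same from the caller's perspective. A sketch of the Integer.MAX_VALUE alternative (hypothetical helper name, not the merged code):

```java
import java.util.concurrent.Semaphore;

// Hypothetical sketch: instead of a nullable field, "no limit" becomes a
// semaphore with effectively infinite permits, so callers can always tryAcquire().
class SemaphoreFactory {
    static Semaphore forLimit(int maxConcurrentNodes) {
        return new Semaphore(maxConcurrentNodes > 0 ? maxConcurrentNodes : Integer.MAX_VALUE);
    }
}
```

The trade-off is one unconditional `tryAcquire()`/`release()` pair on every request versus a null check; the merged code keeps the null check so the unlimited path carries no semaphore at all.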

# Conflicts:
#	x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java
#	x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java
@ivancea (Contributor) left a comment

LGTM!

/**
* The maximum number of nodes to be queried at once by this query. This is a safeguard to avoid overloading the cluster.
*/
public int maxConcurrentNodePerCluster() {
Contributor:

Suggested change
public int maxConcurrentNodePerCluster() {
public int maxConcurrentNodesPerCluster() {


});
safeGet(future);
assertThat(sent.size(), equalTo(5));
assertThat(maxConcurrentRequests.get(), equalTo(2));
Contributor:

I would randomize this 2 if possible.
At least, to test some edge cases, like 1 and 5.
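Randomizing the limit as suggested could look roughly like this simulation (hypothetical names, using plain java.util.Random rather than the randomizedtesting helpers the actual DataNodeRequestSenderTests use):

```java
import java.util.concurrent.Semaphore;

// Hypothetical sketch: with N pending nodes and a concurrency limit, the number
// of requests that can be started in the first batch is min(N, limit).
class ConcurrencyLimitSimulation {
    static int firstBatchSize(int pendingNodes, int limit) {
        Semaphore permits = new Semaphore(limit);
        int started = 0;
        for (int i = 0; i < pendingNodes; i++) {
            if (permits.tryAcquire()) {
                started++; // request sent; no permit is released until a node responds
            }
        }
        return started;
    }
}
```

The edge cases mentioned above fall out directly: with a limit of 1 only one request goes out at a time, and with a limit equal to the node count the limiter never blocks anything.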

assertThat(sent.size(), equalTo(2));// onResponse() + onSkip()
assertThat(response.totalShards, equalTo(5));
assertThat(response.successfulShards, equalTo(5));
assertThat(response.failedShards, equalTo(0));
Contributor Author:

I wonder if we should count skipped shards as successful or as org.elasticsearch.xpack.esql.plugin.ComputeResponse#skippedShards

@idegtiarenko idegtiarenko requested review from dnhatn and nik9000 March 10, 2025 12:10
@nik9000 (Member) left a comment

I'm good. Let's get Nhat's 👍 too.

if (skipRemaining) {
DataNodeRequestSender.this.skipRemaining = true;
}
onAfter(List.of());
Member:

I think we should clear the shard failures for shards that are skipped; otherwise, we will still report failures.
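A sketch of the cleanup being suggested (hypothetical helper; shard ids are plain strings here for brevity, not the real ShardId type):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: once shards are counted as skipped, drop any failures
// previously recorded for them so the response does not report both states.
class ShardFailureCleanup {
    static Map<String, Exception> withoutSkipped(Map<String, Exception> failures, Set<String> skippedShardIds) {
        Map<String, Exception> remaining = new HashMap<>(failures);
        remaining.keySet().removeAll(skippedShardIds);
        return remaining;
    }
}
```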


@idegtiarenko idegtiarenko requested a review from dnhatn March 10, 2025 17:28
@dnhatn (Member) left a comment

LGTM. Thanks @idegtiarenko

@idegtiarenko idegtiarenko merged commit 8d11dd2 into elastic:main Mar 12, 2025
17 checks passed
@idegtiarenko idegtiarenko deleted the limit_concurrent_node_requests branch March 12, 2025 09:02
albertzaharovits pushed a commit to albertzaharovits/elasticsearch that referenced this pull request Mar 13, 2025
jfreden pushed a commit to jfreden/elasticsearch that referenced this pull request Mar 13, 2025
idegtiarenko added a commit that referenced this pull request May 6, 2025

Labels

:Analytics/ES|QL AKA ESQL >non-issue Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v8.19.0 v9.1.0
