Limit concurrent node requests #122850
Merged: idegtiarenko merged 26 commits into elastic:main from idegtiarenko:limit_concurrent_node_requests on Mar 12, 2025.
Note: the diff below shows changes from 3 of the 26 commits.
Commits (26, all by idegtiarenko):

- eec2039 Limit concurrent node requests
- d4850fc upd
- ae96763 upd
- 15d897f Merge branch 'main' into limit_concurrent_node_requests
- 666f588 do not send requests if source is completed
- 34badf7 Merge branch 'main' into limit_concurrent_node_requests
- 8c55e8a upd
- b2a66a2 do not erase prior shard failures on skipping node
- 11bd0f0 Merge branch 'main' into limit_concurrent_node_requests
- 53f7d60 upd
- 1bf2715 Merge branch 'main' into limit_concurrent_node_requests
- 3296cf0 Merge branch 'main' into limit_concurrent_node_requests
- f0cc4ec rename
- 5ce4095 randomize concurrency
- a5084ab skip remaining nodes
- 50da087 count skips
- 6809e58 Merge branch 'main' into limit_concurrent_node_requests
- 1458611 Merge branch 'main' into limit_concurrent_node_requests
- 02ddb1e remove skipping
- 4058ef1 clean non-fatal errors on shard skips
- e8ad22a cleanup
- a70aff4 ensure every shard has at least one doc
- ed54538 Merge branch 'main' into limit_concurrent_node_requests
- 7f47bde debug flaky test
- f6868a1 upd
- f8a54f6 Merge branch 'main' into limit_concurrent_node_requests
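The change caps how many data-node requests a query keeps in flight at once. Outside the actual Elasticsearch implementation, the idea can be sketched with a counting semaphore (the `ConcurrencyLimitSketch` class and its method names are illustrative, not from the PR):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch, not the PR's code: cap the number of simultaneously
// running per-node requests with a counting semaphore, and track the peak
// observed concurrency the same way the PR's new test does.
public class ConcurrencyLimitSketch {

    public static int maxObservedConcurrency(int nodes, int limit) {
        Semaphore permits = new Semaphore(limit);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(nodes);
        for (int i = 0; i < nodes; i++) {
            pool.execute(() -> {
                try {
                    permits.acquire();            // blocks while `limit` requests are already in flight
                    int now = inFlight.incrementAndGet();
                    maxSeen.accumulateAndGet(now, Math::max);  // lock-free running maximum
                    Thread.sleep(10);             // simulate the node round-trip
                    inFlight.decrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    permits.release();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        // prints a value between 1 and the limit, never above it
        System.out.println(maxObservedConcurrency(5, 2));
    }
}
```

The real sender is callback-driven rather than thread-blocking, but the invariant is the same: no more than the configured number of node requests run at once.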
Changes to QueryPragmas:

```diff
@@ -55,7 +55,10 @@ public final class QueryPragmas implements Writeable {
      */
     public static final Setting<TimeValue> STATUS_INTERVAL = Setting.timeSetting("status_interval", Driver.DEFAULT_STATUS_INTERVAL);

-    public static final Setting<Integer> MAX_CONCURRENT_SHARDS_PER_NODE = Setting.intSetting("max_concurrent_shards_per_node", 10, 1, 100);
+    public static final Setting<Integer> MAX_CONCURRENT_NODES_PER_CLUSTER = //
+        Setting.intSetting("max_concurrent_nodes_per_cluster", -1, -1, 100);
+    public static final Setting<Integer> MAX_CONCURRENT_SHARDS_PER_NODE = //
+        Setting.intSetting("max_concurrent_shards_per_node", 10, 1, 100);

     public static final Setting<Boolean> NODE_LEVEL_REDUCTION = Setting.boolSetting("node_level_reduction", true);

@@ -122,6 +125,13 @@ public int enrichMaxWorkers() {
         return ENRICH_MAX_WORKERS.get(settings);
     }

+    /**
+     * The maximum number of nodes to be queried at once by this query. This is safeguard to avoid overloading the cluster.
+     */
+    public int maxConcurrentNodePerCluster() {
```

Review suggestion on the new method (conversation marked as resolved by idegtiarenko):

```diff
-    public int maxConcurrentNodePerCluster() {
+    public int maxConcurrentNodesPerCluster() {
```
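The new pragma is declared via `Setting.intSetting(key, default, min, max)`, i.e. a bounded integer setting. As an illustration of what such a declaration enforces, here is a minimal stand-alone re-implementation (the `BoundedIntSetting` class is hypothetical, not Elasticsearch's `Setting`; treating the -1 default as "no limit" is an assumption based on the setting's range):

```java
// Hypothetical stand-in for a bounded integer setting like
// Setting.intSetting("max_concurrent_nodes_per_cluster", -1, -1, 100):
// values outside [min, max] are rejected at parse time.
public class BoundedIntSetting {
    final String key;
    final int defaultValue;
    final int min;
    final int max;

    BoundedIntSetting(String key, int defaultValue, int min, int max) {
        this.key = key;
        this.defaultValue = defaultValue;
        this.min = min;
        this.max = max;
    }

    // Returns the default when unset, otherwise validates the raw value.
    int parse(String raw) {
        int value = (raw == null) ? defaultValue : Integer.parseInt(raw);
        if (value < min || value > max) {
            throw new IllegalArgumentException(
                "value [" + value + "] for setting [" + key + "] must be between " + min + " and " + max
            );
        }
        return value;
    }
}
```

Under this reading, `max_concurrent_nodes_per_cluster` accepts -1 (presumably "unlimited", and the default) through 100, while `max_concurrent_shards_per_node` defaults to 10 and accepts 1 through 100.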
Changes to the sender tests:

```diff
@@ -45,6 +45,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;

@@ -85,14 +86,15 @@ public void setThreadPool() {
     }

     @After
-    public void shutdownThreadPool() throws Exception {
+    public void shutdownThreadPool() {
         terminate(threadPool);
     }

     public void testEmpty() {
         var future = sendRequests(
             List.of(),
             randomBoolean(),
+            -1,
             (node, shardIds, aliasFilters, listener) -> fail("expect no data-node request is sent")
         );
         var resp = safeGet(future);

@@ -107,10 +109,9 @@ public void testOnePass() {
             targetShard(shard4, node2, node3)
         );
         Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
-        var future = sendRequests(targetShards, randomBoolean(), (node, shardIds, aliasFilters, listener) -> {
+        var future = sendRequests(targetShards, randomBoolean(), -1, (node, shardIds, aliasFilters, listener) -> {
             sent.add(new NodeRequest(node, shardIds, aliasFilters));
-            var resp = new DataNodeComputeResponse(List.of(), Map.of());
-            runWithDelay(() -> listener.onResponse(resp));
+            runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
         });
         safeGet(future);
         assertThat(sent.size(), equalTo(2));

@@ -120,15 +121,15 @@ public void testOnePass() {
     public void testMissingShards() {
         {
             var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3));
-            var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
+            var future = sendRequests(targetShards, false, -1, (node, shardIds, aliasFilters, listener) -> {
                 fail("expect no data-node request is sent when target shards are missing");
             });
             var error = expectThrows(NoShardAvailableActionException.class, future::actionGet);
             assertThat(error.getMessage(), containsString("no shard copies found"));
         }
         {
             var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3));
-            var future = sendRequests(targetShards, true, (node, shardIds, aliasFilters, listener) -> {
+            var future = sendRequests(targetShards, true, -1, (node, shardIds, aliasFilters, listener) -> {
                 assertThat(shard3, not(in(shardIds)));
                 runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
             });

@@ -148,7 +149,7 @@ public void testRetryThenSuccess() {
             targetShard(shard5, node1, node3, node2)
         );
         Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
-        var future = sendRequests(targetShards, randomBoolean(), (node, shardIds, aliasFilters, listener) -> {
+        var future = sendRequests(targetShards, randomBoolean(), -1, (node, shardIds, aliasFilters, listener) -> {
             sent.add(new NodeRequest(node, shardIds, aliasFilters));
             Map<ShardId, Exception> failures = new HashMap<>();
             if (node.equals(node1) && shardIds.contains(shard5)) {

@@ -180,7 +181,7 @@ public void testRetryButFail() {
             targetShard(shard5, node1, node3, node2)
         );
         Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
-        var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
+        var future = sendRequests(targetShards, false, -1, (node, shardIds, aliasFilters, listener) -> {
             sent.add(new NodeRequest(node, shardIds, aliasFilters));
             Map<ShardId, Exception> failures = new HashMap<>();
             if (shardIds.contains(shard5)) {

@@ -206,7 +207,7 @@ public void testDoNotRetryOnRequestLevelFailure() {
         var targetShards = List.of(targetShard(shard1, node1), targetShard(shard2, node2), targetShard(shard3, node1));
         Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
         AtomicBoolean failed = new AtomicBoolean();
-        var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
+        var future = sendRequests(targetShards, false, -1, (node, shardIds, aliasFilters, listener) -> {
             sent.add(new NodeRequest(node, shardIds, aliasFilters));
             if (node1.equals(node) && failed.compareAndSet(false, true)) {
                 runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true));

@@ -226,7 +227,7 @@ public void testAllowPartialResults() {
         var targetShards = List.of(targetShard(shard1, node1), targetShard(shard2, node2), targetShard(shard3, node1, node2));
         Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
         AtomicBoolean failed = new AtomicBoolean();
-        var future = sendRequests(targetShards, true, (node, shardIds, aliasFilters, listener) -> {
+        var future = sendRequests(targetShards, true, -1, (node, shardIds, aliasFilters, listener) -> {
             sent.add(new NodeRequest(node, shardIds, aliasFilters));
             if (node1.equals(node) && failed.compareAndSet(false, true)) {
                 runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true));

@@ -244,6 +245,40 @@ public void testAllowPartialResults() {
         assertThat(resp.successfulShards, equalTo(1));
     }

+    public void testLimitConcurrentNodes() {
+        var targetShards = List.of(
+            targetShard(shard1, node1),
+            targetShard(shard2, node2),
+            targetShard(shard3, node3),
+            targetShard(shard4, node4),
+            targetShard(shard5, node5)
+        );
+
+        AtomicInteger maxConcurrentRequests = new AtomicInteger(0);
+        AtomicInteger concurrentRequests = new AtomicInteger(0);
+        Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
+        var future = sendRequests(targetShards, randomBoolean(), 2, (node, shardIds, aliasFilters, listener) -> {
+            concurrentRequests.incrementAndGet();
+
+            while (true) {
+                var priorMax = maxConcurrentRequests.get();
+                var newMax = Math.max(priorMax, concurrentRequests.get());
+                if (newMax <= priorMax || maxConcurrentRequests.compareAndSet(priorMax, newMax)) {
+                    break;
+                }
+            }
+
+            sent.add(new NodeRequest(node, shardIds, aliasFilters));
+            runWithDelay(() -> {
+                concurrentRequests.decrementAndGet();
+                listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()));
+            });
+        });
+        safeGet(future);
+        assertThat(sent.size(), equalTo(5));
+        assertThat(maxConcurrentRequests.get(), equalTo(2));
+    }

     static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) {
         return new DataNodeRequestSender.TargetShard(shardId, new ArrayList<>(Arrays.asList(nodes)), null);
     }

@@ -268,6 +303,7 @@ void runWithDelay(Runnable runnable) {
     PlainActionFuture<ComputeResponse> sendRequests(
         List<DataNodeRequestSender.TargetShard> shards,
         boolean allowPartialResults,
+        int concurrentRequests,
         Sender sender
     ) {
         PlainActionFuture<ComputeResponse> future = new PlainActionFuture<>();

@@ -281,7 +317,13 @@ PlainActionFuture<ComputeResponse> sendRequests(
             TaskId.EMPTY_TASK_ID,
             Collections.emptyMap()
         );
-        DataNodeRequestSender requestSender = new DataNodeRequestSender(transportService, executor, task, allowPartialResults) {
+        DataNodeRequestSender requestSender = new DataNodeRequestSender(
+            transportService,
+            executor,
+            task,
+            allowPartialResults,
+            concurrentRequests
+        ) {
             @Override
             void searchShards(
                 Task parentTask,
```
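The while-loop inside `testLimitConcurrentNodes` maintains a lock-free running maximum: it retries a compare-and-set until either the candidate is no longer larger than the stored value or the CAS wins the race. Extracted into a stand-alone sketch (the `RunningMax` class is illustrative, not from the PR); note that `AtomicInteger.accumulateAndGet(candidate, Math::max)` expresses the same update in one call:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Lock-free running maximum, the same pattern as the CAS loop in the new test.
public class RunningMax {

    static void recordMax(AtomicInteger max, int candidate) {
        while (true) {
            int prior = max.get();
            // Either no update is needed, or our compare-and-set won the race;
            // otherwise another thread raced us and we retry with the fresh value.
            if (candidate <= prior || max.compareAndSet(prior, candidate)) {
                return;
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger max = new AtomicInteger(0);
        recordMax(max, 3);
        recordMax(max, 1); // lower value leaves the maximum untouched
        recordMax(max, 7);
        System.out.println(max.get()); // prints 7
    }
}
```

The explicit loop is useful when the update must do more than compute the new value; for a pure maximum, `accumulateAndGet` is the more idiomatic choice.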
Review comment: I wonder if instead we want to check all pending requests here before attempting a new one, or possibly implement a more complex strategy.