Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
eec2039
Limit concurrent node requests
idegtiarenko Feb 18, 2025
d4850fc
upd
idegtiarenko Feb 18, 2025
ae96763
upd
idegtiarenko Feb 18, 2025
15d897f
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 4, 2025
666f588
do not send requests if source is completed
idegtiarenko Mar 4, 2025
34badf7
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 5, 2025
8c55e8a
upd
idegtiarenko Mar 5, 2025
b2a66a2
do not erase prior shard failures on skipping node
idegtiarenko Mar 5, 2025
11bd0f0
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 5, 2025
53f7d60
upd
idegtiarenko Mar 5, 2025
1bf2715
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 7, 2025
3296cf0
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 10, 2025
f0cc4ec
rename
idegtiarenko Mar 10, 2025
5ce4095
randomize concurrency
idegtiarenko Mar 10, 2025
a5084ab
skip remaining nodes
idegtiarenko Mar 10, 2025
50da087
count skips
idegtiarenko Mar 10, 2025
6809e58
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 10, 2025
1458611
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 10, 2025
02ddb1e
remove skipping
idegtiarenko Mar 10, 2025
4058ef1
clean non-fatal errors on shard skips
idegtiarenko Mar 10, 2025
e8ad22a
cleanup
idegtiarenko Mar 10, 2025
a70aff4
ensure every shard has at least one doc
idegtiarenko Mar 11, 2025
ed54538
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 11, 2025
7f47bde
debug flaky test
idegtiarenko Mar 11, 2025
f6868a1
upd
idegtiarenko Mar 12, 2025
f8a54f6
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,13 @@ void startComputeOnDataNodes(
Runnable runOnTaskFailure,
ActionListener<ComputeResponse> outListener
) {
final boolean allowPartialResults = configuration.allowPartialResults();
DataNodeRequestSender sender = new DataNodeRequestSender(transportService, esqlExecutor, parentTask, allowPartialResults) {
new DataNodeRequestSender(
transportService,
esqlExecutor,
parentTask,
configuration.allowPartialResults(),
configuration.pragmas().maxConcurrentNodePerCluster()
) {
@Override
protected void sendRequest(
DiscoveryNode node,
Expand Down Expand Up @@ -129,7 +134,7 @@ protected void sendRequest(
listener.delegateFailureAndWrap((l, unused) -> {
final Runnable onGroupFailure;
final CancellableTask groupTask;
if (allowPartialResults) {
if (configuration.allowPartialResults()) {
groupTask = RemoteListenerGroup.createGroupTask(
transportService,
parentTask,
Expand All @@ -148,7 +153,7 @@ protected void sendRequest(
final var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, connection);
exchangeSource.addRemoteSink(
remoteSink,
allowPartialResults == false,
configuration.allowPartialResults() == false,
pagesFetched::incrementAndGet,
queryPragmas.concurrentExchangeClients(),
computeListener.acquireAvoid()
Expand Down Expand Up @@ -180,8 +185,7 @@ protected void sendRequest(
})
);
}
};
sender.startComputeOnDataNodes(
}.startComputeOnDataNodes(
clusterAlias,
concreteIndices,
originalIndices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,26 @@ abstract class DataNodeRequestSender {
private final Executor esqlExecutor;
private final CancellableTask rootTask;
private final boolean allowPartialResults;
private final Semaphore concurrentRequests;
private final ReentrantLock sendingLock = new ReentrantLock();
private final Queue<ShardId> pendingShardIds = ConcurrentCollections.newQueue();
private final Map<DiscoveryNode, Semaphore> nodePermits = new HashMap<>();
private final Map<ShardId, ShardFailure> shardFailures = ConcurrentCollections.newConcurrentMap();
private final AtomicBoolean changed = new AtomicBoolean();
private boolean reportedFailure = false; // guarded by sendingLock

DataNodeRequestSender(TransportService transportService, Executor esqlExecutor, CancellableTask rootTask, boolean allowPartialResults) {
/**
 * Creates a sender that dispatches data-node requests on behalf of {@code rootTask}.
 *
 * @param transportService    transport service kept by this sender
 * @param esqlExecutor        executor kept by this sender
 * @param rootTask            task kept as this sender's root task
 * @param allowPartialResults whether partial results are allowed; stored as-is
 * @param concurrentRequests  number of permits of the semaphore that caps how many node requests
 *                            may be in flight at the same time; must be at least 1
 * @throws IllegalArgumentException if {@code concurrentRequests} is less than 1
 */
DataNodeRequestSender(
    TransportService transportService,
    Executor esqlExecutor,
    CancellableTask rootTask,
    boolean allowPartialResults,
    int concurrentRequests
) {
    // Without at least one permit no node request can ever acquire the semaphore: every selected
    // node would be pushed back to the pending shards and nothing would be sent. Fail fast instead.
    if (concurrentRequests < 1) {
        throw new IllegalArgumentException("concurrentRequests must be at least 1, got [" + concurrentRequests + "]");
    }
    this.transportService = transportService;
    this.esqlExecutor = esqlExecutor;
    this.rootTask = rootTask;
    this.allowPartialResults = allowPartialResults;
    this.concurrentRequests = new Semaphore(concurrentRequests);
}

final void startComputeOnDataNodes(
Expand Down Expand Up @@ -128,8 +136,7 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
reportedFailure = true;
reportFailures(computeListener);
} else {
var nodeRequests = selectNodeRequests(targetShards);
for (NodeRequest request : nodeRequests) {
for (NodeRequest request : selectNodeRequests(targetShards)) {
sendOneNodeRequest(targetShards, computeListener, request);
}
}
Expand Down Expand Up @@ -161,6 +168,7 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener compu
sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {
void onAfter(List<DriverProfile> profiles) {
nodePermits.get(request.node).release();
concurrentRequests.release();
trySendingRequestsForPendingShards(targetShards, computeListener);
listener.onResponse(profiles);
}
Expand Down Expand Up @@ -243,17 +251,11 @@ TargetShard getShard(ShardId shardId) {
/**
* (Remaining) allocated nodes of a given shard id and its alias filter
*/
record TargetShard(ShardId shardId, List<DiscoveryNode> remainingNodes, AliasFilter aliasFilter) {

}

record NodeRequest(DiscoveryNode node, List<ShardId> shardIds, Map<Index, AliasFilter> aliasFilters) {

}
record TargetShard(ShardId shardId, List<DiscoveryNode> remainingNodes, AliasFilter aliasFilter) {}

private record ShardFailure(boolean fatal, Exception failure) {
record NodeRequest(DiscoveryNode node, List<ShardId> shardIds, Map<Index, AliasFilter> aliasFilters) {}

}
private record ShardFailure(boolean fatal, Exception failure) {}

/**
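* Selects the next nodes to send requests to. Limits to at most one outstanding request per node
* and, through the {@code concurrentRequests} semaphore, caps the total number of node requests in flight.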
Expand Down Expand Up @@ -287,17 +289,28 @@ private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
nodeToShardIds.computeIfAbsent(selectedNode, unused -> new ArrayList<>()).add(shard.shardId);
}
}
final List<NodeRequest> nodeRequests = new ArrayList<>(nodeToShardIds.size());
for (var e : nodeToShardIds.entrySet()) {
List<ShardId> shardIds = e.getValue();
Map<Index, AliasFilter> aliasFilters = new HashMap<>();
for (ShardId shardId : shardIds) {
var aliasFilter = targetShards.getShard(shardId).aliasFilter;
if (aliasFilter != null) {
aliasFilters.put(shardId.getIndex(), aliasFilter);

var size = Math.min(concurrentRequests.availablePermits(), nodeToShardIds.size());
final List<NodeRequest> nodeRequests = new ArrayList<>(size);
for (var entry : nodeToShardIds.entrySet()) {
var node = entry.getKey();
var shardIds = entry.getValue();
if (concurrentRequests.tryAcquire()) {
Map<Index, AliasFilter> aliasFilters = new HashMap<>();
for (ShardId shardId : shardIds) {
var aliasFilter = targetShards.getShard(shardId).aliasFilter;
if (aliasFilter != null) {
aliasFilters.put(shardId.getIndex(), aliasFilter);
}
}
nodeRequests.add(new NodeRequest(node, shardIds, aliasFilters));
} else {
pendingShardIds.addAll(shardIds);
for (ShardId shardId : shardIds) {
targetShards.getShard(shardId).remainingNodes.add(node);
}
nodePermits.get(node).release();
}
nodeRequests.add(new NodeRequest(e.getKey(), shardIds, aliasFilters));
}
return nodeRequests;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ public final class QueryPragmas implements Writeable {
*/
public static final Setting<TimeValue> STATUS_INTERVAL = Setting.timeSetting("status_interval", Driver.DEFAULT_STATUS_INTERVAL);

public static final Setting<Integer> MAX_CONCURRENT_SHARDS_PER_NODE = Setting.intSetting("max_concurrent_shards_per_node", 10, 1, 100);
/**
 * Pragma read by {@code maxConcurrentNodePerCluster()}: the maximum number of nodes queried at once by a query.
 * NOTE(review): the literals 10, 1, 100 are presumably default, minimum and maximum; confirm against Setting.intSetting.
 */
public static final Setting<Integer> MAX_CONCURRENT_NODES_PER_CLUSTER = //
Setting.intSetting("max_concurrent_nodes_per_cluster", 10, 1, 100);
/**
 * Pragma presumably limiting how many shards a single node executes concurrently for a query; verify against its accessor.
 * NOTE(review): same literal arguments (10, 1, 100) as the per-cluster node limit.
 */
public static final Setting<Integer> MAX_CONCURRENT_SHARDS_PER_NODE = //
Setting.intSetting("max_concurrent_shards_per_node", 10, 1, 100);

public static final Setting<Boolean> NODE_LEVEL_REDUCTION = Setting.boolSetting("node_level_reduction", true);

Expand Down Expand Up @@ -122,6 +125,13 @@ public int enrichMaxWorkers() {
return ENRICH_MAX_WORKERS.get(settings);
}

/**
* The maximum number of nodes to be queried at once by this query. This is a safeguard to avoid overloading the cluster.
*/
public int maxConcurrentNodePerCluster() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public int maxConcurrentNodePerCluster() {
public int maxConcurrentNodesPerCluster() {

return MAX_CONCURRENT_NODES_PER_CLUSTER.get(settings);
}

/**
* The maximum number of shards that can be executed concurrently on a single node by this query. This is a safeguard to avoid
* opening and holding many shards (equivalent to many file descriptors) or having too many field infos created by a single query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -85,14 +86,15 @@ public void setThreadPool() {
}

@After
public void shutdownThreadPool() throws Exception {
public void shutdownThreadPool() {
terminate(threadPool);
}

public void testEmpty() {
var future = sendRequests(
List.of(),
randomBoolean(),
10,
(node, shardIds, aliasFilters, listener) -> fail("expect no data-node request is sent")
);
var resp = safeGet(future);
Expand All @@ -107,10 +109,9 @@ public void testOnePass() {
targetShard(shard4, node2, node3)
);
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
var future = sendRequests(targetShards, randomBoolean(), (node, shardIds, aliasFilters, listener) -> {
var future = sendRequests(targetShards, randomBoolean(), 10, (node, shardIds, aliasFilters, listener) -> {
sent.add(new NodeRequest(node, shardIds, aliasFilters));
var resp = new DataNodeComputeResponse(List.of(), Map.of());
runWithDelay(() -> listener.onResponse(resp));
runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
});
safeGet(future);
assertThat(sent.size(), equalTo(2));
Expand All @@ -120,15 +121,15 @@ public void testOnePass() {
public void testMissingShards() {
{
var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3));
var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
var future = sendRequests(targetShards, false, 10, (node, shardIds, aliasFilters, listener) -> {
fail("expect no data-node request is sent when target shards are missing");
});
var error = expectThrows(NoShardAvailableActionException.class, future::actionGet);
assertThat(error.getMessage(), containsString("no shard copies found"));
}
{
var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3));
var future = sendRequests(targetShards, true, (node, shardIds, aliasFilters, listener) -> {
var future = sendRequests(targetShards, true, 10, (node, shardIds, aliasFilters, listener) -> {
assertThat(shard3, not(in(shardIds)));
runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
});
Expand All @@ -148,7 +149,7 @@ public void testRetryThenSuccess() {
targetShard(shard5, node1, node3, node2)
);
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
var future = sendRequests(targetShards, randomBoolean(), (node, shardIds, aliasFilters, listener) -> {
var future = sendRequests(targetShards, randomBoolean(), 10, (node, shardIds, aliasFilters, listener) -> {
sent.add(new NodeRequest(node, shardIds, aliasFilters));
Map<ShardId, Exception> failures = new HashMap<>();
if (node.equals(node1) && shardIds.contains(shard5)) {
Expand Down Expand Up @@ -180,7 +181,7 @@ public void testRetryButFail() {
targetShard(shard5, node1, node3, node2)
);
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
var future = sendRequests(targetShards, false, 10, (node, shardIds, aliasFilters, listener) -> {
sent.add(new NodeRequest(node, shardIds, aliasFilters));
Map<ShardId, Exception> failures = new HashMap<>();
if (shardIds.contains(shard5)) {
Expand All @@ -206,7 +207,7 @@ public void testDoNotRetryOnRequestLevelFailure() {
var targetShards = List.of(targetShard(shard1, node1), targetShard(shard2, node2), targetShard(shard3, node1));
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
AtomicBoolean failed = new AtomicBoolean();
var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
var future = sendRequests(targetShards, false, 10, (node, shardIds, aliasFilters, listener) -> {
sent.add(new NodeRequest(node, shardIds, aliasFilters));
if (node1.equals(node) && failed.compareAndSet(false, true)) {
runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true));
Expand All @@ -226,7 +227,7 @@ public void testAllowPartialResults() {
var targetShards = List.of(targetShard(shard1, node1), targetShard(shard2, node2), targetShard(shard3, node1, node2));
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
AtomicBoolean failed = new AtomicBoolean();
var future = sendRequests(targetShards, true, (node, shardIds, aliasFilters, listener) -> {
var future = sendRequests(targetShards, true, 10, (node, shardIds, aliasFilters, listener) -> {
sent.add(new NodeRequest(node, shardIds, aliasFilters));
if (node1.equals(node) && failed.compareAndSet(false, true)) {
runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true));
Expand All @@ -244,6 +245,40 @@ public void testAllowPartialResults() {
assertThat(resp.successfulShards, equalTo(1));
}

public void testLimitConcurrentNodes() {
var targetShards = List.of(
targetShard(shard1, node1),
targetShard(shard2, node2),
targetShard(shard3, node3),
targetShard(shard4, node4),
targetShard(shard5, node5)
);

AtomicInteger maxConcurrentRequests = new AtomicInteger(0);
AtomicInteger concurrentRequests = new AtomicInteger(0);
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
var future = sendRequests(targetShards, randomBoolean(), 2, (node, shardIds, aliasFilters, listener) -> {
concurrentRequests.incrementAndGet();

while (true) {
var priorMax = maxConcurrentRequests.get();
var newMax = Math.max(priorMax, concurrentRequests.get());
if (newMax <= priorMax || maxConcurrentRequests.compareAndSet(priorMax, newMax)) {
break;
}
}

sent.add(new NodeRequest(node, shardIds, aliasFilters));
runWithDelay(() -> {
concurrentRequests.decrementAndGet();
listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()));
});
});
safeGet(future);
assertThat(sent.size(), equalTo(5));
assertThat(maxConcurrentRequests.get(), equalTo(2));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would randomize this 2 if possible.
At least, to test some edge cases, like 1 and 5.

}

/**
 * Builds a {@code TargetShard} for {@code shardId} whose remaining nodes are the given {@code nodes},
 * in order, and whose alias filter is {@code null}.
 */
static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) {
    // The list has to be a fresh, mutable copy: the sender adds nodes to a shard's remaining nodes.
    var remainingNodes = new ArrayList<DiscoveryNode>(nodes.length);
    for (DiscoveryNode node : nodes) {
        remainingNodes.add(node);
    }
    return new DataNodeRequestSender.TargetShard(shardId, remainingNodes, null);
}
Expand All @@ -268,6 +303,7 @@ void runWithDelay(Runnable runnable) {
PlainActionFuture<ComputeResponse> sendRequests(
List<DataNodeRequestSender.TargetShard> shards,
boolean allowPartialResults,
int concurrentRequests,
Sender sender
) {
PlainActionFuture<ComputeResponse> future = new PlainActionFuture<>();
Expand All @@ -281,7 +317,13 @@ PlainActionFuture<ComputeResponse> sendRequests(
TaskId.EMPTY_TASK_ID,
Collections.emptyMap()
);
DataNodeRequestSender requestSender = new DataNodeRequestSender(transportService, executor, task, allowPartialResults) {
DataNodeRequestSender requestSender = new DataNodeRequestSender(
transportService,
executor,
task,
allowPartialResults,
concurrentRequests
) {
@Override
void searchShards(
Task parentTask,
Expand Down