Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
eec2039
Limit concurrent node requests
idegtiarenko Feb 18, 2025
d4850fc
upd
idegtiarenko Feb 18, 2025
ae96763
upd
idegtiarenko Feb 18, 2025
15d897f
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 4, 2025
666f588
do not send requests if source is completed
idegtiarenko Mar 4, 2025
34badf7
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 5, 2025
8c55e8a
upd
idegtiarenko Mar 5, 2025
b2a66a2
do not erase prior shard failures on skipping node
idegtiarenko Mar 5, 2025
11bd0f0
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 5, 2025
53f7d60
upd
idegtiarenko Mar 5, 2025
1bf2715
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 7, 2025
3296cf0
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 10, 2025
f0cc4ec
rename
idegtiarenko Mar 10, 2025
5ce4095
randomize concurrency
idegtiarenko Mar 10, 2025
a5084ab
skip remaining nodes
idegtiarenko Mar 10, 2025
50da087
count skips
idegtiarenko Mar 10, 2025
6809e58
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 10, 2025
1458611
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 10, 2025
02ddb1e
remove skipping
idegtiarenko Mar 10, 2025
4058ef1
clean non-fatal errors on shard skips
idegtiarenko Mar 10, 2025
e8ad22a
cleanup
idegtiarenko Mar 10, 2025
a70aff4
ensure every shard has at least one doc
idegtiarenko Mar 11, 2025
ed54538
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 11, 2025
7f47bde
debug flaky test
idegtiarenko Mar 11, 2025
f6868a1
upd
idegtiarenko Mar 12, 2025
f8a54f6
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* @see #addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener)
*/
public final class ExchangeSourceHandler {

private final ExchangeBuffer buffer;
private final Executor fetchExecutor;

Expand All @@ -56,6 +57,10 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor) {
this.outstandingSources = new PendingInstances(() -> finishEarly(true, ActionListener.noop()));
}

public boolean isFinished() {
return buffer.isFinished();
}

private void checkFailure() {
if (aborted) {
throw new TaskCancelledException("remote sinks failed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.Plugin;
Expand All @@ -39,13 +40,15 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

/**
* Make sures that we can run many concurrent requests with large number of shards with any data_partitioning.
Expand Down Expand Up @@ -256,4 +259,32 @@ public void testLimitConcurrentShards() {
}
}
}

public void testCancelUnnecessaryRequests() {
assumeTrue("Requires pragmas", canUseQueryPragmas());
internalCluster().ensureAtLeastNumDataNodes(3);

var coordinatingNode = internalCluster().getNodeNames()[0];

var exchanges = new AtomicInteger(0);
var coordinatorNodeTransport = MockTransportService.getInstance(coordinatingNode);
coordinatorNodeTransport.addSendBehavior((connection, requestId, action, request, options) -> {
if (Objects.equals(action, ExchangeService.OPEN_EXCHANGE_ACTION_NAME)) {
exchanges.incrementAndGet();
}
connection.sendRequest(requestId, action, request, options);
});

var query = EsqlQueryRequest.syncEsqlQueryRequest();
query.query("from test-* | LIMIT 1");
query.pragmas(new QueryPragmas(Settings.builder().put(QueryPragmas.MAX_CONCURRENT_NODES_PER_CLUSTER.getKey(), 1).build()));

try {
var result = safeExecute(client(coordinatingNode), EsqlQueryAction.INSTANCE, query);
assertThat(Iterables.size(result.rows()), equalTo(1L));
assertThat(exchanges.get(), lessThanOrEqualTo(1));// 0 if result is populated from coordinating node
} finally {
coordinatorNodeTransport.clearAllRules();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,25 @@ void startComputeOnDataNodes(
Runnable runOnTaskFailure,
ActionListener<ComputeResponse> outListener
) {
final boolean allowPartialResults = configuration.allowPartialResults();
DataNodeRequestSender sender = new DataNodeRequestSender(transportService, esqlExecutor, parentTask, allowPartialResults) {
new DataNodeRequestSender(
transportService,
esqlExecutor,
parentTask,
configuration.allowPartialResults(),
configuration.pragmas().maxConcurrentNodesPerCluster()
) {
@Override
protected void sendRequest(
DiscoveryNode node,
List<ShardId> shardIds,
Map<Index, AliasFilter> aliasFilters,
NodeListener nodeListener
) {
if (exchangeSource.isFinished()) {
nodeListener.onSkip(true);
return;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part prevents us from sending a query to remaining data nodes if we collected enough results

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: There's one thing here: We'll "skip" it with onSkip(), but the Sender will still continue processing all shards. From what I see, it will continue calling this after every node finishes.

Should we instead pass something to the sender so it stops calling sendRequest()? I don't think it matters, computationally speaking, but it fells like we're doing "too much" when we could shortcircuit instead (?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we won't send more requests because we do:

                if (skipRemaining) {
                    DataNodeRequestSender.this.skipRemaining = true;
                }

So we'll only count the number of shards we skip and that's it. I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we won't send more requests because we do:

Correct, this was added today in: a5084ab

So we'll only count the number of shards we skip and that's it. I think.

The total skipped count consists of ones we skipped already (skippedShards) and count of shards we have not processed (remaining shards in pendingShardIds), please see

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since these computations should not be expensive, I wonder if we should skip only here, not shortcutting in other places. The reason is that we might need to be more careful not to shortcut in other places when allow_partial_results=true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. It would also simplify the change. We can always add it back later if we see it is needed.


final AtomicLong pagesFetched = new AtomicLong();
var listener = ActionListener.wrap(nodeListener::onResponse, e -> nodeListener.onFailure(e, pagesFetched.get() > 0));
final Transport.Connection connection;
Expand All @@ -129,7 +139,7 @@ protected void sendRequest(
listener.delegateFailureAndWrap((l, unused) -> {
final Runnable onGroupFailure;
final CancellableTask groupTask;
if (allowPartialResults) {
if (configuration.allowPartialResults()) {
try {
groupTask = computeService.createGroupTask(
parentTask,
Expand All @@ -152,7 +162,7 @@ protected void sendRequest(
final var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, connection);
exchangeSource.addRemoteSink(
remoteSink,
allowPartialResults == false,
configuration.allowPartialResults() == false,
pagesFetched::incrementAndGet,
queryPragmas.concurrentExchangeClients(),
computeListener.acquireAvoid()
Expand Down Expand Up @@ -184,8 +194,7 @@ protected void sendRequest(
})
);
}
};
sender.startComputeOnDataNodes(
}.startComputeOnDataNodes(
clusterAlias,
concreteIndices,
originalIndices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,27 @@ abstract class DataNodeRequestSender {
private final Executor esqlExecutor;
private final CancellableTask rootTask;
private final boolean allowPartialResults;
private final Semaphore concurrentRequests;
private final ReentrantLock sendingLock = new ReentrantLock();
private final Queue<ShardId> pendingShardIds = ConcurrentCollections.newQueue();
private final Map<DiscoveryNode, Semaphore> nodePermits = new HashMap<>();
private final Map<ShardId, ShardFailure> shardFailures = ConcurrentCollections.newConcurrentMap();
private final AtomicBoolean changed = new AtomicBoolean();
private boolean reportedFailure = false; // guarded by sendingLock
private volatile boolean skipRemaining = false;

DataNodeRequestSender(TransportService transportService, Executor esqlExecutor, CancellableTask rootTask, boolean allowPartialResults) {
DataNodeRequestSender(
TransportService transportService,
Executor esqlExecutor,
CancellableTask rootTask,
boolean allowPartialResults,
int concurrentRequests
) {
this.transportService = transportService;
this.esqlExecutor = esqlExecutor;
this.rootTask = rootTask;
this.allowPartialResults = allowPartialResults;
this.concurrentRequests = concurrentRequests > 0 ? new Semaphore(concurrentRequests) : null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we initialize the Semaphore for the -1 case with new Semaphore(Integer.MAX_VALUE)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a special case not to keep it at all if we have no limit (most of the cases). But I can do that as well.

}

final void startComputeOnDataNodes(
Expand Down Expand Up @@ -126,7 +135,7 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
|| (allowPartialResults == false && shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal))) {
reportedFailure = true;
reportFailures(computeListener);
} else {
} else if (skipRemaining == false) {
for (NodeRequest request : selectNodeRequests(targetShards)) {
sendOneNodeRequest(targetShards, computeListener, request);
}
Expand Down Expand Up @@ -159,6 +168,9 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener compu
sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {
void onAfter(List<DriverProfile> profiles) {
nodePermits.get(request.node).release();
if (concurrentRequests != null) {
concurrentRequests.release();
}
trySendingRequestsForPendingShards(targetShards, computeListener);
listener.onResponse(profiles);
}
Expand Down Expand Up @@ -187,6 +199,14 @@ public void onFailure(Exception e, boolean receivedData) {
}
onAfter(List.of());
}

@Override
public void onSkip(boolean skipRemaining) {
if (skipRemaining) {
DataNodeRequestSender.this.skipRemaining = true;
}
onAfter(List.of());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should clear the shard failures for shards that are skipped; otherwise, we will still report failures.

}
});
}

Expand All @@ -196,6 +216,8 @@ interface NodeListener {
void onResponse(DataNodeComputeResponse response);

void onFailure(Exception e, boolean receivedData);

void onSkip(boolean skipRemaining);
}

private static Exception unwrapFailure(Exception e) {
Expand Down Expand Up @@ -256,6 +278,7 @@ private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
assert sendingLock.isHeldByCurrentThread();
final Map<DiscoveryNode, List<ShardId>> nodeToShardIds = new HashMap<>();
final Iterator<ShardId> shardsIt = pendingShardIds.iterator();

while (shardsIt.hasNext()) {
ShardId shardId = shardsIt.next();
ShardFailure failure = shardFailures.get(shardId);
Expand All @@ -265,31 +288,45 @@ private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
}
TargetShard shard = targetShards.getShard(shardId);
Iterator<DiscoveryNode> nodesIt = shard.remainingNodes.iterator();
DiscoveryNode selectedNode = null;
while (nodesIt.hasNext()) {
DiscoveryNode node = nodesIt.next();
if (nodeToShardIds.containsKey(node) || nodePermits.get(node).tryAcquire()) {
List<ShardId> pendingRequest = nodeToShardIds.get(node);
if (pendingRequest != null) {
pendingRequest.add(shard.shardId);
nodesIt.remove();
shardsIt.remove();
selectedNode = node;
break;
}
}
if (selectedNode != null) {
nodeToShardIds.computeIfAbsent(selectedNode, unused -> new ArrayList<>()).add(shard.shardId);

if (concurrentRequests == null || concurrentRequests.tryAcquire()) {
if (nodePermits.get(node).tryAcquire()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if instead we want to check all pending requests here before attempting a new one?
Or possibly implement a more complex strategy

pendingRequest = new ArrayList<>();
pendingRequest.add(shard.shardId);
nodeToShardIds.put(node, pendingRequest);

nodesIt.remove();
shardsIt.remove();

break;
} else if (concurrentRequests != null) {
concurrentRequests.release();
}
}
}
}

final List<NodeRequest> nodeRequests = new ArrayList<>(nodeToShardIds.size());
for (var e : nodeToShardIds.entrySet()) {
List<ShardId> shardIds = e.getValue();
for (var entry : nodeToShardIds.entrySet()) {
var node = entry.getKey();
var shardIds = entry.getValue();
Map<Index, AliasFilter> aliasFilters = new HashMap<>();
for (ShardId shardId : shardIds) {
var aliasFilter = targetShards.getShard(shardId).aliasFilter;
if (aliasFilter != null) {
aliasFilters.put(shardId.getIndex(), aliasFilter);
}
}
nodeRequests.add(new NodeRequest(e.getKey(), shardIds, aliasFilters));
nodeRequests.add(new NodeRequest(node, shardIds, aliasFilters));
}
return nodeRequests;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ public final class QueryPragmas implements Writeable {
*/
public static final Setting<TimeValue> STATUS_INTERVAL = Setting.timeSetting("status_interval", Driver.DEFAULT_STATUS_INTERVAL);

public static final Setting<Integer> MAX_CONCURRENT_SHARDS_PER_NODE = Setting.intSetting("max_concurrent_shards_per_node", 10, 1, 100);
public static final Setting<Integer> MAX_CONCURRENT_NODES_PER_CLUSTER = //
Setting.intSetting("max_concurrent_nodes_per_cluster", -1, -1, 100);
public static final Setting<Integer> MAX_CONCURRENT_SHARDS_PER_NODE = //
Setting.intSetting("max_concurrent_shards_per_node", 10, 1, 100);

public static final Setting<Boolean> NODE_LEVEL_REDUCTION = Setting.boolSetting("node_level_reduction", true);

Expand Down Expand Up @@ -122,6 +125,13 @@ public int enrichMaxWorkers() {
return ENRICH_MAX_WORKERS.get(settings);
}

/**
* The maximum number of nodes to be queried at once by this query. This is safeguard to avoid overloading the cluster.
*/
public int maxConcurrentNodesPerCluster() {
return MAX_CONCURRENT_NODES_PER_CLUSTER.get(settings);
}

/**
* The maximum number of shards can be executed concurrently on a single node by this query. This is a safeguard to avoid
* opening and holding many shards (equivalent to many file descriptors) or having too many field infos created by a single query.
Expand Down
Loading