ESQL: Aggressive release of shard contexts #129454
Conversation
Keep better track of shard contexts using `RefCounted`, so they can be released more aggressively during operator processing. For example, during TopN, we can potentially release some contexts if they don't pass the limit filter. This is done in preparation for the TopN fetch optimization, which will delay the fetching of additional columns to the data node coordinator, instead of doing it in each individual worker, thereby reducing IO. Since the node coordinator would need to maintain the shard contexts for a potentially longer duration, it is important that we release what we can earlier. An even more advanced optimization is to delay fetching to the main cluster coordinator, but that would be more involved, since we would first need to figure out how to transport the shard contexts between nodes.
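To make the mechanism concrete, here is a minimal sketch of the ref-counting contract this relies on. All names below (`ShardContextRef`, `onRelease`, the `Demo` driver) are hypothetical stand-ins for illustration, not the actual Elasticsearch `RefCounted` API:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the RefCounted contract: a shard context stays
// open while at least one holder keeps a reference, and is released as soon
// as the last holder decrements.
final class ShardContextRef {
    private final AtomicInteger refs = new AtomicInteger(1); // creator holds the first ref
    private final Runnable onRelease;

    ShardContextRef(Runnable onRelease) {
        this.onRelease = onRelease;
    }

    void incRef() {
        if (refs.getAndIncrement() <= 0) {
            throw new IllegalStateException("already released");
        }
    }

    void decRef() {
        if (refs.decrementAndGet() == 0) {
            onRelease.run(); // last reference gone: close the underlying shard context
        }
    }
}

class Demo {
    public static void main(String[] args) {
        ShardContextRef shard = new ShardContextRef(() -> System.out.println("shard context released"));
        shard.incRef(); // a page referencing this shard is emitted downstream
        shard.decRef(); // TopN drops the page's rows past the limit: the page releases its ref
        shard.decRef(); // the source operator finishes: prints "shard context released"
    }
}
```

The point of the contract is that release happens at the earliest moment no operator can still touch the shard, rather than at the end of the whole query.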
Hi @GalLalouche, I've created a changelog YAML for you.
```java
public SearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException {
    SearchContext searchContext = super.createSearchContext(request, timeout);
    onPutContext.accept(searchContext.readerContext());
    try {
```
This was done after confirming with @dnhatn that `onPutContext` here can be replaced with `onCreateSearchContext`. The try/catch clause was copy-pasted from above.
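For context, the pattern being described would look roughly like the following. This is a sketch only: it assumes the enclosing test-service wrapper class, and `onCreateSearchContext` is the replacement hook named in the comment above, not a confirmed signature:

```java
// Sketch (enclosing class and hook name assumed): notify the hook after
// creating the search context, and close the context if the hook throws
// so that it is not leaked.
public SearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException {
    SearchContext searchContext = super.createSearchContext(request, timeout);
    try {
        onCreateSearchContext.accept(searchContext);
    } catch (Exception e) {
        searchContext.close(); // don't leak the context if the listener fails
        throw e;
    }
    return searchContext;
}
```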
```java
/**
 * A source operator whose output is the given tuple values. This operator produces pages
 * with two Blocks. The returned pages preserve the order of values as given in the initial list.
 */
public abstract class TupleAbstractBlockSourceOperator<T, S> extends AbstractBlockSourceOperator {
```
I've generalized the existing `TupleBlockSourceOperator` to support more than just a tuple of two longs.
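The shape of that generalization might look like the sketch below. The signatures are approximations for illustration, not the actual `AbstractBlockSourceOperator` API:

```java
// Approximate sketch: subclasses decide how each side of the tuple is
// turned into a Block, instead of hard-coding two long blocks.
public abstract class TupleAbstractBlockSourceOperator<T, S> extends AbstractBlockSourceOperator {
    private final List<Tuple<T, S>> values;

    protected TupleAbstractBlockSourceOperator(BlockFactory blockFactory, List<Tuple<T, S>> values) {
        super(blockFactory, values.size());
        this.values = values;
    }

    @Override
    protected Page createPage(int positionOffset, int length) {
        // One block per tuple element; element types are up to the subclass.
        return new Page(
            firstElementBlock(positionOffset, length),
            secondElementBlock(positionOffset, length)
        );
    }

    protected abstract Block firstElementBlock(int positionOffset, int length);

    protected abstract Block secondElementBlock(int positionOffset, int length);
}
```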
nik9000 left a comment:
I left a few comments, but it feels right to me!
Review threads were opened on:
- x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ShardRefCounted.java
- ...in/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
- x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
- x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocVector.java (one thread outdated)
If you want me to review more, let me know. Or I can wait until you remove …

Thanks @nik9000! I'm chasing down test failures right now, and will go over your comments in the meantime. I'll re-request a review when I'm done.
Further review threads:
- x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocVector.java (outdated)
- x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java (outdated)
- .../esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java
- .../plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java
This PR adds a late(r) materialization for TopN queries, such that the materialization happens in the "node_reduce" phase instead of during the "data" phase. For example, if the limit is 20 and each data node spawns 10 workers, we would only read the additional columns (i.e., the ones not needed for the TopN filter) for 20 rows instead of 200. To support this, the reducer node maintains a global list of all shard contexts used by its individual data workers (although some of those might be closed if they are no longer needed, thanks to #129454). There is some additional bookkeeping involved, since previously every data node held a local list of shard contexts and used its local indices to access it. To avoid changing too much (this local-index logic is spread throughout much of the code!), a new global index is introduced, which replaces the local index after all the rows are merged together in the reduce phase's TopN. A minimal sketch of that remapping follows.
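The local-to-global index remapping described above could look like this; all names here (`ShardIndexRemapper`, `register`) are hypothetical, chosen only to illustrate the bookkeeping:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: each worker numbers its shard contexts locally
// (0..n-1); the reducer concatenates the per-worker lists and returns, for
// each local index, the global index that rows are rewritten to after the
// reduce phase's TopN merges them.
class ShardIndexRemapper {
    private final List<Object> globalContexts = new ArrayList<>();

    int[] register(List<Object> workerContexts) {
        int[] localToGlobal = new int[workerContexts.size()];
        for (int local = 0; local < workerContexts.size(); local++) {
            localToGlobal[local] = globalContexts.size();
            globalContexts.add(workerContexts.get(local));
        }
        return localToGlobal;
    }
}
```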
Summary of main changes:
- `DocVector` now maintains a `RefCounted` instance per shard.
- Operators that handle `DocVector`s (e.g., `LuceneSourceOperator`, `TopNOperator`) can also hold `RefCounted` instances, so they can pass them to `DocVector` and also ensure contexts aren't released if they can still potentially be used later.
- `Driver`'s main loop iteration (`runSingleLoopIteration`) now closes its operators even between different operator processing steps. This is extra aggressive, and was mostly done to improve testability.
- Changes to `TopNOperator`, and a new integration test, `EsqlTopNShardManagementIT`, which uses the pausable plugin framework to check that `TopNOperator` releases shard contexts as early as possible.
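To illustrate the `Driver` change in the list above, here is a hypothetical sketch of closing finished operators between loop iterations; the real driver tracks per-operator state, and every name here is invented:

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: once an operator is finished and its output fully
// consumed, release it immediately instead of waiting for the whole
// pipeline to shut down. This is what lets shard-context refs held by
// finished operators be dropped early.
class DriverLoopSketch {
    static void runSingleLoopIteration(List<AutoCloseable> operators) throws Exception {
        // ... move pages between adjacent operators here ...
        Iterator<AutoCloseable> it = operators.iterator();
        while (it.hasNext()) {
            AutoCloseable op = it.next();
            if (isFinished(op)) {
                op.close(); // decrements any ref-counted shard contexts it still holds
                it.remove();
            }
        }
    }

    static boolean isFinished(AutoCloseable op) {
        return false; // placeholder: the real driver knows each operator's state
    }
}
```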