
Conversation

piergm (Member) commented Feb 11, 2025

With this PR we introduce a way to track the EWMA and total time spent executing tasks for each index in the search thread-pool.
We extended TaskExecutionTimeTrackingEsThreadPoolExecutor, which already has logic to track the EWMA and time spent executing tasks in the search thread pool globally (not per-index), into TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor, which tracks on a per-index basis. We chose to extend this logic rather than duplicate the time-tracking code.
New indices start being tracked via the computeIfAbsent inside the trackExecutionTime method, and deleted indices are removed by the cluster state listener SearchIndexTimeTrackingCleanupService.
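
For illustration, a minimal sketch of this registration/cleanup pattern, assuming a hypothetical tracker class (only the trackExecutionTime name and the computeIfAbsent/cleanup behavior come from the description above; everything else is illustrative):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Illustrative sketch, not the PR's implementation: one accumulator per index,
// registered lazily and removed when the index is deleted.
class PerIndexExecutionTimeTracker {
    private final ConcurrentHashMap<String, LongAdder> executionTimeByIndex = new ConcurrentHashMap<>();

    // computeIfAbsent starts tracking a new index the first time a task for it completes.
    void trackExecutionTime(String indexName, long tookInNanos) {
        executionTimeByIndex.computeIfAbsent(indexName, name -> new LongAdder()).add(tookInNanos);
    }

    // Invoked by a cluster state listener (SearchIndexTimeTrackingCleanupService in the PR)
    // when an index is deleted, so the map does not grow without bound.
    void removeIndex(String indexName) {
        executionTimeByIndex.remove(indexName);
    }

    long totalTimeInNanos(String indexName) {
        LongAdder adder = executionTimeByIndex.get(indexName);
        return adder == null ? 0L : adder.sum();
    }
}
```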

@drempapis drempapis requested a review from a team as a code owner March 7, 2025 13:35
drempapis (Contributor) commented Mar 7, 2025

> Do we really need this when we already have org.elasticsearch.search.SearchService.SearchOperationListenerExecutor#SearchOperationListenerExecutor(org.elasticsearch.search.internal.SearchContext), which seems to do the exact same thing as all of this code does for reporting to APM? Shouldn't we move both things to the same codebase?
>
> I mentioned this to Luca a couple of minutes ago: we're already running into massive I-cache miss rates for this logic, and this PR will make that situation even worse. Since we already do time tracking per shard in one way, we should be able to piggyback on that?

The code has been refactored based on Armin's suggestion to register a SearchOperationListener that tracks the execution time per index. To verify the functionality, the following test was performed locally: a serverless cluster (master/search/ML) was deployed, three empty indices (index_1, index_2, index_3) were added, and Locust was used to benchmark the deployment by sending match_all requests to the indices with a load split of 60% / 30% / 10%.
For reference, the Locust code is the following:

```python
import json
import random

from locust import HttpUser, task


# Enclosing user class and imports added for completeness; the original
# snippet shows only the two methods.
class SearchLoadUser(HttpUser):
    @task
    def benchmark_search_load(self):
        headers = {"Content-Type": "application/json"}
        query = {
            "query": {
                "match_all": {}
            }
        }
        index_request = "/{}/_search".format(self.weighted_random_index())

        with self.client.post(index_request, data=json.dumps(query), headers=headers, catch_response=True) as response:
            if response.status_code == 200:
                response.success()
            else:
                response.failure(f"Failed! Status Code: {response.status_code}")

    def weighted_random_index(self):
        probabilities = [0.6, 0.3, 0.1]  # 60%, 30%, 10%
        indices = ['index_1', 'index_2', 'index_3']
        return random.choices(indices, weights=probabilities, k=1)[0]
```

After running for a while, the benchmark looks like the following:

[screenshot: Locust benchmark dashboard]

On the ES side, we get a periodic print of the recorded execution time per index (in nanos):

[screenshot: per-index execution times logged by ES]

We also normalize the per-index execution time to [0, 1] with a total sum of 1 and, as expected, get a load that converges to the request percentage per index: 0.6 / 0.3 / 0.1.
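
As a sketch, the normalization step amounts to dividing each per-index total by the overall sum (names here are illustrative, not from the PR):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative: normalizes per-index execution times (in nanos) to [0, 1] so they sum to 1.
final class LoadNormalization {
    static Map<String, Double> normalize(Map<String, Long> executionTimeByIndex) {
        long total = executionTimeByIndex.values().stream().mapToLong(Long::longValue).sum();
        Map<String, Double> normalized = new HashMap<>();
        executionTimeByIndex.forEach(
            (index, nanos) -> normalized.put(index, total == 0 ? 0.0 : (double) nanos / total)
        );
        return normalized;
    }
}
```

With the 60% / 30% / 10% workload above, these values converge to roughly 0.6 / 0.3 / 0.1.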

drempapis (Contributor) commented Mar 7, 2025

I'm not sure about all this history appearing. To work on this issue, I did the following:

```sh
git remote add matteo-fork git@github.com:piergm/elasticsearch.git
git fetch matteo-fork
git checkout -b mp_search_load-per-index matteo-fork/mp_search_load-per-index
git checkout -b mp_searfh_load-per-index_update
git merge main mp_searfh_load-per-index_update
git merge mp_searfh_load-per-index_update mp_search_load-per-index
git push matteo-fork HEAD:mp_search_load-per-index
```

```xml
<method v="2" />
</configuration>
</component>
</component>
```
Contributor:

I tried to revert this, but the newline persists there.

piergm (Member, PR author):

It's IntelliJ's fault; try removing it from vim/nano (with IJ closed) or another editor, then commit the change and it should work 😄

```java
     * @param tookInNanos the number of nanoseconds the query execution took
     */
    default void onFailedQueryPhase(SearchContext searchContext) {}
    default void onFailedQueryPhase(SearchContext searchContext, long tookInNanos) {}
```
Contributor:

We also need to track the execution time when a phase fails.
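
For illustration, a minimal sketch of a listener that records time for both outcomes, assuming the new tookInNanos overload shown in the diff above (the tracker type is the hypothetical one sketched earlier):

```java
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.search.internal.SearchContext;

// Sketch only: records execution time for successful and failed query phases alike,
// since failed phases consume CPU and should count toward the index load.
class PerIndexTimeTrackingListener implements SearchOperationListener {
    private final PerIndexExecutionTimeTracker tracker;

    PerIndexTimeTrackingListener(PerIndexExecutionTimeTracker tracker) {
        this.tracker = tracker;
    }

    @Override
    public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
        tracker.trackExecutionTime(searchContext.indexShard().shardId().getIndexName(), tookInNanos);
    }

    @Override
    public void onFailedQueryPhase(SearchContext searchContext, long tookInNanos) {
        tracker.trackExecutionTime(searchContext.indexShard().shardId().getIndexName(), tookInNanos);
    }
}
```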

```java
     * @param indexName the name of the index
     * @return the EWMA of the execution time for the index
     */
    public double getLoadEMWAPerIndex(String indexName) {
```
Contributor:

Whether we need to calculate and export the EWMA is still under consideration.

piergm (Member, PR author):

Maybe some more context here:
We are calculating load on a per-index basis; the loads are then collected and summed up (TODO) on the master node. With this information we will need to determine which of the indices were under the most load and act based on that. The idea is to then normalize the "global" index load and act on the normalized values.
That said, the per-node EWMA is not really suitable for summing across nodes, in our opinion.
That's why this comment.
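
As a sketch of that aggregation step (all names hypothetical; this is the TODO above, not code from the PR), raw nanosecond totals sum cleanly across nodes, whereas per-node EWMAs would not:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative master-side aggregation: sum per-node totals per index, then normalize.
final class GlobalIndexLoad {
    static Map<String, Long> merge(List<Map<String, Long>> perNodeExecutionTimes) {
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> nodeLoad : perNodeExecutionTimes) {
            nodeLoad.forEach((index, nanos) -> global.merge(index, nanos, Long::sum));
        }
        return global; // feed into the normalization sketch above
    }
}
```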


```java
    private static final Logger logger = LogManager.getLogger(ShardSearchPerIndexTimeTrackingMetrics.class);

    private final ConcurrentHashMap<String, Tuple<LongAdder, ExponentiallyWeightedMovingAverage>> indexExecutionTime;
```
Contributor:

I'm sorry for saying it in such a straightforward manner, but do we really want to add more logic based on this class?

Especially on a per-shard basis, using the math in ExponentiallyWeightedMovingAverage seems questionable.

We calculate `newValue = alpha * lastValue + (1 - alpha) * currentValue`. In a large number of use-cases you may see the fetch and query times be an order of magnitude apart. So now, assuming the query always matches, won't we essentially flap between two values constantly for a shard?

Why not just use the existing metrics we have in org.elasticsearch.index.search.stats.ShardSearchStats? EWMA makes no sense here; if anything, isn't total query time and its derivative what we care about?
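
For illustration, a standalone numeric sketch of the flapping described above, feeding alternating query (~10 ms) and fetch (~1 ms) samples into the quoted formula (alpha and the timings are made up):

```java
// Demonstrates the oscillation: with samples an order of magnitude apart, the
// EWMA bounces between two values instead of converging to a meaningful load.
public class EwmaFlappingDemo {
    public static void main(String[] args) {
        double alpha = 0.3;
        double ewma = 10_000_000;        // start at the query time, in nanos
        long queryNanos = 10_000_000;    // ~10 ms query phase
        long fetchNanos = 1_000_000;     // ~1 ms fetch phase
        for (int i = 1; i <= 10; i++) {
            long current = (i % 2 == 0) ? queryNanos : fetchNanos;
            // newValue = alpha * lastValue + (1 - alpha) * currentValue
            ewma = alpha * ewma + (1 - alpha) * current;
            System.out.printf("sample=%,d ewma=%,.0f%n", current, ewma);
        }
        // Settles into a two-cycle around ~7.9e6 and ~3.1e6 ns rather than one value.
    }
}
```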

piergm (Member, PR author):

Good point, we left it in because we were not sure about it, and I asked Dimi to leave a comment on the PR about this.

drempapis (Contributor) commented Mar 7, 2025:

All right, let's check how it can be adapted into ShardSearchStats.

original-brownbear (Contributor) left a comment:

I don't think we should do this given that we have ShardSearchStats already. If we're missing any metric, we should add it there, shouldn't we?

@drempapis drempapis closed this May 21, 2025
@javanna javanna removed the v9.1.0 label May 26, 2025

Labels

>feature · :Search Foundations/Search (catch all for Search Foundations) · Team:Search Foundations (meta label for the Search Foundations team in Elasticsearch)