From 471ca56cc940624ca4e0c25cb30a1d939fded83c Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Mon, 7 Jul 2025 13:25:37 +0300 Subject: [PATCH] Track time on recent_search_load during DFSPhase Incorporate the DFS phase timing into the recent_search_load calculation. This metric represents the cumulative time spent during the Query, Fetch, and DFS phases, and is exposed via the _stats API. --- .../index/search/stats/ShardSearchStats.java | 5 ++ .../index/shard/SearchOperationListener.java | 55 +++++++++++++++++++ .../elasticsearch/search/SearchService.java | 13 ++++- .../search/stats/ShardSearchStatsTests.java | 34 ++++++++++++ 4 files changed, 106 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java b/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java index d01ea2a184dbd..2c2865a9025af 100644 --- a/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java +++ b/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java @@ -117,6 +117,11 @@ public void onFetchPhase(SearchContext searchContext, long tookInNanos) { }); } + @Override + public void onDfsPhase(SearchContext searchContext, long tookInNanos) { + computeStats(searchContext, statsHolder -> { statsHolder.recentSearchLoad.addIncrement(tookInNanos, System.nanoTime()); }); + } + private void computeStats(SearchContext searchContext, Consumer consumer) { consumer.accept(totalStats); var groupStats = searchContext.groupStats(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java b/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java index e8d8eec82eafa..9dc84090a610f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java +++ b/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java @@ -65,6 +65,28 @@ default void 
onFailedFetchPhase(SearchContext searchContext) {} */ default void onFetchPhase(SearchContext searchContext, long tookInNanos) {} + /** + * Executed before the DFS phase is executed + * @param searchContext the current search context + */ + default void onPreDfsPhase(SearchContext searchContext) {} + + /** + * Executed after the DFS phase successfully finished. + * Note: this is not invoked if the DFS phase execution failed. + * @param searchContext the current search context + * @param tookInNanos the number of nanoseconds the DFS phase execution took + * + * @see #onFailedDfsPhase(SearchContext) + */ + default void onDfsPhase(SearchContext searchContext, long tookInNanos) {} + + /** + * Executed if a DFS phase failed. + * @param searchContext the current search context + */ + default void onFailedDfsPhase(SearchContext searchContext) {} + /** * Executed when a new reader context was created * @param readerContext the created context @@ -182,6 +204,39 @@ public void onFetchPhase(SearchContext searchContext, long tookInNanos) { } } + @Override + public void onPreDfsPhase(SearchContext searchContext) { + for (SearchOperationListener listener : listeners) { + try { + listener.onPreDfsPhase(searchContext); + } catch (Exception e) { + logger.warn(() -> "onPreDfsPhase listener [" + listener + "] failed", e); + } + } + } + + @Override + public void onFailedDfsPhase(SearchContext searchContext) { + for (SearchOperationListener listener : listeners) { + try { + listener.onFailedDfsPhase(searchContext); + } catch (Exception e) { + logger.warn(() -> "onFailedDfsPhase listener [" + listener + "] failed", e); + } + } + } + + @Override + public void onDfsPhase(SearchContext searchContext, long tookInNanos) { + for (SearchOperationListener listener : listeners) { + try { + listener.onDfsPhase(searchContext, tookInNanos); + } catch (Exception e) { + logger.warn(() -> "onDfsPhase listener [" + listener + "] failed", e); + } + } + } + @Override public void 
onNewReaderContext(ReaderContext readerContext) { for (SearchOperationListener listener : listeners) { diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index d485b53e7e409..9c228468f1964 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -651,7 +651,18 @@ private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchShardT Releasable ignored = readerContext.markAsUsed(getKeepAlive(request)); SearchContext context = createContext(readerContext, request, task, ResultsType.DFS, false) ) { - DfsPhase.execute(context); + final long beforeQueryTime = System.nanoTime(); + var opsListener = context.indexShard().getSearchOperationListener(); + opsListener.onPreDfsPhase(context); + try { + DfsPhase.execute(context); + opsListener.onDfsPhase(context, System.nanoTime() - beforeQueryTime); + opsListener = null; + } finally { + if (opsListener != null) { + opsListener.onFailedDfsPhase(context); + } + } return context.dfsResult(); } catch (Exception e) { logger.trace("Dfs phase failed", e); diff --git a/server/src/test/java/org/elasticsearch/search/stats/ShardSearchStatsTests.java b/server/src/test/java/org/elasticsearch/search/stats/ShardSearchStatsTests.java index 63298e5acb3be..1155b2b5be1ec 100644 --- a/server/src/test/java/org/elasticsearch/search/stats/ShardSearchStatsTests.java +++ b/server/src/test/java/org/elasticsearch/search/stats/ShardSearchStatsTests.java @@ -55,6 +55,40 @@ public void setup() { this.shardSearchStatsListener = new ShardSearchStats(searchStatsSettings); } + public void testDfsPhase() { + try (SearchContext sc = createSearchContext(false)) { + shardSearchStatsListener.onPreDfsPhase(sc); + shardSearchStatsListener.onDfsPhase(sc, TimeUnit.MILLISECONDS.toNanos(TEN_MILLIS)); + + SearchStats.Stats stats = shardSearchStatsListener.stats().getTotal(); + 
assertTrue(stats.getSearchLoadRate() > 0.0); + } + } + + public void testDfsPhase_withGroups() { + try (SearchContext sc = createSearchContext(false)) { + shardSearchStatsListener.onPreDfsPhase(sc); + shardSearchStatsListener.onDfsPhase(sc, TimeUnit.MILLISECONDS.toNanos(TEN_MILLIS)); + + SearchStats searchStats = shardSearchStatsListener.stats("_all"); + SearchStats.Stats stats = shardSearchStatsListener.stats().getTotal(); + assertTrue(stats.getSearchLoadRate() > 0.0); + + stats = Objects.requireNonNull(searchStats.getGroupStats()).get("group1"); + assertTrue(stats.getSearchLoadRate() > 0.0); + } + } + + public void testDfsPhase_Failure() { + try (SearchContext sc = createSearchContext(false)) { + shardSearchStatsListener.onPreDfsPhase(sc); + shardSearchStatsListener.onFailedDfsPhase(sc); + + SearchStats.Stats stats = shardSearchStatsListener.stats().getTotal(); + assertEquals(0.0, stats.getSearchLoadRate(), 0); + } + } + public void testQueryPhase() { try (SearchContext sc = createSearchContext(false)) { shardSearchStatsListener.onPreQueryPhase(sc);