From 0a6a87deacdbb4b2f3e0e330b215a8f621dfc482 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 7 Mar 2025 09:51:58 +0100 Subject: [PATCH 1/4] Remove SearchOperationListenerExecutor abstraction This isn't even saving any lines of code and is a measurable source of both cache and branch-prediction misses on the hot and critical transport-thread path. => inline it --- .../elasticsearch/search/SearchService.java | 180 ++++++++---------- 1 file changed, 82 insertions(+), 98 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 1156cd4b7bdf0..2c33365a77eda 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -760,13 +760,21 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, Cancella ) { tracer.startTrace("executeQueryPhase", Map.of()); final long afterQueryTime; - try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { + final long beforeQueryTime = System.nanoTime(); + var opsListener = context.indexShard().getSearchOperationListener(); + opsListener.onPreQueryPhase(context); + try { loadOrExecuteQueryPhase(request, context); if (context.queryResult().hasSearchContext() == false && readerContext.singleSession()) { freeReaderContext(readerContext.id()); } - afterQueryTime = executor.success(); + afterQueryTime = System.nanoTime(); + opsListener.onQueryPhase(context, afterQueryTime - beforeQueryTime); + opsListener = null; } finally { + if (opsListener != null) { + opsListener.onFailedQueryPhase(context); + } tracer.stopTrace(task); } if (request.numberOfShards() == 1 && (request.source() == null || request.source().rankBuilder() == null)) { @@ -824,15 +832,19 @@ public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShard } private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) { - try ( - Releasable scope = tracer.withScope(context.getTask()); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime) - ) { + var opsListener = context.indexShard().getSearchOperationListener(); + try (Releasable scope = tracer.withScope(context.getTask());) { + opsListener.onPreFetchPhase(context); fetchPhase.execute(context, shortcutDocIdsToLoad(context), null); if (reader.singleSession()) { freeReaderContext(reader.id()); } - executor.success(); + opsListener.onFetchPhase(context, System.nanoTime() - afterQueryTime); + opsListener = null; + } finally { + if (opsListener != null) { + opsListener.onPreFetchPhase(context); + } } // This will incRef the QuerySearchResult when it gets created return QueryFetchSearchResult.of(context.queryResult(), context.fetchResult()); @@ -856,14 +868,21 @@ public void executeQueryPhase( } runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); - try ( - SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, false); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext) - ) { - searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); - processScroll(request, searchContext); - QueryPhase.execute(searchContext); - executor.success(); + try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, false);) { + var opsListener = searchContext.indexShard().getSearchOperationListener(); + final long beforeQueryTime = System.nanoTime(); + opsListener.onPreQueryPhase(searchContext); + try { + searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); + processScroll(request, searchContext); + QueryPhase.execute(searchContext); + opsListener.onQueryPhase(searchContext, System.nanoTime() - beforeQueryTime); + opsListener = null; + } finally { + if (opsListener != null) { + opsListener.onFailedQueryPhase(searchContext); + } + } readerContext.setRescoreDocIds(searchContext.rescoreDocIds()); // ScrollQuerySearchResult will incRef the QuerySearchResult when it gets constructed. return new ScrollQuerySearchResult(searchContext.queryResult(), searchContext.shardTarget()); @@ -894,18 +913,26 @@ public void executeQueryPhase( // fork the execution in the search thread pool runAsync(getExecutor(readerContext.indexShard()), () -> { readerContext.setAggregatedDfs(request.dfs()); - try ( - SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, true); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext) - ) { - searchContext.searcher().setAggregatedDfs(request.dfs()); - QueryPhase.execute(searchContext); - final QuerySearchResult queryResult = searchContext.queryResult(); - if (queryResult.hasSearchContext() == false && readerContext.singleSession()) { - // no hits, we can release the context since there will be no fetch phase - freeReaderContext(readerContext.id()); + try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, true);) { + final QuerySearchResult queryResult; + var opsListener = searchContext.indexShard().getSearchOperationListener(); + final long before = System.nanoTime(); + opsListener.onPreQueryPhase(searchContext); + try { + searchContext.searcher().setAggregatedDfs(request.dfs()); + QueryPhase.execute(searchContext); + queryResult = searchContext.queryResult(); + if (queryResult.hasSearchContext() == false && readerContext.singleSession()) { + // no hits, we can release the context since there will be no fetch phase + freeReaderContext(readerContext.id()); + } + opsListener.onQueryPhase(searchContext, System.nanoTime() - before); + opsListener = null; + } finally { + if (opsListener != null) { + opsListener.onFailedQueryPhase(searchContext); + } } - executor.success(); // Pass the rescoreDocIds to the queryResult to send them the coordinating node // and receive them back in the fetch phase. // We also pass the rescoreDocIds to the LegacyReaderContext in case the search state needs to stay in the data node. @@ -954,16 +981,25 @@ public void executeFetchPhase( } runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); - try ( - SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.FETCH, false); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext) - ) { - searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null)); - searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); - processScroll(request, searchContext); - searchContext.addQueryResult(); - QueryPhase.execute(searchContext); - final long afterQueryTime = executor.success(); + var opsListener = readerContext.indexShard().getSearchOperationListener(); + try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.FETCH, false);) { + final long afterQueryTime; + try { + final long beforeQueryTime = System.nanoTime(); + opsListener.onPreQueryPhase(searchContext); + searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null)); + searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); + processScroll(request, searchContext); + searchContext.addQueryResult(); + QueryPhase.execute(searchContext); + afterQueryTime = System.nanoTime(); + opsListener.onQueryPhase(searchContext, afterQueryTime - beforeQueryTime); + opsListener = null; + } finally { + if (opsListener != null) { + opsListener.onFailedQueryPhase(searchContext); + } + } QueryFetchSearchResult fetchSearchResult = executeFetchPhase(readerContext, searchContext, afterQueryTime); return new ScrollQueryFetchSearchResult(fetchSearchResult, searchContext.shardTarget()); } catch (Exception e) { @@ -987,18 +1023,20 @@ public void executeFetchPhase(ShardFetchRequest request, CancellableTask task, A } searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(request.getRescoreDocIds())); searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(request.getAggregatedDfs())); - try ( - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor( - searchContext, - true, - System.nanoTime() - ) - ) { + final long startTime = System.nanoTime(); + var opsListener = searchContext.indexShard().getSearchOperationListener(); + opsListener.onPreFetchPhase(searchContext); + try { fetchPhase.execute(searchContext, request.docIds(), request.getRankDocks()); if (readerContext.singleSession()) { freeReaderContext(request.contextId()); } - executor.success(); + opsListener.onFetchPhase(searchContext, System.nanoTime() - startTime); + opsListener = null; + } finally { + if (opsListener != null) { + opsListener.onFailedFetchPhase(searchContext); + } } var fetchResult = searchContext.fetchResult(); // inc-ref fetch result because we close the SearchContext that references it in this try-with-resources block @@ -1972,58 +2010,4 @@ public AggregationReduceContext forFinalReduction() { } }; } - - /** - * This helper class ensures we only execute either the success or the failure path for {@link SearchOperationListener}. - * This is crucial for some implementations like {@link org.elasticsearch.index.search.stats.ShardSearchStats}. - */ - private static final class SearchOperationListenerExecutor implements AutoCloseable { - private final SearchOperationListener listener; - private final SearchContext context; - private final long time; - private final boolean fetch; - private long afterQueryTime = -1; - private boolean closed = false; - - SearchOperationListenerExecutor(SearchContext context) { - this(context, false, System.nanoTime()); - } - - SearchOperationListenerExecutor(SearchContext context, boolean fetch, long startTime) { - this.listener = context.indexShard().getSearchOperationListener(); - this.context = context; - time = startTime; - this.fetch = fetch; - if (fetch) { - listener.onPreFetchPhase(context); - } else { - listener.onPreQueryPhase(context); - } - } - - long success() { - return afterQueryTime = System.nanoTime(); - } - - @Override - public void close() { - assert closed == false : "already closed - while technically ok double closing is a likely a bug in this case"; - if (closed == false) { - closed = true; - if (afterQueryTime != -1) { - if (fetch) { - listener.onFetchPhase(context, afterQueryTime - time); - } else { - listener.onQueryPhase(context, afterQueryTime - time); - } - } else { - if (fetch) { - listener.onFailedFetchPhase(context); - } else { - listener.onFailedQueryPhase(context); - } - } - } - } - } } From bdebecad17bced4dcb604cf2e4feedb9800704e8 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 7 Mar 2025 10:11:07 +0100 Subject: [PATCH 2/4] fix --- .../src/main/java/org/elasticsearch/search/SearchService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 2c33365a77eda..648352288bb53 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -843,7 +843,7 @@ private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchCon opsListener = null; } finally { if (opsListener != null) { - opsListener.onPreFetchPhase(context); + opsListener.onFailedFetchPhase(context); } } // This will incRef the QuerySearchResult when it gets created From 5aaceb030898fb4fa3277af2c6543f2335f94b5b Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 7 Mar 2025 09:51:58 +0100 Subject: [PATCH 3/4] Remove SearchOperationListenerExecutor abstraction This isn't even saving any lines of code and is a measurable source of both cache and branch-prediction misses on the hot and critical transport-thread path. => inline it --- .../elasticsearch/search/SearchService.java | 180 ++++++++---------- 1 file changed, 82 insertions(+), 98 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 1156cd4b7bdf0..648352288bb53 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -760,13 +760,21 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, Cancella ) { tracer.startTrace("executeQueryPhase", Map.of()); final long afterQueryTime; - try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { + final long beforeQueryTime = System.nanoTime(); + var opsListener = context.indexShard().getSearchOperationListener(); + opsListener.onPreQueryPhase(context); + try { loadOrExecuteQueryPhase(request, context); if (context.queryResult().hasSearchContext() == false && readerContext.singleSession()) { freeReaderContext(readerContext.id()); } - afterQueryTime = executor.success(); + afterQueryTime = System.nanoTime(); + opsListener.onQueryPhase(context, afterQueryTime - beforeQueryTime); + opsListener = null; } finally { + if (opsListener != null) { + opsListener.onFailedQueryPhase(context); + } tracer.stopTrace(task); } if (request.numberOfShards() == 1 && (request.source() == null || request.source().rankBuilder() == null)) { @@ -824,15 +832,19 @@ public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShard } private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) { - try ( - Releasable scope = tracer.withScope(context.getTask()); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime) - ) { + var opsListener = context.indexShard().getSearchOperationListener(); + try (Releasable scope = tracer.withScope(context.getTask());) { + opsListener.onPreFetchPhase(context); fetchPhase.execute(context, shortcutDocIdsToLoad(context), null); if (reader.singleSession()) { freeReaderContext(reader.id()); } - executor.success(); + opsListener.onFetchPhase(context, System.nanoTime() - afterQueryTime); + opsListener = null; + } finally { + if (opsListener != null) { + opsListener.onFailedFetchPhase(context); + } } // This will incRef the QuerySearchResult when it gets created return QueryFetchSearchResult.of(context.queryResult(), context.fetchResult()); @@ -856,14 +868,21 @@ public void executeQueryPhase( } runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); - try ( - SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, false); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext) - ) { - searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); - processScroll(request, searchContext); - QueryPhase.execute(searchContext); - executor.success(); + try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, false);) { + var opsListener = searchContext.indexShard().getSearchOperationListener(); + final long beforeQueryTime = System.nanoTime(); + opsListener.onPreQueryPhase(searchContext); + try { + searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); + processScroll(request, searchContext); + QueryPhase.execute(searchContext); + opsListener.onQueryPhase(searchContext, System.nanoTime() - beforeQueryTime); + opsListener = null; + } finally { + if (opsListener != null) { + opsListener.onFailedQueryPhase(searchContext); + } + } readerContext.setRescoreDocIds(searchContext.rescoreDocIds()); // ScrollQuerySearchResult will incRef the QuerySearchResult when it gets constructed. return new ScrollQuerySearchResult(searchContext.queryResult(), searchContext.shardTarget()); @@ -894,18 +913,26 @@ public void executeQueryPhase( // fork the execution in the search thread pool runAsync(getExecutor(readerContext.indexShard()), () -> { readerContext.setAggregatedDfs(request.dfs()); - try ( - SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, true); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext) - ) { - searchContext.searcher().setAggregatedDfs(request.dfs()); - QueryPhase.execute(searchContext); - final QuerySearchResult queryResult = searchContext.queryResult(); - if (queryResult.hasSearchContext() == false && readerContext.singleSession()) { - // no hits, we can release the context since there will be no fetch phase - freeReaderContext(readerContext.id()); + try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, true);) { + final QuerySearchResult queryResult; + var opsListener = searchContext.indexShard().getSearchOperationListener(); + final long before = System.nanoTime(); + opsListener.onPreQueryPhase(searchContext); + try { + searchContext.searcher().setAggregatedDfs(request.dfs()); + QueryPhase.execute(searchContext); + queryResult = searchContext.queryResult(); + if (queryResult.hasSearchContext() == false && readerContext.singleSession()) { + // no hits, we can release the context since there will be no fetch phase + freeReaderContext(readerContext.id()); + } + opsListener.onQueryPhase(searchContext, System.nanoTime() - before); + opsListener = null; + } finally { + if (opsListener != null) { + opsListener.onFailedQueryPhase(searchContext); + } } - executor.success(); // Pass the rescoreDocIds to the queryResult to send them the coordinating node // and receive them back in the fetch phase. // We also pass the rescoreDocIds to the LegacyReaderContext in case the search state needs to stay in the data node. @@ -954,16 +981,25 @@ public void executeFetchPhase( } runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); - try ( - SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.FETCH, false); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext) - ) { - searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null)); - searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); - processScroll(request, searchContext); - searchContext.addQueryResult(); - QueryPhase.execute(searchContext); - final long afterQueryTime = executor.success(); + var opsListener = readerContext.indexShard().getSearchOperationListener(); + try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.FETCH, false);) { + final long afterQueryTime; + try { + final long beforeQueryTime = System.nanoTime(); + opsListener.onPreQueryPhase(searchContext); + searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null)); + searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); + processScroll(request, searchContext); + searchContext.addQueryResult(); + QueryPhase.execute(searchContext); + afterQueryTime = System.nanoTime(); + opsListener.onQueryPhase(searchContext, afterQueryTime - beforeQueryTime); + opsListener = null; + } finally { + if (opsListener != null) { + opsListener.onFailedQueryPhase(searchContext); + } + } QueryFetchSearchResult fetchSearchResult = executeFetchPhase(readerContext, searchContext, afterQueryTime); return new ScrollQueryFetchSearchResult(fetchSearchResult, searchContext.shardTarget()); } catch (Exception e) { @@ -987,18 +1023,20 @@ public void executeFetchPhase(ShardFetchRequest request, CancellableTask task, A } searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(request.getRescoreDocIds())); searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(request.getAggregatedDfs())); - try ( - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor( - searchContext, - true, - System.nanoTime() - ) - ) { + final long startTime = System.nanoTime(); + var opsListener = searchContext.indexShard().getSearchOperationListener(); + opsListener.onPreFetchPhase(searchContext); + try { fetchPhase.execute(searchContext, request.docIds(), request.getRankDocks()); if (readerContext.singleSession()) { freeReaderContext(request.contextId()); } - executor.success(); + opsListener.onFetchPhase(searchContext, System.nanoTime() - startTime); + opsListener = null; + } finally { + if (opsListener != null) { + opsListener.onFailedFetchPhase(searchContext); + } } var fetchResult = searchContext.fetchResult(); // inc-ref fetch result because we close the SearchContext that references it in this try-with-resources block @@ -1972,58 +2010,4 @@ public AggregationReduceContext forFinalReduction() { } }; } - - /** - * This helper class ensures we only execute either the success or the failure path for {@link SearchOperationListener}. - * This is crucial for some implementations like {@link org.elasticsearch.index.search.stats.ShardSearchStats}. - */ - private static final class SearchOperationListenerExecutor implements AutoCloseable { - private final SearchOperationListener listener; - private final SearchContext context; - private final long time; - private final boolean fetch; - private long afterQueryTime = -1; - private boolean closed = false; - - SearchOperationListenerExecutor(SearchContext context) { - this(context, false, System.nanoTime()); - } - - SearchOperationListenerExecutor(SearchContext context, boolean fetch, long startTime) { - this.listener = context.indexShard().getSearchOperationListener(); - this.context = context; - time = startTime; - this.fetch = fetch; - if (fetch) { - listener.onPreFetchPhase(context); - } else { - listener.onPreQueryPhase(context); - } - } - - long success() { - return afterQueryTime = System.nanoTime(); - } - - @Override - public void close() { - assert closed == false : "already closed - while technically ok double closing is a likely a bug in this case"; - if (closed == false) { - closed = true; - if (afterQueryTime != -1) { - if (fetch) { - listener.onFetchPhase(context, afterQueryTime - time); - } else { - listener.onQueryPhase(context, afterQueryTime - time); - } - } else { - if (fetch) { - listener.onFailedFetchPhase(context); - } else { - listener.onFailedQueryPhase(context); - } - } - } - } - } } From 87b3c820d8c49e52047a947b3bc8fafd4c8e3582 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 10 Mar 2025 13:27:47 +0100 Subject: [PATCH 4/4] CR: move --- .../src/main/java/org/elasticsearch/search/SearchService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 648352288bb53..1c60803c55242 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -981,11 +981,11 @@ public void executeFetchPhase( } runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); - var opsListener = readerContext.indexShard().getSearchOperationListener(); try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.FETCH, false);) { + var opsListener = readerContext.indexShard().getSearchOperationListener(); + final long beforeQueryTime = System.nanoTime(); final long afterQueryTime; try { - final long beforeQueryTime = System.nanoTime(); opsListener.onPreQueryPhase(searchContext); searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null)); searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));