@@ -760,13 +760,21 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, Cancella
760760 ) {
761761 tracer .startTrace ("executeQueryPhase" , Map .of ());
762762 final long afterQueryTime ;
763- try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (context )) {
763+ final long beforeQueryTime = System .nanoTime ();
764+ var opsListener = context .indexShard ().getSearchOperationListener ();
765+ opsListener .onPreQueryPhase (context );
766+ try {
764767 loadOrExecuteQueryPhase (request , context );
765768 if (context .queryResult ().hasSearchContext () == false && readerContext .singleSession ()) {
766769 freeReaderContext (readerContext .id ());
767770 }
768- afterQueryTime = executor .success ();
771+ afterQueryTime = System .nanoTime ();
772+ opsListener .onQueryPhase (context , afterQueryTime - beforeQueryTime );
773+ opsListener = null ;
769774 } finally {
775+ if (opsListener != null ) {
776+ opsListener .onFailedQueryPhase (context );
777+ }
770778 tracer .stopTrace (task );
771779 }
772780 if (request .numberOfShards () == 1 && (request .source () == null || request .source ().rankBuilder () == null )) {
@@ -824,15 +832,19 @@ public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShard
824832 }
825833
826834 private QueryFetchSearchResult executeFetchPhase (ReaderContext reader , SearchContext context , long afterQueryTime ) {
827- try (
828- Releasable scope = tracer .withScope (context .getTask ());
829- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (context , true , afterQueryTime )
830- ) {
835+ var opsListener = context .indexShard ().getSearchOperationListener ();
836+ try (Releasable scope = tracer .withScope (context .getTask ());) {
837+ opsListener .onPreFetchPhase (context );
831838 fetchPhase .execute (context , shortcutDocIdsToLoad (context ), null );
832839 if (reader .singleSession ()) {
833840 freeReaderContext (reader .id ());
834841 }
835- executor .success ();
842+ opsListener .onFetchPhase (context , System .nanoTime () - afterQueryTime );
843+ opsListener = null ;
844+ } finally {
845+ if (opsListener != null ) {
846+ opsListener .onFailedFetchPhase (context );
847+ }
836848 }
837849 // This will incRef the QuerySearchResult when it gets created
838850 return QueryFetchSearchResult .of (context .queryResult (), context .fetchResult ());
@@ -856,14 +868,21 @@ public void executeQueryPhase(
856868 }
857869 runAsync (getExecutor (readerContext .indexShard ()), () -> {
858870 final ShardSearchRequest shardSearchRequest = readerContext .getShardSearchRequest (null );
859- try (
860- SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .QUERY , false );
861- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (searchContext )
862- ) {
863- searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (null ));
864- processScroll (request , searchContext );
865- QueryPhase .execute (searchContext );
866- executor .success ();
871+ try (SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .QUERY , false );) {
872+ var opsListener = searchContext .indexShard ().getSearchOperationListener ();
873+ final long beforeQueryTime = System .nanoTime ();
874+ opsListener .onPreQueryPhase (searchContext );
875+ try {
876+ searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (null ));
877+ processScroll (request , searchContext );
878+ QueryPhase .execute (searchContext );
879+ opsListener .onQueryPhase (searchContext , System .nanoTime () - beforeQueryTime );
880+ opsListener = null ;
881+ } finally {
882+ if (opsListener != null ) {
883+ opsListener .onFailedQueryPhase (searchContext );
884+ }
885+ }
867886 readerContext .setRescoreDocIds (searchContext .rescoreDocIds ());
868887 // ScrollQuerySearchResult will incRef the QuerySearchResult when it gets constructed.
869888 return new ScrollQuerySearchResult (searchContext .queryResult (), searchContext .shardTarget ());
@@ -894,18 +913,26 @@ public void executeQueryPhase(
894913 // fork the execution in the search thread pool
895914 runAsync (getExecutor (readerContext .indexShard ()), () -> {
896915 readerContext .setAggregatedDfs (request .dfs ());
897- try (
898- SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .QUERY , true );
899- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (searchContext )
900- ) {
901- searchContext .searcher ().setAggregatedDfs (request .dfs ());
902- QueryPhase .execute (searchContext );
903- final QuerySearchResult queryResult = searchContext .queryResult ();
904- if (queryResult .hasSearchContext () == false && readerContext .singleSession ()) {
905- // no hits, we can release the context since there will be no fetch phase
906- freeReaderContext (readerContext .id ());
916+ try (SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .QUERY , true );) {
917+ final QuerySearchResult queryResult ;
918+ var opsListener = searchContext .indexShard ().getSearchOperationListener ();
919+ final long before = System .nanoTime ();
920+ opsListener .onPreQueryPhase (searchContext );
921+ try {
922+ searchContext .searcher ().setAggregatedDfs (request .dfs ());
923+ QueryPhase .execute (searchContext );
924+ queryResult = searchContext .queryResult ();
925+ if (queryResult .hasSearchContext () == false && readerContext .singleSession ()) {
926+ // no hits, we can release the context since there will be no fetch phase
927+ freeReaderContext (readerContext .id ());
928+ }
929+ opsListener .onQueryPhase (searchContext , System .nanoTime () - before );
930+ opsListener = null ;
931+ } finally {
932+ if (opsListener != null ) {
933+ opsListener .onFailedQueryPhase (searchContext );
934+ }
907935 }
908- executor .success ();
909936 // Pass the rescoreDocIds to the queryResult to send them the coordinating node
910937 // and receive them back in the fetch phase.
911938 // We also pass the rescoreDocIds to the LegacyReaderContext in case the search state needs to stay in the data node.
@@ -954,16 +981,25 @@ public void executeFetchPhase(
954981 }
955982 runAsync (getExecutor (readerContext .indexShard ()), () -> {
956983 final ShardSearchRequest shardSearchRequest = readerContext .getShardSearchRequest (null );
957- try (
958- SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .FETCH , false );
959- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (searchContext )
960- ) {
961- searchContext .assignRescoreDocIds (readerContext .getRescoreDocIds (null ));
962- searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (null ));
963- processScroll (request , searchContext );
964- searchContext .addQueryResult ();
965- QueryPhase .execute (searchContext );
966- final long afterQueryTime = executor .success ();
984+ try (SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .FETCH , false );) {
985+ var opsListener = readerContext .indexShard ().getSearchOperationListener ();
986+ final long beforeQueryTime = System .nanoTime ();
987+ final long afterQueryTime ;
988+ try {
989+ opsListener .onPreQueryPhase (searchContext );
990+ searchContext .assignRescoreDocIds (readerContext .getRescoreDocIds (null ));
991+ searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (null ));
992+ processScroll (request , searchContext );
993+ searchContext .addQueryResult ();
994+ QueryPhase .execute (searchContext );
995+ afterQueryTime = System .nanoTime ();
996+ opsListener .onQueryPhase (searchContext , afterQueryTime - beforeQueryTime );
997+ opsListener = null ;
998+ } finally {
999+ if (opsListener != null ) {
1000+ opsListener .onFailedQueryPhase (searchContext );
1001+ }
1002+ }
9671003 QueryFetchSearchResult fetchSearchResult = executeFetchPhase (readerContext , searchContext , afterQueryTime );
9681004 return new ScrollQueryFetchSearchResult (fetchSearchResult , searchContext .shardTarget ());
9691005 } catch (Exception e ) {
@@ -987,18 +1023,20 @@ public void executeFetchPhase(ShardFetchRequest request, CancellableTask task, A
9871023 }
9881024 searchContext .assignRescoreDocIds (readerContext .getRescoreDocIds (request .getRescoreDocIds ()));
9891025 searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (request .getAggregatedDfs ()));
990- try (
991- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (
992- searchContext ,
993- true ,
994- System .nanoTime ()
995- )
996- ) {
1026+ final long startTime = System .nanoTime ();
1027+ var opsListener = searchContext .indexShard ().getSearchOperationListener ();
1028+ opsListener .onPreFetchPhase (searchContext );
1029+ try {
9971030 fetchPhase .execute (searchContext , request .docIds (), request .getRankDocks ());
9981031 if (readerContext .singleSession ()) {
9991032 freeReaderContext (request .contextId ());
10001033 }
1001- executor .success ();
1034+ opsListener .onFetchPhase (searchContext , System .nanoTime () - startTime );
1035+ opsListener = null ;
1036+ } finally {
1037+ if (opsListener != null ) {
1038+ opsListener .onFailedFetchPhase (searchContext );
1039+ }
10021040 }
10031041 var fetchResult = searchContext .fetchResult ();
10041042 // inc-ref fetch result because we close the SearchContext that references it in this try-with-resources block
@@ -1972,58 +2010,4 @@ public AggregationReduceContext forFinalReduction() {
19722010 }
19732011 };
19742012 }
1975-
1976- /**
1977- * This helper class ensures we only execute either the success or the failure path for {@link SearchOperationListener}.
1978- * This is crucial for some implementations like {@link org.elasticsearch.index.search.stats.ShardSearchStats}.
1979- */
1980- private static final class SearchOperationListenerExecutor implements AutoCloseable {
1981- private final SearchOperationListener listener ;
1982- private final SearchContext context ;
1983- private final long time ;
1984- private final boolean fetch ;
1985- private long afterQueryTime = -1 ;
1986- private boolean closed = false ;
1987-
1988- SearchOperationListenerExecutor (SearchContext context ) {
1989- this (context , false , System .nanoTime ());
1990- }
1991-
1992- SearchOperationListenerExecutor (SearchContext context , boolean fetch , long startTime ) {
1993- this .listener = context .indexShard ().getSearchOperationListener ();
1994- this .context = context ;
1995- time = startTime ;
1996- this .fetch = fetch ;
1997- if (fetch ) {
1998- listener .onPreFetchPhase (context );
1999- } else {
2000- listener .onPreQueryPhase (context );
2001- }
2002- }
2003-
2004- long success () {
2005- return afterQueryTime = System .nanoTime ();
2006- }
2007-
2008- @ Override
2009- public void close () {
2010- assert closed == false : "already closed - while technically ok double closing is a likely a bug in this case" ;
2011- if (closed == false ) {
2012- closed = true ;
2013- if (afterQueryTime != -1 ) {
2014- if (fetch ) {
2015- listener .onFetchPhase (context , afterQueryTime - time );
2016- } else {
2017- listener .onQueryPhase (context , afterQueryTime - time );
2018- }
2019- } else {
2020- if (fetch ) {
2021- listener .onFailedFetchPhase (context );
2022- } else {
2023- listener .onFailedQueryPhase (context );
2024- }
2025- }
2026- }
2027- }
2028- }
20292013}
0 commit comments