@@ -748,13 +748,21 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh
748748 ) {
749749 tracer .startTrace ("executeQueryPhase" , Map .of ());
750750 final long afterQueryTime ;
751- try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (context )) {
751+ final long beforeQueryTime = System .nanoTime ();
752+ var opsListener = context .indexShard ().getSearchOperationListener ();
753+ opsListener .onPreQueryPhase (context );
754+ try {
752755 loadOrExecuteQueryPhase (request , context );
753756 if (context .queryResult ().hasSearchContext () == false && readerContext .singleSession ()) {
754757 freeReaderContext (readerContext .id ());
755758 }
756- afterQueryTime = executor .success ();
759+ afterQueryTime = System .nanoTime ();
760+ opsListener .onQueryPhase (context , afterQueryTime - beforeQueryTime );
761+ opsListener = null ;
757762 } finally {
763+ if (opsListener != null ) {
764+ opsListener .onFailedQueryPhase (context );
765+ }
758766 tracer .stopTrace (task );
759767 }
760768 if (request .numberOfShards () == 1 && (request .source () == null || request .source ().rankBuilder () == null )) {
@@ -812,15 +820,19 @@ public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShard
812820 }
813821
814822 private QueryFetchSearchResult executeFetchPhase (ReaderContext reader , SearchContext context , long afterQueryTime ) {
815- try (
816- Releasable scope = tracer .withScope (context .getTask ());
817- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (context , true , afterQueryTime )
818- ) {
823+ var opsListener = context .indexShard ().getSearchOperationListener ();
824+ try (Releasable scope = tracer .withScope (context .getTask ());) {
825+ opsListener .onPreFetchPhase (context );
819826 fetchPhase .execute (context , shortcutDocIdsToLoad (context ), null );
820827 if (reader .singleSession ()) {
821828 freeReaderContext (reader .id ());
822829 }
823- executor .success ();
830+ opsListener .onFetchPhase (context , System .nanoTime () - afterQueryTime );
831+ opsListener = null ;
832+ } finally {
833+ if (opsListener != null ) {
834+ opsListener .onFailedFetchPhase (context );
835+ }
824836 }
825837 // This will incRef the QuerySearchResult when it gets created
826838 return QueryFetchSearchResult .of (context .queryResult (), context .fetchResult ());
@@ -844,14 +856,21 @@ public void executeQueryPhase(
844856 }
845857 runAsync (getExecutor (readerContext .indexShard ()), () -> {
846858 final ShardSearchRequest shardSearchRequest = readerContext .getShardSearchRequest (null );
847- try (
848- SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .QUERY , false );
849- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (searchContext )
850- ) {
851- searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (null ));
852- processScroll (request , searchContext );
853- QueryPhase .execute (searchContext );
854- executor .success ();
859+ try (SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .QUERY , false );) {
860+ var opsListener = searchContext .indexShard ().getSearchOperationListener ();
861+ final long beforeQueryTime = System .nanoTime ();
862+ opsListener .onPreQueryPhase (searchContext );
863+ try {
864+ searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (null ));
865+ processScroll (request , searchContext );
866+ QueryPhase .execute (searchContext );
867+ opsListener .onQueryPhase (searchContext , System .nanoTime () - beforeQueryTime );
868+ opsListener = null ;
869+ } finally {
870+ if (opsListener != null ) {
871+ opsListener .onFailedQueryPhase (searchContext );
872+ }
873+ }
855874 readerContext .setRescoreDocIds (searchContext .rescoreDocIds ());
856875 // ScrollQuerySearchResult will incRef the QuerySearchResult when it gets constructed.
857876 return new ScrollQuerySearchResult (searchContext .queryResult (), searchContext .shardTarget ());
@@ -882,18 +901,26 @@ public void executeQueryPhase(
882901 // fork the execution in the search thread pool
883902 runAsync (getExecutor (readerContext .indexShard ()), () -> {
884903 readerContext .setAggregatedDfs (request .dfs ());
885- try (
886- SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .QUERY , true );
887- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (searchContext )
888- ) {
889- searchContext .searcher ().setAggregatedDfs (request .dfs ());
890- QueryPhase .execute (searchContext );
891- final QuerySearchResult queryResult = searchContext .queryResult ();
892- if (queryResult .hasSearchContext () == false && readerContext .singleSession ()) {
893- // no hits, we can release the context since there will be no fetch phase
894- freeReaderContext (readerContext .id ());
904+ try (SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .QUERY , true );) {
905+ final QuerySearchResult queryResult ;
906+ var opsListener = searchContext .indexShard ().getSearchOperationListener ();
907+ final long before = System .nanoTime ();
908+ opsListener .onPreQueryPhase (searchContext );
909+ try {
910+ searchContext .searcher ().setAggregatedDfs (request .dfs ());
911+ QueryPhase .execute (searchContext );
912+ queryResult = searchContext .queryResult ();
913+ if (queryResult .hasSearchContext () == false && readerContext .singleSession ()) {
914+ // no hits, we can release the context since there will be no fetch phase
915+ freeReaderContext (readerContext .id ());
916+ }
917+ opsListener .onQueryPhase (searchContext , System .nanoTime () - before );
918+ opsListener = null ;
919+ } finally {
920+ if (opsListener != null ) {
921+ opsListener .onFailedQueryPhase (searchContext );
922+ }
895923 }
896- executor .success ();
897924 // Pass the rescoreDocIds to the queryResult to send them the coordinating node
898925 // and receive them back in the fetch phase.
899926 // We also pass the rescoreDocIds to the LegacyReaderContext in case the search state needs to stay in the data node.
@@ -942,16 +969,25 @@ public void executeFetchPhase(
942969 }
943970 runAsync (getExecutor (readerContext .indexShard ()), () -> {
944971 final ShardSearchRequest shardSearchRequest = readerContext .getShardSearchRequest (null );
945- try (
946- SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .FETCH , false );
947- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (searchContext )
948- ) {
949- searchContext .assignRescoreDocIds (readerContext .getRescoreDocIds (null ));
950- searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (null ));
951- processScroll (request , searchContext );
952- searchContext .addQueryResult ();
953- QueryPhase .execute (searchContext );
954- final long afterQueryTime = executor .success ();
972+ try (SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .FETCH , false );) {
973+ var opsListener = readerContext .indexShard ().getSearchOperationListener ();
974+ final long beforeQueryTime = System .nanoTime ();
975+ final long afterQueryTime ;
976+ try {
977+ opsListener .onPreQueryPhase (searchContext );
978+ searchContext .assignRescoreDocIds (readerContext .getRescoreDocIds (null ));
979+ searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (null ));
980+ processScroll (request , searchContext );
981+ searchContext .addQueryResult ();
982+ QueryPhase .execute (searchContext );
983+ afterQueryTime = System .nanoTime ();
984+ opsListener .onQueryPhase (searchContext , afterQueryTime - beforeQueryTime );
985+ opsListener = null ;
986+ } finally {
987+ if (opsListener != null ) {
988+ opsListener .onFailedQueryPhase (searchContext );
989+ }
990+ }
955991 QueryFetchSearchResult fetchSearchResult = executeFetchPhase (readerContext , searchContext , afterQueryTime );
956992 return new ScrollQueryFetchSearchResult (fetchSearchResult , searchContext .shardTarget ());
957993 } catch (Exception e ) {
@@ -975,18 +1011,20 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A
9751011 }
9761012 searchContext .assignRescoreDocIds (readerContext .getRescoreDocIds (request .getRescoreDocIds ()));
9771013 searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (request .getAggregatedDfs ()));
978- try (
979- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (
980- searchContext ,
981- true ,
982- System .nanoTime ()
983- )
984- ) {
1014+ final long startTime = System .nanoTime ();
1015+ var opsListener = searchContext .indexShard ().getSearchOperationListener ();
1016+ opsListener .onPreFetchPhase (searchContext );
1017+ try {
9851018 fetchPhase .execute (searchContext , request .docIds (), request .getRankDocks ());
9861019 if (readerContext .singleSession ()) {
9871020 freeReaderContext (request .contextId ());
9881021 }
989- executor .success ();
1022+ opsListener .onFetchPhase (searchContext , System .nanoTime () - startTime );
1023+ opsListener = null ;
1024+ } finally {
1025+ if (opsListener != null ) {
1026+ opsListener .onFailedFetchPhase (searchContext );
1027+ }
9901028 }
9911029 var fetchResult = searchContext .fetchResult ();
9921030 // inc-ref fetch result because we close the SearchContext that references it in this try-with-resources block
@@ -2007,58 +2045,4 @@ public AggregationReduceContext forFinalReduction() {
20072045 }
20082046 };
20092047 }
2010-
2011- /**
2012- * This helper class ensures we only execute either the success or the failure path for {@link SearchOperationListener}.
2013- * This is crucial for some implementations like {@link org.elasticsearch.index.search.stats.ShardSearchStats}.
2014- */
2015- private static final class SearchOperationListenerExecutor implements AutoCloseable {
2016- private final SearchOperationListener listener ;
2017- private final SearchContext context ;
2018- private final long time ;
2019- private final boolean fetch ;
2020- private long afterQueryTime = -1 ;
2021- private boolean closed = false ;
2022-
2023- SearchOperationListenerExecutor (SearchContext context ) {
2024- this (context , false , System .nanoTime ());
2025- }
2026-
2027- SearchOperationListenerExecutor (SearchContext context , boolean fetch , long startTime ) {
2028- this .listener = context .indexShard ().getSearchOperationListener ();
2029- this .context = context ;
2030- time = startTime ;
2031- this .fetch = fetch ;
2032- if (fetch ) {
2033- listener .onPreFetchPhase (context );
2034- } else {
2035- listener .onPreQueryPhase (context );
2036- }
2037- }
2038-
2039- long success () {
2040- return afterQueryTime = System .nanoTime ();
2041- }
2042-
2043- @ Override
2044- public void close () {
2045- assert closed == false : "already closed - while technically ok double closing is a likely a bug in this case" ;
2046- if (closed == false ) {
2047- closed = true ;
2048- if (afterQueryTime != -1 ) {
2049- if (fetch ) {
2050- listener .onFetchPhase (context , afterQueryTime - time );
2051- } else {
2052- listener .onQueryPhase (context , afterQueryTime - time );
2053- }
2054- } else {
2055- if (fetch ) {
2056- listener .onFailedFetchPhase (context );
2057- } else {
2058- listener .onFailedQueryPhase (context );
2059- }
2060- }
2061- }
2062- }
2063- }
20642048}
0 commit comments