@@ -167,7 +167,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
167167 private final SearchResponseMetrics searchResponseMetrics ;
168168 private final Client client ;
169169 private final UsageService usageService ;
170- private final boolean collectTelemetry ;
170+ private final boolean collectCCSTelemetry ;
171171 private final TimeValue forceConnectTimeoutSecs ;
172172
173173 @ Inject
@@ -213,7 +213,7 @@ public TransportSearchAction(
213213 var settings = clusterService .getSettings ();
214214 this .defaultPreFilterShardSize = DEFAULT_PRE_FILTER_SHARD_SIZE .get (settings );
215215 this .ccsCheckCompatibility = SearchService .CCS_VERSION_CHECK_SETTING .get (settings );
216- this .collectTelemetry = SearchService .CCS_COLLECT_TELEMETRY .get (settings );
216+ this .collectCCSTelemetry = SearchService .CCS_COLLECT_TELEMETRY .get (settings );
217217 this .searchResponseMetrics = searchResponseMetrics ;
218218 this .client = client ;
219219 this .usageService = usageService ;
@@ -333,14 +333,24 @@ public long buildTookInMillis() {
333333
334334 @ Override
335335 protected void doExecute (Task task , SearchRequest searchRequest , ActionListener <SearchResponse > listener ) {
336- executeRequest ((SearchTask ) task , searchRequest , new SearchResponseActionListener ( listener ) , AsyncSearchActionProvider ::new );
336+ executeRequest ((SearchTask ) task , searchRequest , listener , AsyncSearchActionProvider ::new , true );
337337 }
338338
339- void executeRequest (
339+ void executeOpenPit (
340340 SearchTask task ,
341341 SearchRequest original ,
342- ActionListener <SearchResponse > listener ,
342+ ActionListener <SearchResponse > originalListener ,
343343 Function <ActionListener <SearchResponse >, SearchPhaseProvider > searchPhaseProvider
344+ ) {
345+ executeRequest (task , original , originalListener , searchPhaseProvider , false );
346+ }
347+
348+ private void executeRequest (
349+ SearchTask task ,
350+ SearchRequest original ,
351+ ActionListener <SearchResponse > originalListener ,
352+ Function <ActionListener <SearchResponse >, SearchPhaseProvider > searchPhaseProvider ,
353+ boolean collectSearchTelemetry
344354 ) {
345355 final long relativeStartNanos = System .nanoTime ();
346356 final SearchTimeProvider timeProvider = new SearchTimeProvider (
@@ -372,48 +382,93 @@ void executeRequest(
372382 frozenIndexCheck (resolvedIndices );
373383 }
374384
375- ActionListener <SearchRequest > rewriteListener = listener .delegateFailureAndWrap ((delegate , rewritten ) -> {
385+ final SearchSourceBuilder source = original .source ();
386+ if (shouldOpenPIT (source )) {
387+ // disabling shard reordering for request
388+ original .setPreFilterShardSize (Integer .MAX_VALUE );
389+ openPIT (
390+ client ,
391+ original ,
392+ searchService .getDefaultKeepAliveInMillis (),
393+ originalListener .delegateFailureAndWrap ((delegate , resp ) -> {
394+ // We set the keep alive to -1 to indicate that we don't need the pit id in the response.
395+ // This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore.
396+ source .pointInTimeBuilder (new PointInTimeBuilder (resp .getPointInTimeId ()).setKeepAlive (TimeValue .MINUS_ONE ));
397+ var pitListener = new ActionListener <SearchResponse >() {
398+ @ Override
399+ public void onResponse (SearchResponse response ) {
400+ // we need to close the PIT first so we delay the release of the response to after the closing
401+ response .incRef ();
402+ closePIT (
403+ client ,
404+ original .source ().pointInTimeBuilder (),
405+ () -> ActionListener .respondAndRelease (delegate , response )
406+ );
407+ }
408+
409+ @ Override
410+ public void onFailure (Exception e ) {
411+ closePIT (client , original .source ().pointInTimeBuilder (), () -> delegate .onFailure (e ));
412+ }
413+ };
414+ executeRequest (task , original , pitListener , searchPhaseProvider , true );
415+ })
416+ );
417+ return ;
418+ }
419+
420+ ActionListener <SearchRequest > rewriteListener = originalListener .delegateFailureAndWrap ((delegate , rewritten ) -> {
376421 if (ccsCheckCompatibility ) {
377422 checkCCSVersionCompatibility (rewritten );
378423 }
379424
380- if (resolvedIndices .getRemoteClusterIndices ().isEmpty ()) {
381- executeLocalSearch (
382- task ,
383- timeProvider ,
384- rewritten ,
385- resolvedIndices ,
386- projectState ,
387- SearchResponse .Clusters .EMPTY ,
388- searchPhaseProvider .apply (delegate )
389- );
390- } else {
391- if (delegate instanceof TelemetryListener tl ) {
392- tl .setRemotes (resolvedIndices .getRemoteClusterIndices ().size ());
425+ final ActionListener <SearchResponse > searchResponseActionListener ;
426+ if (collectSearchTelemetry ) {
427+ if (collectCCSTelemetry == false || resolvedIndices .getRemoteClusterIndices ().isEmpty ()) {
428+ searchResponseActionListener = new SearchTelemetryListener (delegate , searchResponseMetrics );
429+ } else {
430+ CCSUsage .Builder usageBuilder = new CCSUsage .Builder ();
431+ usageBuilder .setRemotesCount (resolvedIndices .getRemoteClusterIndices ().size ());
432+ usageBuilder .setClientFromTask (task );
393433 if (task .isAsync ()) {
394- tl .setFeature (CCSUsageTelemetry .ASYNC_FEATURE );
434+ usageBuilder .setFeature (CCSUsageTelemetry .ASYNC_FEATURE );
395435 }
396436 if (original .pointInTimeBuilder () != null ) {
397- tl .setFeature (CCSUsageTelemetry .PIT_FEATURE );
437+ usageBuilder .setFeature (CCSUsageTelemetry .PIT_FEATURE );
398438 }
399- tl .setClient (task );
400439 // Check if any of the index patterns are wildcard patterns
401440 var localIndices = resolvedIndices .getLocalIndices ();
402441 if (localIndices != null && Arrays .stream (localIndices .indices ()).anyMatch (Regex ::isSimpleMatchPattern )) {
403- tl .setFeature (CCSUsageTelemetry .WILDCARD_FEATURE );
442+ usageBuilder .setFeature (CCSUsageTelemetry .WILDCARD_FEATURE );
404443 }
405444 if (resolvedIndices .getRemoteClusterIndices ()
406445 .values ()
407446 .stream ()
408447 .anyMatch (indices -> Arrays .stream (indices .indices ()).anyMatch (Regex ::isSimpleMatchPattern ))) {
409- tl .setFeature (CCSUsageTelemetry .WILDCARD_FEATURE );
448+ usageBuilder .setFeature (CCSUsageTelemetry .WILDCARD_FEATURE );
449+ }
450+ if (shouldMinimizeRoundtrips (rewritten )) {
451+ usageBuilder .setFeature (CCSUsageTelemetry .MRT_FEATURE );
410452 }
453+ searchResponseActionListener = new SearchTelemetryListener (delegate , searchResponseMetrics , usageService , usageBuilder );
411454 }
455+ } else {
456+ searchResponseActionListener = delegate ;
457+ }
458+
459+ if (resolvedIndices .getRemoteClusterIndices ().isEmpty ()) {
460+ executeLocalSearch (
461+ task ,
462+ timeProvider ,
463+ rewritten ,
464+ resolvedIndices ,
465+ projectState ,
466+ SearchResponse .Clusters .EMPTY ,
467+ searchPhaseProvider .apply (searchResponseActionListener )
468+ );
469+ } else {
412470 final TaskId parentTaskId = task .taskInfo (clusterService .localNode ().getId (), false ).taskId ();
413471 if (shouldMinimizeRoundtrips (rewritten )) {
414- if (delegate instanceof TelemetryListener tl ) {
415- tl .setFeature (CCSUsageTelemetry .MRT_FEATURE );
416- }
417472 final AggregationReduceContext .Builder aggregationReduceContextBuilder = rewritten .source () != null
418473 && rewritten .source ().aggregations () != null
419474 ? searchService .aggReduceContextBuilder (task ::isCancelled , rewritten .source ().aggregations ())
@@ -439,7 +494,7 @@ void executeRequest(
439494 aggregationReduceContextBuilder ,
440495 remoteClusterService ,
441496 threadPool ,
442- delegate ,
497+ searchResponseActionListener ,
443498 (r , l ) -> executeLocalSearch (
444499 task ,
445500 timeProvider ,
@@ -473,7 +528,7 @@ void executeRequest(
473528 clusters ,
474529 timeProvider ,
475530 transportService ,
476- delegate .delegateFailureAndWrap ((finalDelegate , searchShardsResponses ) -> {
531+ searchResponseActionListener .delegateFailureAndWrap ((finalDelegate , searchShardsResponses ) -> {
477532 final BiFunction <String , String , DiscoveryNode > clusterNodeLookup = getRemoteClusterNodeLookup (
478533 searchShardsResponses
479534 );
@@ -517,49 +572,20 @@ void executeRequest(
517572 }
518573 });
519574
520- final SearchSourceBuilder source = original .source ();
521575 final boolean isExplain = source != null && source .explain () != null && source .explain ();
522- if (shouldOpenPIT (source )) {
523- // disabling shard reordering for request
524- original .setPreFilterShardSize (Integer .MAX_VALUE );
525- openPIT (client , original , searchService .getDefaultKeepAliveInMillis (), listener .delegateFailureAndWrap ((delegate , resp ) -> {
526- // We set the keep alive to -1 to indicate that we don't need the pit id in the response.
527- // This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore.
528- source .pointInTimeBuilder (new PointInTimeBuilder (resp .getPointInTimeId ()).setKeepAlive (TimeValue .MINUS_ONE ));
529- var pitListener = new SearchResponseActionListener (delegate ) {
530- @ Override
531- public void onResponse (SearchResponse response ) {
532- // we need to close the PIT first so we delay the release of the response to after the closing
533- response .incRef ();
534- closePIT (
535- client ,
536- original .source ().pointInTimeBuilder (),
537- () -> ActionListener .respondAndRelease (delegate , response )
538- );
539- }
540-
541- @ Override
542- public void onFailure (Exception e ) {
543- closePIT (client , original .source ().pointInTimeBuilder (), () -> delegate .onFailure (e ));
544- }
545- };
546- executeRequest (task , original , pitListener , searchPhaseProvider );
547- }));
548- } else {
549- Rewriteable .rewriteAndFetch (
550- original ,
551- searchService .getRewriteContext (
552- timeProvider ::absoluteStartMillis ,
553- clusterState .getMinTransportVersion (),
554- original .getLocalClusterAlias (),
555- resolvedIndices ,
556- original .pointInTimeBuilder (),
557- shouldMinimizeRoundtrips (original ),
558- isExplain
559- ),
560- rewriteListener
561- );
562- }
576+ Rewriteable .rewriteAndFetch (
577+ original ,
578+ searchService .getRewriteContext (
579+ timeProvider ::absoluteStartMillis ,
580+ clusterState .getMinTransportVersion (),
581+ original .getLocalClusterAlias (),
582+ resolvedIndices ,
583+ original .pointInTimeBuilder (),
584+ shouldMinimizeRoundtrips (original ),
585+ isExplain
586+ ),
587+ rewriteListener
588+ );
563589 }
564590
565591 /**
@@ -2001,49 +2027,34 @@ static String[] ignoreBlockedIndices(ProjectState projectState, String[] concret
20012027 .toArray (String []::new );
20022028 }
20032029 return concreteIndices ;
2004- }
2005-
2006- private interface TelemetryListener {
2007- void setRemotes (int count );
2008-
2009- void setFeature (String feature );
20102030
2011- void setClient (Task task );
20122031 }
20132032
2014- private class SearchResponseActionListener extends DelegatingActionListener <SearchResponse , SearchResponse >
2015- implements
2016- TelemetryListener {
2033+ private static class SearchTelemetryListener extends DelegatingActionListener <SearchResponse , SearchResponse > {
20172034 private final CCSUsage .Builder usageBuilder ;
2018-
2019- SearchResponseActionListener (ActionListener <SearchResponse > listener ) {
2035+ private final SearchResponseMetrics searchResponseMetrics ;
2036+ private final UsageService usageService ;
2037+ private final boolean collectCCSTelemetry ;
2038+
2039+ SearchTelemetryListener (
2040+ ActionListener <SearchResponse > listener ,
2041+ SearchResponseMetrics searchResponseMetrics ,
2042+ UsageService usageService ,
2043+ CCSUsage .Builder usageBuilder
2044+ ) {
20202045 super (listener );
2021- if (listener instanceof SearchResponseActionListener srListener ) {
2022- usageBuilder = srListener .usageBuilder ;
2023- } else {
2024- usageBuilder = new CCSUsage .Builder ();
2025- }
2026- }
2027-
2028- /**
2029- * Should we collect telemetry for this search?
2030- */
2031- private boolean collectTelemetry () {
2032- return collectTelemetry && usageBuilder .getRemotesCount () > 0 ;
2033- }
2034-
2035- public void setRemotes (int count ) {
2036- usageBuilder .setRemotesCount (count );
2037- }
2038-
2039- @ Override
2040- public void setFeature (String feature ) {
2041- usageBuilder .setFeature (feature );
2046+ this .searchResponseMetrics = searchResponseMetrics ;
2047+ this .collectCCSTelemetry = true ;
2048+ this .usageService = usageService ;
2049+ this .usageBuilder = usageBuilder ;
20422050 }
20432051
2044- @ Override
2045- public void setClient (Task task ) {
2046- usageBuilder .setClientFromTask (task );
2052+ SearchTelemetryListener (ActionListener <SearchResponse > listener , SearchResponseMetrics searchResponseMetrics ) {
2053+ super (listener );
2054+ this .searchResponseMetrics = searchResponseMetrics ;
2055+ this .collectCCSTelemetry = false ;
2056+ this .usageService = null ;
2057+ this .usageBuilder = null ;
20472058 }
20482059
20492060 @ Override
@@ -2069,7 +2080,7 @@ public void onResponse(SearchResponse searchResponse) {
20692080 }
20702081 searchResponseMetrics .incrementResponseCount (responseCountTotalStatus );
20712082
2072- if (collectTelemetry () ) {
2083+ if (collectCCSTelemetry ) {
20732084 extractCCSTelemetry (searchResponse );
20742085 recordTelemetry ();
20752086 }
@@ -2084,7 +2095,7 @@ public void onResponse(SearchResponse searchResponse) {
20842095 @ Override
20852096 public void onFailure (Exception e ) {
20862097 searchResponseMetrics .incrementResponseCount (SearchResponseMetrics .ResponseCountTotalStatus .FAILURE );
2087- if (collectTelemetry () ) {
2098+ if (collectCCSTelemetry ) {
20882099 usageBuilder .setFailure (e );
20892100 recordTelemetry ();
20902101 }
@@ -2109,8 +2120,6 @@ private void extractCCSTelemetry(SearchResponse searchResponse) {
21092120 usageBuilder .perClusterUsage (clusterAlias , cluster .getTook ());
21102121 }
21112122 }
2112-
21132123 }
2114-
21152124 }
21162125}
0 commit comments