@@ -391,10 +391,14 @@ private static ShardSearchRequest tryRewriteWithUpdatedSortValue(
391391 return request ;
392392 }
393393
394- private static boolean isPartOfPIT (SearchRequest request , ShardSearchContextId contextId ) {
394+ private static boolean isPartOfPIT (
395+ SearchRequest request ,
396+ ShardSearchContextId contextId ,
397+ NamedWriteableRegistry namedWriteableRegistry
398+ ) {
395399 final PointInTimeBuilder pointInTimeBuilder = request .pointInTimeBuilder ();
396400 if (pointInTimeBuilder != null ) {
397- return request .pointInTimeBuilder ().getSearchContextId (null ).contains (contextId );
401+ return request .pointInTimeBuilder ().getSearchContextId (namedWriteableRegistry ).contains (contextId );
398402 } else {
399403 return false ;
400404 }
@@ -546,7 +550,8 @@ private void onNodeQueryFailure(Exception e, NodeQueryRequest request, CanMatchP
546550 static void registerNodeSearchAction (
547551 SearchTransportService searchTransportService ,
548552 SearchService searchService ,
549- SearchPhaseController searchPhaseController
553+ SearchPhaseController searchPhaseController ,
554+ NamedWriteableRegistry namedWriteableRegistry
550555 ) {
551556 var transportService = searchTransportService .transportService ();
552557 var threadPool = transportService .getThreadPool ();
@@ -576,7 +581,8 @@ static void registerNodeSearchAction(
576581 request ,
577582 cancellableTask ,
578583 channel ,
579- dependencies
584+ dependencies ,
585+ namedWriteableRegistry
580586 );
581587 // TODO: log activating or otherwise limiting parallelism might be helpful here
582588 for (int i = 0 ; i < workers ; i ++) {
@@ -587,12 +593,17 @@ static void registerNodeSearchAction(
587593 TransportActionProxy .registerProxyAction (transportService , NODE_SEARCH_ACTION_NAME , true , NodeQueryResponse ::new );
588594 }
589595
590- private static void releaseLocalContext (SearchService searchService , NodeQueryRequest request , SearchPhaseResult result ) {
596+ private static void releaseLocalContext (
597+ SearchService searchService ,
598+ NodeQueryRequest request ,
599+ SearchPhaseResult result ,
600+ NamedWriteableRegistry namedWriteableRegistry
601+ ) {
591602 var phaseResult = result .queryResult () != null ? result .queryResult () : result .rankFeatureResult ();
592603 if (phaseResult != null
593604 && phaseResult .hasSearchContext ()
594605 && request .searchRequest .scroll () == null
595- && isPartOfPIT (request .searchRequest , phaseResult .getContextId ()) == false ) {
606+ && isPartOfPIT (request .searchRequest , phaseResult .getContextId (), namedWriteableRegistry ) == false ) {
596607 searchService .freeReaderContext (phaseResult .getContextId ());
597608 }
598609 }
@@ -736,13 +747,15 @@ private static final class QueryPerNodeState {
736747 private final CountDown countDown ;
737748 private final TransportChannel channel ;
738749 private volatile BottomSortValuesCollector bottomSortCollector ;
750+ private final NamedWriteableRegistry namedWriteableRegistry ;
739751
740752 private QueryPerNodeState (
741753 QueryPhaseResultConsumer queryPhaseResultConsumer ,
742754 NodeQueryRequest searchRequest ,
743755 CancellableTask task ,
744756 TransportChannel channel ,
745- Dependencies dependencies
757+ Dependencies dependencies ,
758+ NamedWriteableRegistry namedWriteableRegistry
746759 ) {
747760 this .queryPhaseResultConsumer = queryPhaseResultConsumer ;
748761 this .searchRequest = searchRequest ;
@@ -752,6 +765,7 @@ private QueryPerNodeState(
752765 this .countDown = new CountDown (queryPhaseResultConsumer .getNumShards ());
753766 this .channel = channel ;
754767 this .dependencies = dependencies ;
768+ this .namedWriteableRegistry = namedWriteableRegistry ;
755769 }
756770
757771 void onShardDone () {
@@ -762,7 +776,7 @@ void onShardDone() {
762776 try (queryPhaseResultConsumer ) {
763777 var failure = queryPhaseResultConsumer .failure .get ();
764778 if (failure != null ) {
765- handleMergeFailure (failure , channelListener );
779+ handleMergeFailure (failure , channelListener , namedWriteableRegistry );
766780 return ;
767781 }
768782 final QueryPhaseResultConsumer .MergeResult mergeResult ;
@@ -772,7 +786,7 @@ void onShardDone() {
772786 EMPTY_PARTIAL_MERGE_RESULT
773787 );
774788 } catch (Exception e ) {
775- handleMergeFailure (e , channelListener );
789+ handleMergeFailure (e , channelListener , namedWriteableRegistry );
776790 return ;
777791 }
778792 // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments,
@@ -799,7 +813,7 @@ void onShardDone() {
799813 && q .hasSuggestHits () == false
800814 && q .getRankShardResult () == null
801815 && searchRequest .searchRequest .scroll () == null
802- && isPartOfPIT (searchRequest .searchRequest , q .getContextId ()) == false ) {
816+ && isPartOfPIT (searchRequest .searchRequest , q .getContextId (), namedWriteableRegistry ) == false ) {
803817 if (dependencies .searchService .freeReaderContext (q .getContextId ())) {
804818 q .clearContextId ();
805819 }
@@ -816,9 +830,20 @@ && isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) {
816830 }
817831 }
818832
819- private void handleMergeFailure (Exception e , ChannelActionListener <TransportResponse > channelListener ) {
833+ private void handleMergeFailure (
834+ Exception e ,
835+ ChannelActionListener <TransportResponse > channelListener ,
836+ NamedWriteableRegistry namedWriteableRegistry
837+ ) {
820838 queryPhaseResultConsumer .getSuccessfulResults ()
821- .forEach (searchPhaseResult -> releaseLocalContext (dependencies .searchService , searchRequest , searchPhaseResult ));
839+ .forEach (
840+ searchPhaseResult -> releaseLocalContext (
841+ dependencies .searchService ,
842+ searchRequest ,
843+ searchPhaseResult ,
844+ namedWriteableRegistry
845+ )
846+ );
822847 channelListener .onFailure (e );
823848 }
824849
0 commit comments