@@ -394,10 +394,14 @@ private static ShardSearchRequest tryRewriteWithUpdatedSortValue(
394394 return request ;
395395 }
396396
397- private static boolean isPartOfPIT (SearchRequest request , ShardSearchContextId contextId ) {
397+ private static boolean isPartOfPIT (
398+ SearchRequest request ,
399+ ShardSearchContextId contextId ,
400+ NamedWriteableRegistry namedWriteableRegistry
401+ ) {
398402 final PointInTimeBuilder pointInTimeBuilder = request .pointInTimeBuilder ();
399403 if (pointInTimeBuilder != null ) {
400- return request .pointInTimeBuilder ().getSearchContextId (null ).contains (contextId );
404+ return request .pointInTimeBuilder ().getSearchContextId (namedWriteableRegistry ).contains (contextId );
401405 } else {
402406 return false ;
403407 }
@@ -551,7 +555,8 @@ private void onNodeQueryFailure(Exception e, NodeQueryRequest request, CanMatchP
551555 static void registerNodeSearchAction (
552556 SearchTransportService searchTransportService ,
553557 SearchService searchService ,
554- SearchPhaseController searchPhaseController
558+ SearchPhaseController searchPhaseController ,
559+ NamedWriteableRegistry namedWriteableRegistry
555560 ) {
556561 var transportService = searchTransportService .transportService ();
557562 var threadPool = transportService .getThreadPool ();
@@ -581,7 +586,8 @@ static void registerNodeSearchAction(
581586 request ,
582587 cancellableTask ,
583588 channel ,
584- dependencies
589+ dependencies ,
590+ namedWriteableRegistry
585591 );
586592 // TODO: log activating or otherwise limiting parallelism might be helpful here
587593 for (int i = 0 ; i < workers ; i ++) {
@@ -592,12 +598,17 @@ static void registerNodeSearchAction(
592598 TransportActionProxy .registerProxyAction (transportService , NODE_SEARCH_ACTION_NAME , true , NodeQueryResponse ::new );
593599 }
594600
595- private static void releaseLocalContext (SearchService searchService , NodeQueryRequest request , SearchPhaseResult result ) {
601+ private static void releaseLocalContext (
602+ SearchService searchService ,
603+ NodeQueryRequest request ,
604+ SearchPhaseResult result ,
605+ NamedWriteableRegistry namedWriteableRegistry
606+ ) {
596607 var phaseResult = result .queryResult () != null ? result .queryResult () : result .rankFeatureResult ();
597608 if (phaseResult != null
598609 && phaseResult .hasSearchContext ()
599610 && request .searchRequest .scroll () == null
600- && isPartOfPIT (request .searchRequest , phaseResult .getContextId ()) == false ) {
611+ && isPartOfPIT (request .searchRequest , phaseResult .getContextId (), namedWriteableRegistry ) == false ) {
601612 searchService .freeReaderContext (phaseResult .getContextId ());
602613 }
603614 }
@@ -741,13 +752,15 @@ private static final class QueryPerNodeState {
741752 private final CountDown countDown ;
742753 private final TransportChannel channel ;
743754 private volatile BottomSortValuesCollector bottomSortCollector ;
755+ private final NamedWriteableRegistry namedWriteableRegistry ;
744756
745757 private QueryPerNodeState (
746758 QueryPhaseResultConsumer queryPhaseResultConsumer ,
747759 NodeQueryRequest searchRequest ,
748760 CancellableTask task ,
749761 TransportChannel channel ,
750- Dependencies dependencies
762+ Dependencies dependencies ,
763+ NamedWriteableRegistry namedWriteableRegistry
751764 ) {
752765 this .queryPhaseResultConsumer = queryPhaseResultConsumer ;
753766 this .searchRequest = searchRequest ;
@@ -757,6 +770,7 @@ private QueryPerNodeState(
757770 this .countDown = new CountDown (queryPhaseResultConsumer .getNumShards ());
758771 this .channel = channel ;
759772 this .dependencies = dependencies ;
773+ this .namedWriteableRegistry = namedWriteableRegistry ;
760774 }
761775
762776 void onShardDone () {
@@ -769,7 +783,7 @@ void onShardDone() {
769783 try (queryPhaseResultConsumer ) {
770784 var failure = queryPhaseResultConsumer .failure .get ();
771785 if (failure != null ) {
772- handleMergeFailure (failure , channelListener );
786+ handleMergeFailure (failure , channelListener , namedWriteableRegistry );
773787 return ;
774788 }
775789 final QueryPhaseResultConsumer .MergeResult mergeResult ;
@@ -779,7 +793,7 @@ void onShardDone() {
779793 EMPTY_PARTIAL_MERGE_RESULT
780794 );
781795 } catch (Exception e ) {
782- handleMergeFailure (e , channelListener );
796+ handleMergeFailure (e , channelListener , namedWriteableRegistry );
783797 return ;
784798 }
785799 // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments,
@@ -804,14 +818,14 @@ void onShardDone() {
804818 NodeQueryResponse .writePerShardException (out , failures .remove (i ));
805819 } else {
806820 // free context id and remove it from the result right away in case we don't need it anymore
807- maybeFreeContext (result , relevantShardIndices );
821+ maybeFreeContext (result , relevantShardIndices , namedWriteableRegistry );
808822 NodeQueryResponse .writePerShardResult (out , result );
809823 }
810824 }
811825 NodeQueryResponse .writeMergeResult (out , mergeResult , queryPhaseResultConsumer .topDocsStats );
812826 success = true ;
813827 } catch (IOException e ) {
814- handleMergeFailure (e , channelListener );
828+ handleMergeFailure (e , channelListener , namedWriteableRegistry );
815829 return ;
816830 }
817831 } finally {
@@ -822,23 +836,38 @@ void onShardDone() {
822836 ActionListener .respondAndRelease (channelListener , new BytesTransportResponse (out .moveToBytesReference ()));
823837 }
824838
825- private void maybeFreeContext (SearchPhaseResult result , BitSet relevantShardIndices ) {
839+ private void maybeFreeContext (
840+ SearchPhaseResult result ,
841+ BitSet relevantShardIndices ,
842+ NamedWriteableRegistry namedWriteableRegistry
843+ ) {
826844 if (result instanceof QuerySearchResult q
827845 && q .getContextId () != null
828846 && relevantShardIndices .get (q .getShardIndex ()) == false
829847 && q .hasSuggestHits () == false
830848 && q .getRankShardResult () == null
831849 && searchRequest .searchRequest .scroll () == null
832- && isPartOfPIT (searchRequest .searchRequest , q .getContextId ()) == false ) {
850+ && isPartOfPIT (searchRequest .searchRequest , q .getContextId (), namedWriteableRegistry ) == false ) {
833851 if (dependencies .searchService .freeReaderContext (q .getContextId ())) {
834852 q .clearContextId ();
835853 }
836854 }
837855 }
838856
839- private void handleMergeFailure (Exception e , ChannelActionListener <TransportResponse > channelListener ) {
857+ private void handleMergeFailure (
858+ Exception e ,
859+ ChannelActionListener <TransportResponse > channelListener ,
860+ NamedWriteableRegistry namedWriteableRegistry
861+ ) {
840862 queryPhaseResultConsumer .getSuccessfulResults ()
841- .forEach (searchPhaseResult -> releaseLocalContext (dependencies .searchService , searchRequest , searchPhaseResult ));
863+ .forEach (
864+ searchPhaseResult -> releaseLocalContext (
865+ dependencies .searchService ,
866+ searchRequest ,
867+ searchPhaseResult ,
868+ namedWriteableRegistry
869+ )
870+ );
842871 channelListener .onFailure (e );
843872 }
844873
0 commit comments