130130 * the same number of rows that it was sent no matter how many documents match.
131131 * </p>
132132 */
133- abstract class AbstractLookupService <R extends AbstractLookupService .Request , T extends AbstractLookupService .TransportRequest > {
133+ public abstract class AbstractLookupService <R extends AbstractLookupService .Request , T extends AbstractLookupService .TransportRequest > {
134134 private final String actionName ;
135135 private final ClusterService clusterService ;
136- private final SearchService searchService ;
136+ private final CreateShardContext createShardContext ;
137137 private final TransportService transportService ;
138138 private final Executor executor ;
139139 private final BigArrays bigArrays ;
@@ -152,7 +152,7 @@ abstract class AbstractLookupService<R extends AbstractLookupService.Request, T
152152 AbstractLookupService (
153153 String actionName ,
154154 ClusterService clusterService ,
155- SearchService searchService ,
155+ CreateShardContext createShardContext ,
156156 TransportService transportService ,
157157 BigArrays bigArrays ,
158158 BlockFactory blockFactory ,
@@ -161,7 +161,7 @@ abstract class AbstractLookupService<R extends AbstractLookupService.Request, T
161161 ) {
162162 this .actionName = actionName ;
163163 this .clusterService = clusterService ;
164- this .searchService = searchService ;
164+ this .createShardContext = createShardContext ;
165165 this .transportService = transportService ;
166166 this .executor = transportService .getThreadPool ().executor (ThreadPool .Names .SEARCH );
167167 this .bigArrays = bigArrays ;
@@ -324,9 +324,8 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
324324 final List <Releasable > releasables = new ArrayList <>(6 );
325325 boolean started = false ;
326326 try {
327- final ShardSearchRequest shardSearchRequest = new ShardSearchRequest (request .shardId , 0 , AliasFilter .EMPTY );
328- final SearchContext searchContext = searchService .createSearchContext (shardSearchRequest , SearchService .NO_TIMEOUT );
329- releasables .add (searchContext );
327+ LookupShardContext shardContext = createShardContext .create (request .shardId );
328+ releasables .add (shardContext .release );
330329 final LocalCircuitBreaker localBreaker = new LocalCircuitBreaker (
331330 blockFactory .breaker (),
332331 localBreakerSettings .overReservedBytes (),
@@ -364,8 +363,7 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
364363 }
365364 }
366365 releasables .add (finishPages );
367- SearchExecutionContext searchExecutionContext = searchContext .getSearchExecutionContext ();
368- QueryList queryList = queryList (request , searchExecutionContext , inputBlock , request .inputDataType );
366+ QueryList queryList = queryList (request , shardContext .executionContext , inputBlock , request .inputDataType );
369367 var warnings = Warnings .createWarnings (
370368 DriverContext .WarningsMode .COLLECT ,
371369 request .source .source ().getLineNumber (),
@@ -376,11 +374,11 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
376374 driverContext .blockFactory (),
377375 EnrichQuerySourceOperator .DEFAULT_MAX_PAGE_SIZE ,
378376 queryList ,
379- searchExecutionContext .getIndexReader (),
377+ shardContext . context . searcher () .getIndexReader (),
380378 warnings
381379 );
382380 releasables .add (queryOperator );
383- var extractFieldsOperator = extractFieldsOperator (searchContext , driverContext , request .extractFields );
381+ var extractFieldsOperator = extractFieldsOperator (shardContext . context , driverContext , request .extractFields );
384382 releasables .add (extractFieldsOperator );
385383
386384 /*
@@ -403,7 +401,7 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
403401 List .of (extractFieldsOperator , finishPages ),
404402 outputOperator ,
405403 Driver .DEFAULT_STATUS_INTERVAL ,
406- Releasables .wrap (searchContext , localBreaker )
404+ Releasables .wrap (shardContext . release , localBreaker )
407405 );
408406 task .addListener (() -> {
409407 String reason = Objects .requireNonNullElse (task .getReasonCancelled (), "task was cancelled" );
@@ -440,15 +438,10 @@ public void onFailure(Exception e) {
440438 }
441439
442440 private static Operator extractFieldsOperator (
443- SearchContext searchContext ,
441+ EsPhysicalOperationProviders . ShardContext shardContext ,
444442 DriverContext driverContext ,
445443 List <NamedExpression > extractFields
446444 ) {
447- EsPhysicalOperationProviders .ShardContext shardContext = new EsPhysicalOperationProviders .DefaultShardContext (
448- 0 ,
449- searchContext .getSearchExecutionContext (),
450- searchContext .request ().getAliasFilter ()
451- );
452445 List <ValuesSourceReaderOperator .FieldInfo > fields = new ArrayList <>(extractFields .size ());
453446 for (NamedExpression extractField : extractFields ) {
454447 BlockLoader loader = shardContext .blockLoader (
@@ -472,7 +465,7 @@ private static Operator extractFieldsOperator(
472465 return new ValuesSourceReaderOperator (
473466 driverContext .blockFactory (),
474467 fields ,
475- List .of (new ValuesSourceReaderOperator .ShardContext (searchContext .searcher ().getIndexReader (), searchContext ::newSourceLoader )),
468+ List .of (new ValuesSourceReaderOperator .ShardContext (shardContext .searcher ().getIndexReader (), shardContext ::newSourceLoader )),
476469 0
477470 );
478471 }
@@ -680,4 +673,42 @@ public boolean hasReferences() {
680673 return refs .hasReferences ();
681674 }
682675 }
676+
677+ /**
678+ * Create a {@link LookupShardContext} for a locally allocated {@link ShardId}.
679+ */
680+ public interface CreateShardContext {
681+ LookupShardContext create (ShardId shardId ) throws IOException ;
682+
683+ static CreateShardContext fromSearchService (SearchService searchService ) {
684+ return shardId -> {
685+ ShardSearchRequest shardSearchRequest = new ShardSearchRequest (shardId , 0 , AliasFilter .EMPTY );
686+ return LookupShardContext .fromSearchContext (
687+ searchService .createSearchContext (shardSearchRequest , SearchService .NO_TIMEOUT )
688+ );
689+ };
690+ }
691+ }
692+
693+ /**
694+ * {@link AbstractLookupService} uses this to power the queries and field loading that
695+ * it needs to perform to actually do the lookup.
696+ */
697+ public record LookupShardContext (
698+ EsPhysicalOperationProviders .ShardContext context ,
699+ SearchExecutionContext executionContext ,
700+ Releasable release
701+ ) {
702+ public static LookupShardContext fromSearchContext (SearchContext context ) {
703+ return new LookupShardContext (
704+ new EsPhysicalOperationProviders .DefaultShardContext (
705+ 0 ,
706+ context .getSearchExecutionContext (),
707+ context .request ().getAliasFilter ()
708+ ),
709+ context .getSearchExecutionContext (),
710+ context
711+ );
712+ }
713+ }
683714}
0 commit comments