129129 * the same number of rows that it was sent no matter how many documents match.
130130 * </p>
131131 */
132- abstract class AbstractLookupService <R extends AbstractLookupService .Request , T extends AbstractLookupService .TransportRequest > {
132+ public abstract class AbstractLookupService <R extends AbstractLookupService .Request , T extends AbstractLookupService .TransportRequest > {
133133 private final String actionName ;
134134 private final ClusterService clusterService ;
135- private final SearchService searchService ;
135+ private final CreateShardContext createShardContext ;
136136 private final TransportService transportService ;
137137 private final Executor executor ;
138138 private final BigArrays bigArrays ;
@@ -151,7 +151,7 @@ abstract class AbstractLookupService<R extends AbstractLookupService.Request, T
151151 AbstractLookupService (
152152 String actionName ,
153153 ClusterService clusterService ,
154- SearchService searchService ,
154+ CreateShardContext createShardContext ,
155155 TransportService transportService ,
156156 BigArrays bigArrays ,
157157 BlockFactory blockFactory ,
@@ -160,7 +160,7 @@ abstract class AbstractLookupService<R extends AbstractLookupService.Request, T
160160 ) {
161161 this .actionName = actionName ;
162162 this .clusterService = clusterService ;
163- this .searchService = searchService ;
163+ this .createShardContext = createShardContext ;
164164 this .transportService = transportService ;
165165 this .executor = transportService .getThreadPool ().executor (ThreadPool .Names .SEARCH );
166166 this .bigArrays = bigArrays ;
@@ -326,9 +326,8 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
326326 final List <Releasable > releasables = new ArrayList <>(6 );
327327 boolean started = false ;
328328 try {
329- final ShardSearchRequest shardSearchRequest = new ShardSearchRequest (request .shardId , 0 , AliasFilter .EMPTY );
330- final SearchContext searchContext = searchService .createSearchContext (shardSearchRequest , SearchService .NO_TIMEOUT );
331- releasables .add (searchContext );
329+ LookupShardContext shardContext = createShardContext .create (request .shardId );
330+ releasables .add (shardContext .release );
332331 final LocalCircuitBreaker localBreaker = new LocalCircuitBreaker (
333332 blockFactory .breaker (),
334333 localBreakerSettings .overReservedBytes (),
@@ -366,8 +365,7 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
366365 }
367366 }
368367 releasables .add (finishPages );
369- SearchExecutionContext searchExecutionContext = searchContext .getSearchExecutionContext ();
370- QueryList queryList = queryList (request , searchExecutionContext , inputBlock , request .inputDataType );
368+ QueryList queryList = queryList (request , shardContext .executionContext , inputBlock , request .inputDataType );
371369 var warnings = Warnings .createWarnings (
372370 DriverContext .WarningsMode .COLLECT ,
373371 request .source .source ().getLineNumber (),
@@ -378,11 +376,11 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
378376 driverContext .blockFactory (),
379377 EnrichQuerySourceOperator .DEFAULT_MAX_PAGE_SIZE ,
380378 queryList ,
381- searchExecutionContext .getIndexReader (),
379+ shardContext . context . searcher () .getIndexReader (),
382380 warnings
383381 );
384382 releasables .add (queryOperator );
385- var extractFieldsOperator = extractFieldsOperator (searchContext , driverContext , request .extractFields );
383+ var extractFieldsOperator = extractFieldsOperator (shardContext . context , driverContext , request .extractFields );
386384 releasables .add (extractFieldsOperator );
387385
388386 /*
@@ -405,7 +403,7 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
405403 List .of (extractFieldsOperator , finishPages ),
406404 outputOperator ,
407405 Driver .DEFAULT_STATUS_INTERVAL ,
408- Releasables .wrap (searchContext , localBreaker )
406+ Releasables .wrap (shardContext . release , localBreaker )
409407 );
410408 task .addListener (() -> {
411409 String reason = Objects .requireNonNullElse (task .getReasonCancelled (), "task was cancelled" );
@@ -430,15 +428,10 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
430428 }
431429
432430 private static Operator extractFieldsOperator (
433- SearchContext searchContext ,
431+ EsPhysicalOperationProviders . ShardContext shardContext ,
434432 DriverContext driverContext ,
435433 List <NamedExpression > extractFields
436434 ) {
437- EsPhysicalOperationProviders .ShardContext shardContext = new EsPhysicalOperationProviders .DefaultShardContext (
438- 0 ,
439- searchContext .getSearchExecutionContext (),
440- searchContext .request ().getAliasFilter ()
441- );
442435 List <ValuesSourceReaderOperator .FieldInfo > fields = new ArrayList <>(extractFields .size ());
443436 for (NamedExpression extractField : extractFields ) {
444437 BlockLoader loader = shardContext .blockLoader (
@@ -462,7 +455,7 @@ private static Operator extractFieldsOperator(
462455 return new ValuesSourceReaderOperator (
463456 driverContext .blockFactory (),
464457 fields ,
465- List .of (new ValuesSourceReaderOperator .ShardContext (searchContext .searcher ().getIndexReader (), searchContext ::newSourceLoader )),
458+ List .of (new ValuesSourceReaderOperator .ShardContext (shardContext .searcher ().getIndexReader (), shardContext ::newSourceLoader )),
466459 0
467460 );
468461 }
@@ -670,4 +663,42 @@ public boolean hasReferences() {
670663 return refs .hasReferences ();
671664 }
672665 }
666+
667+ /**
668+ * Create a {@link LookupShardContext} for a locally allocated {@link ShardId}.
669+ */
670+ public interface CreateShardContext {
671+ LookupShardContext create (ShardId shardId ) throws IOException ;
672+
673+ static CreateShardContext fromSearchService (SearchService searchService ) {
674+ return shardId -> {
675+ ShardSearchRequest shardSearchRequest = new ShardSearchRequest (shardId , 0 , AliasFilter .EMPTY );
676+ return LookupShardContext .fromSearchContext (
677+ searchService .createSearchContext (shardSearchRequest , SearchService .NO_TIMEOUT )
678+ );
679+ };
680+ }
681+ }
682+
683+ /**
684+ * {@link AbstractLookupService} uses this to power the queries and field loading that
685+ * it needs to perform to actually do the lookup.
686+ */
687+ public record LookupShardContext (
688+ EsPhysicalOperationProviders .ShardContext context ,
689+ SearchExecutionContext executionContext ,
690+ Releasable release
691+ ) {
692+ public static LookupShardContext fromSearchContext (SearchContext context ) {
693+ return new LookupShardContext (
694+ new EsPhysicalOperationProviders .DefaultShardContext (
695+ 0 ,
696+ context .getSearchExecutionContext (),
697+ context .request ().getAliasFilter ()
698+ ),
699+ context .getSearchExecutionContext (),
700+ context
701+ );
702+ }
703+ }
673704}
0 commit comments