30
30
import org .elasticsearch .compute .data .BlockFactory ;
31
31
import org .elasticsearch .compute .data .BlockStreamInput ;
32
32
import org .elasticsearch .compute .data .ElementType ;
33
- import org .elasticsearch .compute .data .IntBlock ;
34
33
import org .elasticsearch .compute .data .IntVector ;
35
34
import org .elasticsearch .compute .data .LocalCircuitBreaker ;
36
35
import org .elasticsearch .compute .data .OrdinalBytesRefBlock ;
42
41
import org .elasticsearch .compute .operator .OutputOperator ;
43
42
import org .elasticsearch .core .AbstractRefCounted ;
44
43
import org .elasticsearch .core .RefCounted ;
44
+ import org .elasticsearch .core .Releasable ;
45
45
import org .elasticsearch .core .Releasables ;
46
46
import org .elasticsearch .index .mapper .BlockLoader ;
47
47
import org .elasticsearch .index .mapper .MappedFieldType ;
@@ -249,29 +249,52 @@ private void doLookup(
249
249
ActionListener <Page > listener
250
250
) {
251
251
Block inputBlock = inputPage .getBlock (0 );
252
- final IntBlock selectedPositions ;
253
- if (inputBlock instanceof OrdinalBytesRefBlock ordinalBytesRefBlock ) {
254
- inputBlock = ordinalBytesRefBlock .getDictionaryVector ().asBlock ();
255
- selectedPositions = ordinalBytesRefBlock .getOrdinalsBlock ();
256
- selectedPositions .mustIncRef ();
257
- } else {
258
- selectedPositions = IntVector .range (0 , inputBlock .getPositionCount (), blockFactory ).asBlock ();
252
+ if (inputBlock .areAllValuesNull ()) {
253
+ listener .onResponse (createNullResponse (inputPage .getPositionCount (), extractFields ));
254
+ return ;
259
255
}
260
- LocalCircuitBreaker localBreaker = null ;
256
+ final List <Releasable > releasables = new ArrayList <>(6 );
257
+ boolean started = false ;
261
258
try {
262
- if (inputBlock .areAllValuesNull ()) {
263
- listener .onResponse (createNullResponse (inputPage .getPositionCount (), extractFields ));
264
- return ;
265
- }
266
- ShardSearchRequest shardSearchRequest = new ShardSearchRequest (shardId , 0 , AliasFilter .EMPTY );
267
- SearchContext searchContext = searchService .createSearchContext (shardSearchRequest , SearchService .NO_TIMEOUT );
268
- listener = ActionListener .runBefore (listener , searchContext ::close );
269
- localBreaker = new LocalCircuitBreaker (
259
+ final ShardSearchRequest shardSearchRequest = new ShardSearchRequest (shardId , 0 , AliasFilter .EMPTY );
260
+ final SearchContext searchContext = searchService .createSearchContext (shardSearchRequest , SearchService .NO_TIMEOUT );
261
+ releasables .add (searchContext );
262
+ final LocalCircuitBreaker localBreaker = new LocalCircuitBreaker (
270
263
blockFactory .breaker (),
271
264
localBreakerSettings .overReservedBytes (),
272
265
localBreakerSettings .maxOverReservedBytes ()
273
266
);
274
- DriverContext driverContext = new DriverContext (bigArrays , blockFactory .newChildFactory (localBreaker ));
267
+ releasables .add (localBreaker );
268
+ final DriverContext driverContext = new DriverContext (bigArrays , blockFactory .newChildFactory (localBreaker ));
269
+ final ElementType [] mergingTypes = new ElementType [extractFields .size ()];
270
+ for (int i = 0 ; i < extractFields .size (); i ++) {
271
+ mergingTypes [i ] = PlannerUtils .toElementType (extractFields .get (i ).dataType ());
272
+ }
273
+ final int [] mergingChannels = IntStream .range (0 , extractFields .size ()).map (i -> i + 2 ).toArray ();
274
+ final MergePositionsOperator mergePositionsOperator ;
275
+ if (inputBlock instanceof OrdinalBytesRefBlock ordinalsBytesRefBlock ) {
276
+ inputBlock = ordinalsBytesRefBlock .getDictionaryVector ().asBlock ();
277
+ var selectedPositions = ordinalsBytesRefBlock .getOrdinalsBlock ();
278
+ mergePositionsOperator = new MergePositionsOperator (
279
+ 1 ,
280
+ mergingChannels ,
281
+ mergingTypes ,
282
+ selectedPositions ,
283
+ driverContext .blockFactory ()
284
+ );
285
+
286
+ } else {
287
+ try (var selectedPositions = IntVector .range (0 , inputBlock .getPositionCount (), blockFactory ).asBlock ()) {
288
+ mergePositionsOperator = new MergePositionsOperator (
289
+ 1 ,
290
+ mergingChannels ,
291
+ mergingTypes ,
292
+ selectedPositions ,
293
+ driverContext .blockFactory ()
294
+ );
295
+ }
296
+ }
297
+ releasables .add (mergePositionsOperator );
275
298
SearchExecutionContext searchExecutionContext = searchContext .getSearchExecutionContext ();
276
299
MappedFieldType fieldType = searchExecutionContext .getFieldType (matchField );
277
300
var queryList = switch (matchType ) {
@@ -285,57 +308,13 @@ private void doLookup(
285
308
queryList ,
286
309
searchExecutionContext .getIndexReader ()
287
310
);
288
- List <Operator > intermediateOperators = new ArrayList <>(extractFields .size () + 2 );
289
- final ElementType [] mergingTypes = new ElementType [extractFields .size ()];
290
- // load the fields
291
- List <ValuesSourceReaderOperator .FieldInfo > fields = new ArrayList <>(extractFields .size ());
292
- for (int i = 0 ; i < extractFields .size (); i ++) {
293
- NamedExpression extractField = extractFields .get (i );
294
- final ElementType elementType = PlannerUtils .toElementType (extractField .dataType ());
295
- mergingTypes [i ] = elementType ;
296
- EsPhysicalOperationProviders .ShardContext ctx = new EsPhysicalOperationProviders .DefaultShardContext (
297
- 0 ,
298
- searchContext .getSearchExecutionContext (),
299
- searchContext .request ().getAliasFilter ()
300
- );
301
- BlockLoader loader = ctx .blockLoader (
302
- extractField instanceof Alias a ? ((NamedExpression ) a .child ()).name () : extractField .name (),
303
- EsqlDataTypes .isUnsupported (extractField .dataType ()),
304
- MappedFieldType .FieldExtractPreference .NONE
305
- );
306
- fields .add (
307
- new ValuesSourceReaderOperator .FieldInfo (
308
- extractField .name (),
309
- PlannerUtils .toElementType (extractField .dataType ()),
310
- shardIdx -> {
311
- if (shardIdx != 0 ) {
312
- throw new IllegalStateException ("only one shard" );
313
- }
314
- return loader ;
315
- }
316
- )
317
- );
318
- }
319
- intermediateOperators .add (
320
- new ValuesSourceReaderOperator (
321
- driverContext .blockFactory (),
322
- fields ,
323
- List .of (
324
- new ValuesSourceReaderOperator .ShardContext (
325
- searchContext .searcher ().getIndexReader (),
326
- searchContext ::newSourceLoader
327
- )
328
- ),
329
- 0
330
- )
331
- );
332
- // merging field-values by position
333
- final int [] mergingChannels = IntStream .range (0 , extractFields .size ()).map (i -> i + 2 ).toArray ();
334
- intermediateOperators .add (
335
- new MergePositionsOperator (1 , mergingChannels , mergingTypes , selectedPositions , driverContext .blockFactory ())
336
- );
311
+ releasables .add (queryOperator );
312
+ var extractFieldsOperator = extractFieldsOperator (searchContext , driverContext , extractFields );
313
+ releasables .add (extractFieldsOperator );
314
+
337
315
AtomicReference <Page > result = new AtomicReference <>();
338
316
OutputOperator outputOperator = new OutputOperator (List .of (), Function .identity (), result ::set );
317
+ releasables .add (outputOperator );
339
318
Driver driver = new Driver (
340
319
"enrich-lookup:" + sessionId ,
341
320
System .currentTimeMillis (),
@@ -351,30 +330,69 @@ private void doLookup(
351
330
inputPage .getPositionCount ()
352
331
),
353
332
queryOperator ,
354
- intermediateOperators ,
333
+ List . of ( extractFieldsOperator , mergePositionsOperator ) ,
355
334
outputOperator ,
356
335
Driver .DEFAULT_STATUS_INTERVAL ,
357
- localBreaker
336
+ Releasables . wrap ( searchContext , localBreaker )
358
337
);
359
338
task .addListener (() -> {
360
339
String reason = Objects .requireNonNullElse (task .getReasonCancelled (), "task was cancelled" );
361
340
driver .cancel (reason );
362
341
});
363
-
364
342
var threadContext = transportService .getThreadPool ().getThreadContext ();
365
- localBreaker = null ;
366
343
Driver .start (threadContext , executor , driver , Driver .DEFAULT_MAX_ITERATIONS , listener .map (ignored -> {
367
344
Page out = result .get ();
368
345
if (out == null ) {
369
346
out = createNullResponse (inputPage .getPositionCount (), extractFields );
370
347
}
371
348
return out ;
372
349
}));
350
+ started = true ;
373
351
} catch (Exception e ) {
374
352
listener .onFailure (e );
375
353
} finally {
376
- Releasables .close (selectedPositions , localBreaker );
354
+ if (started == false ) {
355
+ Releasables .close (releasables );
356
+ }
357
+ }
358
+ }
359
+
360
+ private static Operator extractFieldsOperator (
361
+ SearchContext searchContext ,
362
+ DriverContext driverContext ,
363
+ List <NamedExpression > extractFields
364
+ ) {
365
+ EsPhysicalOperationProviders .ShardContext shardContext = new EsPhysicalOperationProviders .DefaultShardContext (
366
+ 0 ,
367
+ searchContext .getSearchExecutionContext (),
368
+ searchContext .request ().getAliasFilter ()
369
+ );
370
+ List <ValuesSourceReaderOperator .FieldInfo > fields = new ArrayList <>(extractFields .size ());
371
+ for (NamedExpression extractField : extractFields ) {
372
+ BlockLoader loader = shardContext .blockLoader (
373
+ extractField instanceof Alias a ? ((NamedExpression ) a .child ()).name () : extractField .name (),
374
+ EsqlDataTypes .isUnsupported (extractField .dataType ()),
375
+ MappedFieldType .FieldExtractPreference .NONE
376
+ );
377
+ fields .add (
378
+ new ValuesSourceReaderOperator .FieldInfo (
379
+ extractField .name (),
380
+ PlannerUtils .toElementType (extractField .dataType ()),
381
+ shardIdx -> {
382
+ if (shardIdx != 0 ) {
383
+ throw new IllegalStateException ("only one shard" );
384
+ }
385
+ return loader ;
386
+ }
387
+ )
388
+ );
377
389
}
390
+ return new ValuesSourceReaderOperator (
391
+ driverContext .blockFactory (),
392
+ fields ,
393
+ List .of (new ValuesSourceReaderOperator .ShardContext (searchContext .searcher ().getIndexReader (), searchContext ::newSourceLoader )),
394
+ 0
395
+ );
378
396
}
379
397
380
398
private Page createNullResponse (int positionCount , List <NamedExpression > extractFields ) {
0 commit comments