Skip to content

Commit bb0450c

Browse files
committed
Have Row - LocalRelation transformation use a copying blocks approach
1 parent 2d1a1ec commit bb0450c

File tree

6 files changed

+106
-23
lines changed

6 files changed

+106
-23
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec

Lines changed: 68 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -509,61 +509,112 @@ ALG |Algiers |385 |major |1
509509
ALL |Albenga |499 |mid |1 |Albenga
510510
;
511511

512-
shadowing-Ignore
512+
shadowing
513513
required_capability: join_planning_v1
514+
required_capability: inlinestats_v8
514515

515516
ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right"
516-
| INLINESTATS env=VALUES(right) BY client_ip
517+
| INLINESTATS env = VALUES(right) BY client_ip
517518
;
518519

519-
left:keyword | client_ip:keyword | right:keyword | env:keyword
520-
left | 172.21.0.5 | right | right
520+
left:keyword | right:keyword | env:keyword | client_ip:keyword
521+
left | right | right | 172.21.0.5
521522
;
522523

523-
shadowingMulti-Ignore
524+
shadowingMulti
524525
required_capability: join_planning_v1
526+
required_capability: inlinestats_v8
525527

526528
ROW left = "left", airport = "Zurich Airport ZRH", city = "Zürich", middle = "middle", region = "North-East Switzerland", right = "right"
527529
| INLINESTATS airport=VALUES(left), region=VALUES(left), city_boundary=VALUES(left) BY city
528530
;
529531

530-
left:keyword | city:keyword | middle:keyword | right:keyword | airport:keyword | region:keyword | city_boundary:keyword
531-
left | Zürich | middle | right | left | left | left
532+
left:keyword | middle:keyword | right:keyword | airport:keyword | region:keyword | city_boundary:keyword | city:keyword
533+
left | middle | right | left | left | left | Zürich
532534
;
533535

534-
shadowingSelf-Ignore
536+
shadowingSelf
535537
required_capability: join_planning_v1
538+
required_capability: inlinestats_v8
536539

537-
ROW city="Raleigh"
538-
| INLINESTATS city=COUNT(city)
540+
ROW city = "Raleigh"
541+
| INLINESTATS city = COUNT(city)
539542
;
540543

541544
city:long
542545
1
543546
;
544547

545-
shadowingSelfBySelf-Ignore
548+
shadowingSelfBySelf
546549
required_capability: join_planning_v1
550+
required_capability: inlinestats_v8
547551

548-
ROW city="Raleigh"
549-
| INLINESTATS city=COUNT(city) BY city
552+
ROW city = "Raleigh"
553+
| INLINESTATS city = COUNT(city) BY city
550554
;
555+
warning:Line 2:15: Field 'city' shadowed by field at line 2:37
551556

552-
city:long
553-
1
557+
city:keyword
558+
Raleigh
554559
;
555560

556-
shadowingInternal-Ignore
561+
shadowingInternal
557562
required_capability: join_planning_v1
563+
required_capability: inlinestats_v8
558564

559565
ROW city = "Zürich"
560-
| INLINESTATS x=VALUES(city), x=VALUES(city)
566+
| INLINESTATS x = VALUES(city), x = VALUES(city)
561567
;
568+
warning:Line 2:15: Field 'x' shadowed by field at line 2:33
562569

563570
city:keyword | x:keyword
564571
Zürich | Zürich
565572
;
566573

574+
multiInlinestatsWithRow
575+
required_capability: inlinestats_v8
576+
577+
row x = 1
578+
| inlinestats x = max(x) + min(x)
579+
| eval y = x + 1
580+
| inlinestats sum(y)
581+
| inlinestats count(y), count(x)
582+
;
583+
584+
x:integer | y:integer | sum(y):long | count(y):long | count(x):long
585+
2 |3 |3 |1 |1
586+
;
587+
588+
ignoreUnusedEvaledValue-Ignore
589+
required_capability: inlinestats_v8
590+
// fails with expected [keys] to be non-empty
591+
592+
ROW x = 1
593+
| INLINESTATS max(x)
594+
| EVAL y = x + 1
595+
| KEEP x
596+
;
597+
598+
x:integer
599+
1
600+
;
601+
602+
ignoreUnusedEvaledValue2-Ignore
603+
required_capability: inlinestats_v8
604+
// fails with expected [keys] to be non-empty
605+
606+
from employees
607+
| inlinestats max(salary)
608+
| eval y = salary + 1
609+
| keep salary
610+
| sort salary desc
611+
| limit 1
612+
;
613+
614+
salary:integer
615+
74999
616+
;
617+
567618
byConstant
568619
required_capability: inlinestats_v8
569620

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceRowAsLocalRelation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
1212
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1313
import org.elasticsearch.xpack.esql.plan.logical.Row;
14+
import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
1415
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
15-
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
1616
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
1717

1818
import java.util.ArrayList;
@@ -29,6 +29,6 @@ protected LogicalPlan rule(Row row, LogicalOptimizerContext context) {
2929
List<Object> values = new ArrayList<>(fields.size());
3030
fields.forEach(f -> values.add(f.child().fold(context.foldCtx())));
3131
var blocks = BlockUtils.fromListRow(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, values);
32-
return new LocalRelation(row.source(), row.output(), LocalSupplier.of(blocks));
32+
return new LocalRelation(row.source(), row.output(), new CopyingLocalSupplier(blocks));
3333
}
3434
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.plan.logical.local;
9+
10+
import org.elasticsearch.compute.data.Block;
11+
import org.elasticsearch.compute.data.BlockUtils;
12+
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
13+
14+
/**
15+
* A {@link LocalSupplier} that allways creates a new copy of the {@link Block}s initially provided at creation time.
16+
*/
17+
public class CopyingLocalSupplier extends ImmediateLocalSupplier {
18+
19+
public CopyingLocalSupplier(Block[] blocks) {
20+
super(blocks);
21+
}
22+
23+
@Override
24+
public Block[] get() {
25+
Block[] blockCopies = new Block[blocks.length];
26+
for (int i = 0; i < blockCopies.length; i++) {
27+
blockCopies[i] = BlockUtils.deepCopyOf(blocks[i], PlannerUtils.NON_BREAKING_BLOCK_FACTORY);
28+
}
29+
return blockCopies;
30+
}
31+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/ImmediateLocalSupplier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
* A {@link LocalSupplier} that contains already filled {@link Block}s.
1919
*/
2020
public class ImmediateLocalSupplier implements LocalSupplier {
21-
private final Block[] blocks;
21+
final Block[] blocks;
2222

2323
public ImmediateLocalSupplier(Block[] blocks) {
2424
this.blocks = blocks;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,8 +235,7 @@ private void executeSubPlans(
235235

236236
private LogicalPlanTuple firstSubPlan(LogicalPlan optimizedPlan) {
237237
Holder<LogicalPlanTuple> subPlan = new Holder<>();
238-
// Collect the first inlinejoin (bottom up in the tree or, viewing from the user-friendly query pov, the closest to ES source
239-
// inlinestats command)
238+
// Collect the first inlinejoin (bottom up in the tree)
240239
optimizedPlan.forEachUp(InlineJoin.class, ij -> {
241240
// extract the right side of the plan and replace its source
242241
if (subPlan.get() == null && ij.right().anyMatch(p -> p instanceof StubRelation)) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ public static Block[] fromPages(List<Attribute> schema, List<Page> pages) {
2626
// Limit ourselves to 1mb of results similar to LOOKUP for now.
2727
long bytesUsed = pages.stream().mapToLong(Page::ramBytesUsedByBlocks).sum();
2828
if (bytesUsed > ByteSizeValue.ofMb(1).getBytes()) {
29-
throw new IllegalArgumentException("first phase result too large [" + ByteSizeValue.ofBytes(bytesUsed) + "] > 1mb");
29+
throw new IllegalArgumentException(
30+
"INLINESTATS sub-plan execution results too large [" + ByteSizeValue.ofBytes(bytesUsed) + "] > 1mb"
31+
);
3032
}
3133
int positionCount = pages.stream().mapToInt(Page::getPositionCount).sum();
3234
Block.Builder[] builders = new Block.Builder[schema.size()];

0 commit comments

Comments
 (0)