Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/136467.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 136467
summary: "ESQL: Fix double release in inline stats when `LocalRelation` is reused"
area: ES|QL
type: bug
issues:
- 135679
Original file line number Diff line number Diff line change
Expand Up @@ -3990,3 +3990,184 @@ Canada |English |null |null
Mv-Land |Mv-Lang |null |null
Mv-Land2 |Mv-Lang2 |null |null
;

// Optimizer rule under test: PruneColumns (b, the only STATS output, is dropped after the INLINE STATS)
inlineStatsAfterPruningAggregate
required_capability: inline_stats
required_capability: inline_stats_after_pruning_aggregate

row a = 1
| stats b = max(a)
| inline stats c = count(*)
| drop b
;

c:long
1
;

// Optimizer rule under test: PropagateEmptyRelation (WHERE false leaves the STATS with no input rows)
inlineStatsAfterPruningAggregate2
required_capability: inline_stats
required_capability: inline_stats_after_pruning_aggregate

row a = 12
| where false
| stats b = max(a)
| inline stats c = count(b)
;

b:integer | c:long
null | 0
;


// Optimizer rule under test: ReplaceStatsFilteredAggWithEval (the only aggregate in the STATS is filtered by WHERE false)
inlineStatsAfterPruningAggregate3
required_capability: inline_stats
required_capability: inline_stats_after_pruning_aggregate

row a= 12
| stats b = sum(a) where false
| inline stats c = max(b)
;

b:long | c:long
null | null
;


inlineStatsAfterPruningAggregate4
required_capability: inline_stats
required_capability: inline_stats_after_pruning_aggregate

from k8s
| stats PRezTcny = median(network.cost), `network.cost` = count(network.cost)
| drop network.cost
| inline stats QaoRGYyf = count(*)
| drop `PRezTcny`
;

QaoRGYyf:long
1
;


inlineStatsAfterPruningAggregate5
required_capability: inline_stats
required_capability: inline_stats_after_pruning_aggregate
required_capability: join_lookup_v12

from colors,airports_web,boo*
| rename scalerank as language_code
| lookup join languages_lookup on language_code
| keep `ratings`, `rgb_vector`
| mv_expand ratings
| keep `ratings`, `rgb_vector`
| inline stats rycaPYFzpAId = absent(ratings), WkdxfjwuR = count(ratings) + avg(ratings)
| rename ratings AS `WkdxfjwuR`
| inline stats WkdxfjwuR = sum(WkdxfjwuR) + median(WkdxfjwuR), sjivSBvV = count(*)
| sort sjivSBvV NULLS FIRST
| where false OR NOT true OR false OR NOT false
| where false OR false AND false AND NOT true AND false
| stats rycaPYFzpAId = median(sjivSBvV) + count(sjivSBvV) + sum(sjivSBvV), NUiLBJXQoMXd = present(sjivSBvV)
| mv_expand `NUiLBJXQoMXd`
| inline stats NUiLBJXQoMXd = count(rycaPYFzpAId)
;

rycaPYFzpAId:double | NUiLBJXQoMXd:long
null | 0
;


inlineStatsAfterPruningAggregate6
required_capability: inline_stats
required_capability: inline_stats_after_pruning_aggregate
required_capability: join_lookup_v12

from d*,*
| rename scalerank as other2
| rename author.keyword as name_str
| rename intersects as is_active_bool
| rename year as id_int
| rename status as other1
| lookup join multi_column_joinable_lookup on other2, name_str, is_active_bool, id_int, other1
| rename street AS `huZQQIKVli`, `network.eth0.currently_connected_clients` AS is_rehired
| eval GhMzAVXivNI = message.raw, VDJKNsopeKQq = substring(threat_level, 1, 3), xeUVJNra = length(huZQQIKVli), kOQjzWgZNhwI = false
| where false AND NOT false AND true AND NOT false
| CHANGE_POINT salary_change.int ON birth_date AS UpYfyPcYFkes, EwnNgKIrDEKS
| sort owner.name DESC NULLS LAST, message.raw, country.keyword DESC NULLS LAST, UpYfyPcYFkes ASC NULLS FIRST, name_str ASC NULLS FIRST, num, event_duration DESC NULLS LAST, extra2 DESC, languages.byte ASC NULLS FIRST, client.ip NULLS LAST, ip1, collection NULLS FIRST, city.country.name DESC, destination.IP DESC NULLS LAST, host.version DESC, hex_code ASC NULLS LAST, language.code ASC NULLS LAST, language.name ASC, details DESC, language_name DESC NULLS FIRST, event ASC, salary_change.keyword DESC NULLS FIRST, lk ASC
| stats CLnBmEGor = sum(height.double) + sum(height.double) + avg(height.double), JLSxMYld = count(extra2) + max(extra2) + avg(extra2), `network.total_bytes_out` = min(language_code_integer), `version` = values(env), contains = min(languages.short) + min(languages.short)
| mv_expand `CLnBmEGor`
| dissect version "%{JLSxMYld} %{LTMJrburyt}"
| enrich languages_policy on LTMJrburyt
| inline stats QyCBayGHtAF = count(CLnBmEGor), yUMkeLezSN = present(contains + network.total_bytes_out), `JLSxMYld` = max(contains), GYbfUvbf = count(*)
;

CLnBmEGor:double | network.total_bytes_out:integer | version:keyword | contains:integer | LTMJrburyt:keyword | language_name:keyword | QyCBayGHtAF:long | yUMkeLezSN:boolean | JLSxMYld:integer | GYbfUvbf:long
null | null | null | null | null | null | 0 | false | null | 1
;


inlineStatsAfterPruningAggregate7
required_capability: inline_stats
required_capability: inline_stats_after_pruning_aggregate
required_capability: join_lookup_v12

from app_log*
| FORK (where false OR NOT true) (where true) (where true) (where false OR true) (where false) (where true
| mv_expand @timestamp)
| WHERE _fork == "fork2"
| DROP _fork
| rename service_id as name_str
| lookup join multi_column_joinable_lookup on name_str
| inline stats `is_active_bool` = count(id_int) + max(id_int), siofAMlpoHvh = max(id_int) + max(id_int) + max(id_int), `message` = present(other1), `id_int` = max(id_int)
| stats `message` = count(is_active_bool), id_int2 = median(is_active_bool), `siofAMlpoHvh` = count(*), `id_int` = max(id_int)
| eval NUtENNcs = message, message = "zaYFcCTy", id_int2 = null, VizcTbjMnBlQ = siofAMlpoHvh, siofAMlpoHvh = siofAMlpoHvh + siofAMlpoHvh + siofAMlpoHvh, gPzLgKdLfZ = null, `message` = 1099438720, sdsrveQOoi = -2373351299779434224, hpxNweCm = false, PwyOmHforRKI = -2230544247357512169
| stats VizcTbjMnBlQ = count(*), siofAMlpoHvh2 = median(sdsrveQOoi) + max(sdsrveQOoi), DUuKdsGgU = avg(PwyOmHforRKI) + count(PwyOmHforRKI), `gPzLgKdLfZ` = present(sdsrveQOoi + VizcTbjMnBlQ + PwyOmHforRKI), `siofAMlpoHvh` = count(sdsrveQOoi) + max(sdsrveQOoi)
| limit 61
| inline stats gPzLgKdLfZ = sum(VizcTbjMnBlQ), `siofAMlpoHvh2` = count(*), zvgqSiVGqli = max(VizcTbjMnBlQ), WFjgBzYRXwa = median(VizcTbjMnBlQ) + count(VizcTbjMnBlQ) + avg(VizcTbjMnBlQ)
| keep `VizcTbjMnBlQ`
| inline stats wXPJguff = median(VizcTbjMnBlQ) + avg(VizcTbjMnBlQ) + max(VizcTbjMnBlQ)
;

VizcTbjMnBlQ:long | wXPJguff:double
1 | 3.0
;


inlineStatsAfterPruningAggregate8
required_capability: inline_stats
required_capability: inline_stats_after_pruning_aggregate

from dense_vector,airport_city_boundaries
| eval kMvNZVQSH = -4048759096317256968, GzAYNHaHuIS = 4262770411405329967, EwPpQFmOPoB = "lkdgnbTrBT", `abbrev` = "LiXI"
| keep `city`, `kMvNZVQSH`, GzAYNHaHuIS, city_boundary, EwPpQFmOPoB, `city_boundary`, `id`, kMvNZVQSH, id, airport
| limit 6888
| keep `kMvNZVQSH`, EwPpQFmOPoB, city, `id`, kMvNZVQSH
| stats `EwPpQFmOPoB2` = min(id), `EwPpQFmOPoB3` = count(*), kZiJPhNZ = count(id), ErGhdKhdv = count_distinct(city), EwPpQFmOPoB = min(id)
| eval ErGhdKhdv = null, fHSyxniFCS = -710891486, ErGhdKhdv = -1104488048, zdxdQdVIyFh = -1579062572, `kZiJPhNZ` = null, SzmOySyUh = null, eWVVRiwP = 1092261898855405071, nUxVhGhcpkiV = true, RvqGioBAAzYs = 169356242
| eval `kZiJPhNZ` = "gWJHDTcDMjRVSnlzSX", lxLgaagX = 4700531997851493340, `kZiJPhNZ` = false
| keep `zdxdQdVIyFh`
| inline stats `zdxdQdVIyFh` = count(*), WbWVJnYJ = count(*), zdxdQdVIyFh2 = max(zdxdQdVIyFh), uKjfeKTfaH = max(zdxdQdVIyFh), QpovVxjMWSjh = min(zdxdQdVIyFh)
;

zdxdQdVIyFh:long | WbWVJnYJ:long | zdxdQdVIyFh2:integer | uKjfeKTfaH:integer | QpovVxjMWSjh:integer
1 | 1 | -1579062572 | -1579062572 | -1579062572
;


inlineStatsAfterPruningAggregate9
required_capability: inline_stats
required_capability: inline_stats_after_pruning_aggregate

from employees
| stats m = count(emp_no)
| eval c = 0
| keep c
| inline stats c = count(*)
;

c:long
1
;
Original file line number Diff line number Diff line change
Expand Up @@ -1490,6 +1490,11 @@ public enum Cap {
*/
TS_PERMIT_TEXT_BECOMING_KEYWORD_WHEN_GROUPED_ON,

/**
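* Fix for INLINE STATS running after a STATS that the optimizer pruned or replaced with a local relation
* https://github.com/elastic/elasticsearch/issues/135679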
*/
INLINE_STATS_AFTER_PRUNING_AGGREGATE,

/**
* Fix management of plans with no columns
* https://github.com/elastic/elasticsearch/issues/120272
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
Expand All @@ -43,7 +44,7 @@ protected LogicalPlan rule(UnaryPlan plan, LogicalOptimizerContext ctx) {
List<Block> emptyBlocks = aggsFromEmpty(ctx.foldCtx(), agg.aggregates());
p = replacePlanByRelation(
plan,
LocalSupplier.of(emptyBlocks.isEmpty() ? new Page(0) : new Page(emptyBlocks.toArray(Block[]::new)))
new CopyingLocalSupplier(emptyBlocks.isEmpty() ? new Page(0) : new Page(emptyBlocks.toArray(Block[]::new)))
);
} else {
p = PruneEmptyPlans.skipPlan(plan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.elasticsearch.xpack.esql.plan.logical.Sample;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.rule.Rule;

Expand Down Expand Up @@ -111,7 +111,7 @@ private static LogicalPlan pruneColumnsInAggregate(Aggregate aggregate, Attribut
p = new LocalRelation(
aggregate.source(),
List.of(Expressions.attribute(aggregate.aggregates().getFirst())),
LocalSupplier.of(new Page(BlockUtils.constantBlock(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, null, 1)))
new CopyingLocalSupplier(new Page(BlockUtils.constantBlock(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, null, 1)))
);
} else {
// Aggs cannot produce pages with 0 columns, so retain one grouping.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;

import java.util.ArrayList;
Expand Down Expand Up @@ -137,6 +137,6 @@ private static LocalRelation localRelation(Source source, List<Alias> newEvals)
attributes.add(alias.toAttribute());
blocks[i] = BlockUtils.constantBlock(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, ((Literal) alias.child()).value(), 1);
}
return new LocalRelation(source, attributes, LocalSupplier.of(new Page(blocks)));
return new LocalRelation(source, attributes, new CopyingLocalSupplier(new Page(blocks)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.compute.aggregation.QuantileStates;
import org.elasticsearch.compute.data.ConstantNullBlock;
import org.elasticsearch.compute.data.IntArrayBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.test.TestBlockFactory;
import org.elasticsearch.core.Nullable;
Expand Down Expand Up @@ -134,6 +136,7 @@
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
Expand Down Expand Up @@ -2628,6 +2631,91 @@ public void testEvalAfterStats() {
assertThat(Expressions.names(eval.output()), contains("x"));
}

/**
 * Test for <code>PruneColumns</code>: {@code b}, the only output of the {@code STATS}, is dropped after the
 * {@code INLINE STATS}. The optimized plan is expected to keep a {@code LocalRelation} backed by a
 * {@code CopyingLocalSupplier} as the left side of the {@code InlineJoin}, holding a single constant-null block.
 * <p>
 * Regression test for the double release in inline stats when a {@code LocalRelation} is reused (issue 135679).
 * NOTE(review): presumably {@code CopyingLocalSupplier#get()} hands out a fresh copy of the page on every call;
 * confirm against that class.
 * <p>
 * Expected plan:
 * <pre>{@code
 * EsqlProject[[c{r}#8]]
 * \_Limit[1000[INTEGER],false]
 *   \_InlineJoin[LEFT,[],[]]
 *     |_LocalRelation[[b{r}#6],org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier@3fe]
 *     \_Aggregate[[],[COUNT(*[KEYWORD],true[BOOLEAN]) AS c#8]]
 *       \_StubRelation[[b{r}#6]]
 * }</pre>
 */
public void testInlineStatsAfterPruningAggregate() {
    var plan = optimizedPlan("""
        row a = 1
        | stats b = max(a)
        | inline stats c = count(*)
        | drop b
        """);
    var project = as(plan, Project.class);
    var limit = asLimit(project.child(), 1000, false);
    var inlineJoin = as(limit.child(), InlineJoin.class);

    // Left side: the pruned aggregate must have become a LocalRelation with a copying supplier.
    var localRelation = as(inlineJoin.left(), LocalRelation.class);
    var copyingLocalSupplier = as(localRelation.supplier(), CopyingLocalSupplier.class);
    Page page = copyingLocalSupplier.get();
    assertEquals(1, page.getBlockCount());
    as(page.getBlock(0), ConstantNullBlock.class);

    // Right side: the inline aggregate reads from the stub that stands in for the left side.
    var right = as(inlineJoin.right(), Aggregate.class);
    // Only the type is asserted; no local is kept, so nothing shares a name with the StubRelation class.
    as(right.child(), StubRelation.class);
}

/**
 * Test for <code>PropagateEmptyRelation</code>: {@code where false} leaves the {@code STATS} without input rows.
 * The left side of the {@code InlineJoin} is expected to be a {@code LocalRelation} whose supplier is a
 * {@code CopyingLocalSupplier} producing a one-block page with an {@code IntArrayBlock}.
 * <p>
 * Expected plan:
 * <pre>{@code
 * Limit[1000[INTEGER],false]
 * \_InlineJoin[LEFT,[],[]]
 *   |_LocalRelation[[b{r}#6],org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier@3fe]
 *   \_Aggregate[[],[COUNT(b{r}#6,true[BOOLEAN]) AS c#9]]
 *     \_StubRelation[[b{r}#6]]
 * }</pre>
 */
public void testInlineStatsAfterPruningAggregate2() {
    String query = """
        row a = 12
        | where false
        | stats b = max(a)
        | inline stats c = count(b)
        """;
    var topLimit = asLimit(optimizedPlan(query), 1000, false);
    var join = as(topLimit.child(), InlineJoin.class);

    // Left branch: local relation -> copying supplier -> single int block.
    var left = as(join.left(), LocalRelation.class);
    Page supplied = as(left.supplier(), CopyingLocalSupplier.class).get();
    assertEquals(1, supplied.getBlockCount());
    as(supplied.getBlock(0), IntArrayBlock.class);

    // Right branch: aggregate on top of the stub relation.
    var aggregate = as(join.right(), Aggregate.class);
    as(aggregate.child(), StubRelation.class);
}

/**
 * Test for <code>ReplaceStatsFilteredAggWithEval</code>: the only aggregate of the {@code STATS} carries a
 * {@code where false} filter. The left side of the {@code InlineJoin} is expected to be a {@code LocalRelation}
 * whose supplier is a {@code CopyingLocalSupplier} producing a one-block page with a {@code ConstantNullBlock}.
 * <p>
 * Expected plan:
 * <pre>{@code
 * Limit[1000[INTEGER],false]
 * \_InlineJoin[LEFT,[],[]]
 *   |_LocalRelation[[b{r}#6],org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier@3fe]
 *   \_Aggregate[[],[MAX(b{r}#6,true[BOOLEAN]) AS c#9]]
 *     \_StubRelation[[b{r}#6]]
 * }</pre>
 */
public void testInlineStatsAfterPruningAggregate3() {
    var optimized = optimizedPlan("""
        row a= 12
        | stats b = sum(a) where false
        | inline stats c = max(b)
        """);
    var join = as(asLimit(optimized, 1000, false).child(), InlineJoin.class);

    // Left branch first, same order of checks as the plan tree above.
    var relation = as(join.left(), LocalRelation.class);
    var supplier = as(relation.supplier(), CopyingLocalSupplier.class);
    Page copy = supplier.get();
    assertEquals(1, copy.getBlockCount());
    as(copy.getBlock(0), ConstantNullBlock.class);

    // Right branch: the inline aggregate must sit directly on a StubRelation.
    as(as(join.right(), Aggregate.class).child(), StubRelation.class);
}

/**
* Expects
* <pre>{@code
Expand Down