diff --git a/docs/changelog/136467.yaml b/docs/changelog/136467.yaml new file mode 100644 index 0000000000000..9f7d34ed460a9 --- /dev/null +++ b/docs/changelog/136467.yaml @@ -0,0 +1,6 @@ +pr: 136467 +summary: "ESQL: Fix double release in inline stats when `LocalRelation` is reused" +area: ES|QL +type: bug +issues: + - 135679 diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec index e6dbfa1950764..e96fe9f02a89f 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec @@ -3990,3 +3990,184 @@ Canada |English |null |null Mv-Land |Mv-Lang |null |null Mv-Land2 |Mv-Lang2 |null |null ; + +// PruneColumns +inlineStatsAfterPruningAggregate +required_capability: inline_stats +required_capability: inline_stats_after_pruning_aggregate + +row a = 1 +| stats b = max(a) +| inline stats c = count(*) +| drop b +; + +c:long +1 +; + +// PropagateEmptyRelation +inlineStatsAfterPruningAggregate2 +required_capability: inline_stats +required_capability: inline_stats_after_pruning_aggregate + +row a = 12 +| where false +| stats b = max(a) +| inline stats c = count(b) +; + +b:integer | c:long +null | 0 +; + + +// ReplaceStatsFilteredAggWithEval +inlineStatsAfterPruningAggregate3 +required_capability: inline_stats +required_capability: inline_stats_after_pruning_aggregate + +row a= 12 +| stats b = sum(a) where false +| inline stats c = max(b) +; + +b:long | c:long +null | null +; + + +inlineStatsAfterPruningAggregate4 +required_capability: inline_stats +required_capability: inline_stats_after_pruning_aggregate + +from k8s +| stats PRezTcny = median(network.cost), `network.cost` = count(network.cost) +| drop network.cost +| inline stats QaoRGYyf = count(*) +| drop `PRezTcny` +; + +QaoRGYyf:long +1 +; + + +inlineStatsAfterPruningAggregate5 +required_capability: 
inline_stats +required_capability: inline_stats_after_pruning_aggregate +required_capability: join_lookup_v12 + +from colors,airports_web,boo* +| rename scalerank as language_code +| lookup join languages_lookup on language_code +| keep `ratings`, `rgb_vector` +| mv_expand ratings +| keep `ratings`, `rgb_vector` +| inline stats rycaPYFzpAId = absent(ratings), WkdxfjwuR = count(ratings) + avg(ratings) +| rename ratings AS `WkdxfjwuR` +| inline stats WkdxfjwuR = sum(WkdxfjwuR) + median(WkdxfjwuR), sjivSBvV = count(*) +| sort sjivSBvV NULLS FIRST +| where false OR NOT true OR false OR NOT false +| where false OR false AND false AND NOT true AND false +| stats rycaPYFzpAId = median(sjivSBvV) + count(sjivSBvV) + sum(sjivSBvV), NUiLBJXQoMXd = present(sjivSBvV) +| mv_expand `NUiLBJXQoMXd` +| inline stats NUiLBJXQoMXd = count(rycaPYFzpAId) +; + +rycaPYFzpAId:double | NUiLBJXQoMXd:long +null | 0 +; + + +inlineStatsAfterPruningAggregate6 +required_capability: inline_stats +required_capability: inline_stats_after_pruning_aggregate +required_capability: join_lookup_v12 + +from d*,* +| rename scalerank as other2 +| rename author.keyword as name_str +| rename intersects as is_active_bool +| rename year as id_int +| rename status as other1 +| lookup join multi_column_joinable_lookup on other2, name_str, is_active_bool, id_int, other1 +| rename street AS `huZQQIKVli`, `network.eth0.currently_connected_clients` AS is_rehired +| eval GhMzAVXivNI = message.raw, VDJKNsopeKQq = substring(threat_level, 1, 3), xeUVJNra = length(huZQQIKVli), kOQjzWgZNhwI = false +| where false AND NOT false AND true AND NOT false +| CHANGE_POINT salary_change.int ON birth_date AS UpYfyPcYFkes, EwnNgKIrDEKS +| sort owner.name DESC NULLS LAST, message.raw, country.keyword DESC NULLS LAST, UpYfyPcYFkes ASC NULLS FIRST, name_str ASC NULLS FIRST, num, event_duration DESC NULLS LAST, extra2 DESC, languages.byte ASC NULLS FIRST, client.ip NULLS LAST, ip1, collection NULLS FIRST, city.country.name DESC, 
destination.IP DESC NULLS LAST, host.version DESC, hex_code ASC NULLS LAST, language.code ASC NULLS LAST, language.name ASC, details DESC, language_name DESC NULLS FIRST, event ASC, salary_change.keyword DESC NULLS FIRST, lk ASC +| stats CLnBmEGor = sum(height.double) + sum(height.double) + avg(height.double), JLSxMYld = count(extra2) + max(extra2) + avg(extra2), `network.total_bytes_out` = min(language_code_integer), `version` = values(env), contains = min(languages.short) + min(languages.short) +| mv_expand `CLnBmEGor` +| dissect version "%{JLSxMYld} %{LTMJrburyt}" +| enrich languages_policy on LTMJrburyt +| inline stats QyCBayGHtAF = count(CLnBmEGor), yUMkeLezSN = present(contains + network.total_bytes_out), `JLSxMYld` = max(contains), GYbfUvbf = count(*) +; + +CLnBmEGor:double | network.total_bytes_out:integer | version:keyword | contains:integer | LTMJrburyt:keyword | language_name:keyword | QyCBayGHtAF:long | yUMkeLezSN:boolean | JLSxMYld:integer | GYbfUvbf:long +null | null | null | null | null | null | 0 | false | null | 1 +; + + +inlineStatsAfterPruningAggregate7 +required_capability: inline_stats +required_capability: inline_stats_after_pruning_aggregate +required_capability: join_lookup_v12 + +from app_log* +| FORK (where false OR NOT true) (where true) (where true) (where false OR true) (where false) (where true +| mv_expand @timestamp) +| WHERE _fork == "fork2" +| DROP _fork +| rename service_id as name_str +| lookup join multi_column_joinable_lookup on name_str +| inline stats `is_active_bool` = count(id_int) + max(id_int), siofAMlpoHvh = max(id_int) + max(id_int) + max(id_int), `message` = present(other1), `id_int` = max(id_int) +| stats `message` = count(is_active_bool), id_int2 = median(is_active_bool), `siofAMlpoHvh` = count(*), `id_int` = max(id_int) +| eval NUtENNcs = message, message = "zaYFcCTy", id_int2 = null, VizcTbjMnBlQ = siofAMlpoHvh, siofAMlpoHvh = siofAMlpoHvh + siofAMlpoHvh + siofAMlpoHvh, gPzLgKdLfZ = null, `message` = 1099438720, 
sdsrveQOoi = -2373351299779434224, hpxNweCm = false, PwyOmHforRKI = -2230544247357512169 +| stats VizcTbjMnBlQ = count(*), siofAMlpoHvh2 = median(sdsrveQOoi) + max(sdsrveQOoi), DUuKdsGgU = avg(PwyOmHforRKI) + count(PwyOmHforRKI), `gPzLgKdLfZ` = present(sdsrveQOoi + VizcTbjMnBlQ + PwyOmHforRKI), `siofAMlpoHvh` = count(sdsrveQOoi) + max(sdsrveQOoi) +| limit 61 +| inline stats gPzLgKdLfZ = sum(VizcTbjMnBlQ), `siofAMlpoHvh2` = count(*), zvgqSiVGqli = max(VizcTbjMnBlQ), WFjgBzYRXwa = median(VizcTbjMnBlQ) + count(VizcTbjMnBlQ) + avg(VizcTbjMnBlQ) +| keep `VizcTbjMnBlQ` +| inline stats wXPJguff = median(VizcTbjMnBlQ) + avg(VizcTbjMnBlQ) + max(VizcTbjMnBlQ) +; + +VizcTbjMnBlQ:long | wXPJguff:double +1 | 3.0 +; + + +inlineStatsAfterPruningAggregate8 +required_capability: inline_stats +required_capability: inline_stats_after_pruning_aggregate + +from dense_vector,airport_city_boundaries +| eval kMvNZVQSH = -4048759096317256968, GzAYNHaHuIS = 4262770411405329967, EwPpQFmOPoB = "lkdgnbTrBT", `abbrev` = "LiXI" +| keep `city`, `kMvNZVQSH`, GzAYNHaHuIS, city_boundary, EwPpQFmOPoB, `city_boundary`, `id`, kMvNZVQSH, id, airport +| limit 6888 +| keep `kMvNZVQSH`, EwPpQFmOPoB, city, `id`, kMvNZVQSH +| stats `EwPpQFmOPoB2` = min(id), `EwPpQFmOPoB3` = count(*), kZiJPhNZ = count(id), ErGhdKhdv = count_distinct(city), EwPpQFmOPoB = min(id) +| eval ErGhdKhdv = null, fHSyxniFCS = -710891486, ErGhdKhdv = -1104488048, zdxdQdVIyFh = -1579062572, `kZiJPhNZ` = null, SzmOySyUh = null, eWVVRiwP = 1092261898855405071, nUxVhGhcpkiV = true, RvqGioBAAzYs = 169356242 +| eval `kZiJPhNZ` = "gWJHDTcDMjRVSnlzSX", lxLgaagX = 4700531997851493340, `kZiJPhNZ` = false +| keep `zdxdQdVIyFh` +| inline stats `zdxdQdVIyFh` = count(*), WbWVJnYJ = count(*), zdxdQdVIyFh2 = max(zdxdQdVIyFh), uKjfeKTfaH = max(zdxdQdVIyFh), QpovVxjMWSjh = min(zdxdQdVIyFh) +; + +zdxdQdVIyFh:long | WbWVJnYJ:long | zdxdQdVIyFh2:integer | uKjfeKTfaH:integer | QpovVxjMWSjh:integer +1 | 1 | -1579062572 | -1579062572 | -1579062572 +; + + 
+inlineStatsAfterPruningAggregate9 +required_capability: inline_stats +required_capability: inline_stats_after_pruning_aggregate + +from employees +| stats m = count(emp_no) +| eval c = 0 +| keep c +| inline stats c = count(*) +; + +c:long +1 +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index bb4aeab593317..8b696af27e621 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1490,6 +1490,11 @@ public enum Cap { */ TS_PERMIT_TEXT_BECOMING_KEYWORD_WHEN_GROUPED_ON, + /** + * INLINE STATS support after aggregate pruning + */ + INLINE_STATS_AFTER_PRUNING_AGGREGATE, + /** * Fix management of plans with no columns * https://github.com/elastic/elasticsearch/issues/120272 diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateEmptyRelation.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateEmptyRelation.java index 818b507d6ba67..fb17e06c01321 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateEmptyRelation.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateEmptyRelation.java @@ -21,6 +21,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; +import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; import org.elasticsearch.xpack.esql.planner.PlannerUtils; @@ -43,7 +44,7 @@ 
protected LogicalPlan rule(UnaryPlan plan, LogicalOptimizerContext ctx) { List emptyBlocks = aggsFromEmpty(ctx.foldCtx(), agg.aggregates()); p = replacePlanByRelation( plan, - LocalSupplier.of(emptyBlocks.isEmpty() ? new Page(0) : new Page(emptyBlocks.toArray(Block[]::new))) + new CopyingLocalSupplier(emptyBlocks.isEmpty() ? new Page(0) : new Page(emptyBlocks.toArray(Block[]::new))) ); } else { p = PruneEmptyPlans.skipPlan(plan); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java index 78b7a4d439680..8b8cfc9b5394e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java @@ -26,8 +26,8 @@ import org.elasticsearch.xpack.esql.plan.logical.Sample; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin; +import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; -import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.rule.Rule; @@ -111,7 +111,7 @@ private static LogicalPlan pruneColumnsInAggregate(Aggregate aggregate, Attribut p = new LocalRelation( aggregate.source(), List.of(Expressions.attribute(aggregate.aggregates().getFirst())), - LocalSupplier.of(new Page(BlockUtils.constantBlock(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, null, 1))) + new CopyingLocalSupplier(new Page(BlockUtils.constantBlock(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, null, 1))) ); } else { // Aggs cannot produce pages with 0 columns, so retain one grouping. 
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceStatsFilteredAggWithEval.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceStatsFilteredAggWithEval.java index 9a6176fa5628d..4097a69829280 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceStatsFilteredAggWithEval.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceStatsFilteredAggWithEval.java @@ -24,8 +24,8 @@ import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin; +import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; -import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import java.util.ArrayList; @@ -137,6 +137,6 @@ private static LocalRelation localRelation(Source source, List newEvals) attributes.add(alias.toAttribute()); blocks[i] = BlockUtils.constantBlock(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, ((Literal) alias.child()).value(), 1); } - return new LocalRelation(source, attributes, LocalSupplier.of(new Page(blocks))); + return new LocalRelation(source, attributes, new CopyingLocalSupplier(new Page(blocks))); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index dbf6c7b81b766..dc33c8428270c 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -12,6 +12,8 @@ 
import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.lucene.BytesRefs; import org.elasticsearch.compute.aggregation.QuantileStates; +import org.elasticsearch.compute.data.ConstantNullBlock; +import org.elasticsearch.compute.data.IntArrayBlock; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.test.TestBlockFactory; import org.elasticsearch.core.Nullable; @@ -134,6 +136,7 @@ import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin; import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation; +import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; @@ -2628,6 +2631,91 @@ public void testEvalAfterStats() { assertThat(Expressions.names(eval.output()), contains("x")); } + /** + * Test for PruneColumns + *
{@code
+     * EsqlProject[[c{r}#8]]
+     * \_Limit[1000[INTEGER],false]
+     *   \_InlineJoin[LEFT,[],[]]
+     *     |_LocalRelation[[b{r}#6],org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier@3fe]
+     *     \_Aggregate[[],[COUNT(*[KEYWORD],true[BOOLEAN]) AS c#8]]
+     *       \_StubRelation[[b{r}#6]]
+     * }
+     */
+    public void testInlineStatsAfterPruningAggregate() {
+        var plan = optimizedPlan("""
+            row a = 1
+            | stats b = max(a)
+            | inline stats c = count(*)
+            | drop b
+            """);
+        var project = as(plan, Project.class);
+        var limit = asLimit(project.child(), 1000, false);
+        var inlineJoin = as(limit.child(), InlineJoin.class);
+        // Left side: the pruned aggregate is expected to show up as a LocalRelation backed by a CopyingLocalSupplier.
+        var localRelation = as(inlineJoin.left(), LocalRelation.class);
+        var copyingLocalSupplier = as(localRelation.supplier(), CopyingLocalSupplier.class);
+        // The supplied page must carry exactly one block, and that block must be a constant-null one.
+        Page page = copyingLocalSupplier.get();
+        assertEquals(1, page.getBlockCount());
+        as(page.getBlock(0), ConstantNullBlock.class);
+        // Right side: the inline stats Aggregate reading from the StubRelation placeholder.
+        var right = as(inlineJoin.right(), Aggregate.class);
+        // lowerCamelCase on purpose: a local named exactly like the StubRelation class would obscure the type name.
+        var stubRelation = as(right.child(), StubRelation.class);
+    }
+
+    /**
+     * Test for PropagateEmptyRelation
+     *
{@code
+     * Limit[1000[INTEGER],false]
+     * \_InlineJoin[LEFT,[],[]]
+     *   |_LocalRelation[[b{r}#6],org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier@3fe]
+     *   \_Aggregate[[],[COUNT(b{r}#6,true[BOOLEAN]) AS c#9]]
+     *     \_StubRelation[[b{r}#6]]
+     * }
+     */
+    public void testInlineStatsAfterPruningAggregate2() {
+        String query = """
+            row a = 12
+            | where false
+            | stats b = max(a)
+            | inline stats c = count(b)
+            """;
+        // Step through the Limit[1000] node to reach the inline join below it.
+        var join = as(asLimit(optimizedPlan(query), 1000, false).child(), InlineJoin.class);
+
+        // Left branch: a LocalRelation fed by a CopyingLocalSupplier whose page holds a single int array block.
+        var relation = as(join.left(), LocalRelation.class);
+        Page supplied = as(relation.supplier(), CopyingLocalSupplier.class).get();
+        assertEquals(1, supplied.getBlockCount());
+        as(supplied.getBlock(0), IntArrayBlock.class);
+
+        // Right branch: the inline stats Aggregate sitting on top of a StubRelation.
+        var aggregate = as(join.right(), Aggregate.class);
+        as(aggregate.child(), StubRelation.class);
+    }
+
+    /**
+     * Test for ReplaceStatsFilteredAggWithEval
+     *
{@code
+     * Limit[1000[INTEGER],false]
+     * \_InlineJoin[LEFT,[],[]]
+     *   |_LocalRelation[[b{r}#6],org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier@3fe]
+     *   \_Aggregate[[],[MAX(b{r}#6,true[BOOLEAN]) AS c#9]]
+     *     \_StubRelation[[b{r}#6]]
+     * }
+     */
+    public void testInlineStatsAfterPruningAggregate3() {
+        String query = """
+            row a= 12
+            | stats b = sum(a) where false
+            | inline stats c = max(b)
+            """;
+        // Step through the Limit[1000] node to reach the inline join below it.
+        var join = as(asLimit(optimizedPlan(query), 1000, false).child(), InlineJoin.class);
+
+        // Left branch: a LocalRelation fed by a CopyingLocalSupplier whose page holds a single constant-null block.
+        var relation = as(join.left(), LocalRelation.class);
+        Page supplied = as(relation.supplier(), CopyingLocalSupplier.class).get();
+        assertEquals(1, supplied.getBlockCount());
+        as(supplied.getBlock(0), ConstantNullBlock.class);
+
+        // Right branch: the inline stats Aggregate sitting on top of a StubRelation.
+        var aggregate = as(join.right(), Aggregate.class);
+        as(aggregate.child(), StubRelation.class);
+    }
+
     /**
      * Expects
      *
{@code