Skip to content

Commit 11e7b4c

Browse files
authored
ESQL: Fix double release in inline stats when LocalRelation is reused (#136848)
Backports #136467
1 parent 25d8845 commit 11e7b4c

File tree

5 files changed

+201
-11
lines changed

5 files changed

+201
-11
lines changed

docs/changelog/136467.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 136467
2+
summary: "ESQL: Fix double release in inline stats when `LocalRelation` is reused"
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 135679

x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3990,3 +3990,152 @@ Canada |English |null |null
39903990
Mv-Land |Mv-Lang |null |null
39913991
Mv-Land2 |Mv-Lang2 |null |null
39923992
;
3993+
3994+
// PruneColumns
3995+
inlineStatsAfterPruningAggregate
3996+
required_capability: inline_stats_double_release_fix
3997+
3998+
row a = 1
3999+
| stats b = max(a)
4000+
| inline stats c = count(*)
4001+
| drop b
4002+
;
4003+
4004+
c:long
4005+
1
4006+
;
4007+
4008+
// PropagateEmptyRelation
4009+
inlineStatsAfterPruningAggregate2
4010+
required_capability: inline_stats_double_release_fix
4011+
4012+
row a = 12
4013+
| where false
4014+
| stats b = max(a)
4015+
| inline stats c = count(b)
4016+
;
4017+
4018+
b:integer | c:long
4019+
null | 0
4020+
;
4021+
4022+
4023+
// ReplaceStatsFilteredAggWithEval
4024+
inlineStatsAfterPruningAggregate3
4025+
required_capability: inline_stats_double_release_fix
4026+
4027+
row a= 12
4028+
| stats b = sum(a) where false
4029+
| inline stats c = max(b)
4030+
;
4031+
4032+
b:long | c:long
4033+
null | null
4034+
;
4035+
4036+
4037+
inlineStatsAfterPruningAggregate4
4038+
required_capability: inline_stats_double_release_fix
4039+
4040+
from k8s
4041+
| stats PRezTcny = median(network.cost), `network.cost` = count(network.cost)
4042+
| drop network.cost
4043+
| inline stats QaoRGYyf = count(*)
4044+
| drop `PRezTcny`
4045+
;
4046+
4047+
QaoRGYyf:long
4048+
1
4049+
;
4050+
4051+
4052+
inlineStatsAfterPruningAggregate5
4053+
// https://github.com/elastic/elasticsearch/issues/135679
4054+
required_capability: inline_stats_double_release_fix
4055+
required_capability: join_lookup_v12
4056+
4057+
from colors,airports_web,boo*
4058+
| rename scalerank as language_code
4059+
| lookup join languages_lookup on language_code
4060+
| keep `ratings`, `rgb_vector`
4061+
| mv_expand ratings
4062+
| keep `ratings`, `rgb_vector`
4063+
| inline stats rycaPYFzpAId = absent(ratings), WkdxfjwuR = count(ratings) + avg(ratings)
4064+
| rename ratings AS `WkdxfjwuR`
4065+
| inline stats WkdxfjwuR = sum(WkdxfjwuR) + median(WkdxfjwuR), sjivSBvV = count(*)
4066+
| sort sjivSBvV NULLS FIRST
4067+
| where false OR NOT true OR false OR NOT false
4068+
| where false OR false AND false AND NOT true AND false
4069+
| stats rycaPYFzpAId = median(sjivSBvV) + count(sjivSBvV) + sum(sjivSBvV), NUiLBJXQoMXd = present(sjivSBvV)
4070+
| mv_expand `NUiLBJXQoMXd`
4071+
| inline stats NUiLBJXQoMXd = count(rycaPYFzpAId)
4072+
;
4073+
4074+
rycaPYFzpAId:double | NUiLBJXQoMXd:long
4075+
null | 0
4076+
;
4077+
4078+
4079+
inlineStatsAfterPruningAggregate6
4080+
// https://github.com/elastic/elasticsearch/issues/135679
4081+
required_capability: inline_stats_double_release_fix
4082+
required_capability: join_lookup_v12
4083+
4084+
from d*,*
4085+
| rename scalerank as other2
4086+
| rename author.keyword as name_str
4087+
| rename intersects as is_active_bool
4088+
| rename year as id_int
4089+
| rename status as other1
4090+
| lookup join multi_column_joinable_lookup on other2, name_str, is_active_bool, id_int, other1
4091+
| rename street AS `huZQQIKVli`, `network.eth0.currently_connected_clients` AS is_rehired
4092+
| eval GhMzAVXivNI = message.raw, VDJKNsopeKQq = substring(threat_level, 1, 3), xeUVJNra = length(huZQQIKVli), kOQjzWgZNhwI = false
4093+
| where false AND NOT false AND true AND NOT false
4094+
| CHANGE_POINT salary_change.int ON birth_date AS UpYfyPcYFkes, EwnNgKIrDEKS
4095+
| sort owner.name DESC NULLS LAST, message.raw, country.keyword DESC NULLS LAST, UpYfyPcYFkes ASC NULLS FIRST, name_str ASC NULLS FIRST, num, event_duration DESC NULLS LAST, extra2 DESC, languages.byte ASC NULLS FIRST, client.ip NULLS LAST, ip1, collection NULLS FIRST, city.country.name DESC, destination.IP DESC NULLS LAST, host.version DESC, hex_code ASC NULLS LAST, language.code ASC NULLS LAST, language.name ASC, details DESC, language_name DESC NULLS FIRST, event ASC, salary_change.keyword DESC NULLS FIRST, lk ASC
4096+
| stats CLnBmEGor = sum(height.double) + sum(height.double) + avg(height.double), JLSxMYld = count(extra2) + max(extra2) + avg(extra2), `network.total_bytes_out` = min(language_code_integer), `version` = values(env), contains = min(languages.short) + min(languages.short)
4097+
| mv_expand `CLnBmEGor`
4098+
| dissect version "%{JLSxMYld} %{LTMJrburyt}"
4099+
| enrich languages_policy on LTMJrburyt
4100+
| inline stats QyCBayGHtAF = count(CLnBmEGor), yUMkeLezSN = present(contains + network.total_bytes_out), `JLSxMYld` = max(contains), GYbfUvbf = count(*)
4101+
;
4102+
4103+
CLnBmEGor:double | network.total_bytes_out:integer | version:keyword | contains:integer | LTMJrburyt:keyword | language_name:keyword | QyCBayGHtAF:long | yUMkeLezSN:boolean | JLSxMYld:integer | GYbfUvbf:long
4104+
null | null | null | null | null | null | 0 | false | null | 1
4105+
;
4106+
4107+
4108+
inlineStatsAfterPruningAggregate8
4109+
// https://github.com/elastic/elasticsearch/issues/135679
4110+
required_capability: inline_stats_double_release_fix
4111+
4112+
from dense_vector,airport_city_boundaries
4113+
| eval kMvNZVQSH = -4048759096317256968, GzAYNHaHuIS = 4262770411405329967, EwPpQFmOPoB = "lkdgnbTrBT", `abbrev` = "LiXI"
4114+
| keep `city`, `kMvNZVQSH`, GzAYNHaHuIS, city_boundary, EwPpQFmOPoB, `city_boundary`, `id`, kMvNZVQSH, id, airport
4115+
| limit 6888
4116+
| keep `kMvNZVQSH`, EwPpQFmOPoB, city, `id`, kMvNZVQSH
4117+
| stats `EwPpQFmOPoB2` = min(id), `EwPpQFmOPoB3` = count(*), kZiJPhNZ = count(id), ErGhdKhdv = count_distinct(city), EwPpQFmOPoB = min(id)
4118+
| eval ErGhdKhdv = null, fHSyxniFCS = -710891486, ErGhdKhdv = -1104488048, zdxdQdVIyFh = -1579062572, `kZiJPhNZ` = null, SzmOySyUh = null, eWVVRiwP = 1092261898855405071, nUxVhGhcpkiV = true, RvqGioBAAzYs = 169356242
4119+
| eval `kZiJPhNZ` = "gWJHDTcDMjRVSnlzSX", lxLgaagX = 4700531997851493340, `kZiJPhNZ` = false
4120+
| keep `zdxdQdVIyFh`
4121+
| inline stats `zdxdQdVIyFh` = count(*), WbWVJnYJ = count(*), zdxdQdVIyFh2 = max(zdxdQdVIyFh), uKjfeKTfaH = max(zdxdQdVIyFh), QpovVxjMWSjh = min(zdxdQdVIyFh)
4122+
;
4123+
4124+
zdxdQdVIyFh:long | WbWVJnYJ:long | zdxdQdVIyFh2:integer | uKjfeKTfaH:integer | QpovVxjMWSjh:integer
4125+
1 | 1 | -1579062572 | -1579062572 | -1579062572
4126+
;
4127+
4128+
4129+
inlineStatsAfterPruningAggregate9
4130+
required_capability: inline_stats_double_release_fix
4131+
4132+
from employees
4133+
| stats m = count(emp_no)
4134+
| eval c = 0
4135+
| keep c
4136+
| inline stats c = count(*)
4137+
;
4138+
4139+
c:long
4140+
1
4141+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1615,6 +1615,11 @@ public enum Cap {
16151615
*/
16161616
FIX_FILTER_ORDINALS,
16171617

1618+
/**
1619+
* Fix double release in inline stats when LocalRelation is reused
1620+
*/
1621+
INLINE_STATS_DOUBLE_RELEASE_FIX(INLINESTATS_V11.enabled)
1622+
16181623
;
16191624

16201625
private final boolean enabled;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceRowAsLocalRelation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
1212
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1313
import org.elasticsearch.xpack.esql.plan.logical.Row;
14-
import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
1514
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
15+
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
1616
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
1717

1818
import java.util.ArrayList;
@@ -29,6 +29,6 @@ protected LogicalPlan rule(Row row, LogicalOptimizerContext context) {
2929
List<Object> values = new ArrayList<>(fields.size());
3030
fields.forEach(f -> values.add(f.child().fold(context.foldCtx())));
3131
var blocks = BlockUtils.fromListRow(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, values);
32-
return new LocalRelation(row.source(), row.output(), new CopyingLocalSupplier(blocks));
32+
return new LocalRelation(row.source(), row.output(), LocalSupplier.of(blocks));
3333
}
3434
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.xpack.esql.plan.logical.Eval;
2525
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2626
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
27+
import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
2728
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
2829

2930
import java.io.IOException;
@@ -128,6 +129,12 @@ public record LogicalPlanTuple(LogicalPlan stubReplacedSubPlan, LogicalPlan orig
128129
* Finds the "first" (closest to the source command or bottom up in the tree) {@link InlineJoin}, replaces the {@link StubRelation}
129130
* of the right-hand side with left-hand side's source and returns a tuple.
130131
*
132+
* <p>After extracting the subplan, the method ensures that all {@link LocalRelation} nodes use {@link CopyingLocalSupplier}
133+
* to avoid double release when the same page is reused across executions. This guarantees correctness when substituting
134+
* the subplan result back into the main plan.
135+
*
136+
* <p>Example transformation:
137+
* <pre>
131138
* Original optimized plan:
132139
* Limit[1000[INTEGER],true]
133140
* \_InlineJoin[LEFT,[],[],[]]
@@ -144,33 +151,56 @@ public record LogicalPlanTuple(LogicalPlan stubReplacedSubPlan, LogicalPlan orig
144151
* Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
145152
* \_Limit[1000[INTEGER],false]
146153
* \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]]
154+
* </pre>
155+
*
156+
* <p>In the end the method returns a tuple like this:
157+
* <pre>
158+
* LogicalPlanTuple[
159+
* stubReplacedSubPlan=
160+
* Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
161+
* \_LocalRelation[[x{r}#99],org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier@7c1]
162+
* originalSubPlan=
163+
* Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
164+
* \_StubRelation[[x{r}#99]]
165+
* ]
166+
* </pre>
147167
*/
148168
public static LogicalPlanTuple firstSubPlan(LogicalPlan optimizedPlan, Set<LocalRelation> subPlansResults) {
149-
Holder<LogicalPlanTuple> subPlan = new Holder<>();
169+
final Holder<LogicalPlan> stubReplacedSubPlanHolder = new Holder<>();
170+
final Holder<LogicalPlan> originalSubPlanHolder = new Holder<>();
150171
// Collect the first InlineJoin (bottom up in the tree)
151172
optimizedPlan.forEachUp(InlineJoin.class, ij -> {
152173
// extract the right side of the plan and replace its source
153-
if (subPlan.get() == null) {
174+
if (stubReplacedSubPlanHolder.get() == null) {
154175
if (ij.right().anyMatch(p -> p instanceof StubRelation)) {
155-
var p = replaceStub(ij.left(), ij.right());
156-
p.setOptimized();
157-
subPlan.set(new LogicalPlanTuple(p, ij.right()));
176+
stubReplacedSubPlanHolder.set(replaceStub(ij.left(), ij.right()));
158177
} else if (ij.right() instanceof LocalRelation relation
159178
&& (subPlansResults.isEmpty() || subPlansResults.contains(relation) == false)
160179
|| ij.right() instanceof LocalRelation == false && ij.right().anyMatch(p -> p instanceof LocalRelation)) {
161180
// In case the plan was optimized further and the StubRelation was replaced with a LocalRelation
162181
// or the right-hand side became a LocalRelation altogether, there is no need to replace the source of the
163182
// right-hand side anymore.
164-
var p = ij.right();
165-
p.setOptimized();
166-
subPlan.set(new LogicalPlanTuple(p, ij.right()));
183+
stubReplacedSubPlanHolder.set(ij.right());
167184
// TODO: INLINE STATS this is essentially an optimization similar to the one in PruneInlineJoinOnEmptyRightSide
168185
// this further supports the idea of running the optimization step again after the substitutions (see EsqlSession
169186
// executeSubPlan() method where we could run the optimizer after the results are replaced in place).
170187
}
188+
originalSubPlanHolder.set(ij.right());
171189
}
172190
});
173-
return subPlan.get();
191+
LogicalPlanTuple tuple = null;
192+
var plan = stubReplacedSubPlanHolder.get();
193+
if (plan != null) {
194+
plan = plan.transformUp(LocalRelation.class, lr -> {
195+
if (lr.supplier() instanceof CopyingLocalSupplier == false) {
196+
return new LocalRelation(lr.source(), lr.output(), new CopyingLocalSupplier(lr.supplier().get()));
197+
}
198+
return lr;
199+
});
200+
plan.setOptimized();
201+
tuple = new LogicalPlanTuple(plan, originalSubPlanHolder.get());
202+
}
203+
return tuple;
174204
}
175205

176206
public InlineJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig config) {

0 commit comments

Comments
 (0)