Skip to content

Commit ffbc38b

Browse files
kanoshiouchrisparrinello
authored andcommitted
ESQL: Fix double release in inline stats when LocalRelation is reused (elastic#136467)
When the optimizer creates a `LocalRelation` using `LocalSupplier.of()` (which delegates to `ImmediateLocalSupplier`), the same page instance can be referenced and released multiple times during inline stats execution. This happens because the `LocalRelation` serves as a source for both sides of the `InlineJoin` - once for the left-hand side and again for the right-hand side, causing a double release error and connection closure. This PR modifies the `InlineJoin.firstSubPlan()` method to ensure that all `LocalRelation` nodes use `CopyingLocalSupplier` (if they don't already) after extracting the subplan. This prevents double release issues when the same page is reused across executions. Closes elastic#135679
1 parent 7cb5834 commit ffbc38b

File tree

5 files changed

+228
-11
lines changed

5 files changed

+228
-11
lines changed

docs/changelog/136467.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 136467
2+
summary: "ESQL: Fix double release in inline stats when `LocalRelation` is reused"
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 135679

x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3990,3 +3990,179 @@ Canada |English |null |null
39903990
Mv-Land |Mv-Lang |null |null
39913991
Mv-Land2 |Mv-Lang2 |null |null
39923992
;
3993+
3994+
// PruneColumns
3995+
inlineStatsAfterPruningAggregate
3996+
required_capability: inline_stats_double_release_fix
3997+
3998+
row a = 1
3999+
| stats b = max(a)
4000+
| inline stats c = count(*)
4001+
| drop b
4002+
;
4003+
4004+
c:long
4005+
1
4006+
;
4007+
4008+
// PropagateEmptyRelation
4009+
inlineStatsAfterPruningAggregate2
4010+
required_capability: inline_stats_double_release_fix
4011+
4012+
row a = 12
4013+
| where false
4014+
| stats b = max(a)
4015+
| inline stats c = count(b)
4016+
;
4017+
4018+
b:integer | c:long
4019+
null | 0
4020+
;
4021+
4022+
4023+
// ReplaceStatsFilteredAggWithEval
4024+
inlineStatsAfterPruningAggregate3
4025+
required_capability: inline_stats_double_release_fix
4026+
4027+
row a= 12
4028+
| stats b = sum(a) where false
4029+
| inline stats c = max(b)
4030+
;
4031+
4032+
b:long | c:long
4033+
null | null
4034+
;
4035+
4036+
4037+
inlineStatsAfterPruningAggregate4
4038+
required_capability: inline_stats_double_release_fix
4039+
4040+
from k8s
4041+
| stats PRezTcny = median(network.cost), `network.cost` = count(network.cost)
4042+
| drop network.cost
4043+
| inline stats QaoRGYyf = count(*)
4044+
| drop `PRezTcny`
4045+
;
4046+
4047+
QaoRGYyf:long
4048+
1
4049+
;
4050+
4051+
4052+
inlineStatsAfterPruningAggregate5
4053+
// https://github.com/elastic/elasticsearch/issues/135679
4054+
required_capability: inline_stats_double_release_fix
4055+
required_capability: join_lookup_v12
4056+
4057+
from colors,airports_web,boo*
4058+
| rename scalerank as language_code
4059+
| lookup join languages_lookup on language_code
4060+
| keep `ratings`, `rgb_vector`
4061+
| mv_expand ratings
4062+
| keep `ratings`, `rgb_vector`
4063+
| inline stats rycaPYFzpAId = absent(ratings), WkdxfjwuR = count(ratings) + avg(ratings)
4064+
| rename ratings AS `WkdxfjwuR`
4065+
| inline stats WkdxfjwuR = sum(WkdxfjwuR) + median(WkdxfjwuR), sjivSBvV = count(*)
4066+
| sort sjivSBvV NULLS FIRST
4067+
| where false OR NOT true OR false OR NOT false
4068+
| where false OR false AND false AND NOT true AND false
4069+
| stats rycaPYFzpAId = median(sjivSBvV) + count(sjivSBvV) + sum(sjivSBvV), NUiLBJXQoMXd = present(sjivSBvV)
4070+
| mv_expand `NUiLBJXQoMXd`
4071+
| inline stats NUiLBJXQoMXd = count(rycaPYFzpAId)
4072+
;
4073+
4074+
rycaPYFzpAId:double | NUiLBJXQoMXd:long
4075+
null | 0
4076+
;
4077+
4078+
4079+
inlineStatsAfterPruningAggregate6
4080+
// https://github.com/elastic/elasticsearch/issues/135679
4081+
required_capability: inline_stats_double_release_fix
4082+
required_capability: join_lookup_v12
4083+
4084+
from d*,*
4085+
| rename scalerank as other2
4086+
| rename author.keyword as name_str
4087+
| rename intersects as is_active_bool
4088+
| rename year as id_int
4089+
| rename status as other1
4090+
| lookup join multi_column_joinable_lookup on other2, name_str, is_active_bool, id_int, other1
4091+
| rename street AS `huZQQIKVli`, `network.eth0.currently_connected_clients` AS is_rehired
4092+
| eval GhMzAVXivNI = message.raw, VDJKNsopeKQq = substring(threat_level, 1, 3), xeUVJNra = length(huZQQIKVli), kOQjzWgZNhwI = false
4093+
| where false AND NOT false AND true AND NOT false
4094+
| CHANGE_POINT salary_change.int ON birth_date AS UpYfyPcYFkes, EwnNgKIrDEKS
4095+
| sort owner.name DESC NULLS LAST, message.raw, country.keyword DESC NULLS LAST, UpYfyPcYFkes ASC NULLS FIRST, name_str ASC NULLS FIRST, num, event_duration DESC NULLS LAST, extra2 DESC, languages.byte ASC NULLS FIRST, client.ip NULLS LAST, ip1, collection NULLS FIRST, city.country.name DESC, destination.IP DESC NULLS LAST, host.version DESC, hex_code ASC NULLS LAST, language.code ASC NULLS LAST, language.name ASC, details DESC, language_name DESC NULLS FIRST, event ASC, salary_change.keyword DESC NULLS FIRST, lk ASC
4096+
| stats CLnBmEGor = sum(height.double) + sum(height.double) + avg(height.double), JLSxMYld = count(extra2) + max(extra2) + avg(extra2), `network.total_bytes_out` = min(language_code_integer), `version` = values(env), contains = min(languages.short) + min(languages.short)
4097+
| mv_expand `CLnBmEGor`
4098+
| dissect version "%{JLSxMYld} %{LTMJrburyt}"
4099+
| enrich languages_policy on LTMJrburyt
4100+
| inline stats QyCBayGHtAF = count(CLnBmEGor), yUMkeLezSN = present(contains + network.total_bytes_out), `JLSxMYld` = max(contains), GYbfUvbf = count(*)
4101+
;
4102+
4103+
CLnBmEGor:double | network.total_bytes_out:integer | version:keyword | contains:integer | LTMJrburyt:keyword | language_name:keyword | QyCBayGHtAF:long | yUMkeLezSN:boolean | JLSxMYld:integer | GYbfUvbf:long
4104+
null | null | null | null | null | null | 0 | false | null | 1
4105+
;
4106+
4107+
4108+
inlineStatsAfterPruningAggregate7
4109+
// https://github.com/elastic/elasticsearch/issues/135679
4110+
required_capability: inline_stats_double_release_fix
4111+
required_capability: join_lookup_v12
4112+
required_capability: fork_v9
4113+
4114+
from app_log*
4115+
| FORK (where false OR NOT true) (where true) (where true) (where false OR true) (where false) (where true | mv_expand @timestamp)
4116+
| WHERE _fork == "fork2"
4117+
| DROP _fork
4118+
| rename service_id as name_str
4119+
| lookup join multi_column_joinable_lookup on name_str
4120+
| inline stats `is_active_bool` = count(id_int) + max(id_int), siofAMlpoHvh = max(id_int) + max(id_int) + max(id_int), `message` = present(other1), `id_int` = max(id_int)
4121+
| stats `message` = count(is_active_bool), id_int2 = median(is_active_bool), `siofAMlpoHvh` = count(*), `id_int` = max(id_int)
4122+
| eval NUtENNcs = message, message = "zaYFcCTy", id_int2 = null, VizcTbjMnBlQ = siofAMlpoHvh, siofAMlpoHvh = siofAMlpoHvh + siofAMlpoHvh + siofAMlpoHvh, gPzLgKdLfZ = null, `message` = 1099438720, sdsrveQOoi = -2373351299779434224, hpxNweCm = false, PwyOmHforRKI = -2230544247357512169
4123+
| stats VizcTbjMnBlQ = count(*), siofAMlpoHvh2 = median(sdsrveQOoi) + max(sdsrveQOoi), DUuKdsGgU = avg(PwyOmHforRKI) + count(PwyOmHforRKI), `gPzLgKdLfZ` = present(sdsrveQOoi + VizcTbjMnBlQ + PwyOmHforRKI), `siofAMlpoHvh` = count(sdsrveQOoi) + max(sdsrveQOoi)
4124+
| limit 61
4125+
| inline stats gPzLgKdLfZ = sum(VizcTbjMnBlQ), `siofAMlpoHvh2` = count(*), zvgqSiVGqli = max(VizcTbjMnBlQ), WFjgBzYRXwa = median(VizcTbjMnBlQ) + count(VizcTbjMnBlQ) + avg(VizcTbjMnBlQ)
4126+
| keep `VizcTbjMnBlQ`
4127+
| inline stats wXPJguff = median(VizcTbjMnBlQ) + avg(VizcTbjMnBlQ) + max(VizcTbjMnBlQ)
4128+
;
4129+
4130+
VizcTbjMnBlQ:long | wXPJguff:double
4131+
1 | 3.0
4132+
;
4133+
4134+
4135+
inlineStatsAfterPruningAggregate8
4136+
// https://github.com/elastic/elasticsearch/issues/135679
4137+
required_capability: inline_stats_double_release_fix
4138+
4139+
from dense_vector,airport_city_boundaries
4140+
| eval kMvNZVQSH = -4048759096317256968, GzAYNHaHuIS = 4262770411405329967, EwPpQFmOPoB = "lkdgnbTrBT", `abbrev` = "LiXI"
4141+
| keep `city`, `kMvNZVQSH`, GzAYNHaHuIS, city_boundary, EwPpQFmOPoB, `city_boundary`, `id`, kMvNZVQSH, id, airport
4142+
| limit 6888
4143+
| keep `kMvNZVQSH`, EwPpQFmOPoB, city, `id`, kMvNZVQSH
4144+
| stats `EwPpQFmOPoB2` = min(id), `EwPpQFmOPoB3` = count(*), kZiJPhNZ = count(id), ErGhdKhdv = count_distinct(city), EwPpQFmOPoB = min(id)
4145+
| eval ErGhdKhdv = null, fHSyxniFCS = -710891486, ErGhdKhdv = -1104488048, zdxdQdVIyFh = -1579062572, `kZiJPhNZ` = null, SzmOySyUh = null, eWVVRiwP = 1092261898855405071, nUxVhGhcpkiV = true, RvqGioBAAzYs = 169356242
4146+
| eval `kZiJPhNZ` = "gWJHDTcDMjRVSnlzSX", lxLgaagX = 4700531997851493340, `kZiJPhNZ` = false
4147+
| keep `zdxdQdVIyFh`
4148+
| inline stats `zdxdQdVIyFh` = count(*), WbWVJnYJ = count(*), zdxdQdVIyFh2 = max(zdxdQdVIyFh), uKjfeKTfaH = max(zdxdQdVIyFh), QpovVxjMWSjh = min(zdxdQdVIyFh)
4149+
;
4150+
4151+
zdxdQdVIyFh:long | WbWVJnYJ:long | zdxdQdVIyFh2:integer | uKjfeKTfaH:integer | QpovVxjMWSjh:integer
4152+
1 | 1 | -1579062572 | -1579062572 | -1579062572
4153+
;
4154+
4155+
4156+
inlineStatsAfterPruningAggregate9
4157+
required_capability: inline_stats_double_release_fix
4158+
4159+
from employees
4160+
| stats m = count(emp_no)
4161+
| eval c = 0
4162+
| keep c
4163+
| inline stats c = count(*)
4164+
;
4165+
4166+
c:long
4167+
1
4168+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1521,6 +1521,11 @@ public enum Cap {
15211521
*/
15221522
GROK_MULTI_PATTERN,
15231523

1524+
/**
1525+
* Fix double release in inline stats when LocalRelation is reused
1526+
*/
1527+
INLINE_STATS_DOUBLE_RELEASE_FIX(INLINESTATS_V11.enabled)
1528+
15241529
;
15251530

15261531
private final boolean enabled;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceRowAsLocalRelation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
1313
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1414
import org.elasticsearch.xpack.esql.plan.logical.Row;
15-
import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
1615
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
16+
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
1717
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
1818

1919
import java.util.ArrayList;
@@ -30,6 +30,6 @@ protected LogicalPlan rule(Row row, LogicalOptimizerContext context) {
3030
List<Object> values = new ArrayList<>(fields.size());
3131
fields.forEach(f -> values.add(f.child().fold(context.foldCtx())));
3232
var blocks = BlockUtils.fromListRow(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, values);
33-
return new LocalRelation(row.source(), row.output(), new CopyingLocalSupplier(blocks.length == 0 ? new Page(0) : new Page(blocks)));
33+
return new LocalRelation(row.source(), row.output(), LocalSupplier.of(blocks.length == 0 ? new Page(0) : new Page(blocks)));
3434
}
3535
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.xpack.esql.plan.logical.Eval;
2525
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2626
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
27+
import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
2728
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
2829

2930
import java.io.IOException;
@@ -130,6 +131,12 @@ public record LogicalPlanTuple(LogicalPlan stubReplacedSubPlan, LogicalPlan orig
130131
* Finds the "first" (closest to the source command or bottom up in the tree) {@link InlineJoin}, replaces the {@link StubRelation}
131132
* of the right-hand side with left-hand side's source and returns a tuple.
132133
*
134+
* <p>After extracting the subplan, the method ensures that all {@link LocalRelation} nodes use {@link CopyingLocalSupplier}
135+
* to avoid double release when the same page is reused across executions. This guarantees correctness when substituting
136+
* the subplan result back into the main plan.
137+
*
138+
* <p>Example transformation:
139+
* <pre>
133140
* Original optimized plan:
134141
* Limit[1000[INTEGER],true]
135142
* \_InlineJoin[LEFT,[],[],[]]
@@ -146,33 +153,56 @@ public record LogicalPlanTuple(LogicalPlan stubReplacedSubPlan, LogicalPlan orig
146153
* Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
147154
* \_Limit[1000[INTEGER],false]
148155
* \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]]
156+
* </pre>
157+
*
158+
* <p>In the end the method returns a tuple like this:
159+
* <pre>
160+
* LogicalPlanTuple[
161+
* stubReplacedSubPlan=
162+
* Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
163+
* \_LocalRelation[[x{r}#99],org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier@7c1]
164+
* originalSubPlan=
165+
* Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
166+
* \_StubRelation[[x{r}#99]]
167+
* ]
168+
* </pre>
149169
*/
150170
public static LogicalPlanTuple firstSubPlan(LogicalPlan optimizedPlan, Set<LocalRelation> subPlansResults) {
151-
Holder<LogicalPlanTuple> subPlan = new Holder<>();
171+
final Holder<LogicalPlan> stubReplacedSubPlanHolder = new Holder<>();
172+
final Holder<LogicalPlan> originalSubPlanHolder = new Holder<>();
152173
// Collect the first inlinejoin (bottom up in the tree)
153174
optimizedPlan.forEachUp(InlineJoin.class, ij -> {
154175
// extract the right side of the plan and replace its source
155-
if (subPlan.get() == null) {
176+
if (stubReplacedSubPlanHolder.get() == null) {
156177
if (ij.right().anyMatch(p -> p instanceof StubRelation)) {
157-
var p = replaceStub(ij.left(), ij.right());
158-
p.setOptimized();
159-
subPlan.set(new LogicalPlanTuple(p, ij.right()));
178+
stubReplacedSubPlanHolder.set(replaceStub(ij.left(), ij.right()));
160179
} else if (ij.right() instanceof LocalRelation relation
161180
&& (subPlansResults.isEmpty() || subPlansResults.contains(relation) == false)
162181
|| ij.right() instanceof LocalRelation == false && ij.right().anyMatch(p -> p instanceof LocalRelation)) {
163182
// In case the plan was optimized further and the StubRelation was replaced with a LocalRelation
164183
// or the right hand side became a LocalRelation alltogether, there is no need to replace the source of the
165184
// right-hand side anymore.
166-
var p = ij.right();
167-
p.setOptimized();
168-
subPlan.set(new LogicalPlanTuple(p, ij.right()));
185+
stubReplacedSubPlanHolder.set(ij.right());
169186
// TODO: INLINE STATS this is essentially an optimization similar to the one in PruneInlineJoinOnEmptyRightSide
170187
// this further supports the idea of running the optimization step again after the substitutions (see EsqlSession
171188
// executeSubPlan() method where we could run the optimizer after the results are replaced in place).
172189
}
190+
originalSubPlanHolder.set(ij.right());
173191
}
174192
});
175-
return subPlan.get();
193+
LogicalPlanTuple tuple = null;
194+
var plan = stubReplacedSubPlanHolder.get();
195+
if (plan != null) {
196+
plan = plan.transformUp(LocalRelation.class, lr -> {
197+
if (lr.supplier() instanceof CopyingLocalSupplier == false) {
198+
return new LocalRelation(lr.source(), lr.output(), new CopyingLocalSupplier(lr.supplier().get()));
199+
}
200+
return lr;
201+
});
202+
plan.setOptimized();
203+
tuple = new LogicalPlanTuple(plan, originalSubPlanHolder.get());
204+
}
205+
return tuple;
176206
}
177207

178208
public InlineJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig config) {

0 commit comments

Comments
 (0)