Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/122257.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 122257
summary: Revive inlinestats
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// TODO: re-enable the commented tests once the Join functionality stabilizes
//

maxOfInt-Ignore
maxOfInt
required_capability: join_planning_v1
Copy link
Contributor

@alex-spies alex-spies Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably we'll need a new capability to avoid running this with old nodes on bwc tests.

// tag::max-languages[]
FROM employees
Expand Down Expand Up @@ -57,7 +57,7 @@ emp_no:integer | avg_worked_seconds:long | gender:keyword | max_avg_worked_secon
10030 | 394597613 | M | 394597613
;

maxOfLong-Ignore
maxOfLong
required_capability: join_planning_v1

FROM employees
Expand Down Expand Up @@ -112,7 +112,7 @@ emp_no:integer | avg_worked_seconds:long | last_name:keyword | l:keyword | max_a
10087 | 305782871 | Eugenio | E | 305782871
;

maxOfLongByCalculatedDroppedKeyword
maxOfLongByCalculatedDroppedKeyword-Ignore
required_capability: join_planning_v1

FROM employees
Expand Down Expand Up @@ -205,7 +205,7 @@ emp_no:integer | languages:integer | avg_worked_seconds:long | gender:keyword |
10007 | 4 | 393084805 | F | 2.863684210555556E8 | 5
;

byMultivaluedSimple
byMultivaluedSimple-Ignore
required_capability: join_planning_v1

// tag::mv-group[]
Expand All @@ -223,7 +223,7 @@ abbrev:keyword | type:keyword | scalerank:integer | min_scalerank:integer
// end::mv-group-result[]
;

byMultivaluedMvExpand
byMultivaluedMvExpand-Ignore
required_capability: join_planning_v1

// tag::mv-expand[]
Expand All @@ -243,7 +243,7 @@ abbrev:keyword | type:keyword | scalerank:integer | min_scalerank:integer
// end::mv-expand-result[]
;

byMvExpand
byMvExpand-Ignore
required_capability: join_planning_v1

// tag::extreme-airports[]
Expand Down Expand Up @@ -307,7 +307,7 @@ count:long | country:keyword | avg:double
17 | United Kingdom | 4.455
;

afterWhere
afterWhere-Ignore
required_capability: join_planning_v1

FROM airports
Expand All @@ -330,7 +330,7 @@ required_capability: join_planning_v1

FROM airports
| RENAME scalerank AS int
| LOOKUP int_number_names ON int
| LOOKUP JOIN int_number_names ON int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is incorrect, this test is still disabled and used to use LOOKUP_ 🐔 in the past. (But we probably also want a version of this with LOOKUP JOIN.)

| RENAME name as scalerank
| DROP int
| INLINESTATS count=COUNT(*) BY scalerank
Expand Down Expand Up @@ -366,7 +366,7 @@ abbrev:keyword | city:keyword | region:text | "COUNT(*)":long
FUK | Fukuoka | 中央区 | 2
;

beforeStats-Ignore
beforeStats
required_capability: join_planning_v1

FROM airports
Expand All @@ -379,7 +379,7 @@ northern:long | southern:long
520 | 371
;

beforeKeepSort
beforeKeepSort-Ignore
required_capability: join_planning_v1

FROM employees
Expand All @@ -394,7 +394,7 @@ emp_no:integer | languages:integer | max_salary:integer
10003 | 4 | 74572
;

beforeKeepWhere
beforeKeepWhere-Ignore
required_capability: join_planning_v1

FROM employees
Expand Down Expand Up @@ -537,7 +537,7 @@ emp_no:integer | one:integer
10005 | 1
;

percentile-Ignore
percentile
required_capability: join_planning_v1

FROM employees
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.util.CollectionUtils;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;

Expand Down Expand Up @@ -71,10 +70,9 @@ public static LogicalPlan inlineData(InlineJoin target, LocalRelation data) {
List<Alias> aliases = new ArrayList<>(schema.size());
for (int i = 0; i < schema.size(); i++) {
Attribute attr = schema.get(i);
aliases.add(new Alias(attr.source(), attr.name(), Literal.of(attr, BlockUtils.toJavaObject(blocks[i], 0))));
aliases.add(new Alias(attr.source(), attr.name(), Literal.of(attr, BlockUtils.toJavaObject(blocks[i], 0)), attr.id()));
}
LogicalPlan left = target.left();
return new Project(target.source(), left, CollectionUtils.combine(left.output(), aliases));
return new Eval(target.source(), target.left(), aliases);
} else {
return target.replaceRight(data);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
import org.elasticsearch.xpack.esql.plan.logical.TopN;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
Expand Down Expand Up @@ -178,6 +179,10 @@ private PhysicalPlan mapBinary(BinaryPlan bp) {
throw new EsqlIllegalArgumentException("unsupported join type [" + config.type() + "]");
}

if (join instanceof InlineJoin) {
return new FragmentExec(bp);
}
Comment on lines +182 to +184
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is odd. What do we need this for? If this is a quick hack, can we add a TODO comment?

I'd think this would prevent the if (right instanceof LocalSourceExec localData) { branch below from ever triggering, which used to be the branch that mapped a physical operation to INLINESTATS.

It seems like this, instead, pushes the execution of the inline join into the data nodes, which cannot always be true - e.g. there could be a regular STATS before the INLINESTATS, which requries execution on the data node. Or just a LIMIT/SORT, or any other pipeline breaker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the inlinestats has been rewritten to allow lookup join to work in a more correct way, what was left for inlinestats was an incomplete solution (inlinestats.csv-spec was ignored in its entirety). EsqlSession code flow expects a fragment for inlinestats that contains logical plans for its right and left branches. It's planning then the right branch, executes it, then it comes back to its left branch and uses the right results as constant blocks in the left logical plan tree.

There are still many queries muted in the inlinestats.csv-spec test and I don't want to go the rabbit hole and try to fix everything in one go. I want on purpose to limit the extent of this PR and I will look further into fixing the rest in followups. If the followups lead to changing the fragment approach, that's ok.


PhysicalPlan left = map(bp.left());

// only broadcast joins supported for now - hence push down as a streaming operator
Expand Down