Skip to content

Commit cf35e05

Browse files
authored
ESQL: Convert some PhysicalPlans to NamedWriteable (#112764)
This converts half of hte remaining `PhysicalPlan`s to `NamedWriteable` to line up better with the rest of Elasticsearch.
1 parent 6052331 commit cf35e05

17 files changed

+480
-82
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java

Lines changed: 6 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,12 @@
77

88
package org.elasticsearch.xpack.esql.io.stream;
99

10-
import org.elasticsearch.TransportVersions;
1110
import org.elasticsearch.common.io.stream.NamedWriteable;
12-
import org.elasticsearch.index.query.QueryBuilder;
1311
import org.elasticsearch.xpack.esql.core.expression.Alias;
1412
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1513
import org.elasticsearch.xpack.esql.core.expression.Expression;
1614
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
1715
import org.elasticsearch.xpack.esql.core.tree.Source;
18-
import org.elasticsearch.xpack.esql.plan.logical.Grok;
19-
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2016
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
2117
import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
2218
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
@@ -88,12 +84,12 @@ public static List<PlanNameRegistry.Entry> namedTypeEntries() {
8884
of(PhysicalPlan.class, ExchangeSinkExec.ENTRY),
8985
of(PhysicalPlan.class, ExchangeSourceExec.ENTRY),
9086
of(PhysicalPlan.class, FieldExtractExec.ENTRY),
91-
of(PhysicalPlan.class, FilterExec.class, PlanNamedTypes::writeFilterExec, PlanNamedTypes::readFilterExec),
92-
of(PhysicalPlan.class, FragmentExec.class, PlanNamedTypes::writeFragmentExec, PlanNamedTypes::readFragmentExec),
93-
of(PhysicalPlan.class, GrokExec.class, PlanNamedTypes::writeGrokExec, PlanNamedTypes::readGrokExec),
94-
of(PhysicalPlan.class, LimitExec.class, PlanNamedTypes::writeLimitExec, PlanNamedTypes::readLimitExec),
95-
of(PhysicalPlan.class, LocalSourceExec.class, (out, v) -> v.writeTo(out), LocalSourceExec::new),
96-
of(PhysicalPlan.class, HashJoinExec.class, (out, v) -> v.writeTo(out), HashJoinExec::new),
87+
of(PhysicalPlan.class, FilterExec.ENTRY),
88+
of(PhysicalPlan.class, FragmentExec.ENTRY),
89+
of(PhysicalPlan.class, GrokExec.ENTRY),
90+
of(PhysicalPlan.class, LimitExec.ENTRY),
91+
of(PhysicalPlan.class, LocalSourceExec.ENTRY),
92+
of(PhysicalPlan.class, HashJoinExec.ENTRY),
9793
of(PhysicalPlan.class, MvExpandExec.class, PlanNamedTypes::writeMvExpandExec, PlanNamedTypes::readMvExpandExec),
9894
of(PhysicalPlan.class, OrderExec.class, PlanNamedTypes::writeOrderExec, PlanNamedTypes::readOrderExec),
9995
of(PhysicalPlan.class, ProjectExec.class, PlanNamedTypes::writeProjectExec, PlanNamedTypes::readProjectExec),
@@ -105,65 +101,6 @@ public static List<PlanNameRegistry.Entry> namedTypeEntries() {
105101
}
106102

107103
// -- physical plan nodes
108-
static FilterExec readFilterExec(PlanStreamInput in) throws IOException {
109-
return new FilterExec(Source.readFrom(in), in.readPhysicalPlanNode(), in.readNamedWriteable(Expression.class));
110-
}
111-
112-
static void writeFilterExec(PlanStreamOutput out, FilterExec filterExec) throws IOException {
113-
Source.EMPTY.writeTo(out);
114-
out.writePhysicalPlanNode(filterExec.child());
115-
out.writeNamedWriteable(filterExec.condition());
116-
}
117-
118-
static FragmentExec readFragmentExec(PlanStreamInput in) throws IOException {
119-
return new FragmentExec(
120-
Source.readFrom(in),
121-
in.readNamedWriteable(LogicalPlan.class),
122-
in.readOptionalNamedWriteable(QueryBuilder.class),
123-
in.readOptionalVInt(),
124-
in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readOptionalPhysicalPlanNode() : null
125-
);
126-
}
127-
128-
static void writeFragmentExec(PlanStreamOutput out, FragmentExec fragmentExec) throws IOException {
129-
Source.EMPTY.writeTo(out);
130-
out.writeNamedWriteable(fragmentExec.fragment());
131-
out.writeOptionalNamedWriteable(fragmentExec.esFilter());
132-
out.writeOptionalVInt(fragmentExec.estimatedRowSize());
133-
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
134-
out.writeOptionalPhysicalPlanNode(fragmentExec.reducer());
135-
}
136-
}
137-
138-
static GrokExec readGrokExec(PlanStreamInput in) throws IOException {
139-
Source source;
140-
return new GrokExec(
141-
source = Source.readFrom(in),
142-
in.readPhysicalPlanNode(),
143-
in.readNamedWriteable(Expression.class),
144-
Grok.pattern(source, in.readString()),
145-
in.readNamedWriteableCollectionAsList(Attribute.class)
146-
);
147-
}
148-
149-
static void writeGrokExec(PlanStreamOutput out, GrokExec grokExec) throws IOException {
150-
Source.EMPTY.writeTo(out);
151-
out.writePhysicalPlanNode(grokExec.child());
152-
out.writeNamedWriteable(grokExec.inputExpression());
153-
out.writeString(grokExec.pattern().pattern());
154-
out.writeNamedWriteableCollection(grokExec.extractedFields());
155-
}
156-
157-
static LimitExec readLimitExec(PlanStreamInput in) throws IOException {
158-
return new LimitExec(Source.readFrom(in), in.readPhysicalPlanNode(), in.readNamedWriteable(Expression.class));
159-
}
160-
161-
static void writeLimitExec(PlanStreamOutput out, LimitExec limitExec) throws IOException {
162-
Source.EMPTY.writeTo(out);
163-
out.writePhysicalPlanNode(limitExec.child());
164-
out.writeNamedWriteable(limitExec.limit());
165-
}
166-
167104
static MvExpandExec readMvExpandExec(PlanStreamInput in) throws IOException {
168105
return new MvExpandExec(
169106
Source.readFrom(in),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FilterExec.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,26 @@
66
*/
77
package org.elasticsearch.xpack.esql.plan.physical;
88

9+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
10+
import org.elasticsearch.common.io.stream.StreamInput;
11+
import org.elasticsearch.common.io.stream.StreamOutput;
912
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1013
import org.elasticsearch.xpack.esql.core.expression.Expression;
1114
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1215
import org.elasticsearch.xpack.esql.core.tree.Source;
16+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
17+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
1318

19+
import java.io.IOException;
1420
import java.util.List;
1521
import java.util.Objects;
1622

1723
public class FilterExec extends UnaryExec {
24+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
25+
PhysicalPlan.class,
26+
"FilterExec",
27+
FilterExec::new
28+
);
1829

1930
private final Expression condition;
2031

@@ -23,6 +34,22 @@ public FilterExec(Source source, PhysicalPlan child, Expression condition) {
2334
this.condition = condition;
2435
}
2536

37+
private FilterExec(StreamInput in) throws IOException {
38+
this(Source.readFrom((PlanStreamInput) in), ((PlanStreamInput) in).readPhysicalPlanNode(), in.readNamedWriteable(Expression.class));
39+
}
40+
41+
@Override
42+
public void writeTo(StreamOutput out) throws IOException {
43+
Source.EMPTY.writeTo(out);
44+
((PlanStreamOutput) out).writePhysicalPlanNode(child());
45+
out.writeNamedWriteable(condition());
46+
}
47+
48+
@Override
49+
public String getWriteableName() {
50+
return ENTRY.name;
51+
}
52+
2653
@Override
2754
protected NodeInfo<FilterExec> info() {
2855
return NodeInfo.create(this, FilterExec::new, child(), condition);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FragmentExec.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,28 @@
77

88
package org.elasticsearch.xpack.esql.plan.physical;
99

10+
import org.elasticsearch.TransportVersions;
11+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
12+
import org.elasticsearch.common.io.stream.StreamInput;
13+
import org.elasticsearch.common.io.stream.StreamOutput;
1014
import org.elasticsearch.index.query.QueryBuilder;
1115
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1216
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1317
import org.elasticsearch.xpack.esql.core.tree.Source;
18+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
19+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
1420
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1521

22+
import java.io.IOException;
1623
import java.util.List;
1724
import java.util.Objects;
1825

1926
public class FragmentExec extends LeafExec implements EstimatesRowSize {
27+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
28+
PhysicalPlan.class,
29+
"FragmentExec",
30+
FragmentExec::new
31+
);
2032

2133
private final LogicalPlan fragment;
2234
private final QueryBuilder esFilter;
@@ -40,6 +52,32 @@ public FragmentExec(Source source, LogicalPlan fragment, QueryBuilder esFilter,
4052
this.reducer = reducer;
4153
}
4254

55+
private FragmentExec(StreamInput in) throws IOException {
56+
this(
57+
Source.readFrom((PlanStreamInput) in),
58+
in.readNamedWriteable(LogicalPlan.class),
59+
in.readOptionalNamedWriteable(QueryBuilder.class),
60+
in.readOptionalVInt(),
61+
in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? ((PlanStreamInput) in).readOptionalPhysicalPlanNode() : null
62+
);
63+
}
64+
65+
@Override
66+
public void writeTo(StreamOutput out) throws IOException {
67+
Source.EMPTY.writeTo(out);
68+
out.writeNamedWriteable(fragment());
69+
out.writeOptionalNamedWriteable(esFilter());
70+
out.writeOptionalVInt(estimatedRowSize());
71+
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
72+
((PlanStreamOutput) out).writeOptionalPhysicalPlanNode(reducer());
73+
}
74+
}
75+
76+
@Override
77+
public String getWriteableName() {
78+
return ENTRY.name;
79+
}
80+
4381
public LogicalPlan fragment() {
4482
return fragment;
4583
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/GrokExec.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,27 @@
77

88
package org.elasticsearch.xpack.esql.plan.physical;
99

10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
1013
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1114
import org.elasticsearch.xpack.esql.core.expression.Expression;
1215
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1316
import org.elasticsearch.xpack.esql.core.tree.Source;
17+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
18+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
1419
import org.elasticsearch.xpack.esql.plan.logical.Grok;
1520

21+
import java.io.IOException;
1622
import java.util.List;
1723
import java.util.Objects;
1824

1925
public class GrokExec extends RegexExtractExec {
26+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
27+
PhysicalPlan.class,
28+
"GrokExec",
29+
GrokExec::readFrom
30+
);
2031

2132
private final Grok.Parser parser;
2233

@@ -31,6 +42,31 @@ public GrokExec(
3142
this.parser = parser;
3243
}
3344

45+
private static GrokExec readFrom(StreamInput in) throws IOException {
46+
Source source = Source.readFrom((PlanStreamInput) in);
47+
return new GrokExec(
48+
source,
49+
((PlanStreamInput) in).readPhysicalPlanNode(),
50+
in.readNamedWriteable(Expression.class),
51+
Grok.pattern(source, in.readString()),
52+
in.readNamedWriteableCollectionAsList(Attribute.class)
53+
);
54+
}
55+
56+
@Override
57+
public void writeTo(StreamOutput out) throws IOException {
58+
Source.EMPTY.writeTo(out);
59+
((PlanStreamOutput) out).writePhysicalPlanNode(child());
60+
out.writeNamedWriteable(inputExpression());
61+
out.writeString(pattern().pattern());
62+
out.writeNamedWriteableCollection(extractedFields());
63+
}
64+
65+
@Override
66+
public String getWriteableName() {
67+
return ENTRY.name;
68+
}
69+
3470
@Override
3571
public UnaryExec replaceChild(PhysicalPlan newChild) {
3672
return new GrokExec(source(), newChild, inputExpression, parser, extractedFields);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/HashJoinExec.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77

88
package org.elasticsearch.xpack.esql.plan.physical;
99

10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
1013
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1114
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1215
import org.elasticsearch.xpack.esql.core.expression.Expressions;
@@ -21,6 +24,12 @@
2124
import java.util.Set;
2225

2326
public class HashJoinExec extends UnaryExec implements EstimatesRowSize {
27+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
28+
PhysicalPlan.class,
29+
"HashJoinExec",
30+
HashJoinExec::new
31+
);
32+
2433
private final LocalSourceExec joinData;
2534
private final List<Attribute> matchFields;
2635
private final List<Attribute> leftFields;
@@ -45,25 +54,31 @@ public HashJoinExec(
4554
this.output = output;
4655
}
4756

48-
public HashJoinExec(PlanStreamInput in) throws IOException {
49-
super(Source.readFrom(in), in.readPhysicalPlanNode());
57+
private HashJoinExec(StreamInput in) throws IOException {
58+
super(Source.readFrom((PlanStreamInput) in), ((PlanStreamInput) in).readPhysicalPlanNode());
5059
this.joinData = new LocalSourceExec(in);
5160
this.matchFields = in.readNamedWriteableCollectionAsList(Attribute.class);
5261
this.leftFields = in.readNamedWriteableCollectionAsList(Attribute.class);
5362
this.rightFields = in.readNamedWriteableCollectionAsList(Attribute.class);
5463
this.output = in.readNamedWriteableCollectionAsList(Attribute.class);
5564
}
5665

57-
public void writeTo(PlanStreamOutput out) throws IOException {
66+
@Override
67+
public void writeTo(StreamOutput out) throws IOException {
5868
source().writeTo(out);
59-
out.writePhysicalPlanNode(child());
69+
((PlanStreamOutput) out).writePhysicalPlanNode(child());
6070
joinData.writeTo(out);
6171
out.writeNamedWriteableCollection(matchFields);
6272
out.writeNamedWriteableCollection(leftFields);
6373
out.writeNamedWriteableCollection(rightFields);
6474
out.writeNamedWriteableCollection(output);
6575
}
6676

77+
@Override
78+
public String getWriteableName() {
79+
return ENTRY.name;
80+
}
81+
6782
public LocalSourceExec joinData() {
6883
return joinData;
6984
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LimitExec.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,24 @@
77

88
package org.elasticsearch.xpack.esql.plan.physical;
99

10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
1013
import org.elasticsearch.xpack.esql.core.expression.Expression;
1114
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1215
import org.elasticsearch.xpack.esql.core.tree.Source;
16+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
17+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
1318

19+
import java.io.IOException;
1420
import java.util.Objects;
1521

1622
public class LimitExec extends UnaryExec {
23+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
24+
PhysicalPlan.class,
25+
"LimitExec",
26+
LimitExec::new
27+
);
1728

1829
private final Expression limit;
1930

@@ -22,6 +33,22 @@ public LimitExec(Source source, PhysicalPlan child, Expression limit) {
2233
this.limit = limit;
2334
}
2435

36+
private LimitExec(StreamInput in) throws IOException {
37+
this(Source.readFrom((PlanStreamInput) in), ((PlanStreamInput) in).readPhysicalPlanNode(), in.readNamedWriteable(Expression.class));
38+
}
39+
40+
@Override
41+
public void writeTo(StreamOutput out) throws IOException {
42+
Source.EMPTY.writeTo(out);
43+
((PlanStreamOutput) out).writePhysicalPlanNode(child());
44+
out.writeNamedWriteable(limit());
45+
}
46+
47+
@Override
48+
public String getWriteableName() {
49+
return ENTRY.name;
50+
}
51+
2552
@Override
2653
protected NodeInfo<? extends LimitExec> info() {
2754
return NodeInfo.create(this, LimitExec::new, child(), limit);

0 commit comments

Comments
 (0)