Skip to content

Commit 68b7b7f

Browse files
authored
ESQL: Migrate more physical plan writeable (#112248)
Migrates a few more of our physical plan nodes to `NamedWriteable`.
1 parent e7c0ba9 commit 68b7b7f

File tree

10 files changed

+268
-151
lines changed

10 files changed

+268
-151
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java

Lines changed: 2 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,13 @@
1212
import org.elasticsearch.common.io.stream.StreamInput;
1313
import org.elasticsearch.common.io.stream.StreamOutput;
1414
import org.elasticsearch.common.util.iterable.Iterables;
15-
import org.elasticsearch.index.IndexMode;
1615
import org.elasticsearch.index.query.QueryBuilder;
1716
import org.elasticsearch.transport.RemoteClusterAware;
1817
import org.elasticsearch.xpack.esql.core.expression.Alias;
1918
import org.elasticsearch.xpack.esql.core.expression.Attribute;
2019
import org.elasticsearch.xpack.esql.core.expression.Expression;
21-
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
2220
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
2321
import org.elasticsearch.xpack.esql.core.tree.Source;
24-
import org.elasticsearch.xpack.esql.expression.Order;
2522
import org.elasticsearch.xpack.esql.index.EsIndex;
2623
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
2724
import org.elasticsearch.xpack.esql.plan.logical.Grok;
@@ -56,8 +53,6 @@
5653
import java.util.Set;
5754

5855
import static org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.Entry.of;
59-
import static org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanReader.readerFromPlanReader;
60-
import static org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanWriter.writerFromPlanWriter;
6156

6257
/**
6358
* A utility class that consists solely of static methods that describe how to serialize and
@@ -93,9 +88,9 @@ public static List<PlanNameRegistry.Entry> namedTypeEntries() {
9388
// Physical Plan Nodes
9489
of(PhysicalPlan.class, AggregateExec.ENTRY),
9590
of(PhysicalPlan.class, DissectExec.ENTRY),
96-
of(PhysicalPlan.class, EsQueryExec.class, PlanNamedTypes::writeEsQueryExec, PlanNamedTypes::readEsQueryExec),
91+
of(PhysicalPlan.class, EsQueryExec.ENTRY),
9792
of(PhysicalPlan.class, EsSourceExec.ENTRY),
98-
of(PhysicalPlan.class, EvalExec.class, PlanNamedTypes::writeEvalExec, PlanNamedTypes::readEvalExec),
93+
of(PhysicalPlan.class, EvalExec.ENTRY),
9994
of(PhysicalPlan.class, EnrichExec.class, PlanNamedTypes::writeEnrichExec, PlanNamedTypes::readEnrichExec),
10095
of(PhysicalPlan.class, ExchangeExec.class, PlanNamedTypes::writeExchangeExec, PlanNamedTypes::readExchangeExec),
10196
of(PhysicalPlan.class, ExchangeSinkExec.class, PlanNamedTypes::writeExchangeSinkExec, PlanNamedTypes::readExchangeSinkExec),
@@ -123,57 +118,6 @@ public static List<PlanNameRegistry.Entry> namedTypeEntries() {
123118
}
124119

125120
// -- physical plan nodes
126-
static EsQueryExec readEsQueryExec(PlanStreamInput in) throws IOException {
127-
return new EsQueryExec(
128-
Source.readFrom(in),
129-
new EsIndex(in),
130-
readIndexMode(in),
131-
in.readNamedWriteableCollectionAsList(Attribute.class),
132-
in.readOptionalNamedWriteable(QueryBuilder.class),
133-
in.readOptionalNamed(Expression.class),
134-
in.readOptionalCollectionAsList(readerFromPlanReader(PlanNamedTypes::readFieldSort)),
135-
in.readOptionalVInt()
136-
);
137-
}
138-
139-
static void writeEsQueryExec(PlanStreamOutput out, EsQueryExec esQueryExec) throws IOException {
140-
assert esQueryExec.children().size() == 0;
141-
Source.EMPTY.writeTo(out);
142-
esQueryExec.index().writeTo(out);
143-
writeIndexMode(out, esQueryExec.indexMode());
144-
out.writeNamedWriteableCollection(esQueryExec.output());
145-
out.writeOptionalNamedWriteable(esQueryExec.query());
146-
out.writeOptionalNamedWriteable(esQueryExec.limit());
147-
out.writeOptionalCollection(esQueryExec.sorts(), writerFromPlanWriter(PlanNamedTypes::writeFieldSort));
148-
out.writeOptionalInt(esQueryExec.estimatedRowSize());
149-
}
150-
151-
public static IndexMode readIndexMode(StreamInput in) throws IOException {
152-
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_ADD_INDEX_MODE_TO_SOURCE)) {
153-
return IndexMode.fromString(in.readString());
154-
} else {
155-
return IndexMode.STANDARD;
156-
}
157-
}
158-
159-
public static void writeIndexMode(StreamOutput out, IndexMode indexMode) throws IOException {
160-
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_ADD_INDEX_MODE_TO_SOURCE)) {
161-
out.writeString(indexMode.getName());
162-
} else if (indexMode != IndexMode.STANDARD) {
163-
throw new IllegalStateException("not ready to support index mode [" + indexMode + "]");
164-
}
165-
}
166-
167-
static EvalExec readEvalExec(PlanStreamInput in) throws IOException {
168-
return new EvalExec(Source.readFrom(in), in.readPhysicalPlanNode(), in.readCollectionAsList(Alias::new));
169-
}
170-
171-
static void writeEvalExec(PlanStreamOutput out, EvalExec evalExec) throws IOException {
172-
Source.EMPTY.writeTo(out);
173-
out.writePhysicalPlanNode(evalExec.child());
174-
out.writeCollection(evalExec.fields());
175-
}
176-
177121
static EnrichExec readEnrichExec(PlanStreamInput in) throws IOException {
178122
final Source source = Source.readFrom(in);
179123
final PhysicalPlan child = in.readPhysicalPlanNode();
@@ -426,20 +370,4 @@ static void writeTopNExec(PlanStreamOutput out, TopNExec topNExec) throws IOExce
426370
out.writeNamedWriteable(topNExec.limit());
427371
out.writeOptionalVInt(topNExec.estimatedRowSize());
428372
}
429-
430-
// -- ancillary supporting classes of plan nodes, etc
431-
432-
static EsQueryExec.FieldSort readFieldSort(PlanStreamInput in) throws IOException {
433-
return new EsQueryExec.FieldSort(
434-
FieldAttribute.readFrom(in),
435-
in.readEnum(Order.OrderDirection.class),
436-
in.readEnum(Order.NullsPosition.class)
437-
);
438-
}
439-
440-
static void writeFieldSort(PlanStreamOutput out, EsQueryExec.FieldSort fieldSort) throws IOException {
441-
fieldSort.field().writeTo(out);
442-
out.writeEnum(fieldSort.direction());
443-
out.writeEnum(fieldSort.nulls());
444-
}
445373
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/EsRelation.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.xpack.esql.core.tree.Source;
2020
import org.elasticsearch.xpack.esql.core.type.EsField;
2121
import org.elasticsearch.xpack.esql.index.EsIndex;
22-
import org.elasticsearch.xpack.esql.io.stream.PlanNamedTypes;
2322
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
2423

2524
import java.io.IOException;
@@ -67,7 +66,7 @@ private static EsRelation readFrom(StreamInput in) throws IOException {
6766
in.readOptionalString();
6867
in.readOptionalString();
6968
}
70-
IndexMode indexMode = PlanNamedTypes.readIndexMode(in);
69+
IndexMode indexMode = readIndexMode(in);
7170
boolean frozen = in.readBoolean();
7271
return new EsRelation(source, esIndex, attributes, indexMode, frozen);
7372
}
@@ -83,7 +82,7 @@ public void writeTo(StreamOutput out) throws IOException {
8382
out.writeOptionalString(null);
8483
out.writeOptionalString(null);
8584
}
86-
PlanNamedTypes.writeIndexMode(out, indexMode());
85+
writeIndexMode(out, indexMode());
8786
out.writeBoolean(frozen());
8887
}
8988

@@ -174,4 +173,20 @@ public boolean equals(Object obj) {
174173
public String nodeString() {
175174
return nodeName() + "[" + index + "]" + NodeUtils.limitedToString(attrs);
176175
}
176+
177+
public static IndexMode readIndexMode(StreamInput in) throws IOException {
178+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_ADD_INDEX_MODE_TO_SOURCE)) {
179+
return IndexMode.fromString(in.readString());
180+
} else {
181+
return IndexMode.STANDARD;
182+
}
183+
}
184+
185+
public static void writeIndexMode(StreamOutput out, IndexMode indexMode) throws IOException {
186+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_ADD_INDEX_MODE_TO_SOURCE)) {
187+
out.writeString(indexMode.getName());
188+
} else if (indexMode != IndexMode.STANDARD) {
189+
throw new IllegalStateException("not ready to support index mode [" + indexMode + "]");
190+
}
191+
}
177192
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
package org.elasticsearch.xpack.esql.plan.physical;
99

1010
import org.elasticsearch.common.Strings;
11+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
12+
import org.elasticsearch.common.io.stream.StreamInput;
13+
import org.elasticsearch.common.io.stream.StreamOutput;
14+
import org.elasticsearch.common.io.stream.Writeable;
1115
import org.elasticsearch.index.IndexMode;
1216
import org.elasticsearch.index.query.QueryBuilder;
1317
import org.elasticsearch.search.sort.FieldSortBuilder;
@@ -22,12 +26,21 @@
2226
import org.elasticsearch.xpack.esql.core.type.EsField;
2327
import org.elasticsearch.xpack.esql.expression.Order;
2428
import org.elasticsearch.xpack.esql.index.EsIndex;
29+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
30+
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
2531

32+
import java.io.IOException;
2633
import java.util.List;
2734
import java.util.Map;
2835
import java.util.Objects;
2936

3037
public class EsQueryExec extends LeafExec implements EstimatesRowSize {
38+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
39+
PhysicalPlan.class,
40+
"EsQueryExec",
41+
EsQueryExec::new
42+
);
43+
3144
public static final EsField DOC_ID_FIELD = new EsField("_doc", DataType.DOC_DATA_TYPE, Map.of(), false);
3245

3346
private final EsIndex index;
@@ -43,14 +56,29 @@ public class EsQueryExec extends LeafExec implements EstimatesRowSize {
4356
*/
4457
private final Integer estimatedRowSize;
4558

46-
public record FieldSort(FieldAttribute field, Order.OrderDirection direction, Order.NullsPosition nulls) {
59+
public record FieldSort(FieldAttribute field, Order.OrderDirection direction, Order.NullsPosition nulls) implements Writeable {
4760
public FieldSortBuilder fieldSortBuilder() {
4861
FieldSortBuilder builder = new FieldSortBuilder(field.name());
4962
builder.order(Direction.from(direction).asOrder());
5063
builder.missing(Missing.from(nulls).searchOrder());
5164
builder.unmappedType(field.dataType().esType());
5265
return builder;
5366
}
67+
68+
private static FieldSort readFrom(StreamInput in) throws IOException {
69+
return new EsQueryExec.FieldSort(
70+
FieldAttribute.readFrom(in),
71+
in.readEnum(Order.OrderDirection.class),
72+
in.readEnum(Order.NullsPosition.class)
73+
);
74+
}
75+
76+
@Override
77+
public void writeTo(StreamOutput out) throws IOException {
78+
field().writeTo(out);
79+
out.writeEnum(direction());
80+
out.writeEnum(nulls());
81+
}
5482
}
5583

5684
public EsQueryExec(Source source, EsIndex index, IndexMode indexMode, List<Attribute> attributes, QueryBuilder query) {
@@ -77,6 +105,36 @@ public EsQueryExec(
77105
this.estimatedRowSize = estimatedRowSize;
78106
}
79107

108+
private EsQueryExec(StreamInput in) throws IOException {
109+
this(
110+
Source.readFrom((PlanStreamInput) in),
111+
new EsIndex(in),
112+
EsRelation.readIndexMode(in),
113+
in.readNamedWriteableCollectionAsList(Attribute.class),
114+
in.readOptionalNamedWriteable(QueryBuilder.class),
115+
in.readOptionalNamedWriteable(Expression.class),
116+
in.readOptionalCollectionAsList(FieldSort::readFrom),
117+
in.readOptionalVInt()
118+
);
119+
}
120+
121+
@Override
122+
public void writeTo(StreamOutput out) throws IOException {
123+
Source.EMPTY.writeTo(out);
124+
index().writeTo(out);
125+
EsRelation.writeIndexMode(out, indexMode());
126+
out.writeNamedWriteableCollection(output());
127+
out.writeOptionalNamedWriteable(query());
128+
out.writeOptionalNamedWriteable(limit());
129+
out.writeOptionalCollection(sorts());
130+
out.writeOptionalVInt(estimatedRowSize());
131+
}
132+
133+
@Override
134+
public String getWriteableName() {
135+
return ENTRY.name;
136+
}
137+
80138
public static boolean isSourceAttribute(Attribute attr) {
81139
return DOC_ID_FIELD.getName().equals(attr.name());
82140
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsSourceExec.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import org.elasticsearch.xpack.esql.core.tree.NodeUtils;
1818
import org.elasticsearch.xpack.esql.core.tree.Source;
1919
import org.elasticsearch.xpack.esql.index.EsIndex;
20-
import org.elasticsearch.xpack.esql.io.stream.PlanNamedTypes;
2120
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
2221
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
2322

@@ -55,7 +54,7 @@ private EsSourceExec(StreamInput in) throws IOException {
5554
new EsIndex(in),
5655
in.readNamedWriteableCollectionAsList(Attribute.class),
5756
in.readOptionalNamedWriteable(QueryBuilder.class),
58-
PlanNamedTypes.readIndexMode(in)
57+
EsRelation.readIndexMode(in)
5958
);
6059
}
6160

@@ -65,7 +64,7 @@ public void writeTo(StreamOutput out) throws IOException {
6564
index().writeTo(out);
6665
out.writeNamedWriteableCollection(output());
6766
out.writeOptionalNamedWriteable(query());
68-
PlanNamedTypes.writeIndexMode(out, indexMode());
67+
EsRelation.writeIndexMode(out, indexMode());
6968
}
7069

7170
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EvalExec.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,52 @@
77

88
package org.elasticsearch.xpack.esql.plan.physical;
99

10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
1013
import org.elasticsearch.xpack.esql.core.expression.Alias;
1114
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1215
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1316
import org.elasticsearch.xpack.esql.core.tree.Source;
17+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
18+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
1419

20+
import java.io.IOException;
1521
import java.util.List;
1622
import java.util.Objects;
1723

1824
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
1925

2026
public class EvalExec extends UnaryExec implements EstimatesRowSize {
27+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
28+
PhysicalPlan.class,
29+
"EvalExec",
30+
EvalExec::new
31+
);
32+
2133
private final List<Alias> fields;
2234

2335
public EvalExec(Source source, PhysicalPlan child, List<Alias> fields) {
2436
super(source, child);
2537
this.fields = fields;
2638
}
2739

40+
private EvalExec(StreamInput in) throws IOException {
41+
this(Source.readFrom((PlanStreamInput) in), ((PlanStreamInput) in).readPhysicalPlanNode(), in.readCollectionAsList(Alias::new));
42+
}
43+
44+
@Override
45+
public void writeTo(StreamOutput out) throws IOException {
46+
Source.EMPTY.writeTo(out);
47+
((PlanStreamOutput) out).writePhysicalPlanNode(child());
48+
out.writeCollection(fields());
49+
}
50+
51+
@Override
52+
public String getWriteableName() {
53+
return ENTRY.name;
54+
}
55+
2856
public List<Alias> fields() {
2957
return fields;
3058
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/PhysicalPlan.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
*/
2424
public abstract class PhysicalPlan extends QueryPlan<PhysicalPlan> {
2525
public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
26-
return List.of(AggregateExec.ENTRY, DissectExec.ENTRY, EsSourceExec.ENTRY);
26+
return List.of(AggregateExec.ENTRY, DissectExec.ENTRY, EsQueryExec.ENTRY, EsSourceExec.ENTRY, EvalExec.ENTRY);
2727
}
2828

2929
public PhysicalPlan(Source source, List<PhysicalPlan> children) {

0 commit comments

Comments
 (0)