Skip to content

Commit 1dfb440

Browse files
authored
ESQL: Port more PhysicalPlan to NamedWriteable (#112622)
This lines ESQL up better with the rest of Elasticsearch. Slowly, slowly. It also documents that a part of the field loading infrastructure is local-only.
1 parent 707187d commit 1dfb440

File tree

7 files changed

+259
-91
lines changed

7 files changed

+259
-91
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java

Lines changed: 2 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,12 @@
99

1010
import org.elasticsearch.TransportVersions;
1111
import org.elasticsearch.common.io.stream.NamedWriteable;
12-
import org.elasticsearch.common.io.stream.StreamInput;
13-
import org.elasticsearch.common.io.stream.StreamOutput;
14-
import org.elasticsearch.common.util.iterable.Iterables;
15-
import org.elasticsearch.index.IndexMode;
1612
import org.elasticsearch.index.query.QueryBuilder;
17-
import org.elasticsearch.transport.RemoteClusterAware;
1813
import org.elasticsearch.xpack.esql.core.expression.Alias;
1914
import org.elasticsearch.xpack.esql.core.expression.Attribute;
2015
import org.elasticsearch.xpack.esql.core.expression.Expression;
2116
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
2217
import org.elasticsearch.xpack.esql.core.tree.Source;
23-
import org.elasticsearch.xpack.esql.index.EsIndex;
24-
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
2518
import org.elasticsearch.xpack.esql.plan.logical.Grok;
2619
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2720
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
@@ -50,8 +43,6 @@
5043

5144
import java.io.IOException;
5245
import java.util.List;
53-
import java.util.Map;
54-
import java.util.Set;
5546

5647
import static org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.Entry.of;
5748

@@ -92,11 +83,11 @@ public static List<PlanNameRegistry.Entry> namedTypeEntries() {
9283
of(PhysicalPlan.class, EsQueryExec.ENTRY),
9384
of(PhysicalPlan.class, EsSourceExec.ENTRY),
9485
of(PhysicalPlan.class, EvalExec.ENTRY),
95-
of(PhysicalPlan.class, EnrichExec.class, PlanNamedTypes::writeEnrichExec, PlanNamedTypes::readEnrichExec),
86+
of(PhysicalPlan.class, EnrichExec.ENTRY),
9687
of(PhysicalPlan.class, ExchangeExec.ENTRY),
9788
of(PhysicalPlan.class, ExchangeSinkExec.ENTRY),
9889
of(PhysicalPlan.class, ExchangeSourceExec.ENTRY),
99-
of(PhysicalPlan.class, FieldExtractExec.class, PlanNamedTypes::writeFieldExtractExec, PlanNamedTypes::readFieldExtractExec),
90+
of(PhysicalPlan.class, FieldExtractExec.ENTRY),
10091
of(PhysicalPlan.class, FilterExec.class, PlanNamedTypes::writeFilterExec, PlanNamedTypes::readFilterExec),
10192
of(PhysicalPlan.class, FragmentExec.class, PlanNamedTypes::writeFragmentExec, PlanNamedTypes::readFragmentExec),
10293
of(PhysicalPlan.class, GrokExec.class, PlanNamedTypes::writeGrokExec, PlanNamedTypes::readGrokExec),
@@ -114,72 +105,6 @@ public static List<PlanNameRegistry.Entry> namedTypeEntries() {
114105
}
115106

116107
// -- physical plan nodes
117-
static EnrichExec readEnrichExec(PlanStreamInput in) throws IOException {
118-
final Source source = Source.readFrom(in);
119-
final PhysicalPlan child = in.readPhysicalPlanNode();
120-
final NamedExpression matchField = in.readNamedWriteable(NamedExpression.class);
121-
final String policyName = in.readString();
122-
final String matchType = (in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) ? in.readString() : "match";
123-
final String policyMatchField = in.readString();
124-
final Map<String, String> concreteIndices;
125-
final Enrich.Mode mode;
126-
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
127-
mode = in.readEnum(Enrich.Mode.class);
128-
concreteIndices = in.readMap(StreamInput::readString, StreamInput::readString);
129-
} else {
130-
mode = Enrich.Mode.ANY;
131-
EsIndex esIndex = new EsIndex(in);
132-
if (esIndex.concreteIndices().size() != 1) {
133-
throw new IllegalStateException("expected a single concrete enrich index; got " + esIndex.concreteIndices());
134-
}
135-
concreteIndices = Map.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Iterables.get(esIndex.concreteIndices(), 0));
136-
}
137-
return new EnrichExec(
138-
source,
139-
child,
140-
mode,
141-
matchType,
142-
matchField,
143-
policyName,
144-
policyMatchField,
145-
concreteIndices,
146-
in.readNamedWriteableCollectionAsList(NamedExpression.class)
147-
);
148-
}
149-
150-
static void writeEnrichExec(PlanStreamOutput out, EnrichExec enrich) throws IOException {
151-
Source.EMPTY.writeTo(out);
152-
out.writePhysicalPlanNode(enrich.child());
153-
out.writeNamedWriteable(enrich.matchField());
154-
out.writeString(enrich.policyName());
155-
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
156-
out.writeString(enrich.matchType());
157-
}
158-
out.writeString(enrich.policyMatchField());
159-
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
160-
out.writeEnum(enrich.mode());
161-
out.writeMap(enrich.concreteIndices(), StreamOutput::writeString, StreamOutput::writeString);
162-
} else {
163-
if (enrich.concreteIndices().keySet().equals(Set.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY))) {
164-
String concreteIndex = enrich.concreteIndices().get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
165-
new EsIndex(concreteIndex, Map.of(), Map.of(concreteIndex, IndexMode.STANDARD)).writeTo(out);
166-
} else {
167-
throw new IllegalStateException("expected a single concrete enrich index; got " + enrich.concreteIndices());
168-
}
169-
}
170-
out.writeNamedWriteableCollection(enrich.enrichFields());
171-
}
172-
173-
static FieldExtractExec readFieldExtractExec(PlanStreamInput in) throws IOException {
174-
return new FieldExtractExec(Source.readFrom(in), in.readPhysicalPlanNode(), in.readNamedWriteableCollectionAsList(Attribute.class));
175-
}
176-
177-
static void writeFieldExtractExec(PlanStreamOutput out, FieldExtractExec fieldExtractExec) throws IOException {
178-
Source.EMPTY.writeTo(out);
179-
out.writePhysicalPlanNode(fieldExtractExec.child());
180-
out.writeNamedWriteableCollection(fieldExtractExec.attributesToExtract());
181-
}
182-
183108
static FilterExec readFilterExec(PlanStreamInput in) throws IOException {
184109
return new FilterExec(Source.readFrom(in), in.readPhysicalPlanNode(), in.readNamedWriteable(Expression.class));
185110
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/InsertFieldExtraction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public PhysicalPlan apply(PhysicalPlan plan) {
5959
// add extractor
6060
if (missing.isEmpty() == false) {
6161
// collect source attributes and add the extractor
62-
var extractor = new FieldExtractExec(p.source(), p.child(), List.copyOf(missing));
62+
var extractor = new FieldExtractExec(p.source(), p.child(), List.copyOf(missing), Set.of());
6363
p = p.replaceChild(extractor);
6464
}
6565

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EnrichExec.java

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,37 @@
66
*/
77
package org.elasticsearch.xpack.esql.plan.physical;
88

9+
import org.elasticsearch.TransportVersions;
10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
13+
import org.elasticsearch.common.util.iterable.Iterables;
14+
import org.elasticsearch.index.IndexMode;
15+
import org.elasticsearch.transport.RemoteClusterAware;
916
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1017
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1118
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
1219
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1320
import org.elasticsearch.xpack.esql.core.tree.Source;
21+
import org.elasticsearch.xpack.esql.index.EsIndex;
22+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
23+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
1424
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
1525

26+
import java.io.IOException;
1627
import java.util.List;
1728
import java.util.Map;
1829
import java.util.Objects;
30+
import java.util.Set;
1931

2032
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
2133

2234
public class EnrichExec extends UnaryExec implements EstimatesRowSize {
35+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
36+
PhysicalPlan.class,
37+
"EnrichExec",
38+
EnrichExec::readFrom
39+
);
2340

2441
private final Enrich.Mode mode;
2542
private final String matchType;
@@ -30,9 +47,6 @@ public class EnrichExec extends UnaryExec implements EstimatesRowSize {
3047
private final List<NamedExpression> enrichFields;
3148

3249
/**
33-
*
34-
* @param source
35-
* @param child
3650
* @param matchField the match field in the source data
3751
* @param policyName the enrich policy name
3852
* @param policyMatchField the match field name in the policy
@@ -60,6 +74,68 @@ public EnrichExec(
6074
this.enrichFields = enrichFields;
6175
}
6276

77+
private static EnrichExec readFrom(StreamInput in) throws IOException {
78+
final Source source = Source.readFrom((PlanStreamInput) in);
79+
final PhysicalPlan child = ((PlanStreamInput) in).readPhysicalPlanNode();
80+
final NamedExpression matchField = in.readNamedWriteable(NamedExpression.class);
81+
final String policyName = in.readString();
82+
final String matchType = (in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) ? in.readString() : "match";
83+
final String policyMatchField = in.readString();
84+
final Map<String, String> concreteIndices;
85+
final Enrich.Mode mode;
86+
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
87+
mode = in.readEnum(Enrich.Mode.class);
88+
concreteIndices = in.readMap(StreamInput::readString, StreamInput::readString);
89+
} else {
90+
mode = Enrich.Mode.ANY;
91+
EsIndex esIndex = new EsIndex(in);
92+
if (esIndex.concreteIndices().size() != 1) {
93+
throw new IllegalStateException("expected a single concrete enrich index; got " + esIndex.concreteIndices());
94+
}
95+
concreteIndices = Map.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Iterables.get(esIndex.concreteIndices(), 0));
96+
}
97+
return new EnrichExec(
98+
source,
99+
child,
100+
mode,
101+
matchType,
102+
matchField,
103+
policyName,
104+
policyMatchField,
105+
concreteIndices,
106+
in.readNamedWriteableCollectionAsList(NamedExpression.class)
107+
);
108+
}
109+
110+
@Override
111+
public void writeTo(StreamOutput out) throws IOException {
112+
Source.EMPTY.writeTo(out);
113+
((PlanStreamOutput) out).writePhysicalPlanNode(child());
114+
out.writeNamedWriteable(matchField());
115+
out.writeString(policyName());
116+
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
117+
out.writeString(matchType());
118+
}
119+
out.writeString(policyMatchField());
120+
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
121+
out.writeEnum(mode());
122+
out.writeMap(concreteIndices(), StreamOutput::writeString, StreamOutput::writeString);
123+
} else {
124+
if (concreteIndices().keySet().equals(Set.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY))) {
125+
String concreteIndex = concreteIndices().get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
126+
new EsIndex(concreteIndex, Map.of(), Map.of(concreteIndex, IndexMode.STANDARD)).writeTo(out);
127+
} else {
128+
throw new IllegalStateException("expected a single concrete enrich index; got " + concreteIndices());
129+
}
130+
}
131+
out.writeNamedWriteableCollection(enrichFields());
132+
}
133+
134+
@Override
135+
public String getWriteableName() {
136+
return ENTRY.name;
137+
}
138+
63139
@Override
64140
protected AttributeSet computeReferences() {
65141
return matchField.references();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FieldExtractExec.java

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,39 +7,74 @@
77

88
package org.elasticsearch.xpack.esql.plan.physical;
99

10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
1013
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1114
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1215
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1316
import org.elasticsearch.xpack.esql.core.tree.NodeUtils;
1417
import org.elasticsearch.xpack.esql.core.tree.Source;
18+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
19+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
1520

21+
import java.io.IOException;
1622
import java.util.ArrayList;
17-
import java.util.Collection;
1823
import java.util.List;
1924
import java.util.Objects;
2025
import java.util.Set;
2126

22-
import static java.util.Collections.emptySet;
23-
2427
public class FieldExtractExec extends UnaryExec implements EstimatesRowSize {
28+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
29+
PhysicalPlan.class,
30+
"FieldExtractExec",
31+
FieldExtractExec::new
32+
);
33+
2534
private final List<Attribute> attributesToExtract;
2635
private final Attribute sourceAttribute;
27-
// attributes to extract as doc values
36+
/**
37+
* Attributes that many be extracted as doc values even if that makes them
38+
* less accurate. This is mostly used for geo fields which lose a lot of
39+
* precision in their doc values, but in some cases doc values provides
40+
* <strong>enough</strong> precision to do the job.
41+
* <p>
42+
* This is never serialized between nodes and only used locally.
43+
* </p>
44+
*/
2845
private final Set<Attribute> docValuesAttributes;
2946

3047
private List<Attribute> lazyOutput;
3148

32-
public FieldExtractExec(Source source, PhysicalPlan child, List<Attribute> attributesToExtract) {
33-
this(source, child, attributesToExtract, emptySet());
34-
}
35-
3649
public FieldExtractExec(Source source, PhysicalPlan child, List<Attribute> attributesToExtract, Set<Attribute> docValuesAttributes) {
3750
super(source, child);
3851
this.attributesToExtract = attributesToExtract;
3952
this.sourceAttribute = extractSourceAttributesFrom(child);
4053
this.docValuesAttributes = docValuesAttributes;
4154
}
4255

56+
private FieldExtractExec(StreamInput in) throws IOException {
57+
this(
58+
Source.readFrom((PlanStreamInput) in),
59+
((PlanStreamInput) in).readPhysicalPlanNode(),
60+
in.readNamedWriteableCollectionAsList(Attribute.class),
61+
Set.of() // docValueAttributes are only used on the data node and never serialized.
62+
);
63+
}
64+
65+
@Override
66+
public void writeTo(StreamOutput out) throws IOException {
67+
Source.EMPTY.writeTo(out);
68+
((PlanStreamOutput) out).writePhysicalPlanNode(child());
69+
out.writeNamedWriteableCollection(attributesToExtract());
70+
// docValueAttributes are only used on the data node and never serialized.
71+
}
72+
73+
@Override
74+
public String getWriteableName() {
75+
return ENTRY.name;
76+
}
77+
4378
public static Attribute extractSourceAttributesFrom(PhysicalPlan plan) {
4479
for (Attribute attribute : plan.outputSet()) {
4580
if (EsQueryExec.isSourceAttribute(attribute)) {
@@ -77,7 +112,7 @@ public Attribute sourceAttribute() {
77112
return sourceAttribute;
78113
}
79114

80-
public Collection<Attribute> docValuesAttributes() {
115+
public Set<Attribute> docValuesAttributes() {
81116
return docValuesAttributes;
82117
}
83118

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/PhysicalPlan.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
2626
return List.of(
2727
AggregateExec.ENTRY,
2828
DissectExec.ENTRY,
29+
EnrichExec.ENTRY,
2930
EsQueryExec.ENTRY,
3031
EsSourceExec.ENTRY,
3132
EvalExec.ENTRY,
3233
ExchangeExec.ENTRY,
3334
ExchangeSinkExec.ENTRY,
34-
ExchangeSourceExec.ENTRY
35+
ExchangeSourceExec.ENTRY,
36+
FieldExtractExec.ENTRY
3537
);
3638
}
3739

0 commit comments

Comments
 (0)