Skip to content

Commit 25fdcd2

Browse files
ES|QL: cache EsField on serialization (#112008)
As a follow-up to #111447, with this change we also cache `EsFields`. This gives us an additional 30-40% reduction on the size of serialized plan, according to [these tests](#111980) Related to #111358
1 parent 9db1778 commit 25fdcd2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+336
-171
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ static TransportVersion def(int id) {
197197
public static final TransportVersion LTR_SERVERLESS_RELEASE = def(8_727_00_0);
198198
public static final TransportVersion ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT = def(8_728_00_0);
199199
public static final TransportVersion RANK_DOCS_RETRIEVER = def(8_729_00_0);
200+
public static final TransportVersion ESQL_ES_FIELD_CACHED_SERIALIZATION = def(8_730_00_0);
200201
/*
201202
* STOP! READ THIS FIRST! No, really,
202203
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

test/framework/src/main/java/org/elasticsearch/test/AbstractWireSerializingTestCase.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,18 @@ public abstract class AbstractWireSerializingTestCase<T extends Writeable> exten
2525
*/
2626
protected abstract Writeable.Reader<T> instanceReader();
2727

28+
/**
29+
* Returns a {@link Writeable.Writer} that will be used to serialize the instance
30+
*/
31+
protected Writeable.Writer<T> instanceWriter() {
32+
return StreamOutput::writeWriteable;
33+
}
34+
2835
/**
2936
* Copy the {@link Writeable} by round tripping it through {@linkplain StreamInput} and {@linkplain StreamOutput}.
3037
*/
3138
@Override
3239
protected final T copyInstance(T instance, TransportVersion version) throws IOException {
33-
return copyWriteable(instance, getNamedWriteableRegistry(), instanceReader(), version);
40+
return copyInstance(instance, getNamedWriteableRegistry(), instanceWriter(), instanceReader(), version);
3441
}
3542
}

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/FieldAttribute.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ private FieldAttribute(StreamInput in) throws IOException {
112112
in.readOptionalWriteable(FieldAttribute::readFrom),
113113
in.readString(),
114114
DataType.readFrom(in),
115-
in.readNamedWriteable(EsField.class),
115+
EsField.readFrom(in),
116116
in.readOptionalString(),
117117
in.readEnum(Nullability.class),
118118
NameId.readFrom((StreamInput & PlanStreamInput) in),
@@ -127,7 +127,7 @@ public void writeTo(StreamOutput out) throws IOException {
127127
out.writeOptionalWriteable(parent);
128128
out.writeString(name());
129129
dataType().writeTo(out);
130-
out.writeNamedWriteable(field);
130+
field.writeTo(out);
131131
// We used to write the qualifier here. We can still do if needed in the future.
132132
out.writeOptionalString(null);
133133
out.writeEnum(nullable());

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DateEsField.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
*/
77
package org.elasticsearch.xpack.esql.core.type;
88

9-
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
109
import org.elasticsearch.common.io.stream.StreamInput;
1110
import org.elasticsearch.common.io.stream.StreamOutput;
1211

@@ -17,7 +16,6 @@
1716
* Information about a field in an ES index with the {@code date} type
1817
*/
1918
public class DateEsField extends EsField {
20-
static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(EsField.class, "DateEsField", DateEsField::new);
2119

2220
public static DateEsField dateEsField(String name, Map<String, EsField> properties, boolean hasDocValues) {
2321
return new DateEsField(name, DataType.DATETIME, properties, hasDocValues);
@@ -27,19 +25,19 @@ private DateEsField(String name, DataType dataType, Map<String, EsField> propert
2725
super(name, dataType, properties, hasDocValues);
2826
}
2927

30-
private DateEsField(StreamInput in) throws IOException {
31-
this(in.readString(), DataType.DATETIME, in.readMap(i -> i.readNamedWriteable(EsField.class)), in.readBoolean());
28+
protected DateEsField(StreamInput in) throws IOException {
29+
this(in.readString(), DataType.DATETIME, in.readImmutableMap(EsField::readFrom), in.readBoolean());
3230
}
3331

3432
@Override
35-
public void writeTo(StreamOutput out) throws IOException {
33+
protected void writeContent(StreamOutput out) throws IOException {
3634
out.writeString(getName());
37-
out.writeMap(getProperties(), StreamOutput::writeNamedWriteable);
35+
out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
3836
out.writeBoolean(isAggregatable());
3937
}
4038

41-
@Override
4239
public String getWriteableName() {
43-
return ENTRY.name;
40+
return "DateEsField";
4441
}
42+
4543
}

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/EsField.java

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,34 +7,40 @@
77
package org.elasticsearch.xpack.esql.core.type;
88

99
import org.elasticsearch.TransportVersions;
10-
import org.elasticsearch.common.io.stream.NamedWriteable;
11-
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1210
import org.elasticsearch.common.io.stream.StreamInput;
1311
import org.elasticsearch.common.io.stream.StreamOutput;
12+
import org.elasticsearch.common.io.stream.Writeable;
1413
import org.elasticsearch.core.Nullable;
14+
import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
15+
import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;
1516

1617
import java.io.IOException;
17-
import java.util.List;
1818
import java.util.Map;
1919
import java.util.Objects;
2020

2121
/**
2222
* Information about a field in an ES index.
2323
*/
24-
public class EsField implements NamedWriteable {
25-
public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
26-
return List.of(
27-
EsField.ENTRY,
28-
DateEsField.ENTRY,
29-
InvalidMappedField.ENTRY,
30-
KeywordEsField.ENTRY,
31-
TextEsField.ENTRY,
32-
UnsupportedEsField.ENTRY
33-
);
24+
public class EsField implements Writeable {
25+
26+
private static Map<String, Writeable.Reader<? extends EsField>> readers = Map.ofEntries(
27+
Map.entry("EsField", EsField::new),
28+
Map.entry("DateEsField", DateEsField::new),
29+
Map.entry("InvalidMappedField", InvalidMappedField::new),
30+
Map.entry("KeywordEsField", KeywordEsField::new),
31+
Map.entry("MultiTypeEsField", MultiTypeEsField::new),
32+
Map.entry("TextEsField", TextEsField::new),
33+
Map.entry("UnsupportedEsField", UnsupportedEsField::new)
34+
);
35+
36+
public static Writeable.Reader<? extends EsField> getReader(String name) {
37+
Reader<? extends EsField> result = readers.get(name);
38+
if (result == null) {
39+
throw new IllegalArgumentException("Invalid EsField type [" + name + "]");
40+
}
41+
return result;
3442
}
3543

36-
static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(EsField.class, "EsField", EsField::new);
37-
3844
private final DataType esDataType;
3945
private final boolean aggregatable;
4046
private final Map<String, EsField> properties;
@@ -53,10 +59,10 @@ public EsField(String name, DataType esDataType, Map<String, EsField> properties
5359
this.isAlias = isAlias;
5460
}
5561

56-
public EsField(StreamInput in) throws IOException {
62+
protected EsField(StreamInput in) throws IOException {
5763
this.name = in.readString();
5864
this.esDataType = readDataType(in);
59-
this.properties = in.readImmutableMap(i -> i.readNamedWriteable(EsField.class));
65+
this.properties = in.readImmutableMap(EsField::readFrom);
6066
this.aggregatable = in.readBoolean();
6167
this.isAlias = in.readBoolean();
6268
}
@@ -77,18 +83,33 @@ private DataType readDataType(StreamInput in) throws IOException {
7783
return DataType.readFrom(name);
7884
}
7985

86+
public static <A extends EsField> A readFrom(StreamInput in) throws IOException {
87+
return ((PlanStreamInput) in).readEsFieldWithCache();
88+
}
89+
8090
@Override
8191
public void writeTo(StreamOutput out) throws IOException {
92+
if (((PlanStreamOutput) out).writeEsFieldCacheHeader(this)) {
93+
writeContent(out);
94+
}
95+
}
96+
97+
/**
98+
* This needs to be overridden by subclasses for specific serialization
99+
*/
100+
protected void writeContent(StreamOutput out) throws IOException {
82101
out.writeString(name);
83102
esDataType.writeTo(out);
84-
out.writeMap(properties, StreamOutput::writeNamedWriteable);
103+
out.writeMap(properties, (o, x) -> x.writeTo(out));
85104
out.writeBoolean(aggregatable);
86105
out.writeBoolean(isAlias);
87106
}
88107

89-
@Override
108+
/**
109+
* This needs to be overridden by subclasses for specific serialization
110+
*/
90111
public String getWriteableName() {
91-
return ENTRY.name;
112+
return "EsField";
92113
}
93114

94115
/**

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/InvalidMappedField.java

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package org.elasticsearch.xpack.esql.core.type;
99

10-
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1110
import org.elasticsearch.common.io.stream.StreamInput;
1211
import org.elasticsearch.common.io.stream.StreamOutput;
1312
import org.elasticsearch.xpack.esql.core.QlIllegalArgumentException;
@@ -27,11 +26,6 @@
2726
* It is used specifically for the 'union types' feature in ES|QL.
2827
*/
2928
public class InvalidMappedField extends EsField {
30-
static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
31-
EsField.class,
32-
"InvalidMappedField",
33-
InvalidMappedField::new
34-
);
3529

3630
private final String errorMessage;
3731
private final Map<String, Set<String>> typesToIndices;
@@ -44,10 +38,6 @@ public InvalidMappedField(String name, String errorMessage) {
4438
this(name, errorMessage, new TreeMap<>());
4539
}
4640

47-
public InvalidMappedField(String name) {
48-
this(name, StringUtils.EMPTY, new TreeMap<>());
49-
}
50-
5141
/**
5242
* Constructor supporting union types, used in ES|QL.
5343
*/
@@ -61,24 +51,23 @@ private InvalidMappedField(String name, String errorMessage, Map<String, EsField
6151
this.typesToIndices = typesToIndices;
6252
}
6353

64-
private InvalidMappedField(StreamInput in) throws IOException {
65-
this(in.readString(), in.readString(), in.readImmutableMap(StreamInput::readString, i -> i.readNamedWriteable(EsField.class)));
54+
protected InvalidMappedField(StreamInput in) throws IOException {
55+
this(in.readString(), in.readString(), in.readImmutableMap(StreamInput::readString, EsField::readFrom));
6656
}
6757

6858
public Set<DataType> types() {
6959
return typesToIndices.keySet().stream().map(DataType::fromTypeName).collect(Collectors.toSet());
7060
}
7161

7262
@Override
73-
public void writeTo(StreamOutput out) throws IOException {
63+
protected void writeContent(StreamOutput out) throws IOException {
7464
out.writeString(getName());
7565
out.writeString(errorMessage);
76-
out.writeMap(getProperties(), StreamOutput::writeNamedWriteable);
66+
out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
7767
}
7868

79-
@Override
8069
public String getWriteableName() {
81-
return ENTRY.name;
70+
return "InvalidMappedField";
8271
}
8372

8473
public String errorMessage() {

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/KeywordEsField.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
*/
77
package org.elasticsearch.xpack.esql.core.type;
88

9-
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
109
import org.elasticsearch.common.io.stream.StreamInput;
1110
import org.elasticsearch.common.io.stream.StreamOutput;
1211

@@ -21,11 +20,6 @@
2120
* Information about a field in an ES index with the {@code keyword} type.
2221
*/
2322
public class KeywordEsField extends EsField {
24-
static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
25-
EsField.class,
26-
"KeywordEsField",
27-
KeywordEsField::new
28-
);
2923

3024
private final int precision;
3125
private final boolean normalized;
@@ -63,11 +57,11 @@ protected KeywordEsField(
6357
this.normalized = normalized;
6458
}
6559

66-
private KeywordEsField(StreamInput in) throws IOException {
60+
public KeywordEsField(StreamInput in) throws IOException {
6761
this(
6862
in.readString(),
6963
KEYWORD,
70-
in.readMap(i -> i.readNamedWriteable(EsField.class)),
64+
in.readImmutableMap(EsField::readFrom),
7165
in.readBoolean(),
7266
in.readInt(),
7367
in.readBoolean(),
@@ -76,18 +70,17 @@ private KeywordEsField(StreamInput in) throws IOException {
7670
}
7771

7872
@Override
79-
public void writeTo(StreamOutput out) throws IOException {
73+
protected void writeContent(StreamOutput out) throws IOException {
8074
out.writeString(getName());
81-
out.writeMap(getProperties(), StreamOutput::writeNamedWriteable);
75+
out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
8276
out.writeBoolean(isAggregatable());
8377
out.writeInt(precision);
8478
out.writeBoolean(normalized);
8579
out.writeBoolean(isAlias());
8680
}
8781

88-
@Override
8982
public String getWriteableName() {
90-
return ENTRY.name;
83+
return "KeywordEsField";
9184
}
9285

9386
public int getPrecision() {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/MultiTypeEsField.java renamed to x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/MultiTypeEsField.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,11 @@
55
* 2.0.
66
*/
77

8-
package org.elasticsearch.xpack.esql.type;
8+
package org.elasticsearch.xpack.esql.core.type;
99

10-
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1110
import org.elasticsearch.common.io.stream.StreamInput;
1211
import org.elasticsearch.common.io.stream.StreamOutput;
1312
import org.elasticsearch.xpack.esql.core.expression.Expression;
14-
import org.elasticsearch.xpack.esql.core.type.DataType;
15-
import org.elasticsearch.xpack.esql.core.type.EsField;
16-
import org.elasticsearch.xpack.esql.core.type.InvalidMappedField;
1713

1814
import java.io.IOException;
1915
import java.util.HashMap;
@@ -31,11 +27,6 @@
3127
* type conversion is done at the data node level.
3228
*/
3329
public class MultiTypeEsField extends EsField {
34-
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
35-
EsField.class,
36-
"MultiTypeEsField",
37-
MultiTypeEsField::new
38-
);
3930

4031
private final Map<String, Expression> indexToConversionExpressions;
4132

@@ -44,21 +35,20 @@ public MultiTypeEsField(String name, DataType dataType, boolean aggregatable, Ma
4435
this.indexToConversionExpressions = indexToConversionExpressions;
4536
}
4637

47-
public MultiTypeEsField(StreamInput in) throws IOException {
38+
protected MultiTypeEsField(StreamInput in) throws IOException {
4839
this(in.readString(), DataType.readFrom(in), in.readBoolean(), in.readImmutableMap(i -> i.readNamedWriteable(Expression.class)));
4940
}
5041

5142
@Override
52-
public void writeTo(StreamOutput out) throws IOException {
43+
public void writeContent(StreamOutput out) throws IOException {
5344
out.writeString(getName());
5445
out.writeString(getDataType().typeName());
5546
out.writeBoolean(isAggregatable());
5647
out.writeMap(getIndexToConversionExpressions(), (o, v) -> out.writeNamedWriteable(v));
5748
}
5849

59-
@Override
6050
public String getWriteableName() {
61-
return ENTRY.name;
51+
return "MultiTypeEsField";
6252
}
6353

6454
public Map<String, Expression> getIndexToConversionExpressions() {

0 commit comments

Comments
 (0)