Skip to content

Commit 0692571

Browse files
committed
Fix attribute serialization behavior
1 parent 896c378 commit 0692571

File tree

6 files changed

+60
-28
lines changed

6 files changed

+60
-28
lines changed

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Attribute.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,19 @@
66
*/
77
package org.elasticsearch.xpack.esql.core.expression;
88

9+
import org.elasticsearch.TransportVersion;
910
import org.elasticsearch.core.Nullable;
1011
import org.elasticsearch.xpack.esql.core.tree.Source;
1112
import org.elasticsearch.xpack.esql.core.type.DataType;
13+
import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
14+
import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;
1215

16+
import java.io.IOException;
1317
import java.util.List;
1418
import java.util.Objects;
1519

1620
import static java.util.Collections.emptyList;
21+
import static org.elasticsearch.TransportVersions.ESQL_QUALIFIERS_IN_ATTRIBUTES;
1722

1823
/**
1924
* {@link Expression}s that can be materialized and describe properties of the derived table.
@@ -197,4 +202,22 @@ public static boolean dataTypeEquals(List<Attribute> left, List<Attribute> right
197202
* @return true if the attribute represents a TSDB dimension type
198203
*/
199204
public abstract boolean isDimension();
205+
206+
protected void checkAndSerializeQualifier(PlanStreamOutput out, TransportVersion version) throws IOException {
207+
if (version.onOrAfter(ESQL_QUALIFIERS_IN_ATTRIBUTES)) {
208+
out.writeOptionalCachedString(qualifier());
209+
} else if (qualifier() != null) {
210+
// Non-null qualifier means the query specifically defined one. Old nodes don't know what to do with it and just writing
211+
// null would lose information and lead to undefined, likely invalid queries.
212+
// IllegalArgumentException returns a 400 to the user, which is what we want here.
213+
throw new IllegalArgumentException("Trying to serialize an Attribute with a qualifier to an old node");
214+
}
215+
}
216+
217+
protected static String readQualifier(PlanStreamInput in, TransportVersion version) throws IOException {
218+
if (version.onOrAfter(ESQL_QUALIFIERS_IN_ATTRIBUTES)) {
219+
return in.readOptionalCachedString();
220+
}
221+
return null;
222+
}
200223
}

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/FieldAttribute.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.Objects;
2323

2424
import static org.elasticsearch.TransportVersions.ESQL_FIELD_ATTRIBUTE_DROP_TYPE;
25-
import static org.elasticsearch.TransportVersions.ESQL_QUALIFIERS_IN_ATTRIBUTES;
2625
import static org.elasticsearch.xpack.esql.core.util.PlanStreamInput.readCachedStringWithVersionCheck;
2726
import static org.elasticsearch.xpack.esql.core.util.PlanStreamOutput.writeCachedStringWithVersionCheck;
2827

@@ -105,10 +104,7 @@ private static FieldAttribute innerReadFrom(StreamInput in) throws IOException {
105104
*/
106105
Source source = Source.readFrom((StreamInput & PlanStreamInput) in);
107106
String parentName = ((PlanStreamInput) in).readOptionalCachedString();
108-
String qualifier = null;
109-
if (in.getTransportVersion().onOrAfter(ESQL_QUALIFIERS_IN_ATTRIBUTES)) {
110-
qualifier = ((PlanStreamInput) in).readOptionalCachedString();
111-
}
107+
String qualifier = readQualifier((PlanStreamInput) in, in.getTransportVersion());
112108
String name = readCachedStringWithVersionCheck(in);
113109
if (in.getTransportVersion().before(ESQL_FIELD_ATTRIBUTE_DROP_TYPE)) {
114110
DataType.readFrom(in);
@@ -128,16 +124,14 @@ public void writeTo(StreamOutput out) throws IOException {
128124
if (((PlanStreamOutput) out).writeAttributeCacheHeader(this)) {
129125
Source.EMPTY.writeTo(out);
130126
((PlanStreamOutput) out).writeOptionalCachedString(parentName);
131-
if (out.getTransportVersion().onOrAfter(ESQL_QUALIFIERS_IN_ATTRIBUTES)) {
132-
((PlanStreamOutput) out).writeOptionalCachedString(qualifier());
133-
}
127+
checkAndSerializeQualifier((PlanStreamOutput) out, out.getTransportVersion());
134128
writeCachedStringWithVersionCheck(out, name());
135129
if (out.getTransportVersion().before(ESQL_FIELD_ATTRIBUTE_DROP_TYPE)) {
136130
dataType().writeTo(out);
137131
}
138132
field.writeTo(out);
139133
if (out.getTransportVersion().before(ESQL_FIELD_ATTRIBUTE_DROP_TYPE)) {
140-
// We used to write the qualifier here. We can still do if needed in the future.
134+
// We used to write the qualifier here, even though it was always null.
141135
out.writeOptionalString(null);
142136
}
143137
out.writeEnum(nullable());

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/ReferenceAttribute.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import java.io.IOException;
2020

21+
import static org.elasticsearch.TransportVersions.ESQL_QUALIFIERS_IN_ATTRIBUTES;
22+
2123
/**
2224
* Attribute based on a reference to an expression.
2325
*/
@@ -58,7 +60,11 @@ public void writeTo(StreamOutput out) throws IOException {
5860
Source.EMPTY.writeTo(out);
5961
out.writeString(name());
6062
dataType().writeTo(out);
61-
out.writeOptionalString(qualifier());
63+
checkAndSerializeQualifier((PlanStreamOutput) out, out.getTransportVersion());
64+
if (out.getTransportVersion().before(ESQL_QUALIFIERS_IN_ATTRIBUTES)) {
65+
// We used to always serialize a null qualifier here, so do the same for bwc.
66+
out.writeOptionalString(null);
67+
}
6268
out.writeEnum(nullable());
6369
id().writeTo(out);
6470
out.writeBoolean(synthetic());
@@ -82,8 +88,10 @@ private static ReferenceAttribute innerReadFrom(StreamInput in) throws IOExcepti
8288
// We could cache this if we wanted to.
8389
String name = in.readString();
8490
DataType dataType = DataType.readFrom(in);
85-
// Also cacheable
86-
String qualifier = in.readOptionalString();
91+
String qualifier = readQualifier((PlanStreamInput) in, in.getTransportVersion());
92+
if (in.getTransportVersion().before(ESQL_QUALIFIERS_IN_ATTRIBUTES)) {
93+
in.readOptionalString();
94+
}
8795
Nullability nullability = in.readEnum(Nullability.class);
8896
NameId id = NameId.readFrom((StreamInput & PlanStreamInput) in);
8997
boolean synthetic = in.readBoolean();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/UnsupportedAttribute.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -88,20 +88,24 @@ public UnsupportedAttribute(
8888
this.message = customMessage == null ? errorMessage(name(), field) : customMessage;
8989
}
9090

91-
private UnsupportedAttribute(StreamInput in) throws IOException {
92-
this(
93-
Source.readFrom((PlanStreamInput) in),
94-
readCachedStringWithVersionCheck(in),
95-
in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_2) ? EsField.readFrom(in) : new UnsupportedEsField(in),
96-
in.readOptionalString(),
97-
NameId.readFrom((PlanStreamInput) in)
98-
);
91+
private static UnsupportedAttribute innerReadFrom(StreamInput in) throws IOException {
92+
Source source = Source.readFrom((PlanStreamInput) in);
93+
String qualifier = readQualifier((PlanStreamInput) in, in.getTransportVersion());
94+
String name = readCachedStringWithVersionCheck(in);
95+
UnsupportedEsField field = in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_2)
96+
? EsField.readFrom(in)
97+
: new UnsupportedEsField(in);
98+
String message = in.readOptionalString();
99+
NameId id = NameId.readFrom((PlanStreamInput) in);
100+
101+
return new UnsupportedAttribute(source, qualifier, name, field, message, id);
99102
}
100103

101104
@Override
102105
public void writeTo(StreamOutput out) throws IOException {
103106
if (((PlanStreamOutput) out).writeAttributeCacheHeader(this)) {
104107
Source.EMPTY.writeTo(out);
108+
checkAndSerializeQualifier((PlanStreamOutput) out, out.getTransportVersion());
105109
writeCachedStringWithVersionCheck(out, name());
106110
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_15_2)) {
107111
field().writeTo(out);
@@ -114,7 +118,7 @@ public void writeTo(StreamOutput out) throws IOException {
114118
}
115119

116120
public static UnsupportedAttribute readFrom(StreamInput in) throws IOException {
117-
return ((PlanStreamInput) in).readAttributeWithCache(UnsupportedAttribute::new);
121+
return ((PlanStreamInput) in).readAttributeWithCache(UnsupportedAttribute::innerReadFrom);
118122
}
119123

120124
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/ExpressionBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -941,7 +941,7 @@ private Alias visitRerankField(EsqlBaseParser.RerankFieldContext ctx, Source sou
941941

942942
var boolExprCtx = ctx.booleanExpression();
943943
Expression value = boolExprCtx == null ? id : expression(boolExprCtx);
944-
return new Alias(source, source(ctx.qualifiedName()).text(), value);
944+
return new Alias(source, id.qualifier() != null ? id.qualifiedName() : id.name(), value);
945945
}
946946

947947
@Override

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/UnsupportedAttributeTests.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,28 @@ protected UnsupportedAttribute create() {
1919
}
2020

2121
public static UnsupportedAttribute randomUnsupportedAttribute() {
22+
String qualifier = randomBoolean() ? null : randomAlphaOfLength(3);
2223
String name = randomAlphaOfLength(5);
2324
UnsupportedEsField field = UnsupportedEsFieldTests.randomUnsupportedEsField(4);
2425
String customMessage = randomBoolean() ? null : randomAlphaOfLength(9);
2526
NameId id = new NameId();
26-
return new UnsupportedAttribute(Source.EMPTY, name, field, customMessage, id);
27+
return new UnsupportedAttribute(Source.EMPTY, qualifier, name, field, customMessage, id);
2728
}
2829

2930
@Override
3031
protected UnsupportedAttribute mutate(UnsupportedAttribute instance) {
3132
Source source = instance.source();
33+
String qualifier = instance.qualifier();
3234
String name = instance.name();
3335
UnsupportedEsField field = instance.field();
3436
String customMessage = instance.hasCustomMessage() ? instance.unresolvedMessage() : null;
35-
switch (between(0, 2)) {
36-
case 0 -> name = randomAlphaOfLength(name.length() + 1);
37-
case 1 -> field = randomValueOtherThan(field, () -> UnsupportedEsFieldTests.randomUnsupportedEsField(4));
38-
case 2 -> customMessage = randomValueOtherThan(customMessage, () -> randomBoolean() ? null : randomAlphaOfLength(9));
37+
switch (between(0, 3)) {
38+
case 0 -> qualifier = randomAlphaOfLength(qualifier == null ? 3 : qualifier.length() + 1);
39+
case 1 -> name = randomAlphaOfLength(name.length() + 1);
40+
case 2 -> field = randomValueOtherThan(field, () -> UnsupportedEsFieldTests.randomUnsupportedEsField(4));
41+
case 3 -> customMessage = randomValueOtherThan(customMessage, () -> randomBoolean() ? null : randomAlphaOfLength(9));
3942
default -> throw new IllegalArgumentException();
4043
}
41-
return new UnsupportedAttribute(source, name, field, customMessage, new NameId());
44+
return new UnsupportedAttribute(source, qualifier, name, field, customMessage, new NameId());
4245
}
4346
}

0 commit comments

Comments
 (0)