Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = def(9_111_0_00);
public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00);

public static final TransportVersion ESQL_SERIALIZE_TIMESERIES_FIELD_TYPE = def(9_113_0_00);
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,35 @@
public class DateEsField extends EsField {

public static DateEsField dateEsField(String name, Map<String, EsField> properties, boolean hasDocValues) {
return new DateEsField(name, DataType.DATETIME, properties, hasDocValues);
return new DateEsField(name, DataType.DATETIME, properties, hasDocValues, TimeSeriesFieldType.UNKNOWN);
}

private DateEsField(String name, DataType dataType, Map<String, EsField> properties, boolean hasDocValues) {
super(name, dataType, properties, hasDocValues);
private DateEsField(
String name,
DataType dataType,
Map<String, EsField> properties,
boolean hasDocValues,
TimeSeriesFieldType timeSeriesFieldType
) {
super(name, dataType, properties, hasDocValues, timeSeriesFieldType);
}

protected DateEsField(StreamInput in) throws IOException {
this(readCachedStringWithVersionCheck(in), DataType.DATETIME, in.readImmutableMap(EsField::readFrom), in.readBoolean());
this(
readCachedStringWithVersionCheck(in),
DataType.DATETIME,
in.readImmutableMap(EsField::readFrom),
in.readBoolean(),
readTimeSeriesFieldType(in)
);
}

@Override
public void writeContent(StreamOutput out) throws IOException {
writeCachedStringWithVersionCheck(out, getName());
out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
out.writeBoolean(isAggregatable());
writeTimeSeriesFieldType(out);
}

public String getWriteableName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,40 @@
*/
public class EsField implements Writeable {

private static Map<String, Writeable.Reader<? extends EsField>> readers = Map.ofEntries(
/**
* Fields in a TSDB can be either dimensions or metrics. This enum provides a way to store, serialize, and operate on those field
* roles within the ESQL query processing pipeline.
*/
public enum TimeSeriesFieldType implements Writeable {
UNKNOWN(0),
NONE(1),
METRIC(2),
DIMENSION(3);

private final int id;

TimeSeriesFieldType(int id) {
this.id = id;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(id);
}

public static TimeSeriesFieldType readFromStream(StreamInput in) throws IOException {
int id = in.readVInt();
return switch (id) {
case 0 -> UNKNOWN;
case 1 -> NONE;
case 2 -> METRIC;
case 3 -> DIMENSION;
default -> throw new IOException("Unexpected value for TimeSeriesFieldType: " + id);
};
}
}

private static Map<String, Reader<? extends EsField>> readers = Map.ofEntries(
Map.entry("DateEsField", DateEsField::new),
Map.entry("EsField", EsField::new),
Map.entry("InvalidMappedField", InvalidMappedField::new),
Expand All @@ -37,7 +70,7 @@ public class EsField implements Writeable {
Map.entry("UnsupportedEsField", UnsupportedEsField::new)
);

public static Writeable.Reader<? extends EsField> getReader(String name) {
public static Reader<? extends EsField> getReader(String name) {
Reader<? extends EsField> result = readers.get(name);
if (result == null) {
throw new IllegalArgumentException("Invalid EsField type [" + name + "]");
Expand All @@ -50,17 +83,41 @@ public static Writeable.Reader<? extends EsField> getReader(String name) {
private final Map<String, EsField> properties;
private final String name;
private final boolean isAlias;
// Because the subclasses all reimplement serialization, this needs to be writeable from subclass constructors
private final TimeSeriesFieldType timeSeriesFieldType;

public EsField(String name, DataType esDataType, Map<String, EsField> properties, boolean aggregatable) {
this(name, esDataType, properties, aggregatable, false);
this(name, esDataType, properties, aggregatable, false, TimeSeriesFieldType.UNKNOWN);
}

public EsField(
String name,
DataType esDataType,
Map<String, EsField> properties,
boolean aggregatable,
TimeSeriesFieldType timeSeriesFieldType
) {
this(name, esDataType, properties, aggregatable, false, timeSeriesFieldType);
}

public EsField(String name, DataType esDataType, Map<String, EsField> properties, boolean aggregatable, boolean isAlias) {
this(name, esDataType, properties, aggregatable, isAlias, TimeSeriesFieldType.UNKNOWN);
}

public EsField(
String name,
DataType esDataType,
Map<String, EsField> properties,
boolean aggregatable,
boolean isAlias,
TimeSeriesFieldType timeSeriesFieldType
) {
this.name = name;
this.esDataType = esDataType;
this.aggregatable = aggregatable;
this.properties = properties;
this.isAlias = isAlias;
this.timeSeriesFieldType = timeSeriesFieldType;
}

public EsField(StreamInput in) throws IOException {
Expand All @@ -69,6 +126,7 @@ public EsField(StreamInput in) throws IOException {
this.properties = in.readImmutableMap(EsField::readFrom);
this.aggregatable = in.readBoolean();
this.isAlias = in.readBoolean();
this.timeSeriesFieldType = readTimeSeriesFieldType(in);
}

private DataType readDataType(StreamInput in) throws IOException {
Expand Down Expand Up @@ -107,6 +165,21 @@ public void writeContent(StreamOutput out) throws IOException {
out.writeMap(properties, (o, x) -> x.writeTo(out));
out.writeBoolean(aggregatable);
out.writeBoolean(isAlias);
writeTimeSeriesFieldType(out);
}

protected void writeTimeSeriesFieldType(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_SERIALIZE_TIMESERIES_FIELD_TYPE)) {
this.timeSeriesFieldType.writeTo(out);
}
}

protected static TimeSeriesFieldType readTimeSeriesFieldType(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_SERIALIZE_TIMESERIES_FIELD_TYPE)) {
return TimeSeriesFieldType.readFromStream(in);
} else {
return TimeSeriesFieldType.UNKNOWN;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class InvalidMappedField extends EsField {
private final Map<String, Set<String>> typesToIndices;

public InvalidMappedField(String name, String errorMessage, Map<String, EsField> properties) {
this(name, errorMessage, properties, Map.of());
this(name, errorMessage, properties, Map.of(), TimeSeriesFieldType.UNKNOWN);
}

public InvalidMappedField(String name, String errorMessage) {
Expand All @@ -45,17 +45,29 @@ public InvalidMappedField(String name, String errorMessage) {
* Constructor supporting union types, used in ES|QL.
*/
public InvalidMappedField(String name, Map<String, Set<String>> typesToIndices) {
this(name, makeErrorMessage(typesToIndices, false), new TreeMap<>(), typesToIndices);
this(name, makeErrorMessage(typesToIndices, false), new TreeMap<>(), typesToIndices, TimeSeriesFieldType.UNKNOWN);
}

private InvalidMappedField(String name, String errorMessage, Map<String, EsField> properties, Map<String, Set<String>> typesToIndices) {
super(name, DataType.UNSUPPORTED, properties, false);
private InvalidMappedField(
String name,
String errorMessage,
Map<String, EsField> properties,
Map<String, Set<String>> typesToIndices,
TimeSeriesFieldType type
) {
super(name, DataType.UNSUPPORTED, properties, false, type);
this.errorMessage = errorMessage;
this.typesToIndices = typesToIndices;
}

protected InvalidMappedField(StreamInput in) throws IOException {
this(readCachedStringWithVersionCheck(in), in.readString(), in.readImmutableMap(StreamInput::readString, EsField::readFrom));
this(
readCachedStringWithVersionCheck(in),
in.readString(),
in.readImmutableMap(StreamInput::readString, EsField::readFrom),
Map.of(),
readTimeSeriesFieldType(in)
);
}

public Set<DataType> types() {
Expand All @@ -67,6 +79,7 @@ public void writeContent(StreamOutput out) throws IOException {
writeCachedStringWithVersionCheck(out, getName());
out.writeString(errorMessage);
out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
writeTimeSeriesFieldType(out);
}

public String getWriteableName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public KeywordEsField(
boolean normalized,
boolean isAlias
) {
this(name, KEYWORD, properties, hasDocValues, precision, normalized, isAlias);
this(name, KEYWORD, properties, hasDocValues, precision, normalized, isAlias, TimeSeriesFieldType.UNKNOWN);
}

protected KeywordEsField(
Expand All @@ -52,9 +52,10 @@ protected KeywordEsField(
boolean hasDocValues,
int precision,
boolean normalized,
boolean isAlias
boolean isAlias,
TimeSeriesFieldType timeSeriesFieldType
) {
super(name, esDataType, properties, hasDocValues, isAlias);
super(name, esDataType, properties, hasDocValues, isAlias, timeSeriesFieldType);
this.precision = precision;
this.normalized = normalized;
}
Expand All @@ -67,7 +68,8 @@ public KeywordEsField(StreamInput in) throws IOException {
in.readBoolean(),
in.readInt(),
in.readBoolean(),
in.readBoolean()
in.readBoolean(),
readTimeSeriesFieldType(in)
);
}

Expand All @@ -79,6 +81,7 @@ public void writeContent(StreamOutput out) throws IOException {
out.writeInt(precision);
out.writeBoolean(normalized);
out.writeBoolean(isAlias());
writeTimeSeriesFieldType(out);
}

public String getWriteableName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,24 @@ public MultiTypeEsField(String name, DataType dataType, boolean aggregatable, Ma
this.indexToConversionExpressions = indexToConversionExpressions;
}

public MultiTypeEsField(
String name,
DataType dataType,
boolean aggregatable,
Map<String, Expression> indexToConversionExpressions,
TimeSeriesFieldType timeSeriesFieldType
) {
super(name, dataType, Map.of(), aggregatable, timeSeriesFieldType);
this.indexToConversionExpressions = indexToConversionExpressions;
}

protected MultiTypeEsField(StreamInput in) throws IOException {
this(
readCachedStringWithVersionCheck(in),
DataType.readFrom(in),
in.readBoolean(),
in.readImmutableMap(i -> i.readNamedWriteable(Expression.class))
in.readImmutableMap(i -> i.readNamedWriteable(Expression.class)),
readTimeSeriesFieldType(in)
);
}

Expand All @@ -53,6 +65,7 @@ public void writeContent(StreamOutput out) throws IOException {
getDataType().writeTo(out);
out.writeBoolean(isAggregatable());
out.writeMap(getIndexToConversionExpressions(), (o, v) -> out.writeNamedWriteable(v));
writeTimeSeriesFieldType(out);
}

public String getWriteableName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,27 @@ public TextEsField(String name, Map<String, EsField> properties, boolean hasDocV
}

public TextEsField(String name, Map<String, EsField> properties, boolean hasDocValues, boolean isAlias) {
super(name, TEXT, properties, hasDocValues, isAlias);
super(name, TEXT, properties, hasDocValues, isAlias, TimeSeriesFieldType.UNKNOWN);
}

public TextEsField(
String name,
Map<String, EsField> properties,
boolean hasDocValues,
boolean isAlias,
TimeSeriesFieldType timeSeriesFieldType
) {
super(name, TEXT, properties, hasDocValues, isAlias, timeSeriesFieldType);
}

protected TextEsField(StreamInput in) throws IOException {
this(readCachedStringWithVersionCheck(in), in.readImmutableMap(EsField::readFrom), in.readBoolean(), in.readBoolean());
this(
readCachedStringWithVersionCheck(in),
in.readImmutableMap(EsField::readFrom),
in.readBoolean(),
in.readBoolean(),
readTimeSeriesFieldType(in)
);
}

@Override
Expand All @@ -43,6 +59,7 @@ public void writeContent(StreamOutput out) throws IOException {
out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
out.writeBoolean(isAggregatable());
out.writeBoolean(isAlias());
writeTimeSeriesFieldType(out);
}

public String getWriteableName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,29 @@ public UnsupportedEsField(String name, List<String> originalTypes) {
}

public UnsupportedEsField(String name, List<String> originalTypes, String inherited, Map<String, EsField> properties) {
super(name, DataType.UNSUPPORTED, properties, false);
this(name, originalTypes, inherited, properties, TimeSeriesFieldType.UNKNOWN);
}

public UnsupportedEsField(
String name,
List<String> originalTypes,
String inherited,
Map<String, EsField> properties,
TimeSeriesFieldType timeSeriesFieldType
) {
super(name, DataType.UNSUPPORTED, properties, false, timeSeriesFieldType);
this.originalTypes = originalTypes;
this.inherited = inherited;
}

public UnsupportedEsField(StreamInput in) throws IOException {
this(readCachedStringWithVersionCheck(in), readOriginalTypes(in), in.readOptionalString(), in.readImmutableMap(EsField::readFrom));
this(
readCachedStringWithVersionCheck(in),
readOriginalTypes(in),
in.readOptionalString(),
in.readImmutableMap(EsField::readFrom),
readTimeSeriesFieldType(in)
);
}

private static List<String> readOriginalTypes(StreamInput in) throws IOException {
Expand All @@ -64,6 +80,7 @@ public void writeContent(StreamOutput out) throws IOException {
}
out.writeOptionalString(getInherited());
out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
writeTimeSeriesFieldType(out);
}

public String getWriteableName() {
Expand Down
Loading