-
Notifications
You must be signed in to change notification settings - Fork 25.5k
ESQL: Timezone query setting and added to the query config #136205
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 7 commits
2d627e7
15907af
05129b0
65e332e
2dbd374
3f90a9e
098645f
f750112
06391d8
deff0a0
38e0d31
8d44bb6
fdd23af
e395106
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
9191000 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
reordered_translog_operations,9190000 | ||
esql_configuration_query_settings,9191000 |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,96 +7,74 @@ | |
|
||
package org.elasticsearch.xpack.esql.plan; | ||
|
||
import org.elasticsearch.TransportVersion; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.common.io.stream.Writeable; | ||
import org.elasticsearch.core.Nullable; | ||
import org.elasticsearch.transport.RemoteClusterService; | ||
import org.elasticsearch.xpack.esql.core.QlIllegalArgumentException; | ||
import org.elasticsearch.xpack.esql.core.expression.Expression; | ||
import org.elasticsearch.xpack.esql.core.type.DataType; | ||
import org.elasticsearch.xpack.esql.expression.Foldables; | ||
import org.elasticsearch.xpack.esql.parser.ParsingException; | ||
|
||
import java.util.function.Predicate; | ||
import java.io.IOException; | ||
import java.time.ZoneId; | ||
import java.time.ZoneOffset; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.function.Function; | ||
|
||
public enum QuerySettings { | ||
public class QuerySettings { | ||
// TODO check cluster state and see if project routing is allowed | ||
// see https://github.com/elastic/elasticsearch/pull/134446 | ||
// PROJECT_ROUTING(..., state -> state.getRemoteClusterNames().crossProjectEnabled()); | ||
PROJECT_ROUTING( | ||
public static final QuerySettingDef<String> PROJECT_ROUTING = new QuerySettingDef<>( | ||
"project_routing", | ||
DataType.KEYWORD, | ||
true, | ||
false, | ||
true, | ||
"A project routing expression, " | ||
+ "used to define which projects to route the query to. " | ||
+ "Only supported if Cross-Project Search is enabled." | ||
),; | ||
|
||
private String settingName; | ||
private DataType type; | ||
private final boolean serverlessOnly; | ||
private final boolean snapshotOnly; | ||
private final boolean preview; | ||
private final String description; | ||
private final Predicate<RemoteClusterService> validator; | ||
|
||
QuerySettings( | ||
String name, | ||
DataType type, | ||
boolean serverlessOnly, | ||
boolean preview, | ||
boolean snapshotOnly, | ||
String description, | ||
Predicate<RemoteClusterService> validator | ||
) { | ||
this.settingName = name; | ||
this.type = type; | ||
this.serverlessOnly = serverlessOnly; | ||
this.preview = preview; | ||
this.snapshotOnly = snapshotOnly; | ||
this.description = description; | ||
this.validator = validator; | ||
} | ||
|
||
QuerySettings(String name, DataType type, boolean serverlessOnly, boolean preview, boolean snapshotOnly, String description) { | ||
this(name, type, serverlessOnly, preview, snapshotOnly, description, state -> true); | ||
} | ||
|
||
public String settingName() { | ||
return settingName; | ||
} | ||
|
||
public DataType type() { | ||
return type; | ||
} | ||
|
||
public boolean serverlessOnly() { | ||
return serverlessOnly; | ||
} | ||
+ "Only supported if Cross-Project Search is enabled.", | ||
(value, settings) -> Foldables.stringLiteralValueOf(value, "Unexpected value"), | ||
(_rcs) -> null | ||
); | ||
|
||
public boolean snapshotOnly() { | ||
return snapshotOnly; | ||
} | ||
|
||
public boolean preview() { | ||
return preview; | ||
} | ||
|
||
public String description() { | ||
return description; | ||
} | ||
public static final QuerySettingDef<ZoneId> TIME_ZONE = new QuerySettingDef<>( | ||
"time_zone", | ||
DataType.KEYWORD, | ||
false, | ||
true, | ||
true, | ||
"The default timezone to be used in the query, by the functions and commands that require it. Defaults to UTC", | ||
(value, _rcs) -> { | ||
String timeZone = Foldables.stringLiteralValueOf(value, "Unexpected value"); | ||
try { | ||
return ZoneId.of(timeZone); | ||
} catch (Exception exc) { | ||
throw new QlIllegalArgumentException("Invalid time zone [" + timeZone + "]"); | ||
alex-spies marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
} | ||
}, | ||
(_rcs) -> ZoneOffset.UTC | ||
); | ||
|
||
public Predicate<RemoteClusterService> validator() { | ||
return validator; | ||
} | ||
public static final QuerySettingDef<?>[] ALL_SETTINGS = { PROJECT_ROUTING, TIME_ZONE }; | ||
alex-spies marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
public static void validate(EsqlStatement statement, RemoteClusterService clusterService) { | ||
for (QuerySetting setting : statement.settings()) { | ||
boolean found = false; | ||
for (QuerySettings qs : values()) { | ||
if (qs.settingName().equals(setting.name())) { | ||
for (QuerySettingDef<?> def : ALL_SETTINGS) { | ||
if (def.name().equals(setting.name())) { | ||
found = true; | ||
if (setting.value().dataType() != qs.type()) { | ||
throw new ParsingException(setting.source(), "Setting [" + setting.name() + "] must be of type " + qs.type()); | ||
if (setting.value().dataType() != def.type()) { | ||
throw new ParsingException(setting.source(), "Setting [" + setting.name() + "] must be of type " + def.type()); | ||
} | ||
if (qs.validator().test(clusterService) == false) { | ||
throw new ParsingException(setting.source(), "Setting [" + setting.name() + "] is not allowed"); | ||
String error = def.validator().validate(setting.value(), clusterService); | ||
if (error != null) { | ||
throw new ParsingException("Error validating setting [" + setting.name() + "]: " + error); | ||
} | ||
break; | ||
} | ||
|
@@ -106,4 +84,118 @@ public static void validate(EsqlStatement statement, RemoteClusterService cluste | |
} | ||
} | ||
} | ||
|
||
public static QuerySettingsMap toMap(EsqlStatement statement) { | ||
var settings = new HashMap<String, Expression>(); | ||
for (QuerySetting setting : statement.settings()) { | ||
settings.put(setting.name(), setting.value()); | ||
} | ||
return new QuerySettingsMap(settings); | ||
} | ||
|
||
public static class QuerySettingsMap implements Writeable { | ||
|
||
private static final TransportVersion ESQL_CONFIGURATION_QUERY_SETTINGS = TransportVersion.fromName( | ||
"esql_configuration_query_settings" | ||
); | ||
|
||
private final Map<String, Expression> settings; | ||
|
||
|
||
public QuerySettingsMap(Map<String, Expression> settings) { | ||
|
||
this.settings = settings; | ||
} | ||
|
||
public QuerySettingsMap(StreamInput in) throws IOException { | ||
if (in.getTransportVersion().supports(ESQL_CONFIGURATION_QUERY_SETTINGS)) { | ||
this.settings = in.readMap(StreamInput::readString, r -> r.readNamedWriteable(Expression.class)); | ||
} else { | ||
this.settings = Map.of(); | ||
} | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
if (out.getTransportVersion().supports(ESQL_CONFIGURATION_QUERY_SETTINGS)) { | ||
out.writeMap(settings, StreamOutput::writeString, StreamOutput::writeNamedWriteable); | ||
} | ||
} | ||
|
||
public <T> T get(QuerySettingDef<T> def, RemoteClusterService clusterService) { | ||
|
||
Expression value = settings.get(def.name()); | ||
if (value == null) { | ||
return def.defaultValueSupplier.apply(clusterService); | ||
} | ||
return def.get(value, clusterService); | ||
} | ||
} | ||
|
||
/** | ||
* Definition of a query setting. | ||
* | ||
* @param name The name to be used when setting it in the query. E.g. {@code SET name=value} | ||
* @param type The allowed datatype of the setting. | ||
* @param serverlessOnly | ||
* @param preview | ||
* @param snapshotOnly | ||
* @param description The user-facing description of the setting. | ||
* @param validator A validation function to check the setting value. | ||
* Defaults to calling the {@link #parser} and returning the error message of any exception it throws. | ||
* @param parser A function to parse the setting value into the final object. | ||
* @param defaultValueSupplier A supplier of the default value to be used when the setting is not set. | ||
* @param <T> The type of the setting value. | ||
*/ | ||
public record QuerySettingDef<T>( | ||
alex-spies marked this conversation as resolved.
Show resolved
Hide resolved
|
||
String name, | ||
DataType type, | ||
boolean serverlessOnly, | ||
boolean preview, | ||
boolean snapshotOnly, | ||
String description, | ||
Validator validator, | ||
Parser<T> parser, | ||
Function<RemoteClusterService, T> defaultValueSupplier | ||
) { | ||
public QuerySettingDef( | ||
String name, | ||
DataType type, | ||
boolean serverlessOnly, | ||
boolean preview, | ||
boolean snapshotOnly, | ||
String description, | ||
Parser<T> parser, | ||
Function<RemoteClusterService, T> defaultValueSupplier | ||
) { | ||
this(name, type, serverlessOnly, preview, snapshotOnly, description, (value, rcs) -> { | ||
try { | ||
parser.parse(value, rcs); | ||
return null; | ||
} catch (Exception exc) { | ||
return exc.getMessage(); | ||
} | ||
}, parser, defaultValueSupplier); | ||
} | ||
|
||
public T get(Expression value, RemoteClusterService clusterService) { | ||
if (value == null) { | ||
return defaultValueSupplier.apply(clusterService); | ||
} | ||
return parser.parse(value, clusterService); | ||
} | ||
|
||
@FunctionalInterface | ||
public interface Validator { | ||
/** | ||
* Validates the setting value and returns the error message if there's an error, or null otherwise. | ||
*/ | ||
@Nullable | ||
String validate(Expression value, RemoteClusterService clusterService); | ||
} | ||
|
||
@FunctionalInterface | ||
public interface Parser<T> { | ||
/** | ||
* Parses an already validated expression. | ||
*/ | ||
T parse(Expression value, RemoteClusterService clusterService); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.esql.plugin; | ||
|
||
public record EsqlQueryClusterSettings( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is just a bundle of the required cluster settings used by the EsqlSession to create the configuration. We could also pass the cluster settings themselves and let the EsqlSession do it itself, but I wasn't sure if we wanted that. I'll evaluate the change |
||
int resultTruncationMaxSize, | ||
int resultTruncationDefaultSize, | ||
int timeseriesResultTruncationMaxSize, | ||
int timeseriesResultTruncationDefaultSize | ||
) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The "Configuration" object wasn't being used here, at all. So I removed it. And this led to many changes along tests.
That's good btw, as otherwise creating the config after parsing would have been quite harder