Skip to content

Commit 8e3e19d

Browse files
committed
Wire ESQL query into Datafeeds
1 parent 10ac063 commit 8e3e19d

File tree

5 files changed

+127
-26
lines changed

5 files changed

+127
-26
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java

Lines changed: 40 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ public class DatafeedConfig implements SimpleDiffable<DatafeedConfig>, ToXConten
119119
public static final ParseField INDEXES = new ParseField("indexes");
120120
public static final ParseField INDICES = new ParseField("indices");
121121
public static final ParseField QUERY = new ParseField("query");
122+
public static final ParseField ESQL_QUERY = new ParseField("esql_query");
123+
// TODO: implement "esql_params" for parametrized queries
122124
public static final ParseField SCROLL_SIZE = new ParseField("scroll_size");
123125
public static final ParseField AGGREGATIONS = new ParseField("aggregations");
124126
public static final ParseField AGGS = new ParseField("aggs");
@@ -177,6 +179,7 @@ private static ObjectParser<Builder, Void> createParser(boolean ignoreUnknownFie
177179
(p, c) -> QueryProvider.fromXContent(p, ignoreUnknownFields, DATAFEED_CONFIG_QUERY_BAD_FORMAT),
178180
QUERY
179181
);
182+
parser.declareString(Builder::setEsqlQuery, ESQL_QUERY);
180183
parser.declareObject(Builder::setAggregationsSafe, (p, c) -> AggProvider.fromXContent(p, ignoreUnknownFields), AGGREGATIONS);
181184
parser.declareObject(Builder::setAggregationsSafe, (p, c) -> AggProvider.fromXContent(p, ignoreUnknownFields), AGGS);
182185
parser.declareObject(Builder::setScriptFields, (p, c) -> {
@@ -230,6 +233,7 @@ private static ObjectParser<Builder, Void> createParser(boolean ignoreUnknownFie
230233
private final List<String> indices;
231234
private final QueryProvider queryProvider;
232235
private final AggProvider aggProvider;
236+
private final String esqlQuery;
233237
private final List<SearchSourceBuilder.ScriptField> scriptFields;
234238
private final Integer scrollSize;
235239
private final ChunkingConfig chunkingConfig;
@@ -247,6 +251,7 @@ private DatafeedConfig(
247251
List<String> indices,
248252
QueryProvider queryProvider,
249253
AggProvider aggProvider,
254+
String esqlQuery,
250255
List<SearchSourceBuilder.ScriptField> scriptFields,
251256
Integer scrollSize,
252257
ChunkingConfig chunkingConfig,
@@ -263,6 +268,7 @@ private DatafeedConfig(
263268
this.indices = indices == null ? null : Collections.unmodifiableList(indices);
264269
this.queryProvider = queryProvider == null ? null : new QueryProvider(queryProvider);
265270
this.aggProvider = aggProvider == null ? null : new AggProvider(aggProvider);
271+
this.esqlQuery = esqlQuery;
266272
this.scriptFields = scriptFields == null ? null : Collections.unmodifiableList(scriptFields);
267273
this.scrollSize = scrollSize;
268274
this.chunkingConfig = chunkingConfig;
@@ -278,15 +284,12 @@ public DatafeedConfig(StreamInput in) throws IOException {
278284
this.jobId = in.readString();
279285
this.queryDelay = in.readOptionalTimeValue();
280286
this.frequency = in.readOptionalTimeValue();
281-
if (in.readBoolean()) {
282-
this.indices = in.readCollectionAsImmutableList(StreamInput::readString);
283-
} else {
284-
this.indices = null;
285-
}
287+
this.indices = in.readOptionalStringCollectionAsList();
286288
// each of these writables are version aware
287-
this.queryProvider = QueryProvider.fromStream(in);
289+
this.queryProvider = in.readOptionalWriteable(QueryProvider::fromStream);
288290
// This reads a boolean from the stream, if true, it sends the stream to the `fromStream` method
289291
this.aggProvider = in.readOptionalWriteable(AggProvider::fromStream);
292+
this.esqlQuery = in.readOptionalString();
290293

291294
if (in.readBoolean()) {
292295
this.scriptFields = in.readCollectionAsImmutableList(SearchSourceBuilder.ScriptField::new);
@@ -394,6 +397,10 @@ public Map<String, Object> getQuery() {
394397
return queryProvider == null ? null : queryProvider.getQuery();
395398
}
396399

400+
public String getEsqlQuery() {
401+
return esqlQuery;
402+
}
403+
397404
/**
398405
* Fully parses the semi-parsed {@code Map<String, Object>} aggregations
399406
*
@@ -515,24 +522,11 @@ public void writeTo(StreamOutput out) throws IOException {
515522
out.writeString(jobId);
516523
out.writeOptionalTimeValue(queryDelay);
517524
out.writeOptionalTimeValue(frequency);
518-
if (indices != null) {
519-
out.writeBoolean(true);
520-
out.writeStringCollection(indices);
521-
} else {
522-
out.writeBoolean(false);
523-
}
524-
525-
// Each of these writables are version aware
526-
queryProvider.writeTo(out); // never null
527-
// This writes a boolean to the stream, if true, it sends the stream to the `writeTo` method
525+
out.writeOptionalStringCollection(indices);
526+
out.writeOptionalWriteable(queryProvider);
528527
out.writeOptionalWriteable(aggProvider);
529-
530-
if (scriptFields != null) {
531-
out.writeBoolean(true);
532-
out.writeCollection(scriptFields);
533-
} else {
534-
out.writeBoolean(false);
535-
}
528+
out.writeOptionalString(esqlQuery);
529+
out.writeOptionalCollection(scriptFields);
536530
out.writeOptionalVInt(scrollSize);
537531
out.writeOptionalWriteable(chunkingConfig);
538532
out.writeMap(headers, StreamOutput::writeString);
@@ -582,7 +576,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
582576
builder.field(CHUNKING_CONFIG.getPreferredName(), chunkingConfig);
583577
}
584578
}
585-
builder.field(QUERY.getPreferredName(), queryProvider.getQuery());
579+
if (queryProvider != null) {
580+
builder.field(QUERY.getPreferredName(), queryProvider.getQuery());
581+
}
582+
if (esqlQuery != null) {
583+
builder.field(ESQL_QUERY.getPreferredName(), esqlQuery);
584+
}
586585
if (frequency != null) {
587586
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
588587
}
@@ -751,7 +750,8 @@ public static class Builder implements Writeable {
751750
private TimeValue queryDelay;
752751
private TimeValue frequency;
753752
private List<String> indices = Collections.emptyList();
754-
private QueryProvider queryProvider = QueryProvider.defaultQuery();
753+
private QueryProvider queryProvider;
754+
private String esqlQuery;
755755
private AggProvider aggProvider;
756756
private List<SearchSourceBuilder.ScriptField> scriptFields;
757757
private Integer scrollSize = DEFAULT_SCROLL_SIZE;
@@ -778,6 +778,7 @@ public Builder(DatafeedConfig config) {
778778
this.indices = new ArrayList<>(config.indices);
779779
this.queryProvider = config.queryProvider == null ? null : new QueryProvider(config.queryProvider);
780780
this.aggProvider = config.aggProvider == null ? null : new AggProvider(config.aggProvider);
781+
this.esqlQuery = config.esqlQuery;
781782
this.scriptFields = config.scriptFields == null ? null : new ArrayList<>(config.scriptFields);
782783
this.scrollSize = config.scrollSize;
783784
this.chunkingConfig = config.chunkingConfig;
@@ -802,6 +803,7 @@ public Builder(StreamInput in) throws IOException {
802803
this.queryProvider = QueryProvider.fromStream(in);
803804
// This reads a boolean from the stream, if true, it sends the stream to the `fromStream` method
804805
this.aggProvider = in.readOptionalWriteable(AggProvider::fromStream);
806+
this.esqlQuery = in.readOptionalString();
805807

806808
if (in.readBoolean()) {
807809
this.scriptFields = in.readCollectionAsImmutableList(SearchSourceBuilder.ScriptField::new);
@@ -836,6 +838,7 @@ public void writeTo(StreamOutput out) throws IOException {
836838
queryProvider.writeTo(out); // never null
837839
// This writes a boolean to the stream, if true, it sends the stream to the `writeTo` method
838840
out.writeOptionalWriteable(aggProvider);
841+
out.writeOptionalString(esqlQuery);
839842

840843
if (scriptFields != null) {
841844
out.writeBoolean(true);
@@ -939,7 +942,12 @@ public Builder setFrequency(TimeValue frequency) {
939942
}
940943

941944
public Builder setQueryProvider(QueryProvider queryProvider) {
942-
this.queryProvider = ExceptionsHelper.requireNonNull(queryProvider, QUERY.getPreferredName());
945+
this.queryProvider = queryProvider;
946+
return this;
947+
}
948+
949+
public Builder setEsqlQuery(String esqlQuery) {
950+
this.esqlQuery = esqlQuery;
943951
return this;
944952
}
945953

@@ -1055,6 +1063,11 @@ public DatafeedConfig build() {
10551063
if (indicesOptions == null) {
10561064
indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN_HIDDEN_FORBID_CLOSED;
10571065
}
1066+
1067+
if (queryProvider == null && esqlQuery == null) {
1068+
queryProvider = QueryProvider.defaultQuery();
1069+
}
1070+
10581071
return new DatafeedConfig(
10591072
id,
10601073
jobId,
@@ -1063,6 +1076,7 @@ public DatafeedConfig build() {
10631076
indices,
10641077
queryProvider,
10651078
aggProvider,
1079+
esqlQuery,
10661080
scriptFields,
10671081
scrollSize,
10681082
chunkingConfig,

x-pack/plugin/core/template-resources/src/main/resources/ml/config_index_mappings.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,9 @@
376376
}
377377
}
378378
},
379+
"esql_query": {
380+
"type": "keyword"
381+
},
379382
"finished_time" : {
380383
"type" : "date"
381384
},

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.CompositeAggregationDataExtractorFactory;
2626
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.RollupDataExtractorFactory;
2727
import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory;
28+
import org.elasticsearch.xpack.ml.datafeed.extractor.esql.EsqlDataExtractorFactory;
2829
import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory;
2930

3031
public interface DataExtractorFactory {
@@ -59,6 +60,7 @@ static void create(
5960
) {
6061
final boolean hasAggs = datafeed.hasAggregations();
6162
final boolean isComposite = hasAggs && datafeed.hasCompositeAgg(xContentRegistry);
63+
final boolean hasEsqlQuery = datafeed.getEsqlQuery() != null;
6264
ActionListener<DataExtractorFactory> factoryHandler = listener.delegateFailureAndWrap(
6365
(l, factory) -> l.onResponse(
6466
datafeed.getChunkingConfig().isEnabled()
@@ -68,6 +70,10 @@ static void create(
6870
);
6971

7072
ActionListener<GetRollupIndexCapsAction.Response> getRollupIndexCapsActionHandler = ActionListener.wrap(response -> {
73+
if (hasEsqlQuery) {
74+
EsqlDataExtractorFactory.create(datafeed, factoryHandler);
75+
return;
76+
}
7177
final boolean hasRollup = response.getJobs().isEmpty() == false;
7278
if (hasRollup && hasAggs == false) {
7379
listener.onFailure(new IllegalArgumentException("Aggregations are required when using Rollup indices"));
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.ml.datafeed.extractor.esql;
9+
10+
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
11+
12+
import java.io.IOException;
13+
14+
public class EsqlDataExtractor implements DataExtractor {
15+
16+
@Override
17+
public DataSummary getSummary() {
18+
return null;
19+
}
20+
21+
@Override
22+
public boolean hasNext() {
23+
return false;
24+
}
25+
26+
@Override
27+
public Result next() throws IOException {
28+
return null;
29+
}
30+
31+
@Override
32+
public boolean isCancelled() {
33+
return false;
34+
}
35+
36+
@Override
37+
public void cancel() {
38+
39+
}
40+
41+
@Override
42+
public void destroy() {
43+
44+
}
45+
46+
@Override
47+
public long getEndTime() {
48+
return 0;
49+
}
50+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.ml.datafeed.extractor.esql;
9+
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
12+
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
13+
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
14+
//
15+
//import java.time.ZoneOffset;
16+
//import java.util.List;
17+
//import java.util.Locale;
18+
19+
public class EsqlDataExtractorFactory implements DataExtractorFactory {
20+
21+
public static void create(DatafeedConfig datafeed, ActionListener<DataExtractorFactory> factoryHandler) {
22+
}
23+
24+
@Override
25+
public DataExtractor newExtractor(long start, long end) {
26+
return null;
27+
}
28+
}

0 commit comments

Comments
 (0)