Skip to content

Commit d6f73d8

Browse files
committed
Implement EsqlDataExtractor.
1 parent 8e3e19d commit d6f73d8

File tree

7 files changed

+165
-19
lines changed

7 files changed

+165
-19
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import static org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM;
7070
import static org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT;
7171
import static org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS;
72+
import static org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_CONFIG_INCOMPATIBLE_WITH_ESQL;
7273
import static org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE;
7374
import static org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT;
7475
import static org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_DATA_HISTOGRAM_MUST_HAVE_NESTED_MAX_AGGREGATION;
@@ -800,7 +801,7 @@ public Builder(StreamInput in) throws IOException {
800801
this.indices = null;
801802
}
802803
// each of these writables are version aware
803-
this.queryProvider = QueryProvider.fromStream(in);
804+
this.queryProvider = in.readOptionalWriteable(QueryProvider::fromStream);
804805
// This reads a boolean from the stream, if true, it sends the stream to the `fromStream` method
805806
this.aggProvider = in.readOptionalWriteable(AggProvider::fromStream);
806807
this.esqlQuery = in.readOptionalString();
@@ -835,7 +836,7 @@ public void writeTo(StreamOutput out) throws IOException {
835836
}
836837

837838
// Each of these writables are version aware
838-
queryProvider.writeTo(out); // never null
839+
out.writeOptionalWriteable(queryProvider);
839840
// This writes a boolean to the stream, if true, it sends the stream to the `writeTo` method
840841
out.writeOptionalWriteable(aggProvider);
841842
out.writeOptionalString(esqlQuery);
@@ -1051,7 +1052,13 @@ public DatafeedConfig build() {
10511052
if (MlStrings.isValidId(id) == false) {
10521053
throw ExceptionsHelper.badRequestException(getMessage(INVALID_ID, ID.getPreferredName(), id));
10531054
}
1054-
if (indices == null || indices.isEmpty() || indices.contains("")) {
1055+
if (esqlQuery != null && queryProvider != null) {
1056+
throw ExceptionsHelper.badRequestException(getMessage(DATAFEED_CONFIG_INCOMPATIBLE_WITH_ESQL, QUERY.getPreferredName()));
1057+
}
1058+
if (esqlQuery != null && indices != null && indices.isEmpty() == false) {
1059+
throw ExceptionsHelper.badRequestException(getMessage(DATAFEED_CONFIG_INCOMPATIBLE_WITH_ESQL, INDICES.getPreferredName()));
1060+
}
1061+
if (esqlQuery == null && (indices == null || indices.isEmpty() || indices.contains(""))) {
10551062
throw invalidOptionValue(INDICES.getPreferredName(), indices);
10561063
}
10571064

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public final class Messages {
3030
"delayed_data_check_config: check_window [{0}] must be less than 10,000x the bucket_span [{1}]";
3131
public static final String DATAFEED_CONFIG_QUERY_BAD_FORMAT = "Datafeed query is not parsable";
3232
public static final String DATAFEED_CONFIG_AGG_BAD_FORMAT = "Datafeed aggregations are not parsable";
33-
33+
public static final String DATAFEED_CONFIG_INCOMPATIBLE_WITH_ESQL = "[{0}] and [esql] are incompatible";
3434
public static final String DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "A job configured with datafeed cannot support latency";
3535
public static final String DATAFEED_NOT_FOUND = "No datafeed with id [{0}] exists";
3636
public static final String DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM = "A date_histogram (or histogram) aggregation is required";

x-pack/plugin/ml/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ dependencies {
7979
compileOnly project(path: xpackModule('core'))
8080
compileOnly project(path: xpackModule('autoscaling'))
8181
compileOnly project(path: xpackModule('ml-package-loader'))
82+
compileOnly project(path: xpackModule('ql'))
8283
testImplementation(testArtifact(project(xpackModule('core'))))
8384
testImplementation project(path: xpackModule('ilm'))
8485
testImplementation project(path: xpackModule('shutdown'))

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@ public List<BucketWithMissingData> detectMissingData(long latestFinalizedBucketM
9393
return Collections.emptyList();
9494
}
9595

96+
if (datafeedQuery == null) {
97+
// In case of an ES|QL datafeed, the query is null and this will fail.
98+
// TODO: implement DatafeedDelayedDataDetector for ES|QL datafeeds.
99+
return Collections.emptyList();
100+
}
101+
96102
List<Bucket> finalizedBuckets = checkBucketEvents(start, end);
97103
Map<Long, Long> indexedData = checkCurrentBucketEventCount(start, end);
98104
return finalizedBuckets.stream()

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ static void create(
7171

7272
ActionListener<GetRollupIndexCapsAction.Response> getRollupIndexCapsActionHandler = ActionListener.wrap(response -> {
7373
if (hasEsqlQuery) {
74-
EsqlDataExtractorFactory.create(datafeed, factoryHandler);
74+
EsqlDataExtractorFactory.create(client, datafeed, job.getDataDescription().getTimeField(), factoryHandler);
7575
return;
7676
}
7777
final boolean hasRollup = response.getJobs().isEmpty() == false;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/esql/EsqlDataExtractor.java

Lines changed: 124 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,86 @@
77

88
package org.elasticsearch.xpack.ml.datafeed.extractor.esql;
99

10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
12+
import org.elasticsearch.client.internal.Client;
13+
import org.elasticsearch.common.Strings;
14+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
15+
import org.elasticsearch.xcontent.XContentBuilder;
16+
import org.elasticsearch.xcontent.json.JsonXContent;
17+
import org.elasticsearch.xpack.core.ClientHelper;
18+
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
19+
import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
20+
import org.elasticsearch.xpack.esql.action.ColumnInfo;
21+
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
22+
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
23+
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
1024
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
25+
import org.elasticsearch.xpack.ql.util.DateUtils;
1126

1227
import java.io.IOException;
28+
import java.util.Iterator;
29+
import java.util.List;
30+
import java.util.Objects;
31+
import java.util.Optional;
1332

1433
public class EsqlDataExtractor implements DataExtractor {
1534

35+
private static final Logger logger = LogManager.getLogger(EsqlDataExtractor.class);
36+
37+
private final Client client;
38+
private final DatafeedConfig datafeed;
39+
private final String timeField;
40+
private final SearchInterval interval;
41+
private boolean isCancelled;
42+
43+
EsqlDataExtractor(Client client, DatafeedConfig datafeed, String timeField, long start, long end) {
44+
this.client = Objects.requireNonNull(client);
45+
this.datafeed = Objects.requireNonNull(datafeed);
46+
this.timeField = timeField;
47+
this.interval = new SearchInterval(start, end);
48+
this.isCancelled = false;
49+
}
50+
51+
// TODO: check whether these expressions facilitate injection attacks!
52+
private String esqlTimeFilter() {
53+
return Strings.format(
54+
" | WHERE %s >= \"%s\" AND %s < \"%s\"",
55+
timeField,
56+
DateUtils.UTC_DATE_TIME_FORMATTER.formatMillis(
57+
Math.min(interval.startMs(), org.elasticsearch.common.time.DateUtils.MAX_MILLIS_BEFORE_9999)
58+
),
59+
timeField,
60+
DateUtils.UTC_DATE_TIME_FORMATTER.formatMillis(
61+
Math.min(interval.endMs(), org.elasticsearch.common.time.DateUtils.MAX_MILLIS_BEFORE_9999)
62+
)
63+
);
64+
}
65+
66+
private String esqlSortByTime() {
67+
return Strings.format(" | SORT %s", timeField);
68+
}
69+
70+
private String esqlSummaryStats() {
71+
return Strings.format(" | STATS earliest_time=MIN(%s), latest_time=MAX(%s), total_hits=COUNT(*)", timeField, timeField);
72+
}
73+
1674
@Override
1775
public DataSummary getSummary() {
18-
return null;
76+
EsqlQueryRequest request = new EsqlQueryRequest();
77+
request.query(datafeed.getEsqlQuery() + esqlTimeFilter() + esqlSummaryStats());
78+
79+
try (EsqlQueryResponse response = execute(request)) {
80+
Iterator<Object> values = response.values().next();
81+
String earliestTime = (String) values.next();
82+
String latestTime = (String) values.next();
83+
long totalHits = (long) values.next();
84+
return new DataSummary(
85+
earliestTime == null ? null : DateUtils.UTC_DATE_TIME_FORMATTER.parseMillis(earliestTime),
86+
latestTime == null ? null : DateUtils.UTC_DATE_TIME_FORMATTER.parseMillis(latestTime),
87+
totalHits
88+
);
89+
}
1990
}
2091

2192
@Override
@@ -25,26 +96,71 @@ public boolean hasNext() {
2596

2697
@Override
2798
public Result next() throws IOException {
28-
return null;
99+
EsqlQueryRequest request = new EsqlQueryRequest();
100+
request.query(datafeed.getEsqlQuery() + esqlTimeFilter() + esqlSortByTime());
101+
102+
EsqlQueryResponse response = execute(request);
103+
104+
try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
105+
XContentBuilder jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream);
106+
107+
List<ColumnInfo> columns = response.columns();
108+
int valueCount = 0;
109+
for (Iterator<Iterator<Object>> itValues = response.values(); itValues.hasNext();) {
110+
jsonBuilder.startObject();
111+
int index = 0;
112+
for (Iterator<Object> itValue = itValues.next(); itValue.hasNext();) {
113+
Object value = itValue.next();
114+
if ("date".equals(columns.get(index).type())) {
115+
if (value instanceof String && Strings.isNullOrEmpty((String) value) == false) {
116+
value = DateUtils.UTC_DATE_TIME_FORMATTER.parseMillis((String) value);
117+
}
118+
// TODO: something with arrays of dates? (e.g. kibana_sample_data_ecommerce -> products.created_on)
119+
}
120+
jsonBuilder.field(columns.get(index).name(), value);
121+
index++;
122+
}
123+
jsonBuilder.endObject();
124+
valueCount++;
125+
}
126+
jsonBuilder.close();
127+
128+
logger.info(
129+
"query interval: {} - {}, valueCount: {}",
130+
DateUtils.UTC_DATE_TIME_FORMATTER.formatMillis(interval.startMs()),
131+
DateUtils.UTC_DATE_TIME_FORMATTER.formatMillis(interval.endMs()),
132+
valueCount
133+
);
134+
135+
return new Result(interval, Optional.of(outputStream.bytes().streamInput()));
136+
}
29137
}
30138

31139
@Override
32140
public boolean isCancelled() {
33-
return false;
141+
return isCancelled;
34142
}
35143

36144
@Override
37145
public void cancel() {
38-
146+
logger.trace("Data extractor received cancel request");
147+
isCancelled = true;
39148
}
40149

41150
@Override
42-
public void destroy() {
43-
44-
}
151+
public void destroy() {}
45152

46153
@Override
47154
public long getEndTime() {
48-
return 0;
155+
return interval.endMs();
156+
}
157+
158+
private EsqlQueryResponse execute(EsqlQueryRequest request) {
159+
return ClientHelper.executeWithHeaders(
160+
datafeed.getHeaders(),
161+
ClientHelper.ML_ORIGIN,
162+
client,
163+
() -> client.execute(EsqlQueryAction.INSTANCE, request).actionGet()
164+
);
49165
}
50166
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/esql/EsqlDataExtractorFactory.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,37 @@
88
package org.elasticsearch.xpack.ml.datafeed.extractor.esql;
99

1010
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.client.internal.Client;
1112
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
1213
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
1314
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
14-
//
15-
//import java.time.ZoneOffset;
16-
//import java.util.List;
17-
//import java.util.Locale;
15+
16+
import java.util.Objects;
1817

1918
public class EsqlDataExtractorFactory implements DataExtractorFactory {
2019

21-
public static void create(DatafeedConfig datafeed, ActionListener<DataExtractorFactory> factoryHandler) {
20+
private final Client client;
21+
private final DatafeedConfig datafeed;
22+
private final String timeField;
23+
24+
private EsqlDataExtractorFactory(Client client, DatafeedConfig datafeed, String timeField) {
25+
this.client = Objects.requireNonNull(client);
26+
this.datafeed = Objects.requireNonNull(datafeed);
27+
this.timeField = timeField;
28+
}
29+
30+
public static void create(
31+
Client client,
32+
DatafeedConfig datafeed,
33+
String timeField,
34+
ActionListener<DataExtractorFactory> factoryHandler
35+
) {
36+
DataExtractorFactory factory = new EsqlDataExtractorFactory(client, datafeed, timeField);
37+
factoryHandler.onResponse(factory);
2238
}
2339

2440
@Override
2541
public DataExtractor newExtractor(long start, long end) {
26-
return null;
42+
return new EsqlDataExtractor(client, datafeed, timeField, start, end);
2743
}
2844
}

0 commit comments

Comments
 (0)