Skip to content

Commit 29c54d4

Browse files
committed
New ES|QL API
1 parent b245417 commit 29c54d4

File tree

1 file changed

+27
-33
lines changed

1 file changed

+27
-33
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/esql/EsqlDataExtractor.java

Lines changed: 27 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,29 @@
1212
import org.elasticsearch.client.internal.Client;
1313
import org.elasticsearch.common.Strings;
1414
import org.elasticsearch.common.io.stream.BytesStreamOutput;
15+
import org.elasticsearch.common.time.DateFormatter;
1516
import org.elasticsearch.xcontent.XContentBuilder;
1617
import org.elasticsearch.xcontent.json.JsonXContent;
1718
import org.elasticsearch.xpack.core.ClientHelper;
19+
import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
20+
import org.elasticsearch.xpack.core.esql.action.EsqlQueryRequest;
21+
import org.elasticsearch.xpack.core.esql.action.EsqlQueryRequestBuilder;
22+
import org.elasticsearch.xpack.core.esql.action.EsqlQueryResponse;
1823
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
1924
import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
20-
import org.elasticsearch.xpack.esql.action.ColumnInfo;
21-
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
22-
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
23-
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
2425
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
25-
import org.elasticsearch.xpack.ql.util.DateUtils;
2626

2727
import java.io.IOException;
28+
import java.time.ZoneOffset;
2829
import java.util.Iterator;
2930
import java.util.List;
3031
import java.util.Objects;
3132
import java.util.Optional;
3233

3334
public class EsqlDataExtractor implements DataExtractor {
3435

36+
private static final DateFormatter DATE_TIME_FORMATTER = DateFormatter.forPattern("strict_date_optional_time").withZone(ZoneOffset.UTC);
37+
3538
private static final Logger logger = LogManager.getLogger(EsqlDataExtractor.class);
3639

3740
private final Client client;
@@ -53,13 +56,9 @@ private String esqlTimeFilter() {
5356
return Strings.format(
5457
" | WHERE %s >= \"%s\" AND %s < \"%s\"",
5558
timeField,
56-
DateUtils.UTC_DATE_TIME_FORMATTER.formatMillis(
57-
Math.min(interval.startMs(), org.elasticsearch.common.time.DateUtils.MAX_MILLIS_BEFORE_9999)
58-
),
59+
DATE_TIME_FORMATTER.formatMillis(Math.min(interval.startMs(), org.elasticsearch.common.time.DateUtils.MAX_MILLIS_BEFORE_9999)),
5960
timeField,
60-
DateUtils.UTC_DATE_TIME_FORMATTER.formatMillis(
61-
Math.min(interval.endMs(), org.elasticsearch.common.time.DateUtils.MAX_MILLIS_BEFORE_9999)
62-
)
61+
DATE_TIME_FORMATTER.formatMillis(Math.min(interval.endMs(), org.elasticsearch.common.time.DateUtils.MAX_MILLIS_BEFORE_9999))
6362
);
6463
}
6564

@@ -73,17 +72,18 @@ private String esqlSummaryStats() {
7372

7473
@Override
7574
public DataSummary getSummary() {
76-
EsqlQueryRequest request = new EsqlQueryRequest();
77-
request.query(datafeed.getEsqlQuery() + esqlTimeFilter() + esqlSummaryStats());
75+
EsqlQueryRequestBuilder<? extends EsqlQueryRequest, ? extends EsqlQueryResponse> request = EsqlQueryRequestBuilder
76+
.newRequestBuilder(client)
77+
.query(datafeed.getEsqlQuery() + esqlTimeFilter() + esqlSummaryStats());
7878

7979
try (EsqlQueryResponse response = execute(request)) {
80-
Iterator<Object> values = response.values().next();
80+
Iterator<Object> values = response.response().rows().iterator().next().iterator();
8181
String earliestTime = (String) values.next();
8282
String latestTime = (String) values.next();
8383
long totalHits = (long) values.next();
8484
return new DataSummary(
85-
earliestTime == null ? null : DateUtils.UTC_DATE_TIME_FORMATTER.parseMillis(earliestTime),
86-
latestTime == null ? null : DateUtils.UTC_DATE_TIME_FORMATTER.parseMillis(latestTime),
85+
earliestTime == null ? null : DATE_TIME_FORMATTER.parseMillis(earliestTime),
86+
latestTime == null ? null : DATE_TIME_FORMATTER.parseMillis(latestTime),
8787
totalHits
8888
);
8989
}
@@ -96,24 +96,23 @@ public boolean hasNext() {
9696

9797
@Override
9898
public Result next() throws IOException {
99-
EsqlQueryRequest request = new EsqlQueryRequest();
100-
request.query(datafeed.getEsqlQuery() + esqlTimeFilter() + esqlSortByTime());
101-
99+
EsqlQueryRequestBuilder<? extends EsqlQueryRequest, ? extends EsqlQueryResponse> request = EsqlQueryRequestBuilder
100+
.newRequestBuilder(client)
101+
.query(datafeed.getEsqlQuery() + esqlTimeFilter() + esqlSortByTime());
102102
EsqlQueryResponse response = execute(request);
103103

104104
try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
105105
XContentBuilder jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream);
106106

107-
List<ColumnInfo> columns = response.columns();
107+
List<? extends ColumnInfo> columns = response.response().columns();
108108
int valueCount = 0;
109-
for (Iterator<Iterator<Object>> itValues = response.values(); itValues.hasNext();) {
109+
for (Iterable<Object> row : response.response().rows()) {
110110
jsonBuilder.startObject();
111111
int index = 0;
112-
for (Iterator<Object> itValue = itValues.next(); itValue.hasNext();) {
113-
Object value = itValue.next();
112+
for (Object value : row) {
114113
if ("date".equals(columns.get(index).type())) {
115114
if (value instanceof String && Strings.isNullOrEmpty((String) value) == false) {
116-
value = DateUtils.UTC_DATE_TIME_FORMATTER.parseMillis((String) value);
115+
value = DATE_TIME_FORMATTER.parseMillis((String) value);
117116
}
118117
// TODO: something with arrays of dates? (e.g. kibana_sample_data_ecommerce -> products.created_on)
119118
}
@@ -127,8 +126,8 @@ public Result next() throws IOException {
127126

128127
logger.info(
129128
"query interval: {} - {}, valueCount: {}",
130-
DateUtils.UTC_DATE_TIME_FORMATTER.formatMillis(interval.startMs()),
131-
DateUtils.UTC_DATE_TIME_FORMATTER.formatMillis(interval.endMs()),
129+
DATE_TIME_FORMATTER.formatMillis(interval.startMs()),
130+
DATE_TIME_FORMATTER.formatMillis(interval.endMs()),
132131
valueCount
133132
);
134133

@@ -155,12 +154,7 @@ public long getEndTime() {
155154
return interval.endMs();
156155
}
157156

158-
private EsqlQueryResponse execute(EsqlQueryRequest request) {
159-
return ClientHelper.executeWithHeaders(
160-
datafeed.getHeaders(),
161-
ClientHelper.ML_ORIGIN,
162-
client,
163-
() -> client.execute(EsqlQueryAction.INSTANCE, request).actionGet()
164-
);
157+
private EsqlQueryResponse execute(EsqlQueryRequestBuilder<? extends EsqlQueryRequest, ? extends EsqlQueryResponse> request) {
158+
return ClientHelper.executeWithHeaders(datafeed.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> request.execute().actionGet());
165159
}
166160
}

0 commit comments

Comments
 (0)