Skip to content

Commit 0a94dc0

Browse files
AlexGPlayelasticsearchmachine
andauthored
[ESQL] Add informative timestamps to ESQL async (#137957)
* Add informative timestamps to ESQL async * [CI] Auto commit changes from spotless * Add TransportVersion * [CI] Auto commit changes from spotless * Add missing fields to MapMatcher * Fix failing tests * Update more tests --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent 5a97891 commit 0a94dc0

File tree

13 files changed

+164
-32
lines changed

13 files changed

+164
-32
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9228000
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
aggregate_metric_double_typed_block,9227000
1+
esql_timestamps_info,9228000

test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,9 @@ public void testSortByManyLongsGiantTopN() throws IOException {
169169
.entry("values", List.of(List.of(9)))
170170
.entry("documents_found", greaterThan(0))
171171
.entry("values_loaded", greaterThan(0))
172+
.entry("completion_time_in_millis", greaterThan(0L))
173+
.entry("expiration_time_in_millis", greaterThan(0L))
174+
.entry("start_time_in_millis", greaterThan(0L))
172175
);
173176
}
174177

test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2767,13 +2767,20 @@ protected static MapMatcher getProfileMatcher() {
27672767
.entry("plans", instanceOf(List.class));
27682768
}
27692769

2770-
protected static MapMatcher getResultMatcher(boolean includePartial, boolean includeDocumentsFound) {
2770+
protected static MapMatcher getResultMatcher(boolean includePartial, boolean includeDocumentsFound, boolean includeTimestamps) {
27712771
MapMatcher mapMatcher = matchesMap();
27722772
if (includeDocumentsFound) {
27732773
// Older versions may not return documents_found and values_loaded.
27742774
mapMatcher = mapMatcher.entry("documents_found", greaterThanOrEqualTo(0));
27752775
mapMatcher = mapMatcher.entry("values_loaded", greaterThanOrEqualTo(0));
27762776
}
2777+
if (includeTimestamps) {
2778+
// Older versions may not return start_time_in_millis, completion_time_in_millis and expiration_time_in_millis
2779+
mapMatcher = mapMatcher.entry("start_time_in_millis", greaterThanOrEqualTo(0L));
2780+
mapMatcher = mapMatcher.entry("completion_time_in_millis", greaterThanOrEqualTo(0L));
2781+
mapMatcher = mapMatcher.entry("expiration_time_in_millis", greaterThanOrEqualTo(0L));
2782+
}
2783+
27772784
mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
27782785
// Older version may not have is_partial
27792786
if (includePartial) {
@@ -2786,7 +2793,11 @@ protected static MapMatcher getResultMatcher(boolean includePartial, boolean inc
27862793
* Create empty result matcher from result, taking into account all metadata items.
27872794
*/
27882795
protected static MapMatcher getResultMatcher(Map<String, Object> result) {
2789-
return getResultMatcher(result.containsKey("is_partial"), result.containsKey("documents_found"));
2796+
return getResultMatcher(
2797+
result.containsKey("is_partial"),
2798+
result.containsKey("documents_found"),
2799+
result.containsKey("start_time_in_millis")
2800+
);
27902801
}
27912802

27922803
/**

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,11 @@ private boolean capabilitiesSupportedNewAndOld(List<String> requiredCapabilities
311311
}
312312

313313
private void assertResultMap(boolean includeCCSMetadata, Map<String, Object> result, Map<String, Object> expectedResult) {
314-
MapMatcher mapMatcher = getResultMatcher(result.containsKey("is_partial"), result.containsKey("documents_found")).extraOk();
314+
MapMatcher mapMatcher = getResultMatcher(
315+
result.containsKey("is_partial"),
316+
result.containsKey("documents_found"),
317+
result.containsKey("start_time_in_millis")
318+
).extraOk();
315319
if (includeCCSMetadata) {
316320
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
317321
}

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,11 @@ private boolean capabilitiesSupportedNewAndOld(List<String> requiredCapabilities
262262
}
263263

264264
private <C, V> void assertResultMap(boolean includeCCSMetadata, Map<String, Object> result, C columns, V values, boolean remoteOnly) {
265-
MapMatcher mapMatcher = getResultMatcher(result.containsKey("is_partial"), result.containsKey("documents_found")).extraOk();
265+
MapMatcher mapMatcher = getResultMatcher(
266+
result.containsKey("is_partial"),
267+
result.containsKey("documents_found"),
268+
result.containsKey("start_time_in_millis")
269+
).extraOk();
266270
if (includeCCSMetadata) {
267271
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
268272
}
@@ -523,7 +527,11 @@ public void testLookupJoinAliasesSkipOld() throws IOException {
523527
var columns = List.of(Map.of("name", "c", "type", "long"));
524528
var values = List.of(List.of(localDocs.size()));
525529

526-
MapMatcher mapMatcher = getResultMatcher(false, result.containsKey("documents_found")).extraOk();
530+
MapMatcher mapMatcher = getResultMatcher(
531+
false,
532+
result.containsKey("documents_found"),
533+
result.containsKey("start_time_in_millis")
534+
).extraOk();
527535
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
528536
mapMatcher = mapMatcher.entry("is_partial", true);
529537
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import static org.hamcrest.Matchers.either;
7575
import static org.hamcrest.Matchers.emptyOrNullString;
7676
import static org.hamcrest.Matchers.equalTo;
77+
import static org.hamcrest.Matchers.greaterThan;
7778
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
7879
import static org.hamcrest.Matchers.hasSize;
7980
import static org.hamcrest.Matchers.is;
@@ -288,7 +289,7 @@ public static RequestObjectBuilder jsonBuilder() throws IOException {
288289

289290
public void testGetAnswer() throws IOException {
290291
Map<String, Object> answer = runEsql(requestObjectBuilder().query("row a = 1, b = 2"));
291-
assertEquals(6, answer.size());
292+
assertEquals(9, answer.size());
292293
assertThat(((Integer) answer.get("took")).intValue(), greaterThanOrEqualTo(0));
293294
Map<String, String> colA = Map.of("name", "a", "type", "integer");
294295
Map<String, String> colB = Map.of("name", "b", "type", "integer");
@@ -300,6 +301,9 @@ public void testGetAnswer() throws IOException {
300301
.entry("values_loaded", 0)
301302
.entry("columns", List.of(colA, colB))
302303
.entry("values", List.of(List.of(1, 2)))
304+
.entry("completion_time_in_millis", greaterThan(0L))
305+
.entry("expiration_time_in_millis", greaterThan(0L))
306+
.entry("start_time_in_millis", greaterThan(0L))
303307
);
304308
}
305309

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

Lines changed: 69 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
4848
"esql_documents_found_and_values_loaded"
4949
);
5050
private static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = TransportVersion.fromName("esql_profile_include_plan");
51+
private static final TransportVersion ESQL_TIMESTAMPS_INFO = TransportVersion.fromName("esql_timestamps_info");
5152

5253
public static final String DROP_NULL_COLUMNS_OPTION = "drop_null_columns";
5354

@@ -63,6 +64,9 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
6364
private final boolean isAsync;
6465
private final EsqlExecutionInfo executionInfo;
6566

67+
private final long startTimeMillis;
68+
private final long expirationTimeMillis;
69+
6670
public EsqlQueryResponse(
6771
List<ColumnInfoImpl> columns,
6872
List<Page> pages,
@@ -73,6 +77,8 @@ public EsqlQueryResponse(
7377
@Nullable String asyncExecutionId,
7478
boolean isRunning,
7579
boolean isAsync,
80+
long startTimeMillis,
81+
long expirationTimeMillis,
7682
EsqlExecutionInfo executionInfo
7783
) {
7884
this.columns = columns;
@@ -84,6 +90,8 @@ public EsqlQueryResponse(
8490
this.asyncExecutionId = asyncExecutionId;
8591
this.isRunning = isRunning;
8692
this.isAsync = isAsync;
93+
this.startTimeMillis = startTimeMillis;
94+
this.expirationTimeMillis = expirationTimeMillis;
8795
this.executionInfo = executionInfo;
8896
}
8997

@@ -95,9 +103,24 @@ public EsqlQueryResponse(
95103
@Nullable Profile profile,
96104
boolean columnar,
97105
boolean isAsync,
106+
long startTimeMillis,
107+
long expirationTimeMillis,
98108
EsqlExecutionInfo executionInfo
99109
) {
100-
this(columns, pages, documentsFound, valuesLoaded, profile, columnar, null, false, isAsync, executionInfo);
110+
this(
111+
columns,
112+
pages,
113+
documentsFound,
114+
valuesLoaded,
115+
profile,
116+
columnar,
117+
null,
118+
false,
119+
isAsync,
120+
startTimeMillis,
121+
expirationTimeMillis,
122+
executionInfo
123+
);
101124
}
102125

103126
/**
@@ -121,6 +144,14 @@ static EsqlQueryResponse deserialize(BlockStreamInput in) throws IOException {
121144
long valuesLoaded = supportsValuesLoaded(in.getTransportVersion()) ? in.readVLong() : 0;
122145
Profile profile = in.readOptionalWriteable(Profile::readFrom);
123146
boolean columnar = in.readBoolean();
147+
148+
long startTimeMillis = 0L;
149+
long expirationTimeMillis = 0L;
150+
if (in.getTransportVersion().supports(ESQL_TIMESTAMPS_INFO)) {
151+
startTimeMillis = in.readLong();
152+
expirationTimeMillis = in.readLong();
153+
}
154+
124155
EsqlExecutionInfo executionInfo = in.readOptionalWriteable(EsqlExecutionInfo::new);
125156
return new EsqlQueryResponse(
126157
columns,
@@ -132,6 +163,8 @@ static EsqlQueryResponse deserialize(BlockStreamInput in) throws IOException {
132163
asyncExecutionId,
133164
isRunning,
134165
isAsync,
166+
startTimeMillis,
167+
expirationTimeMillis,
135168
executionInfo
136169
);
137170
}
@@ -149,6 +182,12 @@ public void writeTo(StreamOutput out) throws IOException {
149182
}
150183
out.writeOptionalWriteable(profile);
151184
out.writeBoolean(columnar);
185+
186+
if (out.getTransportVersion().supports(ESQL_TIMESTAMPS_INFO)) {
187+
out.writeLong(startTimeMillis);
188+
out.writeLong(expirationTimeMillis);
189+
}
190+
152191
out.writeOptionalWriteable(executionInfo);
153192
}
154193

@@ -237,21 +276,36 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
237276
}));
238277
}
239278
if (executionInfo != null && executionInfo.overallTook() != null) {
240-
content.add(
241-
ChunkedToXContentHelper.chunk(
242-
(builder, p) -> builder //
243-
.field("took", executionInfo.overallTook().millis())
244-
.field(EsqlExecutionInfo.IS_PARTIAL_FIELD.getPreferredName(), executionInfo.isPartial())
245-
)
246-
);
279+
content.add(ChunkedToXContentHelper.chunk((builder, p) -> {
280+
builder //
281+
.field("took", executionInfo.overallTook().millis())
282+
.field(EsqlExecutionInfo.IS_PARTIAL_FIELD.getPreferredName(), executionInfo.isPartial());
283+
284+
if (startTimeMillis != 0L) {
285+
builder.timestampFieldsFromUnixEpochMillis(
286+
"completion_time_in_millis",
287+
"completion_time",
288+
startTimeMillis + executionInfo.overallTook().millis()
289+
);
290+
}
291+
292+
return builder;
293+
}));
247294
}
248-
content.add(
249-
ChunkedToXContentHelper.chunk(
250-
(builder, p) -> builder //
251-
.field("documents_found", documentsFound)
252-
.field("values_loaded", valuesLoaded)
253-
)
254-
);
295+
content.add(ChunkedToXContentHelper.chunk((builder, p) -> {
296+
builder //
297+
.field("documents_found", documentsFound)
298+
.field("values_loaded", valuesLoaded);
299+
300+
if (startTimeMillis != 0L) {
301+
builder.timestampFieldsFromUnixEpochMillis("start_time_in_millis", "start_time", startTimeMillis);
302+
}
303+
if (expirationTimeMillis != 0L) {
304+
builder.timestampFieldsFromUnixEpochMillis("expiration_time_in_millis", "expiration_time", expirationTimeMillis);
305+
}
306+
307+
return builder;
308+
}));
255309
if (dropNullColumns) {
256310
content.add(ResponseXContentUtils.allColumns(columns, "all_columns"));
257311
content.add(ResponseXContentUtils.nonNullColumns(columns, nullColumns, "columns"));

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,19 @@ public EsqlExecutionInfo executionInfo() {
4545
@Override
4646
public EsqlQueryResponse getCurrentResult() {
4747
// TODO it'd be nice to have the number of documents we've read from completed drivers here
48-
return new EsqlQueryResponse(List.of(), List.of(), 0, 0, null, false, getExecutionId().getEncoded(), true, true, executionInfo);
48+
return new EsqlQueryResponse(
49+
List.of(),
50+
List.of(),
51+
0,
52+
0,
53+
null,
54+
false,
55+
getExecutionId().getEncoded(),
56+
true,
57+
true,
58+
getStartTime(),
59+
getExpirationTimeMillis(),
60+
executionInfo
61+
);
4962
}
5063
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,8 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, boolea
403403
asyncExecutionId,
404404
false,
405405
request.async(),
406+
task.getStartTime(),
407+
((EsqlQueryTask) task).getExpirationTimeMillis(),
406408
result.executionInfo()
407409
);
408410
}
@@ -414,6 +416,8 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, boolea
414416
profile,
415417
request.columnar(),
416418
request.async(),
419+
task.getStartTime(),
420+
threadPool.absoluteTimeInMillis() + request.keepAlive().millis(),
417421
result.executionInfo()
418422
);
419423
}
@@ -479,6 +483,8 @@ public EsqlQueryResponse initialResponse(EsqlQueryTask task) {
479483
asyncExecutionId,
480484
true, // is_running
481485
true, // isAsync
486+
task.getStartTime(),
487+
task.getExpirationTimeMillis(),
482488
task.executionInfo()
483489
);
484490
}

0 commit comments

Comments
 (0)