Skip to content

Commit d35aa81

Browse files
authored
Fix now in millis for ESQL search contexts (#103474) (#103480) (#103766)
Currently, ESQL search contexts do not have the correct actual 'now' in milliseconds. Ideally, we should use a consistent 'now' in millis for all nodes participating in the execution of a search request. I will follow up with a PR to deserialize that value between nodes for version 8.13 and later. Closes #103455
1 parent 2113ff3 commit d35aa81

File tree

4 files changed

+81
-5
lines changed

4 files changed

+81
-5
lines changed

docs/changelog/103474.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 103474
2+
summary: Fix now in millis for ESQL search contexts
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 103455
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
11+
import org.elasticsearch.action.index.IndexRequest;
12+
import org.elasticsearch.action.support.WriteRequest;
13+
import org.elasticsearch.core.TimeValue;
14+
import org.elasticsearch.index.query.RangeQueryBuilder;
15+
16+
import java.util.List;
17+
18+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
19+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
20+
import static org.hamcrest.Matchers.hasSize;
21+
22+
public class TimeBasedIndicesIT extends AbstractEsqlIntegTestCase {
23+
24+
public void testFilter() {
25+
long epoch = System.currentTimeMillis();
26+
assertAcked(client().admin().indices().prepareCreate("test").setMapping("@timestamp", "type=date", "value", "type=long"));
27+
BulkRequestBuilder bulk = client().prepareBulk("test").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
28+
int oldDocs = between(10, 100);
29+
for (int i = 0; i < oldDocs; i++) {
30+
long timestamp = epoch - TimeValue.timeValueHours(between(1, 2)).millis();
31+
bulk.add(new IndexRequest().source("@timestamp", timestamp, "value", -i));
32+
}
33+
int newDocs = between(10, 100);
34+
for (int i = 0; i < newDocs; i++) {
35+
long timestamp = epoch + TimeValue.timeValueHours(between(1, 2)).millis();
36+
bulk.add(new IndexRequest().source("@timestamp", timestamp, "value", i));
37+
}
38+
bulk.get();
39+
{
40+
EsqlQueryRequest request = new EsqlQueryRequest();
41+
request.query("FROM test | limit 1000");
42+
request.filter(new RangeQueryBuilder("@timestamp").from(epoch - TimeValue.timeValueHours(3).millis()).to("now"));
43+
try (var resp = run(request)) {
44+
List<List<Object>> values = getValuesList(resp);
45+
assertThat(values, hasSize(oldDocs));
46+
}
47+
}
48+
{
49+
EsqlQueryRequest request = new EsqlQueryRequest();
50+
request.query("FROM test | limit 1000");
51+
request.filter(new RangeQueryBuilder("@timestamp").from("now").to(epoch + TimeValue.timeValueHours(3).millis()));
52+
try (var resp = run(request)) {
53+
List<List<Object>> values = getValuesList(resp);
54+
assertThat(values, hasSize(newDocs));
55+
}
56+
}
57+
}
58+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan,
299299

300300
private void acquireSearchContexts(
301301
List<ShardId> shardIds,
302+
EsqlConfiguration configuration,
302303
Map<Index, AliasFilter> aliasFilters,
303304
ActionListener<List<SearchContext>> listener
304305
) {
@@ -322,11 +323,12 @@ private void acquireSearchContexts(
322323
try {
323324
for (IndexShard shard : targetShards) {
324325
var aliasFilter = aliasFilters.getOrDefault(shard.shardId().getIndex(), AliasFilter.EMPTY);
325-
ShardSearchRequest shardSearchLocalRequest = new ShardSearchRequest(shard.shardId(), 0, aliasFilter);
326-
SearchContext context = searchService.createSearchContext(
327-
shardSearchLocalRequest,
328-
SearchService.NO_TIMEOUT
326+
var shardRequest = new ShardSearchRequest(
327+
shard.shardId(),
328+
configuration.absoluteStartedTimeInMillis(),
329+
aliasFilter
329330
);
331+
SearchContext context = searchService.createSearchContext(shardRequest, SearchService.NO_TIMEOUT);
330332
searchContexts.add(context);
331333
}
332334
for (SearchContext searchContext : searchContexts) {
@@ -453,7 +455,8 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
453455
final var exchangeSink = exchangeService.getSinkHandler(sessionId);
454456
parentTask.addListener(() -> exchangeService.finishSinkHandler(sessionId, new TaskCancelledException("task cancelled")));
455457
final ActionListener<Void> listener = new ChannelActionListener<>(channel).map(nullValue -> new DataNodeResponse());
456-
acquireSearchContexts(request.shardIds(), request.aliasFilters(), ActionListener.wrap(searchContexts -> {
458+
final EsqlConfiguration configuration = request.configuration();
459+
acquireSearchContexts(request.shardIds(), configuration, request.aliasFilters(), ActionListener.wrap(searchContexts -> {
457460
var computeContext = new ComputeContext(sessionId, searchContexts, request.configuration(), null, exchangeSink);
458461
runCompute(parentTask, computeContext, request.plan(), ActionListener.wrap(unused -> {
459462
// don't return until all pages are fetched

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlConfiguration.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,15 @@ public Locale locale() {
9595
return locale;
9696
}
9797

98+
/**
99+
* Returns the current time in milliseconds from the time epoch for the execution of this request.
100+
* It ensures consistency by using the same value on all nodes involved in the search request.
101+
* Note: Currently, it returns {@link System#currentTimeMillis()}, but this value will be serialized between nodes.
102+
*/
103+
public long absoluteStartedTimeInMillis() {
104+
return System.currentTimeMillis();
105+
}
106+
98107
public String query() {
99108
return query;
100109
}

0 commit comments

Comments
 (0)