Skip to content

Commit c1a3065

Browse files
authored
Select time-series indices with TS command (#126436)
With this change, the TS command will target only time_series indexes.
1 parent 94c385d commit c1a3065

File tree

4 files changed

+81
-23
lines changed

4 files changed

+81
-23
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TelemetryIT.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,14 +148,14 @@ public static Iterable<Object[]> parameters() {
148148
) },
149149
new Object[] {
150150
new Test(
151-
"TS idx | LIMIT 10",
151+
"TS time_series_idx | LIMIT 10",
152152
Build.current().isSnapshot() ? Map.ofEntries(Map.entry("TS", 1), Map.entry("LIMIT", 1)) : Collections.emptyMap(),
153153
Map.ofEntries(),
154154
Build.current().isSnapshot()
155155
) },
156156
new Object[] {
157157
new Test(
158-
"TS idx | STATS max(id) BY host | LIMIT 10",
158+
"TS time_series_idx | STATS max(id) BY host | LIMIT 10",
159159
Build.current().isSnapshot()
160160
? Map.ofEntries(Map.entry("TS", 1), Map.entry("STATS", 1), Map.entry("LIMIT", 1))
161161
: Collections.emptyMap(),
@@ -314,6 +314,22 @@ private static void loadData(String nodeName) {
314314
)
315315
.setMapping("ip", "type=ip", "host", "type=keyword")
316316
);
317+
assertAcked(
318+
client().admin()
319+
.indices()
320+
.prepareCreate("time_series_idx")
321+
.setSettings(Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host")).build())
322+
.setMapping(
323+
"@timestamp",
324+
"type=date",
325+
"id",
326+
"type=keyword",
327+
"host",
328+
"type=keyword,time_series_dimension=true",
329+
"cpu",
330+
"type=long,time_series_metric=gauge"
331+
)
332+
);
317333
}
318334

319335
private DiscoveryNode randomDataNode() {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
2828
import static org.hamcrest.Matchers.closeTo;
29+
import static org.hamcrest.Matchers.containsString;
2930
import static org.hamcrest.Matchers.equalTo;
3031
import static org.hamcrest.Matchers.hasSize;
3132

@@ -763,45 +764,59 @@ record RateKey(String cluster, String host) {
763764
}
764765

765766
public void testIndexMode() {
766-
createIndex("events");
767+
createIndex("hosts-old");
767768
int numDocs = between(1, 10);
768769
for (int i = 0; i < numDocs; i++) {
769-
index("events", Integer.toString(i), Map.of("v", i));
770+
index("hosts-old", Integer.toString(i), Map.of("v", i));
770771
}
771-
refresh("events");
772+
refresh("hosts-old");
772773
List<ColumnInfoImpl> columns = List.of(
773774
new ColumnInfoImpl("_index", DataType.KEYWORD, null),
774775
new ColumnInfoImpl("_index_mode", DataType.KEYWORD, null)
775776
);
776-
try (EsqlQueryResponse resp = run("""
777-
FROM events,hosts METADATA _index_mode, _index
777+
for (String q : List.of("""
778+
FROM hosts* METADATA _index_mode, _index
778779
| WHERE _index_mode == "time_series"
779780
| STATS BY _index, _index_mode
780-
""")) {
781-
assertThat(resp.columns(), equalTo(columns));
782-
List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
783-
assertThat(values, hasSize(1));
784-
assertThat(values, equalTo(List.of(List.of("hosts", "time_series"))));
781+
""", "TS hosts* METADATA _index_mode, _index | STATS BY _index, _index_mode")) {
782+
try (EsqlQueryResponse resp = run(q)) {
783+
assertThat(resp.columns(), equalTo(columns));
784+
List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
785+
assertThat(values, hasSize(1));
786+
assertThat(values, equalTo(List.of(List.of("hosts", "time_series"))));
787+
}
785788
}
786789
try (EsqlQueryResponse resp = run("""
787-
FROM events,hosts METADATA _index_mode, _index
790+
FROM hosts* METADATA _index_mode, _index
788791
| WHERE _index_mode == "standard"
789792
| STATS BY _index, _index_mode
790793
""")) {
791794
assertThat(resp.columns(), equalTo(columns));
792795
List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
793796
assertThat(values, hasSize(1));
794-
assertThat(values, equalTo(List.of(List.of("events", "standard"))));
797+
assertThat(values, equalTo(List.of(List.of("hosts-old", "standard"))));
795798
}
796799
try (EsqlQueryResponse resp = run("""
797-
FROM events,hosts METADATA _index_mode, _index
800+
FROM hosts* METADATA _index_mode, _index
798801
| STATS BY _index, _index_mode
799802
| SORT _index
800803
""")) {
801804
assertThat(resp.columns(), equalTo(columns));
802805
List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
803806
assertThat(values, hasSize(2));
804-
assertThat(values, equalTo(List.of(List.of("events", "standard"), List.of("hosts", "time_series"))));
807+
assertThat(values, equalTo(List.of(List.of("hosts", "time_series"), List.of("hosts-old", "standard"))));
805808
}
809+
810+
Exception failure = expectThrows(Exception.class, () -> {
811+
EsqlQueryRequest request = new EsqlQueryRequest();
812+
request.query("""
813+
TS hosts-old METADATA _index_mode, _index
814+
| STATS BY _index, _index_mode
815+
| SORT _index
816+
""");
817+
request.allowPartialResults(false);
818+
run(request).close();
819+
});
820+
assertThat(failure.getMessage(), containsString("Unknown index [hosts-old]"));
806821
}
807822
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/PreAnalyzer.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.esql.analysis;
99

1010
import org.elasticsearch.index.IndexMode;
11+
import org.elasticsearch.xpack.esql.core.util.Holder;
1112
import org.elasticsearch.xpack.esql.plan.IndexPattern;
1213
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
1314
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
@@ -27,19 +28,22 @@
2728
public class PreAnalyzer {
2829

2930
public static class PreAnalysis {
30-
public static final PreAnalysis EMPTY = new PreAnalysis(emptyList(), emptyList(), emptyList(), emptyList());
31+
public static final PreAnalysis EMPTY = new PreAnalysis(null, emptyList(), emptyList(), emptyList(), emptyList());
3132

33+
public final IndexMode indexMode;
3234
public final List<IndexPattern> indices;
3335
public final List<Enrich> enriches;
3436
public final List<InferencePlan> inferencePlans;
3537
public final List<IndexPattern> lookupIndices;
3638

3739
public PreAnalysis(
40+
IndexMode indexMode,
3841
List<IndexPattern> indices,
3942
List<Enrich> enriches,
4043
List<InferencePlan> inferencePlans,
4144
List<IndexPattern> lookupIndices
4245
) {
46+
this.indexMode = indexMode;
4347
this.indices = indices;
4448
this.enriches = enriches;
4549
this.inferencePlans = inferencePlans;
@@ -61,14 +65,24 @@ protected PreAnalysis doPreAnalyze(LogicalPlan plan) {
6165
List<Enrich> unresolvedEnriches = new ArrayList<>();
6266
List<IndexPattern> lookupIndices = new ArrayList<>();
6367
List<InferencePlan> unresolvedInferencePlans = new ArrayList<>();
68+
Holder<IndexMode> indexMode = new Holder<>();
69+
plan.forEachUp(UnresolvedRelation.class, p -> {
70+
if (p.indexMode() == IndexMode.LOOKUP) {
71+
lookupIndices.add(p.indexPattern());
72+
} else if (indexMode.get() == null || indexMode.get() == p.indexMode()) {
73+
indexMode.set(p.indexMode());
74+
indices.add(p.indexPattern());
75+
} else {
76+
throw new IllegalStateException("index mode is already set");
77+
}
78+
});
6479

65-
plan.forEachUp(UnresolvedRelation.class, p -> (p.indexMode() == IndexMode.LOOKUP ? lookupIndices : indices).add(p.indexPattern()));
6680
plan.forEachUp(Enrich.class, unresolvedEnriches::add);
6781
plan.forEachUp(InferencePlan.class, unresolvedInferencePlans::add);
6882

6983
// mark plan as preAnalyzed (if it were marked, there would be no analysis)
7084
plan.forEachUp(LogicalPlan::setPreAnalyzed);
7185

72-
return new PreAnalysis(indices.stream().toList(), unresolvedEnriches, unresolvedInferencePlans, lookupIndices);
86+
return new PreAnalysis(indexMode.get(), indices.stream().toList(), unresolvedEnriches, unresolvedInferencePlans, lookupIndices);
7387
}
7488
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
import org.elasticsearch.core.Releasables;
2222
import org.elasticsearch.core.TimeValue;
2323
import org.elasticsearch.index.IndexMode;
24+
import org.elasticsearch.index.mapper.IndexModeFieldMapper;
25+
import org.elasticsearch.index.query.BoolQueryBuilder;
2426
import org.elasticsearch.index.query.QueryBuilder;
27+
import org.elasticsearch.index.query.TermQueryBuilder;
2528
import org.elasticsearch.indices.IndicesExpressionGrouper;
2629
import org.elasticsearch.logging.LogManager;
2730
import org.elasticsearch.logging.Logger;
@@ -381,7 +384,7 @@ public void analyzedPlan(
381384
}
382385
listener.<PreAnalysisResult>andThen((l, result) -> {
383386
// resolve the main indices
384-
preAnalyzeIndices(preAnalysis.indices, executionInfo, result, requestFilter, l);
387+
preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l);
385388
}).<PreAnalysisResult>andThen((l, result) -> {
386389
// TODO in follow-PR (for skip_unavailable handling of missing concrete indexes) add some tests for
387390
// invalid index resolution to updateExecutionInfo
@@ -404,7 +407,7 @@ public void analyzedPlan(
404407
}
405408

406409
// here the requestFilter is set to null, performing the pre-analysis after the first step failed
407-
preAnalyzeIndices(preAnalysis.indices, executionInfo, result, null, l);
410+
preAnalyzeMainIndices(preAnalysis, executionInfo, result, null, l);
408411
}).<LogicalPlan>andThen((l, result) -> {
409412
assert requestFilter != null : "The second analysis shouldn't take place when there is no index filter in the request";
410413
LOGGER.debug("Analyzing the plan (second attempt, without filter)");
@@ -432,14 +435,15 @@ private void preAnalyzeLookupIndex(IndexPattern table, PreAnalysisResult result,
432435
// TODO: Verify that the resolved index actually has indexMode: "lookup"
433436
}
434437

435-
private void preAnalyzeIndices(
436-
List<IndexPattern> indices,
438+
private void preAnalyzeMainIndices(
439+
PreAnalyzer.PreAnalysis preAnalysis,
437440
EsqlExecutionInfo executionInfo,
438441
PreAnalysisResult result,
439442
QueryBuilder requestFilter,
440443
ActionListener<PreAnalysisResult> listener
441444
) {
442445
// TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one
446+
List<IndexPattern> indices = preAnalysis.indices;
443447
if (indices.size() > 1) {
444448
// Note: JOINs are not supported but we detect them when
445449
listener.onFailure(new MappingException("Queries with multiple indices are not supported"));
@@ -486,6 +490,15 @@ private void preAnalyzeIndices(
486490
);
487491
} else {
488492
// call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
493+
if (preAnalysis.indexMode == IndexMode.TIME_SERIES) {
494+
// TODO: Maybe if no indices are returned, retry without index mode and provide a clearer error message.
495+
var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName());
496+
if (requestFilter != null) {
497+
requestFilter = new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter);
498+
} else {
499+
requestFilter = indexModeFilter;
500+
}
501+
}
489502
indexResolver.resolveAsMergedMapping(
490503
indexExpressionToResolve,
491504
result.fieldNames,

0 commit comments

Comments
 (0)