diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TelemetryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TelemetryIT.java index 1a4721a07adab..a79bf06faed2b 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TelemetryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TelemetryIT.java @@ -148,14 +148,14 @@ public static Iterable parameters() { ) }, new Object[] { new Test( - "TS idx | LIMIT 10", + "TS time_series_idx | LIMIT 10", Build.current().isSnapshot() ? Map.ofEntries(Map.entry("TS", 1), Map.entry("LIMIT", 1)) : Collections.emptyMap(), Map.ofEntries(), Build.current().isSnapshot() ) }, new Object[] { new Test( - "TS idx | STATS max(id) BY host | LIMIT 10", + "TS time_series_idx | STATS max(id) BY host | LIMIT 10", Build.current().isSnapshot() ? Map.ofEntries(Map.entry("TS", 1), Map.entry("STATS", 1), Map.entry("LIMIT", 1)) : Collections.emptyMap(), @@ -314,6 +314,22 @@ private static void loadData(String nodeName) { ) .setMapping("ip", "type=ip", "host", "type=keyword") ); + assertAcked( + client().admin() + .indices() + .prepareCreate("time_series_idx") + .setSettings(Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host")).build()) + .setMapping( + "@timestamp", + "type=date", + "id", + "type=keyword", + "host", + "type=keyword,time_series_dimension=true", + "cpu", + "type=long,time_series_metric=gauge" + ) + ); } private DiscoveryNode randomDataNode() { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java index 7ae986df28782..7ebfeb1634fb3 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java @@ -26,6 +26,7 @@ import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER; import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -763,45 +764,59 @@ record RateKey(String cluster, String host) { } public void testIndexMode() { - createIndex("events"); + createIndex("hosts-old"); int numDocs = between(1, 10); for (int i = 0; i < numDocs; i++) { - index("events", Integer.toString(i), Map.of("v", i)); + index("hosts-old", Integer.toString(i), Map.of("v", i)); } - refresh("events"); + refresh("hosts-old"); List columns = List.of( new ColumnInfoImpl("_index", DataType.KEYWORD, null), new ColumnInfoImpl("_index_mode", DataType.KEYWORD, null) ); - try (EsqlQueryResponse resp = run(""" - FROM events,hosts METADATA _index_mode, _index + for (String q : List.of(""" + FROM hosts* METADATA _index_mode, _index | WHERE _index_mode == "time_series" | STATS BY _index, _index_mode - """)) { - assertThat(resp.columns(), equalTo(columns)); - List> values = EsqlTestUtils.getValuesList(resp); - assertThat(values, hasSize(1)); - assertThat(values, equalTo(List.of(List.of("hosts", "time_series")))); + """, "TS hosts* METADATA _index_mode, _index | STATS BY _index, _index_mode")) { + try (EsqlQueryResponse resp = run(q)) { + assertThat(resp.columns(), equalTo(columns)); + List> values = EsqlTestUtils.getValuesList(resp); + assertThat(values, hasSize(1)); + assertThat(values, equalTo(List.of(List.of("hosts", "time_series")))); + } } try (EsqlQueryResponse resp = run(""" - FROM events,hosts METADATA _index_mode, _index + FROM hosts* METADATA _index_mode, _index | WHERE _index_mode == "standard" | STATS BY _index, _index_mode """)) { assertThat(resp.columns(), equalTo(columns)); List> values = EsqlTestUtils.getValuesList(resp); assertThat(values, hasSize(1)); - assertThat(values, equalTo(List.of(List.of("events", "standard")))); + assertThat(values, equalTo(List.of(List.of("hosts-old", "standard")))); } try (EsqlQueryResponse resp = run(""" - FROM events,hosts METADATA _index_mode, _index + FROM hosts* METADATA _index_mode, _index | STATS BY _index, _index_mode | SORT _index """)) { assertThat(resp.columns(), equalTo(columns)); List> values = EsqlTestUtils.getValuesList(resp); assertThat(values, hasSize(2)); - assertThat(values, equalTo(List.of(List.of("events", "standard"), List.of("hosts", "time_series")))); + assertThat(values, equalTo(List.of(List.of("hosts", "time_series"), List.of("hosts-old", "standard")))); } + + Exception failure = expectThrows(Exception.class, () -> { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query(""" + TS hosts-old METADATA _index_mode, _index + | STATS BY _index, _index_mode + | SORT _index + """); + request.allowPartialResults(false); + run(request).close(); + }); + assertThat(failure.getMessage(), containsString("Unknown index [hosts-old]")); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/PreAnalyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/PreAnalyzer.java index 92c0e548e579e..944230f8034f1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/PreAnalyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/PreAnalyzer.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.esql.analysis; import org.elasticsearch.index.IndexMode; +import org.elasticsearch.xpack.esql.core.util.Holder; import org.elasticsearch.xpack.esql.plan.IndexPattern; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; @@ -27,19 +28,22 @@ public class PreAnalyzer { public static class PreAnalysis { - public static final PreAnalysis EMPTY = new PreAnalysis(emptyList(), emptyList(), emptyList(), emptyList()); + public static final PreAnalysis EMPTY = new PreAnalysis(null, emptyList(), emptyList(), emptyList(), emptyList()); + public final IndexMode indexMode; public final List indices; public final List enriches; public final List inferencePlans; public final List lookupIndices; public PreAnalysis( + IndexMode indexMode, List indices, List enriches, List inferencePlans, List lookupIndices ) { + this.indexMode = indexMode; this.indices = indices; this.enriches = enriches; this.inferencePlans = inferencePlans; @@ -61,14 +65,24 @@ protected PreAnalysis doPreAnalyze(LogicalPlan plan) { List unresolvedEnriches = new ArrayList<>(); List lookupIndices = new ArrayList<>(); List unresolvedInferencePlans = new ArrayList<>(); + Holder indexMode = new Holder<>(); + plan.forEachUp(UnresolvedRelation.class, p -> { + if (p.indexMode() == IndexMode.LOOKUP) { + lookupIndices.add(p.indexPattern()); + } else if (indexMode.get() == null || indexMode.get() == p.indexMode()) { + indexMode.set(p.indexMode()); + indices.add(p.indexPattern()); + } else { + throw new IllegalStateException("index mode is already set"); + } + }); - plan.forEachUp(UnresolvedRelation.class, p -> (p.indexMode() == IndexMode.LOOKUP ? lookupIndices : indices).add(p.indexPattern())); plan.forEachUp(Enrich.class, unresolvedEnriches::add); plan.forEachUp(InferencePlan.class, unresolvedInferencePlans::add); // mark plan as preAnalyzed (if it were marked, there would be no analysis) plan.forEachUp(LogicalPlan::setPreAnalyzed); - return new PreAnalysis(indices.stream().toList(), unresolvedEnriches, unresolvedInferencePlans, lookupIndices); + return new PreAnalysis(indexMode.get(), indices.stream().toList(), unresolvedEnriches, unresolvedInferencePlans, lookupIndices); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index b218435b03c36..6d2f10d390c57 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -21,7 +21,10 @@ import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.mapper.IndexModeFieldMapper; +import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; @@ -381,7 +384,7 @@ public void analyzedPlan( } listener.andThen((l, result) -> { // resolve the main indices - preAnalyzeIndices(preAnalysis.indices, executionInfo, result, requestFilter, l); + preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l); }).andThen((l, result) -> { // TODO in follow-PR (for skip_unavailable handling of missing concrete indexes) add some tests for // invalid index resolution to updateExecutionInfo @@ -404,7 +407,7 @@ public void analyzedPlan( } // here the requestFilter is set to null, performing the pre-analysis after the first step failed - preAnalyzeIndices(preAnalysis.indices, executionInfo, result, null, l); + preAnalyzeMainIndices(preAnalysis, executionInfo, result, null, l); }).andThen((l, result) -> { assert requestFilter != null : "The second analysis shouldn't take place when there is no index filter in the request"; LOGGER.debug("Analyzing the plan (second attempt, without filter)"); @@ -432,14 +435,15 @@ private void preAnalyzeLookupIndex(IndexPattern table, PreAnalysisResult result, // TODO: Verify that the resolved index actually has indexMode: "lookup" } - private void preAnalyzeIndices( - List indices, + private void preAnalyzeMainIndices( + PreAnalyzer.PreAnalysis preAnalysis, EsqlExecutionInfo executionInfo, PreAnalysisResult result, QueryBuilder requestFilter, ActionListener listener ) { // TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one + List indices = preAnalysis.indices; if (indices.size() > 1) { // Note: JOINs are not supported but we detect them when listener.onFailure(new MappingException("Queries with multiple indices are not supported")); @@ -486,6 +490,15 @@ private void preAnalyzeIndices( ); } else { // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types + if (preAnalysis.indexMode == IndexMode.TIME_SERIES) { + // TODO: Maybe if no indices are returned, retry without index mode and provide a clearer error message. + var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName()); + if (requestFilter != null) { + requestFilter = new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter); + } else { + requestFilter = indexModeFilter; + } + } indexResolver.resolveAsMergedMapping( indexExpressionToResolve, result.fieldNames,