Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,14 @@ public static Iterable<Object[]> parameters() {
) },
new Object[] {
new Test(
"TS idx | LIMIT 10",
"TS time_series_idx | LIMIT 10",
Build.current().isSnapshot() ? Map.ofEntries(Map.entry("TS", 1), Map.entry("LIMIT", 1)) : Collections.emptyMap(),
Map.ofEntries(),
Build.current().isSnapshot()
) },
new Object[] {
new Test(
"TS idx | STATS max(id) BY host | LIMIT 10",
"TS time_series_idx | STATS max(id) BY host | LIMIT 10",
Build.current().isSnapshot()
? Map.ofEntries(Map.entry("TS", 1), Map.entry("STATS", 1), Map.entry("LIMIT", 1))
: Collections.emptyMap(),
Expand Down Expand Up @@ -314,6 +314,22 @@ private static void loadData(String nodeName) {
)
.setMapping("ip", "type=ip", "host", "type=keyword")
);
assertAcked(
client().admin()
.indices()
.prepareCreate("time_series_idx")
.setSettings(Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host")).build())
.setMapping(
"@timestamp",
"type=date",
"id",
"type=keyword",
"host",
"type=keyword,time_series_dimension=true",
"cpu",
"type=long,time_series_metric=gauge"
)
);
}

private DiscoveryNode randomDataNode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

Expand Down Expand Up @@ -763,45 +764,59 @@ record RateKey(String cluster, String host) {
}

public void testIndexMode() {
createIndex("events");
createIndex("hosts-old");
int numDocs = between(1, 10);
for (int i = 0; i < numDocs; i++) {
index("events", Integer.toString(i), Map.of("v", i));
index("hosts-old", Integer.toString(i), Map.of("v", i));
}
refresh("events");
refresh("hosts-old");
List<ColumnInfoImpl> columns = List.of(
new ColumnInfoImpl("_index", DataType.KEYWORD, null),
new ColumnInfoImpl("_index_mode", DataType.KEYWORD, null)
);
try (EsqlQueryResponse resp = run("""
FROM events,hosts METADATA _index_mode, _index
for (String q : List.of("""
FROM hosts* METADATA _index_mode, _index
| WHERE _index_mode == "time_series"
| STATS BY _index, _index_mode
""")) {
assertThat(resp.columns(), equalTo(columns));
List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
assertThat(values, hasSize(1));
assertThat(values, equalTo(List.of(List.of("hosts", "time_series"))));
""", "TS hosts* METADATA _index_mode, _index | STATS BY _index, _index_mode")) {
try (EsqlQueryResponse resp = run(q)) {
assertThat(resp.columns(), equalTo(columns));
List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
assertThat(values, hasSize(1));
assertThat(values, equalTo(List.of(List.of("hosts", "time_series"))));
}
}
try (EsqlQueryResponse resp = run("""
FROM events,hosts METADATA _index_mode, _index
FROM hosts* METADATA _index_mode, _index
| WHERE _index_mode == "standard"
| STATS BY _index, _index_mode
""")) {
assertThat(resp.columns(), equalTo(columns));
List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
assertThat(values, hasSize(1));
assertThat(values, equalTo(List.of(List.of("events", "standard"))));
assertThat(values, equalTo(List.of(List.of("hosts-old", "standard"))));
}
try (EsqlQueryResponse resp = run("""
FROM events,hosts METADATA _index_mode, _index
FROM hosts* METADATA _index_mode, _index
| STATS BY _index, _index_mode
| SORT _index
""")) {
assertThat(resp.columns(), equalTo(columns));
List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
assertThat(values, hasSize(2));
assertThat(values, equalTo(List.of(List.of("events", "standard"), List.of("hosts", "time_series"))));
assertThat(values, equalTo(List.of(List.of("hosts", "time_series"), List.of("hosts-old", "standard"))));
}

Exception failure = expectThrows(Exception.class, () -> {
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("""
TS hosts-old METADATA _index_mode, _index
| STATS BY _index, _index_mode
| SORT _index
""");
request.allowPartialResults(false);
run(request).close();
});
assertThat(failure.getMessage(), containsString("Unknown index [hosts-old]"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.esql.analysis;

import org.elasticsearch.index.IndexMode;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.plan.IndexPattern;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
Expand All @@ -27,19 +28,22 @@
public class PreAnalyzer {

public static class PreAnalysis {
public static final PreAnalysis EMPTY = new PreAnalysis(emptyList(), emptyList(), emptyList(), emptyList());
public static final PreAnalysis EMPTY = new PreAnalysis(null, emptyList(), emptyList(), emptyList(), emptyList());

public final IndexMode indexMode;
public final List<IndexPattern> indices;
public final List<Enrich> enriches;
public final List<InferencePlan> inferencePlans;
public final List<IndexPattern> lookupIndices;

public PreAnalysis(
IndexMode indexMode,
List<IndexPattern> indices,
List<Enrich> enriches,
List<InferencePlan> inferencePlans,
List<IndexPattern> lookupIndices
) {
this.indexMode = indexMode;
this.indices = indices;
this.enriches = enriches;
this.inferencePlans = inferencePlans;
Expand All @@ -61,14 +65,24 @@ protected PreAnalysis doPreAnalyze(LogicalPlan plan) {
List<Enrich> unresolvedEnriches = new ArrayList<>();
List<IndexPattern> lookupIndices = new ArrayList<>();
List<InferencePlan> unresolvedInferencePlans = new ArrayList<>();
Holder<IndexMode> indexMode = new Holder<>();
plan.forEachUp(UnresolvedRelation.class, p -> {
if (p.indexMode() == IndexMode.LOOKUP) {
lookupIndices.add(p.indexPattern());
} else if (indexMode.get() == null || indexMode.get() == p.indexMode()) {
indexMode.set(p.indexMode());
indices.add(p.indexPattern());
} else {
throw new IllegalStateException("index mode is already set");
}
});

plan.forEachUp(UnresolvedRelation.class, p -> (p.indexMode() == IndexMode.LOOKUP ? lookupIndices : indices).add(p.indexPattern()));
plan.forEachUp(Enrich.class, unresolvedEnriches::add);
plan.forEachUp(InferencePlan.class, unresolvedInferencePlans::add);

// mark plan as preAnalyzed (if it were marked, there would be no analysis)
plan.forEachUp(LogicalPlan::setPreAnalyzed);

return new PreAnalysis(indices.stream().toList(), unresolvedEnriches, unresolvedInferencePlans, lookupIndices);
return new PreAnalysis(indexMode.get(), indices.stream().toList(), unresolvedEnriches, unresolvedInferencePlans, lookupIndices);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.mapper.IndexModeFieldMapper;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.indices.IndicesExpressionGrouper;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
Expand Down Expand Up @@ -381,7 +384,7 @@ public void analyzedPlan(
}
listener.<PreAnalysisResult>andThen((l, result) -> {
// resolve the main indices
preAnalyzeIndices(preAnalysis.indices, executionInfo, result, requestFilter, l);
preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l);
}).<PreAnalysisResult>andThen((l, result) -> {
// TODO in follow-PR (for skip_unavailable handling of missing concrete indexes) add some tests for
// invalid index resolution to updateExecutionInfo
Expand All @@ -404,7 +407,7 @@ public void analyzedPlan(
}

// here the requestFilter is set to null, performing the pre-analysis after the first step failed
preAnalyzeIndices(preAnalysis.indices, executionInfo, result, null, l);
preAnalyzeMainIndices(preAnalysis, executionInfo, result, null, l);
}).<LogicalPlan>andThen((l, result) -> {
assert requestFilter != null : "The second analysis shouldn't take place when there is no index filter in the request";
LOGGER.debug("Analyzing the plan (second attempt, without filter)");
Expand Down Expand Up @@ -432,14 +435,15 @@ private void preAnalyzeLookupIndex(IndexPattern table, PreAnalysisResult result,
// TODO: Verify that the resolved index actually has indexMode: "lookup"
}

private void preAnalyzeIndices(
List<IndexPattern> indices,
private void preAnalyzeMainIndices(
PreAnalyzer.PreAnalysis preAnalysis,
EsqlExecutionInfo executionInfo,
PreAnalysisResult result,
QueryBuilder requestFilter,
ActionListener<PreAnalysisResult> listener
) {
// TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one
List<IndexPattern> indices = preAnalysis.indices;
if (indices.size() > 1) {
// Note: JOINs are not supported but we detect them when
listener.onFailure(new MappingException("Queries with multiple indices are not supported"));
Expand Down Expand Up @@ -486,6 +490,15 @@ private void preAnalyzeIndices(
);
} else {
// call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
if (preAnalysis.indexMode == IndexMode.TIME_SERIES) {
// TODO: Maybe if no indices are returned, retry without index mode and provide a clearer error message.
var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName());
if (requestFilter != null) {
requestFilter = new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How fast is this? Do we need to worry about performance?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an overhead with field-caps when using indexFilter, as we have to dispatch sub-requests for every copy instead of one copy per index without the filter. However, this overhead should be small, and we plan to work on minimizing the overhead for index filters that can be rewritten on the coordinators.

} else {
requestFilter = indexModeFilter;
}
}
indexResolver.resolveAsMergedMapping(
indexExpressionToResolve,
result.fieldNames,
Expand Down