Skip to content

Commit 8fd9b86

Browse files
authored
Disable filter-by-filter for time-series indices (elastic#138852)
Time-series indices are sorted by _tsid, then timestamp. Replacing round_to causes fragmentation, leading to reading many small chunks of data. It is more efficient to query sequentially and apply round_to on the timestamp field (we can even push round_to to the field loading). For example, with 1,000 tsids over 90 minutes (tbucket=1m), replacing round_to would generate 90 queries. Each query necessitates 1,000 seeks, resulting in decompression and partial reads of many doc-value blocks. `avg_avgot_memory_by_host_5m` reduced from 70ms to 40ms However, if the EsQueryExec index mode is time-series (e.g., rate), we should continue to replace round_to with QueryAndTags when possible, as we currently lack a method to partition the data for increased parallelism.
1 parent de80605 commit 8fd9b86

File tree

8 files changed

+132
-11
lines changed

8 files changed

+132
-11
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.client.internal.Client;
1717
import org.elasticsearch.cluster.ClusterName;
1818
import org.elasticsearch.cluster.RemoteException;
19+
import org.elasticsearch.cluster.metadata.IndexMetadata;
1920
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2021
import org.elasticsearch.cluster.project.ProjectResolver;
2122
import org.elasticsearch.cluster.service.ClusterService;
@@ -57,6 +58,7 @@
5758
import org.elasticsearch.index.mapper.MappedFieldType;
5859
import org.elasticsearch.index.mapper.RoutingPathFields;
5960
import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig;
61+
import org.elasticsearch.index.shard.ShardId;
6062
import org.elasticsearch.license.XPackLicenseState;
6163
import org.elasticsearch.search.SearchService;
6264
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
@@ -400,6 +402,11 @@ public boolean isSingleValue(FieldName field) {
400402
public boolean canUseEqualityOnSyntheticSourceDelegate(FieldName name, String value) {
401403
return false;
402404
}
405+
406+
@Override
407+
public Map<ShardId, IndexMetadata> targetShards() {
408+
return Map.of();
409+
}
403410
}
404411

405412
/**

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -576,34 +576,37 @@ public void testProfile() {
576576
List<DriverProfile> dataProfiles = profile.drivers().stream().filter(d -> d.description().equals("data")).toList();
577577
assertThat(dataProfiles, hasSize(1));
578578
List<OperatorStatus> ops = dataProfiles.get(0).operators();
579-
assertThat(ops, hasSize(8));
579+
assertThat(ops, hasSize(9));
580580
assertThat(ops.get(0).operator(), containsString("LuceneSourceOperator"));
581581
assertThat(ops.get(0).status(), Matchers.instanceOf(LuceneSourceOperator.Status.class));
582582
LuceneSourceOperator.Status status = (LuceneSourceOperator.Status) ops.get(0).status();
583583
assertThat(status.processedShards().size(), Matchers.lessThanOrEqualTo(3));
584-
assertThat(ops.get(1).operator(), containsString("EvalOperator"));
584+
// read _timestamp
585+
assertThat(ops.get(1).operator(), containsString("ValuesSourceReaderOperator"));
586+
// round _timestamp
587+
assertThat(ops.get(2).operator(), containsString("EvalOperator"));
585588
// read _tisd, cpu
586-
assertThat(ops.get(2).operator(), containsString("ValuesSourceReaderOperator"));
587-
var readMetrics = (ValuesSourceReaderOperatorStatus) ops.get(2).status();
589+
assertThat(ops.get(3).operator(), containsString("ValuesSourceReaderOperator"));
590+
var readMetrics = (ValuesSourceReaderOperatorStatus) ops.get(3).status();
588591
assertThat(
589592
readMetrics.readersBuilt().keySet(),
590593
equalTo(
591594
Set.of("_tsid:column_at_a_time:BytesRefsFromOrds.Singleton", "cpu:column_at_a_time:DoublesFromDocValues.Singleton")
592595
)
593596
);
594-
assertThat(ops.get(3).operator(), containsString("TimeSeriesAggregationOperator"));
595-
assertThat(ops.get(4).operator(), containsString("ValuesSourceReaderOperator"));
596-
var readDimensions = (ValuesSourceReaderOperatorStatus) ops.get(4).status();
597+
assertThat(ops.get(4).operator(), containsString("TimeSeriesAggregationOperator"));
598+
assertThat(ops.get(5).operator(), containsString("ValuesSourceReaderOperator"));
599+
var readDimensions = (ValuesSourceReaderOperatorStatus) ops.get(5).status();
597600
assertThat(readDimensions.readersBuilt(), aMapWithSize(1));
598601
assertThat(
599602
Iterables.get(readDimensions.readersBuilt().keySet(), 0),
600603
either(equalTo("cluster:row_stride:BytesRefsFromOrds.Singleton")).or(
601604
equalTo("cluster:column_at_a_time:BytesRefsFromOrds.Singleton")
602605
)
603606
);
604-
assertThat(ops.get(5).operator(), containsString("EvalOperator"));
605-
assertThat(ops.get(6).operator(), containsString("ProjectOperator"));
606-
assertThat(ops.get(7).operator(), containsString("ExchangeSinkOperator"));
607+
assertThat(ops.get(6).operator(), containsString("EvalOperator"));
608+
assertThat(ops.get(7).operator(), containsString("ProjectOperator"));
609+
assertThat(ops.get(8).operator(), containsString("ExchangeSinkOperator"));
607610
}
608611
}
609612
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTags.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.xpack.esql.core.expression.Expression;
2727
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
2828
import org.elasticsearch.xpack.esql.core.expression.Literal;
29+
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
2930
import org.elasticsearch.xpack.esql.core.querydsl.query.Query;
3031
import org.elasticsearch.xpack.esql.core.tree.Source;
3132
import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -303,6 +304,21 @@ protected PhysicalPlan rule(EvalExec evalExec, LocalPhysicalOptimizerContext ctx
303304
);
304305
return evalExec;
305306
}
307+
// Time-series indices are sorted by _tsid, then @timestamp. Replacing round_to causes fragmentation,
308+
// leading to reading many small chunks of data. It is more efficient to query sequentially
309+
// and apply round_to on the timestamp field.
310+
//
311+
// For example, with 1,000 TSIDs over 15 minutes (tbucket=1m), replacing round_to would generate
312+
// 15 queries. Each query would necessitate 1,000 seeks, requiring decompression and partial reads
313+
// of many doc-value blocks.
314+
//
315+
// However, if the EsQueryExec index mode is time-series (e.g., rate), we should replace round_to with
316+
// QueryAndTags when possible, as we currently lack a method to partition the data for increased parallelism.
317+
if (queryExec.indexMode() != IndexMode.TIME_SERIES
318+
&& ((FieldAttribute) roundTo.field()).name().equals(MetadataAttribute.TIMESTAMP_FIELD)
319+
&& ctx.searchStats().targetShards().values().stream().allMatch(imd -> imd.getIndexMode() == IndexMode.TIME_SERIES)) {
320+
return evalExec;
321+
}
306322
plan = planRoundTo(roundTo, evalExec, queryExec, ctx);
307323
}
308324
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/stats/SearchContextStats.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.apache.lucene.search.IndexSearcher;
2222
import org.apache.lucene.util.BytesRef;
2323
import org.apache.lucene.util.NumericUtils;
24+
import org.elasticsearch.cluster.metadata.IndexMetadata;
25+
import org.elasticsearch.common.util.Maps;
2426
import org.elasticsearch.index.mapper.ConstantFieldType;
2527
import org.elasticsearch.index.mapper.DocCountFieldMapper.DocCountFieldType;
2628
import org.elasticsearch.index.mapper.IdFieldMapper;
@@ -30,6 +32,7 @@
3032
import org.elasticsearch.index.mapper.TextFieldMapper;
3133
import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig;
3234
import org.elasticsearch.index.query.SearchExecutionContext;
35+
import org.elasticsearch.index.shard.ShardId;
3336
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
3437
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
3538
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute.FieldName;
@@ -498,4 +501,15 @@ private boolean doWithContexts(IndexReaderConsumer consumer, boolean acceptsDele
498501
throw new EsqlIllegalArgumentException("Cannot access data storage", ex);
499502
}
500503
}
504+
505+
@Override
506+
public Map<ShardId, IndexMetadata> targetShards() {
507+
Map<ShardId, IndexMetadata> shards = Maps.newHashMapWithExpectedSize(contexts.size());
508+
for (SearchExecutionContext context : contexts) {
509+
IndexMetadata indexMetadata = context.getIndexSettings().getIndexMetadata();
510+
ShardId shardId = new ShardId(context.index(), context.getShardId());
511+
shards.putIfAbsent(shardId, indexMetadata);
512+
}
513+
return shards;
514+
}
501515
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/stats/SearchStats.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,14 @@
88
package org.elasticsearch.xpack.esql.stats;
99

1010
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.cluster.metadata.IndexMetadata;
1112
import org.elasticsearch.index.mapper.MappedFieldType;
1213
import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig;
14+
import org.elasticsearch.index.shard.ShardId;
1315
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute.FieldName;
1416

17+
import java.util.Map;
18+
1519
/**
1620
* Interface for determining information about fields in the index.
1721
* This is used by the optimizer to make decisions about how to optimize queries.
@@ -61,6 +65,11 @@ default MappedFieldType fieldType(FieldName name) {
6165
return null;
6266
}
6367

68+
/**
69+
* Returns the target shards and their index metadata.
70+
*/
71+
Map<ShardId, IndexMetadata> targetShards();
72+
6473
/**
6574
* When there are no search stats available, for example when there are no search contexts, we have static results.
6675
*/
@@ -129,6 +138,11 @@ public boolean isSingleValue(FieldName field) {
129138
public boolean canUseEqualityOnSyntheticSourceDelegate(FieldName name, String value) {
130139
return false;
131140
}
141+
142+
@Override
143+
public Map<ShardId, IndexMetadata> targetShards() {
144+
return Map.of();
145+
}
132146
}
133147

134148
/**
@@ -199,5 +213,10 @@ public boolean isSingleValue(FieldName field) {
199213
public boolean canUseEqualityOnSyntheticSourceDelegate(FieldName name, String value) {
200214
throw new UnsupportedOperationException();
201215
}
216+
217+
@Override
218+
public Map<ShardId, IndexMetadata> targetShards() {
219+
throw new UnsupportedOperationException();
220+
}
202221
}
203222
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/AbstractLocalPhysicalPlanOptimizerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public class AbstractLocalPhysicalPlanOptimizerTests extends MapperServiceTestCa
5555
protected TestPlannerOptimizer plannerOptimizer;
5656
protected TestPlannerOptimizer plannerOptimizerDateDateNanosUnionTypes;
5757
protected TestPlannerOptimizer plannerOptimizerTimeSeries;
58-
private Analyzer timeSeriesAnalyzer;
58+
protected Analyzer timeSeriesAnalyzer;
5959

6060
private static final String PARAM_FORMATTING = "%1$s";
6161

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/SubstituteRoundToTests.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,22 @@
77

88
package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;
99

10+
import org.elasticsearch.cluster.metadata.IndexMetadata;
1011
import org.elasticsearch.common.logging.LoggerMessageFormat;
1112
import org.elasticsearch.common.settings.Settings;
1213
import org.elasticsearch.common.util.CollectionUtils;
1314
import org.elasticsearch.core.Nullable;
15+
import org.elasticsearch.index.Index;
16+
import org.elasticsearch.index.IndexMode;
17+
import org.elasticsearch.index.IndexSettings;
18+
import org.elasticsearch.index.IndexVersion;
19+
import org.elasticsearch.index.mapper.DateFieldMapper;
1420
import org.elasticsearch.index.query.BoolQueryBuilder;
1521
import org.elasticsearch.index.query.MatchQueryBuilder;
1622
import org.elasticsearch.index.query.QueryBuilder;
1723
import org.elasticsearch.index.query.RangeQueryBuilder;
24+
import org.elasticsearch.index.shard.ShardId;
25+
import org.elasticsearch.test.ESTestCase;
1826
import org.elasticsearch.xpack.esql.EsqlTestUtils;
1927
import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
2028
import org.elasticsearch.xpack.esql.core.expression.Alias;
@@ -904,6 +912,51 @@ public void testSubqueryWithCountStarAndDateTrunc() {
904912
assertThat(byStat.queryBuilderAndTags(), is(not(empty())));
905913
}
906914

915+
public void testRoundToWithTimeSeriesIndices() {
916+
Map<String, Object> minValue = Map.of(
917+
"@timestamp",
918+
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2023-10-20T12:15:03.360Z")
919+
);
920+
Map<String, Object> maxValue = Map.of(
921+
"@timestamp",
922+
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2023-10-20T14:55:01.543Z")
923+
);
924+
SearchStats searchStats = new EsqlTestUtils.TestSearchStatsWithMinMax(minValue, maxValue) {
925+
@Override
926+
public Map<ShardId, IndexMetadata> targetShards() {
927+
var indexMetadata = IndexMetadata.builder("test_index")
928+
.settings(
929+
ESTestCase.indexSettings(IndexVersion.current(), 1, 1)
930+
.put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES.name())
931+
)
932+
.build();
933+
return Map.of(new ShardId(new Index("id", "n/a"), 1), indexMetadata);
934+
}
935+
};
936+
// enable filter-by-filter for rate aggregations
937+
{
938+
String q = """
939+
TS k8s
940+
| STATS max(rate(network.total_bytes_in)) BY cluster, BUCKET(@timestamp, 1 hour)
941+
| LIMIT 10
942+
""";
943+
PhysicalPlan plan = plannerOptimizerTimeSeries.plan(q, searchStats, timeSeriesAnalyzer);
944+
int queryAndTags = plainQueryAndTags(plan);
945+
assertThat(queryAndTags, equalTo(4));
946+
}
947+
// disable filter-by-filter for non-rate aggregations
948+
{
949+
String q = """
950+
TS k8s
951+
| STATS max(avg_over_time(network.bytes_in)) BY cluster, BUCKET(@timestamp, 1 hour)
952+
| LIMIT 10
953+
""";
954+
PhysicalPlan plan = plannerOptimizerTimeSeries.plan(q, searchStats, timeSeriesAnalyzer);
955+
int queryAndTags = plainQueryAndTags(plan);
956+
assertThat(queryAndTags, equalTo(1));
957+
}
958+
}
959+
907960
private static SearchStats searchStats() {
908961
// create a SearchStats with min and max in milliseconds
909962
Map<String, Object> minValue = Map.of("date", 1697804103360L); // 2023-10-20T12:15:03.360Z

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/DisabledSearchStats.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,14 @@
88
package org.elasticsearch.xpack.esql.stats;
99

1010
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.cluster.metadata.IndexMetadata;
1112
import org.elasticsearch.index.mapper.MappedFieldType;
1213
import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig;
14+
import org.elasticsearch.index.shard.ShardId;
1315
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute.FieldName;
1416

17+
import java.util.Map;
18+
1519
public class DisabledSearchStats implements SearchStats {
1620

1721
@Override
@@ -79,4 +83,9 @@ public boolean isSingleValue(FieldName field) {
7983
public boolean canUseEqualityOnSyntheticSourceDelegate(FieldName name, String value) {
8084
return false;
8185
}
86+
87+
@Override
88+
public Map<ShardId, IndexMetadata> targetShards() {
89+
return Map.of();
90+
}
8291
}

0 commit comments

Comments
 (0)