Partition rate query using tsid prefixes #144818
```java
List<List<PartialLeafReaderContext>> partition(List<LeafReaderContext> leaves, int docsPerSlice) throws IOException {
```
Pinging @elastic/es-storage-engine (Team:StorageEngine)
|
@kkrik-es I think there is a bug in the partition-combining logic that can drop some slices. I didn't figure it out for a while (the tests didn't catch it). I think the win should be much smaller (and more realistic). I am running the benchmark again.
|
Buildkite benchmark this with tsdb-metricsgen-270m please
💚 Build Succeeded
This build ran two tsdb-metricsgen-270m benchmarks to evaluate the performance impact of this PR.
```java
out.writeByte(id);
byte val = id;
if (this == TIME_SERIES && out.getTransportVersion().supports(TIME_SERIES_PARTITIONING) == false) {
    val = DOC.id; // make time-series as DOC
```

Suggested change:

```java
    val = DOC.id; // fall back to DOC partitioning strategy for time-series
```
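The fallback under discussion can be illustrated with a small self-contained sketch. This is hypothetical code, not the PR's actual enum (the byte ids and the `peerSupportsTimeSeriesPartitioning` flag are stand-ins for `out.getTransportVersion().supports(TIME_SERIES_PARTITIONING)`): when the receiving node is too old to know the time-series strategy, it is serialized as `DOC` so the old node can still deserialize the value.

```java
// Hypothetical sketch of the wire-compatibility fallback; the enum ids and the
// boolean parameter are illustrative stand-ins, not the actual Elasticsearch code.
public class PartitioningWireFallback {
    enum Strategy {
        DOC((byte) 0), SEGMENT((byte) 1), TIME_SERIES((byte) 2);

        final byte id;

        Strategy(byte id) {
            this.id = id;
        }

        // Pick the byte to put on the wire based on what the peer supports.
        byte wireId(boolean peerSupportsTimeSeriesPartitioning) {
            if (this == TIME_SERIES && peerSupportsTimeSeriesPartitioning == false) {
                return DOC.id; // fall back to DOC partitioning strategy for time-series
            }
            return id;
        }
    }

    public static void main(String[] args) {
        // An old peer sees TIME_SERIES degraded to DOC; a new peer gets it unchanged.
        System.out.println(Strategy.TIME_SERIES.wireId(false)); // 0 (DOC)
        System.out.println(Strategy.TIME_SERIES.wireId(true));  // 2 (TIME_SERIES)
    }
}
```

The key property is that the fallback is one-directional and lossless for old nodes: they read a valid `DOC` id and simply use the pre-existing behavior.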
```java
final Map<Integer, PrefixGroup> groups = new TreeMap<>(); // ordered by prefixes
PartitionedDocValues.PrefixPartitions prefixPartitions = null;
for (LeafReaderContext leaf : leaves) {
    var tsid = leaf.reader().getSortedDocValues(TimeSeriesIdFieldMapper.NAME);
```

Super nit: rename to `tsidValues`?
```java
    return combineGroups(groups.values().stream().toList(), docsPerSlice);
}

private List<List<PartialLeafReaderContext>> combineGroups(List<PrefixGroup> groups, int docsPerSlice) {
```

Nit: let's add a comment outlining what this does. IIUC it combines groups to create chunkier slices so that they can be assigned to separate threads and be processed in parallel efficiently.
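For illustration, the behavior the comment should describe could look roughly like the sketch below. This is a hypothetical simplification (a `Group` here is just a document count, standing in for `PrefixGroup`, and `combine` stands in for `combineGroups`), not the PR's implementation: adjacent prefix groups are merged greedily until a slice reaches roughly `docsPerSlice` documents. Note the trailing-partial-slice handling at the end; forgetting it is exactly the kind of spot where slices could get dropped.

```java
import java.util.ArrayList;
import java.util.List;

public class CombineGroupsSketch {
    // Hypothetical stand-in for PrefixGroup: only the document count matters here.
    record Group(int docCount) {}

    // Greedily merge adjacent groups (they are ordered by prefix, so merging
    // neighbors keeps each slice a contiguous tsid range) until a slice holds
    // at least docsPerSlice documents, then start a new slice.
    static List<List<Group>> combine(List<Group> groups, int docsPerSlice) {
        List<List<Group>> slices = new ArrayList<>();
        List<Group> current = new ArrayList<>();
        int docsInCurrent = 0;
        for (Group g : groups) {
            current.add(g);
            docsInCurrent += g.docCount();
            if (docsInCurrent >= docsPerSlice) {
                slices.add(current);
                current = new ArrayList<>();
                docsInCurrent = 0;
            }
        }
        if (current.isEmpty() == false) {
            slices.add(current); // don't drop the trailing partial slice
        }
        return slices;
    }

    public static void main(String[] args) {
        List<Group> groups = List.of(new Group(300), new Group(250), new Group(600), new Group(100));
        // Target 500 docs per slice: [300+250], [600], and the trailing [100].
        System.out.println(combine(groups, 500).size()); // 3
    }
}
```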
(Resolved review thread on ...ugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneSliceQueue.java)
```diff
 .put("mode", "time_series")
 .putList("routing_path", List.of("host", "cluster"))
-.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)
+.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
```

Shall we use a random(1, 3) here? We can assert on the partitioning strategy only when this is 1.
|
Hm, results show very modest wins... Did the change apply?
This change wires the prefix partitions introduced in #144617 to the compute engine.
Today, we partition the rate query by interval by replacing round_to with query_and_tags. With 10k time-series and a 5-minute bucket, each interval query reads all 10k time-series from every segment. In the rate aggregation, we buffer data points for all 10k time-series and maintain a priority queue across all of them within each interval. This approach increases concurrency to avoid underutilizing CPUs, but it adds overhead and is not I/O friendly due to fragmented reads.
With prefix partitions, we partition data by groups of contiguous time-series instead. For example, 10k time-series can be split into 1024 groups of ~10 each. Each group reads all matching data points, and because these time-series are co-located in each segment, reads are sequential and I/O friendly. In the rate aggregation, the priority queue manages only ~10 time-series per group instead of 10k, significantly reducing overhead and memory usage. To avoid excessive overhead from tiny partitions, we merge adjacent partitions up to a target size (500k docs).
When prefix partitioning is not available (e.g., older codec without prefix layout), we fall back to the current behavior.
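The numbers in the description can be sanity-checked with some back-of-the-envelope arithmetic (the figures below are illustrative, derived only from the 10k-series / 1024-group example above):

```java
// Illustrative arithmetic for the prefix-partitioning example above:
// 10k time-series split into 1024 prefix groups, and the relative cost of a
// binary-heap priority queue over each population (~log2(n) comparisons per op).
public class PrefixPartitionArithmetic {
    public static void main(String[] args) {
        int totalSeries = 10_000;
        int prefixGroups = 1_024;

        double seriesPerGroup = (double) totalSeries / prefixGroups;
        System.out.printf("series per group: %.1f%n", seriesPerGroup); // ~9.8

        // One queue over all series vs. one small queue per group.
        double costAllSeries = Math.log(totalSeries) / Math.log(2);    // ~13.3
        double costPerGroup = Math.log(seriesPerGroup) / Math.log(2);  // ~3.3
        System.out.printf("heap comparisons per op: %.1f vs %.1f%n", costAllSeries, costPerGroup);
    }
}
```

So each group's queue does roughly a quarter of the per-operation heap work, on top of the memory savings from buffering ~10 series at a time instead of 10k.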