Description
Our current execution path for rate aggregation, particularly for calculating counter resets, requires data in each bucket to be strictly ordered by timestamp. This necessitates a specialized execution path for time-series data, which I believe is unnecessarily complex.
This issue proposes an alternative model: instead of pre-sorting data in the source operator, we buffer data in each bucket and perform a merge-sort just before emitting the output. This would eliminate the need for specialized time-series code and allow us to leverage existing ES|QL optimizations.
The main downside is the memory usage for buffering rate points. Each data point requires about 16 bytes; typical queries over a few million points would use less than 100MB, but worst-case scenarios could consume up to 32GB, potentially causing circuit breaking errors.
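The model above can be illustrated with a minimal sketch (hypothetical class and method names, not the actual ES|QL aggregator API): points are collected into a per-bucket buffer in arrival order, and sorting by timestamp is deferred until the bucket is emitted, at which point counter resets are detected on the ordered stream. Each point costs roughly 16 bytes, one `long` timestamp plus one `double` value, which is where the memory estimates in this issue come from.

```java
import java.util.*;

// Sketch only: in the real aggregator the buffer would live in breaker-tracked
// big arrays; a plain list stands in for it here.
class RateBuffer {
    // Each point is ~16 bytes of payload: one long timestamp + one double value.
    private final List<long[]> points = new ArrayList<>(); // [timestamp, valueBits]

    void collect(long timestamp, double value) {
        points.add(new long[] { timestamp, Double.doubleToLongBits(value) });
    }

    // On emit: order by timestamp, then compute the rate while compensating
    // for counter resets (a drop in value means the counter restarted).
    double emitRate() {
        points.sort(Comparator.comparingLong((long[] p) -> p[0]));
        double resetCompensation = 0, prev = Double.NaN;
        for (long[] p : points) {
            double v = Double.longBitsToDouble(p[1]);
            if (!Double.isNaN(prev) && v < prev) {
                resetCompensation += prev; // counter reset detected
            }
            prev = v;
        }
        long[] first = points.get(0), last = points.get(points.size() - 1);
        double delta = Double.longBitsToDouble(last[1])
            - Double.longBitsToDouble(first[1]) + resetCompensation;
        double seconds = (last[0] - first[0]) / 1000.0;
        return delta / seconds;
    }
}
```

Note that points may arrive in any order; the single sort at emit time replaces the strict per-bucket ordering the current execution path requires from the source operator.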
We can mitigate this with the following enhancements:
- Execute segments in descending max_timestamp order: By processing segments this way, the source operator can provide a "high-water mark" (the maximum timestamp that may appear in the current or subsequent segments). This allows the rate aggregator to safely flush any buffered data that is more recent than this mark, keeping the buffer small or avoiding buffering entirely.
- Dynamically split shards by time interval: For large time ranges with interleaved data, we can partition execution into smaller time intervals based on min and max timestamps. This limits buffer size and improves parallelism.
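The high-water-mark idea in the first enhancement can be sketched as follows (assumed interface, not the actual time-series source operator API): because segments are visited in descending max_timestamp order, any buffered point newer than the current watermark can never be interleaved with a point from a later segment, so it is safe to flush early.

```java
import java.util.*;

// Sketch of watermark-based flushing. The watermark is the highest timestamp
// that the current or any subsequent segment may still produce; buffered
// points strictly above it are final.
class WatermarkBuffer {
    private final TreeMap<Long, Double> buffered = new TreeMap<>(); // timestamp -> value
    private final List<Map.Entry<Long, Double>> flushed = new ArrayList<>();

    void collect(long timestamp, double value) {
        buffered.put(timestamp, value);
    }

    // Called when the source advances to the next segment: points newer than
    // the new watermark can be emitted now, shrinking the buffer.
    void advanceWatermark(long watermark) {
        NavigableMap<Long, Double> done = buffered.tailMap(watermark, false);
        flushed.addAll(new ArrayList<>(done.entrySet()));
        done.clear(); // removes the flushed range from the live buffer
    }

    int bufferedCount() { return buffered.size(); }
    int flushedCount()  { return flushed.size(); }
}
```

In the best case, when segments do not overlap in time, every point is flushed as soon as it is collected and the buffer stays empty.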
Sub-tasks:
- Cut over from the current rate to the new buffered rate; the new rate still delegates to the old implementation after flushing its buffer: New execution model for rates #134267
- Fold the old rate implementation into the new one: Merge old rate implementation to new implementation #134603
- Provide the timestamp watermark in time-series source operator.
- Integrate the old rate into the new one, then remove the old implementation.
- Dynamically split shards by time interval.