Conversation


@dnhatn dnhatn commented Sep 7, 2025

Our current execution path for rate aggregation, particularly for calculating counter resets, requires data in each bucket to be strictly ordered by timestamp. This necessitates a specialized execution path for time-series data, which I believe is unnecessarily complex.

I've explored an alternative model: instead of pre-sorting data in the source operator, we buffer data in each bucket and perform a merge-sort just before emitting the output. This would eliminate the need for specialized time-series code and allow us to leverage existing ES|QL optimizations.

The main downside is the memory usage for buffering rate points. Each data point requires about 16 bytes; typical queries over a few million points would use less than 100MB, but worst-case scenarios could consume up to 32GB, potentially causing circuit breaking errors.
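To make the 16-bytes-per-point figure concrete (one 8-byte `long` timestamp plus one 8-byte `double` value per buffered point; class and method names below are illustrative, not from this PR):

```java
// Rough buffer-size estimate for buffered rate points. Each point holds an
// 8-byte long timestamp and an 8-byte double value, ~16 bytes total.
final class RateBufferEstimate {
    static final long BYTES_PER_POINT = 8 + 8; // timestamp + value

    static long bufferBytes(long points) {
        return points * BYTES_PER_POINT;
    }

    public static void main(String[] args) {
        // A few million points stays under 100MB...
        System.out.println(bufferBytes(6_000_000L));     // 96,000,000 bytes
        // ...while a pathological two-billion-point query reaches ~32GB.
        System.out.println(bufferBytes(2_000_000_000L)); // 32,000,000,000 bytes
    }
}
```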

We can mitigate this with the following enhancements:

  1. Execute segments in descending max_timestamp order: By processing segments this way, the source operator can provide a "high-water mark" (the maximum timestamp that may appear in the current or subsequent segments). This allows the rate aggregator to safely flush any buffered data more recent than this mark, keeping the buffer small or avoiding buffering altogether.

  2. Dynamically split shards by time interval: For large time ranges with interleaved data, we can partition execution into smaller time intervals based on min and max timestamps. This limits buffer size and improves parallelism.
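A minimal sketch of the high-water-mark flush from point 1, under the assumption that the source guarantees no current or subsequent segment will emit a timestamp above the mark (all names here are hypothetical; the real aggregator buffers per group on `BigArrays`-backed storage):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of high-water-mark flushing. Segments run in
// descending max_timestamp order, so the source can promise that no
// current-or-later segment produces a timestamp above the mark. Any
// buffered point strictly newer than the mark is therefore final.
final class HighWaterMarkBuffer {
    // Each entry is {timestamp, rawLongBitsOfValue}.
    private final List<long[]> buffered = new ArrayList<>();

    void add(long timestamp, double value) {
        buffered.add(new long[] { timestamp, Double.doubleToRawLongBits(value) });
    }

    // Flush and return all points newer than the high-water mark; they can
    // feed the rate computation immediately instead of staying buffered.
    List<long[]> flushNewerThan(long highWaterMark) {
        List<long[]> safe = new ArrayList<>();
        buffered.removeIf(p -> {
            if (p[0] > highWaterMark) {
                safe.add(p);
                return true;
            }
            return false;
        });
        return safe;
    }

    int bufferedCount() {
        return buffered.size();
    }
}
```

With segments arriving in descending max_timestamp order, the mark only ever moves down, so each point is flushed at most once.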

This PR is the first step: it cuts over from the current rate to the new buffered rate. The new rate still delegates to the old rate after merging the buffer.

I benchmarked this change with the query below; execution time dropped from 405ms to 270ms.

TS my* 
| WHERE `metrics.system.cpu.time` IS NOT NULL AND @timestamp >= "2025-07-25T14:55:59.000Z" AND @timestamp <= "2025-07-25T16:25:59.000Z"
| STATS AVG(RATE(`metrics.system.cpu.time`)) BY host.name, BUCKET(@timestamp, 1h) 
| LIMIT 10000

I expect further improvements once the high-water mark lands.

Relates #134324

@dnhatn dnhatn force-pushed the buffered-rate branch 9 times, most recently from 6ba4367 to 681169a on September 8, 2025 19:13
@dnhatn dnhatn added >non-issue :StorageEngine/TSDB You know, for Metrics labels Sep 8, 2025

import java.util.List;

public final class RateDoubleGroupingAggregatorFunction implements GroupingAggregatorFunction {
Member Author

These new rate classes are the main change; the rest removes the specialized execution path for rates.

@dnhatn dnhatn mentioned this pull request Sep 8, 2025
/**
* {@link GroupingAggregatorFunction} implementation for {@link OldRateDoubleAggregator}.
*/
public final class OldRateDoubleGroupingAggregatorFunction implements GroupingAggregatorFunction {
Member Author

This was moved because Javadoc does not work with code-generated classes. This class will be removed in a follow-up.

@dnhatn dnhatn requested review from kkrik-es and martijnvg September 9, 2025 00:50
@dnhatn dnhatn marked this pull request as ready for review September 9, 2025 00:50
@elasticsearchmachine
Collaborator

Pinging @elastic/es-storage-engine (Team:StorageEngine)

@elasticsearchmachine elasticsearchmachine added Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) Team:StorageEngine labels Sep 9, 2025
@elasticsearchmachine
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

}
return;
}
class Slice {
Contributor

Nit: move outside the method as a private class for readability.

pq.updateTop();
}
var val = buffer.values.get(position);
reset += dv(val, prevValue) + dv(prevValue, lastValue) - dv(val, lastValue);
Contributor

@kkrik-es kkrik-es Sep 9, 2025

I'm confused with this one.. Mind adding a comment about how this works? I'd think (naively) that we only need to keep adding dv(val, prevValue) to the delta as that one detects resets.

Contributor

I think this formula allows tracking (a) the delta before resetting, (b) using 0 as the new low bound for values. If so, worth documenting in a comment.
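For readers puzzling over `dv`, a plausible shape for such a reset-aware delta (an assumption from context, not this PR's exact code) treats a drop in a monotonic counter as a reset and counts the new value from a zero baseline:

```java
// Hypothetical reset-aware delta between two successive counter samples.
// If the counter dropped (curr < prev), a reset occurred and the new value
// is counted from zero; otherwise it is a plain difference.
final class CounterDelta {
    static double dv(double prev, double curr) {
        return curr < prev ? curr : curr - prev;
    }
}
```

Under that definition, `dv(prev, curr)` equals the plain difference when the counter only grew and the full new value when it reset, which is the property that lets a merge formula account for resets that only become visible once slices are interleaved.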

}
}

PriorityQueue<Slice> pq = new PriorityQueue<>(buffer.sliceOffsets.length + 1) {
Contributor

Nit: the definition and initialization of the priority queue can also be moved into Slice for readability.
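The queue here performs a standard k-way merge of per-slice sorted runs. A self-contained sketch using `java.util.PriorityQueue` (names hypothetical; the production code appears to use Lucene's `PriorityQueue`, whose `updateTop` avoids the poll/add pair):

```java
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Standard k-way merge of sorted runs with a priority queue, sketching what
// the aggregator does when combining per-slice runs at emit time.
final class KWayMerge {
    static long[] merge(List<long[]> sortedRuns) {
        // Each queue entry is {runIndex, positionWithinRun}, ordered by the
        // element it currently points at.
        PriorityQueue<int[]> pq = new PriorityQueue<>(
            Comparator.comparingLong((int[] e) -> sortedRuns.get(e[0])[e[1]])
        );
        int total = 0;
        for (int i = 0; i < sortedRuns.size(); i++) {
            if (sortedRuns.get(i).length > 0) {
                pq.add(new int[] { i, 0 });
            }
            total += sortedRuns.get(i).length;
        }
        long[] out = new long[total];
        int n = 0;
        while (pq.isEmpty() == false) {
            int[] top = pq.poll();
            long[] run = sortedRuns.get(top[0]);
            out[n++] = run[top[1]];
            if (++top[1] < run.length) {
                pq.add(top); // advance within the same run
            }
        }
        return out;
    }
}
```

This is O(n log k) for n points across k slices, which is why buffering unsorted slices and merging once at emit time stays cheap.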

return state;
}

static class Buffer implements Releasable {
Contributor

Consider adding a comment outlining how the buffer works, i.e. storing multiple slices in a single array and tracking the start offset of each one.
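As a rough illustration of the layout being asked about (hypothetical names; the real `Buffer` is backed by `BigArrays` and implements `Releasable`): all points append to one pair of parallel arrays, and a new slice starts whenever a timestamp arrives out of order relative to the previous one.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a slice-tracking buffer: every point appends to one pair of
// parallel arrays (plain lists here); sliceOffsets records where each
// sorted run after the first one begins. Assumes points within a segment
// arrive newest-first, as the out-of-order check in this PR suggests.
final class SliceBuffer {
    final List<Long> timestamps = new ArrayList<>();
    final List<Double> values = new ArrayList<>();
    final List<Integer> sliceOffsets = new ArrayList<>();

    void append(long timestamp, double value) {
        int count = timestamps.size();
        // Within a slice timestamps only decrease; a timestamp jumping back
        // up means a new slice begins at this offset.
        if (count > 0 && timestamp > timestamps.get(count - 1)) {
            sliceOffsets.add(count);
        }
        timestamps.add(timestamp);
        values.add(value);
    }

    int sliceCount() {
        return sliceOffsets.size() + 1;
    }
}
```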

int newSize = totalCount + count;
timestamps = bigArrays.grow(timestamps, newSize);
values = bigArrays.grow(values, newSize);
if (totalCount > 0 && firstTimestamp > timestamps.get(totalCount - 1)) {
Contributor

Add a comment that timestamps appearing out of order indicate the start of a new slice.

};
{
int startOffset = 0;
for (int sliceOffset : buffer.sliceOffsets) {
Contributor

@kkrik-es kkrik-es Sep 9, 2025

Nit: this loop seems to belong more to Buffer :)

It can return an array of slices to be fed then to the priority queue.


@Override
public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
flushBuffers(selected);
Contributor

To confirm my understanding, this runs in a single thread per shard per bucket, after we collect all the data from all segments for this bucket?

Contributor

@kkrik-es kkrik-es left a comment

This is really, really cool - and you get to clean up so much special logic as a bonus. Just some nits to improve readability.

The comment about in-mem buffering is legit. I wonder if we can fall back to the slow path if we estimate that the buffers will exceed e.g. 1% of available memory.

@dnhatn
Member Author

dnhatn commented Sep 9, 2025

Thanks Kostas! I have addressed all of your comments.

@dnhatn dnhatn merged commit d9f36b3 into elastic:main Sep 9, 2025
34 checks passed
@dnhatn dnhatn deleted the buffered-rate branch September 9, 2025 21:00

Labels

:Analytics/ES|QL AKA ESQL >non-issue :StorageEngine/TSDB You know, for Metrics Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) Team:StorageEngine v9.2.0
