|
| 1 | +--- |
| 2 | +title: Streaming Aggregation in OpenObserve |
| 3 | +description: Learn how Streaming Aggregation works in OpenObserve Enterprise. |
| 4 | +--- |
| 5 | +This page explains what Streaming Aggregation is and shows how to use it to improve query performance with aggregation cache in OpenObserve. |
| 6 | + |
| 7 | +> This is an enterprise feature. |
| 8 | +
|
| 9 | +=== "Overview" |
| 10 | + |
| 11 | + ## What is streaming aggregation? |
| 12 | + |
| 13 | + Streaming aggregation in OpenObserve enables **aggregation cache**. When streaming aggregation is enabled, OpenObserve begins caching the factors required to compute aggregates for each time partition during query execution. These cached values can then be reused for later queries that cover the same or overlapping time ranges. |
| 14 | + |
| 15 | + ??? "Why aggregation cache matters" |
| 16 | + |
| 17 | + Aggregation queries often scan large volumes of historical data. Without caching, every query recomputes all partitions of the requested time range, even if the results were already computed before. |
| 18 | + Aggregation cache works by: |
| 19 | + |
| 20 | + |
| 21 | + - Caching the factors required to compute aggregates, such as sums and counts, rather than the final aggregate values. |
| 22 | + - Reusing these stored values when a later query requests the same partitions. |
| 23 | + - Computing only the missing partitions and combining them with cached results. |
| 24 | + |
| 25 | + This approach reduces repeated computation and lowers dashboard latency while preserving accuracy. |
| 26 | + |
| 27 | + ??? "Relationship between streaming aggregation and aggregation cache" |
| 28 | + |
| 29 | + - **Streaming aggregation** is the feature toggle in Enterprise settings. |
| 30 | + - **Aggregation cache** is the mechanism that becomes active when streaming aggregation is enabled. |
| 31 | + |
| 32 | + !!! Note "Who can use it" |
| 33 | + All Enterprise users. |
| 34 | + |
| 35 | + !!! Note "Where to find it" |
| 36 | + To enable aggregation cache: |
| 37 | + |
| 38 | + 1. Go to **Management > General Settings**. |
| 39 | + 2. Turn on the **Enable Streaming Aggregation** toggle. |
| 40 | + 3. Select **Save**. |
| 41 | + |
| 42 | + !!! Note "Environment variables" |
| 43 | + |
| 44 | + | Environment Variable | Description | Default Value | |
| 45 | + | ------------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------ | --------------------------------- | |
| 46 | + | `ZO_FEATURE_QUERY_STREAMING_AGGS` | Enables or disables streaming aggregation. When set to `true`, aggregation queries use the aggregation cache. | `true` | |
| 47 | + | `ZO_DATAFUSION_STREAMING_AGGS_CACHE_MAX_ENTRIES` | Defines the maximum number of cache entries stored for streaming aggregations. Controls how many partition results are retained. | `10000` | |
| 48 | + | `ZO_DISK_AGGREGATION_CACHE_MAX_SIZE` | Sets the maximum size for record batch cache on disk. By default, it is 10 percent of the local volume space, capped at 20 GB. | 10 percent of volume, up to 20 GB | |
| 49 | + | `ZO_CACHE_DELAY_SECS` | Defines the number of seconds to wait before aggregation results become eligible to cache. | 300 secs | |
| 50 | + | `ZO_AGGREGATION_TOPK_ENABLED` | Enables the `approx_topk` function. | true | |
| 51 | + |
| 52 | + |
| 53 | + --- |
| 54 | + |
| 55 | + ## How it work |
| 56 | + |
| 57 | + **First run: partitioning and caching aggregate factors** <br> |
| 58 | + When an aggregation query runs for the first time, OpenObserve divides the requested time range into fixed-size partitions. Each partition is processed separately. Instead of storing the final aggregates, OpenObserve caches the factors required to compute the aggregate. For example, it caches sums and counts, which can later be combined to produce averages. |
| 59 | + |
| 60 | + These results are cached on disk. This creates the initial **Aggregation cache** for the query stream. <br> |
| 61 | + |
| 62 | + **Later runs: reuse of cached partitions**<br> |
| 63 | + When another query runs with the same stream, filters, and grouping, OpenObserve checks the cache. If the requested time range overlaps with existing partitions, it reuses the cached results and computes only the missing partitions. Results remain accurate because cached sums, counts, and other stored values can be combined with new results to compute the final aggregates. |
| 64 | + |
| 65 | + --- |
| 66 | + |
| 67 | + ## How it handles late-arriving data |
| 68 | + To handle late-arriving data, OpenObserve applies a delay window before marking aggregation results as eligible to cache. |
| 69 | + The system compares the query time with the end of the selected time range. If the end of the range falls within the delay window, the result is not cached. This ensures that results include all delayed records before being stored. |
| 70 | + The delay window is configured through the environment variable `ZO_CACHE_DELAY_SECS`. The default value is 300 secs (5 minutes). You can adjust this value to match the ingestion delay in your environment. For example, if logs typically arrive with up to 10 minutes of delay, set the variable to 600 secs. |
| 71 | + |
| 72 | + --- |
| 73 | + |
| 74 | + ## Query Behavior |
| 75 | + When aggregation cache is enabled, OpenObserve writes intermediate results into disk as Arrow IPC files. These files store values that can be safely combined later instead of full raw logs. |
| 76 | + Example query |
| 77 | + ```sql |
| 78 | + SELECT avg(response_time), sum(bytes), count(*) |
| 79 | + FROM access_logs |
| 80 | + WHERE status = 200 |
| 81 | + ``` |
| 82 | + |
| 83 | + **First run** <br> |
| 84 | + Suppose the query time range covers two partitions, A and B. The system scans both partitions fully. It caches sums and counts for each partition. |
| 85 | + |
| 86 | + | Partition | SUM(bytes) | COUNT(*) | SUM(response_time) | AVG(response_time) | |
| 87 | + | --------- | ---------- | --------- | ------------------- | ------------------- | |
| 88 | + | A | 3000 | 30 | 1500 | 50 | |
| 89 | + | B | 5000 | 50 | 2500 | 50 | |
| 90 | + |
| 91 | + **Later runs** <br> |
| 92 | + |
| 93 | + - When you run the same query again for the same or overlapping range, the system fetches results from cached partitions. |
| 94 | + - If there is a new partition C, only C is scanned. Cached results for A and B are reused. |
| 95 | + - Results are then merged safely. |
| 96 | + |
| 97 | + | Partition | SUM(bytes) | COUNT(*) | SUM(response_time) | |
| 98 | + | --------- | ---------- | --------- | ------------------- | |
| 99 | + | A | 3000 | 30 | 1500 | |
| 100 | + | B | 5000 | 50 | 2500 | |
| 101 | + | C | 4000 | 40 | 1600 | |
| 102 | + | **Final** | **12000** | **120** | **5600** | |
| 103 | + |
| 104 | + **How averages are calculated** <br> |
| 105 | + |
| 106 | + - **SUM(bytes)** is computed as `3000 + 5000 + 4000 = 12000`. |
| 107 | + - **COUNT(*)** is computed as `30 + 50 + 40 = 120`. |
| 108 | + - **AVG(response_time)** is computed from merged sums and counts: `5600 ÷ 120 = 46.67`. |
| 109 | + |
| 110 | + --- |
| 111 | + |
| 112 | + ## Cached file names |
| 113 | + Cached file names reflect the start and end timestamps of the partition. For example: |
| 114 | + ``` |
| 115 | + /data/openobserve/cache/aggregations/default/logs/oly/ |
| 116 | + 13018130667245808899_30/ |
| 117 | + 1756116000000000_1756117800000000.arrow |
| 118 | + ``` |
| 119 | + --- |
| 120 | + |
| 121 | + ## Guarantee of accuracy |
| 122 | + Accuracy is guaranteed by [Apache Arrow DataFusion](https://arrow.apache.org/blog/2023/08/05/datafusion_fast_grouping/). OpenObserve caches DataFusion’s internal intermediate representation on disk and DataFusion later combines and finalizes it, producing the same results as a fresh run over the same stream, filters, grouping, and time range. |
| 123 | + |
| 124 | + --- |
| 125 | + |
| 126 | + ## Cacheability of Queries |
| 127 | + Not all queries can benefit from aggregation cache. For a query to be cacheable, OpenObserve must be able to store and safely merge intermediate results across partitions. |
| 128 | + |
| 129 | + ## Supported aggregate functions |
| 130 | + The following aggregates are directly supported for caching: |
| 131 | + |
| 132 | + - min |
| 133 | + - max |
| 134 | + - sum |
| 135 | + - count |
| 136 | + - avg |
| 137 | + - median |
| 138 | + - array_agg |
| 139 | + - percentile_cont |
| 140 | + - summary_percentile |
| 141 | + - first_value |
| 142 | + - last_value |
| 143 | + - [approx_distinct](https://datafusion.apache.org/user-guide/sql/aggregate_functions.html#approx-distinct) |
| 144 | + - [approx_median](https://datafusion.apache.org/user-guide/sql/aggregate_functions.html#approx-median) |
| 145 | + - [approx_percentile_cont](https://datafusion.apache.org/user-guide/sql/aggregate_functions.html#approx-percentile-cont) |
| 146 | + - [approx_percentile_cont_with_weight](https://datafusion.apache.org/user-guide/sql/aggregate_functions.html#approx-percentile-cont-with-weight) |
| 147 | + - [approx_topk](https://openobserve.ai/docs/sql-functions/approximate-aggregate/approx-topk/) |
| 148 | + - [approx_topk_distinct](http://openobserve.ai/docs/sql-functions/approximate-aggregate/approx-topk-distinct/) |
| 149 | + |
| 150 | +=== "How to use" |
| 151 | + |
| 152 | + ## How to use streaming aggregation |
| 153 | + **Example query** |
| 154 | + ```sql |
| 155 | + SELECT |
| 156 | + k8s_deployment_name as "x_axis_1", |
| 157 | + count(_timestamp) as "y_axis_1" |
| 158 | + FROM "default" |
| 159 | + GROUP BY x_axis_1 |
| 160 | + ``` |
| 161 | + |
| 162 | + You can apply the aggregation query in any place where queries are executed, such as Logs or Dashboards. To measure load time, check cacheability, and verify cache usage, use your browser’s developer tools. Right-click the browser, select Inspect, open the Network tab, and filter by Fetch/XHR. |
| 163 | + |
| 164 | + The following example is performed with Streaming Search enabled. Aggregation cache works the same when Streaming Search is disabled. |
| 165 | + |
| 166 | + **Step 1: Run the aggregation query** <br> |
| 167 | + |
| 168 | + 1. Go to the **Logs** page. |
| 169 | + 2. In the SQL query editor, enter the aggregation query. |
| 170 | + 4. Select a time range, for example Past 6 days. |
| 171 | + 5. Select **Run Query**. |
| 172 | + The results show event counts per deployment. |
| 173 | +  |
| 174 | + |
| 175 | + **Step 2: Check if the query is eligible for caching** <br> |
| 176 | + After the first run, if `streaming_aggs` is true and `streaming_id` has a value, the query is eligible for caching. You can check these fields in the query response by using your browser developer tools. Open **Network**, select the `_search_partition` entry, and view the **Response** tab to confirm the values. |
| 177 | + |
| 178 | +  |
| 179 | + |
| 180 | + **Step 3: Run the query again with overlapping time** <br> |
| 181 | + |
| 182 | + Select a time window that overlaps with the earlier run, for example Past 6 days or Past 1 week, and run the query again. Cached partitions are reused and only new partitions are computed, which reduces execution time. |
| 183 | + |
| 184 | + Second run shows the result for the Past 6 days. |
| 185 | +  |
| 186 | + |
| 187 | + Third run shows the result for the Past 1 week. |
| 188 | +  |
| 189 | + |
| 190 | + **Step 4: Confirm cache reuse on later runs** |
| 191 | + |
| 192 | + Use your browser developer tools. <br> |
| 193 | + |
| 194 | + 1. In the Network tab of your developer tool, select one of the later runs. |
| 195 | + 2. Open **Response**. |
| 196 | + 3. Capture the following details: |
| 197 | + |
| 198 | + - `streaming_aggs` must be true |
| 199 | + - `streaming_id` must have a value |
| 200 | + - `streaming_output` must be true |
| 201 | + - `result_cache_ratio` per partition: |
| 202 | + |
| 203 | + - `100` means the partition came from aggregation cache. |
| 204 | + - `0` means the partition was computed on this run. |
| 205 | + |
| 206 | + **Notes:** |
| 207 | + |
| 208 | + - The first successful run does not reuse aggregation cache because nothing has been stored yet. |
| 209 | + - Ratios other than 0 or 100 indicate general result caching, not aggregation cache. |
| 210 | + |
| 211 | + --- |
| 212 | + |
| 213 | +## Performance benefits |
| 214 | +Streaming aggregation is enabled in all the following test runs: |
| 215 | + |
| 216 | +**Test run 1**: |
| 217 | + |
| 218 | +- Time range: `2025-08-13 00:00:00 - 2025-08-20 00:00:00` |
| 219 | +- Time taken to load the dashboard: `6.84 s` |
| 220 | +- `result_cache_ratio` is `0` in all partitions |
| 221 | + |
| 222 | + |
| 223 | +**Test run 2**: |
| 224 | + |
| 225 | +- Time range: `2025-08-13 00:00:00 - 2025-08-20 00:00:00` |
| 226 | +- Time taken to load the dashboard: `3.00 s` |
| 227 | +- `result_cache_ratio` is `100` in all partitions |
| 228 | + |
| 229 | + |
| 230 | +**Test run 3**: |
| 231 | + |
| 232 | +- Time range: `2025-08-6 00:00:00 - 2025-08-20 00:00:00` |
| 233 | +- Time taken to load the dashboard: `6.36 s` |
| 234 | +- `result_cache_ratio` is `100` for partitions that cover the time range `2025-08-13 00:00:00 → 2025-08-20 00:00:00` and `result_cache_ratio` is `0` for the rest of the partitions |
| 235 | + |
| 236 | + |
| 237 | +**Test run 4**: |
| 238 | + |
| 239 | +- Time range: `2025-08-6 00:00:00 - 2025-08-20 00:00:00` |
| 240 | +- Time taken to load the dashboard: `3.38 s` |
| 241 | +- `result_cache_ratio` is `100` for all partitions |
| 242 | + |
| 243 | + |
| 244 | +--- |
| 245 | + |
| 246 | +## Limitations |
| 247 | + |
| 248 | +- Very complex queries may not be eligible for cache reuse yet. Examples include joins, nested subqueries, heavy window functions, and large unions |
| 249 | +- The first run pays full computation cost to populate the cache |
| 250 | +- Reuse depends on partition availability. Eviction due to capacity limits can reduce reuse. |
| 251 | + |
| 252 | +--- |
| 253 | + |
| 254 | +## Troubleshooting |
| 255 | + |
| 256 | +1. **Second run is not faster** |
| 257 | + |
| 258 | + - **Cause**: The query was not cacheable or the first run did not complete. |
| 259 | + - **Fix**: Align time windows and filters with the first run. Verify `streaming_aggs` and `streaming_id`. After a successful first run, confirm `result_cache_ratio` equals `100` on some partitions. |
| 260 | + |
| 261 | +2. **Different panels do not benefit** |
| 262 | + |
| 263 | + - **Cause**: Time windows or filters differ. |
| 264 | + - **Fix**: Use the same windows and the same filters across panels that analyze the same stream. |
| 265 | + |
| 266 | + |
| 267 | + |
| 268 | + |
0 commit comments