
Commit 030f662

feat: make metric bucket cap configurable (#14)
1 parent 076e0bb commit 030f662

4 files changed, +73 -8 lines changed

README.md

Lines changed: 3 additions & 2 deletions
@@ -22,6 +22,7 @@ The adapter listens on `--bind` (default `0.0.0.0:3100`) and exposes a minimal s
 | `--timestamp-column` | `TIMESTAMP_COLUMN` | auto-detect | Override the timestamp column name. |
 | `--line-column` | `LINE_COLUMN` | auto-detect | Override the log line column name. |
 | `--labels-column` | `LABELS_COLUMN` | auto-detect (loki only) | Override the labels column name. |
+| `--max-metric-buckets` | `MAX_METRIC_BUCKETS` | `240` | Maximum bucket count per metric range query before clamping `step`. |
 
 ## Schema Support
 
@@ -122,7 +123,7 @@ All endpoints return Loki-compatible JSON responses and reuse the same error sha
 | Endpoint | Description |
 | --------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
 | `GET /loki/api/v1/query` | Instant query. Supports the same LogQL used by Grafana's Explore panel. An optional `time` parameter (nanoseconds) defaults to "now", and the adapter automatically looks back 5 minutes when computing SQL bounds. |
-| `GET /loki/api/v1/query_range` | Range query. Requires `start`/`end` nanoseconds and accepts `limit`/`step`. Log queries stream raw lines; metric queries return Loki matrix results and require `step` to match the range selector duration. |
+| `GET /loki/api/v1/query_range` | Range query. Requires `start`/`end` nanoseconds and accepts `limit`/`step`. Log queries stream raw lines; metric queries return Loki matrix results and require a `step` value (the adapter may clamp it to keep bucket counts bounded, default cap 240 buckets). |
 | `GET /loki/api/v1/labels` | Lists known label keys for the selected schema. Optional `start`/`end` parameters (nanoseconds) fence the search window; unspecified values default to the last 5 minutes, matching Grafana's Explore defaults. |
 | `GET /loki/api/v1/label/{label}/values` | Lists distinct values for a specific label key using the same optional `start`/`end` bounds as `/labels`. Works for both `loki` and `flat` schemas and automatically filters out empty strings. |
 | `GET /loki/api/v1/index/stats` | Returns approximate `streams`, `chunks`, `entries`, and `bytes` counters for a selector over a `[start, end]` window. `chunks` are estimated via unique stream keys because Databend does not store Loki chunks. |
@@ -148,7 +149,7 @@ The adapter currently supports a narrow LogQL metric surface area:
 - Range functions: `count_over_time` and `rate`. The latter reports per-second values (`COUNT / window_seconds`).
 - Optional outer aggregations: `sum`, `avg`, `min`, `max`, `count`, each with `by (...)`. `without` or other modifiers return `errorType:bad_data`.
 - Pipelines: only `drop` stages are honored (labels are removed after aggregation to match Loki semantics). Any other stage still results in `errorType:bad_data`.
-- `/loki/api/v1/query_range` metric calls must provide `step`, and `step` must equal the range selector duration so Databend can materialize every bucket in a single SQL statement; the adapter never fans out multiple queries or aggregates in memory.
+- `/loki/api/v1/query_range` metric calls must provide `step`. When the requested `(end - start) / step` would exceed the configured bucket cap (default 240, tweak via `--max-metric-buckets`), the adapter automatically increases the effective step to keep the SQL result size manageable; the adapter never fans out multiple queries or aggregates in memory.
 - `/loki/api/v1/query` metric calls reuse the same expressions but evaluate them over `[time - range, time]`.
 
 Both schema adapters (loki VARIANT labels and flat wide tables) translate the metric expression into one SQL statement that joins generated buckets with the raw rows via `generate_series`, so all aggregation happens inside Databend. Non-metric queries continue to stream raw logs.
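
To make the clamping rule above concrete, here is a small self-contained sketch of the arithmetic. It mirrors the `clamp_metric_step_ns` helper added in `src/app/handlers.rs` below; the `effective_step_ns` name and the concrete numbers are illustrative only, not part of the commit.

```rust
// Hedged sketch of the step-clamping rule; mirrors clamp_metric_step_ns
// from this commit rather than calling into the crate.
fn effective_step_ns(range_ns: i64, requested_step_ns: i64, max_buckets: i64) -> i64 {
    if range_ns <= 0 || requested_step_ns <= 0 {
        return requested_step_ns;
    }
    let max_buckets = max_buckets.max(1);
    // Integer ceiling division: the smallest step that keeps the bucket
    // count at or below max_buckets.
    let min_step_ns =
        ((i128::from(range_ns) + i128::from(max_buckets - 1)) / i128::from(max_buckets)) as i64;
    requested_step_ns.max(min_step_ns.max(1))
}

fn main() {
    let second_ns: i64 = 1_000_000_000;
    let hour_ns: i64 = 3_600 * second_ns;
    // A 1h range at a 1s step would need 3600 buckets; with the default cap
    // of 240 the effective step becomes ceil(3600 / 240) = 15 seconds.
    assert_eq!(effective_step_ns(hour_ns, second_ns, 240), 15 * second_ns);
    // A 60s step over the same range only needs 60 buckets, so it is kept.
    assert_eq!(effective_step_ns(hour_ns, 60 * second_ns, 240), 60 * second_ns);
}
```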

src/app/handlers.rs

Lines changed: 52 additions & 6 deletions
@@ -197,12 +197,18 @@ async fn range_query(
         .as_deref()
         .ok_or_else(|| AppError::BadRequest("step is required for metric queries".into()))?;
     let step_duration = parse_step_duration(step_raw)?;
-    let step_ns = step_duration.as_nanoseconds();
+    let requested_step_ns = step_duration.as_nanoseconds();
     let window_ns = metric.range.duration.as_nanoseconds();
-    if step_ns != window_ns {
-        return Err(AppError::BadRequest(
-            "metric range queries require step to match the range selector duration".into(),
-        ));
+    let range_ns = end - start;
+    let step_ns = clamp_metric_step_ns(range_ns, requested_step_ns, state.max_metric_buckets());
+    if step_ns != requested_step_ns {
+        log::info!(
+            "metric step clamped to limit buckets: range={:.3}s requested_step={:.3}s effective_step={:.3}s max_buckets={}",
+            (range_ns as f64) / 1_000_000_000_f64,
+            (requested_step_ns as f64) / 1_000_000_000_f64,
+            (step_ns as f64) / 1_000_000_000_f64,
+            state.max_metric_buckets()
+        );
     }
     let plan = state.schema().build_metric_range_query(
         state.table(),
@@ -340,6 +346,16 @@ fn parse_numeric_step_seconds(step_raw: &str) -> Result<DurationValue, String> {
         .map_err(|err| format!("failed to convert numeric seconds to duration: {err}"))
 }
 
+fn clamp_metric_step_ns(range_ns: i64, requested_step_ns: i64, max_buckets: i64) -> i64 {
+    if range_ns <= 0 || requested_step_ns <= 0 {
+        return requested_step_ns;
+    }
+    let max_buckets = max_buckets.max(1);
+    let numerator = i128::from(range_ns) + i128::from(max_buckets - 1);
+    let min_step_ns = (numerator / i128::from(max_buckets)) as i64;
+    requested_step_ns.max(min_step_ns.max(1))
+}
+
 async fn label_names(
     State(state): State<AppState>,
     Query(params): Query<LabelsQueryParams>,
@@ -633,7 +649,10 @@ impl TailRequest {
 
 #[cfg(test)]
 mod tests {
-    use super::{ProcessedEntry, TailCursor, filter_tail_entries, parse_constant_vector_expr};
+    use super::{
+        ProcessedEntry, TailCursor, clamp_metric_step_ns, filter_tail_entries,
+        parse_constant_vector_expr,
+    };
     use std::collections::BTreeMap;
 
     #[test]
@@ -694,4 +713,31 @@ mod tests {
             .is_empty()
         );
     }
+
+    #[test]
+    fn clamps_when_bucket_count_exceeds_limit() {
+        let range_ns = 3_600_000_000_000;
+        let requested_step_ns = 1_000_000_000;
+        assert_eq!(
+            clamp_metric_step_ns(range_ns, requested_step_ns, 600),
+            6_000_000_000
+        );
+    }
+
+    #[test]
+    fn leaves_large_steps_unchanged() {
+        let range_ns = 10_800_000_000_000;
+        let requested_step_ns = 60_000_000_000;
+        assert_eq!(
+            clamp_metric_step_ns(range_ns, requested_step_ns, 600),
+            requested_step_ns
+        );
+    }
+
+    #[test]
+    fn handles_zero_or_negative_ranges() {
+        assert_eq!(clamp_metric_step_ns(0, 1_000_000, 600), 1_000_000);
+        assert_eq!(clamp_metric_step_ns(-10, 1_000_000, 600), 1_000_000);
+        assert_eq!(clamp_metric_step_ns(1_000, 0, 600), 0);
+    }
 }
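
A brief note on the integer arithmetic in `clamp_metric_step_ns` above: adding `max_buckets - 1` before dividing is the usual integer form of ceiling division, and rounding up matters because rounding down could still overshoot the cap (the `i128` widening just guards the intermediate sum against `i64` overflow for extreme ranges). The snippet below is an illustrative check with made-up numbers, not part of the commit.

```rust
// Why the helper rounds the minimum step up rather than down. With a 1000s
// range and a cap of 240 buckets, floor division would suggest a 4s step,
// which still yields 250 buckets; ceiling division gives 5s and 200 buckets.
fn main() {
    let range_s: i64 = 1_000;
    let max_buckets: i64 = 240;
    let floor_step = range_s / max_buckets; // 4
    let ceil_step = (range_s + max_buckets - 1) / max_buckets; // 5
    assert_eq!((floor_step, ceil_step), (4, 5));
    assert!(range_s / floor_step > max_buckets); // 250 buckets: over the cap
    assert!(range_s / ceil_step <= max_buckets); // 200 buckets: within the cap
}
```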

src/app/state.rs

Lines changed: 8 additions & 0 deletions
@@ -34,6 +34,7 @@ pub struct AppState {
     table: TableRef,
     parser: LogqlParser,
     schema: SchemaAdapter,
+    max_metric_buckets: i64,
 }
 
 impl AppState {
@@ -43,6 +44,7 @@
             table,
             schema_type,
             schema_config,
+            max_metric_buckets,
         } = config;
         info!("resolving table reference for `{table}`");
         let table = resolve_table_ref(&dsn, &table)?;
@@ -65,6 +67,7 @@
             table,
             parser: LogqlParser,
             schema,
+            max_metric_buckets: i64::from(max_metric_buckets.max(1)),
         })
     }
 
@@ -80,6 +83,10 @@
         &self.schema
     }
 
+    pub fn max_metric_buckets(&self) -> i64 {
+        self.max_metric_buckets
+    }
+
     pub fn parse(&self, query: &str) -> Result<LogqlExpr, AppError> {
         self.parser.parse(query).map_err(AppError::from)
     }
@@ -117,6 +124,7 @@ pub struct AppConfig {
     pub table: String,
     pub schema_type: SchemaType,
     pub schema_config: SchemaConfig,
+    pub max_metric_buckets: u32,
 }
 
 async fn verify_connection(client: &Client) -> Result<(), AppError> {

src/main.rs

Lines changed: 10 additions & 0 deletions
@@ -25,6 +25,8 @@ mod databend;
 mod error;
 mod logql;
 
+const DEFAULT_MAX_METRIC_BUCKETS: u32 = 240;
+
 #[derive(Debug, Parser)]
 #[command(author, version, about, disable_help_subcommand = true)]
 struct Args {
@@ -54,6 +56,13 @@
     /// Override the column storing labels (loki schema only)
     #[arg(long = "labels-column", env = "LABELS_COLUMN")]
     labels_column: Option<String>,
+    /// Maximum number of buckets per metric range query
+    #[arg(
+        long = "max-metric-buckets",
+        env = "MAX_METRIC_BUCKETS",
+        default_value_t = DEFAULT_MAX_METRIC_BUCKETS
+    )]
+    max_metric_buckets: u32,
 }
 
 #[derive(Copy, Clone, Debug, ValueEnum)]
@@ -97,6 +106,7 @@ async fn main() -> Result<(), AppError> {
             line_column: args.line_column.clone(),
             labels_column: args.labels_column.clone(),
         },
+        max_metric_buckets: args.max_metric_buckets,
     };
     info!("bootstrapping application state");
     let state = AppState::bootstrap(config).await?;
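
Because the flag in `src/main.rs` above carries both an `env` binding and a compile-time default, the usual clap precedence applies: an explicit `--max-metric-buckets` wins over the `MAX_METRIC_BUCKETS` environment variable, which wins over the default of 240. Below is a reduced, self-contained sketch of that behavior; it is not the project's real `Args` struct and assumes clap with the `derive` and `env` features enabled.

```rust
use clap::Parser;

const DEFAULT_MAX_METRIC_BUCKETS: u32 = 240;

// Minimal stand-in for the real Args struct, carrying only the new flag.
#[derive(Debug, Parser)]
struct BucketArgs {
    #[arg(
        long = "max-metric-buckets",
        env = "MAX_METRIC_BUCKETS",
        default_value_t = DEFAULT_MAX_METRIC_BUCKETS
    )]
    max_metric_buckets: u32,
}

fn main() {
    // Assuming MAX_METRIC_BUCKETS is not set in the environment:
    // no flag means the compile-time default applies.
    let defaults = BucketArgs::parse_from(["adapter"]);
    assert_eq!(defaults.max_metric_buckets, 240);

    // An explicit flag overrides both the environment variable and the default.
    let flagged = BucketArgs::parse_from(["adapter", "--max-metric-buckets", "480"]);
    assert_eq!(flagged.max_metric_buckets, 480);
}
```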
