
Commit d697fc3

feat: support metrics query (#7)

Parent: fade002

File tree: 9 files changed, +1947 -29 lines

README.md

Lines changed: 13 additions & 1 deletion
@@ -122,12 +122,24 @@ All endpoints return Loki-compatible JSON responses and reuse the same error sha
 | Endpoint                                 | Description |
 | ---------------------------------------- | ----------- |
 | `GET /loki/api/v1/query`                 | Instant query. Supports the same LogQL used by Grafana's Explore panel. An optional `time` parameter (nanoseconds) defaults to "now", and the adapter automatically looks back 5 minutes when computing SQL bounds. |
-| `GET /loki/api/v1/query_range`           | Range query. Requires `start`/`end` nanoseconds and accepts `limit`/`step`. The `step` parameter is parsed but ignored because the adapter streams raw log lines rather than resampled metrics. |
+| `GET /loki/api/v1/query_range`           | Range query. Requires `start`/`end` nanoseconds and accepts `limit`/`step`. Log queries stream raw lines; metric queries return Loki matrix results and require `step` to match the range selector duration. |
 | `GET /loki/api/v1/labels`                | Lists known label keys for the selected schema. Optional `start`/`end` parameters (nanoseconds) fence the search window; unspecified values default to the last 5 minutes, matching Grafana's Explore defaults. |
 | `GET /loki/api/v1/label/{label}/values`  | Lists distinct values for a specific label key using the same optional `start`/`end` bounds as `/labels`. Works for both `loki` and `flat` schemas and automatically filters out empty strings. |
 
 `/query` and `/query_range` share the same LogQL parser and SQL builder. Instant queries fall back to `DEFAULT_LOOKBACK_NS` (5 minutes) when no explicit window is supplied, while range queries honor the caller's `start`/`end` bounds. `/labels` and `/label/{label}/values` delegate to schema-aware metadata lookups: the loki schema uses `map_keys`/`labels['key']` expressions, whereas the flat schema issues `SELECT DISTINCT` on the physical column and returns values in sorted order.
 
+### Metric queries
+
+The adapter currently supports a narrow LogQL metric surface area:
+
+- Range functions: `count_over_time` and `rate`. The latter reports per-second values (`COUNT / window_seconds`).
+- Optional outer aggregations: `sum`, `avg`, `min`, `max`, `count`, each with `by (...)`. `without` or other modifiers return `errorType:bad_data`.
+- Pipelines: only `drop` stages are honored (labels are removed after aggregation to match Loki semantics). Any other stage still results in `errorType:bad_data`.
+- `/loki/api/v1/query_range` metric calls must provide `step`, and `step` must equal the range selector duration so Databend can materialize every bucket in a single SQL statement; the adapter never fans out multiple queries or aggregates in memory.
+- `/loki/api/v1/query` metric calls reuse the same expressions but evaluate them over `[time - range, time]`.
+
+Both schema adapters (loki VARIANT labels and flat wide tables) translate the metric expression into one SQL statement that joins generated buckets with the raw rows via `generate_series`, so all aggregation happens inside Databend. Non-metric queries continue to stream raw logs.
+
 ## Logging
 
 By default the adapter configures `env_logger` with `databend_loki_adapter` at `info` level and every other module at `warn`. This keeps the startup flow visible without flooding the console with dependency logs. To override the levels, set `RUST_LOG` just like any other `env_logger` application, e.g.:
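
To make the `generate_series` translation above concrete: for a query such as `sum by (level) (count_over_time({app="orders"} [5m]))` with `step=5m`, the README describes the adapter emitting one bucketed statement. The Rust constant below is a hand-written approximation of that SQL, not output from this commit; the table name (`logs`), column names (`timestamp_ns`, `app`, `level`), and bucket arithmetic are all assumptions.

```rust
// Illustrative sketch only (not generated by this commit): one bucket row per
// step, joined back to the raw rows so COUNT(*) happens inside Databend.
// Table name `logs` and columns `timestamp_ns`, `app`, `level` are assumed.
const EXAMPLE_METRIC_SQL: &str = r#"
WITH buckets AS (
    -- step_ns = 300_000_000_000 (5m), matching the [5m] range selector
    SELECT generate_series AS bucket_end_ns
    FROM generate_series(1700000300000000000, 1700003600000000000, 300000000000)
)
SELECT
    b.bucket_end_ns,
    l.level,
    COUNT(*) AS value            -- count_over_time; rate would divide by 300
FROM buckets b
JOIN logs l
  ON l.timestamp_ns >  b.bucket_end_ns - 300000000000
 AND l.timestamp_ns <= b.bucket_end_ns
WHERE l.app = 'orders'
GROUP BY b.bucket_end_ns, l.level
ORDER BY b.bucket_end_ns
"#;
```

Requiring `step` to equal the `[5m]` selector is what keeps this to one statement per request: each generated bucket covers exactly one non-overlapping window, so no query fan-out or in-memory aggregation is needed.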

src/app/handlers.rs

Lines changed: 119 additions & 4 deletions
@@ -26,12 +26,16 @@ use serde::Deserialize;
 use std::time::Instant;
 
 use crate::{
-    databend::{LabelQueryBounds, QueryBounds, SqlOrder, execute_query},
+    databend::{
+        LabelQueryBounds, MetricQueryBounds, MetricRangeQueryBounds, QueryBounds, SqlOrder,
+        execute_query,
+    },
     error::AppError,
+    logql::DurationValue,
 };
 
 use super::{
-    responses::{LabelsResponse, LokiResponse, rows_to_streams},
+    responses::{LabelsResponse, LokiResponse, metric_matrix, metric_vector, rows_to_streams},
     state::{AppState, DEFAULT_LOOKBACK_NS},
 };
 
@@ -72,9 +76,31 @@ async fn instant_query(
     Query(params): Query<InstantQueryParams>,
 ) -> Result<Json<LokiResponse>, AppError> {
     let target_ns = params.time.unwrap_or_else(current_time_ns);
+    log::debug!(
+        "instant query received: query=`{}` limit={:?} time_ns={}",
+        params.query,
+        params.limit,
+        target_ns
+    );
     if let Some(response) = try_constant_vector(&params.query, target_ns) {
         return Ok(Json(response));
     }
+    if let Some(metric) = state.parse_metric(&params.query)? {
+        let duration_ns = metric.range.duration.as_nanoseconds();
+        let start_ns = target_ns.saturating_sub(duration_ns);
+        let plan = state.schema().build_metric_query(
+            state.table(),
+            &metric,
+            &MetricQueryBounds {
+                start_ns,
+                end_ns: target_ns,
+            },
+        )?;
+        log::debug!("instant metric SQL: {}", plan.sql);
+        let rows = execute_query(state.client(), &plan.sql).await?;
+        let samples = state.schema().parse_metric_rows(rows, &plan)?;
+        return Ok(Json(metric_vector(target_ns, samples)));
+    }
     let expr = state.parse(&params.query)?;
     let start_ns = target_ns.saturating_sub(DEFAULT_LOOKBACK_NS);
     let limit = state.clamp_limit(params.limit);
@@ -90,6 +116,13 @@ async fn instant_query(
         },
     )?;
 
+    log::debug!(
+        "instant query SQL (start_ns={}, end_ns={}, limit={}): {}",
+        start_ns,
+        target_ns,
+        limit,
+        sql
+    );
     let rows = execute_query(state.client(), &sql).await?;
     let streams = rows_to_streams(state.schema(), rows, &expr.pipeline)?;
     Ok(Json(LokiResponse::success(streams)))
@@ -99,8 +132,14 @@ async fn range_query(
     State(state): State<AppState>,
     Query(params): Query<RangeQueryParams>,
 ) -> Result<Json<LokiResponse>, AppError> {
-    let expr = state.parse(&params.query)?;
-    let _ = params.step;
+    log::debug!(
+        "range query received: query=`{}` limit={:?} start={:?} end={:?} step={:?}",
+        params.query,
+        params.limit,
+        params.start,
+        params.end,
+        params.step
+    );
     let start = params
         .start
         .ok_or_else(|| AppError::BadRequest("start is required".into()))?;
@@ -114,6 +153,37 @@ async fn range_query(
         ));
     }
 
+    if let Some(metric) = state.parse_metric(&params.query)? {
+        let step_raw = params
+            .step
+            .as_deref()
+            .ok_or_else(|| AppError::BadRequest("step is required for metric queries".into()))?;
+        let step_duration = parse_step_duration(step_raw)?;
+        let step_ns = step_duration.as_nanoseconds();
+        let window_ns = metric.range.duration.as_nanoseconds();
+        if step_ns != window_ns {
+            return Err(AppError::BadRequest(
+                "metric range queries require step to match the range selector duration".into(),
+            ));
+        }
+        let plan = state.schema().build_metric_range_query(
+            state.table(),
+            &metric,
+            &MetricRangeQueryBounds {
+                start_ns: start,
+                end_ns: end,
+                step_ns,
+                window_ns,
+            },
+        )?;
+        log::debug!("range metric SQL: {}", plan.sql);
+        let rows = execute_query(state.client(), &plan.sql).await?;
+        let samples = state.schema().parse_metric_matrix_rows(rows, &plan)?;
+        return Ok(Json(metric_matrix(samples)));
+    }
+
+    let expr = state.parse(&params.query)?;
+
     let limit = state.clamp_limit(params.limit);
     let sql = state.schema().build_query(
         state.table(),
@@ -126,11 +196,56 @@ async fn range_query(
         },
     )?;
 
+    log::debug!(
+        "range query SQL (start_ns={}, end_ns={}, limit={}): {}",
+        start,
+        end,
+        limit,
+        sql
+    );
     let rows = execute_query(state.client(), &sql).await?;
     let streams = rows_to_streams(state.schema(), rows, &expr.pipeline)?;
     Ok(Json(LokiResponse::success(streams)))
 }
 
+fn parse_step_duration(step_raw: &str) -> Result<DurationValue, AppError> {
+    match DurationValue::parse_literal(step_raw) {
+        Ok(value) => Ok(value),
+        Err(literal_err) => match parse_numeric_step_seconds(step_raw) {
+            Ok(value) => Ok(value),
+            Err(numeric_err) => Err(AppError::BadRequest(format!(
+                "invalid step duration `{step_raw}`: {literal_err}; {numeric_err}"
+            ))),
+        },
+    }
+}
+
+fn parse_numeric_step_seconds(step_raw: &str) -> Result<DurationValue, String> {
+    let trimmed = step_raw.trim();
+    if trimmed.is_empty() {
+        return Err("numeric seconds cannot be empty".into());
+    }
+    let seconds: f64 = trimmed
+        .parse()
+        .map_err(|err| format!("failed to parse numeric seconds: {err}"))?;
+    if !seconds.is_finite() {
+        return Err("numeric seconds must be finite".into());
+    }
+    if seconds <= 0.0 {
+        return Err("numeric seconds must be positive".into());
+    }
+    let nanos = seconds * 1_000_000_000.0;
+    if nanos <= 0.0 {
+        return Err("numeric seconds are too small".into());
+    }
+    if nanos > i64::MAX as f64 {
+        return Err("numeric seconds exceed supported range".into());
    }
+    let nanos = nanos.round() as i64;
+    DurationValue::new(nanos)
+        .map_err(|err| format!("failed to convert numeric seconds to duration: {err}"))
+}
+
 async fn label_names(
     State(state): State<AppState>,
     Query(params): Query<LabelsQueryParams>,
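
`parse_step_duration` above accepts either a Prometheus-style duration literal (`"5m"`) or bare numeric seconds (`"300"`), falling back from the first form to the second. A minimal test sketch of that behavior, assuming it sits in `src/app/handlers.rs` next to the function and that `DurationValue::as_nanoseconds` behaves as it is used in `range_query`; the concrete inputs are illustrative only.

```rust
// Hypothetical test sketch, not part of this commit. It assumes "5m" parses
// to five minutes via DurationValue::parse_literal and "300" goes through the
// numeric-seconds fallback.
#[cfg(test)]
mod step_parsing_sketch {
    use super::parse_step_duration;

    #[test]
    fn literal_and_numeric_steps_agree() {
        let literal = match parse_step_duration("5m") {
            Ok(value) => value,
            Err(_) => panic!("duration literal was rejected"),
        };
        let numeric = match parse_step_duration("300") {
            Ok(value) => value,
            Err(_) => panic!("numeric seconds were rejected"),
        };
        // Both forms should describe the same 300-second step.
        assert_eq!(literal.as_nanoseconds(), numeric.as_nanoseconds());

        // Non-durations and non-positive values become BadRequest errors.
        assert!(parse_step_duration("soon").is_err());
        assert!(parse_step_duration("-15").is_err());
    }
}
```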

src/app/responses.rs

Lines changed: 84 additions & 1 deletion
@@ -17,7 +17,11 @@ use std::collections::BTreeMap;
 use databend_driver::Row;
 use serde::Serialize;
 
-use crate::{databend::SchemaAdapter, error::AppError, logql::Pipeline};
+use crate::{
+    databend::{MetricMatrixSample, MetricSample, SchemaAdapter},
+    error::AppError,
+    logql::Pipeline,
+};
 
 pub(crate) fn rows_to_streams(
     schema: &SchemaAdapter,
@@ -43,6 +47,34 @@ pub(crate) fn rows_to_streams(
     Ok(result)
 }
 
+pub(crate) fn metric_vector(timestamp_ns: i64, samples: Vec<MetricSample>) -> LokiResponse {
+    let mut vectors = Vec::with_capacity(samples.len());
+    for sample in samples {
+        vectors.push(LokiVectorSample::new(
+            sample.labels,
+            timestamp_ns,
+            sample.value,
+        ));
+    }
+    LokiResponse::vector(vectors)
+}
+
+pub(crate) fn metric_matrix(samples: Vec<MetricMatrixSample>) -> LokiResponse {
+    let mut buckets: BTreeMap<String, MatrixBucket> = BTreeMap::new();
+    for sample in samples {
+        let key = serde_json::to_string(&sample.labels).unwrap_or_else(|_| "{}".to_string());
+        let bucket = buckets
+            .entry(key)
+            .or_insert_with(|| MatrixBucket::new(sample.labels.clone()));
+        bucket.values.push((sample.timestamp_ns, sample.value));
+    }
+    let mut series = Vec::with_capacity(buckets.len());
+    for bucket in buckets.into_values() {
+        series.push(bucket.into_series());
+    }
+    LokiResponse::matrix(series)
+}
+
 struct StreamBucket {
     labels: BTreeMap<String, String>,
     values: Vec<(i128, String)>,
@@ -70,6 +102,33 @@ impl StreamBucket {
     }
 }
 
+struct MatrixBucket {
+    labels: BTreeMap<String, String>,
+    values: Vec<(i64, f64)>,
+}
+
+impl MatrixBucket {
+    fn new(labels: BTreeMap<String, String>) -> Self {
+        Self {
+            labels,
+            values: Vec::new(),
+        }
+    }
+
+    fn into_series(mut self) -> LokiMatrixSeries {
+        self.values.sort_by_key(|(ts, _)| *ts);
+        let values = self
+            .values
+            .into_iter()
+            .map(|(ts, value)| VectorValue::new(ts, value))
+            .collect();
+        LokiMatrixSeries {
+            metric: self.labels,
+            values,
+        }
+    }
+}
+
 #[derive(Serialize)]
 pub(crate) struct LokiResponse {
     status: &'static str,
@@ -102,6 +161,15 @@ impl LokiResponse {
             },
         }
     }
+
+    pub(crate) fn matrix(series: Vec<LokiMatrixSeries>) -> Self {
+        Self {
+            status: "success",
+            data: LokiData {
+                result: LokiResult::Matrix { result: series },
+            },
+        }
+    }
 }
 
 #[derive(Serialize)]
@@ -117,6 +185,8 @@ enum LokiResult {
     Streams { result: Vec<LokiStream> },
     #[serde(rename = "vector")]
     Vector { result: Vec<LokiVectorSample> },
+    #[serde(rename = "matrix")]
+    Matrix { result: Vec<LokiMatrixSeries> },
 }
 
 #[derive(Serialize)]
@@ -138,6 +208,19 @@ impl LokiVectorSample {
             value: VectorValue::new(timestamp_ns, value),
         }
     }
+
+    pub(crate) fn new(metric: BTreeMap<String, String>, timestamp_ns: i64, value: f64) -> Self {
+        Self {
+            metric,
+            value: VectorValue::new(timestamp_ns, value),
+        }
+    }
+}
+
+#[derive(Serialize)]
+pub(crate) struct LokiMatrixSeries {
+    metric: BTreeMap<String, String>,
+    values: Vec<VectorValue>,
 }
 
 struct VectorValue {
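
For reference, `metric_matrix` groups samples by their serialized label set and emits one series per group, which serializes into Loki's standard matrix envelope. The sketch below shows the general shape of that payload; the exact timestamp/value encoding comes from this crate's `VectorValue` serializer, which is not part of this diff, so the `values` entries follow upstream Loki's `[unix_seconds, "value"]` convention only as an assumption.

```rust
// Illustrative payload shape only; field contents are examples, and the
// values encoding is assumed to follow upstream Loki rather than taken from
// this repository's serializer.
use serde_json::json;

fn main() {
    let example = json!({
        "status": "success",
        "data": {
            "resultType": "matrix",
            "result": [
                {
                    "metric": { "level": "error" },
                    "values": [
                        [1700000300, "3"],  // bucket 1: count_over_time value
                        [1700000600, "7"]   // bucket 2, one step later
                    ]
                }
            ]
        }
    });
    println!("{example}");
}
```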

src/app/state.rs

Lines changed: 5 additions & 1 deletion
@@ -21,7 +21,7 @@ use crate::{
         resolve_table_ref,
     },
     error::AppError,
-    logql::{LogqlExpr, LogqlParser},
+    logql::{LogqlExpr, LogqlParser, MetricExpr},
 };
 
 pub(crate) const DEFAULT_LIMIT: u64 = 500;
@@ -84,6 +84,10 @@ impl AppState {
         self.parser.parse(query).map_err(AppError::from)
     }
 
+    pub fn parse_metric(&self, query: &str) -> Result<Option<MetricExpr>, AppError> {
+        self.parser.parse_metric(query).map_err(AppError::from)
+    }
+
     pub fn clamp_limit(&self, requested: Option<u64>) -> u64 {
         requested
             .and_then(|value| (value > 0).then_some(value))
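
Because `parse_metric` returns `Result<Option<MetricExpr>, AppError>`, callers can treat `None` as "not a metric query" and fall through to the existing log-streaming path, which is how `instant_query` and `range_query` use it above. A minimal sketch of that contract, assuming it lives in `src/app/handlers.rs` where these imports already exist; the example queries are illustrative, and which layer rejects unsupported metric syntax (the parser or the later SQL builders) is not shown in this diff.

```rust
// Hypothetical helper, not part of this commit.
use super::state::AppState;
use crate::error::AppError;

fn classify_query(state: &AppState, query: &str) -> Result<&'static str, AppError> {
    match state.parse_metric(query)? {
        // e.g. `sum by (level) (count_over_time({app="x"} [5m]))`
        Some(_metric) => Ok("metric query: build a bucketed SQL plan"),
        // e.g. `{app="x"} |= "error"` is not a metric expression,
        // so the handler streams raw log lines instead.
        None => Ok("log query: stream raw lines"),
    }
}
```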
