diff --git a/README.md b/README.md
index c0018e8..cfaf515 100644
--- a/README.md
+++ b/README.md
@@ -122,12 +122,24 @@ All endpoints return Loki-compatible JSON responses and reuse the same error sha
 | Endpoint                                 | Description |
 | ---------------------------------------- | ------------------------------------------------------------------ |
 | `GET /loki/api/v1/query`                 | Instant query. Supports the same LogQL used by Grafana's Explore panel. An optional `time` parameter (nanoseconds) defaults to "now", and the adapter automatically looks back 5 minutes when computing SQL bounds. |
-| `GET /loki/api/v1/query_range`           | Range query. Requires `start`/`end` nanoseconds and accepts `limit`/`step`. The `step` parameter is parsed but ignored because the adapter streams raw log lines rather than resampled metrics. |
+| `GET /loki/api/v1/query_range`           | Range query. Requires `start`/`end` nanoseconds and accepts `limit`/`step`. Log queries stream raw lines; metric queries return Loki matrix results and require `step` to match the range selector duration. |
 | `GET /loki/api/v1/labels`                | Lists known label keys for the selected schema. Optional `start`/`end` parameters (nanoseconds) fence the search window; unspecified values default to the last 5 minutes, matching Grafana's Explore defaults. |
 | `GET /loki/api/v1/label/{label}/values`  | Lists distinct values for a specific label key using the same optional `start`/`end` bounds as `/labels`. Works for both `loki` and `flat` schemas and automatically filters out empty strings. |
 
 `/query` and `/query_range` share the same LogQL parser and SQL builder. Instant queries fall back to `DEFAULT_LOOKBACK_NS` (5 minutes) when no explicit window is supplied, while range queries honor the caller's `start`/`end` bounds. `/labels` and `/label/{label}/values` delegate to schema-aware metadata lookups: the loki schema uses `map_keys`/`labels['key']` expressions, whereas the flat schema issues `SELECT DISTINCT` on the physical column and returns values in sorted order.
 
+### Metric queries
+
+The adapter currently supports a narrow slice of LogQL's metric syntax:
+
+- Range functions: `count_over_time` and `rate`. The latter reports per-second values (`COUNT / window_seconds`).
+- Optional outer aggregations: `sum`, `avg`, `min`, `max`, and `count`, each optionally grouped with `by (...)`. `without` and any other modifier return `errorType:bad_data`.
+- Pipelines: only `drop` stages are honored (dropped labels are removed from the grouping key, so streams that differ only in dropped labels merge, matching Loki semantics). Any other stage still results in `errorType:bad_data`.
+- `/loki/api/v1/query_range` metric calls must provide `step`, and `step` must equal the range selector duration so Databend can materialize every bucket in a single SQL statement; the adapter never fans out multiple queries or aggregates in memory. See the request sketch below.
+- `/loki/api/v1/query` metric calls reuse the same expressions but evaluate them over `[time - range, time]`.
+
+Both schema adapters (loki VARIANT labels and flat wide tables) translate the metric expression into one SQL statement that joins buckets generated by `generate_series` with the raw rows, so all aggregation happens inside Databend. Non-metric queries continue to stream raw logs.
+
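+A minimal request sketch for the metric endpoints (hypothetical `app`/`level` labels and timestamps; host and port are placeholders for wherever the adapter listens):
+
+```bash
+# Instant metric query, evaluated over [time - 5m, time].
+curl -G 'http://localhost:3100/loki/api/v1/query' \
+  --data-urlencode 'query=sum by (level) (count_over_time({app="api"}[5m]))'
+
+# Range metric query; `step` must equal the `[5m]` range selector.
+curl -G 'http://localhost:3100/loki/api/v1/query_range' \
+  --data-urlencode 'query=rate({app="api"}[5m])' \
+  --data-urlencode 'start=1700000000000000000' \
+  --data-urlencode 'end=1700000900000000000' \
+  --data-urlencode 'step=5m'
+```
+
 ## Logging
 
 By default the adapter configures `env_logger` with `databend_loki_adapter` at `info` level and every other module at `warn`. This keeps the startup flow visible without flooding the console with dependency logs.
+Out of the box this is roughly equivalent to `RUST_LOG=warn,databend_loki_adapter=info`.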
To override the levels, set `RUST_LOG` just like any other `env_logger` application, e.g.: diff --git a/src/app/handlers.rs b/src/app/handlers.rs index 629b4ff..c989ce6 100644 --- a/src/app/handlers.rs +++ b/src/app/handlers.rs @@ -26,12 +26,16 @@ use serde::Deserialize; use std::time::Instant; use crate::{ - databend::{LabelQueryBounds, QueryBounds, SqlOrder, execute_query}, + databend::{ + LabelQueryBounds, MetricQueryBounds, MetricRangeQueryBounds, QueryBounds, SqlOrder, + execute_query, + }, error::AppError, + logql::DurationValue, }; use super::{ - responses::{LabelsResponse, LokiResponse, rows_to_streams}, + responses::{LabelsResponse, LokiResponse, metric_matrix, metric_vector, rows_to_streams}, state::{AppState, DEFAULT_LOOKBACK_NS}, }; @@ -72,9 +76,31 @@ async fn instant_query( Query(params): Query, ) -> Result, AppError> { let target_ns = params.time.unwrap_or_else(current_time_ns); + log::debug!( + "instant query received: query=`{}` limit={:?} time_ns={}", + params.query, + params.limit, + target_ns + ); if let Some(response) = try_constant_vector(¶ms.query, target_ns) { return Ok(Json(response)); } + if let Some(metric) = state.parse_metric(¶ms.query)? { + let duration_ns = metric.range.duration.as_nanoseconds(); + let start_ns = target_ns.saturating_sub(duration_ns); + let plan = state.schema().build_metric_query( + state.table(), + &metric, + &MetricQueryBounds { + start_ns, + end_ns: target_ns, + }, + )?; + log::debug!("instant metric SQL: {}", plan.sql); + let rows = execute_query(state.client(), &plan.sql).await?; + let samples = state.schema().parse_metric_rows(rows, &plan)?; + return Ok(Json(metric_vector(target_ns, samples))); + } let expr = state.parse(¶ms.query)?; let start_ns = target_ns.saturating_sub(DEFAULT_LOOKBACK_NS); let limit = state.clamp_limit(params.limit); @@ -90,6 +116,13 @@ async fn instant_query( }, )?; + log::debug!( + "instant query SQL (start_ns={}, end_ns={}, limit={}): {}", + start_ns, + target_ns, + limit, + sql + ); let rows = execute_query(state.client(), &sql).await?; let streams = rows_to_streams(state.schema(), rows, &expr.pipeline)?; Ok(Json(LokiResponse::success(streams))) @@ -99,8 +132,14 @@ async fn range_query( State(state): State, Query(params): Query, ) -> Result, AppError> { - let expr = state.parse(¶ms.query)?; - let _ = params.step; + log::debug!( + "range query received: query=`{}` limit={:?} start={:?} end={:?} step={:?}", + params.query, + params.limit, + params.start, + params.end, + params.step + ); let start = params .start .ok_or_else(|| AppError::BadRequest("start is required".into()))?; @@ -114,6 +153,37 @@ async fn range_query( )); } + if let Some(metric) = state.parse_metric(¶ms.query)? 
{ + let step_raw = params + .step + .as_deref() + .ok_or_else(|| AppError::BadRequest("step is required for metric queries".into()))?; + let step_duration = parse_step_duration(step_raw)?; + let step_ns = step_duration.as_nanoseconds(); + let window_ns = metric.range.duration.as_nanoseconds(); + if step_ns != window_ns { + return Err(AppError::BadRequest( + "metric range queries require step to match the range selector duration".into(), + )); + } + let plan = state.schema().build_metric_range_query( + state.table(), + &metric, + &MetricRangeQueryBounds { + start_ns: start, + end_ns: end, + step_ns, + window_ns, + }, + )?; + log::debug!("range metric SQL: {}", plan.sql); + let rows = execute_query(state.client(), &plan.sql).await?; + let samples = state.schema().parse_metric_matrix_rows(rows, &plan)?; + return Ok(Json(metric_matrix(samples))); + } + + let expr = state.parse(¶ms.query)?; + let limit = state.clamp_limit(params.limit); let sql = state.schema().build_query( state.table(), @@ -126,11 +196,56 @@ async fn range_query( }, )?; + log::debug!( + "range query SQL (start_ns={}, end_ns={}, limit={}): {}", + start, + end, + limit, + sql + ); let rows = execute_query(state.client(), &sql).await?; let streams = rows_to_streams(state.schema(), rows, &expr.pipeline)?; Ok(Json(LokiResponse::success(streams))) } +fn parse_step_duration(step_raw: &str) -> Result { + match DurationValue::parse_literal(step_raw) { + Ok(value) => Ok(value), + Err(literal_err) => match parse_numeric_step_seconds(step_raw) { + Ok(value) => Ok(value), + Err(numeric_err) => Err(AppError::BadRequest(format!( + "invalid step duration `{step_raw}`: {literal_err}; {numeric_err}" + ))), + }, + } +} + +fn parse_numeric_step_seconds(step_raw: &str) -> Result { + let trimmed = step_raw.trim(); + if trimmed.is_empty() { + return Err("numeric seconds cannot be empty".into()); + } + let seconds: f64 = trimmed + .parse() + .map_err(|err| format!("failed to parse numeric seconds: {err}"))?; + if !seconds.is_finite() { + return Err("numeric seconds must be finite".into()); + } + if seconds <= 0.0 { + return Err("numeric seconds must be positive".into()); + } + let nanos = seconds * 1_000_000_000.0; + if nanos <= 0.0 { + return Err("numeric seconds are too small".into()); + } + if nanos > i64::MAX as f64 { + return Err("numeric seconds exceed supported range".into()); + } + let nanos = nanos.round() as i64; + DurationValue::new(nanos) + .map_err(|err| format!("failed to convert numeric seconds to duration: {err}")) +} + async fn label_names( State(state): State, Query(params): Query, diff --git a/src/app/responses.rs b/src/app/responses.rs index 320c5bd..d31187d 100644 --- a/src/app/responses.rs +++ b/src/app/responses.rs @@ -17,7 +17,11 @@ use std::collections::BTreeMap; use databend_driver::Row; use serde::Serialize; -use crate::{databend::SchemaAdapter, error::AppError, logql::Pipeline}; +use crate::{ + databend::{MetricMatrixSample, MetricSample, SchemaAdapter}, + error::AppError, + logql::Pipeline, +}; pub(crate) fn rows_to_streams( schema: &SchemaAdapter, @@ -43,6 +47,34 @@ pub(crate) fn rows_to_streams( Ok(result) } +pub(crate) fn metric_vector(timestamp_ns: i64, samples: Vec) -> LokiResponse { + let mut vectors = Vec::with_capacity(samples.len()); + for sample in samples { + vectors.push(LokiVectorSample::new( + sample.labels, + timestamp_ns, + sample.value, + )); + } + LokiResponse::vector(vectors) +} + +pub(crate) fn metric_matrix(samples: Vec) -> LokiResponse { + let mut buckets: BTreeMap = BTreeMap::new(); + for sample in 
samples { + let key = serde_json::to_string(&sample.labels).unwrap_or_else(|_| "{}".to_string()); + let bucket = buckets + .entry(key) + .or_insert_with(|| MatrixBucket::new(sample.labels.clone())); + bucket.values.push((sample.timestamp_ns, sample.value)); + } + let mut series = Vec::with_capacity(buckets.len()); + for bucket in buckets.into_values() { + series.push(bucket.into_series()); + } + LokiResponse::matrix(series) +} + struct StreamBucket { labels: BTreeMap, values: Vec<(i128, String)>, @@ -70,6 +102,33 @@ impl StreamBucket { } } +struct MatrixBucket { + labels: BTreeMap, + values: Vec<(i64, f64)>, +} + +impl MatrixBucket { + fn new(labels: BTreeMap) -> Self { + Self { + labels, + values: Vec::new(), + } + } + + fn into_series(mut self) -> LokiMatrixSeries { + self.values.sort_by_key(|(ts, _)| *ts); + let values = self + .values + .into_iter() + .map(|(ts, value)| VectorValue::new(ts, value)) + .collect(); + LokiMatrixSeries { + metric: self.labels, + values, + } + } +} + #[derive(Serialize)] pub(crate) struct LokiResponse { status: &'static str, @@ -102,6 +161,15 @@ impl LokiResponse { }, } } + + pub(crate) fn matrix(series: Vec) -> Self { + Self { + status: "success", + data: LokiData { + result: LokiResult::Matrix { result: series }, + }, + } + } } #[derive(Serialize)] @@ -117,6 +185,8 @@ enum LokiResult { Streams { result: Vec }, #[serde(rename = "vector")] Vector { result: Vec }, + #[serde(rename = "matrix")] + Matrix { result: Vec }, } #[derive(Serialize)] @@ -138,6 +208,19 @@ impl LokiVectorSample { value: VectorValue::new(timestamp_ns, value), } } + + pub(crate) fn new(metric: BTreeMap, timestamp_ns: i64, value: f64) -> Self { + Self { + metric, + value: VectorValue::new(timestamp_ns, value), + } + } +} + +#[derive(Serialize)] +pub(crate) struct LokiMatrixSeries { + metric: BTreeMap, + values: Vec, } struct VectorValue { diff --git a/src/app/state.rs b/src/app/state.rs index 6148c5c..1398334 100644 --- a/src/app/state.rs +++ b/src/app/state.rs @@ -21,7 +21,7 @@ use crate::{ resolve_table_ref, }, error::AppError, - logql::{LogqlExpr, LogqlParser}, + logql::{LogqlExpr, LogqlParser, MetricExpr}, }; pub(crate) const DEFAULT_LIMIT: u64 = 500; @@ -84,6 +84,10 @@ impl AppState { self.parser.parse(query).map_err(AppError::from) } + pub fn parse_metric(&self, query: &str) -> Result, AppError> { + self.parser.parse_metric(query).map_err(AppError::from) + } + pub fn clamp_limit(&self, requested: Option) -> u64 { requested .and_then(|value| (value > 0).then_some(value)) diff --git a/src/databend/core.rs b/src/databend/core.rs index 6ac0cca..2cdb7c8 100644 --- a/src/databend/core.rs +++ b/src/databend/core.rs @@ -15,12 +15,12 @@ use std::{collections::BTreeMap, fmt::Display}; use chrono::{TimeZone, Utc}; -use databend_driver::{Client, Row, Value}; +use databend_driver::{Client, NumberValue, Row, Value}; use url::Url; use crate::{ error::AppError, - logql::{LineFilter, LineFilterOp, LogqlExpr}, + logql::{LineFilter, LineFilterOp, LogqlExpr, MetricExpr, RangeFunction, VectorAggregationOp}, }; use super::{flat::FlatSchema, loki::LokiSchema}; @@ -96,6 +96,46 @@ impl SchemaAdapter { } } + pub fn build_metric_query( + &self, + table: &TableRef, + expr: &MetricExpr, + bounds: &MetricQueryBounds, + ) -> Result { + match self { + SchemaAdapter::Loki(schema) => schema.build_metric_query(table, expr, bounds), + SchemaAdapter::Flat(schema) => schema.build_metric_query(table, expr, bounds), + } + } + + pub fn parse_metric_rows( + &self, + rows: Vec, + plan: &MetricQueryPlan, + ) -> Result, 
AppError> {
+        parse_metric_rows(rows, plan)
+    }
+
+    pub fn build_metric_range_query(
+        &self,
+        table: &TableRef,
+        expr: &MetricExpr,
+        bounds: &MetricRangeQueryBounds,
+    ) -> Result<MetricRangeQueryPlan, AppError> {
+        match self {
+            SchemaAdapter::Loki(schema) => schema.build_metric_range_query(table, expr, bounds),
+            SchemaAdapter::Flat(schema) => schema.build_metric_range_query(table, expr, bounds),
+        }
+    }
+
+    pub fn parse_metric_matrix_rows(
+        &self,
+        rows: Vec<Row>,
+        plan: &MetricRangeQueryPlan,
+    ) -> Result<Vec<MetricMatrixSample>, AppError> {
+        parse_metric_matrix_rows(rows, plan)
+    }
+
     pub async fn list_labels(
         &self,
         client: &Client,
@@ -133,6 +173,18 @@ pub struct QueryBounds {
     pub order: SqlOrder,
 }
 
+pub struct MetricQueryBounds {
+    pub start_ns: i64,
+    pub end_ns: i64,
+}
+
+pub struct MetricRangeQueryBounds {
+    pub start_ns: i64,
+    pub end_ns: i64,
+    pub step_ns: i64,
+    pub window_ns: i64,
+}
+
 #[derive(Clone, Copy, Default)]
 pub struct LabelQueryBounds {
     pub start_ns: Option<i64>,
     pub end_ns: Option<i64>,
@@ -161,6 +213,37 @@ pub struct LogEntry {
     pub line: String,
 }
 
+#[derive(Clone)]
+pub struct MetricSample {
+    pub labels: BTreeMap<String, String>,
+    pub value: f64,
+}
+
+#[derive(Clone)]
+pub struct MetricMatrixSample {
+    pub labels: BTreeMap<String, String>,
+    pub timestamp_ns: i64,
+    pub value: f64,
+}
+
+#[derive(Clone)]
+pub struct MetricQueryPlan {
+    pub sql: String,
+    pub labels: MetricLabelsPlan,
+}
+
+#[derive(Clone)]
+pub struct MetricRangeQueryPlan {
+    pub sql: String,
+    pub labels: MetricLabelsPlan,
+}
+
+#[derive(Clone)]
+pub enum MetricLabelsPlan {
+    LokiFull,
+    Columns(Vec<String>),
+}
+
 #[derive(Clone)]
 pub(crate) struct TableColumn {
     pub(crate) name: String,
@@ -403,3 +486,230 @@ fn escape_sql(value: &str) -> String {
 pub(crate) fn quote_ident(ident: &str) -> String {
     format!("`{}`", ident.replace('`', "``"))
 }
+
+pub(crate) fn format_float_literal(value: f64) -> String {
+    let mut text = format!("{value:.9}");
+    if text.contains('.') {
+        while text.ends_with('0') {
+            text.pop();
+        }
+        if text.ends_with('.') {
+            text.pop();
+        }
+    }
+    if text.is_empty() { "0".into() } else { text }
+}
+
+pub(crate) fn aggregate_value_select(op: VectorAggregationOp, value_ident: &str) -> String {
+    match op {
+        VectorAggregationOp::Sum => format!("SUM({value_ident}) AS value"),
+        VectorAggregationOp::Avg => format!("AVG({value_ident}) AS value"),
+        VectorAggregationOp::Min => format!("MIN({value_ident}) AS value"),
+        VectorAggregationOp::Max => format!("MAX({value_ident}) AS value"),
+        VectorAggregationOp::Count => "COUNT(*) AS value".to_string(),
+    }
+}
+
+pub(crate) fn range_bucket_value_expression(
+    function: RangeFunction,
+    window_ns: i64,
+    ts_ident: &str,
+) -> String {
+    match function {
+        RangeFunction::CountOverTime => format!("COUNT({ts_ident})"),
+        RangeFunction::Rate => {
+            let seconds = window_ns as f64 / 1_000_000_000_f64;
+            let literal = format_float_literal(seconds);
+            format!("COUNT({ts_ident}) / {literal}")
+        }
+    }
+}
+
+pub(crate) fn metric_bucket_cte(bounds: &MetricRangeQueryBounds) -> Result<String, AppError> {
+    // Plain division suffices here: the divisor is a non-zero constant, so the
+    // previous `checked_div(1_000)` could never actually fail.
+    let step_us = bounds.step_ns / 1_000;
+    if step_us == 0 {
+        return Err(AppError::BadRequest(
+            "metric step must be at least 1 microsecond".into(),
+        ));
+    }
+    Ok(format!(
+        "SELECT generate_series AS bucket_start FROM generate_series({start}, {end}, {step})",
+        start = timestamp_literal(bounds.start_ns)?,
+        end = timestamp_literal(bounds.end_ns)?,
+        step = step_us
+    ))
+}
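+
+// Illustration (hypothetical bounds, not taken from the source): with step = 5m the
+// CTE above expands to `SELECT generate_series AS bucket_start FROM
+// generate_series(<start literal>, <end literal>, 300000000)`; the step is handed
+// to Databend in microseconds (300 s * 1_000_000).
+
+pub(crate) fn timestamp_offset_expr(base_expr: &str, offset_ns: i64) -> String {
+    let mut expr =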
base_expr.to_string(); + let seconds = offset_ns / 1_000_000_000; + let micros = (offset_ns % 1_000_000_000) / 1_000; + if seconds != 0 { + expr = format!("date_add('second', {seconds}, {expr})"); + } + if micros != 0 { + expr = format!("date_add('microsecond', {micros}, {expr})"); + } + expr +} + +fn timestamp_value_to_i64(value: &Value) -> Result { + let ns = value_to_timestamp(value)?; + i64::try_from(ns) + .map_err(|_| AppError::Internal("timestamp column is outside supported range".into())) +} + +fn parse_metric_rows( + rows: Vec, + plan: &MetricQueryPlan, +) -> Result, AppError> { + let mut samples = Vec::with_capacity(rows.len()); + for row in rows { + let values = row.values(); + if values.is_empty() { + continue; + } + let value_cell = values + .last() + .ok_or_else(|| AppError::Internal("metric row is missing value column".into()))?; + let value = metric_value(value_cell)?; + let labels = match &plan.labels { + MetricLabelsPlan::LokiFull => { + if values.len() < 2 { + return Err(AppError::Internal( + "metric row is missing labels column".into(), + )); + } + parse_labels_value(&values[0])? + } + MetricLabelsPlan::Columns(names) => { + if values.len() < names.len() + 1 { + return Err(AppError::Internal( + "metric row returned fewer columns than expected".into(), + )); + } + let mut labels = BTreeMap::new(); + for (idx, key) in names.iter().enumerate() { + if let Some(text) = metric_label_string(&values[idx]) { + labels.insert(key.clone(), text); + } + } + labels + } + }; + samples.push(MetricSample { labels, value }); + } + Ok(samples) +} + +fn parse_metric_matrix_rows( + rows: Vec, + plan: &MetricRangeQueryPlan, +) -> Result, AppError> { + let mut samples = Vec::with_capacity(rows.len()); + for row in rows { + let values = row.values(); + if values.len() < 2 { + continue; + } + let bucket_ns = timestamp_value_to_i64(&values[0])?; + let value_cell = values + .last() + .ok_or_else(|| AppError::Internal("metric row is missing value column".into()))?; + let value = metric_value(value_cell)?; + let labels = match &plan.labels { + MetricLabelsPlan::LokiFull => { + if values.len() < 3 { + return Err(AppError::Internal( + "metric row is missing labels column".into(), + )); + } + let label_value = &values[1]; + if matches!(label_value, Value::Null) { + continue; + } + parse_labels_value(label_value)? 
+ } + MetricLabelsPlan::Columns(names) => { + if values.len() < names.len() + 2 { + return Err(AppError::Internal( + "metric row returned fewer columns than expected".into(), + )); + } + let mut labels = BTreeMap::new(); + for (idx, key) in names.iter().enumerate() { + if let Some(text) = metric_label_string(&values[idx + 1]) { + labels.insert(key.clone(), text); + } + } + labels + } + }; + samples.push(MetricMatrixSample { + labels, + timestamp_ns: bucket_ns, + value, + }); + } + Ok(samples) +} + +fn metric_label_string(value: &Value) -> Option { + match value { + Value::Null => None, + Value::String(text) => Some(text.clone()), + Value::Variant(text) => Some(text.clone()), + Value::Boolean(flag) => Some(flag.to_string()), + Value::Number(num) => Some(num.to_string()), + other => { + let text = other.to_string(); + (!text.is_empty()).then_some(text) + } + } +} + +fn metric_value(value: &Value) -> Result { + match value { + Value::Number(num) => metric_number_to_f64(num), + Value::String(text) => text + .parse::() + .map_err(|_| AppError::Internal("metric value is not numeric".into())), + other => Err(AppError::Internal(format!( + "metric value must be numeric, found {}", + other + ))), + } +} + +fn metric_number_to_f64(num: &NumberValue) -> Result { + let value = match num { + NumberValue::Int8(v) => *v as f64, + NumberValue::Int16(v) => *v as f64, + NumberValue::Int32(v) => *v as f64, + NumberValue::Int64(v) => *v as f64, + NumberValue::UInt8(v) => *v as f64, + NumberValue::UInt16(v) => *v as f64, + NumberValue::UInt32(v) => *v as f64, + NumberValue::UInt64(v) => *v as f64, + NumberValue::Float32(v) => *v as f64, + NumberValue::Float64(v) => *v, + NumberValue::Decimal64(v, size) => { + let scale = size.scale as i32; + let divisor = 10_f64.powi(scale); + *v as f64 / divisor + } + NumberValue::Decimal128(v, size) => { + let scale = size.scale as i32; + let divisor = 10_f64.powi(scale); + *v as f64 / divisor + } + NumberValue::Decimal256(_, _) => num + .to_string() + .parse() + .map_err(|_| AppError::Internal("metric decimal value overflowed".into()))?, + }; + Ok(value) +} diff --git a/src/databend/flat.rs b/src/databend/flat.rs index aa41803..011317f 100644 --- a/src/databend/flat.rs +++ b/src/databend/flat.rs @@ -19,14 +19,19 @@ use log::info; use crate::{ error::AppError, - logql::{LabelMatcher, LabelOp, LogqlExpr}, + logql::{ + GroupModifier, LabelMatcher, LabelOp, LogqlExpr, MetricExpr, RangeFunction, + VectorAggregation, + }, }; use super::core::{ - LabelQueryBounds, LogEntry, QueryBounds, SchemaConfig, TableColumn, TableRef, - ensure_line_column, ensure_timestamp_column, escape, execute_query, is_line_candidate, - is_numeric_type, line_filter_clause, matches_named_column, missing_required_column, - quote_ident, timestamp_literal, value_to_timestamp, + LabelQueryBounds, LogEntry, MetricLabelsPlan, MetricQueryBounds, MetricQueryPlan, + MetricRangeQueryBounds, MetricRangeQueryPlan, QueryBounds, SchemaConfig, TableColumn, TableRef, + aggregate_value_select, ensure_line_column, ensure_timestamp_column, escape, execute_query, + format_float_literal, is_line_candidate, is_numeric_type, line_filter_clause, + matches_named_column, metric_bucket_cte, missing_required_column, quote_ident, + range_bucket_value_expression, timestamp_literal, timestamp_offset_expr, value_to_timestamp, }; #[derive(Clone)] @@ -70,7 +75,7 @@ impl FlatSchema { } for matcher in &expr.selectors { - clauses.push(label_clause_flat(matcher, &self.label_cols)?); + clauses.push(label_clause_flat(matcher, &self.label_cols, 
None)?); } clauses.extend( expr.filters @@ -102,6 +107,426 @@ impl FlatSchema { )) } + pub(crate) fn build_metric_query( + &self, + table: &TableRef, + expr: &MetricExpr, + bounds: &MetricQueryBounds, + ) -> Result { + let drop_labels = expr + .range + .selector + .pipeline + .metric_drop_labels() + .map_err(AppError::BadRequest)?; + let mut clauses = Vec::new(); + let ts_col = quote_ident(&self.timestamp_col); + clauses.push(format!( + "{ts_col} >= {}", + timestamp_literal(bounds.start_ns)? + )); + clauses.push(format!("{ts_col} <= {}", timestamp_literal(bounds.end_ns)?)); + for matcher in &expr.range.selector.selectors { + clauses.push(label_clause_flat(matcher, &self.label_cols, None)?); + } + clauses.extend( + expr.range + .selector + .filters + .iter() + .map(|f| line_filter_clause(quote_ident(&self.line_col), f)), + ); + let where_clause = if clauses.is_empty() { + "1=1".to_string() + } else { + clauses.join(" AND ") + }; + let value_expr = + range_value_expression(expr.range.function, expr.range.duration.as_nanoseconds()); + match &expr.aggregation { + None => self.metric_stream_sql(table, &where_clause, &value_expr, &drop_labels), + Some(aggregation) => { + self.metric_group_sql(table, &where_clause, &value_expr, aggregation, &drop_labels) + } + } + } + + pub(crate) fn build_metric_range_query( + &self, + table: &TableRef, + expr: &MetricExpr, + bounds: &MetricRangeQueryBounds, + ) -> Result { + let drop_labels = expr + .range + .selector + .pipeline + .metric_drop_labels() + .map_err(AppError::BadRequest)?; + let buckets_source = metric_bucket_cte(bounds)?; + let buckets_table = format!("({}) AS buckets", buckets_source); + let ts_col = format!("source.{}", quote_ident(&self.timestamp_col)); + let line_col = format!("source.{}", quote_ident(&self.line_col)); + let stream_ts_col = format!("stream_source.{}", quote_ident(&self.timestamp_col)); + let stream_line_col = format!("stream_source.{}", quote_ident(&self.line_col)); + let bucket_window_start = timestamp_offset_expr("bucket_start", -bounds.window_ns); + let mut join_conditions = vec![ + format!("{ts_col} >= {bucket_window_start}"), + format!("{ts_col} <= bucket_start"), + ]; + for matcher in &expr.range.selector.selectors { + join_conditions.push(label_clause_flat( + matcher, + &self.label_cols, + Some("source"), + )?); + } + join_conditions.extend( + expr.range + .selector + .filters + .iter() + .map(|f| line_filter_clause(line_col.clone(), f)), + ); + let join_clause = join_conditions.join(" AND "); + let stream_window_start_ns = bounds.start_ns.saturating_sub(bounds.window_ns); + let stream_start_literal = timestamp_literal(stream_window_start_ns)?; + let stream_end_literal = timestamp_literal(bounds.end_ns)?; + let mut stream_conditions = vec![ + format!("{stream_ts_col} >= {stream_start_literal}"), + format!("{stream_ts_col} <= {stream_end_literal}"), + ]; + for matcher in &expr.range.selector.selectors { + stream_conditions.push(label_clause_flat( + matcher, + &self.label_cols, + Some("stream_source"), + )?); + } + stream_conditions.extend( + expr.range + .selector + .filters + .iter() + .map(|f| line_filter_clause(stream_line_col.clone(), f)), + ); + let stream_where_clause = stream_conditions.join(" AND "); + let value_expr = + range_bucket_value_expression(expr.range.function, bounds.window_ns, &ts_col); + match &expr.aggregation { + None => self.metric_range_stream_sql( + table, + &buckets_table, + &join_clause, + &value_expr, + &drop_labels, + &stream_where_clause, + ), + Some(aggregation) => 
self.metric_range_group_sql( + table, + &buckets_table, + &join_clause, + &value_expr, + aggregation, + &drop_labels, + ), + } + } + + fn metric_stream_sql( + &self, + table: &TableRef, + where_clause: &str, + value_expr: &str, + drop_labels: &[String], + ) -> Result { + let retained: Vec<&FlatLabelColumn> = self + .label_cols + .iter() + .filter(|col| !is_dropped_label(&col.name, drop_labels)) + .collect(); + let mut select_parts: Vec = + retained.iter().map(|col| quote_ident(&col.name)).collect(); + select_parts.push(format!("{value_expr} AS value")); + let group_by_clause = if retained.is_empty() { + String::new() + } else { + format!( + " GROUP BY {}", + retained + .iter() + .map(|col| quote_ident(&col.name)) + .collect::>() + .join(", ") + ) + }; + let sql = format!( + "SELECT {select_clause} FROM {table} WHERE {where}{group_by}", + select_clause = select_parts.join(", "), + table = table.fq_name(), + where = where_clause, + group_by = group_by_clause + ); + Ok(MetricQueryPlan { + sql, + labels: MetricLabelsPlan::Columns( + retained.iter().map(|col| col.name.clone()).collect(), + ), + }) + } + + fn metric_group_sql( + &self, + table: &TableRef, + where_clause: &str, + value_expr: &str, + aggregation: &VectorAggregation, + drop_labels: &[String], + ) -> Result { + if let Some(GroupModifier::Without(labels)) = &aggregation.grouping { + return Err(AppError::BadRequest(format!( + "metric queries do not support `without` grouping ({labels:?})" + ))); + } + let group_labels = match &aggregation.grouping { + Some(GroupModifier::By(labels)) if !labels.is_empty() => labels.clone(), + _ => Vec::new(), + }; + let group_columns = self.resolve_group_columns(&group_labels, drop_labels)?; + let mut select_parts: Vec = group_columns + .iter() + .map(|column| format!("{} AS {}", column.expression(None), column.alias)) + .collect(); + select_parts.push(format!("{value_expr} AS value")); + let retained: Vec<&FlatLabelColumn> = self + .label_cols + .iter() + .filter(|col| !is_dropped_label(&col.name, drop_labels)) + .collect(); + let group_by_clause = if retained.is_empty() { + String::new() + } else { + format!( + " GROUP BY {}", + retained + .iter() + .map(|col| quote_ident(&col.name)) + .collect::>() + .join(", ") + ) + }; + let inner_sql = format!( + "SELECT {select_clause} FROM {table} WHERE {where}{group_by}", + select_clause = select_parts.join(", "), + table = table.fq_name(), + where = where_clause, + group_by = group_by_clause + ); + let alias_list: Vec = group_columns + .iter() + .map(|column| column.alias.clone()) + .collect(); + let alias_clause = alias_list.join(", "); + let aggregate = aggregate_value_select(aggregation.op, "value"); + let outer_select = if alias_list.is_empty() { + aggregate.clone() + } else { + format!("{alias_clause}, {aggregate}") + }; + let outer_group = if alias_list.is_empty() { + String::new() + } else { + format!(" GROUP BY {alias_clause}") + }; + let sql = format!( + "WITH stream_data AS ({inner}) SELECT {outer_select} FROM stream_data{outer_group}", + inner = inner_sql, + outer_select = outer_select, + outer_group = outer_group + ); + let labels = if group_labels.is_empty() { + MetricLabelsPlan::Columns(Vec::new()) + } else { + MetricLabelsPlan::Columns(group_labels) + }; + Ok(MetricQueryPlan { sql, labels }) + } + + fn metric_range_stream_sql( + &self, + table: &TableRef, + buckets_table: &str, + join_clause: &str, + value_expr: &str, + drop_labels: &[String], + stream_where_clause: &str, + ) -> Result { + let retained: Vec<&FlatLabelColumn> = self + 
.label_cols + .iter() + .filter(|col| !is_dropped_label(&col.name, drop_labels)) + .collect(); + let stream_projection = if retained.is_empty() { + "1 AS stream_exists".to_string() + } else { + retained + .iter() + .map(|col| qualified_column(Some("stream_source"), &col.name)) + .collect::>() + .join(", ") + }; + let stream_cte = format!( + "SELECT DISTINCT {projection} FROM {table} AS stream_source WHERE {where}", + projection = stream_projection, + table = table.fq_name(), + where = stream_where_clause, + ); + let mut select_parts = vec!["bucket_start AS bucket".to_string()]; + let mut group_parts = vec!["bucket_start".to_string()]; + for col in &retained { + let qualified = qualified_column(Some("stream_labels"), &col.name); + select_parts.push(qualified.clone()); + group_parts.push(qualified); + } + select_parts.push(format!("{value_expr} AS value")); + let label_match_clause = if retained.is_empty() { + "1=1".to_string() + } else { + retained + .iter() + .map(|col| { + format!( + "{source} = {stream}", + source = qualified_column(Some("source"), &col.name), + stream = qualified_column(Some("stream_labels"), &col.name), + ) + }) + .collect::>() + .join(" AND ") + }; + let order_clause = if retained.is_empty() { + "bucket".to_string() + } else { + let mut parts = vec!["bucket".to_string()]; + parts.extend( + retained + .iter() + .map(|col| qualified_column(Some("stream_labels"), &col.name)), + ); + parts.join(", ") + }; + let sql = format!( + "WITH stream_labels AS ({stream_cte}) \ + SELECT {select_clause} FROM {buckets} CROSS JOIN stream_labels \ + LEFT JOIN {table} AS source ON {join} AND {labels_match} \ + GROUP BY {group_by} ORDER BY {order_clause}", + stream_cte = stream_cte, + select_clause = select_parts.join(", "), + buckets = buckets_table, + table = table.fq_name(), + join = join_clause, + labels_match = label_match_clause, + group_by = group_parts.join(", "), + order_clause = order_clause + ); + Ok(MetricRangeQueryPlan { + sql, + labels: MetricLabelsPlan::Columns( + retained.iter().map(|col| col.name.clone()).collect(), + ), + }) + } + + fn metric_range_group_sql( + &self, + table: &TableRef, + buckets_table: &str, + join_clause: &str, + value_expr: &str, + aggregation: &VectorAggregation, + drop_labels: &[String], + ) -> Result { + if let Some(GroupModifier::Without(labels)) = &aggregation.grouping { + return Err(AppError::BadRequest(format!( + "metric queries do not support `without` grouping ({labels:?})" + ))); + } + let group_labels = match &aggregation.grouping { + Some(GroupModifier::By(labels)) if !labels.is_empty() => labels.clone(), + _ => Vec::new(), + }; + let group_columns = self.resolve_group_columns(&group_labels, drop_labels)?; + let mut select_parts = vec!["bucket_start AS bucket".to_string()]; + select_parts.extend( + group_columns + .iter() + .map(|column| format!("{} AS {}", column.expression(Some("source")), column.alias)), + ); + select_parts.push(format!("{value_expr} AS value")); + let mut group_parts = vec!["bucket_start".to_string()]; + group_columns.iter().for_each(|column| { + if let Some(expr) = column.group_expression(Some("source")) { + group_parts.push(expr); + } + }); + let inner_sql = format!( + "SELECT {select_clause} FROM {buckets} LEFT JOIN {table} AS source ON {join} GROUP BY {group_by}", + select_clause = select_parts.join(", "), + buckets = buckets_table, + table = table.fq_name(), + join = join_clause, + group_by = group_parts.join(", ") + ); + let alias_list: Vec = group_columns + .iter() + .map(|column| column.alias.clone()) + 
.collect(); + let alias_clause = alias_list.join(", "); + let aggregate = aggregate_value_select(aggregation.op, "value"); + let select_prefix = if alias_list.is_empty() { + format!("bucket, {aggregate}") + } else { + format!("bucket, {alias_clause}, {aggregate}") + }; + let group_suffix = if alias_list.is_empty() { + " GROUP BY bucket".to_string() + } else { + format!(" GROUP BY bucket, {alias_clause}") + }; + let sql = format!( + "SELECT {select_prefix} FROM ({inner}) AS stream_data{group_suffix} ORDER BY bucket", + inner = inner_sql, + select_prefix = select_prefix, + group_suffix = group_suffix + ); + let labels = if group_labels.is_empty() { + MetricLabelsPlan::Columns(Vec::new()) + } else { + MetricLabelsPlan::Columns(group_labels) + }; + Ok(MetricRangeQueryPlan { sql, labels }) + } + + fn resolve_group_columns( + &self, + labels: &[String], + drop_labels: &[String], + ) -> Result, AppError> { + let mut columns = Vec::with_capacity(labels.len()); + for (idx, label) in labels.iter().enumerate() { + let column = if is_dropped_label(label, drop_labels) { + None + } else { + find_label_column(&self.label_cols, label).map(|col| col.name.clone()) + }; + columns.push(FlatGroupColumn { + column, + alias: format!("group_{idx}"), + }); + } + Ok(columns) + } + pub(crate) fn parse_row(&self, row: &Row) -> Result { let values = row.values(); if values.len() < self.label_cols.len() + 2 { @@ -269,6 +694,7 @@ impl FlatSchema { fn label_clause_flat( matcher: &LabelMatcher, columns: &[FlatLabelColumn], + table_alias: Option<&str>, ) -> Result { let column = find_label_column(columns, &matcher.key).ok_or_else(|| { AppError::BadRequest(format!( @@ -277,9 +703,9 @@ fn label_clause_flat( )) })?; if column.is_numeric { - numeric_label_clause(column, matcher) + numeric_label_clause(column, matcher, table_alias) } else { - let column = quote_ident(&column.name); + let column = qualified_column(table_alias, &column.name); let value = escape(&matcher.value); Ok(match matcher.op { LabelOp::Eq => format!("{column} = '{value}'"), @@ -293,8 +719,9 @@ fn label_clause_flat( fn numeric_label_clause( column: &FlatLabelColumn, matcher: &LabelMatcher, + table_alias: Option<&str>, ) -> Result { - let column_ident = quote_ident(&column.name); + let column_ident = qualified_column(table_alias, &column.name); match matcher.op { LabelOp::Eq => Ok(format!( "{column_ident} = {}", @@ -398,9 +825,54 @@ fn find_label_column<'a>( .find(|col| col.name.eq_ignore_ascii_case(target)) } +fn is_dropped_label(target: &str, drop_labels: &[String]) -> bool { + drop_labels + .iter() + .any(|label| label.eq_ignore_ascii_case(target)) +} + +fn qualified_column(alias: Option<&str>, column: &str) -> String { + match alias { + Some(table) => format!("{table}.{}", quote_ident(column)), + None => quote_ident(column), + } +} + +struct FlatGroupColumn { + column: Option, + alias: String, +} + +impl FlatGroupColumn { + fn expression(&self, table_alias: Option<&str>) -> String { + match &self.column { + Some(name) => qualified_column(table_alias, name), + None => "CAST(NULL AS VARCHAR)".to_string(), + } + } + + fn group_expression(&self, table_alias: Option<&str>) -> Option { + self.column + .as_ref() + .map(|name| qualified_column(table_alias, name)) + } +} + +fn range_value_expression(function: RangeFunction, duration_ns: i64) -> String { + match function { + RangeFunction::CountOverTime => "COUNT(*)".to_string(), + RangeFunction::Rate => { + let seconds = duration_ns as f64 / 1_000_000_000_f64; + let literal = format_float_literal(seconds); + 
format!("COUNT(*) / {literal}") + } + } +} + #[cfg(test)] mod tests { use super::*; + use crate::logql::{DurationValue, Pipeline, RangeExpr, VectorAggregationOp}; fn matcher(key: &str, op: LabelOp, value: &str) -> LabelMatcher { LabelMatcher { @@ -425,8 +897,12 @@ mod tests { #[test] fn regex_on_string_column_uses_match() { - let clause = - label_clause_flat(&matcher("host", LabelOp::RegexEq, "api|edge"), &columns()).unwrap(); + let clause = label_clause_flat( + &matcher("host", LabelOp::RegexEq, "api|edge"), + &columns(), + None, + ) + .unwrap(); assert_eq!(clause, "match(`host`, 'api|edge')"); } @@ -435,6 +911,7 @@ mod tests { let clause = label_clause_flat( &matcher("status", LabelOp::RegexEq, "(200|202)"), &columns(), + None, ) .unwrap(); assert_eq!(clause, "`status` IN (200, 202)"); @@ -445,6 +922,7 @@ mod tests { let clause = label_clause_flat( &matcher("status", LabelOp::RegexNotEq, "(200|202)"), &columns(), + None, ) .unwrap(); assert_eq!(clause, "`status` NOT IN (200, 202)"); @@ -452,11 +930,57 @@ mod tests { #[test] fn invalid_numeric_regex_returns_error() { - let err = label_clause_flat(&matcher("status", LabelOp::RegexEq, "[23]+"), &columns()) - .unwrap_err(); + let err = label_clause_flat( + &matcher("status", LabelOp::RegexEq, "[23]+"), + &columns(), + None, + ) + .unwrap_err(); match err { AppError::BadRequest(_) => {} other => panic!("unexpected error: {other:?}"), } } + + #[test] + fn metric_group_allows_missing_label() { + let schema = FlatSchema { + timestamp_col: "timestamp".to_string(), + line_col: "line".to_string(), + label_cols: vec![FlatLabelColumn { + name: "host".to_string(), + is_numeric: false, + }], + }; + let expr = MetricExpr { + range: RangeExpr { + function: RangeFunction::CountOverTime, + selector: LogqlExpr { + selectors: Vec::new(), + filters: Vec::new(), + pipeline: Pipeline::default(), + }, + duration: DurationValue::new(1_000_000_000).unwrap(), + }, + aggregation: Some(VectorAggregation { + op: VectorAggregationOp::Sum, + grouping: Some(GroupModifier::By(vec!["level".to_string()])), + }), + }; + let table = TableRef { + database: "db".to_string(), + table: "tbl".to_string(), + }; + let plan = schema + .build_metric_query( + &table, + &expr, + &MetricQueryBounds { + start_ns: 0, + end_ns: 1_000_000_000, + }, + ) + .unwrap(); + assert!(plan.sql.contains("CAST(NULL AS VARCHAR) AS group_0")); + } } diff --git a/src/databend/loki.rs b/src/databend/loki.rs index cd3a00f..fd1591f 100644 --- a/src/databend/loki.rs +++ b/src/databend/loki.rs @@ -16,14 +16,20 @@ use databend_driver::{Client, Row}; use crate::{ error::AppError, - logql::{LabelMatcher, LabelOp, LogqlExpr}, + logql::{ + GroupModifier, LabelMatcher, LabelOp, LogqlExpr, MetricExpr, RangeFunction, + VectorAggregation, + }, }; use super::core::{ - LabelQueryBounds, LogEntry, QueryBounds, SchemaConfig, TableColumn, TableRef, - ensure_labels_column, ensure_line_column, ensure_timestamp_column, escape, execute_query, - line_filter_clause, matches_line_column, matches_named_column, missing_required_column, - parse_labels_value, quote_ident, timestamp_literal, value_to_timestamp, + LabelQueryBounds, LogEntry, MetricLabelsPlan, MetricQueryBounds, MetricQueryPlan, + MetricRangeQueryBounds, MetricRangeQueryPlan, QueryBounds, SchemaConfig, TableColumn, TableRef, + aggregate_value_select, ensure_labels_column, ensure_line_column, ensure_timestamp_column, + escape, execute_query, format_float_literal, line_filter_clause, matches_line_column, + matches_named_column, metric_bucket_cte, missing_required_column, 
parse_labels_value, + quote_ident, range_bucket_value_expression, timestamp_literal, timestamp_offset_expr, + value_to_timestamp, }; #[derive(Clone)] @@ -84,6 +90,327 @@ impl LokiSchema { )) } + pub(crate) fn build_metric_query( + &self, + table: &TableRef, + expr: &MetricExpr, + bounds: &MetricQueryBounds, + ) -> Result { + let drop_labels = expr + .range + .selector + .pipeline + .metric_drop_labels() + .map_err(AppError::BadRequest)?; + let mut clauses = Vec::new(); + let ts = quote_ident(&self.timestamp_col); + clauses.push(format!("{ts} >= {}", timestamp_literal(bounds.start_ns)?)); + clauses.push(format!("{ts} <= {}", timestamp_literal(bounds.end_ns)?)); + clauses.extend( + expr.range + .selector + .selectors + .iter() + .map(|m| label_clause_loki(m, quote_ident(&self.labels_col))), + ); + clauses.extend( + expr.range + .selector + .filters + .iter() + .map(|f| line_filter_clause(quote_ident(&self.line_col), f)), + ); + let where_clause = if clauses.is_empty() { + "1=1".to_string() + } else { + clauses.join(" AND ") + }; + let value_expr = + range_value_expression(expr.range.function, expr.range.duration.as_nanoseconds()); + match &expr.aggregation { + None => self.metric_streams_sql(table, &where_clause, &value_expr, &drop_labels), + Some(aggregation) => self.metric_aggregation_sql( + table, + &where_clause, + &value_expr, + aggregation, + &drop_labels, + ), + } + } + + pub(crate) fn build_metric_range_query( + &self, + table: &TableRef, + expr: &MetricExpr, + bounds: &MetricRangeQueryBounds, + ) -> Result { + let drop_labels = expr + .range + .selector + .pipeline + .metric_drop_labels() + .map_err(AppError::BadRequest)?; + let buckets_source = metric_bucket_cte(bounds)?; + let buckets_table = format!("({}) AS buckets", buckets_source); + let ts_col = format!("source.{}", quote_ident(&self.timestamp_col)); + let labels_col = format!("source.{}", quote_ident(&self.labels_col)); + let line_col = format!("source.{}", quote_ident(&self.line_col)); + let stream_ts_col = format!("stream_source.{}", quote_ident(&self.timestamp_col)); + let stream_labels_col = format!("stream_source.{}", quote_ident(&self.labels_col)); + let stream_line_col = format!("stream_source.{}", quote_ident(&self.line_col)); + let bucket_window_start = timestamp_offset_expr("bucket_start", -bounds.window_ns); + let mut join_conditions = vec![ + format!("{ts_col} >= {bucket_window_start}"), + format!("{ts_col} <= bucket_start"), + ]; + join_conditions.extend( + expr.range + .selector + .selectors + .iter() + .map(|m| label_clause_loki(m, labels_col.clone())), + ); + join_conditions.extend( + expr.range + .selector + .filters + .iter() + .map(|f| line_filter_clause(line_col.clone(), f)), + ); + let join_clause = join_conditions.join(" AND "); + let stream_window_start_ns = bounds.start_ns.saturating_sub(bounds.window_ns); + let stream_start_literal = timestamp_literal(stream_window_start_ns)?; + let stream_end_literal = timestamp_literal(bounds.end_ns)?; + let mut stream_conditions = vec![ + format!("{stream_ts_col} >= {stream_start_literal}"), + format!("{stream_ts_col} <= {stream_end_literal}"), + ]; + stream_conditions.extend( + expr.range + .selector + .selectors + .iter() + .map(|m| label_clause_loki(m, stream_labels_col.clone())), + ); + stream_conditions.extend( + expr.range + .selector + .filters + .iter() + .map(|f| line_filter_clause(stream_line_col.clone(), f)), + ); + let stream_where_clause = stream_conditions.join(" AND "); + let value_expr = + range_bucket_value_expression(expr.range.function, 
bounds.window_ns, &ts_col); + match &expr.aggregation { + None => self.metric_range_stream_sql(&MetricRangeStreamContext { + table, + buckets_table: &buckets_table, + labels_column: &labels_col, + stream_labels_column: &stream_labels_col, + value_expr: &value_expr, + join_clause: &join_clause, + stream_where_clause: &stream_where_clause, + drop_labels: &drop_labels, + }), + Some(aggregation) => { + self.metric_range_aggregation_sql(&MetricRangeAggregationContext { + table, + buckets_table: &buckets_table, + labels_column: &labels_col, + value_expr: &value_expr, + join_clause: &join_clause, + aggregation, + drop_labels: &drop_labels, + }) + } + } + } + + fn metric_streams_sql( + &self, + table: &TableRef, + where_clause: &str, + value_expr: &str, + drop_labels: &[String], + ) -> Result { + let labels_column = quote_ident(&self.labels_col); + let labels_expr = drop_labels_expr(&labels_column, drop_labels); + let sql = format!( + "SELECT {labels_expr} AS labels, {value_expr} AS value FROM {table} \ + WHERE {where} GROUP BY {labels_expr}", + table = table.fq_name(), + where = where_clause, + labels_expr = labels_expr + ); + Ok(MetricQueryPlan { + sql, + labels: MetricLabelsPlan::LokiFull, + }) + } + + fn metric_aggregation_sql( + &self, + table: &TableRef, + where_clause: &str, + value_expr: &str, + aggregation: &VectorAggregation, + drop_labels: &[String], + ) -> Result { + if let Some(GroupModifier::Without(labels)) = &aggregation.grouping { + return Err(AppError::BadRequest(format!( + "metric queries do not support `without` grouping ({labels:?})" + ))); + } + let label_column = quote_ident(&self.labels_col); + let grouped_labels_expr = drop_labels_expr(&label_column, drop_labels); + let grouping_labels = match &aggregation.grouping { + Some(GroupModifier::By(labels)) if !labels.is_empty() => labels.clone(), + _ => Vec::new(), + }; + let group_columns = build_loki_group_columns(&grouping_labels, &grouped_labels_expr); + + let mut select_parts: Vec = group_columns + .iter() + .map(|column| format!("{} AS {}", column.expr, column.alias)) + .collect(); + select_parts.push(format!("{value_expr} AS value")); + let mut group_parts = vec![grouped_labels_expr.clone()]; + group_parts.extend(group_columns.iter().map(|column| column.expr.clone())); + let inner_sql = format!( + "SELECT {select_clause} FROM {table} WHERE {where} GROUP BY {group_by}", + select_clause = select_parts.join(", "), + table = table.fq_name(), + where = where_clause, + group_by = group_parts.join(", ") + ); + + let alias_list: Vec = group_columns + .iter() + .map(|column| column.alias.clone()) + .collect(); + let alias_clause = alias_list.join(", "); + let aggregate = aggregate_value_select(aggregation.op, "value"); + let outer_select = if alias_list.is_empty() { + aggregate.clone() + } else { + format!("{alias_clause}, {aggregate}") + }; + let outer_group = if alias_list.is_empty() { + String::new() + } else { + format!(" GROUP BY {alias_clause}") + }; + let sql = format!( + "WITH stream_data AS ({inner}) SELECT {outer_select} FROM stream_data{outer_group}", + inner = inner_sql, + outer_select = outer_select, + outer_group = outer_group + ); + + let labels = if grouping_labels.is_empty() { + MetricLabelsPlan::Columns(Vec::new()) + } else { + MetricLabelsPlan::Columns(grouping_labels) + }; + Ok(MetricQueryPlan { sql, labels }) + } + + fn metric_range_stream_sql( + &self, + ctx: &MetricRangeStreamContext<'_>, + ) -> Result { + let labels_expr = drop_labels_expr(ctx.labels_column, ctx.drop_labels); + let stream_labels_expr = 
drop_labels_expr(ctx.stream_labels_column, ctx.drop_labels); + let stream_cte = format!( + "SELECT DISTINCT {stream_labels_expr} AS labels FROM {table} AS stream_source WHERE {where}", + table = ctx.table.fq_name(), + where = ctx.stream_where_clause, + ); + let labels_match_clause = format!("{labels_expr} = stream_labels.labels"); + let sql = format!( + "WITH stream_labels AS ({stream_cte}) \ + SELECT bucket_start AS bucket, stream_labels.labels AS labels, {value_expr} AS value \ + FROM {buckets} CROSS JOIN stream_labels \ + LEFT JOIN {table} AS source ON {join} AND {labels_match} \ + GROUP BY bucket_start, stream_labels.labels \ + ORDER BY bucket, stream_labels.labels", + stream_cte = stream_cte, + value_expr = ctx.value_expr, + buckets = ctx.buckets_table, + table = ctx.table.fq_name(), + join = ctx.join_clause, + labels_match = labels_match_clause + ); + Ok(MetricRangeQueryPlan { + sql, + labels: MetricLabelsPlan::LokiFull, + }) + } + + fn metric_range_aggregation_sql( + &self, + ctx: &MetricRangeAggregationContext<'_>, + ) -> Result { + if let Some(GroupModifier::Without(labels)) = &ctx.aggregation.grouping { + return Err(AppError::BadRequest(format!( + "metric queries do not support `without` grouping ({labels:?})" + ))); + } + let grouping_labels = match &ctx.aggregation.grouping { + Some(GroupModifier::By(labels)) if !labels.is_empty() => labels.clone(), + _ => Vec::new(), + }; + let grouped_labels_expr = drop_labels_expr(ctx.labels_column, ctx.drop_labels); + let group_columns = build_loki_group_columns(&grouping_labels, &grouped_labels_expr); + let mut select_parts = vec!["bucket_start AS bucket".to_string()]; + select_parts.extend( + group_columns + .iter() + .map(|column| format!("{} AS {}", column.expr, column.alias)), + ); + select_parts.push(format!("{} AS value", ctx.value_expr)); + let mut group_parts = vec!["bucket_start".to_string(), grouped_labels_expr.clone()]; + group_parts.extend(group_columns.iter().map(|column| column.expr.clone())); + let inner_sql = format!( + "SELECT {select_clause} FROM {buckets} LEFT JOIN {table} AS source ON {join} GROUP BY {group_by}", + select_clause = select_parts.join(", "), + buckets = ctx.buckets_table, + table = ctx.table.fq_name(), + join = ctx.join_clause, + group_by = group_parts.join(", ") + ); + let alias_list: Vec = group_columns + .iter() + .map(|column| column.alias.clone()) + .collect(); + let alias_clause = alias_list.join(", "); + let aggregate = aggregate_value_select(ctx.aggregation.op, "value"); + let select_prefix = if alias_list.is_empty() { + format!("bucket, {aggregate}") + } else { + format!("bucket, {alias_clause}, {aggregate}") + }; + let group_suffix = if alias_list.is_empty() { + " GROUP BY bucket".to_string() + } else { + format!(" GROUP BY bucket, {alias_clause}") + }; + let sql = format!( + "SELECT {select_prefix} FROM ({inner}) AS stream_data{group_suffix} ORDER BY bucket", + inner = inner_sql, + select_prefix = select_prefix, + group_suffix = group_suffix + ); + let labels = if grouping_labels.is_empty() { + MetricLabelsPlan::Columns(Vec::new()) + } else { + MetricLabelsPlan::Columns(grouping_labels) + }; + Ok(MetricRangeQueryPlan { sql, labels }) + } + pub(crate) fn parse_row(&self, row: &Row) -> Result { if row.values().len() < 3 { return Err(AppError::Internal( @@ -254,6 +581,27 @@ impl LokiSchema { } } +struct MetricRangeStreamContext<'a> { + table: &'a TableRef, + buckets_table: &'a str, + labels_column: &'a str, + stream_labels_column: &'a str, + value_expr: &'a str, + join_clause: &'a str, + 
stream_where_clause: &'a str, + drop_labels: &'a [String], +} + +struct MetricRangeAggregationContext<'a> { + table: &'a TableRef, + buckets_table: &'a str, + labels_column: &'a str, + value_expr: &'a str, + join_clause: &'a str, + aggregation: &'a VectorAggregation, + drop_labels: &'a [String], +} + fn label_clause_loki(matcher: &LabelMatcher, column: String) -> String { let value = escape(&matcher.value); match matcher.op { @@ -265,3 +613,46 @@ fn label_clause_loki(matcher: &LabelMatcher, column: String) -> String { } } } + +struct LokiGroupColumn { + expr: String, + alias: String, +} + +fn build_loki_group_columns(labels: &[String], column: &str) -> Vec { + let base = format!("({column})"); + labels + .iter() + .enumerate() + .map(|(idx, label)| { + let escaped = escape(label); + LokiGroupColumn { + expr: format!("{base}['{escaped}']"), + alias: format!("group_{idx}"), + } + }) + .collect() +} + +fn drop_labels_expr(column: &str, drop_labels: &[String]) -> String { + if drop_labels.is_empty() { + return column.to_string(); + } + let mut expr = column.to_string(); + for label in drop_labels { + let escaped = escape(label); + expr = format!("OBJECT_DELETE({expr}, '{escaped}')"); + } + expr +} + +fn range_value_expression(function: RangeFunction, duration_ns: i64) -> String { + match function { + RangeFunction::CountOverTime => "COUNT(*)".to_string(), + RangeFunction::Rate => { + let seconds = duration_ns as f64 / 1_000_000_000_f64; + let literal = format_float_literal(seconds); + format!("COUNT(*) / {literal}") + } + } +} diff --git a/src/logql/mod.rs b/src/logql/mod.rs index 6833b31..1f33ca8 100644 --- a/src/logql/mod.rs +++ b/src/logql/mod.rs @@ -17,7 +17,8 @@ mod pipeline; pub use pipeline::{LineTemplate, Pipeline, PipelineStage}; use pipeline::{ - JsonPath, JsonSelector, JsonStage, LabelFormatRule, LabelFormatStage, LabelFormatValue, + DropStage, JsonPath, JsonSelector, JsonStage, LabelFormatRule, LabelFormatStage, + LabelFormatValue, }; use nom::{ @@ -27,7 +28,7 @@ use nom::{ character::complete::{char, multispace0, multispace1, none_of}, combinator::{all_consuming, cut, map, map_res, opt, recognize}, error::{Error as NomError, context}, - multi::{fold_many0, separated_list0, separated_list1}, + multi::{fold_many0, fold_many1, separated_list0, separated_list1}, sequence::{delimited, pair, preceded}, }; use thiserror::Error; @@ -39,6 +40,84 @@ pub struct LogqlExpr { pub pipeline: Pipeline, } +#[derive(Debug, Clone)] +pub struct MetricExpr { + pub range: RangeExpr, + pub aggregation: Option, +} + +#[derive(Debug, Clone)] +pub struct RangeExpr { + pub function: RangeFunction, + pub selector: LogqlExpr, + pub duration: DurationValue, +} + +#[derive(Debug, Clone, Copy)] +pub enum RangeFunction { + CountOverTime, + Rate, +} + +#[derive(Debug, Clone)] +pub struct VectorAggregation { + pub op: VectorAggregationOp, + pub grouping: Option, +} + +#[derive(Debug, Clone, Copy)] +pub enum VectorAggregationOp { + Sum, + Avg, + Min, + Max, + Count, +} + +#[derive(Debug, Clone)] +pub enum GroupModifier { + By(Vec), + Without(Vec), +} + +#[derive(Debug, Clone, Copy)] +pub struct DurationValue { + nanoseconds: i64, +} + +impl DurationValue { + pub fn new(nanoseconds: i64) -> Result { + if nanoseconds <= 0 { + return Err("range duration must be positive".into()); + } + Ok(Self { nanoseconds }) + } + + pub fn as_nanoseconds(&self) -> i64 { + self.nanoseconds + } + + pub fn parse_literal(input: &str) -> Result { + let mut total: i128 = 0; + let mut rest = input.trim(); + if rest.is_empty() { + return 
Err("duration literal cannot be empty".into()); + } + while !rest.is_empty() { + let (value, unit_len) = parse_duration_segment(rest)?; + total = total + .checked_add(value as i128) + .ok_or_else(|| "duration value overflowed".to_string())?; + rest = &rest[unit_len..]; + rest = rest.trim_start(); + } + let nanos: i64 = total + .try_into() + .map_err(|_| "duration exceeds supported range".to_string())?; + DurationValue::new(nanos) + } +} + #[derive(Debug, Clone)] pub struct LabelMatcher { pub key: String, @@ -75,6 +154,10 @@ impl LogqlParser { pub fn parse(&self, input: &str) -> Result { parse_logql(input) } + + pub fn parse_metric(&self, input: &str) -> Result, LogqlError> { + parse_metric_expr(input) + } } #[derive(Debug, Error)] @@ -92,6 +175,38 @@ fn parse_logql(input: &str) -> Result { .map_err(|err| LogqlError::Invalid(err.to_string())) } +fn parse_metric_expr(input: &str) -> Result, LogqlError> { + if !looks_like_metric(input) { + return Ok(None); + } + all_consuming(delimited(multispace0, metric_expression, multispace0)) + .parse(input) + .map(|(_, expr)| Some(expr)) + .map_err(|err| LogqlError::Invalid(err.to_string())) +} + +fn looks_like_metric(input: &str) -> bool { + let trimmed = input.trim_start(); + let ident = trimmed + .chars() + .take_while(|ch| ch.is_ascii_alphabetic() || *ch == '_') + .collect::() + .to_ascii_lowercase(); + if ident.is_empty() { + return false; + } + let metric_tokens = [ + "sum", + "avg", + "min", + "max", + "count", + "rate", + "count_over_time", + ]; + metric_tokens.iter().any(|token| *token == ident) +} + fn query(input: &str) -> NomResult<'_, LogqlExpr> { let (input, selectors) = selector(input)?; let (input, filters) = many_filters(input)?; @@ -106,6 +221,206 @@ fn query(input: &str) -> NomResult<'_, LogqlExpr> { )) } +fn metric_expression(input: &str) -> NomResult<'_, MetricExpr> { + alt(( + vector_aggregation_expr, + map(range_function_expr, |range| MetricExpr { + range, + aggregation: None, + }), + )) + .parse(input) +} + +fn vector_aggregation_expr(input: &str) -> NomResult<'_, MetricExpr> { + map( + pair( + aggregation_op, + pair(opt(group_modifier), aggregation_argument), + ), + |(op, (grouping, range))| MetricExpr { + range, + aggregation: Some(VectorAggregation { op, grouping }), + }, + ) + .parse(input) +} + +fn aggregation_argument(input: &str) -> NomResult<'_, RangeExpr> { + delimited( + preceded(multispace0, char('(')), + cut(range_function_expr), + preceded(multispace0, char(')')), + ) + .parse(input) +} + +fn aggregation_op(input: &str) -> NomResult<'_, VectorAggregationOp> { + context( + "aggregation operator", + preceded( + multispace0, + alt(( + map(tag("sum"), |_| VectorAggregationOp::Sum), + map(tag("avg"), |_| VectorAggregationOp::Avg), + map(tag("min"), |_| VectorAggregationOp::Min), + map(tag("max"), |_| VectorAggregationOp::Max), + map(tag("count"), |_| VectorAggregationOp::Count), + )), + ), + ) + .parse(input) +} + +fn group_modifier(input: &str) -> NomResult<'_, GroupModifier> { + context( + "group modifier", + map( + pair( + preceded(multispace1, alt((tag("by"), tag("without")))), + delimited( + preceded(multispace0, char('(')), + cut(label_list), + preceded(multispace0, char(')')), + ), + ), + |(modifier, labels)| match modifier { + "by" => GroupModifier::By(labels), + "without" => GroupModifier::Without(labels), + _ => unreachable!(), + }, + ), + ) + .parse(input) +} + +fn label_list(input: &str) -> NomResult<'_, Vec> { + separated_list1( + preceded(multispace0, char(',')), + preceded(multispace0, label_identifier), + 
+    )
+    .parse(input)
+}
+
+fn range_function_expr(input: &str) -> NomResult<'_, RangeExpr> {
+    map(
+        pair(
+            range_function,
+            delimited(
+                preceded(multispace0, char('(')),
+                cut(pair(query, preceded(multispace0, range_selector))),
+                preceded(multispace0, char(')')),
+            ),
+        ),
+        |(function, (selector, duration))| RangeExpr {
+            function,
+            selector,
+            duration,
+        },
+    )
+    .parse(input)
+}
+
+fn range_function(input: &str) -> NomResult<'_, RangeFunction> {
+    context(
+        "range function",
+        preceded(
+            multispace0,
+            alt((
+                map(tag("count_over_time"), |_| RangeFunction::CountOverTime),
+                map(tag("rate"), |_| RangeFunction::Rate),
+            )),
+        ),
+    )
+    .parse(input)
+}
+
+fn range_selector(input: &str) -> NomResult<'_, DurationValue> {
+    context(
+        "range selector",
+        delimited(char('['), cut(duration_literal), char(']')),
+    )
+    .parse(input)
+}
+
+fn duration_literal(input: &str) -> NomResult<'_, DurationValue> {
+    context(
+        "duration literal",
+        map_res(
+            recognize(fold_many1(duration_segment_token, || (), |_, _| ())),
+            DurationValue::parse_literal,
+        ),
+    )
+    .parse(input)
+}
+
+fn duration_segment_token(input: &str) -> NomResult<'_, ()> {
+    map(
+        pair(
+            take_while1(|ch: char| ch.is_ascii_digit()),
+            alt((
+                tag("ns"),
+                tag("us"),
+                tag("µs"),
+                tag("ms"),
+                tag("s"),
+                tag("m"),
+                tag("h"),
+                tag("d"),
+                tag("w"),
+            )),
+        ),
+        |_| (),
+    )
+    .parse(input)
+}
+
+fn parse_duration_segment(input: &str) -> Result<(i64, usize), String> {
+    if input.is_empty() {
+        return Err("duration literal cannot be empty".into());
+    }
+    let mut digit_len = 0;
+    for ch in input.chars() {
+        if ch.is_ascii_digit() {
+            digit_len += ch.len_utf8();
+        } else {
+            break;
+        }
+    }
+    if digit_len == 0 {
+        return Err("duration segment is missing digits".into());
+    }
+    let number = input[..digit_len]
+        .parse::<i64>()
+        .map_err(|_| "duration value is not a valid integer".to_string())?;
+    let rest = &input[digit_len..];
+    let (multiplier, unit_len) = duration_unit_multiplier(rest)
+        .ok_or_else(|| "duration segment is missing a valid unit".to_string())?;
+    let total = number
+        .checked_mul(multiplier)
+        .ok_or_else(|| "duration value overflowed".to_string())?;
+    Ok((total, digit_len + unit_len))
+}
+
+fn duration_unit_multiplier(input: &str) -> Option<(i64, usize)> {
+    let units: [(&str, i64); 9] = [
+        ("ns", 1),
+        ("us", 1_000),
+        ("µs", 1_000),
+        ("ms", 1_000_000),
+        ("s", 1_000_000_000),
+        ("m", 60 * 1_000_000_000),
+        ("h", 3_600 * 1_000_000_000),
+        ("d", 86_400 * 1_000_000_000),
+        ("w", 604_800 * 1_000_000_000),
+    ];
+    for (token, multiplier) in units {
+        if input.starts_with(token) {
+            return Some((multiplier, token.len()));
+        }
+    }
+    None
+}
+
 fn selector(input: &str) -> NomResult<'_, Vec<LabelMatcher>> {
     context(
         "label selector",
@@ -219,6 +534,7 @@ fn pipeline_stage(input: &str) -> NomResult<'_, PipelineStage> {
                 json_stage,
                 line_format_stage,
                 label_format_stage,
+                drop_stage,
             )),
         ),
     ),
@@ -312,6 +628,26 @@ fn label_format_stage(input: &str) -> NomResult<'_, PipelineStage> {
     .parse(input)
 }
 
+fn drop_stage(input: &str) -> NomResult<'_, PipelineStage> {
+    context(
+        "drop stage",
+        map(
+            preceded(
+                tag("drop"),
+                preceded(
+                    multispace1,
+                    separated_list1(
+                        preceded(multispace0, char(',')),
+                        preceded(multispace0, label_identifier),
+                    ),
+                ),
+            ),
+            |targets| PipelineStage::Drop(DropStage { targets }),
+        ),
+    )
+    .parse(input)
+}
+
 fn label_format_rule(input: &str) -> NomResult<'_, LabelFormatRule> {
     context(
         "label format rule",
@@ -500,4 +836,110 @@ mod tests {
         _ => panic!("expected label_format stage"),
         }
     }
+
+    #[test]
+    fn parse_drop_stage() {
+        let expr = LogqlParser
+            .parse("{job=\"api\"} | drop __error__,temp_label")
+            .unwrap();
+        assert_eq!(expr.pipeline.stages().len(), 1);
+        match &expr.pipeline.stages()[0] {
+            PipelineStage::Drop(stage) => {
+                assert_eq!(
+                    stage.targets,
+                    vec!["__error__".to_string(), "temp_label".to_string()]
+                );
+            }
+            other => panic!("unexpected stage: {other:?}"),
+        }
+    }
+
+    #[test]
+    fn metric_drop_labels_permits_only_drop_stages() {
+        let expr = LogqlParser
+            .parse("{job=\"api\"} | drop __error__,temp_label | drop foo")
+            .unwrap();
+        let drops = expr.pipeline.metric_drop_labels().unwrap();
+        assert_eq!(
+            drops,
+            vec![
+                "__error__".to_string(),
+                "foo".to_string(),
+                "temp_label".to_string()
+            ]
+        );
+    }
+
+    #[test]
+    fn metric_drop_labels_rejects_other_stages() {
+        let expr = LogqlParser
+            .parse("{job=\"api\"} | json | drop __error__")
+            .unwrap();
+        let err = expr.pipeline.metric_drop_labels().unwrap_err();
+        assert!(err.contains("drop"), "unexpected error: {}", err);
+    }
+
+    #[test]
+    fn duration_literal_with_multiple_segments() {
+        let value = DurationValue::parse_literal("1h30m15s").unwrap();
+        assert_eq!(
+            value.as_nanoseconds(),
+            (3_600 + 30 * 60 + 15) * 1_000_000_000
+        );
+        let short = DurationValue::parse_literal("10ms").unwrap();
+        assert_eq!(short.as_nanoseconds(), 10_000_000);
+    }
+
+    #[test]
+    fn duration_literal_rejects_invalid_units() {
+        assert!(DurationValue::parse_literal("").is_err());
+        assert!(DurationValue::parse_literal("10x").is_err());
+        assert!(DurationValue::parse_literal("ms").is_err());
+    }
+
+    #[test]
+    fn parse_metric_rate_without_aggregation() {
+        let parsed = LogqlParser
+            .parse_metric("rate({app=\"api\",env!=\"prod\"}[5m])")
+            .unwrap()
+            .unwrap();
+        assert!(matches!(parsed.range.function, RangeFunction::Rate));
+        assert_eq!(parsed.range.selector.selectors.len(), 2);
+        assert!(parsed.range.selector.pipeline.is_empty());
+        assert_eq!(
+            parsed.range.duration.as_nanoseconds(),
+            5 * 60 * 1_000_000_000
+        );
+        assert!(parsed.aggregation.is_none());
+    }
+
+    #[test]
+    fn parse_metric_sum_by_labels() {
+        let parsed = LogqlParser
+            .parse_metric("sum by (app,instance) (count_over_time({job=\"svc\"}[1h]))")
+            .unwrap()
+            .unwrap();
+        let agg = parsed.aggregation.unwrap();
+        match agg.grouping {
+            Some(GroupModifier::By(labels)) => {
+                assert_eq!(labels, vec!["app".to_string(), "instance".to_string()])
+            }
+            other => panic!("unexpected grouping: {other:?}"),
+        }
+        assert!(matches!(agg.op, VectorAggregationOp::Sum));
+        assert_eq!(
+            parsed.range.duration.as_nanoseconds(),
+            3_600 * 1_000_000_000
+        );
+    }
+
+    #[test]
+    fn parse_metric_non_metric_returns_none() {
+        assert!(
+            LogqlParser
+                .parse_metric("{app=\"loki\"}")
+                .unwrap()
+                .is_none()
+        );
+    }
 }
diff --git a/src/logql/pipeline.rs b/src/logql/pipeline.rs
index 1594eee..ce47a08 100644
--- a/src/logql/pipeline.rs
+++ b/src/logql/pipeline.rs
@@ -26,6 +26,29 @@ impl Pipeline {
         Self { stages }
     }
 
+    #[cfg(test)]
+    pub fn is_empty(&self) -> bool {
+        self.stages.is_empty()
+    }
+
+    pub fn metric_drop_labels(&self) -> Result<Vec<String>, String> {
+        let mut labels = Vec::new();
+        for stage in &self.stages {
+            match stage {
+                PipelineStage::Drop(stage) => labels.extend(stage.targets.iter().cloned()),
+                _ => {
+                    return Err(
+                        "metric queries only support `drop` stages inside the selector pipeline"
+                            .into(),
+                    );
+                }
+            }
+        }
+        labels.sort();
+        labels.dedup();
+        Ok(labels)
+    }
+
     #[cfg(test)]
     pub fn stages(&self) -> &[PipelineStage] {
         &self.stages
@@ -58,6 +81,7 @@ pub enum PipelineStage {
     Json(JsonStage),
     LineFormat(LineTemplate),
     LabelFormat(LabelFormatStage),
+    Drop(DropStage),
 }
 
 impl PipelineStage {
@@ -67,6 +91,7 @@
             PipelineStage::Json(stage) => ctx.extract_json(stage),
             PipelineStage::LineFormat(template) => ctx.apply_template(template),
             PipelineStage::LabelFormat(stage) => ctx.format_labels(stage),
+            PipelineStage::Drop(stage) => ctx.drop_labels(stage),
         }
     }
 }
@@ -216,6 +241,11 @@ pub enum LabelFormatValue {
     Source(String),
 }
 
+#[derive(Debug, Clone)]
+pub struct DropStage {
+    pub targets: Vec<String>,
+}
+
 #[derive(Debug, Clone)]
 pub struct LineTemplate {
     segments: Vec,
@@ -330,6 +360,13 @@ impl<'a> StageContext<'a> {
         }
     }
 
+    fn drop_labels(&mut self, stage: &DropStage) {
+        for target in &stage.targets {
+            self.labels.remove(target);
+            self.extracted.remove(target);
+        }
+    }
+
     fn apply_template(&mut self, template: &LineTemplate) {
         self.line = template.render(self);
     }
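
For orientation, here is a minimal sketch of how a request handler might drive the new `parse_metric` entry point from this diff. It is illustrative only: `LogqlParser`, `MetricExpr`, `RangeFunction`, and `LogqlError` are the types introduced above, while the `classify_query` helper, its string output, and the assumption that `RangeFunction` derives `Debug` are all hypothetical.

```rust
// Hypothetical caller of the parser added in this diff. `parse_metric` returns
// Ok(None) for plain log selectors, so a handler can fall back to the raw
// log-streaming path without re-parsing the input.
fn classify_query(query: &str) -> Result<String, String> {
    let parser = LogqlParser;
    match parser.parse_metric(query).map_err(|e| e.to_string())? {
        None => Ok(format!("log query: {query}")),
        Some(expr) => {
            // Range selectors are stored as nanoseconds; dividing the bucket
            // count by the window length in seconds is what yields the
            // per-second values reported by `rate`.
            let window_secs = expr.range.duration.as_nanoseconds() / 1_000_000_000;
            Ok(format!(
                "metric query: {:?} over {window_secs}s (aggregated: {})",
                expr.range.function,
                expr.aggregation.is_some(),
            ))
        }
    }
}
```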