diff --git a/README.md b/README.md index 7b680ae..d2ce297 100644 --- a/README.md +++ b/README.md @@ -117,7 +117,7 @@ Ensure the table matches one of the schemas above (including indexes) so Grafana ## HTTP API -All endpoints return Loki-compatible JSON responses and reuse the same error shape that Loki expects (`status:error`, `errorType`, `error`). Grafana can therefore talk to the adapter using the stock Loki data source without any proxy layers or plugins. +All endpoints return Loki-compatible JSON responses and reuse the same error shape that Loki expects (`status:error`, `errorType`, `error`). Grafana can therefore talk to the adapter using the stock Loki data source without any proxy layers or plugins. Refer to the upstream [Loki HTTP API reference](https://grafana.com/docs/loki/latest/reference/loki-http-api/) for the detailed contract of each endpoint. | Endpoint | Description | | --------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | @@ -125,6 +125,7 @@ All endpoints return Loki-compatible JSON responses and reuse the same error sha | `GET /loki/api/v1/query_range` | Range query. Requires `start`/`end` nanoseconds and accepts `limit`/`step`. Log queries stream raw lines; metric queries return Loki matrix results and require `step` to match the range selector duration. | | `GET /loki/api/v1/labels` | Lists known label keys for the selected schema. Optional `start`/`end` parameters (nanoseconds) fence the search window; unspecified values default to the last 5 minutes, matching Grafana's Explore defaults. | | `GET /loki/api/v1/label/{label}/values` | Lists distinct values for a specific label key using the same optional `start`/`end` bounds as `/labels`. Works for both `loki` and `flat` schemas and automatically filters out empty strings. 
| +| `GET /loki/api/v1/index/stats` | Returns approximate `streams`, `chunks`, `entries`, and `bytes` counters for a selector over a `[start, end]` window. `chunks` are estimated via unique stream keys because Databend does not store Loki chunks. | | `GET /loki/api/v1/tail` | WebSocket tail endpoint that streams live logs for a LogQL query; compatible with Grafana Explore and `logcli --tail`. | `/query` and `/query_range` share the same LogQL parser and SQL builder. Instant queries fall back to `DEFAULT_LOOKBACK_NS` (5 minutes) when no explicit window is supplied, while range queries honor the caller's `start`/`end` bounds. `/labels` and `/label/{label}/values` delegate to schema-aware metadata lookups: the loki schema uses `map_keys`/`labels['key']` expressions, whereas the flat schema issues `SELECT DISTINCT` on the physical column and returns values in sorted order. diff --git a/src/app/handlers.rs b/src/app/handlers.rs index db27a59..9285ab7 100644 --- a/src/app/handlers.rs +++ b/src/app/handlers.rs @@ -25,14 +25,15 @@ use axum::{ routing::get, }; use chrono::Utc; -use serde::Deserialize; +use databend_driver::{NumberValue, Row, Value}; +use serde::{Deserialize, Serialize}; use std::{collections::HashSet, time::Instant}; use tokio::time::{Duration, sleep}; use crate::{ databend::{ LabelQueryBounds, MetricQueryBounds, MetricRangeQueryBounds, QueryBounds, SqlOrder, - execute_query, + StatsQueryBounds, execute_query, }, error::AppError, logql::{DurationValue, LogqlExpr}, @@ -57,6 +58,7 @@ pub fn router(state: AppState) -> Router { .route("/loki/api/v1/label/{label}/values", get(label_values)) .route("/loki/api/v1/query", get(instant_query)) .route("/loki/api/v1/query_range", get(range_query)) + .route("/loki/api/v1/index/stats", get(index_stats)) .route("/loki/api/v1/tail", get(tail_logs)) .with_state(state) .layer(middleware::from_fn(log_requests)) @@ -92,6 +94,21 @@ struct TailQueryParams { delay_for: Option, } +#[derive(Debug, Deserialize)] +struct 
StatsQueryParams { + query: Option<String>, + start: Option<i64>, + end: Option<i64>, +} + +#[derive(Debug, Serialize)] +struct IndexStatsResponse { + streams: u64, + chunks: u64, + entries: u64, + bytes: u64, +} + async fn instant_query( State(state): State<AppState>, Query(params): Query, @@ -229,6 +246,50 @@ async fn range_query( Ok(Json(LokiResponse::success(streams))) } +async fn index_stats( + State(state): State<AppState>, + Query(params): Query<StatsQueryParams>, +) -> Result<Json<IndexStatsResponse>, AppError> { + let query = params + .query + .as_deref() + .filter(|value| !value.is_empty()) + .ok_or_else(|| AppError::BadRequest("query is required".into()))?; + let start = params + .start + .ok_or_else(|| AppError::BadRequest("start is required".into()))?; + let end = params + .end + .ok_or_else(|| AppError::BadRequest("end is required".into()))?; + if start >= end { + return Err(AppError::BadRequest( + "start must be smaller than end".into(), + )); + } + let expr = state.parse(query)?; + let plan = state.schema().build_index_stats_query( + state.table(), + &expr, + &StatsQueryBounds { + start_ns: start, + end_ns: end, + }, + )?; + log::debug!( + "index stats SQL (start_ns={}, end_ns={}): {}", + start, + end, + plan.sql + ); + let rows = execute_query(state.client(), &plan.sql).await?; + let row = rows + .into_iter() + .next() + .ok_or_else(|| AppError::Internal("index stats query returned no rows".into()))?; + let stats = parse_index_stats_row(row)?; + Ok(Json(stats)) +} + async fn tail_logs( State(state): State<AppState>, Query(params): Query<TailQueryParams>, @@ -361,6 +422,52 @@ fn parse_vector_term(segment: &str) -> Option<f64> { Some(value) } +fn parse_index_stats_row(row: Row) -> Result<IndexStatsResponse, AppError> { + if row.len() < 4 { + return Err(AppError::Internal( + "index stats query must return streams, chunks, entries, bytes".into(), + )); + } + let values = row.values(); + let streams = value_to_u64(&values[0], "streams")?; + let chunks = value_to_u64(&values[1], "chunks")?; + let entries = value_to_u64(&values[2], "entries")?; + let bytes = value_to_u64(&values[3], "bytes")?; +
Ok(IndexStatsResponse { + streams, + chunks, + entries, + bytes, + }) +} + +fn value_to_u64(value: &Value, context: &str) -> Result<u64, AppError> { + match value { + Value::Null => Ok(0), + Value::Number(number) => match number { + NumberValue::UInt8(v) => Ok(*v as u64), + NumberValue::UInt16(v) => Ok(*v as u64), + NumberValue::UInt32(v) => Ok(*v as u64), + NumberValue::UInt64(v) => Ok(*v), + NumberValue::Int8(v) if *v >= 0 => Ok(*v as u64), + NumberValue::Int16(v) if *v >= 0 => Ok(*v as u64), + NumberValue::Int32(v) if *v >= 0 => Ok(*v as u64), + NumberValue::Int64(v) if *v >= 0 => Ok(*v as u64), + NumberValue::Float32(v) if *v >= 0.0 => Ok(v.trunc() as u64), + NumberValue::Float64(v) if *v >= 0.0 => Ok(v.trunc() as u64), + other => Err(AppError::Internal(format!( + "unexpected {context} numeric value: {other:?}" + ))), + }, + Value::String(text) => text + .parse::<u64>() + .map_err(|err| AppError::Internal(format!("failed to parse {context} as u64: {err}"))), + other => Err(AppError::Internal(format!( + "unexpected {context} value type: {other:?}" + ))), + } +} + async fn log_requests(req: Request, next: Next) -> Response { let method = req.method().clone(); let uri = req.uri().clone(); diff --git a/src/databend/core.rs b/src/databend/core.rs index fe5d5ee..a225b34 100644 --- a/src/databend/core.rs +++ b/src/databend/core.rs @@ -164,6 +164,18 @@ impl SchemaAdapter { } } } + + pub fn build_index_stats_query( + &self, + table: &TableRef, + expr: &LogqlExpr, + bounds: &StatsQueryBounds, + ) -> Result<StatsQueryPlan, AppError> { + match self { + SchemaAdapter::Loki(schema) => schema.build_index_stats_query(table, expr, bounds), + SchemaAdapter::Flat(schema) => schema.build_index_stats_query(table, expr, bounds), + } + } } pub struct QueryBounds { @@ -185,6 +197,11 @@ pub struct MetricRangeQueryBounds { pub window_ns: i64, } +pub struct StatsQueryBounds { + pub start_ns: i64, + pub end_ns: i64, +} + #[derive(Clone, Copy, Default)] pub struct LabelQueryBounds { pub start_ns: Option<i64>, @@ -238,6 +255,10 @@ pub
struct MetricRangeQueryPlan { pub labels: MetricLabelsPlan, } +pub struct StatsQueryPlan { + pub sql: String, +} + #[derive(Clone)] pub enum MetricLabelsPlan { LokiFull, diff --git a/src/databend/flat.rs b/src/databend/flat.rs index 011317f..7b899aa 100644 --- a/src/databend/flat.rs +++ b/src/databend/flat.rs @@ -27,11 +27,12 @@ use crate::{ use super::core::{ LabelQueryBounds, LogEntry, MetricLabelsPlan, MetricQueryBounds, MetricQueryPlan, - MetricRangeQueryBounds, MetricRangeQueryPlan, QueryBounds, SchemaConfig, TableColumn, TableRef, - aggregate_value_select, ensure_line_column, ensure_timestamp_column, escape, execute_query, - format_float_literal, is_line_candidate, is_numeric_type, line_filter_clause, - matches_named_column, metric_bucket_cte, missing_required_column, quote_ident, - range_bucket_value_expression, timestamp_literal, timestamp_offset_expr, value_to_timestamp, + MetricRangeQueryBounds, MetricRangeQueryPlan, QueryBounds, SchemaConfig, StatsQueryBounds, + StatsQueryPlan, TableColumn, TableRef, aggregate_value_select, ensure_line_column, + ensure_timestamp_column, escape, execute_query, format_float_literal, is_line_candidate, + is_numeric_type, line_filter_clause, matches_named_column, metric_bucket_cte, + missing_required_column, quote_ident, range_bucket_value_expression, timestamp_literal, + timestamp_offset_expr, value_to_timestamp, }; #[derive(Clone)] @@ -233,6 +234,56 @@ impl FlatSchema { } } + pub(crate) fn build_index_stats_query( + &self, + table: &TableRef, + expr: &LogqlExpr, + bounds: &StatsQueryBounds, + ) -> Result<StatsQueryPlan, AppError> { + let ts_col = quote_ident(&self.timestamp_col); + let mut clauses = vec![ + format!("{ts_col} >= {}", timestamp_literal(bounds.start_ns)?), + format!("{ts_col} <= {}", timestamp_literal(bounds.end_ns)?), + ]; + for matcher in &expr.selectors { + clauses.push(label_clause_flat(matcher, &self.label_cols, None)?); + } + clauses.extend( + expr.filters + .iter() + .map(|f| line_filter_clause(quote_ident(&self.line_col), f)),
+ ); + let where_clause = clauses.join(" AND "); + let stream_expr = self.stream_identity_expr(None); + let line_col = quote_ident(&self.line_col); + let sql = format!( + "SELECT \ + COUNT(DISTINCT {stream}) AS streams, \ + COUNT(DISTINCT {stream}) AS chunks, \ + COUNT(*) AS entries, \ + COALESCE(SUM(length({line_col})), 0) AS bytes \ + FROM {table} \ + WHERE {where_clause}", + stream = stream_expr, + table = table.fq_name() + ); + Ok(StatsQueryPlan { sql }) + } + + fn stream_identity_expr(&self, alias: Option<&str>) -> String { + if self.label_cols.is_empty() { + return "''".to_string(); + } + let qualifier = alias.map(|name| format!("{name}.")).unwrap_or_default(); + let mut args = Vec::with_capacity(self.label_cols.len() + 1); + args.push("'|'".to_string()); + for column in &self.label_cols { + let qualified = format!("{qualifier}{}", quote_ident(&column.name)); + args.push(format!("COALESCE(CAST({qualified} AS STRING), '')")); + } + format!("concat_ws({})", args.join(", ")) + } + fn metric_stream_sql( &self, table: &TableRef, diff --git a/src/databend/loki.rs b/src/databend/loki.rs index fd1591f..d9c07a1 100644 --- a/src/databend/loki.rs +++ b/src/databend/loki.rs @@ -24,12 +24,12 @@ use crate::{ use super::core::{ LabelQueryBounds, LogEntry, MetricLabelsPlan, MetricQueryBounds, MetricQueryPlan, - MetricRangeQueryBounds, MetricRangeQueryPlan, QueryBounds, SchemaConfig, TableColumn, TableRef, - aggregate_value_select, ensure_labels_column, ensure_line_column, ensure_timestamp_column, - escape, execute_query, format_float_literal, line_filter_clause, matches_line_column, - matches_named_column, metric_bucket_cte, missing_required_column, parse_labels_value, - quote_ident, range_bucket_value_expression, timestamp_literal, timestamp_offset_expr, - value_to_timestamp, + MetricRangeQueryBounds, MetricRangeQueryPlan, QueryBounds, SchemaConfig, StatsQueryBounds, + StatsQueryPlan, TableColumn, TableRef, aggregate_value_select, ensure_labels_column, + ensure_line_column, 
ensure_timestamp_column, escape, execute_query, format_float_literal, + line_filter_clause, matches_line_column, matches_named_column, metric_bucket_cte, + missing_required_column, parse_labels_value, quote_ident, range_bucket_value_expression, + timestamp_literal, timestamp_offset_expr, value_to_timestamp, }; #[derive(Clone)] @@ -228,6 +228,44 @@ impl LokiSchema { } } + pub(crate) fn build_index_stats_query( + &self, + table: &TableRef, + expr: &LogqlExpr, + bounds: &StatsQueryBounds, + ) -> Result<StatsQueryPlan, AppError> { + let ts_col = quote_ident(&self.timestamp_col); + let mut clauses = vec![ + format!("{ts_col} >= {}", timestamp_literal(bounds.start_ns)?), + format!("{ts_col} <= {}", timestamp_literal(bounds.end_ns)?), + ]; + clauses.extend( + expr.selectors + .iter() + .map(|m| label_clause_loki(m, quote_ident(&self.labels_col))), + ); + clauses.extend( + expr.filters + .iter() + .map(|f| line_filter_clause(quote_ident(&self.line_col), f)), + ); + let where_clause = clauses.join(" AND "); + let labels_col = quote_ident(&self.labels_col); + let line_col = quote_ident(&self.line_col); + let stream_hash = format!("city64withseed({labels_col}, 0)"); + let sql = format!( + "SELECT \ + COUNT(DISTINCT {stream_hash}) AS streams, \ + COUNT(DISTINCT {stream_hash}) AS chunks, \ + COUNT(*) AS entries, \ + COALESCE(SUM(length({line_col})), 0) AS bytes \ + FROM {table} \ + WHERE {where_clause}", + table = table.fq_name() + ); + Ok(StatsQueryPlan { sql }) + } + fn metric_streams_sql( &self, table: &TableRef,