Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,15 @@ Ensure the table matches one of the schemas above (including indexes) so Grafana

## HTTP API

All endpoints return Loki-compatible JSON responses and reuse the same error shape that Loki expects (`status:error`, `errorType`, `error`). Grafana can therefore talk to the adapter using the stock Loki data source without any proxy layers or plugins.
All endpoints return Loki-compatible JSON responses and reuse the same error shape that Loki expects (`status:error`, `errorType`, `error`). Grafana can therefore talk to the adapter using the stock Loki data source without any proxy layers or plugins. Refer to the upstream [Loki HTTP API reference](https://grafana.com/docs/loki/latest/reference/loki-http-api/) for the detailed contract of each endpoint.

| Endpoint | Description |
| --------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `GET /loki/api/v1/query` | Instant query. Supports the same LogQL used by Grafana's Explore panel. An optional `time` parameter (nanoseconds) defaults to "now", and the adapter automatically looks back 5 minutes when computing SQL bounds. |
| `GET /loki/api/v1/query_range` | Range query. Requires `start`/`end` nanoseconds and accepts `limit`/`step`. Log queries stream raw lines; metric queries return Loki matrix results and require `step` to match the range selector duration. |
| `GET /loki/api/v1/labels` | Lists known label keys for the selected schema. Optional `start`/`end` parameters (nanoseconds) fence the search window; unspecified values default to the last 5 minutes, matching Grafana's Explore defaults. |
| `GET /loki/api/v1/label/{label}/values` | Lists distinct values for a specific label key using the same optional `start`/`end` bounds as `/labels`. Works for both `loki` and `flat` schemas and automatically filters out empty strings. |
| `GET /loki/api/v1/index/stats` | Returns approximate `streams`, `chunks`, `entries`, and `bytes` counters for a selector over a `[start, end]` window. `chunks` are estimated via unique stream keys because Databend does not store Loki chunks. |
| `GET /loki/api/v1/tail` | WebSocket tail endpoint that streams live logs for a LogQL query; compatible with Grafana Explore and `logcli --tail`. |

`/query` and `/query_range` share the same LogQL parser and SQL builder. Instant queries fall back to `DEFAULT_LOOKBACK_NS` (5 minutes) when no explicit window is supplied, while range queries honor the caller's `start`/`end` bounds. `/labels` and `/label/{label}/values` delegate to schema-aware metadata lookups: the loki schema uses `map_keys`/`labels['key']` expressions, whereas the flat schema issues `SELECT DISTINCT` on the physical column and returns values in sorted order.
Expand Down
111 changes: 109 additions & 2 deletions src/app/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ use axum::{
routing::get,
};
use chrono::Utc;
use serde::Deserialize;
use databend_driver::{NumberValue, Row, Value};
use serde::{Deserialize, Serialize};
use std::{collections::HashSet, time::Instant};
use tokio::time::{Duration, sleep};

use crate::{
databend::{
LabelQueryBounds, MetricQueryBounds, MetricRangeQueryBounds, QueryBounds, SqlOrder,
execute_query,
StatsQueryBounds, execute_query,
},
error::AppError,
logql::{DurationValue, LogqlExpr},
Expand All @@ -57,6 +58,7 @@ pub fn router(state: AppState) -> Router {
.route("/loki/api/v1/label/{label}/values", get(label_values))
.route("/loki/api/v1/query", get(instant_query))
.route("/loki/api/v1/query_range", get(range_query))
.route("/loki/api/v1/index/stats", get(index_stats))
.route("/loki/api/v1/tail", get(tail_logs))
.with_state(state)
.layer(middleware::from_fn(log_requests))
Expand Down Expand Up @@ -92,6 +94,21 @@ struct TailQueryParams {
delay_for: Option<u64>,
}

/// Query-string parameters accepted by `GET /loki/api/v1/index/stats`.
///
/// All fields are optional at the deserialization layer; the handler
/// enforces presence and validity so it can return Loki-shaped
/// `BadRequest` errors instead of axum's generic rejection.
#[derive(Debug, Deserialize)]
struct StatsQueryParams {
    // LogQL selector; required and must be non-empty (checked in `index_stats`).
    query: Option<String>,
    // Window start in Unix nanoseconds; required (checked in `index_stats`).
    start: Option<i64>,
    // Window end in Unix nanoseconds; must be greater than `start`.
    end: Option<i64>,
}

/// JSON body returned by `GET /loki/api/v1/index/stats`, mirroring the
/// field names of Loki's index-stats response.
#[derive(Debug, Serialize)]
struct IndexStatsResponse {
    // Count of distinct stream identities matching the selector.
    streams: u64,
    // Estimated via distinct stream identities — Databend stores no Loki chunks,
    // so this equals `streams` (see the schema SQL builders).
    chunks: u64,
    // Number of log rows in the [start, end] window.
    entries: u64,
    // Sum of SQL `length(line)` over matching rows.
    bytes: u64,
}

async fn instant_query(
State(state): State<AppState>,
Query(params): Query<InstantQueryParams>,
Expand Down Expand Up @@ -229,6 +246,50 @@ async fn range_query(
Ok(Json(LokiResponse::success(streams)))
}

/// Handler for `GET /loki/api/v1/index/stats`.
///
/// Validates the selector and the `[start, end]` nanosecond window, asks the
/// active schema adapter for a single-row aggregate SQL plan, executes it,
/// and converts the row into an [`IndexStatsResponse`].
///
/// # Errors
/// Returns `BadRequest` for a missing/empty `query`, a missing bound, or a
/// non-increasing window; `Internal` when the stats query yields no rows or
/// a cell cannot be coerced to `u64`.
async fn index_stats(
    State(state): State<AppState>,
    Query(params): Query<StatsQueryParams>,
) -> Result<Json<IndexStatsResponse>, AppError> {
    // A non-empty LogQL selector is mandatory.
    let query = match params.query.as_deref() {
        Some(value) if !value.is_empty() => value,
        _ => return Err(AppError::BadRequest("query is required".into())),
    };
    // Both window bounds are mandatory, and the window must be non-empty.
    let (start, end) = match (params.start, params.end) {
        (None, _) => return Err(AppError::BadRequest("start is required".into())),
        (_, None) => return Err(AppError::BadRequest("end is required".into())),
        (Some(start), Some(end)) => (start, end),
    };
    if start >= end {
        return Err(AppError::BadRequest(
            "start must be smaller than end".into(),
        ));
    }
    let expr = state.parse(query)?;
    let bounds = StatsQueryBounds {
        start_ns: start,
        end_ns: end,
    };
    let plan = state
        .schema()
        .build_index_stats_query(state.table(), &expr, &bounds)?;
    log::debug!(
        "index stats SQL (start_ns={start}, end_ns={end}): {}",
        plan.sql
    );
    // The plan is a single-row aggregate; anything else is a server bug.
    let rows = execute_query(state.client(), &plan.sql).await?;
    match rows.into_iter().next() {
        Some(first) => Ok(Json(parse_index_stats_row(first)?)),
        None => Err(AppError::Internal(
            "index stats query returned no rows".into(),
        )),
    }
}

async fn tail_logs(
State(state): State<AppState>,
Query(params): Query<TailQueryParams>,
Expand Down Expand Up @@ -361,6 +422,52 @@ fn parse_vector_term(segment: &str) -> Option<f64> {
Some(value)
}

/// Converts the single aggregate row produced by the index-stats SQL into
/// the response payload.
///
/// Column order is fixed by the SQL builders: `streams`, `chunks`,
/// `entries`, `bytes`. A row with fewer than four columns is rejected as an
/// internal error.
fn parse_index_stats_row(row: Row) -> Result<IndexStatsResponse, AppError> {
    if row.len() < 4 {
        return Err(AppError::Internal(
            "index stats query must return streams, chunks, entries, bytes".into(),
        ));
    }
    let columns = row.values();
    Ok(IndexStatsResponse {
        streams: value_to_u64(&columns[0], "streams")?,
        chunks: value_to_u64(&columns[1], "chunks")?,
        entries: value_to_u64(&columns[2], "entries")?,
        bytes: value_to_u64(&columns[3], "bytes")?,
    })
}

/// Coerces one Databend cell into a non-negative counter.
///
/// NULL maps to 0, unsigned integers widen losslessly, non-negative signed
/// integers convert, non-negative floats are truncated, and strings are
/// parsed as decimal `u64`. Negative numbers, NaN, and any other value type
/// become an `Internal` error tagged with `context` (the column name).
fn value_to_u64(value: &Value, context: &str) -> Result<u64, AppError> {
    match value {
        Value::Null => Ok(0),
        Value::Number(NumberValue::UInt8(v)) => Ok(u64::from(*v)),
        Value::Number(NumberValue::UInt16(v)) => Ok(u64::from(*v)),
        Value::Number(NumberValue::UInt32(v)) => Ok(u64::from(*v)),
        Value::Number(NumberValue::UInt64(v)) => Ok(*v),
        Value::Number(NumberValue::Int8(v)) if *v >= 0 => Ok(*v as u64),
        Value::Number(NumberValue::Int16(v)) if *v >= 0 => Ok(*v as u64),
        Value::Number(NumberValue::Int32(v)) if *v >= 0 => Ok(*v as u64),
        Value::Number(NumberValue::Int64(v)) if *v >= 0 => Ok(*v as u64),
        Value::Number(NumberValue::Float32(v)) if *v >= 0.0 => Ok(v.trunc() as u64),
        Value::Number(NumberValue::Float64(v)) if *v >= 0.0 => Ok(v.trunc() as u64),
        Value::Number(other) => Err(AppError::Internal(format!(
            "unexpected {context} numeric value: {other:?}"
        ))),
        Value::String(text) => text.parse::<u64>().map_err(|err| {
            AppError::Internal(format!("failed to parse {context} as u64: {err}"))
        }),
        other => Err(AppError::Internal(format!(
            "unexpected {context} value type: {other:?}"
        ))),
    }
}

async fn log_requests(req: Request<Body>, next: Next) -> Response {
let method = req.method().clone();
let uri = req.uri().clone();
Expand Down
21 changes: 21 additions & 0 deletions src/databend/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,18 @@ impl SchemaAdapter {
}
}
}

    /// Builds the schema-specific SQL plan backing `/loki/api/v1/index/stats`.
    ///
    /// Dispatches to the loki- or flat-schema implementation; both return a
    /// single-statement plan whose row carries the `streams`, `chunks`,
    /// `entries`, and `bytes` aggregate columns.
    pub fn build_index_stats_query(
        &self,
        table: &TableRef,
        expr: &LogqlExpr,
        bounds: &StatsQueryBounds,
    ) -> Result<StatsQueryPlan, AppError> {
        match self {
            SchemaAdapter::Loki(schema) => schema.build_index_stats_query(table, expr, bounds),
            SchemaAdapter::Flat(schema) => schema.build_index_stats_query(table, expr, bounds),
        }
    }
}

pub struct QueryBounds {
Expand All @@ -185,6 +197,11 @@ pub struct MetricRangeQueryBounds {
pub window_ns: i64,
}

/// Inclusive `[start_ns, end_ns]` window for an index-stats query, in
/// nanoseconds since the Unix epoch. Both bounds are required (unlike
/// `LabelQueryBounds`, which allows open ends).
pub struct StatsQueryBounds {
    // Window start (inclusive), Unix nanoseconds.
    pub start_ns: i64,
    // Window end (inclusive), Unix nanoseconds.
    pub end_ns: i64,
}

#[derive(Clone, Copy, Default)]
pub struct LabelQueryBounds {
pub start_ns: Option<i64>,
Expand Down Expand Up @@ -238,6 +255,10 @@ pub struct MetricRangeQueryPlan {
pub labels: MetricLabelsPlan,
}

/// Executable plan for an index-stats query: one SQL statement expected to
/// yield a single row of `streams`, `chunks`, `entries`, `bytes` counters.
pub struct StatsQueryPlan {
    // Fully rendered SQL text, ready for `execute_query`.
    pub sql: String,
}

#[derive(Clone)]
pub enum MetricLabelsPlan {
LokiFull,
Expand Down
61 changes: 56 additions & 5 deletions src/databend/flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ use crate::{

use super::core::{
LabelQueryBounds, LogEntry, MetricLabelsPlan, MetricQueryBounds, MetricQueryPlan,
MetricRangeQueryBounds, MetricRangeQueryPlan, QueryBounds, SchemaConfig, TableColumn, TableRef,
aggregate_value_select, ensure_line_column, ensure_timestamp_column, escape, execute_query,
format_float_literal, is_line_candidate, is_numeric_type, line_filter_clause,
matches_named_column, metric_bucket_cte, missing_required_column, quote_ident,
range_bucket_value_expression, timestamp_literal, timestamp_offset_expr, value_to_timestamp,
MetricRangeQueryBounds, MetricRangeQueryPlan, QueryBounds, SchemaConfig, StatsQueryBounds,
StatsQueryPlan, TableColumn, TableRef, aggregate_value_select, ensure_line_column,
ensure_timestamp_column, escape, execute_query, format_float_literal, is_line_candidate,
is_numeric_type, line_filter_clause, matches_named_column, metric_bucket_cte,
missing_required_column, quote_ident, range_bucket_value_expression, timestamp_literal,
timestamp_offset_expr, value_to_timestamp,
};

#[derive(Clone)]
Expand Down Expand Up @@ -233,6 +234,56 @@ impl FlatSchema {
}
}

/// Renders the flat-schema SQL behind `/loki/api/v1/index/stats`.
///
/// The statement aggregates over the `[start_ns, end_ns]` window with the
/// selector's label matchers and line filters applied: distinct stream
/// identities are reported as both `streams` and `chunks` (Databend has no
/// chunk concept), `entries` counts matching rows, and `bytes` sums
/// `length()` of the line column.
pub(crate) fn build_index_stats_query(
    &self,
    table: &TableRef,
    expr: &LogqlExpr,
    bounds: &StatsQueryBounds,
) -> Result<StatsQueryPlan, AppError> {
    let ts_col = quote_ident(&self.timestamp_col);
    // Time fence first, then label matchers, then line filters.
    let mut predicates = Vec::new();
    predicates.push(format!("{ts_col} >= {}", timestamp_literal(bounds.start_ns)?));
    predicates.push(format!("{ts_col} <= {}", timestamp_literal(bounds.end_ns)?));
    for matcher in &expr.selectors {
        predicates.push(label_clause_flat(matcher, &self.label_cols, None)?);
    }
    for filter in &expr.filters {
        predicates.push(line_filter_clause(quote_ident(&self.line_col), filter));
    }
    let stream = self.stream_identity_expr(None);
    let line_col = quote_ident(&self.line_col);
    let sql = format!(
        "SELECT \
         COUNT(DISTINCT {stream}) AS streams, \
         COUNT(DISTINCT {stream}) AS chunks, \
         COUNT(*) AS entries, \
         COALESCE(SUM(length({line_col})), 0) AS bytes \
         FROM {table} \
         WHERE {where_clause}",
        table = table.fq_name(),
        where_clause = predicates.join(" AND ")
    );
    Ok(StatsQueryPlan { sql })
}

/// SQL expression that identifies a stream in the flat schema: every label
/// column cast to STRING (NULL coalesced to '') and joined with `'|'` via
/// `concat_ws`. `alias` optionally qualifies each column reference; with no
/// label columns the identity collapses to the empty-string literal.
fn stream_identity_expr(&self, alias: Option<&str>) -> String {
    if self.label_cols.is_empty() {
        return String::from("''");
    }
    let prefix = match alias {
        Some(name) => format!("{name}."),
        None => String::new(),
    };
    let columns = self
        .label_cols
        .iter()
        .map(|column| {
            format!(
                "COALESCE(CAST({prefix}{} AS STRING), '')",
                quote_ident(&column.name)
            )
        })
        .collect::<Vec<_>>()
        .join(", ");
    format!("concat_ws('|', {columns})")
}

fn metric_stream_sql(
&self,
table: &TableRef,
Expand Down
50 changes: 44 additions & 6 deletions src/databend/loki.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ use crate::{

use super::core::{
LabelQueryBounds, LogEntry, MetricLabelsPlan, MetricQueryBounds, MetricQueryPlan,
MetricRangeQueryBounds, MetricRangeQueryPlan, QueryBounds, SchemaConfig, TableColumn, TableRef,
aggregate_value_select, ensure_labels_column, ensure_line_column, ensure_timestamp_column,
escape, execute_query, format_float_literal, line_filter_clause, matches_line_column,
matches_named_column, metric_bucket_cte, missing_required_column, parse_labels_value,
quote_ident, range_bucket_value_expression, timestamp_literal, timestamp_offset_expr,
value_to_timestamp,
MetricRangeQueryBounds, MetricRangeQueryPlan, QueryBounds, SchemaConfig, StatsQueryBounds,
StatsQueryPlan, TableColumn, TableRef, aggregate_value_select, ensure_labels_column,
ensure_line_column, ensure_timestamp_column, escape, execute_query, format_float_literal,
line_filter_clause, matches_line_column, matches_named_column, metric_bucket_cte,
missing_required_column, parse_labels_value, quote_ident, range_bucket_value_expression,
timestamp_literal, timestamp_offset_expr, value_to_timestamp,
};

#[derive(Clone)]
Expand Down Expand Up @@ -228,6 +228,44 @@ impl LokiSchema {
}
}

/// Renders the loki-schema SQL behind `/loki/api/v1/index/stats`.
///
/// Streams are identified by hashing the raw labels column with
/// `city64withseed(..., 0)`, so `streams` and `chunks` both report the
/// number of distinct label sets; `entries` counts matching rows and
/// `bytes` sums `length()` of the line column, all fenced to the
/// `[start_ns, end_ns]` window with the selector's matchers and filters.
pub(crate) fn build_index_stats_query(
    &self,
    table: &TableRef,
    expr: &LogqlExpr,
    bounds: &StatsQueryBounds,
) -> Result<StatsQueryPlan, AppError> {
    let ts_col = quote_ident(&self.timestamp_col);
    let labels_col = quote_ident(&self.labels_col);
    let line_col = quote_ident(&self.line_col);
    // Time fence first, then label matchers, then line filters.
    let mut predicates = vec![
        format!("{ts_col} >= {}", timestamp_literal(bounds.start_ns)?),
        format!("{ts_col} <= {}", timestamp_literal(bounds.end_ns)?),
    ];
    for matcher in &expr.selectors {
        predicates.push(label_clause_loki(matcher, labels_col.clone()));
    }
    for filter in &expr.filters {
        predicates.push(line_filter_clause(line_col.clone(), filter));
    }
    let stream_hash = format!("city64withseed({labels_col}, 0)");
    let sql = format!(
        "SELECT \
         COUNT(DISTINCT {stream_hash}) AS streams, \
         COUNT(DISTINCT {stream_hash}) AS chunks, \
         COUNT(*) AS entries, \
         COALESCE(SUM(length({line_col})), 0) AS bytes \
         FROM {table} \
         WHERE {where_clause}",
        table = table.fq_name(),
        where_clause = predicates.join(" AND ")
    );
    Ok(StatsQueryPlan { sql })
}

fn metric_streams_sql(
&self,
table: &TableRef,
Expand Down
Loading