Skip to content

Commit d26ff58

Browse files
authored
feat: support index stats endpoint (#11)
1 parent 5037f29 commit d26ff58

File tree

5 files changed

+232
-14
lines changed

5 files changed

+232
-14
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,15 @@ Ensure the table matches one of the schemas above (including indexes) so Grafana
117117

118118
## HTTP API
119119

120-
All endpoints return Loki-compatible JSON responses and reuse the same error shape that Loki expects (`status:error`, `errorType`, `error`). Grafana can therefore talk to the adapter using the stock Loki data source without any proxy layers or plugins.
120+
All endpoints return Loki-compatible JSON responses and reuse the same error shape that Loki expects (`status:error`, `errorType`, `error`). Grafana can therefore talk to the adapter using the stock Loki data source without any proxy layers or plugins. Refer to the upstream [Loki HTTP API reference](https://grafana.com/docs/loki/latest/reference/loki-http-api/) for the detailed contract of each endpoint.
121121

122122
| Endpoint | Description |
123123
| --------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
124124
| `GET /loki/api/v1/query` | Instant query. Supports the same LogQL used by Grafana's Explore panel. An optional `time` parameter (nanoseconds) defaults to "now", and the adapter automatically looks back 5 minutes when computing SQL bounds. |
125125
| `GET /loki/api/v1/query_range` | Range query. Requires `start`/`end` nanoseconds and accepts `limit`/`step`. Log queries stream raw lines; metric queries return Loki matrix results and require `step` to match the range selector duration. |
126126
| `GET /loki/api/v1/labels` | Lists known label keys for the selected schema. Optional `start`/`end` parameters (nanoseconds) fence the search window; unspecified values default to the last 5 minutes, matching Grafana's Explore defaults. |
127127
| `GET /loki/api/v1/label/{label}/values` | Lists distinct values for a specific label key using the same optional `start`/`end` bounds as `/labels`. Works for both `loki` and `flat` schemas and automatically filters out empty strings. |
128+
| `GET /loki/api/v1/index/stats` | Returns approximate `streams`, `chunks`, `entries`, and `bytes` counters for a selector over a `[start, end]` window. `chunks` are estimated via unique stream keys because Databend does not store Loki chunks. |
128129
| `GET /loki/api/v1/tail` | WebSocket tail endpoint that streams live logs for a LogQL query; compatible with Grafana Explore and `logcli --tail`. |
129130

130131
`/query` and `/query_range` share the same LogQL parser and SQL builder. Instant queries fall back to `DEFAULT_LOOKBACK_NS` (5 minutes) when no explicit window is supplied, while range queries honor the caller's `start`/`end` bounds. `/labels` and `/label/{label}/values` delegate to schema-aware metadata lookups: the loki schema uses `map_keys`/`labels['key']` expressions, whereas the flat schema issues `SELECT DISTINCT` on the physical column and returns values in sorted order.

src/app/handlers.rs

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,15 @@ use axum::{
2525
routing::get,
2626
};
2727
use chrono::Utc;
28-
use serde::Deserialize;
28+
use databend_driver::{NumberValue, Row, Value};
29+
use serde::{Deserialize, Serialize};
2930
use std::{collections::HashSet, time::Instant};
3031
use tokio::time::{Duration, sleep};
3132

3233
use crate::{
3334
databend::{
3435
LabelQueryBounds, MetricQueryBounds, MetricRangeQueryBounds, QueryBounds, SqlOrder,
35-
execute_query,
36+
StatsQueryBounds, execute_query,
3637
},
3738
error::AppError,
3839
logql::{DurationValue, LogqlExpr},
@@ -57,6 +58,7 @@ pub fn router(state: AppState) -> Router {
5758
.route("/loki/api/v1/label/{label}/values", get(label_values))
5859
.route("/loki/api/v1/query", get(instant_query))
5960
.route("/loki/api/v1/query_range", get(range_query))
61+
.route("/loki/api/v1/index/stats", get(index_stats))
6062
.route("/loki/api/v1/tail", get(tail_logs))
6163
.with_state(state)
6264
.layer(middleware::from_fn(log_requests))
@@ -92,6 +94,21 @@ struct TailQueryParams {
9294
delay_for: Option<u64>,
9395
}
9496

97+
#[derive(Debug, Deserialize)]
98+
struct StatsQueryParams {
99+
query: Option<String>,
100+
start: Option<i64>,
101+
end: Option<i64>,
102+
}
103+
104+
#[derive(Debug, Serialize)]
105+
struct IndexStatsResponse {
106+
streams: u64,
107+
chunks: u64,
108+
entries: u64,
109+
bytes: u64,
110+
}
111+
95112
async fn instant_query(
96113
State(state): State<AppState>,
97114
Query(params): Query<InstantQueryParams>,
@@ -229,6 +246,50 @@ async fn range_query(
229246
Ok(Json(LokiResponse::success(streams)))
230247
}
231248

249+
async fn index_stats(
250+
State(state): State<AppState>,
251+
Query(params): Query<StatsQueryParams>,
252+
) -> Result<Json<IndexStatsResponse>, AppError> {
253+
let query = params
254+
.query
255+
.as_deref()
256+
.filter(|value| !value.is_empty())
257+
.ok_or_else(|| AppError::BadRequest("query is required".into()))?;
258+
let start = params
259+
.start
260+
.ok_or_else(|| AppError::BadRequest("start is required".into()))?;
261+
let end = params
262+
.end
263+
.ok_or_else(|| AppError::BadRequest("end is required".into()))?;
264+
if start >= end {
265+
return Err(AppError::BadRequest(
266+
"start must be smaller than end".into(),
267+
));
268+
}
269+
let expr = state.parse(query)?;
270+
let plan = state.schema().build_index_stats_query(
271+
state.table(),
272+
&expr,
273+
&StatsQueryBounds {
274+
start_ns: start,
275+
end_ns: end,
276+
},
277+
)?;
278+
log::debug!(
279+
"index stats SQL (start_ns={}, end_ns={}): {}",
280+
start,
281+
end,
282+
plan.sql
283+
);
284+
let rows = execute_query(state.client(), &plan.sql).await?;
285+
let row = rows
286+
.into_iter()
287+
.next()
288+
.ok_or_else(|| AppError::Internal("index stats query returned no rows".into()))?;
289+
let stats = parse_index_stats_row(row)?;
290+
Ok(Json(stats))
291+
}
292+
232293
async fn tail_logs(
233294
State(state): State<AppState>,
234295
Query(params): Query<TailQueryParams>,
@@ -361,6 +422,52 @@ fn parse_vector_term(segment: &str) -> Option<f64> {
361422
Some(value)
362423
}
363424

425+
fn parse_index_stats_row(row: Row) -> Result<IndexStatsResponse, AppError> {
426+
if row.len() < 4 {
427+
return Err(AppError::Internal(
428+
"index stats query must return streams, chunks, entries, bytes".into(),
429+
));
430+
}
431+
let values = row.values();
432+
let streams = value_to_u64(&values[0], "streams")?;
433+
let chunks = value_to_u64(&values[1], "chunks")?;
434+
let entries = value_to_u64(&values[2], "entries")?;
435+
let bytes = value_to_u64(&values[3], "bytes")?;
436+
Ok(IndexStatsResponse {
437+
streams,
438+
chunks,
439+
entries,
440+
bytes,
441+
})
442+
}
443+
444+
fn value_to_u64(value: &Value, context: &str) -> Result<u64, AppError> {
445+
match value {
446+
Value::Null => Ok(0),
447+
Value::Number(number) => match number {
448+
NumberValue::UInt8(v) => Ok(*v as u64),
449+
NumberValue::UInt16(v) => Ok(*v as u64),
450+
NumberValue::UInt32(v) => Ok(*v as u64),
451+
NumberValue::UInt64(v) => Ok(*v),
452+
NumberValue::Int8(v) if *v >= 0 => Ok(*v as u64),
453+
NumberValue::Int16(v) if *v >= 0 => Ok(*v as u64),
454+
NumberValue::Int32(v) if *v >= 0 => Ok(*v as u64),
455+
NumberValue::Int64(v) if *v >= 0 => Ok(*v as u64),
456+
NumberValue::Float32(v) if *v >= 0.0 => Ok(v.trunc() as u64),
457+
NumberValue::Float64(v) if *v >= 0.0 => Ok(v.trunc() as u64),
458+
other => Err(AppError::Internal(format!(
459+
"unexpected {context} numeric value: {other:?}"
460+
))),
461+
},
462+
Value::String(text) => text
463+
.parse::<u64>()
464+
.map_err(|err| AppError::Internal(format!("failed to parse {context} as u64: {err}"))),
465+
other => Err(AppError::Internal(format!(
466+
"unexpected {context} value type: {other:?}"
467+
))),
468+
}
469+
}
470+
364471
async fn log_requests(req: Request<Body>, next: Next) -> Response {
365472
let method = req.method().clone();
366473
let uri = req.uri().clone();

src/databend/core.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,18 @@ impl SchemaAdapter {
164164
}
165165
}
166166
}
167+
168+
pub fn build_index_stats_query(
169+
&self,
170+
table: &TableRef,
171+
expr: &LogqlExpr,
172+
bounds: &StatsQueryBounds,
173+
) -> Result<StatsQueryPlan, AppError> {
174+
match self {
175+
SchemaAdapter::Loki(schema) => schema.build_index_stats_query(table, expr, bounds),
176+
SchemaAdapter::Flat(schema) => schema.build_index_stats_query(table, expr, bounds),
177+
}
178+
}
167179
}
168180

169181
pub struct QueryBounds {
@@ -185,6 +197,11 @@ pub struct MetricRangeQueryBounds {
185197
pub window_ns: i64,
186198
}
187199

200+
pub struct StatsQueryBounds {
201+
pub start_ns: i64,
202+
pub end_ns: i64,
203+
}
204+
188205
#[derive(Clone, Copy, Default)]
189206
pub struct LabelQueryBounds {
190207
pub start_ns: Option<i64>,
@@ -238,6 +255,10 @@ pub struct MetricRangeQueryPlan {
238255
pub labels: MetricLabelsPlan,
239256
}
240257

258+
pub struct StatsQueryPlan {
259+
pub sql: String,
260+
}
261+
241262
#[derive(Clone)]
242263
pub enum MetricLabelsPlan {
243264
LokiFull,

src/databend/flat.rs

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@ use crate::{
2727

2828
use super::core::{
2929
LabelQueryBounds, LogEntry, MetricLabelsPlan, MetricQueryBounds, MetricQueryPlan,
30-
MetricRangeQueryBounds, MetricRangeQueryPlan, QueryBounds, SchemaConfig, TableColumn, TableRef,
31-
aggregate_value_select, ensure_line_column, ensure_timestamp_column, escape, execute_query,
32-
format_float_literal, is_line_candidate, is_numeric_type, line_filter_clause,
33-
matches_named_column, metric_bucket_cte, missing_required_column, quote_ident,
34-
range_bucket_value_expression, timestamp_literal, timestamp_offset_expr, value_to_timestamp,
30+
MetricRangeQueryBounds, MetricRangeQueryPlan, QueryBounds, SchemaConfig, StatsQueryBounds,
31+
StatsQueryPlan, TableColumn, TableRef, aggregate_value_select, ensure_line_column,
32+
ensure_timestamp_column, escape, execute_query, format_float_literal, is_line_candidate,
33+
is_numeric_type, line_filter_clause, matches_named_column, metric_bucket_cte,
34+
missing_required_column, quote_ident, range_bucket_value_expression, timestamp_literal,
35+
timestamp_offset_expr, value_to_timestamp,
3536
};
3637

3738
#[derive(Clone)]
@@ -233,6 +234,56 @@ impl FlatSchema {
233234
}
234235
}
235236

237+
pub(crate) fn build_index_stats_query(
238+
&self,
239+
table: &TableRef,
240+
expr: &LogqlExpr,
241+
bounds: &StatsQueryBounds,
242+
) -> Result<StatsQueryPlan, AppError> {
243+
let ts_col = quote_ident(&self.timestamp_col);
244+
let mut clauses = vec![
245+
format!("{ts_col} >= {}", timestamp_literal(bounds.start_ns)?),
246+
format!("{ts_col} <= {}", timestamp_literal(bounds.end_ns)?),
247+
];
248+
for matcher in &expr.selectors {
249+
clauses.push(label_clause_flat(matcher, &self.label_cols, None)?);
250+
}
251+
clauses.extend(
252+
expr.filters
253+
.iter()
254+
.map(|f| line_filter_clause(quote_ident(&self.line_col), f)),
255+
);
256+
let where_clause = clauses.join(" AND ");
257+
let stream_expr = self.stream_identity_expr(None);
258+
let line_col = quote_ident(&self.line_col);
259+
let sql = format!(
260+
"SELECT \
261+
COUNT(DISTINCT {stream}) AS streams, \
262+
COUNT(DISTINCT {stream}) AS chunks, \
263+
COUNT(*) AS entries, \
264+
COALESCE(SUM(length({line_col})), 0) AS bytes \
265+
FROM {table} \
266+
WHERE {where_clause}",
267+
stream = stream_expr,
268+
table = table.fq_name()
269+
);
270+
Ok(StatsQueryPlan { sql })
271+
}
272+
273+
fn stream_identity_expr(&self, alias: Option<&str>) -> String {
274+
if self.label_cols.is_empty() {
275+
return "''".to_string();
276+
}
277+
let qualifier = alias.map(|name| format!("{name}.")).unwrap_or_default();
278+
let mut args = Vec::with_capacity(self.label_cols.len() + 1);
279+
args.push("'|'".to_string());
280+
for column in &self.label_cols {
281+
let qualified = format!("{qualifier}{}", quote_ident(&column.name));
282+
args.push(format!("COALESCE(CAST({qualified} AS STRING), '')"));
283+
}
284+
format!("concat_ws({})", args.join(", "))
285+
}
286+
236287
fn metric_stream_sql(
237288
&self,
238289
table: &TableRef,

src/databend/loki.rs

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ use crate::{
2424

2525
use super::core::{
2626
LabelQueryBounds, LogEntry, MetricLabelsPlan, MetricQueryBounds, MetricQueryPlan,
27-
MetricRangeQueryBounds, MetricRangeQueryPlan, QueryBounds, SchemaConfig, TableColumn, TableRef,
28-
aggregate_value_select, ensure_labels_column, ensure_line_column, ensure_timestamp_column,
29-
escape, execute_query, format_float_literal, line_filter_clause, matches_line_column,
30-
matches_named_column, metric_bucket_cte, missing_required_column, parse_labels_value,
31-
quote_ident, range_bucket_value_expression, timestamp_literal, timestamp_offset_expr,
32-
value_to_timestamp,
27+
MetricRangeQueryBounds, MetricRangeQueryPlan, QueryBounds, SchemaConfig, StatsQueryBounds,
28+
StatsQueryPlan, TableColumn, TableRef, aggregate_value_select, ensure_labels_column,
29+
ensure_line_column, ensure_timestamp_column, escape, execute_query, format_float_literal,
30+
line_filter_clause, matches_line_column, matches_named_column, metric_bucket_cte,
31+
missing_required_column, parse_labels_value, quote_ident, range_bucket_value_expression,
32+
timestamp_literal, timestamp_offset_expr, value_to_timestamp,
3333
};
3434

3535
#[derive(Clone)]
@@ -228,6 +228,44 @@ impl LokiSchema {
228228
}
229229
}
230230

231+
pub(crate) fn build_index_stats_query(
232+
&self,
233+
table: &TableRef,
234+
expr: &LogqlExpr,
235+
bounds: &StatsQueryBounds,
236+
) -> Result<StatsQueryPlan, AppError> {
237+
let ts_col = quote_ident(&self.timestamp_col);
238+
let mut clauses = vec![
239+
format!("{ts_col} >= {}", timestamp_literal(bounds.start_ns)?),
240+
format!("{ts_col} <= {}", timestamp_literal(bounds.end_ns)?),
241+
];
242+
clauses.extend(
243+
expr.selectors
244+
.iter()
245+
.map(|m| label_clause_loki(m, quote_ident(&self.labels_col))),
246+
);
247+
clauses.extend(
248+
expr.filters
249+
.iter()
250+
.map(|f| line_filter_clause(quote_ident(&self.line_col), f)),
251+
);
252+
let where_clause = clauses.join(" AND ");
253+
let labels_col = quote_ident(&self.labels_col);
254+
let line_col = quote_ident(&self.line_col);
255+
let stream_hash = format!("city64withseed({labels_col}, 0)");
256+
let sql = format!(
257+
"SELECT \
258+
COUNT(DISTINCT {stream_hash}) AS streams, \
259+
COUNT(DISTINCT {stream_hash}) AS chunks, \
260+
COUNT(*) AS entries, \
261+
COALESCE(SUM(length({line_col})), 0) AS bytes \
262+
FROM {table} \
263+
WHERE {where_clause}",
264+
table = table.fq_name()
265+
);
266+
Ok(StatsQueryPlan { sql })
267+
}
268+
231269
fn metric_streams_sql(
232270
&self,
233271
table: &TableRef,

0 commit comments

Comments
 (0)