14 changes: 13 additions & 1 deletion README.md
@@ -122,12 +122,24 @@ All endpoints return Loki-compatible JSON responses and reuse the same error sha
| Endpoint | Description |
| --------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `GET /loki/api/v1/query` | Instant query. Supports the same LogQL used by Grafana's Explore panel. An optional `time` parameter (nanoseconds) defaults to "now", and the adapter automatically looks back 5 minutes when computing SQL bounds. |
| `GET /loki/api/v1/query_range` | Range query. Requires `start`/`end` nanoseconds and accepts `limit`/`step`. The `step` parameter is parsed but ignored because the adapter streams raw log lines rather than resampled metrics. |
| `GET /loki/api/v1/query_range` | Range query. Requires `start`/`end` nanoseconds and accepts `limit`/`step`. Log queries stream raw lines; metric queries return Loki matrix results and require `step` to match the range selector duration. |
| `GET /loki/api/v1/labels` | Lists known label keys for the selected schema. Optional `start`/`end` parameters (nanoseconds) fence the search window; unspecified values default to the last 5 minutes, matching Grafana's Explore defaults. |
| `GET /loki/api/v1/label/{label}/values` | Lists distinct values for a specific label key using the same optional `start`/`end` bounds as `/labels`. Works for both `loki` and `flat` schemas and automatically filters out empty strings. |

`/query` and `/query_range` share the same LogQL parser and SQL builder. Instant queries fall back to `DEFAULT_LOOKBACK_NS` (5 minutes) when no explicit window is supplied, while range queries honor the caller's `start`/`end` bounds. `/labels` and `/label/{label}/values` delegate to schema-aware metadata lookups: the loki schema uses `map_keys`/`labels['key']` expressions, whereas the flat schema issues `SELECT DISTINCT` on the physical column and returns values in sorted order.
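
For illustration, a minimal sketch of the two label-lookup shapes described above, written as Rust string constants — the table name `logs`, the `timestamp_ns` column, and the `{start_ns}`/`{end_ns}` placeholders are assumptions for this sketch, not the adapter's actual output:

```rust
// Hypothetical SQL for `GET /loki/api/v1/label/level/values`.

// `loki` schema: label values live in a VARIANT `labels` column, so the value
// is read with a map-style accessor and empty strings are filtered out.
const LOKI_VALUES_SQL: &str = r#"
SELECT DISTINCT labels['level'] AS value
FROM logs
WHERE timestamp_ns >= {start_ns}
  AND timestamp_ns <= {end_ns}
  AND labels['level'] != ''
ORDER BY value
"#;

// `flat` schema: the label is a physical column, so a plain DISTINCT on that
// column suffices; values come back in sorted order.
const FLAT_VALUES_SQL: &str = r#"
SELECT DISTINCT level AS value
FROM logs
WHERE timestamp_ns >= {start_ns}
  AND timestamp_ns <= {end_ns}
  AND level != ''
ORDER BY value
"#;
```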

### Metric queries

The adapter currently supports a narrow LogQL metric surface area:

- Range functions: `count_over_time` and `rate`. The latter reports per-second values (`COUNT / window_seconds`).
- Optional outer aggregations: `sum`, `avg`, `min`, `max`, and `count`, each with `by (...)` grouping; `without` and other modifiers return `errorType:bad_data`.
- Pipelines: only `drop` stages are honored (labels are removed after aggregation to match Loki semantics). Any other stage still results in `errorType:bad_data`.
- `/loki/api/v1/query_range` metric calls must provide `step`, and `step` must equal the range selector duration so Databend can materialize every bucket in a single SQL statement; the adapter never fans out multiple queries or aggregates in memory.
- `/loki/api/v1/query` metric calls reuse the same expressions but evaluate them over `[time - range, time]`.

Both schema adapters (loki VARIANT labels and flat wide tables) translate the metric expression into one SQL statement that joins generated buckets with the raw rows via `generate_series`, so all aggregation happens inside Databend. Non-metric queries continue to stream raw logs.
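
As a rough sketch, the range query for `count_over_time({app="api"}[5m])` with a matching `5m` step could take the following shape — the table and column names, the `generate_series` column alias, and the placeholder syntax are illustrative assumptions, not the adapter's generated SQL:

```rust
// Assumed bucket-join shape: generate_series materializes one row per step,
// each raw log row joins into the bucket whose (bucket_end - window, bucket_end]
// interval contains its timestamp, and COUNT runs inside Databend.
const METRIC_RANGE_SQL: &str = r#"
WITH buckets AS (
    SELECT bucket_end_ns
    FROM generate_series({start_ns}, {end_ns}, {step_ns}) AS t(bucket_end_ns)
)
SELECT b.bucket_end_ns,
       COUNT(l.line) AS value
FROM buckets AS b
LEFT JOIN logs AS l
    ON  l.timestamp_ns >  b.bucket_end_ns - {window_ns}
    AND l.timestamp_ns <= b.bucket_end_ns
    AND l.labels['app'] = 'api'
GROUP BY b.bucket_end_ns
ORDER BY b.bucket_end_ns
"#;
```

For `rate` the same COUNT would be divided by the window length in seconds, and a `by (...)` aggregation would add the grouped labels to both the SELECT list and the GROUP BY clause. Requiring `step` to equal the range selector duration is what makes this single-statement form possible: the buckets tile the range exactly, so no query fan-out or in-memory merging is needed.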

## Logging

By default the adapter configures `env_logger` with `databend_loki_adapter` at `info` level and every other module at `warn`. This keeps the startup flow visible without flooding the console with dependency logs. To override the levels, set `RUST_LOG` just like any other `env_logger` application, e.g.:
123 changes: 119 additions & 4 deletions src/app/handlers.rs
@@ -26,12 +26,16 @@ use serde::Deserialize;
use std::time::Instant;

use crate::{
databend::{LabelQueryBounds, QueryBounds, SqlOrder, execute_query},
databend::{
LabelQueryBounds, MetricQueryBounds, MetricRangeQueryBounds, QueryBounds, SqlOrder,
execute_query,
},
error::AppError,
logql::DurationValue,
};

use super::{
responses::{LabelsResponse, LokiResponse, rows_to_streams},
responses::{LabelsResponse, LokiResponse, metric_matrix, metric_vector, rows_to_streams},
state::{AppState, DEFAULT_LOOKBACK_NS},
};

@@ -72,9 +76,31 @@ async fn instant_query(
Query(params): Query<InstantQueryParams>,
) -> Result<Json<LokiResponse>, AppError> {
let target_ns = params.time.unwrap_or_else(current_time_ns);
log::debug!(
"instant query received: query=`{}` limit={:?} time_ns={}",
params.query,
params.limit,
target_ns
);
if let Some(response) = try_constant_vector(&params.query, target_ns) {
return Ok(Json(response));
}
if let Some(metric) = state.parse_metric(&params.query)? {
let duration_ns = metric.range.duration.as_nanoseconds();
let start_ns = target_ns.saturating_sub(duration_ns);
let plan = state.schema().build_metric_query(
state.table(),
&metric,
&MetricQueryBounds {
start_ns,
end_ns: target_ns,
},
)?;
log::debug!("instant metric SQL: {}", plan.sql);
let rows = execute_query(state.client(), &plan.sql).await?;
let samples = state.schema().parse_metric_rows(rows, &plan)?;
return Ok(Json(metric_vector(target_ns, samples)));
}
let expr = state.parse(&params.query)?;
let start_ns = target_ns.saturating_sub(DEFAULT_LOOKBACK_NS);
let limit = state.clamp_limit(params.limit);
@@ -90,6 +116,13 @@ async fn instant_query(
},
)?;

log::debug!(
"instant query SQL (start_ns={}, end_ns={}, limit={}): {}",
start_ns,
target_ns,
limit,
sql
);
let rows = execute_query(state.client(), &sql).await?;
let streams = rows_to_streams(state.schema(), rows, &expr.pipeline)?;
Ok(Json(LokiResponse::success(streams)))
@@ -99,8 +132,14 @@ async fn range_query(
State(state): State<AppState>,
Query(params): Query<RangeQueryParams>,
) -> Result<Json<LokiResponse>, AppError> {
let expr = state.parse(&params.query)?;
let _ = params.step;
log::debug!(
"range query received: query=`{}` limit={:?} start={:?} end={:?} step={:?}",
params.query,
params.limit,
params.start,
params.end,
params.step
);
let start = params
.start
.ok_or_else(|| AppError::BadRequest("start is required".into()))?;
@@ -114,6 +153,37 @@
));
}

if let Some(metric) = state.parse_metric(&params.query)? {
let step_raw = params
.step
.as_deref()
.ok_or_else(|| AppError::BadRequest("step is required for metric queries".into()))?;
let step_duration = parse_step_duration(step_raw)?;
let step_ns = step_duration.as_nanoseconds();
let window_ns = metric.range.duration.as_nanoseconds();
if step_ns != window_ns {
return Err(AppError::BadRequest(
"metric range queries require step to match the range selector duration".into(),
));
}
let plan = state.schema().build_metric_range_query(
state.table(),
&metric,
&MetricRangeQueryBounds {
start_ns: start,
end_ns: end,
step_ns,
window_ns,
},
)?;
log::debug!("range metric SQL: {}", plan.sql);
let rows = execute_query(state.client(), &plan.sql).await?;
let samples = state.schema().parse_metric_matrix_rows(rows, &plan)?;
return Ok(Json(metric_matrix(samples)));
}

let expr = state.parse(&params.query)?;

let limit = state.clamp_limit(params.limit);
let sql = state.schema().build_query(
state.table(),
@@ -126,11 +196,56 @@
},
)?;

log::debug!(
"range query SQL (start_ns={}, end_ns={}, limit={}): {}",
start,
end,
limit,
sql
);
let rows = execute_query(state.client(), &sql).await?;
let streams = rows_to_streams(state.schema(), rows, &expr.pipeline)?;
Ok(Json(LokiResponse::success(streams)))
}

fn parse_step_duration(step_raw: &str) -> Result<DurationValue, AppError> {
match DurationValue::parse_literal(step_raw) {
Ok(value) => Ok(value),
Err(literal_err) => match parse_numeric_step_seconds(step_raw) {
Ok(value) => Ok(value),
Err(numeric_err) => Err(AppError::BadRequest(format!(
"invalid step duration `{step_raw}`: {literal_err}; {numeric_err}"
))),
},
}
}

fn parse_numeric_step_seconds(step_raw: &str) -> Result<DurationValue, String> {
let trimmed = step_raw.trim();
if trimmed.is_empty() {
return Err("numeric seconds cannot be empty".into());
}
let seconds: f64 = trimmed
.parse()
.map_err(|err| format!("failed to parse numeric seconds: {err}"))?;
if !seconds.is_finite() {
return Err("numeric seconds must be finite".into());
}
if seconds <= 0.0 {
return Err("numeric seconds must be positive".into());
}
let nanos = seconds * 1_000_000_000.0;
if nanos <= 0.0 {
return Err("numeric seconds are too small".into());
}
if nanos > i64::MAX as f64 {
return Err("numeric seconds exceed supported range".into());
}
let nanos = nanos.round() as i64;
DurationValue::new(nanos)
.map_err(|err| format!("failed to convert numeric seconds to duration: {err}"))
}
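
// Illustrative note (not part of this diff): `parse_step_duration` accepts both
// Grafana-style duration literals and bare numeric seconds, so `step=5m`,
// `step=300`, and `step=300.5` all parse, while empty, non-finite, zero, or
// negative values are rejected with `AppError::BadRequest`.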

async fn label_names(
State(state): State<AppState>,
Query(params): Query<LabelsQueryParams>,
85 changes: 84 additions & 1 deletion src/app/responses.rs
@@ -17,7 +17,11 @@ use std::collections::BTreeMap;
use databend_driver::Row;
use serde::Serialize;

use crate::{databend::SchemaAdapter, error::AppError, logql::Pipeline};
use crate::{
databend::{MetricMatrixSample, MetricSample, SchemaAdapter},
error::AppError,
logql::Pipeline,
};

pub(crate) fn rows_to_streams(
schema: &SchemaAdapter,
@@ -43,6 +47,34 @@
Ok(result)
}

pub(crate) fn metric_vector(timestamp_ns: i64, samples: Vec<MetricSample>) -> LokiResponse {
let mut vectors = Vec::with_capacity(samples.len());
for sample in samples {
vectors.push(LokiVectorSample::new(
sample.labels,
timestamp_ns,
sample.value,
));
}
LokiResponse::vector(vectors)
}

pub(crate) fn metric_matrix(samples: Vec<MetricMatrixSample>) -> LokiResponse {
let mut buckets: BTreeMap<String, MatrixBucket> = BTreeMap::new();
for sample in samples {
let key = serde_json::to_string(&sample.labels).unwrap_or_else(|_| "{}".to_string());
let bucket = buckets
.entry(key)
.or_insert_with(|| MatrixBucket::new(sample.labels.clone()));
bucket.values.push((sample.timestamp_ns, sample.value));
}
let mut series = Vec::with_capacity(buckets.len());
for bucket in buckets.into_values() {
series.push(bucket.into_series());
}
LokiResponse::matrix(series)
}
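
// Illustrative note (not part of this diff): once serialized through
// `LokiResult::Matrix`, the response is assumed to follow Loki's matrix shape,
// roughly:
//   {"status":"success","data":{"resultType":"matrix","result":[
//     {"metric":{"app":"api"},"values":[[<ts>,"<value>"], ...]}]}}
// The exact timestamp/value encoding depends on `VectorValue`'s Serialize
// impl, which this hunk does not show.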

struct StreamBucket {
labels: BTreeMap<String, String>,
values: Vec<(i128, String)>,
@@ -70,6 +102,33 @@ impl StreamBucket {
}
}

struct MatrixBucket {
labels: BTreeMap<String, String>,
values: Vec<(i64, f64)>,
}

impl MatrixBucket {
fn new(labels: BTreeMap<String, String>) -> Self {
Self {
labels,
values: Vec::new(),
}
}

fn into_series(mut self) -> LokiMatrixSeries {
self.values.sort_by_key(|(ts, _)| *ts);
let values = self
.values
.into_iter()
.map(|(ts, value)| VectorValue::new(ts, value))
.collect();
LokiMatrixSeries {
metric: self.labels,
values,
}
}
}

#[derive(Serialize)]
pub(crate) struct LokiResponse {
status: &'static str,
@@ -102,6 +161,15 @@ impl LokiResponse {
},
}
}

pub(crate) fn matrix(series: Vec<LokiMatrixSeries>) -> Self {
Self {
status: "success",
data: LokiData {
result: LokiResult::Matrix { result: series },
},
}
}
}

#[derive(Serialize)]
@@ -117,6 +185,8 @@ enum LokiResult {
Streams { result: Vec<LokiStream> },
#[serde(rename = "vector")]
Vector { result: Vec<LokiVectorSample> },
#[serde(rename = "matrix")]
Matrix { result: Vec<LokiMatrixSeries> },
}

#[derive(Serialize)]
@@ -138,6 +208,19 @@ impl LokiVectorSample {
value: VectorValue::new(timestamp_ns, value),
}
}

pub(crate) fn new(metric: BTreeMap<String, String>, timestamp_ns: i64, value: f64) -> Self {
Self {
metric,
value: VectorValue::new(timestamp_ns, value),
}
}
}

#[derive(Serialize)]
pub(crate) struct LokiMatrixSeries {
metric: BTreeMap<String, String>,
values: Vec<VectorValue>,
}

struct VectorValue {
6 changes: 5 additions & 1 deletion src/app/state.rs
@@ -21,7 +21,7 @@ use crate::{
resolve_table_ref,
},
error::AppError,
logql::{LogqlExpr, LogqlParser},
logql::{LogqlExpr, LogqlParser, MetricExpr},
};

pub(crate) const DEFAULT_LIMIT: u64 = 500;
@@ -84,6 +84,10 @@ impl AppState {
self.parser.parse(query).map_err(AppError::from)
}

pub fn parse_metric(&self, query: &str) -> Result<Option<MetricExpr>, AppError> {
self.parser.parse_metric(query).map_err(AppError::from)
}
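
// Note: callers treat `Ok(None)` as "not a metric expression" and fall back to
// the log-query path via `parse`, as `instant_query` and `range_query` do in
// src/app/handlers.rs.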

pub fn clamp_limit(&self, requested: Option<u64>) -> u64 {
requested
.and_then(|value| (value > 0).then_some(value))