diff --git a/Cargo.lock b/Cargo.lock index 8298f09..d18b4b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4895,6 +4895,7 @@ dependencies = [ "clap", "flate2", "futures", + "lazy_static", "ouroboros", "prometheus-client 0.24.0", "serde", @@ -4911,6 +4912,7 @@ dependencies = [ "sqd-storage", "tikv-jemallocator", "tokio", + "tower-http", "tracing", "tracing-subscriber", "url", @@ -5467,6 +5469,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" +dependencies = [ + "bitflags 2.8.0", + "bytes", + "http 1.2.0", + "http-body 1.0.1", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", + "uuid", +] + [[package]] name = "tower-layer" version = "0.3.3" diff --git a/crates/hotblocks/Cargo.toml b/crates/hotblocks/Cargo.toml index a311847..41fad8e 100644 --- a/crates/hotblocks/Cargo.toml +++ b/crates/hotblocks/Cargo.toml @@ -11,6 +11,7 @@ bytes = { workspace = true } chrono = { workspace = true, features = ["std"] } clap = { workspace = true, features = ["derive"] } flate2 = { workspace = true } +lazy_static = "1.4.0" futures = { workspace = true } ouroboros = { workspace = true } prometheus-client = { workspace = true } @@ -28,6 +29,7 @@ sqd-query = { path = "../query", features = ["storage"] } sqd-storage = { path = "../storage" } tikv-jemallocator = "0.6.0" tokio = { workspace = true, features = ["full"] } +tower-http = { version = "0.6.1", features = ["request-id", "trace"] } tracing = { workspace = true, features = ["valuable"] } tracing-subscriber = { workspace = true, features = ["env-filter", "json", "valuable"] } -url = { workspace = true, features = ["serde"] } \ No newline at end of file +url = { workspace = true, features = ["serde"] } diff --git a/crates/hotblocks/src/api.rs b/crates/hotblocks/src/api.rs index 754138d..b9ab86f 100644 --- a/crates/hotblocks/src/api.rs +++ b/crates/hotblocks/src/api.rs @@ -1,12 +1,17 @@ use crate::cli::App; -use crate::errors::{BlockItemIsNotAvailable, BlockRangeMissing, Busy, QueryIsAboveTheHead, QueryKindMismatch, UnknownDataset}; +use crate::dataset_controller::DatasetController; +use crate::errors::{ + BlockItemIsNotAvailable, BlockRangeMissing, Busy, QueryIsAboveTheHead, QueryKindMismatch, + UnknownDataset, +}; use crate::query::QueryResponse; use crate::types::RetentionStrategy; use anyhow::bail; use async_stream::try_stream; use axum::body::{Body, Bytes}; -use axum::extract::Path; +use axum::extract::{Path, Request}; use axum::http::StatusCode; +use axum::http::Uri; use axum::response::{IntoResponse, Response}; use axum::routing::{get, post}; use axum::{BoxError, Extension, Json, Router}; @@ -16,8 +21,9 @@ use sqd_primitives::BlockRef; use sqd_query::{Query, UnexpectedBaseBlock}; use sqd_storage::db::DatasetId; use std::sync::Arc; -use tracing::error; - +use std::time::Instant; +use tower_http::request_id::{MakeRequestUuid, RequestId, SetRequestIdLayer}; +use tracing::{Instrument, error}; macro_rules! json_ok { ($json:expr) => { @@ -25,75 +31,170 @@ macro_rules! json_ok { }; } - macro_rules! text { ($status:expr, $($arg:tt)+) => { ($status, format!($($arg)*)).into_response() }; } - macro_rules! 
get_dataset { ($app:expr, $dataset_id:expr) => { match $app.data_service.get_dataset($dataset_id) { Ok(ds) => ds, - Err(err) => return text!(StatusCode::NOT_FOUND, "{}", err) + Err(err) => return text!(StatusCode::NOT_FOUND, "{}", err), } }; } - type AppRef = Arc; - pub fn build_api(app: App) -> Router { Router::new() - .route("/", get(|| async { "Welcome to SQD hot block data service!" })) + .route( + "/", + get(|| async { "Welcome to SQD hot block data service!" }), + ) .route("/datasets/{id}/stream", post(stream)) .route("/datasets/{id}/finalized-stream", post(finalized_stream)) .route("/datasets/{id}/head", get(get_head)) .route("/datasets/{id}/finalized-head", get(get_finalized_head)) - .route("/datasets/{id}/retention", get(get_retention).post(set_retention)) + .route( + "/datasets/{id}/retention", + get(get_retention).post(set_retention), + ) .route("/datasets/{id}/status", get(get_status)) .route("/datasets/{id}/metadata", get(get_metadata)) .route("/metrics", get(get_metrics)) .route("/rocksdb/stats", get(get_rocks_stats)) .route("/rocksdb/prop/{cf}/{name}", get(get_rocks_prop)) + .fallback(handle_404) + .layer(axum::middleware::from_fn(middleware)) + .layer(SetRequestIdLayer::x_request_id(MakeRequestUuid::default())) .layer(Extension(Arc::new(app))) } +pub async fn middleware(req: Request, next: axum::middleware::Next) -> impl IntoResponse { + let method = req.method().to_string(); + let path = req.uri().path().to_string(); + let version = req.version(); + let start = Instant::now(); + let request_id = req + .extensions() + .get::() + .expect("RequestId should be set by SetRequestIdLayer") + .header_value() + .to_str() + .expect("Request ID should be a valid string"); + + let span = tracing::span!(tracing::Level::INFO, "http_request", request_id); + let mut response = next.run(req).instrument(span.clone()).await; + let latency = start.elapsed(); + + let mut labels = response + .extensions_mut() + .remove::() + .map(|labels| labels.0) + .unwrap_or(Vec::new()); + labels.push(("status".to_string(), response.status().as_str().to_owned())); + + span.in_scope(|| { + tracing::info!( + target: "http_request", + method, + path, + ?version, + status = %response.status(), + ?latency, + "HTTP request processed" + ); + }); + + crate::metrics::report_http_response(&labels, latency); + + response +} + +#[derive(Clone)] +pub struct Labels(Vec<(String, String)>); + +pub struct ResponseWithMetadata { + pub labels: Labels, + pub response: Option, +} + +impl ResponseWithMetadata { + fn new() -> Self { + Self { + labels: Labels(vec![]), + response: None, + } + } + + pub fn with_dataset_id(mut self, id: DatasetId) -> Self { + self.labels + .0 + .push(("dataset_name".to_string(), id.as_str().to_owned())); + self + } + + pub fn with_endpoint(mut self, endpoint: &str) -> Self { + self.labels + .0 + .push(("endpoint".to_string(), endpoint.to_string())); + self + } + + pub fn with_response(mut self, clause: F) -> Self + where + F: FnOnce() -> Response, + { + self.response = Some(clause()); + self + } +} + +impl IntoResponse for ResponseWithMetadata { + fn into_response(self) -> Response { + let mut response = self.response.expect("response is mandatory method"); + response.extensions_mut().insert(self.labels); + response + } +} async fn stream( Extension(app): Extension, Path(dataset_id): Path, - Json(query): Json -) -> Response -{ - stream_internal(app, dataset_id, query, false).await + Json(query): Json, +) -> impl IntoResponse { + let response = stream_internal(app, dataset_id, query, false).await; + 
ResponseWithMetadata::new() + .with_dataset_id(dataset_id) + .with_endpoint("/stream") + .with_response(|| response) } - async fn finalized_stream( Extension(app): Extension, Path(dataset_id): Path, - Json(query): Json -) -> Response -{ - stream_internal(app, dataset_id, query, true).await + Json(query): Json, +) -> impl IntoResponse { + let response = stream_internal(app, dataset_id, query, true).await; + ResponseWithMetadata::new() + .with_dataset_id(dataset_id) + .with_endpoint("/finalized_stream") + .with_response(|| response) } - async fn stream_internal( app: AppRef, dataset_id: DatasetId, query: Query, - finalized: bool -) -> Response -{ + finalized: bool, +) -> Response { let dataset = get_dataset!(app, dataset_id); if let Err(err) = query.validate() { - return text!(StatusCode::BAD_REQUEST, "{}", err) + return text!(StatusCode::BAD_REQUEST, "{}", err); } let query_result = if finalized { @@ -114,7 +215,9 @@ async fn stream_internal( // For finalized stream, use the finalized head as the head res = res.header("x-sqd-head-number", finalized_head.number); } else { - let head_block = finalized_head.number.max(dataset.get_head_block_number().unwrap_or(0)); + let head_block = finalized_head + .number + .max(dataset.get_head_block_number().unwrap_or(0)); res = res.header("x-sqd-head-number", head_block); } res = res.header("x-sqd-finalized-head-number", finalized_head.number); @@ -123,18 +226,17 @@ async fn stream_internal( res = res.header("x-sqd-head-number", head_block); } - let body = Body::from_stream( - stream_query_response(stream) - ); + let body = Body::from_stream(stream_query_response(stream)); res.body(body).unwrap() - }, - Err(err) => error_to_response(err) + } + Err(err) => error_to_response(err), } } - -fn stream_query_response(mut stream: QueryResponse) -> impl TryStream { +fn stream_query_response( + mut stream: QueryResponse, +) -> impl TryStream { try_stream! { while let Some(pack_result) = stream.next_data_pack().await.transpose() { match pack_result { @@ -155,7 +257,6 @@ fn stream_query_response(mut stream: QueryResponse) -> impl TryStream Response { if let Some(above_the_head) = err.downcast_ref::() { let mut res = Response::builder().status(204); @@ -163,16 +264,17 @@ fn error_to_response(err: anyhow::Error) -> Response { res = res.header("x-sqd-finalized-head-number", head.number); res = res.header("x-sqd-finalized-head-hash", head.hash.as_str()); } - return res.body(Body::empty()).unwrap() + return res.body(Body::empty()).unwrap(); } if let Some(fork) = err.downcast_ref::() { return ( StatusCode::CONFLICT, Json(BaseBlockConflict { - previous_blocks: &fork.prev_blocks - }) - ).into_response() + previous_blocks: &fork.prev_blocks, + }), + ) + .into_response(); } let status_code = if err.is::() { @@ -198,75 +300,82 @@ fn error_to_response(err: anyhow::Error) -> Response { (status_code, message).into_response() } - #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct BaseBlockConflict<'a> { - previous_blocks: &'a [BlockRef] + previous_blocks: &'a [BlockRef], } - async fn get_finalized_head( Extension(app): Extension, - Path(dataset_id): Path -) -> Response -{ - json_ok! { - get_dataset!(app, dataset_id).get_finalized_head() - } + Path(dataset_id): Path, +) -> impl IntoResponse { + ResponseWithMetadata::new() + .with_dataset_id(dataset_id.clone()) + .with_endpoint("/finalized_head") + .with_response(|| { + json_ok! 
{ + get_dataset!(app, dataset_id).get_finalized_head() + } + }) } - async fn get_head( Extension(app): Extension, - Path(dataset_id): Path -) -> Response -{ - json_ok! { - get_dataset!(app, dataset_id).get_head() - } + Path(dataset_id): Path, +) -> impl IntoResponse { + ResponseWithMetadata::new() + .with_dataset_id(dataset_id.clone()) + .with_endpoint("/head") + .with_response(|| { + json_ok! { + get_dataset!(app, dataset_id).get_head() + } + }) } - async fn get_retention( Extension(app): Extension, - Path(dataset_id): Path -) -> Response -{ - json_ok! { - get_dataset!(app, dataset_id).get_retention() - } + Path(dataset_id): Path, +) -> impl IntoResponse { + ResponseWithMetadata::new() + .with_dataset_id(dataset_id.clone()) + .with_endpoint("/retention") + .with_response(|| { + json_ok! { + get_dataset!(app, dataset_id).get_retention() + } + }) } - async fn set_retention( Extension(app): Extension, Path(dataset_id): Path, - Json(strategy): Json -) -> Response -{ - let ds = get_dataset!(app, dataset_id); - if app.api_controlled_datasets.contains(&dataset_id) { - ds.retain(strategy); - text!(StatusCode::OK, "OK") - } else { - text!( - StatusCode::FORBIDDEN, - "dataset '{}' can't be managed via API", - dataset_id - ) - } + Json(strategy): Json, +) -> impl IntoResponse { + ResponseWithMetadata::new() + .with_dataset_id(dataset_id.clone()) + .with_endpoint("/retention") + .with_response(|| { + let ds = get_dataset!(app, dataset_id); + if app.api_controlled_datasets.contains(&dataset_id) { + ds.retain(strategy); + text!(StatusCode::OK, "OK") + } else { + text!( + StatusCode::FORBIDDEN, + "dataset '{}' can't be managed via API", + dataset_id + ) + } + }) } - async fn get_status( Extension(app): Extension, - Path(dataset_id): Path -) -> Response -{ - let ctl = get_dataset!(app, dataset_id); - - let read_status = || -> anyhow::Result<_> { + Path(dataset_id): Path, +) -> impl IntoResponse { + let read_status = |ctl: Arc| -> anyhow::Result<_> { let db = app.db.snapshot(); let Some(label) = db.get_label(dataset_id)? else { @@ -278,7 +387,7 @@ async fn get_status( "kind": label.kind(), "retentionStrategy": ctl.get_retention(), "data": null - }}) + }}); }; let Some(last_chunk) = db.get_last_chunk(dataset_id)? else { @@ -298,68 +407,85 @@ async fn get_status( }}) }; - match read_status() { - Ok(status) => json_ok!(status), - Err(err) => text!(StatusCode::INTERNAL_SERVER_ERROR, "{:?}", err) - } + ResponseWithMetadata::new() + .with_dataset_id(dataset_id.clone()) + .with_endpoint("/status") + .with_response(|| { + let ctl = get_dataset!(app, dataset_id); + match read_status(ctl) { + Ok(status) => json_ok!(status), + Err(err) => text!(StatusCode::INTERNAL_SERVER_ERROR, "{:?}", err), + } + }) } async fn get_metadata( Extension(app): Extension, - Path(dataset_id): Path -) -> Response -{ - get_dataset!(app, dataset_id); - - let db = app.db.snapshot(); - - let first_chunk = match db.get_first_chunk(dataset_id) { - Ok(chunk) => chunk, - Err(err) => return text!(StatusCode::INTERNAL_SERVER_ERROR, "{:?}", err) - }; - - json_ok!(serde_json::json! 
{{ - "dataset": dataset_id, - "aliases": [], - "real_time": true, - "start_block": first_chunk.map(|chunk| chunk.first_block()), - }}) + Path(dataset_id): Path, +) -> impl IntoResponse { + ResponseWithMetadata::new() + .with_dataset_id(dataset_id.clone()) + .with_endpoint("/metadata") + .with_response(|| { + get_dataset!(app, dataset_id); + + let db = app.db.snapshot(); + + let first_chunk = match db.get_first_chunk(dataset_id) { + Ok(chunk) => chunk, + Err(err) => return text!(StatusCode::INTERNAL_SERVER_ERROR, "{:?}", err), + }; + + json_ok!(serde_json::json! {{ + "dataset": dataset_id, + "aliases": [], + "real_time": true, + "start_block": first_chunk.map(|chunk| chunk.first_block()), + }}) + }) } - -async fn get_metrics( - Extension(app): Extension -) -> Response -{ +async fn get_metrics(Extension(app): Extension) -> impl IntoResponse { let mut metrics = String::new(); prometheus_client::encoding::text::encode(&mut metrics, &app.metrics_registry) .expect("String IO is infallible"); - metrics.into_response() + ResponseWithMetadata::new() + .with_endpoint("/metrics") + .with_response(|| metrics.into_response()) } - -async fn get_rocks_stats( - Extension(app): Extension -) -> Response -{ - if let Some(stats) = app.db.get_statistics() { - stats.into_response() - } else { - text!(StatusCode::INTERNAL_SERVER_ERROR, "rocksdb stats are not enabled") - } +async fn get_rocks_stats(Extension(app): Extension) -> impl IntoResponse { + ResponseWithMetadata::new() + .with_endpoint("/rocks_stats") + .with_response(|| { + if let Some(stats) = app.db.get_statistics() { + stats.into_response() + } else { + text!( + StatusCode::INTERNAL_SERVER_ERROR, + "rocksdb stats are not enabled" + ) + } + }) } - async fn get_rocks_prop( Extension(app): Extension, - Path((cf, name)): Path<(String, String)> -) -> Response -{ - match app.db.get_property(&cf, &name) { - Ok(Some(s)) => s.into_response(), - Ok(None) => text!(StatusCode::NOT_FOUND, "property not found"), - Err(err) => text!(StatusCode::INTERNAL_SERVER_ERROR, "{}", err) - } + Path((cf, name)): Path<(String, String)>, +) -> impl IntoResponse { + ResponseWithMetadata::new() + .with_endpoint("/rocks_prop") + .with_response(|| match app.db.get_property(&cf, &name) { + Ok(Some(s)) => s.into_response(), + Ok(None) => text!(StatusCode::NOT_FOUND, "property not found"), + Err(err) => text!(StatusCode::INTERNAL_SERVER_ERROR, "{}", err), + }) +} + +async fn handle_404(uri: Uri) -> impl IntoResponse { + ResponseWithMetadata::new() + .with_endpoint("404_fallback") + .with_response(|| text!(StatusCode::NOT_FOUND, "Not found: {}", uri.path())) } diff --git a/crates/hotblocks/src/cli.rs b/crates/hotblocks/src/cli.rs index e378a83..bb1e5d3 100644 --- a/crates/hotblocks/src/cli.rs +++ b/crates/hotblocks/src/cli.rs @@ -7,6 +7,7 @@ use clap::Parser; use sqd_storage::db::{DatabaseSettings, DatasetId}; use std::collections::BTreeSet; use std::sync::Arc; +use std::time::Duration; #[derive(Parser, Debug)] @@ -100,6 +101,7 @@ impl CLI { Arc::new(builder.build()) }; + query_service.spawn_metrics_reporter(Duration::from_secs(5)); Ok(App { db, @@ -109,4 +111,4 @@ impl CLI { metrics_registry }) } -} \ No newline at end of file +} diff --git a/crates/hotblocks/src/dataset_config.rs b/crates/hotblocks/src/dataset_config.rs index 95df6c8..204e73b 100644 --- a/crates/hotblocks/src/dataset_config.rs +++ b/crates/hotblocks/src/dataset_config.rs @@ -8,11 +8,14 @@ use url::Url; #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub enum RetentionConfig { + // Fixed, starting from 
the block number
     FromBlock {
         number: BlockNumber,
         parent_hash: Option<String>,
     },
+    // Moving window that keeps up to N blocks
     Head(u64),
+    // Retention is set dynamically from the portal API
     None,
 }
diff --git a/crates/hotblocks/src/metrics.rs b/crates/hotblocks/src/metrics.rs
index d9f7bd0..caf61b3 100644
--- a/crates/hotblocks/src/metrics.rs
+++ b/crates/hotblocks/src/metrics.rs
@@ -1,70 +1,102 @@
 use crate::types::DBRef;
 use anyhow::bail;
 use prometheus_client::collector::Collector;
-use prometheus_client::encoding::{DescriptorEncoder, EncodeLabelSet, EncodeLabelValue, LabelValueEncoder};
-use prometheus_client::metrics::counter::Counter;
-use prometheus_client::metrics::MetricType;
+use prometheus_client::encoding::{
+    DescriptorEncoder, EncodeLabelSet, EncodeLabelValue, LabelValueEncoder,
+};
+use prometheus_client::metrics::{
+    MetricType,
+    counter::Counter,
+    family::Family,
+    gauge::Gauge,
+    histogram::{Histogram, exponential_buckets},
+};
 use prometheus_client::registry::Registry;
 use sqd_storage::db::{DatasetId, ReadSnapshot};
 use std::fmt::Write;
-use std::sync::LazyLock;
+use std::time::Duration;
 use tracing::error;
-
 #[derive(Copy, Clone, Hash, Debug, Default, Ord, PartialOrd, Eq, PartialEq, EncodeLabelSet)]
 struct DatasetLabel {
-    dataset: DatasetValue
+    dataset: DatasetValue,
 }
-
 #[derive(Copy, Clone, Hash, Debug, Default, Ord, PartialOrd, Eq, PartialEq)]
 struct DatasetValue(DatasetId);
-
 impl EncodeLabelValue for DatasetValue {
     fn encode(&self, encoder: &mut LabelValueEncoder) -> Result<(), std::fmt::Error> {
         encoder.write_str(self.0.as_str())
     }
 }
-
 macro_rules! dataset_label {
     ($dataset_id:expr) => {
         DatasetLabel {
-            dataset: DatasetValue($dataset_id)
+            dataset: DatasetValue($dataset_id),
        }
    };
 }
+type Labels = Vec<(String, String)>;
-macro_rules! metric {
-    ($name:ident, $t:ty) => {
-        static $name: LazyLock<$t> = LazyLock::new(Default::default);
-    };
+// Bucket boundaries grow as 1, 1.5, 2.5, 5 within each decade, starting at `start`
+fn buckets(start: f64, count: usize) -> impl Iterator<Item = f64> {
+    std::iter::successors(Some(start), |x| Some(x * 10.))
+        .flat_map(|x| [x, x * 1.5, x * 2.5, x * 5.0])
+        .take(count)
 }
-
-metric!(QUERY_ERROR_TOO_MANY_TASKS, Counter);
-metric!(QUERY_ERROR_TOO_MANY_DATA_WAITERS, Counter);
-
+lazy_static::lazy_static! {
+    pub static ref HTTP_STATUS: Family<Labels, Counter> = Default::default();
+    pub static ref HTTP_TTFB: Family<Labels, Histogram> =
+        Family::new_with_constructor(|| Histogram::new(buckets(0.001, 20)));
+
+    pub static ref QUERY_ERROR_TOO_MANY_TASKS: Counter = Default::default();
+    pub static ref QUERY_ERROR_TOO_MANY_DATA_WAITERS: Counter = Default::default();
+
+    pub static ref ACTIVE_QUERIES: Gauge = Default::default();
+    pub static ref COMPLETED_QUERIES: Counter = Default::default();
+
+    pub static ref STREAM_DURATIONS: Family<Labels, Histogram> =
+        Family::new_with_constructor(|| Histogram::new(exponential_buckets(0.01, 2.0, 20)));
+    pub static ref STREAM_BYTES: Family<Labels, Histogram> =
+        Family::new_with_constructor(|| Histogram::new(exponential_buckets(1000., 2.0, 20)));
+    pub static ref STREAM_BLOCKS: Family<Labels, Histogram> =
+        Family::new_with_constructor(|| Histogram::new(exponential_buckets(1., 2.0, 30)));
+    pub static ref STREAM_CHUNKS: Family<Labels, Histogram> =
+        Family::new_with_constructor(|| Histogram::new(buckets(1., 20)));
+    pub static ref STREAM_BYTES_PER_SECOND: Histogram = Histogram::new(exponential_buckets(100., 3.0, 20));
+    pub static ref STREAM_BLOCKS_PER_SECOND: Family<Labels, Histogram> =
+        Family::new_with_constructor(|| Histogram::new(exponential_buckets(1., 3.0, 20)));
+
+    pub static ref QUERIED_BLOCKS: Family<Labels, Histogram> =
+        Family::new_with_constructor(|| Histogram::new(exponential_buckets(1., 2.0, 30)));
+    pub static ref QUERIED_CHUNKS: Family<Labels, Histogram> =
+        Family::new_with_constructor(|| Histogram::new(buckets(1., 20)));
+}
 pub fn report_query_too_many_tasks_error() {
     QUERY_ERROR_TOO_MANY_TASKS.inc();
 }
-
 pub fn report_query_too_many_data_waiters_error() {
     QUERY_ERROR_TOO_MANY_DATA_WAITERS.inc();
 }
+pub fn report_http_response(labels: &Vec<(String, String)>, to_first_byte: Duration) {
+    HTTP_STATUS.get_or_create(&labels).inc();
+    HTTP_TTFB
+        .get_or_create(&labels)
+        .observe(to_first_byte.as_secs_f64());
+}
 #[derive(Debug)]
 struct DatasetMetricsCollector {
     db: DBRef,
-    datasets: Vec<DatasetId>
+    datasets: Vec<DatasetId>,
 }
-
 impl Collector for DatasetMetricsCollector {
     fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> {
         let db = self.db.snapshot();
@@ -82,7 +114,7 @@ impl Collector for DatasetMetricsCollector {
                     dataset_id
                 );
                 Ok(())
-            }
+            };
         }
     }
@@ -90,92 +122,144 @@ impl Collector for DatasetMetricsCollector {
     }
 }
-
 fn collect_dataset_metrics(
     encoder: &mut DescriptorEncoder,
     db: &ReadSnapshot,
-    dataset_id: DatasetId
-) -> anyhow::Result<()>
-{
+    dataset_id: DatasetId,
+) -> anyhow::Result<()> {
     let Some(label) = db.get_label(dataset_id)? else {
-        return Ok(())
+        return Ok(());
     };
 
     let Some(first_chunk) = db.get_first_chunk(dataset_id)? else {
-        return Ok(())
+        return Ok(());
     };
 
     let Some(last_chunk) = db.get_last_chunk(dataset_id)?
else { bail!("first chunk exists, while last does not") }; - encoder.encode_descriptor( - "hotblocks_first_block", - "First block", - None, - MetricType::Gauge - )?.encode_family( - &dataset_label!(dataset_id) - )?.encode_gauge( - &first_chunk.first_block() - )?; - - encoder.encode_descriptor( - "hotblocks_last_block", - "Last block", - None, - MetricType::Gauge - )?.encode_family( - &dataset_label!(dataset_id) - )?.encode_gauge( - &last_chunk.last_block() - )?; - - encoder.encode_descriptor( - "hotblocks_last_block_timestamp_ms", - "Timestamp of the last block", - None, - MetricType::Gauge - )?.encode_family( - &dataset_label!(dataset_id) - )?.encode_gauge( - &last_chunk.last_block_time().unwrap_or(0) - )?; - - encoder.encode_descriptor( - "hotblocks_last_finalized_block", - "Last finalized block", - None, - MetricType::Gauge - )?.encode_family( - &dataset_label!(dataset_id) - )?.encode_gauge( - &label.finalized_head().map_or(0, |h| h.number) - )?; + encoder + .encode_descriptor( + "hotblocks_first_block", + "First block", + None, + MetricType::Gauge, + )? + .encode_family(&dataset_label!(dataset_id))? + .encode_gauge(&first_chunk.first_block())?; + + encoder + .encode_descriptor( + "hotblocks_last_block", + "Last block", + None, + MetricType::Gauge, + )? + .encode_family(&dataset_label!(dataset_id))? + .encode_gauge(&last_chunk.last_block())?; + + encoder + .encode_descriptor( + "hotblocks_last_block_timestamp_ms", + "Timestamp of the last block", + None, + MetricType::Gauge, + )? + .encode_family(&dataset_label!(dataset_id))? + .encode_gauge(&last_chunk.last_block_time().unwrap_or(0))?; + + encoder + .encode_descriptor( + "hotblocks_last_finalized_block", + "Last finalized block", + None, + MetricType::Gauge, + )? + .encode_family(&dataset_label!(dataset_id))? 
+ .encode_gauge(&label.finalized_head().map_or(0, |h| h.number))?; Ok(()) } - pub fn build_metrics_registry(db: DBRef, datasets: Vec) -> Registry { - let mut registry = Registry::default(); + let mut top_registry = Registry::default(); + let registry = top_registry.sub_registry_with_prefix("hotblocks"); registry.register( - "hotblocks_query_error_too_many_tasks", + "query_error_too_many_tasks", "Number of query tasks rejected due to task queue overflow", - QUERY_ERROR_TOO_MANY_TASKS.clone() + QUERY_ERROR_TOO_MANY_TASKS.clone(), ); registry.register( - "hotblocks_query_error_too_many_data_waiters", + "query_error_too_many_data_waiters", "Number of queries rejected, because data is not yet available and there are too many data waiters", QUERY_ERROR_TOO_MANY_DATA_WAITERS.clone() ); - registry.register_collector(Box::new(DatasetMetricsCollector { - db, - datasets - })); + registry.register( + "http_status", + "Number of sent HTTP responses", + HTTP_STATUS.clone(), + ); + registry.register( + "http_seconds_to_first_byte", + "Time to first byte of HTTP responses", + HTTP_TTFB.clone(), + ); - registry -} \ No newline at end of file + registry.register( + "stream_bytes", + "Number of bytes per stream", + STREAM_BYTES.clone(), + ); + registry.register( + "stream_blocks", + "Number of blocks per stream", + STREAM_BLOCKS.clone(), + ); + registry.register( + "stream_chunks", + "Number of chunks per stream", + STREAM_CHUNKS.clone(), + ); + registry.register( + "stream_bytes_per_second", + "Completed streams bandwidth", + STREAM_BYTES_PER_SECOND.clone(), + ); + registry.register( + "stream_blocks_per_second", + "Completed streams speed in blocks", + STREAM_BLOCKS_PER_SECOND.clone(), + ); + registry.register( + "stream_duration_seconds", + "Durations of completed streams", + STREAM_DURATIONS.clone(), + ); + registry.register( + "queried_blocks", + "Number of blocks per running query", + QUERIED_BLOCKS.clone(), + ); + registry.register( + "queried_chunks", + "Number of chunks per running query", + QUERIED_CHUNKS.clone(), + ); + registry.register( + "active_queries", + "Number of active queries", + ACTIVE_QUERIES.clone(), + ); + registry.register( + "completed_queries", + "Number of completed queries", + COMPLETED_QUERIES.clone(), + ); + top_registry.register_collector(Box::new(DatasetMetricsCollector { db, datasets })); + + top_registry +} diff --git a/crates/hotblocks/src/query/executor.rs b/crates/hotblocks/src/query/executor.rs index f3f97d5..78d27e2 100644 --- a/crates/hotblocks/src/query/executor.rs +++ b/crates/hotblocks/src/query/executor.rs @@ -1,66 +1,81 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; +use crate::metrics::{ACTIVE_QUERIES, COMPLETED_QUERIES, report_query_too_many_tasks_error}; use std::sync::Arc; - +use std::sync::atomic::{AtomicUsize, Ordering}; +use tokio::time::{self, Duration}; #[derive(Clone)] pub struct QueryExecutor { + // number of concurrent queries in_flight: Arc, + // limit for concurrent queries max_pending_tasks: usize, - urgency: usize + urgency: usize, } - impl QueryExecutor { pub fn new(max_pending_tasks: usize, urgency: usize) -> Self { Self { in_flight: Arc::new(AtomicUsize::new(0)), max_pending_tasks, - urgency + urgency, } } pub fn get_slot(&self) -> Option { - if self.in_flight.fetch_add(1, Ordering::SeqCst) < self.max_pending_tasks { + let active_queries = self.in_flight.fetch_add(1, Ordering::SeqCst); + if active_queries < self.max_pending_tasks { Some(QuerySlot { in_flight: self.in_flight.clone(), - urgency: self.urgency + urgency: self.urgency, }) } 
else { self.in_flight.fetch_sub(1, Ordering::SeqCst); - crate::metrics::report_query_too_many_tasks_error(); + report_query_too_many_tasks_error(); None } } -} + pub fn spawn_metrics_reporter(&self, interval: Duration) { + let active_count = self.in_flight.clone(); + + tokio::spawn(async move { + let mut ticker = time::interval(interval); + + loop { + ticker.tick().await; + let active_queries = active_count.load(Ordering::SeqCst); + ACTIVE_QUERIES.set(active_queries as i64); + } + }); + } +} pub struct QuerySlot { in_flight: Arc, - urgency: usize + urgency: usize, } - impl Drop for QuerySlot { fn drop(&mut self) { self.in_flight.fetch_sub(1, Ordering::SeqCst); + COMPLETED_QUERIES.inc(); } } - impl QuerySlot { pub fn time_limit(&self) -> usize { let in_flight = self.in_flight.load(Ordering::SeqCst); if in_flight == 0 { - return 100 + return 100; } let time = self.urgency * sqd_polars::POOL.current_num_threads() / in_flight; time.min(100) } - + pub async fn run(self, task: F) -> R where F: FnOnce(&Self) -> R + Send + 'static, - R: Send + 'static + R: Send + 'static, { let (tx, rx) = tokio::sync::oneshot::channel(); @@ -69,7 +84,7 @@ impl QuerySlot { let result = task(&slot); let _ = tx.send(result); }); - + rx.await.expect("task panicked") } -} \ No newline at end of file +} diff --git a/crates/hotblocks/src/query/response.rs b/crates/hotblocks/src/query/response.rs index a0568bb..c5d0943 100644 --- a/crates/hotblocks/src/query/response.rs +++ b/crates/hotblocks/src/query/response.rs @@ -1,22 +1,76 @@ use super::executor::{QueryExecutor, QuerySlot}; -use super::running::RunningQuery; +use super::running::{RunningQuery, RunningQueryStats}; use crate::errors::Busy; +use crate::metrics::{ + STREAM_BLOCKS, STREAM_BLOCKS_PER_SECOND, STREAM_BYTES, STREAM_BYTES_PER_SECOND, STREAM_CHUNKS, + STREAM_DURATIONS, +}; use crate::types::DBRef; use anyhow::bail; use bytes::Bytes; use sqd_primitives::BlockRef; use sqd_query::Query; use sqd_storage::db::DatasetId; +use std::time::Duration; use std::time::Instant; +const DEFAULT_QUERY_LIMIT: Duration = Duration::from_secs(10); pub struct QueryResponse { executor: QueryExecutor, runner: Option>, - start: Instant, - finalized_head: Option + finalized_head: Option, + dataset_id: DatasetId, + stats: QueryStreamStats, + time_limit: Duration, } +pub struct QueryStreamStats { + response_chunks: u64, + response_blocks: u64, + response_bytes: u64, + start_time: Instant, +} + +impl QueryStreamStats { + pub fn new() -> Self { + Self { + response_chunks: 0, + response_blocks: 0, + response_bytes: 0, + start_time: Instant::now(), + } + } + + pub fn add_running_stats(&mut self, running_stats: &RunningQueryStats) { + self.response_chunks = self + .response_chunks + .saturating_add(running_stats.chunks_read); + self.response_blocks = self + .response_blocks + .saturating_add(running_stats.blocks_read); + } + + fn report_metrics(&self, dataset_id: &DatasetId) { + let labels = vec![("dataset_id".to_owned(), dataset_id.as_str().to_owned())]; + + let duration = self.start_time.elapsed().as_secs_f64(); + let bytes = self.response_bytes as f64; + let blocks = self.response_blocks as f64; + let chunks = self.response_chunks as f64; + + STREAM_DURATIONS.get_or_create(&labels).observe(duration); + STREAM_BYTES.get_or_create(&labels).observe(bytes); + STREAM_BLOCKS.get_or_create(&labels).observe(blocks); + STREAM_CHUNKS.get_or_create(&labels).observe(chunks); + if duration > 0.0 { + STREAM_BYTES_PER_SECOND.observe(bytes / duration); + STREAM_BLOCKS_PER_SECOND + .get_or_create(&labels) 
+ .observe(blocks / duration); + } + } +} impl QueryResponse { pub(super) async fn new( @@ -25,25 +79,30 @@ impl QueryResponse { dataset_id: DatasetId, query: Query, only_finalized: bool, - ) -> anyhow::Result - { + time_limit: Option, + ) -> anyhow::Result { let Some(slot) = executor.get_slot() else { bail!(Busy) }; - let start = Instant::now(); - - let mut runner = slot.run(move |slot| -> anyhow::Result<_> { - let mut runner = RunningQuery::new(db, dataset_id, &query, only_finalized).map(Box::new)?; - next_run(&mut runner, slot)?; - Ok(runner) - }).await?; - + let stats = QueryStreamStats::new(); + let mut runner = slot + .run(move |slot| -> anyhow::Result<_> { + let mut runner = + RunningQuery::new(db, dataset_id, &query, only_finalized).map(Box::new)?; + next_run(&mut runner, slot)?; + Ok(runner) + }) + .await?; + + let time_limit = time_limit.unwrap_or(DEFAULT_QUERY_LIMIT); let response = Self { executor, finalized_head: runner.take_finalized_head(), runner: Some(runner), - start + stats, + dataset_id, + time_limit, }; Ok(response) @@ -55,17 +114,28 @@ impl QueryResponse { pub async fn next_data_pack(&mut self) -> anyhow::Result> { let Some(mut runner) = self.runner.take() else { - return Ok(None) + return Ok(None); }; - if !runner.has_next_chunk() || self.start.elapsed().as_secs() > 10 { - return Ok(Some(runner.finish())) + if !runner.has_next_chunk() { + return Ok(self.finish_with_runner(runner)); + } + + if self.stats.start_time.elapsed() > self.time_limit { + // Client is expected to retry the query based on the data that they have received + tracing::warn!( + "terminate query that has been running for more than {} seconds", + self.time_limit.as_secs() + ); + return Ok(self.finish_with_runner(runner)); } if runner.buffered_bytes() > 0 { let bytes = runner.take_buffered_bytes(); + self.stats.response_bytes = + self.stats.response_bytes.saturating_add(bytes.len() as u64); self.runner = Some(runner); - return Ok(Some(bytes)) + return Ok(Some(bytes)); } let Some(slot) = self.executor.get_slot() else { @@ -73,31 +143,51 @@ impl QueryResponse { bail!(Busy); }; - let (mut runner, result) = slot.run(move |slot| { - let mut runner = runner; - let result = next_run(&mut runner, slot); - (runner, result) - }).await; + let (mut runner, result) = slot + .run(move |slot| { + let mut runner = runner; + let result = next_run(&mut runner, slot); + (runner, result) + }) + .await; if let Err(err) = result { self.runner = Some(runner); - return Err(err) + return Err(err); } - if !runner.has_next_chunk() || self.start.elapsed().as_secs() > 10 { - Ok(Some(runner.finish())) - } else { + if runner.has_next_chunk() { let bytes = runner.take_buffered_bytes(); + self.stats.response_bytes = + self.stats.response_bytes.saturating_add(bytes.len() as u64); self.runner = Some(runner); Ok(Some(bytes)) + } else { + return Ok(self.finish_with_runner(runner)); } } + fn finish_with_runner(&mut self, runner: Box) -> Option { + runner.stats().report_metrics(&self.dataset_id); + self.stats.add_running_stats(runner.stats()); + let bytes = runner.finish(); + self.stats.response_bytes = self.stats.response_bytes.saturating_add(bytes.len() as u64); + Some(bytes) + } + pub fn finish(&mut self) -> Bytes { - self.runner.take().map(|runner| runner.finish()).unwrap_or_default() + self.runner + .take() + .map(|runner| self.finish_with_runner(runner).unwrap()) + .unwrap_or_default() } } +impl Drop for QueryResponse { + fn drop(&mut self) { + self.stats.report_metrics(&self.dataset_id) + } +} fn next_run(runner: &mut RunningQuery, 
slot: &QuerySlot) -> anyhow::Result<()> { let start = Instant::now(); @@ -109,7 +199,7 @@ fn next_run(runner: &mut RunningQuery, slot: &QuerySlot) -> anyhow::Result<()> { runner.write_next_chunk()?; if !runner.has_next_chunk() || runner.buffered_bytes() > 512 * 1024 { - return Ok(()) + return Ok(()); } elapsed = start.elapsed().as_millis(); @@ -119,7 +209,7 @@ fn next_run(runner: &mut RunningQuery, slot: &QuerySlot) -> anyhow::Result<()> { let next_chunk_eta = next_chunk_eta.min(chunk_time * 5).max(chunk_time / 5); let eta = elapsed + next_chunk_eta; if eta > slot.time_limit() as u128 { - return Ok(()) + return Ok(()); } } -} \ No newline at end of file +} diff --git a/crates/hotblocks/src/query/running.rs b/crates/hotblocks/src/query/running.rs index 87cd0f4..31835b5 100644 --- a/crates/hotblocks/src/query/running.rs +++ b/crates/hotblocks/src/query/running.rs @@ -1,22 +1,46 @@ use crate::errors::{BlockItemIsNotAvailable, QueryKindMismatch}; use crate::errors::{BlockRangeMissing, QueryIsAboveTheHead}; +use crate::metrics::{QUERIED_BLOCKS, QUERIED_CHUNKS}; use crate::query::static_snapshot::{StaticChunkIterator, StaticChunkReader, StaticSnapshot}; use crate::types::{DBRef, DatasetKind}; use anyhow::{anyhow, bail, ensure}; use bytes::{BufMut, Bytes, BytesMut}; -use flate2::write::GzEncoder; use flate2::Compression; +use flate2::write::GzEncoder; use sqd_primitives::{BlockNumber, BlockRef}; use sqd_query::{JsonLinesWriter, Plan, Query}; use sqd_storage::db::{Chunk as StorageChunk, DatasetId}; use std::io::Write; - struct LeftOver { chunk: StaticChunkReader, - next_block: BlockNumber + next_block: BlockNumber, } +pub struct RunningQueryStats { + pub chunks_read: u64, + pub blocks_read: u64, +} + +impl RunningQueryStats { + pub fn new() -> Self { + Self { + chunks_read: 0, + blocks_read: 0, + } + } + + pub fn report_metrics(&self, dataset_id: &DatasetId) { + let labels = vec![("dataset_id".to_owned(), dataset_id.as_str().to_owned())]; + + QUERIED_BLOCKS + .get_or_create(&labels) + .observe(self.blocks_read as f64); + QUERIED_CHUNKS + .get_or_create(&labels) + .observe(self.chunks_read as f64); + } +} pub struct RunningQuery { plan: Plan, @@ -25,18 +49,17 @@ pub struct RunningQuery { next_chunk: Option>, chunk_iterator: StaticChunkIterator, finalized_head: Option, - buf: GzEncoder> + buf: GzEncoder>, + stats: RunningQueryStats, } - impl RunningQuery { pub fn new( db: DBRef, dataset_id: DatasetId, query: &Query, only_finalized: bool, - ) -> anyhow::Result - { + ) -> anyhow::Result { let snapshot = StaticSnapshot::new(db); let finalized_head = match snapshot.get_label(dataset_id)? { @@ -54,13 +77,9 @@ impl RunningQuery { } }; - let mut chunk_iterator = StaticChunkIterator::new( - snapshot, - dataset_id, - query.first_block(), - None - ); - + let mut chunk_iterator = + StaticChunkIterator::new(snapshot, dataset_id, query.first_block(), None); + let Some(first_chunk) = chunk_iterator.next().transpose()? 
else {
            bail!(QueryIsAboveTheHead {
                finalized_head: None
            })
        };
@@ -74,7 +93,10 @@ impl RunningQuery {
                 last_block: first_chunk.first_block() - 1
             }
         );
-
+
+        let mut stats = RunningQueryStats::new();
+        stats.chunks_read += 1;
+
         let plan = if query.first_block() == first_chunk.first_block() {
             if let Some(parent_hash) = query.parent_block_hash() {
                 ensure!(
@@ -118,10 +140,8 @@ impl RunningQuery {
             next_chunk: Some(Ok(first_chunk)),
             chunk_iterator,
             finalized_head,
-            buf: GzEncoder::new(
-                BytesMut::new().writer(),
-                Compression::fast()
-            )
+            buf: GzEncoder::new(BytesMut::new().writer(), Compression::fast()),
+            stats,
         })
     }
@@ -129,6 +149,10 @@ impl RunningQuery {
         self.finalized_head.take()
     }
 
+    pub fn stats(&self) -> &RunningQueryStats {
+        &self.stats
+    }
+
     pub fn buffered_bytes(&self) -> usize {
         self.buf.get_ref().get_ref().len()
     }
@@ -144,7 +168,7 @@ impl RunningQuery {
             .into_inner()
             .freeze()
     }
-
+
     pub fn has_next_chunk(&self) -> bool {
         self.next_chunk.is_some() || self.left_over.is_some()
     }
@@ -153,15 +177,26 @@ impl RunningQuery {
     ///
     /// Everything written to the buffer is always well-formed.
     pub fn write_next_chunk(&mut self) -> anyhow::Result<()> {
-        let chunk = if let Some(left_over) = self.left_over.take() {
-            self.plan.set_first_block(left_over.next_block);
-            left_over.chunk
+        let (chunk, first_block_queried) = if let Some(left_over) = self.left_over.take() {
+            let first_block = left_over.next_block;
+            self.plan.set_first_block(first_block);
+            (left_over.chunk, first_block)
         } else {
-            let chunk = self.next_chunk()?;
-            self.chunk_iterator.snapshot().create_chunk_reader(chunk)
+            let storage_chunk = self.next_chunk()?;
+            // Count the newly fetched chunk toward `chunks_read`
+            self.stats.chunks_read += 1;
+            let chunk = self
+                .chunk_iterator
+                .snapshot()
+                .create_chunk_reader(storage_chunk);
+            let first_block = chunk.first_block();
+            (chunk, first_block)
         };
 
-        if self.last_block.map_or(false, |end| end < chunk.last_block()) {
+        if self
+            .last_block
+            .map_or(false, |end| end < chunk.last_block())
+        {
             let last_block = self.last_block;
             self.plan.set_last_block(last_block);
         } else {
@@ -176,7 +211,7 @@ impl RunningQuery {
                     item_name: err.table_name,
                     first_block: chunk.first_block(),
                     last_block: chunk.last_block()
-                })
+                });
             }
             err
         });
@@ -184,17 +219,22 @@ impl RunningQuery {
         // no matter what, we are moving to the next chunk
         self.plan.set_first_block(None);
         self.plan.set_parent_block_hash(None);
-
+
        let Some(mut block_writer) = query_result?
else { - return Ok(()) + return Ok(()); }; + let blocks_written = block_writer.last_block() - first_block_queried + 1; + self.stats.blocks_read += blocks_written; + if chunk.last_block() > block_writer.last_block() - && self.last_block.map_or(true, |end| end > block_writer.last_block()) + && self + .last_block + .map_or(true, |end| end > block_writer.last_block()) { self.left_over = Some(LeftOver { chunk, - next_block: block_writer.last_block() + 1 + next_block: block_writer.last_block() + 1, }) } @@ -209,7 +249,7 @@ impl RunningQuery { .expect("IO errors are not possible"); self.buf.flush().expect("IO errors are not possible"); - + Ok(()) } @@ -218,14 +258,16 @@ impl RunningQuery { bail!("no more chunks left") }; - self.next_chunk = self.chunk_iterator.next() + self.next_chunk = self + .chunk_iterator + .next() .transpose() .map(|maybe_next_chunk| { let next_chunk = maybe_next_chunk?; let is_continuous = chunk.last_block() + 1 == next_chunk.first_block(); - let is_requested = self.last_block.map_or(true, |end| { - next_chunk.first_block() <= end - }); + let is_requested = self + .last_block + .map_or(true, |end| next_chunk.first_block() <= end); if is_continuous && is_requested { Some(next_chunk) } else { @@ -239,10 +281,9 @@ impl RunningQuery { /// Size of the next chunk (in blocks) pub fn next_chunk_size(&self) -> usize { - self.left_over.as_ref() - .map(|lo| { - lo.chunk.last_block() - lo.chunk.first_block() + 1 - }) + self.left_over + .as_ref() + .map(|lo| lo.chunk.last_block() - lo.chunk.first_block() + 1) .or_else(|| { let chunk = self.next_chunk.as_ref()?.as_ref().ok()?; let size = chunk.last_block() - chunk.first_block() + 1; @@ -250,4 +291,4 @@ impl RunningQuery { }) .unwrap_or(0) as usize } -} \ No newline at end of file +} diff --git a/crates/hotblocks/src/query/service.rs b/crates/hotblocks/src/query/service.rs index c6bf9ae..401ce50 100644 --- a/crates/hotblocks/src/query/service.rs +++ b/crates/hotblocks/src/query/service.rs @@ -153,8 +153,13 @@ impl QueryService { dataset.dataset_id(), query, finalized, + None ).await } + + pub fn spawn_metrics_reporter(&self, interval: Duration) { + self.executor.spawn_metrics_reporter(interval) + } } @@ -189,4 +194,4 @@ impl<'a> Drop for WaitingSlot<'a> { fn drop(&mut self) { self.waiters.fetch_sub(1, Ordering::SeqCst); } -} \ No newline at end of file +}