Skip to content

Commit 31ba7a4

Browse files
syepes and trueleo authored
Add Prometheus metrics (#283)
This PR adds metrics for S3 / local store response times, query performance, staging files, total events ingest and alerts. Signed-off-by: Sebastian YEPES <[email protected]> Co-authored-by: Satyam Singh <[email protected]>
1 parent c36da96 commit 31ba7a4

File tree

11 files changed

+371
-70
lines changed

11 files changed

+371
-70
lines changed

Cargo.lock

Lines changed: 36 additions & 36 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ actix-web-httpauth = "0.8"
1111
actix-web = { version = "4.3", features = ["rustls"] }
1212
actix-cors = "0.6"
1313
actix-files = "0.6"
14-
actix-web-prometheus = { version = "0.1", features = ["process"] }
14+
actix-web-prometheus = { version = "0.1" }
1515
prometheus = { version = "0.13", features = ["process"] }
1616
anyhow = { version = "1.0", features = ["backtrace"] }
1717
arrow-schema = { version = "31.0", features = ["serde"] }
@@ -40,7 +40,7 @@ futures = "0.3"
4040
fs_extra = "1.3"
4141
http = "0.2"
4242
humantime-serde = "1.1"
43-
lazy_static = "1.4.0"
43+
lazy_static = "1.4"
4444
log = "0.4"
4545
num_cpus = "1.15"
4646
md-5 = "0.10"
@@ -67,9 +67,9 @@ actix-web-static-files = "4.0"
6767
static-files = "0.2"
6868
ulid = { version = "1.0", features = ["serde"] }
6969
ureq = { version = "2.6", features = ["json"] }
70-
hex = "0.4.3"
71-
itertools = "0.10.5"
72-
xxhash-rust = { version = "0.8.6", features = ["xxh3"] }
70+
hex = "0.4"
71+
itertools = "0.10"
72+
xxhash-rust = { version = "0.8", features = ["xxh3"] }
7373

7474
[build-dependencies]
7575
static-files = "0.2"

server/src/alerts/mod.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717
*/
1818

1919
use serde::{Deserialize, Serialize};
20+
use std::fmt;
2021

2122
pub mod rule;
2223
pub mod target;
2324

25+
use crate::metrics::ALERTS_STATES;
2426
use crate::utils::uid::Uid;
2527

2628
pub use self::rule::Rule;
@@ -51,6 +53,13 @@ impl Alert {
5153
AlertState::Listening | AlertState::Firing => (),
5254
alert_state @ (AlertState::SetToFiring | AlertState::Resolved) => {
5355
let context = self.get_context(stream_name, alert_state);
56+
ALERTS_STATES
57+
.with_label_values(&[
58+
context.stream.as_str(),
59+
context.alert_name.as_str(),
60+
context.alert_state.to_string().as_str(),
61+
])
62+
.inc();
5463
for target in &self.targets {
5564
target.call(context.clone());
5665
}
@@ -124,3 +133,14 @@ impl Default for AlertState {
124133
Self::Listening
125134
}
126135
}
136+
137+
// Human-readable rendering of `AlertState`: every variant is written out as
// its own variant name ("Listening", "SetToFiring", "Firing", "Resolved").
// NOTE(review): the alerts hunk in this same commit passes
// `context.alert_state.to_string()` as a label value to `ALERTS_STATES`, so
// these strings presumably become metric label values -- confirm before
// renaming any of them.
impl fmt::Display for AlertState {
138+
// Writes the variant name into `f`; the returned `fmt::Result` is whatever
// the underlying `write!` call produces.
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
139+
// No catch-all arm: each of the four variants is listed explicitly.
match *self {
140+
AlertState::Listening => write!(f, "Listening"),
141+
AlertState::SetToFiring => write!(f, "SetToFiring"),
142+
AlertState::Firing => write!(f, "Firing"),
143+
AlertState::Resolved => write!(f, "Resolved"),
144+
}
145+
}
146+
}

server/src/handlers/event.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
use actix_web::{web, HttpRequest, HttpResponse};
2020
use serde_json::Value;
21+
use std::time::Instant;
2122

2223
use crate::event;
24+
use crate::metrics::QUERY_EXECUTE_TIME;
2325
use crate::option::CONFIG;
2426
use crate::query::Query;
2527
use crate::response::QueryResponse;
@@ -34,17 +36,23 @@ const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
3436
const SEPARATOR: char = '^';
3537

3638
// Query handler: parses the JSON request body into a `Query`, executes it
// against the object store obtained from `CONFIG`, converts the outcome into
// an HTTP response (or a `QueryError`), and records the elapsed time in the
// `QUERY_EXECUTE_TIME` metric labelled with the query's stream name.
// `_req` is accepted but not used in the body.
pub async fn query(_req: HttpRequest, json: web::Json<Value>) -> Result<HttpResponse, QueryError> {
39+
// Start the timer before parsing so the measurement covers parse + execute
// + response conversion.
let time = Instant::now();
3740
let json = json.into_inner();
3841
// `?` returns early on a parse failure, so a request that fails to parse
// never reaches the `observe` call below and is not recorded in the metric.
let query = Query::parse(json)?;
3942

4043
let storage = CONFIG.storage().get_object_store();
41-
4244
let query_result = query.execute(storage).await;
43-
44-
query_result
45+
// The converted result is bound to a local (instead of being returned
// directly, as on the removed lines) so the timing can be observed first.
let query_result = query_result
4546
.map(Into::<QueryResponse>::into)
4647
.map(|response| response.to_http())
47-
.map_err(|e| e.into())
48+
.map_err(|e| e.into());
49+
50+
// Shadows the `Instant` with the elapsed duration in (fractional) seconds.
let time = time.elapsed().as_secs_f64();
51+
// Observed whether `execute` returned Ok or Err: the result is only
// returned after this point.
QUERY_EXECUTE_TIME
52+
.with_label_values(&[query.stream_name.as_str()])
53+
.observe(time);
54+
55+
query_result
4856
}
4957

5058
// Handler for POST /api/v1/ingest

server/src/main.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use actix_web::dev::ServiceRequest;
2121
use actix_web::{middleware, web, App, HttpServer};
2222
use actix_web_httpauth::extractors::basic::BasicAuth;
2323
use actix_web_httpauth::middleware::HttpAuthentication;
24-
use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
24+
use actix_web_prometheus::PrometheusMetrics;
2525
use actix_web_static_files::ResourceFiles;
2626
use clokwerk::{AsyncScheduler, Scheduler, TimeUnits};
2727
use log::warn;
@@ -45,6 +45,7 @@ mod banner;
4545
mod event;
4646
mod handlers;
4747
mod metadata;
48+
mod metrics;
4849
mod migration;
4950
mod option;
5051
mod query;
@@ -69,11 +70,8 @@ async fn main() -> anyhow::Result<()> {
6970
CONFIG.validate_storage(&*storage).await;
7071
let metadata = storage::resolve_parseable_metadata().await?;
7172
banner::print(&CONFIG, metadata);
72-
let prometheus = PrometheusMetricsBuilder::new(env!("CARGO_PKG_NAME"))
73-
.registry(prometheus::default_registry().clone())
74-
.endpoint("/metrics")
75-
.build()
76-
.unwrap();
73+
let prometheus = metrics::build_metrics_handler();
74+
CONFIG.storage().register_store_metrics(&prometheus);
7775

7876
migration::run_migration(&CONFIG).await?;
7977

server/src/metadata.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use std::sync::{Arc, RwLock};
2323

2424
use crate::alerts::Alerts;
2525
use crate::event::Event;
26+
use crate::metrics::{EVENTS_INGESTED, EVENTS_INGESTED_SIZE};
2627
use crate::stats::{Stats, StatsCounter};
2728
use crate::storage::ObjectStorage;
2829

@@ -180,6 +181,12 @@ impl STREAM_INFO {
180181

181182
stream.stats.add_ingestion_size(size);
182183
stream.stats.increase_event_by_one();
184+
EVENTS_INGESTED
185+
.with_label_values(&[stream_name.clone(), "json"])
186+
.inc();
187+
EVENTS_INGESTED_SIZE
188+
.with_label_values(&[stream_name.clone(), "json"])
189+
.add(size as i64);
183190

184191
Ok(())
185192
}

0 commit comments

Comments
 (0)