Skip to content

Commit c73b299

Browse files
jaymellsvix-james
authored andcommitted
Add redis OTEL metrics
This adds support exporting OTEL metrics and adds some basic monitoring of the various redis queues.
1 parent 3d9f26c commit c73b299

File tree

7 files changed

+247
-57
lines changed

7 files changed

+247
-57
lines changed

server/Cargo.lock

Lines changed: 14 additions & 30 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/svix-server/Cargo.toml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ once_cell = "1.18.0"
3333
figment = { version = "0.10", features = ["toml", "env", "test"] }
3434
tracing = "0.1.35"
3535
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
36-
tracing-opentelemetry = "0.23.0"
37-
opentelemetry = "0.22.0"
38-
opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"] }
39-
opentelemetry-http = "0.11.0"
40-
opentelemetry-otlp = { version = "0.15.0" }
36+
tracing-opentelemetry = "0.24.0"
37+
opentelemetry = { version = "0.23.0", features = ["metrics"] }
38+
opentelemetry_sdk = { version = "0.23.0", features = ["rt-tokio"] }
39+
opentelemetry-http = "0.12.0"
40+
opentelemetry-otlp = { version = "0.16.0", features = ["metrics"] }
4141
validator = { version = "0.16.0", features = ["derive"] }
4242
jwt-simple = "0.11.6"
4343
ed25519-compact = "2.1.1"
@@ -72,6 +72,7 @@ omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "75e5a9510ad33
7272
# Not a well-known author, and no longer gets updates => pinned.
7373
# Switch to hyper-http-proxy when upgrading hyper to 1.0.
7474
hyper-proxy = { version = "=0.9.1", default-features = false, features = ["openssl-tls"] }
75+
hex = "0.4.3"
7576

7677
[target.'cfg(not(target_env = "msvc"))'.dependencies]
7778
tikv-jemallocator = { version = "0.5", optional = true }

server/svix-server/src/lib.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ use std::{
1313

1414
use aide::axum::ApiRouter;
1515
use cfg::ConfigurationInner;
16+
use once_cell::sync::Lazy;
1617
use opentelemetry_otlp::WithExportConfig;
17-
use opentelemetry_sdk::runtime::Tokio;
18+
use opentelemetry_sdk::{metrics::SdkMeterProvider, runtime::Tokio};
1819
use queue::TaskQueueProducer;
1920
use redis::RedisManager;
2021
use sea_orm::DatabaseConnection;
2122
use sentry::integrations::tracing::EventFilter;
23+
use svix_ksuid::{KsuidLike, KsuidMs};
2224
use tower::layer::layer_fn;
2325
use tower_http::{
2426
cors::{AllowHeaders, Any, CorsLayer},
@@ -44,6 +46,7 @@ pub mod core;
4446
pub mod db;
4547
pub mod error;
4648
pub mod expired_message_cleaner;
49+
pub mod metrics;
4750
pub mod openapi;
4851
pub mod queue;
4952
pub mod redis;
@@ -54,6 +57,9 @@ const CRATE_NAME: &str = env!("CARGO_CRATE_NAME");
5457

5558
pub static SHUTTING_DOWN: AtomicBool = AtomicBool::new(false);
5659

60+
pub static INSTANCE_ID: Lazy<String> =
61+
Lazy::new(|| hex::encode(KsuidMs::new(None, None).to_string()));
62+
5763
async fn graceful_shutdown_handler() {
5864
let ctrl_c = async {
5965
tokio::signal::ctrl_c()
@@ -83,6 +89,8 @@ async fn graceful_shutdown_handler() {
8389

8490
#[tracing::instrument(name = "app_start", level = "trace", skip_all)]
8591
pub async fn run(cfg: Configuration, listener: Option<TcpListener>) {
92+
let _metrics = setup_metrics(&cfg);
93+
8694
run_with_prefix(None, cfg, listener).await
8795
}
8896

@@ -325,6 +333,32 @@ pub fn setup_tracing(
325333
(registry, sentry_guard)
326334
}
327335

336+
pub fn setup_metrics(cfg: &ConfigurationInner) -> Option<SdkMeterProvider> {
337+
cfg.opentelemetry_address.as_ref().map(|addr| {
338+
let exporter = opentelemetry_otlp::new_exporter()
339+
.tonic()
340+
.with_endpoint(addr);
341+
342+
opentelemetry_otlp::new_pipeline()
343+
.metrics(Tokio)
344+
.with_delta_temporality()
345+
.with_exporter(exporter)
346+
.with_resource(opentelemetry_sdk::Resource::new(vec![
347+
opentelemetry::KeyValue::new(
348+
"service.name",
349+
cfg.opentelemetry_service_name.clone(),
350+
),
351+
opentelemetry::KeyValue::new("instance_id", INSTANCE_ID.to_owned()),
352+
opentelemetry::KeyValue::new(
353+
"service.version",
354+
option_env!("GITHUB_SHA").unwrap_or("unknown"),
355+
),
356+
]))
357+
.build()
358+
.unwrap()
359+
})
360+
}
361+
328362
pub fn setup_tracing_for_tests() {
329363
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
330364

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
mod redis;
2+
3+
pub fn init_metric<T, E: std::fmt::Debug>(result: Result<T, E>) -> Option<T> {
4+
match result {
5+
Ok(t) => Some(t),
6+
Err(e) => {
7+
tracing::error!(error = ?e, "Failed to initialize metric");
8+
None
9+
}
10+
}
11+
}
12+
13+
pub use self::redis::{RedisQueueMetrics, RedisQueueType};
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
use opentelemetry::metrics::{Meter, ObservableGauge};
2+
use redis::{streams::StreamPendingReply, AsyncCommands as _};
3+
4+
use super::init_metric;
5+
use crate::{
6+
error::{Error, Result},
7+
redis::RedisManager,
8+
};
9+
10+
pub enum RedisQueueType<'a> {
11+
Stream(&'a str),
12+
StreamPending { stream: &'a str, group: &'a str },
13+
List(&'a str),
14+
SortedSet(&'a str),
15+
}
16+
17+
impl<'a> RedisQueueType<'a> {
18+
pub async fn queue_depth(&self, redis: &RedisManager) -> Result<u64> {
19+
let mut conn = redis.get().await?;
20+
match self {
21+
RedisQueueType::Stream(q) => conn
22+
.xlen(q)
23+
.await
24+
.map_err(|e| Error::queue(format!("Failed to query queue depth: {e}"))),
25+
RedisQueueType::StreamPending { stream, group } => {
26+
let reply: StreamPendingReply = conn.xpending(stream, group).await?;
27+
Ok(reply.count() as _)
28+
}
29+
RedisQueueType::List(q) => conn
30+
.llen(q)
31+
.await
32+
.map_err(|e| Error::queue(format!("Failed to query queue depth: {e}"))),
33+
RedisQueueType::SortedSet(q) => conn
34+
.zcard(q)
35+
.await
36+
.map_err(|e| Error::queue(format!("Failed to query queue depth: {e}"))),
37+
}
38+
}
39+
}
40+
41+
#[derive(Clone)]
42+
pub struct RedisQueueMetrics {
43+
main_queue: Option<ObservableGauge<u64>>,
44+
pending_queue: Option<ObservableGauge<u64>>,
45+
delayed_queue: Option<ObservableGauge<u64>>,
46+
}
47+
48+
impl RedisQueueMetrics {
49+
pub fn new(meter: &Meter) -> Self {
50+
let main_queue = init_metric(
51+
meter
52+
.u64_observable_gauge("svix.queue.depth_main")
53+
.try_init(),
54+
);
55+
56+
let pending_queue = init_metric(
57+
meter
58+
.u64_observable_gauge("svix.queue.pending_msgs")
59+
.try_init(),
60+
);
61+
62+
let delayed_queue = init_metric(
63+
meter
64+
.u64_observable_gauge("svix.queue.depth_delayed")
65+
.try_init(),
66+
);
67+
68+
Self {
69+
main_queue,
70+
pending_queue,
71+
delayed_queue,
72+
}
73+
}
74+
pub async fn record(
75+
&self,
76+
redis: &RedisManager,
77+
main_queue: &RedisQueueType<'_>,
78+
pending_queue: &RedisQueueType<'_>,
79+
delayed_queue: &RedisQueueType<'_>,
80+
) {
81+
main_queue
82+
.queue_depth(redis)
83+
.await
84+
.map(|d| self.record_main_queue_depth(d))
85+
.unwrap_or_else(|e| {
86+
tracing::warn!("Failed to record queue depth: {e}");
87+
});
88+
pending_queue
89+
.queue_depth(redis)
90+
.await
91+
.map(|d| self.record_pending_queue_depth(d))
92+
.unwrap_or_else(|e| {
93+
tracing::warn!("Failed to record queue depth: {e}");
94+
});
95+
delayed_queue
96+
.queue_depth(redis)
97+
.await
98+
.map(|d| self.record_delayed_queue_depth(d))
99+
.unwrap_or_else(|e| {
100+
tracing::warn!("Failed to record queue depth: {e}");
101+
});
102+
}
103+
104+
fn record_main_queue_depth(&self, value: u64) {
105+
if let Some(recorder) = &self.main_queue {
106+
recorder.observe(value, &[]);
107+
}
108+
}
109+
fn record_pending_queue_depth(&self, value: u64) {
110+
if let Some(recorder) = &self.pending_queue {
111+
recorder.observe(value, &[]);
112+
}
113+
}
114+
fn record_delayed_queue_depth(&self, value: u64) {
115+
if let Some(recorder) = &self.delayed_queue {
116+
recorder.observe(value, &[]);
117+
}
118+
}
119+
}

server/svix-server/src/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
mod redis;
2+
mod worker;
3+
4+
pub use svix_server_core::metrics::*;
5+
6+
pub use self::{
7+
redis::{RedisQueueMetrics, RedisQueueType},
8+
worker::WorkerMetrics,
9+
};

0 commit comments

Comments
 (0)