Skip to content

Commit dad51de

Browse files
Convert the warp servers into
1 parent 07eb91b commit dad51de

File tree

6 files changed

+73
-165
lines changed

6 files changed

+73
-165
lines changed

aggregation_mode/Cargo.lock

Lines changed: 0 additions & 110 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

aggregation_mode/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ db = { path = "./db" }
1717
sp1-sdk = "5.0.0"
1818
risc0-zkvm = { version = "3.0.3" }
1919
prometheus = { version = "0.13.4", features = ["process"] }
20-
warp = "0.3.7"
2120

2221
[profile.release]
2322
opt-level = 3

aggregation_mode/gateway/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ serde = { workspace = true }
88
serde_json = { workspace = true }
99
serde_yaml = { workspace = true }
1010
prometheus = { workspace = true }
11-
warp = { workspace = true }
1211
agg_mode_sdk = { path = "../sdk"}
1312
aligned-sdk = { workspace = true }
1413
sp1-sdk = { workspace = true }

aggregation_mode/gateway/src/metrics.rs

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,60 @@
1-
use prometheus::{self, histogram_opts, register_histogram};
2-
use warp::{reject::Rejection, reply::Reply, Filter};
1+
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
2+
use prometheus::{self, histogram_opts, Encoder, Histogram, Registry, TextEncoder};
3+
use std::sync::Arc;
34

45
#[derive(Clone, Debug)]
56
pub struct GatewayMetrics {
6-
pub time_elapsed_db_post: prometheus::Histogram,
7+
pub registry: Registry,
8+
pub time_elapsed_db_post: Histogram,
79
}
810

911
impl GatewayMetrics {
1012
pub fn start(metrics_port: u16) -> Result<Self, prometheus::Error> {
11-
let registry = prometheus::Registry::new();
13+
let registry = Registry::new();
1214

13-
let time_elapsed_db_post = register_histogram!(histogram_opts!(
15+
let time_elapsed_db_post = Histogram::with_opts(histogram_opts!(
1416
"time_elapsed_db_post",
1517
"Time elapsed in DB posts"
1618
))?;
1719

1820
registry.register(Box::new(time_elapsed_db_post.clone()))?;
1921

20-
let metrics_route = warp::path!("metrics")
21-
.and(warp::any().map(move || registry.clone()))
22-
.and_then(GatewayMetrics::metrics_handler);
22+
// Arc is used because metrics are a shared resource accessed by both the background and metrics HTTP
23+
// server and the application code, across multiple Actix worker threads. The server outlives start(),
24+
// so the data must be static and safely shared between threads.
25+
let metrics = Arc::new(Self {
26+
registry,
27+
time_elapsed_db_post,
28+
});
2329

24-
tokio::task::spawn(async move {
25-
warp::serve(metrics_route)
26-
.run(([0, 0, 0, 0], metrics_port))
27-
.await;
30+
let server_metrics = metrics.clone();
31+
tokio::spawn(async move {
32+
let _ = HttpServer::new(move || {
33+
App::new()
34+
.app_data(web::Data::new(server_metrics.clone()))
35+
.route("/metrics", web::get().to(GatewayMetrics::metrics_handler))
36+
})
37+
.bind(("0.0.0.0", metrics_port))
38+
.expect("failed to bind metrics server")
39+
.run()
40+
.await;
2841
});
2942

30-
Ok(Self {
31-
time_elapsed_db_post,
32-
})
43+
Ok(Arc::try_unwrap(metrics).unwrap_or_else(|arc| (*arc).clone()))
3344
}
3445

35-
pub async fn metrics_handler(registry: prometheus::Registry) -> Result<impl Reply, Rejection> {
36-
use prometheus::Encoder;
37-
let encoder = prometheus::TextEncoder::new();
46+
async fn metrics_handler(metrics: web::Data<Arc<GatewayMetrics>>) -> impl Responder {
47+
let encoder = TextEncoder::new();
48+
let metric_families = metrics.registry.gather();
3849

3950
let mut buffer = Vec::new();
40-
if let Err(e) = encoder.encode(&registry.gather(), &mut buffer) {
51+
if let Err(e) = encoder.encode(&metric_families, &mut buffer) {
4152
tracing::error!("could not encode prometheus metrics: {e}");
42-
};
43-
let res = String::from_utf8(buffer.clone())
44-
.inspect_err(|e| eprintln!("prometheus metrics could not be parsed correctly: {e}"))
45-
.unwrap_or_default();
46-
buffer.clear();
53+
}
4754

48-
Ok(res)
55+
HttpResponse::Ok()
56+
.insert_header(("Content-Type", encoder.format_type()))
57+
.body(buffer)
4958
}
5059

5160
pub fn register_db_response_time_post(&self, value: f64) {

aggregation_mode/payments_poller/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ serde_json = { workspace = true }
99
serde_yaml = { workspace = true }
1010
aligned-sdk = { workspace = true }
1111
prometheus = { workspace = true }
12-
warp = { workspace = true }
1312
tracing = { version = "0.1", features = ["log"] }
1413
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
1514
actix-web = "4"

aggregation_mode/payments_poller/src/metrics.rs

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,63 @@
1-
use prometheus::{self, opts, register_gauge};
2-
use warp::{reject::Rejection, reply::Reply, Filter};
1+
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
2+
use prometheus::{self, opts, Encoder, Gauge, Registry, TextEncoder};
3+
use std::sync::Arc;
34

45
#[derive(Clone, Debug)]
56
pub struct PaymentsPollerMetrics {
6-
pub last_processed_block: prometheus::Gauge,
7+
pub registry: Registry,
8+
pub last_processed_block: Gauge,
79
}
810

911
impl PaymentsPollerMetrics {
1012
pub fn start(metrics_port: u16) -> Result<Self, prometheus::Error> {
11-
let registry = prometheus::Registry::new();
13+
let registry = Registry::new();
1214

13-
let last_processed_block = register_gauge!(opts!(
15+
let last_processed_block = Gauge::with_opts(opts!(
1416
"last_processed_block",
1517
"Last processed block by poller"
1618
))?;
1719

1820
registry.register(Box::new(last_processed_block.clone()))?;
1921

20-
let metrics_route = warp::path!("metrics")
21-
.and(warp::any().map(move || registry.clone()))
22-
.and_then(PaymentsPollerMetrics::metrics_handler);
22+
// Arc is used because metrics are a shared resource accessed by both the background and metrics HTTP
23+
// server and the application code, across multiple Actix worker threads. The server outlives start(),
24+
// so the data must be static and safely shared between threads.
25+
let metrics = Arc::new(Self {
26+
registry,
27+
last_processed_block,
28+
});
2329

24-
tokio::task::spawn(async move {
25-
warp::serve(metrics_route)
26-
.run(([0, 0, 0, 0], metrics_port))
27-
.await;
30+
let server_metrics = metrics.clone();
31+
tokio::spawn(async move {
32+
let _ = HttpServer::new(move || {
33+
App::new()
34+
.app_data(web::Data::new(server_metrics.clone()))
35+
.route(
36+
"/metrics",
37+
web::get().to(PaymentsPollerMetrics::metrics_handler),
38+
)
39+
})
40+
.bind(("0.0.0.0", metrics_port))
41+
.expect("failed to bind metrics server")
42+
.run()
43+
.await;
2844
});
2945

30-
Ok(Self {
31-
last_processed_block,
32-
})
46+
Ok(Arc::try_unwrap(metrics).unwrap_or_else(|arc| (*arc).clone()))
3347
}
3448

35-
pub async fn metrics_handler(registry: prometheus::Registry) -> Result<impl Reply, Rejection> {
36-
use prometheus::Encoder;
37-
let encoder = prometheus::TextEncoder::new();
49+
async fn metrics_handler(metrics: web::Data<Arc<PaymentsPollerMetrics>>) -> impl Responder {
50+
let encoder = TextEncoder::new();
51+
let metric_families = metrics.registry.gather();
3852

3953
let mut buffer = Vec::new();
40-
if let Err(e) = encoder.encode(&registry.gather(), &mut buffer) {
41-
eprintln!("could not encode prometheus metrics: {e}");
42-
};
43-
let res = String::from_utf8(buffer.clone())
44-
.inspect_err(|e| eprintln!("prometheus metrics could not be parsed correctly: {e}"))
45-
.unwrap_or_default();
46-
buffer.clear();
47-
48-
Ok(res)
54+
if let Err(e) = encoder.encode(&metric_families, &mut buffer) {
55+
tracing::error!("could not encode prometheus metrics: {e}");
56+
}
57+
58+
HttpResponse::Ok()
59+
.insert_header(("Content-Type", encoder.format_type()))
60+
.body(buffer)
4961
}
5062

5163
pub fn register_last_processed_block(&self, value: u64) {

0 commit comments

Comments
 (0)