Skip to content

Commit df0b8c0

Browse files
authored
refactor: avoid lazy init for batcher metrics (#1236)
1 parent 47f77eb commit df0b8c0

File tree

6 files changed

+93
-78
lines changed

6 files changed

+93
-78
lines changed

batcher/aligned-batcher/src/config/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub struct BatcherConfigFromYaml {
4242
pub max_batch_size: usize,
4343
pub eth_ws_reconnects: usize,
4444
pub pre_verification_is_enabled: bool,
45+
pub metrics_port: u16,
4546
pub non_paying: Option<NonPayingConfigFromYaml>,
4647
}
4748

batcher/aligned-batcher/src/lib.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ pub struct Batcher {
7575
pre_verification_is_enabled: bool,
7676
non_paying_config: Option<NonPayingConfig>,
7777
posting_batch: Mutex<bool>,
78+
pub metrics: metrics::BatcherMetrics,
7879
}
7980

8081
impl Batcher {
@@ -101,6 +102,13 @@ impl Batcher {
101102
.await
102103
.expect("Failed to get ethereum websocket provider");
103104

105+
log::info!(
106+
"Starting metrics server on port {}",
107+
config.batcher.metrics_port
108+
);
109+
let metrics = metrics::BatcherMetrics::start(config.batcher.metrics_port)
110+
.expect("Failed to start metrics server");
111+
104112
let eth_ws_provider_fallback = Provider::connect_with_reconnects(
105113
&config.eth_ws_url_fallback,
106114
config.batcher.eth_ws_reconnects,
@@ -201,6 +209,7 @@ impl Batcher {
201209
non_paying_config,
202210
posting_batch: Mutex::new(false),
203211
batch_state: Mutex::new(batch_state),
212+
metrics,
204213
}
205214
}
206215

@@ -213,7 +222,7 @@ impl Batcher {
213222

214223
// Let's spawn the handling of each connection in a separate task.
215224
while let Ok((stream, addr)) = listener.accept().await {
216-
metrics::OPEN_CONNECTIONS.inc();
225+
self.metrics.open_connections.inc();
217226
let batcher = self.clone();
218227
tokio::spawn(batcher.handle_connection(stream, addr));
219228
}
@@ -296,7 +305,7 @@ impl Batcher {
296305
Ok(_) => info!("{} disconnected", &addr),
297306
}
298307

299-
metrics::OPEN_CONNECTIONS.dec();
308+
self.metrics.open_connections.dec();
300309
Ok(())
301310
}
302311

@@ -316,7 +325,7 @@ impl Batcher {
316325
};
317326
let msg_nonce = client_msg.verification_data.nonce;
318327
debug!("Received message with nonce: {msg_nonce:?}",);
319-
metrics::RECEIVED_PROOFS.inc();
328+
self.metrics.received_proofs.inc();
320329

321330
// * ---------------------------------------------------*
322331
// * Perform validations over the message *
@@ -1016,7 +1025,9 @@ impl Batcher {
10161025

10171026
let proof_submitters = finalized_batch.iter().map(|entry| entry.sender).collect();
10181027

1019-
metrics::GAS_PRICE_USED_ON_LATEST_BATCH.set(gas_price.as_u64() as i64);
1028+
self.metrics
1029+
.gas_price_used_on_latest_batch
1030+
.set(gas_price.as_u64() as i64);
10201031

10211032
match self
10221033
.create_new_task(
@@ -1029,7 +1040,7 @@ impl Batcher {
10291040
{
10301041
Ok(_) => {
10311042
info!("Batch verification task created on Aligned contract");
1032-
metrics::SENT_BATCHES.inc();
1043+
self.metrics.sent_batches.inc();
10331044
Ok(())
10341045
}
10351046
Err(e) => {
@@ -1038,7 +1049,7 @@ impl Batcher {
10381049
e
10391050
);
10401051

1041-
metrics::REVERTED_BATCHES.inc();
1052+
self.metrics.reverted_batches.inc();
10421053
Err(e)
10431054
}
10441055
}

batcher/aligned-batcher/src/main.rs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@ use std::sync::Arc;
55
use clap::Parser;
66
use env_logger::Env;
77

8-
use aligned_batcher::metrics;
98
use aligned_batcher::{types::errors::BatcherError, Batcher};
10-
use warp::Filter;
119

1210
/// Batcher main flow:
1311
/// There are two main tasks spawned: `listen_connections` and `listen_new_blocks`
@@ -39,15 +37,6 @@ async fn main() -> Result<(), BatcherError> {
3937
};
4038

4139
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
42-
43-
// Endpoint for Prometheus
44-
metrics::init_variables();
45-
let metrics_route = warp::path!("metrics").and_then(metrics::metrics_handler);
46-
println!("Starting Batcher metrics on port 9093");
47-
tokio::task::spawn(async move {
48-
warp::serve(metrics_route).run(([0, 0, 0, 0], 9093)).await;
49-
}); //TODO read from config
50-
5140
let batcher = Batcher::new(cli.config).await;
5241
let batcher = Arc::new(batcher);
5342

@@ -63,7 +52,7 @@ async fn main() -> Result<(), BatcherError> {
6352
}
6453
});
6554

66-
metrics::batcher_started();
55+
batcher.metrics.inc_batcher_restart();
6756

6857
batcher.listen_connections(&addr).await?;
6958

Lines changed: 72 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,67 +1,79 @@
1+
use std::{thread, time::Duration};
2+
13
// Prometheus
24
use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge};
35

4-
use warp::{Rejection, Reply};
5-
6-
use once_cell::sync::Lazy;
7-
use std::{thread, time};
8-
9-
// Prometheus setup
10-
pub static BATCHER_STARTED: Lazy<IntCounter> =
11-
Lazy::new(|| register_int_counter!(opts!("batcher_started", "Batcher Started")).unwrap());
12-
13-
pub static OPEN_CONNECTIONS: Lazy<IntGauge> =
14-
Lazy::new(|| register_int_gauge!(opts!("open_connections", "Open Connections")).unwrap());
15-
16-
pub static RECEIVED_PROOFS: Lazy<IntCounter> =
17-
Lazy::new(|| register_int_counter!(opts!("received_proofs", "Received Proofs")).unwrap());
18-
19-
pub static SENT_BATCHES: Lazy<IntCounter> =
20-
Lazy::new(|| register_int_counter!(opts!("sent_batches", "Sent Batches")).unwrap());
21-
22-
pub static REVERTED_BATCHES: Lazy<IntCounter> =
23-
Lazy::new(|| register_int_counter!(opts!("reverted_batches", "Reverted Batches")).unwrap());
24-
25-
pub static GAS_PRICE_USED_ON_LATEST_BATCH: Lazy<IntGauge> = Lazy::new(|| {
26-
register_int_gauge!(opts!("gas_price_used_on_latest_batch", "Gas Price")).unwrap()
27-
});
28-
29-
// so Prometheus can collect our metrics.
30-
pub async fn metrics_handler() -> Result<impl Reply, Rejection> {
31-
use prometheus::Encoder;
32-
let encoder = prometheus::TextEncoder::new();
33-
34-
let mut buffer = Vec::new();
35-
if let Err(e) = encoder.encode(&prometheus::gather(), &mut buffer) {
36-
eprintln!("could not encode prometheus metrics: {}", e);
37-
};
38-
let res = match String::from_utf8(buffer.clone()) {
39-
Ok(v) => v,
40-
Err(e) => {
41-
eprintln!("prometheus metrics could not be from_utf8'd: {}", e);
42-
String::default()
43-
}
44-
};
45-
buffer.clear();
46-
47-
Ok(res)
48-
}
49-
50-
pub fn init_variables() {
51-
BATCHER_STARTED.reset();
52-
53-
OPEN_CONNECTIONS.set(0);
54-
55-
RECEIVED_PROOFS.reset();
56-
57-
SENT_BATCHES.reset();
58-
59-
REVERTED_BATCHES.reset();
6+
use warp::{Filter, Rejection, Reply};
607

61-
GAS_PRICE_USED_ON_LATEST_BATCH.set(0);
8+
#[derive(Clone, Debug)]
9+
pub struct BatcherMetrics {
10+
pub open_connections: IntGauge,
11+
pub received_proofs: IntCounter,
12+
pub sent_batches: IntCounter,
13+
pub reverted_batches: IntCounter,
14+
pub batcher_started: IntCounter,
15+
pub gas_price_used_on_latest_batch: IntGauge,
6216
}
6317

64-
pub fn batcher_started() {
65-
thread::sleep(time::Duration::from_secs(10));
66-
BATCHER_STARTED.inc();
18+
impl BatcherMetrics {
19+
pub fn start(metrics_port: u16) -> anyhow::Result<Self> {
20+
let registry = prometheus::Registry::new();
21+
22+
let open_connections = register_int_gauge!(opts!("open_connections", "Open Connections"))?;
23+
let received_proofs = register_int_counter!(opts!("received_proofs", "Received Proofs"))?;
24+
let sent_batches = register_int_counter!(opts!("sent_batches", "Sent Batches"))?;
25+
let reverted_batches =
26+
register_int_counter!(opts!("reverted_batches", "Reverted Batches"))?;
27+
let batcher_started = register_int_counter!(opts!("batcher_started", "Batcher Started"))?;
28+
let gas_price_used_on_latest_batch =
29+
register_int_gauge!(opts!("gas_price_used_on_latest_batch", "Gas Price"))?;
30+
31+
registry.register(Box::new(open_connections.clone()))?;
32+
registry.register(Box::new(received_proofs.clone()))?;
33+
registry.register(Box::new(sent_batches.clone()))?;
34+
registry.register(Box::new(reverted_batches.clone()))?;
35+
registry.register(Box::new(batcher_started.clone()))?;
36+
37+
let metrics_route = warp::path!("metrics")
38+
.and(warp::any().map(move || registry.clone()))
39+
.and_then(BatcherMetrics::metrics_handler);
40+
41+
tokio::task::spawn(async move {
42+
warp::serve(metrics_route)
43+
.run(([0, 0, 0, 0], metrics_port))
44+
.await;
45+
});
46+
47+
Ok(Self {
48+
open_connections,
49+
received_proofs,
50+
sent_batches,
51+
reverted_batches,
52+
batcher_started,
53+
gas_price_used_on_latest_batch,
54+
})
55+
}
56+
57+
pub async fn metrics_handler(registry: prometheus::Registry) -> Result<impl Reply, Rejection> {
58+
use prometheus::Encoder;
59+
let encoder = prometheus::TextEncoder::new();
60+
61+
let mut buffer = Vec::new();
62+
if let Err(e) = encoder.encode(&registry.gather(), &mut buffer) {
63+
eprintln!("could not encode prometheus metrics: {}", e);
64+
};
65+
let res = String::from_utf8(buffer.clone())
66+
.inspect_err(|e| eprintln!("prometheus metrics could not be parsed correctly: {e}"))
67+
.unwrap_or_default();
68+
buffer.clear();
69+
70+
Ok(res)
71+
}
72+
73+
pub fn inc_batcher_restart(&self) {
74+
// Sleep for 2 seconds to allow prometheus to start and set the metrics with default intial values.
75+
// If prometheus is not ready, the metrics will directly be set to 1 and prometheus will not be able to display the correct increment.
76+
thread::sleep(Duration::from_secs(2));
77+
self.batcher_started.inc();
78+
}
6779
}

config-files/config-batcher.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ batcher:
2222
max_batch_size: 268435456 # 256 MiB
2323
eth_ws_reconnects: 99999999999999
2424
pre_verification_is_enabled: true
25+
metrics_port: 9093
2526
non_paying:
2627
address: 0xa0Ee7A142d267C1f36714E4a8F75612F20a79720 # Anvil address 9
2728
replacement_private_key: ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80 # Anvil address 1

config-files/config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ batcher:
2727
max_batch_size: 268435456 # 256 MiB
2828
eth_ws_reconnects: 99999999999999
2929
pre_verification_is_enabled: true
30+
metrics_port: 9093
3031

3132
## Aggregator Configurations
3233
aggregator:

0 commit comments

Comments
 (0)