Skip to content

Commit 00d5338

Browse files
Add the payments poller metrics integration
1 parent 61f889a commit 00d5338

File tree

12 files changed

+198
-11
lines changed

12 files changed

+198
-11
lines changed

aggregation_mode/Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

aggregation_mode/gateway/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ pub struct Config {
88
pub db_connection_url: String,
99
pub network: String,
1010
pub max_daily_proofs_per_user: i64,
11-
pub metrics_port: u16,
11+
pub gateway_metrics_port: u16,
1212
}
1313

1414
impl Config {

aggregation_mode/gateway/src/http.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,12 @@ impl GatewayServer {
4242
pub fn new(db: Db, config: Config) -> Self {
4343
let network = Network::from_str(&config.network).expect("A valid network in config file");
4444

45-
tracing::info!("Starting metrics server on port {}", config.metrics_port);
46-
let metrics =
47-
GatewayMetrics::start(config.metrics_port).expect("Failed to start metrics server");
45+
tracing::info!(
46+
"Starting metrics server on port {}",
47+
config.gateway_metrics_port
48+
);
49+
let metrics = GatewayMetrics::start(config.gateway_metrics_port)
50+
.expect("Failed to start metrics server");
4851

4952
Self {
5053
db,

aggregation_mode/payments_poller/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ serde = { workspace = true }
88
serde_json = { workspace = true }
99
serde_yaml = { workspace = true }
1010
aligned-sdk = { workspace = true }
11+
prometheus = { workspace = true }
12+
anyhow = { workspace = true }
13+
warp = { workspace = true }
1114
tracing = { version = "0.1", features = ["log"] }
1215
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
1316
actix-web = "4"

aggregation_mode/payments_poller/src/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ pub struct Config {
88
pub eth_rpc_url: String,
99
pub payment_service_address: String,
1010
pub last_block_fetched_filepath: String,
11+
pub poller_metrics_port: u16,
1112
}
1213

1314
#[derive(Debug, Deserialize, Serialize)]
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod config;
22
pub mod db;
3+
pub mod metrics;
34
pub mod payments;
45
pub mod types;
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use prometheus::{self, opts, register_gauge};
2+
use warp::{reject::Rejection, reply::Reply, Filter};
3+
4+
#[derive(Clone, Debug)]
5+
pub struct PaymentsPollerMetrics {
6+
pub last_processed_block: prometheus::Gauge,
7+
}
8+
9+
impl PaymentsPollerMetrics {
10+
pub fn start(metrics_port: u16) -> anyhow::Result<Self> {
11+
let registry = prometheus::Registry::new();
12+
13+
let last_processed_block = register_gauge!(opts!(
14+
"last_processed_block",
15+
"Last processed block by poller"
16+
))?;
17+
18+
registry.register(Box::new(last_processed_block.clone()))?;
19+
20+
let metrics_route = warp::path!("metrics")
21+
.and(warp::any().map(move || registry.clone()))
22+
.and_then(PaymentsPollerMetrics::metrics_handler);
23+
24+
tokio::task::spawn(async move {
25+
warp::serve(metrics_route)
26+
.run(([0, 0, 0, 0], metrics_port))
27+
.await;
28+
});
29+
30+
Ok(Self {
31+
last_processed_block,
32+
})
33+
}
34+
35+
pub async fn metrics_handler(registry: prometheus::Registry) -> Result<impl Reply, Rejection> {
36+
use prometheus::Encoder;
37+
let encoder = prometheus::TextEncoder::new();
38+
39+
let mut buffer = Vec::new();
40+
if let Err(e) = encoder.encode(&registry.gather(), &mut buffer) {
41+
eprintln!("could not encode prometheus metrics: {}", e);
42+
};
43+
let res = String::from_utf8(buffer.clone())
44+
.inspect_err(|e| eprintln!("prometheus metrics could not be parsed correctly: {e}"))
45+
.unwrap_or_default();
46+
buffer.clear();
47+
48+
Ok(res)
49+
}
50+
51+
pub fn register_last_processed_block(&self, value: u64) {
52+
self.last_processed_block.set(value as f64);
53+
}
54+
}

aggregation_mode/payments_poller/src/payments.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::str::FromStr;
33
use crate::{
44
config::Config,
55
db::Db,
6+
metrics::PaymentsPollerMetrics,
67
types::{AggregationModePaymentService, AggregationModePaymentServiceContract, RpcProvider},
78
};
89
use alloy::{
@@ -21,6 +22,7 @@ pub struct PaymentsPoller {
2122
proof_aggregation_service: AggregationModePaymentServiceContract,
2223
rpc_provider: RpcProvider,
2324
config: Config,
25+
metrics: PaymentsPollerMetrics,
2426
}
2527

2628
impl PaymentsPoller {
@@ -38,11 +40,19 @@ impl PaymentsPoller {
3840
.get_last_block_fetched()
3941
.map_err(|err| PaymentsPollerError::ReadLastBlockError(err.to_string()));
4042

43+
tracing::info!(
44+
"Starting metrics server on port {}",
45+
config.poller_metrics_port
46+
);
47+
let metrics = PaymentsPollerMetrics::start(config.poller_metrics_port)
48+
.expect("Failed to start metrics server");
49+
4150
Ok(Self {
4251
db,
4352
proof_aggregation_service,
4453
rpc_provider,
4554
config,
55+
metrics,
4656
})
4757
}
4858

@@ -121,6 +131,8 @@ impl PaymentsPoller {
121131
continue;
122132
};
123133

134+
self.metrics.register_last_processed_block(current_block);
135+
124136
tokio::time::sleep(std::time::Duration::from_secs(
125137
seconds_to_wait_between_polls,
126138
))

config-files/config-agg-mode-gateway-ethereum-package.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ db_connection_url: "postgres://postgres:postgres@localhost:5435/"
33
eth_rpc_url: "http://localhost:8545"
44
payment_service_address: "0x922D6956C99E12DFeB3224DEA977D0939758A1Fe"
55
network: "devnet"
6-
max_daily_proofs_per_user: 32
6+
max_daily_proofs_per_user: 100
77
last_block_fetched_filepath: "config-files/proof-aggregator.last_block_fetched.json"
88

99
# Metrics
10-
metrics_port: 9093
10+
gateway_metrics_port: 9094
11+
poller_metrics_port: 9095

config-files/config-agg-mode-gateway.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ db_connection_url: "postgres://postgres:postgres@localhost:5435/"
33
eth_rpc_url: "http://localhost:8545"
44
payment_service_address: "0x922D6956C99E12DFeB3224DEA977D0939758A1Fe"
55
network: "devnet"
6-
max_daily_proofs_per_user: 4
6+
max_daily_proofs_per_user: 100
77
last_block_fetched_filepath: "config-files/proof-aggregator.last_block_fetched.json"
88

99
# Metrics
10-
metrics_port: 9093
10+
gateway_metrics_port: 9094
11+
poller_metrics_port: 9095

0 commit comments

Comments
 (0)