Skip to content

Commit ecf04b8

Browse files
authored
Merge pull request #152 from semiotic-ai/aasseman/issue146
feat(aggregator): add prometheus metrics
2 parents a3d1e38 + b021cf5 commit ecf04b8

File tree

6 files changed

+176
-22
lines changed

6 files changed

+176
-22
lines changed

tap_aggregator/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ serde_json = { version = "1.0.96", features = ["raw_value"] }
2323
strum = { version = "0.24.1", features = ["strum_macros", "derive"] }
2424
tracing-subscriber = { version = "0.3.17" }
2525
log = "0.4.19"
26+
prometheus = "0.13.3"
27+
axum = "0.6.18"
28+
futures-util = "0.3.28"
29+
lazy_static = "1.4.0"
2630

2731
[dev-dependencies]
2832
jsonrpsee = { version = "0.18.0", features = ["http-client", "jsonrpsee-core"] }

tap_aggregator/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ pub mod aggregator;
55
pub mod api_versioning;
66
pub mod error_codes;
77
pub mod jsonrpsee_helpers;
8+
pub mod metrics;
89
pub mod server;

tap_aggregator/src/main.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use ethers_signers::{coins_bip39::English, MnemonicBuilder};
99
use tokio::signal::unix::{signal, SignalKind};
1010

1111
use log::{debug, info, warn};
12+
use tap_aggregator::metrics;
1213
use tap_aggregator::server;
1314

1415
#[derive(Parser, Debug)]
@@ -36,6 +37,11 @@ struct Args {
3637
/// Defaults to 32.
3738
#[arg(long, default_value_t = 32, env = "TAP_MAX_CONNECTIONS")]
3839
max_connections: u32,
40+
41+
/// Metrics server port.
42+
/// Defaults to 5000.
43+
#[arg(long, default_value_t = 5000, env = "TAP_METRICS_PORT")]
44+
metrics_port: u16,
3945
}
4046

4147
#[tokio::main]
@@ -50,12 +56,17 @@ async fn main() -> Result<()> {
5056
let args = Args::parse();
5157
debug!("Settings: {:?}", args);
5258

59+
// Start the metrics server.
60+
// It is not shut down gracefully; the task is simply killed when main() returns.
61+
tokio::spawn(metrics::run_server(args.metrics_port));
62+
5363
// Create a wallet from the mnemonic.
5464
let wallet = MnemonicBuilder::<English>::default()
5565
.phrase(args.mnemonic.as_str())
5666
.build()?;
5767

5868
// Start the JSON-RPC server.
69+
// Awaiting this call does not block until the server stops: it returns a handle
// as soon as the server has been started.
5970
let (handle, _) = server::run_server(
6071
args.port,
6172
wallet,

tap_aggregator/src/metrics.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright 2023-, Semiotic AI, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use std::{net::SocketAddr, panic};
5+
6+
use axum::{http::StatusCode, response::IntoResponse, routing::get, Router, Server};
7+
use futures_util::FutureExt;
8+
use jsonrpsee::tracing::error;
9+
use log::{debug, info};
10+
use prometheus::TextEncoder;
11+
12+
async fn handler_metrics() -> (StatusCode, String) {
13+
let metric_families = prometheus::gather();
14+
let encoder = TextEncoder::new();
15+
16+
match encoder.encode_to_string(&metric_families) {
17+
Ok(s) => (StatusCode::OK, s),
18+
Err(e) => {
19+
error!("Error encoding metrics: {}", e);
20+
(
21+
StatusCode::INTERNAL_SERVER_ERROR,
22+
format!("Error encoding metrics: {}", e),
23+
)
24+
}
25+
}
26+
}
27+
28+
async fn handler_404() -> impl IntoResponse {
29+
(StatusCode::NOT_FOUND, "404 Not Found")
30+
}
31+
32+
async fn _run_server(port: u16) {
33+
let app = Router::new()
34+
.route("/metrics", get(handler_metrics))
35+
.fallback(handler_404);
36+
let addr = SocketAddr::from(([0, 0, 0, 0], port));
37+
let server = Server::bind(&addr).serve(app.into_make_service());
38+
39+
info!("Metrics server listening on {}", addr);
40+
41+
let res = server.await;
42+
43+
debug!("Metrics server stopped");
44+
45+
if let Err(err) = res {
46+
panic!("Metrics server error: {:#?}", err);
47+
};
48+
}
49+
50+
pub async fn run_server(port: u16) {
51+
// Abort the whole process if `_run_server` panics.
52+
// Otherwise, because this future runs in a spawned task, the panic would only end that task and go unnoticed by the rest of the program.
53+
let res = panic::AssertUnwindSafe(_run_server(port))
54+
.catch_unwind()
55+
.await;
56+
if res.is_err() {
57+
std::process::abort();
58+
}
59+
}

tap_aggregator/src/server.rs

Lines changed: 100 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ use jsonrpsee::{
1010
server::ServerBuilder,
1111
{core::async_trait, server::ServerHandle},
1212
};
13+
use lazy_static::lazy_static;
14+
use prometheus::{register_counter, register_int_counter, Counter, IntCounter};
1315

1416
use crate::aggregator::check_and_aggregate_receipts;
1517
use crate::api_versioning::{
@@ -23,6 +25,51 @@ use tap_core::{
2325
receipt_aggregate_voucher::ReceiptAggregateVoucher, tap_receipt::Receipt,
2426
};
2527

28+
// Register the metrics into the global metrics registry.
29+
lazy_static! {
30+
static ref AGGREGATION_SUCCESS_COUNTER: IntCounter = register_int_counter!(
31+
"aggregation_success_count",
32+
"Number of successful receipt aggregation requests."
33+
)
34+
.unwrap();
35+
}
36+
lazy_static! {
37+
static ref AGGREGATION_FAILURE_COUNTER: IntCounter = register_int_counter!(
38+
"aggregation_failure_count",
39+
"Number of failed receipt aggregation requests (for any reason)."
40+
)
41+
.unwrap();
42+
}
43+
lazy_static! {
44+
static ref DEPRECATION_WARNING_COUNT: IntCounter = register_int_counter!(
45+
"deprecation_warning_count",
46+
"Number of deprecation warnings sent to clients."
47+
)
48+
.unwrap();
49+
}
50+
lazy_static! {
51+
static ref VERSION_ERROR_COUNT: IntCounter = register_int_counter!(
52+
"version_error_count",
53+
"Number of API version errors sent to clients."
54+
)
55+
.unwrap();
56+
}
57+
lazy_static! {
58+
static ref TOTAL_AGGREGATED_RECEIPTS: IntCounter = register_int_counter!(
59+
"total_aggregated_receipts",
60+
"Total number of receipts successfully aggregated."
61+
)
62+
.unwrap();
63+
}
64+
// A float counter is used for the GRT value because the running total (in wei) can easily exceed the maximum value of an int64.
65+
lazy_static! {
66+
static ref TOTAL_GRT_AGGREGATED: Counter = register_counter!(
67+
"total_aggregated_grt",
68+
"Total successfully aggregated GRT value (wei)."
69+
)
70+
.unwrap();
71+
}
72+
2673
/// Generates the `RpcServer` trait that is used to define the JSON-RPC API.
2774
///
2875
/// Note that because of the way the `rpc` macro works, we cannot document the RpcServer trait here.
@@ -80,6 +127,45 @@ fn check_api_version_deprecation(api_version: &TapRpcApiVersion) -> Option<JsonR
80127
}
81128
}
82129

130+
async fn aggregate_receipts_(
131+
api_version: String,
132+
wallet: &LocalWallet,
133+
receipts: Vec<EIP712SignedMessage<Receipt>>,
134+
previous_rav: Option<EIP712SignedMessage<ReceiptAggregateVoucher>>,
135+
) -> JsonRpcResult<EIP712SignedMessage<ReceiptAggregateVoucher>> {
136+
// Return an error if the API version is not supported.
137+
let api_version = match parse_api_version(api_version.as_str()) {
138+
Ok(v) => v,
139+
Err(e) => {
140+
VERSION_ERROR_COUNT.inc();
141+
return Err(e);
142+
}
143+
};
144+
145+
// Add a warning if the API version is to be deprecated.
146+
let mut warnings: Vec<JsonRpcWarning> = Vec::new();
147+
if let Some(w) = check_api_version_deprecation(&api_version) {
148+
warnings.push(w);
149+
DEPRECATION_WARNING_COUNT.inc();
150+
}
151+
152+
let res = match api_version {
153+
TapRpcApiVersion::V0_0 => {
154+
check_and_aggregate_receipts(&receipts, previous_rav, wallet).await
155+
}
156+
};
157+
158+
// On success, attach the collected warnings to the response; on failure, convert the error into a JSON-RPC aggregation error.
159+
match res {
160+
Ok(res) => Ok(JsonRpcResponse::warn(res, warnings)),
161+
Err(e) => Err(jsonrpsee::types::ErrorObject::owned(
162+
JsonRpcErrorCode::Aggregation as i32,
163+
e.to_string(),
164+
None::<()>,
165+
)),
166+
}
167+
}
168+
83169
#[async_trait]
84170
impl RpcServer for RpcImpl {
85171
async fn api_versions(&self) -> JsonRpcResult<TapRpcApiVersionsInfo> {
@@ -92,29 +178,21 @@ impl RpcServer for RpcImpl {
92178
receipts: Vec<EIP712SignedMessage<Receipt>>,
93179
previous_rav: Option<EIP712SignedMessage<ReceiptAggregateVoucher>>,
94180
) -> JsonRpcResult<EIP712SignedMessage<ReceiptAggregateVoucher>> {
95-
// Return an error if the API version is not supported.
96-
let api_version = parse_api_version(api_version.as_str())?;
97-
98-
// Add a warning if the API version is to be deprecated.
99-
let mut warnings: Vec<JsonRpcWarning> = Vec::new();
100-
if let Some(w) = check_api_version_deprecation(&api_version) {
101-
warnings.push(w);
102-
}
103-
104-
let res = match api_version {
105-
TapRpcApiVersion::V0_0 => {
106-
check_and_aggregate_receipts(&receipts, previous_rav, &self.wallet).await
181+
// Compute the values for the Prometheus metrics up front, because `receipts` is moved into `aggregate_receipts_` below.
182+
let receipts_grt: u128 = receipts.iter().map(|r| r.message.value).sum();
183+
let receipts_count: u64 = receipts.len() as u64;
184+
185+
match aggregate_receipts_(api_version, &self.wallet, receipts, previous_rav).await {
186+
Ok(res) => {
187+
TOTAL_GRT_AGGREGATED.inc_by(receipts_grt as f64);
188+
TOTAL_AGGREGATED_RECEIPTS.inc_by(receipts_count);
189+
AGGREGATION_SUCCESS_COUNTER.inc();
190+
Ok(res)
191+
}
192+
Err(e) => {
193+
AGGREGATION_FAILURE_COUNTER.inc();
194+
Err(e)
107195
}
108-
};
109-
110-
// Handle aggregation error
111-
match res {
112-
Ok(res) => Ok(JsonRpcResponse::warn(res, warnings)),
113-
Err(e) => Err(jsonrpsee::types::ErrorObject::owned(
114-
JsonRpcErrorCode::Aggregation as i32,
115-
e.to_string(),
116-
None::<()>,
117-
)),
118196
}
119197
}
120198
}

tap_integration_tests/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ rand = "0.8.5"
1818
futures = "0.3.28"
1919
anyhow = "1.0.71"
2020
tokio = "1.28.2"
21+
prometheus = "0.13.3"
2122

2223
[[test]]
2324
name = "integration_tests"

0 commit comments

Comments
 (0)