Skip to content

Commit 5b6aca1

Browse files
feat: add latency metrics to batcher (#1578)
Co-authored-by: Marcos Nicolau <[email protected]>
1 parent a4e8e31 commit 5b6aca1

File tree

3 files changed

+298
-13
lines changed

3 files changed

+298
-13
lines changed

batcher/aligned-batcher/src/lib.rs

Lines changed: 23 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,7 @@ use retry::batcher_retryables::{
1212
user_balance_is_unlocked_retryable,
1313
};
1414
use retry::{retry_function, RetryError};
15-
use tokio::time::timeout;
15+
use tokio::time::{timeout, Instant};
1616
use types::batch_state::BatchState;
1717
use types::user_state::UserState;
1818

@@ -1524,6 +1524,7 @@ impl Batcher {
15241524
proof_submitters: Vec<Address>,
15251525
fee_params: CreateNewTaskFeeParams,
15261526
) -> Result<TransactionReceipt, BatcherError> {
1527+
let start = Instant::now();
15271528
let result = retry_function(
15281529
|| {
15291530
create_new_task_retryable(
@@ -1542,6 +1543,11 @@ impl Batcher {
15421543
ETHEREUM_CALL_MAX_RETRY_DELAY,
15431544
)
15441545
.await;
1546+
self.metrics
1547+
.create_new_task_duration
1548+
.set(start.elapsed().as_millis() as i64);
1549+
// Reset the cancel duration gauge to zero, since the cancel path is not always executed
1550+
self.metrics.cancel_create_new_task_duration.set(0);
15451551
match result {
15461552
Ok(receipt) => {
15471553
if let Err(e) = self
@@ -1600,10 +1606,11 @@ impl Batcher {
16001606
/// After 2 hours (attempt 13), retries occur hourly for 1 day (33 retries).
16011607
pub async fn cancel_create_new_task_tx(&self, old_tx_gas_price: U256) {
16021608
info!("Cancelling createNewTask transaction...");
1609+
let start = Instant::now();
16031610
let iteration = Arc::new(Mutex::new(0));
16041611
let previous_gas_price = Arc::new(Mutex::new(old_tx_gas_price));
16051612

1606-
if let Err(e) = retry_function(
1613+
match retry_function(
16071614
|| async {
16081615
let mut iteration = iteration.lock().await;
16091616
let mut previous_gas_price = previous_gas_price.lock().await;
@@ -1639,11 +1646,12 @@ impl Batcher {
16391646
)
16401647
.await
16411648
{
1642-
error!("Could not cancel createNewTask transaction: {e}");
1643-
return;
1649+
Ok(_) => info!("createNewTask transaction successfully canceled"),
1650+
Err(e) => error!("Could not cancel createNewTask transaction: {e}"),
16441651
};
1645-
1646-
info!("createNewTask transaction successfully canceled");
1652+
self.metrics
1653+
.cancel_create_new_task_duration
1654+
.set(start.elapsed().as_millis() as i64);
16471655
}
16481656

16491657
/// Only relevant for testing and for users to easily use Aligned
@@ -1785,7 +1793,8 @@ impl Batcher {
17851793
batch_bytes: &[u8],
17861794
file_name: &str,
17871795
) -> Result<(), BatcherError> {
1788-
retry_function(
1796+
let start = Instant::now();
1797+
let result = retry_function(
17891798
|| {
17901799
Self::upload_batch_to_s3_retryable(
17911800
batch_bytes,
@@ -1800,7 +1809,13 @@ impl Batcher {
18001809
ETHEREUM_CALL_MAX_RETRY_DELAY,
18011810
)
18021811
.await
1803-
.map_err(|e| BatcherError::BatchUploadError(e.to_string()))
1812+
.map_err(|e| BatcherError::BatchUploadError(e.to_string()));
1813+
1814+
self.metrics
1815+
.s3_duration
1816+
.set(start.elapsed().as_micros() as i64);
1817+
1818+
result
18041819
}
18051820

18061821
async fn upload_batch_to_s3_retryable(

batcher/aligned-batcher/src/metrics.rs

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,9 @@ pub struct BatcherMetrics {
1919
pub batcher_started: IntCounter,
2020
pub gas_price_used_on_latest_batch: IntGauge,
2121
pub broken_ws_connections: IntCounter,
22+
pub s3_duration: IntGauge,
23+
pub create_new_task_duration: IntGauge,
24+
pub cancel_create_new_task_duration: IntGauge,
2225
}
2326

2427
impl BatcherMetrics {
@@ -46,6 +49,15 @@ impl BatcherMetrics {
4649
"broken_ws_connections_count",
4750
"Broken websocket connections"
4851
))?;
52+
let s3_duration = register_int_gauge!(opts!("s3_duration", "S3 Duration"))?;
53+
let create_new_task_duration = register_int_gauge!(opts!(
54+
"create_new_task_duration",
55+
"Create New Task Duration"
56+
))?;
57+
let cancel_create_new_task_duration = register_int_gauge!(opts!(
58+
"cancel_create_new_task_duration",
59+
"Cancel create New Task Duration"
60+
))?;
4961

5062
registry.register(Box::new(open_connections.clone()))?;
5163
registry.register(Box::new(received_proofs.clone()))?;
@@ -56,6 +68,9 @@ impl BatcherMetrics {
5668
registry.register(Box::new(gas_price_used_on_latest_batch.clone()))?;
5769
registry.register(Box::new(batcher_started.clone()))?;
5870
registry.register(Box::new(broken_ws_connections.clone()))?;
71+
registry.register(Box::new(s3_duration.clone()))?;
72+
registry.register(Box::new(create_new_task_duration.clone()))?;
73+
registry.register(Box::new(cancel_create_new_task_duration.clone()))?;
5974

6075
let metrics_route = warp::path!("metrics")
6176
.and(warp::any().map(move || registry.clone()))
@@ -77,6 +92,9 @@ impl BatcherMetrics {
7792
batcher_started,
7893
gas_price_used_on_latest_batch,
7994
broken_ws_connections,
95+
s3_duration,
96+
create_new_task_duration,
97+
cancel_create_new_task_duration,
8098
})
8199
}
82100

0 commit comments

Comments
 (0)