Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions batcher/aligned-batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use retry::batcher_retryables::{
get_user_nonce_from_ethereum_retryable, user_balance_is_unlocked_retryable,
};
use retry::{retry_function, RetryError};
use tokio::time::timeout;
use tokio::time::{timeout, Instant};
use types::batch_state::BatchState;
use types::user_state::UserState;

Expand Down Expand Up @@ -1479,6 +1479,7 @@ impl Batcher {
proof_submitters: Vec<Address>,
fee_params: CreateNewTaskFeeParams,
) -> Result<TransactionReceipt, BatcherError> {
let start = Instant::now();
let result = retry_function(
|| {
create_new_task_retryable(
Expand All @@ -1497,6 +1498,9 @@ impl Batcher {
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await;
self.metrics
.create_new_task_latency
.set(start.elapsed().as_millis() as i64);
match result {
Ok(receipt) => {
if let Err(e) = self
Expand Down Expand Up @@ -1524,10 +1528,11 @@ impl Batcher {
/// After 2 hours (attempt 13), retries occur hourly for 1 day (33 retries).
pub async fn cancel_create_new_task_tx(&self, old_tx_gas_price: U256) {
info!("Cancelling createNewTask transaction...");
let start = Instant::now();
let iteration = Arc::new(Mutex::new(0));
let previous_gas_price = Arc::new(Mutex::new(old_tx_gas_price));

if let Err(e) = retry_function(
match retry_function(
|| async {
let mut iteration = iteration.lock().await;
let mut previous_gas_price = previous_gas_price.lock().await;
Expand Down Expand Up @@ -1563,11 +1568,12 @@ impl Batcher {
)
.await
{
error!("Could not cancel createNewTask transaction: {e}");
return;
Ok(_) => info!("createNewTask transaction successfully canceled"),
Err(e) => error!("Could not cancel createNewTask transaction: {e}"),
};

info!("createNewTask transaction successfully canceled");
self.metrics
.cancel_create_new_task_latency
.set(start.elapsed().as_millis() as i64);
}

/// Only relevant for testing and for users to easily use Aligned
Expand Down Expand Up @@ -1709,6 +1715,8 @@ impl Batcher {
batch_bytes: &[u8],
file_name: &str,
) -> Result<(), BatcherError> {
let start = Instant::now();

retry_function(
|| {
Self::upload_batch_to_s3_retryable(
Expand All @@ -1724,7 +1732,12 @@ impl Batcher {
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await
.map_err(|e| BatcherError::BatchUploadError(e.to_string()))
.map_err(|e| BatcherError::BatchUploadError(e.to_string()))?;

self.metrics
.s3_latency
.set(start.elapsed().as_micros() as i64);
Ok(())
}

async fn upload_batch_to_s3_retryable(
Expand Down
27 changes: 27 additions & 0 deletions batcher/aligned-batcher/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ pub struct BatcherMetrics {
pub batcher_started: IntCounter,
pub gas_price_used_on_latest_batch: IntGauge,
pub broken_ws_connections: IntCounter,
pub s3_latency: IntGauge,
pub create_new_task_latency: IntGauge,
pub cancel_create_new_task_latency: IntGauge,
}

impl BatcherMetrics {
Expand All @@ -44,6 +47,24 @@ impl BatcherMetrics {
"Broken websocket connections"
))?;

let s3_latency = register_int_gauge!(opts!("s3_latency", "S3 Latency"))?;
let create_new_task_latency =
register_int_gauge!(opts!("create_new_task_latency", "Create New Task Latency"))?;
let cancel_create_new_task_latency = register_int_gauge!(opts!(
"cancel_create_new_task_latency",
"Cancel create New Task Latency"
))?;
// let s3_latency = register_histogram!(histogram_opts!("s3_latency","S3 Latency",linear_buckets(1.0, 0.5, 4).unwrap()))?;

// let histogram_opts = HistogramOpts::new("function_latency", "Latency of the function")
// .namespace("your_namespace") // Optional: Add namespace
// .subsystem("your_subsystem") // Optional: Add subsystem
// .buckets(vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5]); // Custom buckets

// let histogram = Histogram::with_opts(histogram_opts).unwrap();
// registry.register(Box::new(histogram.clone())).unwrap();
// (registry, histogram)

registry.register(Box::new(open_connections.clone()))?;
registry.register(Box::new(received_proofs.clone()))?;
registry.register(Box::new(sent_batches.clone()))?;
Expand All @@ -53,6 +74,9 @@ impl BatcherMetrics {
registry.register(Box::new(gas_price_used_on_latest_batch.clone()))?;
registry.register(Box::new(batcher_started.clone()))?;
registry.register(Box::new(broken_ws_connections.clone()))?;
registry.register(Box::new(s3_latency.clone()))?;
registry.register(Box::new(create_new_task_latency.clone()))?;
registry.register(Box::new(cancel_create_new_task_latency.clone()))?;

let metrics_route = warp::path!("metrics")
.and(warp::any().map(move || registry.clone()))
Expand All @@ -74,6 +98,9 @@ impl BatcherMetrics {
batcher_started,
gas_price_used_on_latest_batch,
broken_ws_connections,
s3_latency,
create_new_task_latency,
cancel_create_new_task_latency,
})
}

Expand Down
Loading
Loading