Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions batcher/aligned-batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ impl Batcher {
Ok(msg) => msg,
Err(e) => {
warn!("Failed to deserialize message: {}", e);
self.metrics.user_error(&["deserialize_error", ""]);
return Ok(());
}
};
Expand All @@ -419,7 +420,7 @@ impl Batcher {
ValidityResponseMessage::InvalidChainId,
)
.await;

self.metrics.user_error(&["invalid_chain_id", ""]);
return Ok(());
}

Expand All @@ -435,7 +436,8 @@ impl Batcher {
),
)
.await;

self.metrics
.user_error(&["invalid_paument_service_address", ""]);
return Ok(());
}

Expand All @@ -447,6 +449,7 @@ impl Batcher {
ValidityResponseMessage::InvalidSignature,
)
.await;
self.metrics.user_error(&["invalid_signature", ""]);
return Ok(());
};
info!("Message signature verified");
Expand All @@ -455,6 +458,7 @@ impl Batcher {
if proof_size > self.max_proof_size {
error!("Proof size exceeds the maximum allowed size.");
send_message(ws_conn_sink.clone(), ValidityResponseMessage::ProofTooLarge).await;
self.metrics.user_error(&["proof_too_large", ""]);
return Ok(());
}

Expand All @@ -478,6 +482,10 @@ impl Batcher {
)),
)
.await;
self.metrics.user_error(&[
"disabled_verifier",
&format!("{}", verification_data.proving_system),
]);
return Ok(());
}

Expand All @@ -488,6 +496,10 @@ impl Batcher {
ValidityResponseMessage::InvalidProof(ProofInvalidReason::RejectedProof),
)
.await;
self.metrics.user_error(&[
"rejected_proof",
&format!("{}", verification_data.proving_system),
]);
return Ok(());
}
}
Expand All @@ -509,6 +521,7 @@ impl Batcher {
ValidityResponseMessage::InsufficientBalance(addr),
)
.await;
self.metrics.user_error(&["insufficient_balance", ""]);
return Ok(());
}

Expand All @@ -530,6 +543,7 @@ impl Batcher {
"Failed to get user nonce from Ethereum for address {addr:?}. Error: {e:?}"
);
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidNonce).await;
self.metrics.user_error(&["invalid_nonce", ""]);
return Ok(());
}
};
Expand All @@ -548,6 +562,7 @@ impl Batcher {
let Some(user_balance) = self.get_user_balance(&addr).await else {
error!("Could not get balance for address {addr:?}");
send_message(ws_conn_sink.clone(), ValidityResponseMessage::EthRpcError).await;
self.metrics.user_error(&["eth_rpc_error", ""]);
return Ok(());
};

Expand All @@ -560,6 +575,7 @@ impl Batcher {
error!("Failed to get user proof count: User not found in user states, but it should have been already inserted");
std::mem::drop(batch_state_lock);
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidNonce).await;
self.metrics.user_error(&["invalid_nonce", ""]);
return Ok(());
};

Expand All @@ -570,6 +586,7 @@ impl Batcher {
ValidityResponseMessage::InsufficientBalance(addr),
)
.await;
self.metrics.user_error(&["insufficient_balance", ""]);
return Ok(());
}

Expand All @@ -578,13 +595,15 @@ impl Batcher {
error!("Failed to get cached user nonce: User not found in user states, but it should have been already inserted");
std::mem::drop(batch_state_lock);
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidNonce).await;
self.metrics.user_error(&["invalid_nonce", ""]);
return Ok(());
};

if expected_nonce < msg_nonce {
std::mem::drop(batch_state_lock);
warn!("Invalid nonce for address {addr}, had nonce {expected_nonce:?} < {msg_nonce:?}");
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidNonce).await;
self.metrics.user_error(&["invalid_nonce", ""]);
return Ok(());
}

Expand All @@ -608,13 +627,15 @@ impl Batcher {
let Some(user_min_fee) = batch_state_lock.get_user_min_fee(&addr).await else {
std::mem::drop(batch_state_lock);
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidNonce).await;
self.metrics.user_error(&["invalid_nonce", ""]);
return Ok(());
};

if msg_max_fee > user_min_fee {
std::mem::drop(batch_state_lock);
warn!("Invalid max fee for address {addr}, had fee {user_min_fee:?} < {msg_max_fee:?}");
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidMaxFee).await;
self.metrics.user_error(&["invalid_max_fee", ""]);
return Ok(());
}

Expand All @@ -634,6 +655,7 @@ impl Batcher {
{
error!("Error while adding entry to batch: {e:?}");
send_message(ws_conn_sink, ValidityResponseMessage::AddToBatchError).await;
self.metrics.user_error(&["add_to_batch_error", ""]);
return Ok(());
};

Expand Down Expand Up @@ -674,6 +696,7 @@ impl Batcher {
std::mem::drop(batch_state_lock);
warn!("Invalid nonce for address {addr}. Queue entry with nonce {nonce} not found");
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidNonce).await;
self.metrics.user_error(&["invalid_nonce", ""]);
return;
};

Expand All @@ -686,7 +709,8 @@ impl Batcher {
ValidityResponseMessage::InvalidReplacementMessage,
)
.await;

self.metrics
.user_error(&["invalid_replacement_message", ""]);
return;
}

Expand Down Expand Up @@ -723,6 +747,8 @@ impl Batcher {
ValidityResponseMessage::InvalidReplacementMessage,
)
.await;
self.metrics
.user_error(&["invalid_replacement_message", ""]);
return;
}

Expand Down
16 changes: 15 additions & 1 deletion batcher/aligned-batcher/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::{thread, time::Duration};

// Prometheus
use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge};
use prometheus::{
opts, register_int_counter, register_int_counter_vec, register_int_gauge, IntCounter,
IntCounterVec, IntGauge,
};

use warp::{Filter, Rejection, Reply};

Expand All @@ -12,6 +15,7 @@ pub struct BatcherMetrics {
pub sent_batches: IntCounter,
pub reverted_batches: IntCounter,
pub canceled_batches: IntCounter,
pub user_errors: IntCounterVec,
pub batcher_started: IntCounter,
pub gas_price_used_on_latest_batch: IntGauge,
pub broken_ws_connections: IntCounter,
Expand All @@ -28,6 +32,10 @@ impl BatcherMetrics {
register_int_counter!(opts!("reverted_batches", "Reverted Batches"))?;
let canceled_batches =
register_int_counter!(opts!("canceled_batches", "Canceled Batches"))?;
let user_errors = register_int_counter_vec!(
opts!("user_errors", "User Errors"),
&["error_type", "proving_system"]
)?;
let batcher_started = register_int_counter!(opts!("batcher_started", "Batcher Started"))?;
let gas_price_used_on_latest_batch =
register_int_gauge!(opts!("gas_price_used_on_latest_batch", "Gas Price"))?;
Expand All @@ -41,6 +49,7 @@ impl BatcherMetrics {
registry.register(Box::new(sent_batches.clone()))?;
registry.register(Box::new(reverted_batches.clone()))?;
registry.register(Box::new(canceled_batches.clone()))?;
registry.register(Box::new(user_errors.clone()))?;
registry.register(Box::new(gas_price_used_on_latest_batch.clone()))?;
registry.register(Box::new(batcher_started.clone()))?;
registry.register(Box::new(broken_ws_connections.clone()))?;
Expand All @@ -61,6 +70,7 @@ impl BatcherMetrics {
sent_batches,
reverted_batches,
canceled_batches,
user_errors,
batcher_started,
gas_price_used_on_latest_batch,
broken_ws_connections,
Expand Down Expand Up @@ -89,4 +99,8 @@ impl BatcherMetrics {
thread::sleep(Duration::from_secs(2));
self.batcher_started.inc();
}

pub fn user_error(&self, label_values: &[&str]) {
self.user_errors.with_label_values(label_values).inc();
}
}
137 changes: 135 additions & 2 deletions grafana/provisioning/dashboards/aligned/aggregator_batcher.json
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@
},
"gridPos": {
"h": 8,
"w": 12,
"w": 10,
"x": 0,
"y": 25
},
Expand Down Expand Up @@ -982,6 +982,139 @@
"title": "Broken websocket connections",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "prometheus"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 11,
"x": 10,
"y": 25
},
"id": 24,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "right",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "prometheus"
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "user_errors",
"fullMetaSearch": false,
"includeNullMetadata": true,
"instant": false,
"legendFormat": "{{error_type}}",
"range": true,
"refId": "A",
"useBackend": false
}
],
"title": "User Error Count",
"transformations": [
{
"id": "calculateField",
"options": {
"alias": "proof_rejected",
"mode": "reduceRow",
"reduce": {
"include": [
"rejected_proof"
],
"reducer": "sum"
}
}
},
{
"id": "organize",
"options": {
"excludeByName": {
"rejected_proof": true
},
"indexByName": {},
"renameByName": {}
}
},
{
"id": "calculateField",
"options": {
"alias": "total",
"mode": "reduceRow",
"reduce": {
"reducer": "sum"
}
}
}
],
"type": "timeseries"
},
{
"collapsed": true,
"gridPos": {
Expand Down Expand Up @@ -1513,6 +1646,6 @@
"timezone": "browser",
"title": "Aggregator Data",
"uid": "aggregator",
"version": 2,
"version": 3,
"weekStart": ""
}