Skip to content

Commit 2ebf0a0

Browse files
IAvecillaJuArceMauroToscano
authored
feat: Telemetry for the batcher and improve current tracing (#1335)
Co-authored-by: JuArce <[email protected]> Co-authored-by: Mauro Toscano <[email protected]>
1 parent c980252 commit 2ebf0a0

File tree

15 files changed

+504
-39
lines changed

15 files changed

+504
-39
lines changed

aggregator/internal/pkg/aggregator.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,8 +272,9 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
272272
agg.logger.Info("Sending aggregated response onchain", "taskIndex", blsAggServiceResp.TaskIndex,
273273
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
274274
for i := 0; i < MaxSentTxRetries; i++ {
275-
_, err = agg.sendAggregatedResponse(batchIdentifierHash, batchData.BatchMerkleRoot, batchData.SenderAddress, nonSignerStakesAndSignature)
275+
receipt, err := agg.sendAggregatedResponse(batchIdentifierHash, batchData.BatchMerkleRoot, batchData.SenderAddress, nonSignerStakesAndSignature)
276276
if err == nil {
277+
agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, receipt.TxHash.String())
277278
agg.logger.Info("Aggregator successfully responded to task",
278279
"taskIndex", blsAggServiceResp.TaskIndex,
279280
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

aggregator/internal/pkg/telemetry.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ type TaskErrorMessage struct {
3030
TaskError string `json:"error"`
3131
}
3232

33+
type TaskSentToEthereumMessage struct {
34+
MerkleRoot string `json:"merkle_root"`
35+
TxHash string `json:"tx_hash"`
36+
}
37+
3338
type Telemetry struct {
3439
client http.Client
3540
baseURL url.URL
@@ -91,6 +96,16 @@ func (t *Telemetry) LogTaskError(batchMerkleRoot [32]byte, taskError error) {
9196
}
9297
}
9398

99+
func (t *Telemetry) TaskSentToEthereum(batchMerkleRoot [32]byte, txHash string) {
100+
body := TaskSentToEthereumMessage{
101+
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
102+
TxHash: txHash,
103+
}
104+
if err := t.sendTelemetryMessage("/api/aggregatorTaskSent", body); err != nil {
105+
t.logger.Error("[Telemetry] Error in TaskSentToEthereum", "error", err)
106+
}
107+
}
108+
94109
func (t *Telemetry) FinishTrace(batchMerkleRoot [32]byte) {
95110
// In order to wait for all operator responses, even if the quorum is reached, this function has a delayed execution
96111
go func() {

batcher/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

batcher/aligned-batcher/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ bincode = "1.3.3"
2828
aligned-sdk = { path = "../aligned-sdk" }
2929
ciborium = "=0.2.2"
3030
priority-queue = "2.1.0"
31+
reqwest = { version = "0.12", features = ["json"] }
3132

3233
once_cell = "1.20.2"
3334
warp = "0.3.7"

batcher/aligned-batcher/src/config/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub struct BatcherConfigFromYaml {
4242
pub max_batch_size: usize,
4343
pub pre_verification_is_enabled: bool,
4444
pub metrics_port: u16,
45+
pub telemetry_ip_port_address: String,
4546
pub non_paying: Option<NonPayingConfigFromYaml>,
4647
}
4748

batcher/aligned-batcher/src/lib.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use types::batch_queue::{self, BatchQueueEntry, BatchQueueEntryPriority};
4848
use types::errors::{BatcherError, BatcherSendError};
4949

5050
use crate::config::{ConfigFromYaml, ContractDeploymentOutput};
51+
use crate::telemetry::sender::TelemetrySender;
5152

5253
mod config;
5354
mod connection;
@@ -58,6 +59,7 @@ pub mod retry;
5859
pub mod risc_zero;
5960
pub mod s3;
6061
pub mod sp1;
62+
pub mod telemetry;
6163
pub mod types;
6264
mod zk_utils;
6365

@@ -86,6 +88,7 @@ pub struct Batcher {
8688
posting_batch: Mutex<bool>,
8789
disabled_verifiers: Mutex<U256>,
8890
pub metrics: metrics::BatcherMetrics,
91+
pub telemetry: TelemetrySender,
8992
}
9093

9194
impl Batcher {
@@ -212,6 +215,11 @@ impl Batcher {
212215
}
213216
.expect("Failed to get disabled verifiers");
214217

218+
let telemetry = TelemetrySender::new(format!(
219+
"http://{}",
220+
config.batcher.telemetry_ip_port_address
221+
));
222+
215223
Self {
216224
s3_client,
217225
s3_bucket_name,
@@ -235,6 +243,7 @@ impl Batcher {
235243
batch_state: Mutex::new(batch_state),
236244
disabled_verifiers: Mutex::new(disabled_verifiers),
237245
metrics,
246+
telemetry,
238247
}
239248
}
240249

@@ -981,6 +990,14 @@ impl Batcher {
981990
.map(VerificationCommitmentBatch::hash_data)
982991
.collect();
983992

993+
if let Err(e) = self
994+
.telemetry
995+
.init_task_trace(&hex::encode(batch_merkle_tree.root))
996+
.await
997+
{
998+
error!("Failed to initialize task trace on telemetry: {:?}", e);
999+
}
1000+
9841001
if let Err(e) = self
9851002
.submit_batch(
9861003
&batch_bytes,
@@ -991,6 +1008,14 @@ impl Batcher {
9911008
)
9921009
.await
9931010
{
1011+
let reason = format!("{:?}", e);
1012+
if let Err(e) = self
1013+
.telemetry
1014+
.task_creation_failed(&hex::encode(batch_merkle_tree.root), &reason)
1015+
.await
1016+
{
1017+
error!("Failed to send task status to telemetry: {:?}", e);
1018+
}
9941019
for entry in finalized_batch.into_iter() {
9951020
if let Some(ws_sink) = entry.messaging_sink {
9961021
let merkle_root = hex::encode(batch_merkle_tree.root);
@@ -1102,6 +1127,13 @@ impl Batcher {
11021127
info!("Uploading batch to S3...");
11031128
self.upload_batch_to_s3(batch_bytes, &file_name).await?;
11041129

1130+
if let Err(e) = self
1131+
.telemetry
1132+
.task_uploaded_to_s3(&batch_merkle_root_hex)
1133+
.await
1134+
{
1135+
error!("Failed to send task status to telemetry: {:?}", e);
1136+
};
11051137
info!("Batch sent to S3 with name: {}", file_name);
11061138

11071139
info!("Uploading batch to contract");
@@ -1134,6 +1166,18 @@ impl Batcher {
11341166
.gas_price_used_on_latest_batch
11351167
.set(gas_price.as_u64() as i64);
11361168

1169+
if let Err(e) = self
1170+
.telemetry
1171+
.task_created(
1172+
&hex::encode(batch_merkle_root),
1173+
ethers::utils::format_ether(fee_per_proof),
1174+
num_proofs_in_batch,
1175+
)
1176+
.await
1177+
{
1178+
error!("Failed to send task status to telemetry: {:?}", e);
1179+
};
1180+
11371181
match self
11381182
.create_new_task(
11391183
*batch_merkle_root,
@@ -1178,7 +1222,16 @@ impl Batcher {
11781222
)
11791223
.await
11801224
{
1181-
Ok(receipt) => Ok(receipt),
1225+
Ok(receipt) => {
1226+
if let Err(e) = self
1227+
.telemetry
1228+
.task_sent(&hex::encode(batch_merkle_root), receipt.transaction_hash)
1229+
.await
1230+
{
1231+
error!("Failed to send task status to telemetry: {:?}", e);
1232+
}
1233+
Ok(receipt)
1234+
}
11821235
Err(BatcherSendError::TransactionReverted(err)) => {
11831236
// Since transaction was reverted, we don't want to retry with fallback.
11841237
warn!("Transaction reverted {:?}", err);
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod sender;
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
use ethers::types::H256;
2+
3+
#[derive(Debug, serde::Serialize)]
4+
pub enum TraceMessage {
5+
CreatingTask([u8; 32]),
6+
TaskCreated(H256),
7+
TaskCreationFailed(H256),
8+
}
9+
10+
#[derive(Debug, serde::Serialize)]
11+
pub struct TraceMessageTask {
12+
merkle_root: String,
13+
}
14+
15+
#[derive(Debug, serde::Serialize)]
16+
pub struct TraceMessageTaskStarted {
17+
merkle_root: String,
18+
fee_per_proof: String,
19+
num_proofs_in_batch: usize,
20+
}
21+
22+
#[derive(Debug, serde::Serialize)]
23+
pub struct TraceMessageTaskSentToEthereum {
24+
merkle_root: String,
25+
tx_hash: H256,
26+
}
27+
28+
#[derive(Debug, serde::Serialize)]
29+
pub struct TraceMessageNewBatch {
30+
merkle_root: String,
31+
proof_count: usize,
32+
}
33+
34+
#[derive(Debug, serde::Serialize)]
35+
pub struct TraceMessageTaskError {
36+
merkle_root: String,
37+
error: String,
38+
}
39+
40+
pub struct TelemetrySender {
41+
base_url: String,
42+
client: reqwest::Client,
43+
}
44+
45+
impl TelemetrySender {
46+
pub fn new(base_url: String) -> Self {
47+
let client = reqwest::Client::new();
48+
Self { base_url, client }
49+
}
50+
51+
pub fn get_full_url(&self, path: &str) -> String {
52+
format!("{}/api/{}", self.base_url, path)
53+
}
54+
55+
pub async fn init_task_trace(&self, batch_merkle_root: &str) -> Result<(), reqwest::Error> {
56+
let url = self.get_full_url("initBatcherTaskTrace");
57+
let formatted_merkle_root = format!("0x{}", batch_merkle_root);
58+
let task = TraceMessageTask {
59+
merkle_root: formatted_merkle_root,
60+
};
61+
self.client.post(&url).json(&task).send().await?;
62+
Ok(())
63+
}
64+
65+
pub async fn task_sent(
66+
&self,
67+
batch_merkle_root: &str,
68+
tx_hash: H256,
69+
) -> Result<(), reqwest::Error> {
70+
let url = self.get_full_url("batcherTaskSent");
71+
let formatted_merkle_root = format!("0x{}", batch_merkle_root);
72+
let task = TraceMessageTaskSentToEthereum {
73+
merkle_root: formatted_merkle_root,
74+
tx_hash,
75+
};
76+
self.client.post(&url).json(&task).send().await?;
77+
Ok(())
78+
}
79+
80+
pub async fn task_created(
81+
&self,
82+
batch_merkle_root: &str,
83+
fee_per_proof: String,
84+
num_proofs_in_batch: usize,
85+
) -> Result<(), reqwest::Error> {
86+
let url = self.get_full_url("batcherTaskStarted");
87+
let formatted_merkle_root = format!("0x{}", batch_merkle_root);
88+
let task = TraceMessageTaskStarted {
89+
merkle_root: formatted_merkle_root,
90+
fee_per_proof,
91+
num_proofs_in_batch,
92+
};
93+
self.client.post(&url).json(&task).send().await?;
94+
Ok(())
95+
}
96+
97+
pub async fn task_uploaded_to_s3(&self, batch_merkle_root: &str) -> Result<(), reqwest::Error> {
98+
let url = self.get_full_url("batcherTaskUploadedToS3");
99+
let formatted_merkle_root = format!("0x{}", batch_merkle_root);
100+
let task = TraceMessageTask {
101+
merkle_root: formatted_merkle_root,
102+
};
103+
self.client.post(&url).json(&task).send().await?;
104+
Ok(())
105+
}
106+
107+
pub async fn task_creation_failed(
108+
&self,
109+
batch_merkle_root: &str,
110+
reason: &str,
111+
) -> Result<(), reqwest::Error> {
112+
let url = self.get_full_url("batcherTaskCreationFailed");
113+
let formatted_merkle_root = format!("0x{}", batch_merkle_root);
114+
let task = TraceMessageTaskError {
115+
merkle_root: formatted_merkle_root,
116+
error: reason.to_string(),
117+
};
118+
self.client.post(&url).json(&task).send().await?;
119+
Ok(())
120+
}
121+
}

config-files/config-batcher-docker.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ batcher:
2323
eth_ws_reconnects: 99999999999999
2424
pre_verification_is_enabled: true
2525
metrics_port: 9093
26+
telemetry_ip_port_address: localhost:4001
2627
non_paying:
2728
address: 0xa0Ee7A142d267C1f36714E4a8F75612F20a79720 # Anvil address 9
2829
replacement_private_key: ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80 # Anvil address 1

config-files/config-batcher.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ batcher:
2222
max_batch_size: 268435456 # 256 MiB
2323
pre_verification_is_enabled: true
2424
metrics_port: 9093
25+
telemetry_ip_port_address: localhost:4001
2526
non_paying:
2627
address: 0xa0Ee7A142d267C1f36714E4a8F75612F20a79720 # Anvil address 9
2728
replacement_private_key: ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80 # Anvil address 1

0 commit comments

Comments
 (0)