Skip to content

Commit 7028bbf

Browse files
entropidelicjotabulaciosMauroToscano
authored
feat(batcher): send response to clients once task has been confirmed in ethereum (#316)
Co-authored-by: jotabulacios <[email protected]> Co-authored-by: MauroFab <[email protected]>
1 parent 0d86b52 commit 7028bbf

File tree

8 files changed

+216
-68
lines changed

8 files changed

+216
-68
lines changed

batcher/Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

batcher/client/Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

batcher/client/src/main.rs

Lines changed: 50 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,22 @@ mod errors;
44

55
use std::{path::PathBuf, sync::Arc};
66

7-
use alloy_primitives::{Address, hex};
7+
use alloy_primitives::{hex, Address};
88
use env_logger::Env;
99
use futures_util::{
1010
future,
1111
stream::{SplitSink, SplitStream},
1212
SinkExt, StreamExt, TryStreamExt,
1313
};
14-
use log::info;
14+
use log::{error, info};
1515
use tokio::{net::TcpStream, sync::Mutex};
1616
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
1717

18-
use batcher::types::{parse_proving_system, BatchInclusionData, VerificationData, ProvingSystemId};
18+
use batcher::types::{parse_proving_system, BatchInclusionData, ProvingSystemId, VerificationData};
1919

20+
use crate::errors::BatcherClientError;
2021
use clap::Parser;
2122
use tungstenite::Message;
22-
use crate::errors::BatcherClientError;
2323

2424
#[derive(Parser, Debug)]
2525
#[command(version, about, long_about = None)]
@@ -37,19 +37,13 @@ struct Args {
3737
#[arg(name = "Proof file name", long = "proof")]
3838
proof_file_name: PathBuf,
3939

40-
#[arg(
41-
name = "Public input file name",
42-
long = "public_input",
43-
)]
40+
#[arg(name = "Public input file name", long = "public_input")]
4441
pub_input_file_name: Option<PathBuf>,
4542

4643
#[arg(name = "Verification key file name", long = "vk")]
4744
verification_key_file_name: Option<PathBuf>,
4845

49-
#[arg(
50-
name = "VM prgram code file name",
51-
long = "vm_program",
52-
)]
46+
#[arg(name = "VM prgram code file name", long = "vm_program")]
5347
vm_program_code_file_name: Option<PathBuf>,
5448

5549
#[arg(
@@ -105,35 +99,48 @@ async fn receive(
10599
total_messages: usize,
106100
num_responses: Arc<Mutex<usize>>,
107101
) -> Result<(), BatcherClientError> {
108-
ws_read
109-
.try_filter(|msg| future::ready(msg.is_text() || msg.is_binary()))
110-
.try_for_each(|msg| async {
102+
// Responses are filtered to only admit binary or close messages.
103+
let mut response_stream =
104+
ws_read.try_filter(|msg| future::ready(msg.is_binary() || msg.is_close()));
105+
106+
while let Some(Ok(msg)) = response_stream.next().await {
107+
if let Message::Close(close_frame) = msg {
108+
if let Some(close_msg) = close_frame {
109+
error!("Connection was closed before receiving all messages. Reason: {}. Try submitting your proof again", close_msg.to_owned());
110+
ws_write.lock().await.close().await?;
111+
return Ok(());
112+
}
113+
error!("Connection was closed before receiving all messages. Try submitting your proof again");
114+
ws_write.lock().await.close().await?;
115+
return Ok(());
116+
} else {
111117
let mut num_responses_lock = num_responses.lock().await;
112118
*num_responses_lock += 1;
113-
let data = msg.into_data();
114-
let deserialized_data: BatchInclusionData = serde_json::from_slice(&data).unwrap();
115-
info!("Batcher response received: {}", deserialized_data);
116-
117-
let batch_merkle_root_hex = hex::encode(deserialized_data.batch_merkle_root);
118-
info!(
119-
"See the batch in the explorer:\nhttps://explorer.alignedlayer.com/batches/0x{}",
120-
batch_merkle_root_hex
121-
);
122119

120+
let data = msg.into_data();
121+
match serde_json::from_slice::<BatchInclusionData>(&data) {
122+
Ok(batch_inclusion_data) => {
123+
info!("Batcher response received: {}", batch_inclusion_data);
124+
info!("Proof verified in aligned. See the batch in the explorer:\nhttps://explorer.alignedlayer.com/batches/0x{}", hex::encode(batch_inclusion_data.batch_merkle_root));
125+
}
126+
Err(e) => {
127+
error!("Error while deserializing batcher response: {}", e);
128+
}
129+
}
123130
if *num_responses_lock == total_messages {
124131
info!("All messages responded. Closing connection...");
125132
ws_write.lock().await.close().await?;
133+
return Ok(());
126134
}
127-
128-
Ok(())
129-
}).await?;
135+
}
136+
}
130137

131138
Ok(())
132139
}
133140

134141
fn verification_data_from_args(args: Args) -> Result<VerificationData, BatcherClientError> {
135142
let proving_system = parse_proving_system(&args.proving_system_flag)
136-
.map_err(|_| errors::BatcherClientError::InvalidProvingSystem(args.proving_system_flag))?;
143+
.map_err(|_| BatcherClientError::InvalidProvingSystem(args.proving_system_flag))?;
137144

138145
// Read proof file
139146
let proof = read_file(args.proof_file_name)?;
@@ -144,15 +151,21 @@ fn verification_data_from_args(args: Args) -> Result<VerificationData, BatcherCl
144151

145152
match proving_system {
146153
ProvingSystemId::SP1 => {
147-
vm_program_code = Some(read_file_option("--vm_program", args.vm_program_code_file_name)?);
154+
vm_program_code = Some(read_file_option(
155+
"--vm_program",
156+
args.vm_program_code_file_name,
157+
)?);
148158
}
149159
ProvingSystemId::Halo2KZG
150160
| ProvingSystemId::Halo2IPA
151161
| ProvingSystemId::GnarkPlonkBls12_381
152162
| ProvingSystemId::GnarkPlonkBn254
153163
| ProvingSystemId::Groth16Bn254 => {
154164
verification_key = Some(read_file_option("--vk", args.verification_key_file_name)?);
155-
pub_input = Some(read_file_option("--public_input", args.pub_input_file_name)?);
165+
pub_input = Some(read_file_option(
166+
"--public_input",
167+
args.pub_input_file_name,
168+
)?);
156169
}
157170
}
158171

@@ -170,11 +183,14 @@ fn verification_data_from_args(args: Args) -> Result<VerificationData, BatcherCl
170183
}
171184

172185
fn read_file(file_name: PathBuf) -> Result<Vec<u8>, BatcherClientError> {
173-
std::fs::read(&file_name)
174-
.map_err(|e| BatcherClientError::IoError(file_name, e))
186+
std::fs::read(&file_name).map_err(|e| BatcherClientError::IoError(file_name, e))
175187
}
176188

177-
fn read_file_option(param_name: &str, file_name: Option<PathBuf>) -> Result<Vec<u8>, BatcherClientError> {
178-
let file_name = file_name.ok_or(BatcherClientError::MissingParameter(param_name.to_string()))?;
189+
fn read_file_option(
190+
param_name: &str,
191+
file_name: Option<PathBuf>,
192+
) -> Result<Vec<u8>, BatcherClientError> {
193+
let file_name =
194+
file_name.ok_or(BatcherClientError::MissingParameter(param_name.to_string()))?;
179195
read_file(file_name)
180196
}

batcher/src/eth/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::sync::Arc;
33

44
use ethers::prelude::k256::ecdsa::SigningKey;
55
use ethers::prelude::*;
6+
use stream::EventStream;
67

78
use crate::config::ECDSAConfig;
89

@@ -11,9 +12,21 @@ abigen!(
1112
"./src/eth/abi/AlignedLayerServiceManager.json"
1213
);
1314

15+
#[derive(Debug, Clone, EthEvent)]
16+
pub struct BatchVerified {
17+
pub batch_merkle_root: [u8; 32],
18+
}
19+
1420
pub type AlignedLayerServiceManager =
1521
AlignedLayerServiceManagerContract<SignerMiddleware<Provider<Http>, Wallet<SigningKey>>>;
1622

23+
pub type BatchVerifiedEventStream<'s> = EventStream<
24+
's,
25+
FilterWatcher<'s, Http, Log>,
26+
BatchVerifiedFilter,
27+
ContractError<SignerMiddleware<Provider<Http>, Wallet<SigningKey>>>,
28+
>;
29+
1730
pub fn get_provider(eth_rpc_url: String) -> Result<Provider<Http>, anyhow::Error> {
1831
Provider::<Http>::try_from(eth_rpc_url).map_err(|err| anyhow::anyhow!(err))
1932
}

0 commit comments

Comments
 (0)