Skip to content

Commit 5a43143

Browse files
committed
feat: build and return array of results instead of result<vec,err>
1 parent 4a6c6ca commit 5a43143

File tree

3 files changed

+132
-76
lines changed

3 files changed

+132
-76
lines changed

batcher/aligned-sdk/src/communication/messaging.rs

Lines changed: 68 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ pub async fn send_messages(
3939
max_fees: &[U256],
4040
wallet: Wallet<SigningKey>,
4141
mut nonce: U256,
42-
) -> Result<Vec<NoncedVerificationData>, SubmitError> {
42+
) -> Vec<Result<NoncedVerificationData, SubmitError>> {
4343
let chain_id = U256::from(wallet.chain_id());
4444
let mut ws_write = ws_write.lock().await;
45-
let mut sent_verification_data: Vec<NoncedVerificationData> = Vec::new();
45+
let mut sent_verification_data: Vec<Result<NoncedVerificationData, SubmitError>> = Vec::new();
4646

4747
for (idx, verification_data) in verification_data.iter().enumerate() {
4848
// Build each message to send
@@ -56,29 +56,37 @@ pub async fn send_messages(
5656

5757
nonce += U256::one();
5858
let msg = ClientMessage::new(verification_data.clone(), wallet.clone()).await;
59-
let msg_bin = cbor_serialize(&msg).map_err(SubmitError::SerializationError)?;
60-
59+
let msg_bin = match cbor_serialize(&msg).map_err(SubmitError::SerializationError) {
60+
Ok(bin) => bin,
61+
Err(e) => {
62+
error!("Error while serializing message: {:?}", e);
63+
sent_verification_data.push(Err(e));
64+
return sent_verification_data;
65+
}
66+
};
67+
6168
// Send the message
62-
ws_write
63-
.send(Message::Binary(msg_bin.clone()))
64-
.await
65-
.map_err(SubmitError::WebSocketConnectionError)?;
69+
if let Err(e) = ws_write.send(Message::Binary(msg_bin.clone())).await {
70+
error!("Error while sending message: {:?}", e);
71+
sent_verification_data.push(Err(SubmitError::WebSocketConnectionError(e)));
72+
return sent_verification_data;
73+
}
6674

6775
debug!("{:?} Message sent", idx);
6876

6977
// Save the verification data commitment to read its response later
70-
sent_verification_data.push(verification_data);
78+
sent_verification_data.push(Ok(verification_data));
7179
}
7280

7381
info!("All proofs sent");
7482
// This vector is reversed so that while responses are received, removing from the end is cheaper.
75-
let sent_verification_data_rev: Vec<NoncedVerificationData> =
83+
let sent_verification_data_rev: Vec<Result<NoncedVerificationData, SubmitError>> =
7684
sent_verification_data
7785
.into_iter()
7886
.map(|vd| vd.into())
7987
.rev()
8088
.collect();
81-
Ok(sent_verification_data_rev)
89+
sent_verification_data_rev
8290
}
8391

8492
// Receives the array of proofs sent
@@ -87,11 +95,11 @@ pub async fn send_messages(
8795
// finishes when the last proof sent receives its response
8896
pub async fn receive(
8997
response_stream: Arc<Mutex<ResponseStream>>,
90-
mut sent_verification_data_rev: Vec<NoncedVerificationData>,
91-
) -> Result<Vec<AlignedVerificationData>, SubmitError> {
98+
mut sent_verification_data_rev: Vec<Result<NoncedVerificationData, SubmitError>>,
99+
) -> Vec<Result<AlignedVerificationData, SubmitError>> {
92100
// Responses are filtered to only admit binary or close messages.
93101
let mut response_stream = response_stream.lock().await;
94-
let mut aligned_submitted_data: Vec<AlignedVerificationData> = Vec::new();
102+
let mut aligned_submitted_data: Vec<Result<AlignedVerificationData, SubmitError>> = Vec::new();
95103
let last_proof_nonce = get_biggest_nonce(&sent_verification_data_rev);
96104

97105
// read from WS
@@ -100,35 +108,51 @@ pub async fn receive(
100108
if let Message::Close(close_frame) = msg {
101109
warn!("Unexpected WS close");
102110
if let Some(close_msg) = close_frame {
103-
return Err(SubmitError::WebSocketClosedUnexpectedlyError(
104-
close_msg.to_owned(),
105-
));
111+
aligned_submitted_data.push(Err(SubmitError::WebSocketClosedUnexpectedlyError(close_msg.to_owned())));
112+
break;
106113
}
107-
return Err(SubmitError::GenericError(
108-
"Connection was closed before receive() processed all sent messages ".to_string(),
109-
));
114+
aligned_submitted_data.push(Err(SubmitError::GenericError("Connection was closed before receive() processed all sent messages ".to_string())));
115+
break;
110116
}
111117

112-
let batch_inclusion_data_message = handle_batcher_response(msg).await?;
118+
// first error msg from batcher will drop the rest of the messages in the burst
113119

114-
let related_verification_data = match_batcher_response_with_stored_verification_data(
115-
&batch_inclusion_data_message,
116-
&mut sent_verification_data_rev,
117-
)?;
120+
let batch_inclusion_data_message = match handle_batcher_response(msg).await {
121+
Ok(data) => data,
122+
Err(e) => {
123+
warn!("Error while handling batcher response: {:?}", e);
124+
aligned_submitted_data.push(Err(e));
125+
break;
126+
}
127+
};
118128

119-
let aligned_verification_data =
120-
process_batcher_response(&batch_inclusion_data_message, &related_verification_data)?;
129+
let related_verification_data = match match_batcher_response_with_stored_verification_data(&batch_inclusion_data_message, &mut sent_verification_data_rev) {
130+
Ok(data) => data,
131+
Err(e) => {
132+
warn!("Error while matching batcher response with sent data: {:?}", e);
133+
aligned_submitted_data.push(Err(e));
134+
break;
135+
}
136+
};
121137

122-
aligned_submitted_data.push(aligned_verification_data);
138+
let aligned_verification_data = match process_batcher_response(&batch_inclusion_data_message, &related_verification_data) {
139+
Ok(data) => data,
140+
Err(e) => {
141+
warn!("Error while processing batcher response: {:?}", e);
142+
aligned_submitted_data.push(Err(e));
143+
break;
144+
}
145+
};
146+
147+
aligned_submitted_data.push(Ok(aligned_verification_data));
123148
debug!("Message response handled successfully");
124149

125150
if batch_inclusion_data_message.user_nonce == last_proof_nonce {
126151
break;
127152
}
128153
}
129154

130-
debug!("All proof responses handled successfully");
131-
Ok(aligned_submitted_data)
155+
aligned_submitted_data
132156
}
133157

134158
async fn handle_batcher_response(msg: Message) -> Result<BatchInclusionData, SubmitError> {
@@ -226,20 +250,23 @@ async fn handle_batcher_response(msg: Message) -> Result<BatchInclusionData, Sub
226250
// This is used to verify the proof you sent was indeed included in the batch
227251
fn match_batcher_response_with_stored_verification_data(
228252
batch_inclusion_data: &BatchInclusionData,
229-
sent_verification_data_rev: &mut Vec<NoncedVerificationData>,
253+
sent_verification_data_rev: &mut Vec<Result<NoncedVerificationData, SubmitError>>,
230254
) -> Result<VerificationDataCommitment, SubmitError> {
231255
debug!("Matching verification data with batcher response ...");
232256
let mut index = None;
233257
for (i, sent_nonced_verification_data) in sent_verification_data_rev.iter_mut().enumerate().rev() { // iterate in reverse since the last element is the most probable to match
234-
if sent_nonced_verification_data.nonce == batch_inclusion_data.user_nonce {
235-
debug!("local nonced verification data matched with batcher response");
236-
index = Some(i);
237-
break;
258+
if let Ok(sent_nonced_verification_data) = sent_nonced_verification_data {
259+
if sent_nonced_verification_data.nonce == batch_inclusion_data.user_nonce {
260+
debug!("local nonced verification data matched with batcher response");
261+
index = Some(i);
262+
break;
263+
}
238264
}
239265
}
240266

267+
// cant remove an element while iterating, so we remove it here
241268
if let Some(i) = index {
242-
let verification_data = sent_verification_data_rev.remove(i);
269+
let verification_data = sent_verification_data_rev.remove(i).unwrap();
243270
return Ok(verification_data.verification_data.clone().into());
244271
}
245272

@@ -249,11 +276,13 @@ fn match_batcher_response_with_stored_verification_data(
249276
// Returns the biggest nonce from the sent verification data
250277
// Used to know which is the last proof sent to the Batcher,
251278
// to know when to stop reading the WS for responses
252-
fn get_biggest_nonce(sent_verification_data: &[NoncedVerificationData]) -> U256 {
279+
fn get_biggest_nonce(sent_verification_data: &[Result<NoncedVerificationData, SubmitError>]) -> U256 {
253280
let mut biggest_nonce = U256::zero();
254281
for verification_data in sent_verification_data.iter() {
255-
if verification_data.nonce > biggest_nonce {
256-
biggest_nonce = verification_data.nonce;
282+
if let Ok(verification_data) = verification_data {
283+
if verification_data.nonce > biggest_nonce {
284+
biggest_nonce = verification_data.nonce;
285+
}
257286
}
258287
}
259288
biggest_nonce

batcher/aligned-sdk/src/sdk.rs

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -85,24 +85,33 @@ pub async fn submit_multiple_and_wait_verification(
8585
max_fees: &[U256],
8686
wallet: Wallet<SigningKey>,
8787
nonce: U256,
88-
) -> Result<Vec<AlignedVerificationData>, errors::SubmitError> {
89-
let aligned_verification_data = submit_multiple(
88+
) -> Vec<Result<AlignedVerificationData, errors::SubmitError>> {
89+
let mut aligned_verification_data = submit_multiple(
9090
batcher_url,
9191
network,
9292
verification_data,
9393
max_fees,
9494
wallet,
9595
nonce,
9696
)
97-
.await?;
97+
.await;
9898

9999
// TODO: open issue: use a join to .await all at the same time, avoiding the loop
100100
// And await only once per batch, no need to await multiple proofs if they are in the same batch.
101+
let mut error_awaiting_batch_verification: Option<errors::SubmitError> = None;
101102
for aligned_verification_data_item in aligned_verification_data.iter() {
102-
await_batch_verification(aligned_verification_data_item, eth_rpc_url, network).await?;
103+
if let Ok(aligned_verification_data_item) = aligned_verification_data_item {
104+
if let Err(e) = await_batch_verification(aligned_verification_data_item, eth_rpc_url, network).await {
105+
error_awaiting_batch_verification = Some(e);
106+
break;
107+
}
108+
}
109+
}
110+
if let Some(error_awaiting_batch_verification) = error_awaiting_batch_verification {
111+
aligned_verification_data.push(Err(error_awaiting_batch_verification));
103112
}
104113

105-
Ok(aligned_verification_data)
114+
aligned_verification_data
106115
}
107116

108117
/// Returns the estimated `max_fee` depending on the batch inclusion preference of the user, based on the max priority gas price.
@@ -238,10 +247,13 @@ pub async fn submit_multiple(
238247
max_fees: &[U256],
239248
wallet: Wallet<SigningKey>,
240249
nonce: U256,
241-
) -> Result<Vec<AlignedVerificationData>, errors::SubmitError> {
242-
let (ws_stream, _) = connect_async(batcher_url)
243-
.await
244-
.map_err(errors::SubmitError::WebSocketConnectionError)?;
250+
) -> Vec<Result<AlignedVerificationData, errors::SubmitError>> {
251+
let (ws_stream, _) = match connect_async(batcher_url).await {
252+
Ok((ws_stream, response)) => (ws_stream, response),
253+
Err(e) => {
254+
return vec![Err(errors::SubmitError::WebSocketConnectionError(e))]
255+
}
256+
};
245257

246258
debug!("WebSocket handshake has been successfully completed");
247259
let (ws_write, ws_read) = ws_stream.split();
@@ -288,20 +300,22 @@ async fn _submit_multiple(
288300
max_fees: &[U256],
289301
wallet: Wallet<SigningKey>,
290302
nonce: U256,
291-
) -> Result<Vec<AlignedVerificationData>, errors::SubmitError> {
303+
) -> Vec<Result<AlignedVerificationData, errors::SubmitError>> {
292304
// First message from the batcher is the protocol version
293-
check_protocol_version(&mut ws_read).await?;
305+
if let Err(e) = check_protocol_version(&mut ws_read).await {
306+
return vec![Err(e)];
307+
}
294308

295309
if verification_data.is_empty() {
296-
return Err(errors::SubmitError::MissingRequiredParameter(
310+
return vec![Err(errors::SubmitError::MissingRequiredParameter(
297311
"verification_data".to_string(),
298-
));
312+
))];
299313
}
300314
if verification_data.len() > 10000 {
301315
//TODO Magic number
302-
return Err(errors::SubmitError::GenericError(
316+
return vec![Err(errors::SubmitError::GenericError(
303317
"Trying to submit too many proofs at once".to_string(),
304-
));
318+
))];
305319
}
306320

307321
let ws_write_clone = ws_write.clone();
@@ -322,14 +336,16 @@ async fn _submit_multiple(
322336
wallet,
323337
nonce,
324338
)
325-
.await?;
339+
.await;
326340
receive(response_stream, sent_verification_data_rev).await
327341
}
328342
.await;
329343

330344
// Close connection
331345
info!("Closing WS connection");
332-
ws_write_clone.lock().await.close().await?;
346+
if let Err(e) = ws_write_clone.lock().await.close().await {
347+
return vec![Err(errors::SubmitError::GenericError(e.to_string()))];
348+
}
333349
result
334350
}
335351

@@ -386,9 +402,13 @@ pub async fn submit_and_wait_verification(
386402
wallet,
387403
nonce,
388404
)
389-
.await?;
405+
.await;
390406

391-
Ok(aligned_verification_data[0].clone())
407+
match aligned_verification_data.get(0) {
408+
Some(Ok(aligned_verification_data)) => Ok(aligned_verification_data.clone()),
409+
Some(Err(e)) => Err(errors::SubmitError::GenericError(e.to_string())),
410+
None => Err(errors::SubmitError::GenericError("No response from the batcher".to_string())),
411+
}
392412
}
393413

394414
/// Submits a proof to the batcher to be verified in Aligned.
@@ -435,9 +455,13 @@ pub async fn submit(
435455
wallet,
436456
nonce,
437457
)
438-
.await?;
458+
.await;
439459

440-
Ok(aligned_verification_data[0].clone())
460+
match aligned_verification_data.get(0) {
461+
Some(Ok(aligned_verification_data)) => Ok(aligned_verification_data.clone()),
462+
Some(Err(e)) => Err(errors::SubmitError::GenericError(e.to_string())),
463+
None => Err(errors::SubmitError::GenericError("No response from the batcher".to_string())),
464+
}
441465
}
442466

443467
/// Checks if the proof has been verified with Aligned and is included in the batch.

batcher/aligned/src/main.rs

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -321,33 +321,36 @@ async fn main() -> Result<(), AlignedError> {
321321

322322
let max_fees = vec![max_fee; repetitions];
323323

324-
let aligned_verification_data_vec = match submit_multiple(
324+
let aligned_verification_data_vec = submit_multiple(
325325
&connect_addr,
326326
submit_args.network.into(),
327327
&verification_data_arr,
328328
&max_fees,
329329
wallet.clone(),
330330
nonce,
331331
)
332-
.await
333-
{
334-
Ok(aligned_verification_data_vec) => aligned_verification_data_vec,
335-
Err(e) => {
336-
let nonce_file = format!("nonce_{:?}.bin", wallet.address());
337-
338-
handle_submit_err(e, nonce_file.as_str()).await;
339-
return Ok(());
340-
}
341-
};
332+
.await;
342333

343334
let mut unique_batch_merkle_roots = HashSet::new();
344335

345336
for aligned_verification_data in aligned_verification_data_vec {
346-
save_response(
347-
batch_inclusion_data_directory_path.clone(),
348-
&aligned_verification_data,
349-
)?;
350-
unique_batch_merkle_roots.insert(aligned_verification_data.batch_merkle_root);
337+
match aligned_verification_data {
338+
Ok(aligned_verification_data) => {
339+
info!("Proof submitted to aligned. Batch merkle root: 0x{}", hex::encode(aligned_verification_data.batch_merkle_root));
340+
save_response(
341+
batch_inclusion_data_directory_path.clone(),
342+
&aligned_verification_data,
343+
)?;
344+
unique_batch_merkle_roots.insert(aligned_verification_data.batch_merkle_root);
345+
},
346+
Err(e) => {
347+
warn!("Error while submitting proof: {:?}", e);
348+
let nonce_file = format!("nonce_{:?}.bin", wallet.address());
349+
350+
handle_submit_err(e, nonce_file.as_str()).await;
351+
return Ok(());
352+
}
353+
};
351354
}
352355

353356
match unique_batch_merkle_roots.len() {

0 commit comments

Comments
 (0)