Skip to content

Commit 666cfe5

Browse files
committed
refactor: send() and receive() to use storage instead of channel
1 parent 2b21071 commit 666cfe5

File tree

3 files changed

+70
-50
lines changed

3 files changed

+70
-50
lines changed

batcher/aligned-sdk/src/communication/messaging.rs

Lines changed: 64 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ use futures_util::stream::{SplitSink, TryFilter};
1111
use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream};
1212

1313
use crate::communication::serialization::{cbor_deserialize, cbor_serialize};
14+
use crate::core::types::BatchInclusionData;
1415
use crate::{
15-
communication::batch::handle_batch_inclusion_data,
16+
communication::batch::process_batcher_response,
1617
core::{
1718
errors::SubmitError,
1819
types::{
@@ -35,10 +36,10 @@ pub async fn send_messages(
3536
max_fees: &[U256],
3637
wallet: Wallet<SigningKey>,
3738
mut nonce: U256,
38-
sender_channel: tokio::sync::mpsc::Sender<VerificationDataCommitment>,
39-
) -> Result<(), SubmitError> {
39+
) -> Result<Vec<NoncedVerificationData>, SubmitError> {
4040
let chain_id = U256::from(wallet.chain_id());
4141
let mut ws_write = ws_write.lock().await;
42+
let mut sent_verification_data: Vec<NoncedVerificationData> = Vec::new();
4243

4344
for (idx, verification_data) in verification_data.iter().enumerate() {
4445
// Build each message to send
@@ -61,41 +62,32 @@ pub async fn send_messages(
6162
.map_err(SubmitError::WebSocketConnectionError)?;
6263

6364
debug!("{:?} Message sent", idx);
64-
65-
match sender_channel.send(verification_data.into()).await {//.map_err(|e| SubmitError::GenericError(e.to_string()))?;
66-
Ok(_) => {
67-
debug!("Message sent to channel");
68-
}
69-
Err(e) if e.to_string() == "channel closed" => { // happens when receive has exited, because batcher replied with error
70-
error!("Error sending message because batcher has previously replied with an error");
71-
// return Err(SubmitError::GenericError(("Batcher has previously replied with an error").to_string()));
72-
return Ok(());
73-
}
74-
Err(e) => {
75-
error!("Error sending message to channel: {:?}", e.to_string());
76-
return Err(SubmitError::GenericError(e.to_string()));
77-
}
78-
}
65+
66+
// Save the verification data commitment to read its response later
67+
sent_verification_data.push(verification_data);
7968
}
8069

81-
//sender_channel will be closed as it falls out of scope, sending a 'None' to the receiver
8270
info!("All messages sent");
83-
Ok(())
71+
Ok(sent_verification_data)
8472
}
8573

74+
75+
// Instead of using a channel, use a storage.
76+
// From there, you can match received messages to the ones you sent.
77+
78+
// TODO missing analyzing which is the last expected nonce.
79+
// When received message of last expected nonce, i can exit this function
8680
pub async fn receive(
8781
response_stream: Arc<Mutex<ResponseStream>>,
88-
mut receiver_channnel: tokio::sync::mpsc::Receiver<VerificationDataCommitment>,
82+
mut sent_verification_data: Vec<NoncedVerificationData>,
8983
) -> Result<Vec<AlignedVerificationData>, SubmitError> {
9084
// Responses are filtered to only admit binary or close messages.
9185
let mut response_stream = response_stream.lock().await;
92-
let mut aligned_verification_data: Vec<AlignedVerificationData> = Vec::new();
86+
let mut aligned_submitted_data: Vec<AlignedVerificationData> = Vec::new();
9387

94-
while let Some(verification_data_commitment) = receiver_channnel.recv().await { //while there are messages in the channel
95-
// Read from WS
96-
let msg = response_stream.next().await.unwrap().map_err(SubmitError::WebSocketConnectionError)?;
97-
98-
// If websocket was closed prematurely:
88+
// read from WS
89+
while let Some(Ok(msg)) = response_stream.next().await {
90+
// unexpected WS close:
9991
if let Message::Close(close_frame) = msg {
10092
if let Some(close_msg) = close_frame {
10193
return Err(SubmitError::WebSocketClosedUnexpectedlyError(
@@ -107,32 +99,36 @@ pub async fn receive(
10799
.to_string(),
108100
));
109101
}
110-
111-
process_batch_inclusion_data(
102+
103+
let batch_inclusion_data_message = handle_batcher_response(
112104
msg,
113-
&mut aligned_verification_data,
114-
verification_data_commitment,
115-
).await?; // If batcher returned an error, this will close the channel and return the error
105+
).await?;
106+
107+
let related_verification_data = match_batcher_response_with_stored_verification_data(
108+
&batch_inclusion_data_message,
109+
&mut sent_verification_data,
110+
)?;
111+
112+
let aligned_verification_data = process_batcher_response(
113+
batch_inclusion_data_message,
114+
related_verification_data,
115+
)?;
116+
117+
aligned_submitted_data.push(aligned_verification_data);
116118
}
117119

118-
info!("All message responses handled succesfully");
119-
Ok(aligned_verification_data)
120+
debug!("All message responses handled succesfully");
121+
Ok(aligned_submitted_data)
120122
}
121123

122-
async fn process_batch_inclusion_data(
124+
async fn handle_batcher_response(
123125
msg: Message,
124-
aligned_verification_data: &mut Vec<AlignedVerificationData>,
125-
verification_data_commitment: VerificationDataCommitment,
126-
) -> Result<(), SubmitError> {
126+
) -> Result<BatchInclusionData, SubmitError> {
127127

128128
let data = msg.into_data();
129129
match cbor_deserialize(data.as_slice()) {
130130
Ok(ResponseMessage::BatchInclusionData(batch_inclusion_data)) => { //OK case. Proofs was valid and it was included in this batch.
131-
let _ = handle_batch_inclusion_data(
132-
batch_inclusion_data,
133-
aligned_verification_data,
134-
verification_data_commitment,
135-
);
131+
return Ok(batch_inclusion_data);
136132
}
137133
Ok(ResponseMessage::InvalidNonce) => {
138134
error!("Batcher responded with invalid nonce");
@@ -205,13 +201,36 @@ async fn process_batch_inclusion_data(
205201
}
206202
Ok(ResponseMessage::Error(e)) => {
207203
error!("Batcher responded with error: {}", e);
208-
error!("Batcher responded with error: {}", e);
204+
return Err(SubmitError::GenericError(e))
209205
}
210206
Err(e) => {
211207
error!("Error while deserializing batch inclusion data: {}", e);
212208
return Err(SubmitError::SerializationError(e));
213209
}
214210
}
211+
}
212+
213+
// Used to match the message received from the batcher,
214+
// a BatchInclusionData corresponding to the data you need to verify your proof is in a batch
215+
// with the NoncedVerificationData you sent, used to verify the proof was indeed included in the batch
216+
fn match_batcher_response_with_stored_verification_data(
217+
batch_inclusion_data: &BatchInclusionData,
218+
sent_verification_data: &mut Vec<NoncedVerificationData>,
219+
) -> Result<VerificationDataCommitment, SubmitError> {
220+
debug!("Matching verification data with batcher response ...");
221+
let mut index = None;
222+
for (i, sent_nonced_verification_data) in sent_verification_data.iter_mut().enumerate() {
223+
if sent_nonced_verification_data.nonce == batch_inclusion_data.user_nonce {
224+
debug!("local nonced verification data matched with batcher response");
225+
index = Some(i);
226+
break;
227+
}
228+
}
229+
230+
if let Some(i) = index {
231+
let verification_data = sent_verification_data.swap_remove(i); //TODO maybe only remove?
232+
return Ok(verification_data.verification_data.clone().into());
233+
}
215234

216-
Ok(())
235+
Err(SubmitError::InvalidProofInclusionData)
217236
}

batcher/aligned-sdk/src/core/errors.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ pub enum SubmitError {
101101
InvalidPaymentServiceAddress(H160, H160),
102102
BatchSubmissionFailed(String),
103103
AddToBatchError,
104+
InvalidProofInclusionData,
104105
GenericError(String),
105106
}
106107

@@ -210,6 +211,9 @@ impl fmt::Display for SubmitError {
210211
}
211212
SubmitError::ProofQueueFlushed => write!(f, "Batch reset"),
212213
SubmitError::AddToBatchError => write!(f, "Error while adding entry to batch"),
214+
SubmitError::InvalidProofInclusionData => {
215+
write!(f, "Batcher responded with invalid batch inclusion data. Your proof was not correctly included in the batch.")
216+
}
213217
}
214218
}
215219
}

batcher/aligned-sdk/src/sdk.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ use sha3::{Digest, Keccak256};
3333
use std::{str::FromStr, sync::Arc};
3434
use tokio::{net::TcpStream, sync::Mutex};
3535
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
36-
use tokio::sync::mpsc;
3736

3837
use log::{debug, info};
3938

@@ -312,17 +311,15 @@ async fn _submit_multiple(
312311

313312
let payment_service_addr = get_payment_service_address(network);
314313

315-
let (sender_channel, receiver_channel) = mpsc::channel(50000); //TODO Magic number
316-
317314
// done sequencial
318315
// added size check to avoid sequencial is too big
319316
// chequeando el nonce del ultimo, puedo cortar la conexión cuando recibo su respuesta
320317
// agregar nonce en la respuesta del batcher al sender
321318
// sacar el mensaje de que la proof es una replacement
322319

323320
let result = async {
324-
send_messages(ws_write, payment_service_addr, verification_data, max_fees, wallet, nonce, sender_channel).await?;
325-
receive(response_stream, receiver_channel).await
321+
let sent_verification_data = send_messages(ws_write, payment_service_addr, verification_data, max_fees, wallet, nonce).await?;
322+
receive(response_stream, sent_verification_data).await
326323
}.await;
327324

328325
// Close connection

0 commit comments

Comments
 (0)