Skip to content

Commit 8011158

Browse files
committed
refactor: send_message, receive and process_batch_inclusion_data
1 parent 8dc80d8 commit 8011158

File tree

1 file changed

+68
-110
lines changed

1 file changed

+68
-110
lines changed

batcher/aligned-sdk/src/communication/messaging.rs

Lines changed: 68 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::{
1717
errors::SubmitError,
1818
types::{
1919
AlignedVerificationData, ClientMessage, NoncedVerificationData, ResponseMessage,
20-
ValidityResponseMessage, VerificationData, VerificationDataCommitment,
20+
VerificationData, VerificationDataCommitment,
2121
},
2222
},
2323
};
@@ -29,23 +29,21 @@ pub type ResponseStream = TryFilter<
2929
>;
3030

3131
pub async fn send_messages(
32-
response_stream: Arc<Mutex<ResponseStream>>,
3332
ws_write: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
3433
payment_service_addr: Address,
3534
verification_data: &[VerificationData],
3635
max_fees: &[U256],
3736
wallet: Wallet<SigningKey>,
3837
mut nonce: U256,
39-
) -> Result<Vec<NoncedVerificationData>, SubmitError> {
38+
) -> Result<Vec<VerificationDataCommitment>, SubmitError> {
4039
let mut sent_verification_data = Vec::new();
4140

42-
let mut ws_write = ws_write.lock().await;
43-
44-
let mut response_stream = response_stream.lock().await;
45-
4641
let chain_id = U256::from(wallet.chain_id());
4742

43+
let mut ws_write = ws_write.lock().await;
44+
4845
for (idx, verification_data) in verification_data.iter().enumerate() {
46+
// Build each message to send
4947
let verification_data = NoncedVerificationData::new(
5048
verification_data.clone(),
5149
nonce,
@@ -55,113 +53,41 @@ pub async fn send_messages(
5553
);
5654

5755
nonce += U256::one();
58-
5956
let msg = ClientMessage::new(verification_data.clone(), wallet.clone()).await;
6057
let msg_bin = cbor_serialize(&msg).map_err(SubmitError::SerializationError)?;
58+
59+
// Send the message
6160
ws_write
6261
.send(Message::Binary(msg_bin.clone()))
6362
.await
6463
.map_err(SubmitError::WebSocketConnectionError)?;
6564

66-
debug!("Message sent...");
67-
68-
let msg = match response_stream.next().await {
69-
Some(Ok(msg)) => msg,
70-
_ => {
71-
return Err(SubmitError::GenericError(
72-
"Connection was closed without close message before receiving all messages"
73-
.to_string(),
74-
));
75-
}
76-
};
77-
78-
let response_msg: ValidityResponseMessage = cbor_deserialize(msg.into_data().as_slice())
79-
.map_err(SubmitError::SerializationError)?;
80-
81-
match response_msg {
82-
ValidityResponseMessage::Valid => {
83-
debug!("Message was valid");
84-
}
85-
ValidityResponseMessage::InvalidNonce => {
86-
info!("Invalid Nonce!");
87-
return Err(SubmitError::InvalidNonce);
88-
}
89-
ValidityResponseMessage::InvalidSignature => {
90-
error!("Invalid Signature!");
91-
return Err(SubmitError::InvalidSignature);
92-
}
93-
ValidityResponseMessage::ProofTooLarge => {
94-
error!("Proof too large!");
95-
return Err(SubmitError::ProofTooLarge);
96-
}
97-
ValidityResponseMessage::InvalidProof(reason) => {
98-
error!("Invalid Proof!: {}", reason);
99-
return Err(SubmitError::InvalidProof(reason));
100-
}
101-
ValidityResponseMessage::InvalidMaxFee => {
102-
error!("Invalid Max Fee!");
103-
return Err(SubmitError::InvalidMaxFee);
104-
}
105-
ValidityResponseMessage::InsufficientBalance(addr) => {
106-
error!("Insufficient balance for address: {}", addr);
107-
return Err(SubmitError::InsufficientBalance);
108-
}
109-
ValidityResponseMessage::InvalidChainId => {
110-
error!("Invalid chain id!");
111-
return Err(SubmitError::InvalidChainId);
112-
}
113-
ValidityResponseMessage::InvalidReplacementMessage => {
114-
error!("Invalid replacement message!");
115-
return Err(SubmitError::InvalidReplacementMessage);
116-
}
117-
ValidityResponseMessage::AddToBatchError => {
118-
error!("Error while pushing the entry to queue");
119-
return Err(SubmitError::AddToBatchError);
120-
}
121-
ValidityResponseMessage::EthRpcError => {
122-
return Err(SubmitError::EthereumProviderError(
123-
"Batcher experienced Eth RPC connection error".to_string(),
124-
));
125-
}
126-
ValidityResponseMessage::InvalidPaymentServiceAddress(received_addr, expected_addr) => {
127-
error!(
128-
"Invalid payment service address, received: {}, expected: {}",
129-
received_addr, expected_addr
130-
);
131-
return Err(SubmitError::InvalidPaymentServiceAddress(
132-
received_addr,
133-
expected_addr,
134-
));
135-
}
136-
ValidityResponseMessage::BatchInclusionData(data) => {
137-
debug!("Message was valid and included with the following BatchInclusionData: {:?}", data);
138-
139-
}
140-
ValidityResponseMessage::CreateNewTaskError(merkle_root, error) => {
141-
return Err(SubmitError::BatchSubmissionFailed(
142-
"Could not create task with merkle root ".to_owned() + &merkle_root + ", failed with error: " + &error,
143-
));
144-
}
145-
146-
};
65+
debug!("{:?} Message sent", idx);
14766

14867
sent_verification_data.push(verification_data.clone());
14968
}
15069

151-
Ok(sent_verification_data)
70+
// This vector is reversed so that when responses are received, the commitments corresponding
71+
// to that response can simply be popped of this vector.
72+
let verification_data_commitments_rev: Vec<VerificationDataCommitment> =
73+
sent_verification_data
74+
.into_iter()
75+
.map(|vd| vd.into())
76+
.rev()
77+
.collect();
78+
79+
Ok(verification_data_commitments_rev)
15280
}
15381

15482
pub async fn receive(
15583
response_stream: Arc<Mutex<ResponseStream>>,
156-
ws_write: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
15784
total_messages: usize,
158-
num_responses: Arc<Mutex<usize>>,
15985
verification_data_commitments_rev: &mut Vec<VerificationDataCommitment>,
16086
) -> Result<Vec<AlignedVerificationData>, SubmitError> {
16187
// Responses are filtered to only admit binary or close messages.
16288
let mut response_stream = response_stream.lock().await;
163-
16489
let mut aligned_verification_data: Vec<AlignedVerificationData> = Vec::new();
90+
let mut num_responses: usize = 0;
16591

16692
while let Some(Ok(msg)) = response_stream.next().await {
16793
if let Message::Close(close_frame) = msg {
@@ -175,17 +101,17 @@ pub async fn receive(
175101
.to_string(),
176102
));
177103
}
104+
178105
process_batch_inclusion_data(
179106
msg,
180107
&mut aligned_verification_data,
181108
verification_data_commitments_rev,
182-
num_responses.clone(),
183109
)
184110
.await?;
185-
186-
if *num_responses.lock().await == total_messages {
187-
debug!("All messages responded. Closing connection...");
188-
ws_write.lock().await.close().await?;
111+
112+
num_responses += 1;
113+
if num_responses == total_messages {
114+
info!("All message responses received");
189115
return Ok(aligned_verification_data);
190116
}
191117
}
@@ -199,20 +125,60 @@ async fn process_batch_inclusion_data(
199125
msg: Message,
200126
aligned_verification_data: &mut Vec<AlignedVerificationData>,
201127
verification_data_commitments_rev: &mut Vec<VerificationDataCommitment>,
202-
num_responses: Arc<Mutex<usize>>,
203128
) -> Result<(), SubmitError> {
204-
let mut num_responses_lock = num_responses.lock().await;
205-
*num_responses_lock += 1;
206129

207130
let data = msg.into_data();
208131
match cbor_deserialize(data.as_slice()) {
209-
Ok(ResponseMessage::BatchInclusionData(batch_inclusion_data)) => {
132+
Ok(ResponseMessage::BatchInclusionData(batch_inclusion_data)) => { //OK case. Proofs was valid and it was included in this batch.
210133
let _ = handle_batch_inclusion_data(
211134
batch_inclusion_data,
212135
aligned_verification_data,
213136
verification_data_commitments_rev,
214137
);
215138
}
139+
Ok(ResponseMessage::InvalidNonce) => {
140+
return Err(SubmitError::InvalidNonce);
141+
}
142+
Ok(ResponseMessage::InvalidSignature) => {
143+
return Err(SubmitError::InvalidSignature);
144+
}
145+
Ok(ResponseMessage::ProofTooLarge) => {
146+
return Err(SubmitError::ProofTooLarge);
147+
}
148+
Ok(ResponseMessage::InvalidMaxFee) => {
149+
return Err(SubmitError::InvalidMaxFee);
150+
}
151+
Ok(ResponseMessage::InsufficientBalance) => {
152+
return Err(SubmitError::InsufficientBalance);
153+
}
154+
Ok(ResponseMessage::InvalidChainId) => {
155+
return Err(SubmitError::InvalidChainId);
156+
}
157+
Ok(ResponseMessage::InvalidReplacementMessage) => {
158+
return Err(SubmitError::InvalidReplacementMessage);
159+
}
160+
Ok(ResponseMessage::AddToBatchError) => {
161+
return Err(SubmitError::AddToBatchError);
162+
}
163+
Ok(ResponseMessage::EthRpcError) => {
164+
return Err(SubmitError::EthereumProviderError(
165+
"Batcher experienced Eth RPC connection error".to_string(),
166+
));
167+
}
168+
Ok(ResponseMessage::InvalidPaymentServiceAddress(received_addr, expected_addr)) => {
169+
return Err(SubmitError::InvalidPaymentServiceAddress(
170+
received_addr,
171+
expected_addr,
172+
));
173+
}
174+
Ok(ResponseMessage::InvalidProof(reason)) => {
175+
return Err(SubmitError::InvalidProof(reason));
176+
}
177+
Ok(ResponseMessage::CreateNewTaskError(merkle_root, error)) => {
178+
return Err(SubmitError::BatchSubmissionFailed(
179+
"Could not create task with merkle root ".to_owned() + &merkle_root + ", failed with error: " + &error,
180+
));
181+
}
216182
Ok(ResponseMessage::ProtocolVersion(_)) => {
217183
return Err(SubmitError::UnexpectedBatcherResponse(
218184
"Batcher responded with protocol version instead of batch inclusion data"
@@ -225,14 +191,6 @@ async fn process_batch_inclusion_data(
225191
Ok(ResponseMessage::Error(e)) => {
226192
error!("Batcher responded with error: {}", e);
227193
}
228-
Ok(ResponseMessage::CreateNewTaskError(merkle_root, error)) => {
229-
return Err(SubmitError::BatchSubmissionFailed(
230-
"Could not create task with merkle root ".to_owned() + &merkle_root + ", failed with error: " + &error,
231-
));
232-
}
233-
Ok(ResponseMessage::InvalidProof(reason)) => {
234-
return Err(SubmitError::InvalidProof(reason));
235-
}
236194
Err(e) => {
237195
return Err(SubmitError::SerializationError(e));
238196
}

0 commit comments

Comments
 (0)