Skip to content

Commit 5298915

Browse files
committed
feat: messaging.rs with channels for new version with join
1 parent 88bc36f commit 5298915

File tree

1 file changed

+21
-31
lines changed

1 file changed

+21
-31
lines changed

batcher/aligned-sdk/src/communication/messaging.rs

Lines changed: 21 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,9 @@ pub async fn send_messages(
3535
max_fees: &[U256],
3636
wallet: Wallet<SigningKey>,
3737
mut nonce: U256,
38-
) -> Result<Vec<VerificationDataCommitment>, SubmitError> {
39-
let mut sent_verification_data = Vec::new();
40-
38+
sender_channel: tokio::sync::mpsc::Sender<VerificationDataCommitment>,
39+
) -> Result<bool, SubmitError> {
4140
let chain_id = U256::from(wallet.chain_id());
42-
4341
let mut ws_write = ws_write.lock().await;
4442

4543
for (idx, verification_data) in verification_data.iter().enumerate() {
@@ -64,30 +62,20 @@ pub async fn send_messages(
6462

6563
debug!("{:?} Message sent", idx);
6664

67-
sent_verification_data.push(verification_data.clone());
65+
sender_channel.send(verification_data.into()).await.unwrap();
6866
}
6967

70-
// This vector is reversed so that when responses are received, the commitments corresponding
71-
// to that response can simply be popped of this vector.
72-
let verification_data_commitments_rev: Vec<VerificationDataCommitment> =
73-
sent_verification_data
74-
.into_iter()
75-
.map(|vd| vd.into())
76-
.rev()
77-
.collect();
78-
79-
Ok(verification_data_commitments_rev)
68+
//sender_channel will be closed as it falls out of scope, sending a 'None' to the receiver
69+
Ok(true)
8070
}
8171

8272
pub async fn receive(
8373
response_stream: Arc<Mutex<ResponseStream>>,
84-
total_messages: usize,
85-
verification_data_commitments_rev: &mut Vec<VerificationDataCommitment>,
74+
mut receiver_channnel: tokio::sync::mpsc::Receiver<VerificationDataCommitment>,
8675
) -> Result<Vec<AlignedVerificationData>, SubmitError> {
8776
// Responses are filtered to only admit binary or close messages.
8877
let mut response_stream = response_stream.lock().await;
8978
let mut aligned_verification_data: Vec<AlignedVerificationData> = Vec::new();
90-
let mut num_responses: usize = 0;
9179

9280
while let Some(Ok(msg)) = response_stream.next().await {
9381
if let Message::Close(close_frame) = msg {
@@ -102,17 +90,19 @@ pub async fn receive(
10290
));
10391
}
10492

105-
process_batch_inclusion_data(
106-
msg,
107-
&mut aligned_verification_data,
108-
verification_data_commitments_rev,
109-
)
110-
.await?;
111-
112-
num_responses += 1;
113-
if num_responses == total_messages {
114-
info!("All message responses received");
115-
return Ok(aligned_verification_data);
93+
match receiver_channnel.recv().await {
94+
Some(verification_data_commitment) => {
95+
process_batch_inclusion_data(
96+
msg,
97+
&mut aligned_verification_data,
98+
verification_data_commitment,
99+
)
100+
.await?;
101+
}
102+
None => { //channel sends None when writing to it is closed
103+
info!("All message responses received");
104+
return Ok(aligned_verification_data);
105+
}
116106
}
117107
}
118108

@@ -124,7 +114,7 @@ pub async fn receive(
124114
async fn process_batch_inclusion_data(
125115
msg: Message,
126116
aligned_verification_data: &mut Vec<AlignedVerificationData>,
127-
verification_data_commitments_rev: &mut Vec<VerificationDataCommitment>,
117+
verification_data_commitment: VerificationDataCommitment,
128118
) -> Result<(), SubmitError> {
129119

130120
let data = msg.into_data();
@@ -133,7 +123,7 @@ async fn process_batch_inclusion_data(
133123
let _ = handle_batch_inclusion_data(
134124
batch_inclusion_data,
135125
aligned_verification_data,
136-
verification_data_commitments_rev,
126+
verification_data_commitment,
137127
);
138128
}
139129
Ok(ResponseMessage::InvalidNonce) => {

0 commit comments

Comments
 (0)