Skip to content

Commit ce07c08

Browse files
committed
feat: handle channel errors
1 parent ff9cf81 commit ce07c08

File tree

1 file changed

+53
-22
lines changed

1 file changed

+53
-22
lines changed

batcher/aligned-sdk/src/communication/messaging.rs

Lines changed: 53 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub async fn send_messages(
3636
wallet: Wallet<SigningKey>,
3737
mut nonce: U256,
3838
sender_channel: tokio::sync::mpsc::Sender<VerificationDataCommitment>,
39-
) -> Result<bool, SubmitError> {
39+
) -> Result<(), SubmitError> {
4040
let chain_id = U256::from(wallet.chain_id());
4141
let mut ws_write = ws_write.lock().await;
4242

@@ -62,11 +62,25 @@ pub async fn send_messages(
6262

6363
debug!("{:?} Message sent", idx);
6464

65-
sender_channel.send(verification_data.into()).await.unwrap();
65+
match sender_channel.send(verification_data.into()).await {//.map_err(|e| SubmitError::GenericError(e.to_string()))?;
66+
Ok(_) => {
67+
debug!("Message sent to channel");
68+
}
69+
Err(e) if e.to_string() == "channel closed" => { // happens when receive has exited, because batcher replied with error
70+
error!("Error sending message because batcher has previously replied with an error");
71+
// return Err(SubmitError::GenericError(("Batcher has previously replied with an error").to_string()));
72+
return Ok(());
73+
}
74+
Err(e) => {
75+
error!("Error sending message to channel: {:?}", e.to_string());
76+
return Err(SubmitError::GenericError(e.to_string()));
77+
}
78+
}
6679
}
6780

6881
//sender_channel will be closed as it falls out of scope, sending a 'None' to the receiver
69-
Ok(true)
82+
info!("All messages sent");
83+
Ok(())
7084
}
7185

7286
pub async fn receive(
@@ -77,38 +91,32 @@ pub async fn receive(
7791
let mut response_stream = response_stream.lock().await;
7892
let mut aligned_verification_data: Vec<AlignedVerificationData> = Vec::new();
7993

80-
while let Some(Ok(msg)) = response_stream.next().await {
94+
while let Some(verification_data_commitment) = receiver_channnel.recv().await { //while there are messages in the channel
95+
// Read from WS
96+
let msg = response_stream.next().await.unwrap().map_err(SubmitError::WebSocketConnectionError)?;
97+
98+
// If websocket was closed prematurely:
8199
if let Message::Close(close_frame) = msg {
82100
if let Some(close_msg) = close_frame {
83101
return Err(SubmitError::WebSocketClosedUnexpectedlyError(
84102
close_msg.to_owned(),
85103
));
86104
}
87105
return Err(SubmitError::GenericError(
88-
"Connection was closed without close message before receiving all messages"
106+
"Connection was closed before receive() processed all sent messages "
89107
.to_string(),
90108
));
91109
}
92110

93-
match receiver_channnel.recv().await {
94-
Some(verification_data_commitment) => {
95-
process_batch_inclusion_data(
96-
msg,
97-
&mut aligned_verification_data,
98-
verification_data_commitment,
99-
)
100-
.await?;
101-
}
102-
None => { //channel sends None when writing to it is closed
103-
info!("All message responses received");
104-
return Ok(aligned_verification_data);
105-
}
106-
}
111+
process_batch_inclusion_data(
112+
msg,
113+
&mut aligned_verification_data,
114+
verification_data_commitment,
115+
).await?; // If batcher returned an error, this will close the channel and return the error
107116
}
108117

109-
Err(SubmitError::GenericError(
110-
"Connection was closed without close message before receiving all messages".to_string(),
111-
))
118+
info!("All message responses handled succesfully");
119+
Ok(aligned_verification_data)
112120
}
113121

114122
async fn process_batch_inclusion_data(
@@ -126,62 +134,85 @@ async fn process_batch_inclusion_data(
126134
verification_data_commitment,
127135
);
128136
}
137+
Ok(ResponseMessage::ReplacementMessageReceived) => {
138+
// This message is not processed, it is only used to signal the client that the replacement message was received by the batcher.
139+
// This is because the sender expects to receive the same amount of messages as it has sent.
140+
}
129141
Ok(ResponseMessage::InvalidNonce) => {
142+
error!("Batcher responded with invalid nonce");
130143
return Err(SubmitError::InvalidNonce);
131144
}
132145
Ok(ResponseMessage::InvalidSignature) => {
146+
error!("Batcher responded with invalid signature");
133147
return Err(SubmitError::InvalidSignature);
134148
}
135149
Ok(ResponseMessage::ProofTooLarge) => {
150+
error!("Batcher responded with proof too large");
136151
return Err(SubmitError::ProofTooLarge);
137152
}
138153
Ok(ResponseMessage::InvalidMaxFee) => {
154+
error!("Batcher responded with invalid max fee");
139155
return Err(SubmitError::InvalidMaxFee);
140156
}
141157
Ok(ResponseMessage::InsufficientBalance(addr)) => {
158+
error!("Batcher responded with insufficient balance");
142159
return Err(SubmitError::InsufficientBalance(addr));
143160
}
144161
Ok(ResponseMessage::InvalidChainId) => {
162+
error!("Batcher responded with invalid chain id");
145163
return Err(SubmitError::InvalidChainId);
146164
}
147165
Ok(ResponseMessage::InvalidReplacementMessage) => {
166+
error!("Batcher responded with invalid replacement message");
148167
return Err(SubmitError::InvalidReplacementMessage);
149168
}
150169
Ok(ResponseMessage::AddToBatchError) => {
170+
error!("Batcher responded with add to batch error");
151171
return Err(SubmitError::AddToBatchError);
152172
}
153173
Ok(ResponseMessage::EthRpcError) => {
174+
error!("Batcher experienced Eth RPC connection error");
154175
return Err(SubmitError::EthereumProviderError(
155176
"Batcher experienced Eth RPC connection error".to_string(),
156177
));
157178
}
158179
Ok(ResponseMessage::InvalidPaymentServiceAddress(received_addr, expected_addr)) => {
180+
error!(
181+
"Batcher responded with invalid payment service address: {:?}, expected: {:?}",
182+
received_addr, expected_addr
183+
);
159184
return Err(SubmitError::InvalidPaymentServiceAddress(
160185
received_addr,
161186
expected_addr,
162187
));
163188
}
164189
Ok(ResponseMessage::InvalidProof(reason)) => {
190+
error!("Batcher responded with invalid proof: {}", reason);
165191
return Err(SubmitError::InvalidProof(reason));
166192
}
167193
Ok(ResponseMessage::CreateNewTaskError(merkle_root, error)) => {
194+
error!("Batcher responded with create new task error: {}", error);
168195
return Err(SubmitError::BatchSubmissionFailed(
169196
"Could not create task with merkle root ".to_owned() + &merkle_root + ", failed with error: " + &error,
170197
));
171198
}
172199
Ok(ResponseMessage::ProtocolVersion(_)) => {
200+
error!("Batcher responded with protocol version instead of batch inclusion data");
173201
return Err(SubmitError::UnexpectedBatcherResponse(
174202
"Batcher responded with protocol version instead of batch inclusion data"
175203
.to_string(),
176204
));
177205
}
178206
Ok(ResponseMessage::BatchReset) => {
207+
error!("Batcher responded with batch reset");
179208
return Err(SubmitError::ProofQueueFlushed);
180209
}
181210
Ok(ResponseMessage::Error(e)) => {
182211
error!("Batcher responded with error: {}", e);
212+
error!("Batcher responded with error: {}", e);
183213
}
184214
Err(e) => {
215+
error!("Error while deserializing batch inclusion data: {}", e);
185216
return Err(SubmitError::SerializationError(e));
186217
}
187218
}

0 commit comments

Comments
 (0)