Skip to content

Commit 7d86f47

Browse files
committed
Add improved failure handling
1 parent 6c4689a commit 7d86f47

File tree

1 file changed

+16
-15
lines changed

1 file changed

+16
-15
lines changed

crates/batcher/src/lib.rs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,12 @@ pub struct Batcher {
102102
/// - Batch creation needs to be able to change all the states, so all processing
103103
/// needs to be stopped, and all user_states locks need to be taken
104104
batch_state: Mutex<BatchState>,
105-
/// Flag to indicate when restoration is in progress
105+
106+
/// Flag to indicate when recovery is in progress
106107
/// When true, message handlers will return ServerBusy responses
107108
/// It's used a way to "lock" all the user_states at the same time
108109
/// If one needed is taken in the handle message it will timeout
109-
is_restoration_in_progress: RwLock<bool>,
110+
is_recovering_from_submission_failure: RwLock<bool>,
110111
user_states: DashMap<Address, Arc<Mutex<UserState>>>,
111112

112113
last_uploaded_batch_block: Mutex<u64>,
@@ -118,8 +119,6 @@ pub struct Batcher {
118119

119120
disabled_verifiers: Mutex<U256>,
120121

121-
122-
123122
// Observability and monitoring
124123
pub metrics: metrics::BatcherMetrics,
125124
pub telemetry: TelemetrySender,
@@ -299,7 +298,7 @@ impl Batcher {
299298
batch_state: Mutex::new(batch_state),
300299
user_states,
301300
disabled_verifiers: Mutex::new(disabled_verifiers),
302-
is_restoration_in_progress: RwLock::new(false),
301+
is_recovering_from_submission_failure: RwLock::new(false),
303302
metrics,
304303
telemetry,
305304
}
@@ -674,7 +673,7 @@ impl Batcher {
674673
ws_conn_sink: WsMessageSink,
675674
) -> Result<(), Error> {
676675
// Check if restoration is in progress
677-
if *self.is_restoration_in_progress.read().await {
676+
if *self.is_recovering_from_submission_failure.read().await {
678677
warn!(
679678
"Rejecting nonce request from {} during restoration",
680679
address
@@ -769,7 +768,7 @@ impl Batcher {
769768
self.metrics.received_proofs.inc();
770769

771770
// Check if restoration is in progress
772-
if *self.is_restoration_in_progress.read().await {
771+
if *self.is_recovering_from_submission_failure.read().await {
773772
warn!(
774773
"Rejecting proof submission from {} during restoration (nonce: {})",
775774
client_msg.verification_data.verification_data.proof_generator_addr, msg_nonce
@@ -1561,7 +1560,7 @@ impl Batcher {
15611560
);
15621561

15631562
// Set restoration flag to stop handling new user messages
1564-
*self.is_restoration_in_progress.write().await = true;
1563+
*self.is_recovering_from_submission_failure.write().await = true;
15651564

15661565
let mut batch_state_lock = self.batch_state.lock().await;
15671566
let mut restored_entries = Vec::new();
@@ -1624,10 +1623,15 @@ impl Batcher {
16241623
let users_with_restored_proofs: std::collections::HashSet<Address> =
16251624
restored_entries.iter().map(|entry| entry.sender).collect();
16261625

1627-
// Update user states for successfully restored proofs
1628-
info!("Updating user states after proof restoration...");
1629-
// TODO: We may have ejected some users that didn't have restored proofs,
1630-
// we should include in this list the ejected users
1626+
// At this point we have a valid queue with updated evicted users states
1627+
// Only auxiliary user data (max_min_fee) can be "inconsistent"
1628+
// but we can keep updating it without locking the queue
1629+
info!("Queue recovered from submission failure, resuming user processing and updating user states metadata");
1630+
std::mem::drop(batch_state_lock);
1631+
*self.is_recovering_from_submission_failure.write().await = false;
1632+
1633+
1634+
info!("Updating user states after proof restoration...");
16311635
if let Err(e) = self
16321636
.update_user_states_from_queue_state(users_with_restored_proofs)
16331637
.await
@@ -1638,9 +1642,6 @@ impl Batcher {
16381642
);
16391643
}
16401644

1641-
// Clear restoration flag to allow normal user message handling
1642-
*self.is_restoration_in_progress.write().await = false;
1643-
info!("Proof restoration completed, resuming normal operations");
16441645
}
16451646

16461647
/// Takes the finalized batch as input and:

0 commit comments

Comments
 (0)