Skip to content

Commit 6c4689a

Browse files
committed
Stop processing users while restoring the queue
1 parent 78169a3 commit 6c4689a

File tree

1 file changed

+39
-2
lines changed

1 file changed

+39
-2
lines changed

crates/batcher/src/lib.rs

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,11 @@ pub struct Batcher {
102102
/// - Batch creation needs to be able to change all the states, so all processing
103103
/// needs to be stopped, and all user_states locks need to be taken
104104
batch_state: Mutex<BatchState>,
105+
/// Flag to indicate when restoration is in progress
106+
/// When true, message handlers will return ServerBusy responses
107+
/// It's used as a way to "lock" all the user_states at the same time
108+
/// If a needed one is already taken while handling a message, it will time out
109+
is_restoration_in_progress: RwLock<bool>,
105110
user_states: DashMap<Address, Arc<Mutex<UserState>>>,
106111

107112
last_uploaded_batch_block: Mutex<u64>,
@@ -113,6 +118,8 @@ pub struct Batcher {
113118

114119
disabled_verifiers: Mutex<U256>,
115120

121+
122+
116123
// Observability and monitoring
117124
pub metrics: metrics::BatcherMetrics,
118125
pub telemetry: TelemetrySender,
@@ -292,6 +299,7 @@ impl Batcher {
292299
batch_state: Mutex::new(batch_state),
293300
user_states,
294301
disabled_verifiers: Mutex::new(disabled_verifiers),
302+
is_restoration_in_progress: RwLock::new(false),
295303
metrics,
296304
telemetry,
297305
}
@@ -665,6 +673,17 @@ impl Batcher {
665673
mut address: Address,
666674
ws_conn_sink: WsMessageSink,
667675
) -> Result<(), Error> {
676+
// Check if restoration is in progress
677+
if *self.is_restoration_in_progress.read().await {
678+
warn!(
679+
"Rejecting nonce request from {} during restoration",
680+
address
681+
);
682+
let response = GetNonceResponseMessage::ServerBusy;
683+
send_message(ws_conn_sink, response).await;
684+
return Ok(());
685+
}
686+
668687
// If the address is not paying, we will return the nonce of the aligned_payment_address
669688
if !self.has_to_pay(&address) {
670689
info!("Handling nonpaying message");
@@ -749,6 +768,17 @@ impl Batcher {
749768
debug!("Received message with nonce: {msg_nonce:?}");
750769
self.metrics.received_proofs.inc();
751770

771+
// Check if restoration is in progress
772+
if *self.is_restoration_in_progress.read().await {
773+
warn!(
774+
"Rejecting proof submission from {} during restoration (nonce: {})",
775+
client_msg.verification_data.verification_data.proof_generator_addr, msg_nonce
776+
);
777+
let response = SubmitProofResponseMessage::ServerBusy;
778+
send_message(ws_conn_sink, response).await;
779+
return Ok(());
780+
}
781+
752782
// * ---------------------------------------------------*
753783
// * Perform validations over the message *
754784
// * ---------------------------------------------------*
@@ -1530,6 +1560,9 @@ impl Batcher {
15301560
failed_batch.len()
15311561
);
15321562

1563+
// Set restoration flag to stop handling new user messages
1564+
*self.is_restoration_in_progress.write().await = true;
1565+
15331566
let mut batch_state_lock = self.batch_state.lock().await;
15341567
let mut restored_entries = Vec::new();
15351568

@@ -1591,10 +1624,10 @@ impl Batcher {
15911624
let users_with_restored_proofs: std::collections::HashSet<Address> =
15921625
restored_entries.iter().map(|entry| entry.sender).collect();
15931626

1594-
drop(batch_state_lock); // Release batch lock before user state updates
1595-
15961627
// Update user states for successfully restored proofs
15971628
info!("Updating user states after proof restoration...");
1629+
// TODO: We may have ejected some users that didn't have restored proofs,
1630+
// we should include in this list the ejected users
15981631
if let Err(e) = self
15991632
.update_user_states_from_queue_state(users_with_restored_proofs)
16001633
.await
@@ -1604,6 +1637,10 @@ impl Batcher {
16041637
e
16051638
);
16061639
}
1640+
1641+
// Clear restoration flag to allow normal user message handling
1642+
*self.is_restoration_in_progress.write().await = false;
1643+
info!("Proof restoration completed, resuming normal operations");
16071644
}
16081645

16091646
/// Takes the finalized batch as input and:

0 commit comments

Comments
 (0)