Skip to content

Commit fe21a4a

Browse files
MauroToscanoMarcosNicolauJuArce
authored
Restore flush logic (Nuclear option for state recovery) (#2121)
Co-authored-by: Marcos Nicolau <[email protected]> Co-authored-by: JuArce <[email protected]>
1 parent 73661ab commit fe21a4a

File tree

2 files changed

+39
-7
lines changed

2 files changed

+39
-7
lines changed

crates/batcher/src/lib.rs

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2053,6 +2053,24 @@ impl Batcher {
20532053
warn!("User {:?} has insufficient balance, flushing entire queue as safety measure", address);
20542054

20552055
self.flush_queue_and_clear_nonce_cache().await;
2056+
2057+
for entry in finalized_batch {
2058+
if let Some(ws_sink) = entry.messaging_sink.as_ref() {
2059+
tokio::spawn(send_message(
2060+
ws_sink.clone(),
2061+
SubmitProofResponseMessage::BatchReset,
2062+
));
2063+
} else {
2064+
warn!(
2065+
"Websocket sink was found empty. This should only happen in tests"
2066+
);
2067+
}
2068+
}
2069+
2070+
return Err(BatcherError::StateCorruptedAndFlushed(format!(
2071+
"Queue and user states flushed due to insufficient balance for user {:?}",
2072+
address
2073+
)));
20562074
}
20572075
_ => {
20582076
// Add more cases here if we want in the future
@@ -2090,7 +2108,10 @@ impl Batcher {
20902108
let mut batch_state_lock = self.batch_state.lock().await;
20912109
for (entry, _) in batch_state_lock.batch_queue.iter() {
20922110
if let Some(ws_sink) = entry.messaging_sink.as_ref() {
2093-
send_message(ws_sink.clone(), SubmitProofResponseMessage::BatchReset).await;
2111+
tokio::spawn(send_message(
2112+
ws_sink.clone(),
2113+
SubmitProofResponseMessage::BatchReset,
2114+
));
20942115
} else {
20952116
warn!("Websocket sink was found empty. This should only happen in tests");
20962117
}
@@ -2179,12 +2200,19 @@ impl Batcher {
21792200

21802201
// If batch finalization failed, restore the proofs to the queue
21812202
if let Err(e) = batch_finalization_result {
2182-
error!(
2183-
"Batch finalization failed, restoring proofs to queue: {:?}",
2184-
e
2185-
);
2186-
self.restore_proofs_after_batch_failure(&finalized_batch)
2187-
.await;
2203+
error!("Batch finalization failed: {:?}", e);
2204+
2205+
// If the queue was flushed, don't recover
2206+
match &e {
2207+
BatcherError::StateCorruptedAndFlushed(_) => {
2208+
info!("State was corrupted and flushed - not restoring proofs");
2209+
}
2210+
_ => {
2211+
info!("Restoring proofs to queue after batch failure");
2212+
self.restore_proofs_after_batch_failure(&finalized_batch)
2213+
.await;
2214+
}
2215+
}
21882216
return Err(e);
21892217
}
21902218
}

crates/batcher/src/types/errors.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ pub enum BatcherError {
6666
WsSinkEmpty,
6767
AddressNotFoundInUserStates(Address),
6868
QueueRemoveError(String),
69+
StateCorruptedAndFlushed(String),
6970
EthereumProviderError(String),
7071
}
7172

@@ -148,6 +149,9 @@ impl fmt::Debug for BatcherError {
148149
BatcherError::QueueRemoveError(e) => {
149150
write!(f, "Error while removing entry from queue: {}", e)
150151
}
152+
BatcherError::StateCorruptedAndFlushed(reason) => {
153+
write!(f, "Batcher state was corrupted and flushed: {}", reason)
154+
}
151155
BatcherError::EthereumProviderError(e) => {
152156
write!(f, "Ethereum provider error: {}", e)
153157
}

0 commit comments

Comments
 (0)