Skip to content

Commit 8924dbe

Browse files
JuArceMauroToscano
andauthored
feat(batcher): poll for unlock events and remove users with unlocked funds (#2123)
Co-authored-by: MauroFab <[email protected]> Co-authored-by: Mauro Toscano <[email protected]>
1 parent 4705e18 commit 8924dbe

File tree

14 files changed

+397
-21
lines changed

14 files changed

+397
-21
lines changed

config-files/config-batcher-docker.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,6 @@ batcher:
3737
# When replacing the message, how much higher should the max fee in comparison to the original one
3838
# The calculation is replacement_max_fee >= original_max_fee + original_max_fee * min_bump_percentage / 100
3939
min_bump_percentage: 10
40+
# How often to poll for BalanceUnlocked events in seconds (default: 600 seconds = 10 minutes)
41+
balance_unlock_polling_interval_seconds: 600
4042

config-files/config-batcher-ethereum-package.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,6 @@ batcher:
3535
# When replacing the message, how much higher should the max fee in comparison to the original one
3636
# The calculation is replacement_max_fee >= original_max_fee + original_max_fee * min_bump_percentage / 100
3737
min_bump_percentage: 10
38+
# How often to poll for BalanceUnlocked events in seconds (default: 600 seconds = 10 minutes)
39+
balance_unlock_polling_interval_seconds: 600
3840

config-files/config-batcher.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,5 @@ batcher:
3737
# When replacing the message, how much higher should the max fee in comparison to the original one
3838
# The calculation is replacement_max_fee >= original_max_fee + original_max_fee * min_bump_percentage / 100
3939
min_bump_percentage: 10
40+
# How often to poll for BalanceUnlocked events in seconds (default: 600 seconds = 10 minutes)
41+
balance_unlock_polling_interval_seconds: 600

crates/batcher/src/config/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ pub struct BatcherConfigFromYaml {
5353
pub non_paying: Option<NonPayingConfigFromYaml>,
5454
pub amount_of_proofs_for_min_max_fee: usize,
5555
pub min_bump_percentage: u64,
56+
pub balance_unlock_polling_interval_seconds: u64,
5657
}
5758

5859
#[derive(Debug, Deserialize)]

crates/batcher/src/lib.rs

Lines changed: 227 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ use eth::utils::{calculate_bumped_gas_price, get_batcher_signer, get_gas_price};
77
use ethers::contract::ContractError;
88
use ethers::signers::Signer;
99
use retry::batcher_retryables::{
10-
cancel_create_new_task_retryable, create_new_task_retryable, get_user_balance_retryable,
11-
get_user_nonce_from_ethereum_retryable, simulate_create_new_task_retryable,
12-
user_balance_is_unlocked_retryable,
10+
cancel_create_new_task_retryable, create_new_task_retryable,
11+
get_current_block_number_retryable, get_user_balance_retryable,
12+
get_user_nonce_from_ethereum_retryable, query_balance_unlocked_events_retryable,
13+
simulate_create_new_task_retryable, user_balance_is_unlocked_retryable,
1314
};
1415
use retry::{retry_function, RetryError};
1516
use tokio::time::{timeout, Instant};
@@ -39,8 +40,8 @@ use aligned_sdk::common::types::{
3940

4041
use aws_sdk_s3::client::Client as S3Client;
4142
use eth::payment_service::{BatcherPaymentService, CreateNewTaskFeeParams, SignerMiddlewareT};
42-
use ethers::prelude::{Middleware, Provider};
43-
use ethers::types::{Address, Signature, TransactionReceipt, U256};
43+
use ethers::prelude::{Http, Middleware, Provider};
44+
use ethers::types::{Address, Signature, TransactionReceipt, U256, U64};
4445
use futures_util::{future, join, SinkExt, StreamExt, TryStreamExt};
4546
use lambdaworks_crypto::merkle_tree::merkle::MerkleTree;
4647
use lambdaworks_crypto::merkle_tree::traits::IsMerkleTreeBackend;
@@ -50,6 +51,7 @@ use tokio::sync::{Mutex, MutexGuard, RwLock};
5051

5152
// Message handler lock timeout
5253
const MESSAGE_HANDLER_LOCK_TIMEOUT: Duration = Duration::from_secs(10);
54+
const POLLING_EVENTS_LOCK_TIMEOUT: Duration = Duration::from_secs(300);
5355
use tokio_tungstenite::tungstenite::{Error, Message};
5456
use types::batch_queue::{self, BatchQueueEntry, BatchQueueEntryPriority};
5557
use types::errors::{BatcherError, TransactionSendError};
@@ -86,6 +88,8 @@ pub struct Batcher {
8688
eth_ws_url_fallback: String,
8789
batcher_signer: Arc<SignerMiddlewareT>,
8890
batcher_signer_fallback: Arc<SignerMiddlewareT>,
91+
eth_http_provider: Provider<Http>,
92+
eth_http_provider_fallback: Provider<Http>,
8993
chain_id: U256,
9094
payment_service: BatcherPaymentService,
9195
payment_service_fallback: BatcherPaymentService,
@@ -103,6 +107,7 @@ pub struct Batcher {
103107
current_min_max_fee: RwLock<U256>,
104108
amount_of_proofs_for_min_max_fee: usize,
105109
min_bump_percentage: U256,
110+
balance_unlock_polling_interval_seconds: u64,
106111

107112
// Shared state access:
108113
// Two kinds of threads interact with the shared state:
@@ -315,6 +320,8 @@ impl Batcher {
315320
eth_ws_url_fallback: config.eth_ws_url_fallback,
316321
batcher_signer,
317322
batcher_signer_fallback,
323+
eth_http_provider,
324+
eth_http_provider_fallback,
318325
chain_id,
319326
payment_service,
320327
payment_service_fallback,
@@ -327,6 +334,9 @@ impl Batcher {
327334
max_batch_proof_qty: config.batcher.max_batch_proof_qty,
328335
amount_of_proofs_for_min_max_fee: config.batcher.amount_of_proofs_for_min_max_fee,
329336
min_bump_percentage: U256::from(config.batcher.min_bump_percentage),
337+
balance_unlock_polling_interval_seconds: config
338+
.batcher
339+
.balance_unlock_polling_interval_seconds,
330340
last_uploaded_batch_block: Mutex::new(last_uploaded_batch_block),
331341
pre_verification_is_enabled: config.batcher.pre_verification_is_enabled,
332342
non_paying_config,
@@ -436,12 +446,16 @@ impl Batcher {
436446
}
437447
}
438448

439-
/// Helper to apply 15-second timeout to batch lock acquisition with consistent logging and metrics
440-
async fn try_batch_lock_with_timeout<F, T>(&self, lock_future: F) -> Option<T>
449+
/// Helper to apply `duration` timeout to batch lock acquisition with consistent logging and metrics
450+
async fn try_batch_lock_with_timeout<F, T>(
451+
&self,
452+
lock_future: F,
453+
duration: Duration,
454+
) -> Option<T>
441455
where
442456
F: std::future::Future<Output = T>,
443457
{
444-
match timeout(MESSAGE_HANDLER_LOCK_TIMEOUT, lock_future).await {
458+
match timeout(duration, lock_future).await {
445459
Ok(result) => Some(result),
446460
Err(_) => {
447461
warn!("Batch lock acquisition timed out");
@@ -491,6 +505,204 @@ impl Batcher {
491505
.map_err(|e| e.inner())
492506
}
493507

508+
/// Poll for BalanceUnlocked events from BatcherPaymentService contract.
509+
/// Runs at configurable intervals and checks recent blocks for events (2x the polling interval).
510+
/// When an event is detected, removes user's proofs from queue and resets UserState.
511+
pub async fn poll_balance_unlocked_events(self: Arc<Self>) -> Result<(), BatcherError> {
512+
let mut interval = tokio::time::interval(Duration::from_secs(
513+
self.balance_unlock_polling_interval_seconds,
514+
));
515+
let mut from_block = self.get_current_block_number().await.map_err(|e| {
516+
BatcherError::EthereumProviderError(format!(
517+
"Failed to get current block number: {:?}",
518+
e
519+
))
520+
})?;
521+
522+
loop {
523+
interval.tick().await;
524+
525+
match self.process_balance_unlocked_events(from_block).await {
526+
Ok(current_block) => {
527+
from_block = current_block;
528+
}
529+
Err(e) => {
530+
error!("Error processing BalanceUnlocked events: {:?}", e);
531+
// On error, keep from_block unchanged to retry the same range next time
532+
}
533+
}
534+
}
535+
}
536+
537+
async fn process_balance_unlocked_events(&self, from_block: U64) -> Result<U64, BatcherError> {
538+
// Get current block number using HTTP providers
539+
let current_block = self.get_current_block_number().await.map_err(|e| {
540+
BatcherError::EthereumProviderError(format!(
541+
"Failed to get current block number: {:?}",
542+
e
543+
))
544+
})?;
545+
546+
// Query events with retry logic
547+
let events = self
548+
.query_balance_unlocked_events(from_block, current_block)
549+
.await
550+
.map_err(|e| {
551+
BatcherError::EthereumProviderError(format!(
552+
"Failed to query BalanceUnlocked events: {:?}",
553+
e
554+
))
555+
})?;
556+
557+
info!(
558+
"Found {} BalanceUnlocked events in blocks {} to {}",
559+
events.len(),
560+
from_block,
561+
current_block
562+
);
563+
564+
// Process each event
565+
for event in events {
566+
let user_address = event.user;
567+
debug!(
568+
"Processing BalanceUnlocked event for user: {:?}",
569+
user_address
570+
);
571+
572+
// Check if user has proofs in queue
573+
//
574+
// Double-check that funds are still unlocked by calling the contract
575+
// This is necessary because we query events over a block range, and the
576+
// user’s state may have changed (e.g., funds could be locked again) after
577+
// the event was emitted. Verifying on-chain ensures we don’t act on stale data.
578+
//
579+
// There is a brief period between the checks and the removal during which the user's
580+
// proofs could be sent. This is acceptable, as the removal will not fail;
581+
// it will simply clear the user's state.
582+
if self.user_has_proofs_in_queue(user_address).await
583+
&& self.user_balance_is_unlocked(&user_address).await
584+
{
585+
info!(
586+
"User {:?} has proofs in queue and funds are unlocked, proceeding to remove proofs and resetting UserState",
587+
user_address
588+
);
589+
self.remove_user_proofs_and_reset_state(user_address).await;
590+
}
591+
}
592+
593+
Ok(current_block)
594+
}
595+
596+
/// Gets the current block number from Ethereum.
597+
/// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
598+
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
599+
async fn get_current_block_number(&self) -> Result<U64, RetryError<String>> {
600+
retry_function(
601+
|| {
602+
get_current_block_number_retryable(
603+
&self.eth_http_provider,
604+
&self.eth_http_provider_fallback,
605+
)
606+
},
607+
ETHEREUM_CALL_MIN_RETRY_DELAY,
608+
ETHEREUM_CALL_BACKOFF_FACTOR,
609+
ETHEREUM_CALL_MAX_RETRIES,
610+
ETHEREUM_CALL_MAX_RETRY_DELAY,
611+
)
612+
.await
613+
}
614+
615+
/// Queries BalanceUnlocked events from the BatcherPaymentService contract.
616+
/// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
617+
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
618+
async fn query_balance_unlocked_events(
619+
&self,
620+
from_block: U64,
621+
to_block: U64,
622+
) -> Result<
623+
Vec<aligned_sdk::eth::batcher_payment_service::BalanceUnlockedFilter>,
624+
RetryError<String>,
625+
> {
626+
retry_function(
627+
|| {
628+
query_balance_unlocked_events_retryable(
629+
&self.payment_service,
630+
&self.payment_service_fallback,
631+
from_block,
632+
to_block,
633+
)
634+
},
635+
ETHEREUM_CALL_MIN_RETRY_DELAY,
636+
ETHEREUM_CALL_BACKOFF_FACTOR,
637+
ETHEREUM_CALL_MAX_RETRIES,
638+
ETHEREUM_CALL_MAX_RETRY_DELAY,
639+
)
640+
.await
641+
}
642+
643+
async fn user_has_proofs_in_queue(&self, user_address: Address) -> bool {
644+
let user_states = self.user_states.read().await;
645+
let Some(user_state) = user_states.get(&user_address) else {
646+
return false;
647+
};
648+
649+
let Some(user_state_guard) = self
650+
.try_user_lock_with_timeout(user_address, user_state.lock())
651+
.await
652+
else {
653+
return false;
654+
};
655+
656+
user_state_guard.proofs_in_batch > 0
657+
}
658+
659+
async fn remove_user_proofs_and_reset_state(&self, user_address: Address) {
660+
let mut user_states = self.user_states.write().await;
661+
662+
let mut batch_state_guard = match self
663+
.try_batch_lock_with_timeout(self.batch_state.lock(), POLLING_EVENTS_LOCK_TIMEOUT)
664+
.await
665+
{
666+
Some(guard) => guard,
667+
None => {
668+
error!(
669+
"Failed to acquire batch lock when trying to remove proofs from user {:?}, skipping removal",
670+
user_address
671+
);
672+
self.metrics.inc_unlocked_event_polling_batch_lock_timeout();
673+
return;
674+
}
675+
};
676+
677+
let removed_entries = batch_state_guard
678+
.batch_queue
679+
.extract_if(|entry, _| entry.sender == user_address);
680+
681+
// Notify user via websocket
682+
for (entry, _) in removed_entries {
683+
if let Some(ws_sink) = entry.messaging_sink {
684+
let ws_sink_clone = ws_sink.clone();
685+
tokio::spawn(async move {
686+
send_message(
687+
ws_sink_clone.clone(),
688+
SubmitProofResponseMessage::UserFundsUnlocked,
689+
)
690+
.await;
691+
});
692+
}
693+
info!(
694+
"Removed proof with nonce {} for user {:?} from batch queue",
695+
entry.nonced_verification_data.nonce, user_address
696+
);
697+
}
698+
699+
user_states.remove(&user_address);
700+
info!(
701+
"Removed UserState entry for user {:?} after processing BalanceUnlocked event",
702+
user_address
703+
);
704+
}
705+
494706
pub async fn listen_new_blocks_retryable(
495707
self: Arc<Self>,
496708
) -> Result<(), RetryError<BatcherError>> {
@@ -1052,7 +1264,7 @@ impl Batcher {
10521264
// * ---------------------------------------------------------------------*
10531265

10541266
let Some(mut batch_state_lock) = self
1055-
.try_batch_lock_with_timeout(self.batch_state.lock())
1267+
.try_batch_lock_with_timeout(self.batch_state.lock(), MESSAGE_HANDLER_LOCK_TIMEOUT)
10561268
.await
10571269
else {
10581270
send_message(ws_conn_sink.clone(), SubmitProofResponseMessage::ServerBusy).await;
@@ -1222,7 +1434,7 @@ impl Batcher {
12221434
let replacement_max_fee = nonced_verification_data.max_fee;
12231435
let nonce = nonced_verification_data.nonce;
12241436
let Some(mut batch_state_guard) = self
1225-
.try_batch_lock_with_timeout(self.batch_state.lock())
1437+
.try_batch_lock_with_timeout(self.batch_state.lock(), MESSAGE_HANDLER_LOCK_TIMEOUT)
12261438
.await
12271439
else {
12281440
drop(user_state_guard);
@@ -1817,14 +2029,16 @@ impl Batcher {
18172029
warn!("Failed to send task status to telemetry: {:?}", e);
18182030
}
18192031

1820-
// decide if i want to flush the queue:
18212032
match e {
2033+
// This should never happen, there is a task that regularly cleans up
2034+
// user proofs with unlocked states
2035+
// (and it runs more frequently than the 1H the user needs to withdraw funds)
18222036
BatcherError::TransactionSendError(
18232037
TransactionSendError::SubmissionInsufficientBalance(address),
18242038
) => {
2039+
// In the future we could do a more granular recovery
18252040
warn!("User {:?} has insufficient balance, flushing entire queue as safety measure", address);
1826-
// TODO: In the future, we should re-add the failed batch back to the queue
1827-
// For now, we flush everything as a safety measure
2041+
18282042
self.flush_queue_and_clear_nonce_cache().await;
18292043
}
18302044
_ => {

crates/batcher/src/main.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,16 @@ async fn main() -> Result<(), BatcherError> {
5555
}
5656
});
5757

58+
// spawn task to poll for BalanceUnlocked events
59+
tokio::spawn({
60+
let app = batcher.clone();
61+
async move {
62+
app.poll_balance_unlocked_events()
63+
.await
64+
.expect("Error polling BalanceUnlocked events")
65+
}
66+
});
67+
5868
batcher.metrics.inc_batcher_restart();
5969

6070
batcher.listen_connections(&address).await?;

0 commit comments

Comments
 (0)