From a66fe6ec6bf773dbec57872c68cfe14b25065d29 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 24 Jun 2025 21:59:12 +0000 Subject: [PATCH 1/9] feat(fortuna): implement multiple replica support with sequence number modulo filtering Co-Authored-By: Tejas Badadare --- apps/fortuna/config.sample.yaml | 2 + apps/fortuna/src/config.rs | 18 +++++++++ apps/fortuna/src/keeper.rs | 7 ++++ apps/fortuna/src/keeper/block.rs | 1 + apps/fortuna/src/keeper/process_event.rs | 48 ++++++++++++++++++++++++ 5 files changed, 76 insertions(+) diff --git a/apps/fortuna/config.sample.yaml b/apps/fortuna/config.sample.yaml index 6b42e97b43..e56dfd3b10 100644 --- a/apps/fortuna/config.sample.yaml +++ b/apps/fortuna/config.sample.yaml @@ -47,6 +47,8 @@ chains: # blocks after 5 blocks, then again after 10 blocks, and finally after 20 blocks. block_delays: [5, 10, 20] + + # Historical commitments -- delete this block for local development purposes commitments: # prettier-ignore diff --git a/apps/fortuna/src/config.rs b/apps/fortuna/src/config.rs index 1d96a69c61..fae28e75d0 100644 --- a/apps/fortuna/src/config.rs +++ b/apps/fortuna/src/config.rs @@ -94,6 +94,15 @@ impl Config { } } + if let Some(replica_config) = &config.keeper.replica_config { + if replica_config.total_replicas == 0 { + return Err(anyhow!("Keeper replica configuration is invalid. total_replicas must be greater than 0.")); + } + if replica_config.replica_id >= replica_config.total_replicas { + return Err(anyhow!("Keeper replica configuration is invalid. replica_id must be less than total_replicas.")); + } + } + Ok(config) } @@ -333,6 +342,12 @@ fn default_chain_sample_interval() -> u64 { 1 } +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct ReplicaConfig { + pub replica_id: u64, + pub total_replicas: u64, +} + /// Configuration values for the keeper service that are shared across chains. 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct KeeperConfig { @@ -342,6 +357,9 @@ pub struct KeeperConfig { /// This key *does not need to be a registered provider*. In particular, production deployments /// should ensure this is a different key in order to reduce the severity of security breaches. pub private_key: SecretString, + + #[serde(default)] + pub replica_config: Option, } // A secret is a string that can be provided either as a literal in the config, diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index b08c217aa9..28ff6ea6a7 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -85,6 +85,13 @@ pub async fn run_keeper_threads( contract: contract.clone(), gas_limit, escalation_policy: chain_eth_config.escalation_policy.to_policy(), + keeper_config: crate::config::KeeperConfig { + private_key: crate::config::SecretString { + value: None, + file: None, + }, + replica_config: None, + }, metrics: metrics.clone(), fulfilled_requests_cache, history, diff --git a/apps/fortuna/src/keeper/block.rs b/apps/fortuna/src/keeper/block.rs index 6df6e32cb6..ab962cddf4 100644 --- a/apps/fortuna/src/keeper/block.rs +++ b/apps/fortuna/src/keeper/block.rs @@ -45,6 +45,7 @@ pub struct ProcessParams { pub gas_limit: U256, pub escalation_policy: EscalationPolicy, pub chain_state: BlockchainState, + pub keeper_config: crate::config::KeeperConfig, pub metrics: Arc, pub history: Arc, pub fulfilled_requests_cache: Arc>>, diff --git a/apps/fortuna/src/keeper/process_event.rs b/apps/fortuna/src/keeper/process_event.rs index 707fcc3413..c12e51e7f3 100644 --- a/apps/fortuna/src/keeper/process_event.rs +++ b/apps/fortuna/src/keeper/process_event.rs @@ -35,6 +35,54 @@ pub async fn process_event_with_backoff( return Ok(()); } + let is_primary_replica = + if let Some(replica_config) = &process_param.keeper_config.replica_config { + let assigned_replica = event.sequence_number % replica_config.total_replicas; + if 
assigned_replica != replica_config.replica_id { + tracing::debug!( + sequence_number = event.sequence_number, + assigned_replica = assigned_replica, + our_replica_id = replica_config.replica_id, + "Processing request as backup replica" + ); + false + } else { + true + } + } else { + true // No replica config, process all requests + }; + + if !is_primary_replica { + match chain_state + .contract + .get_request(event.provider_address, event.sequence_number) + .await + { + Ok(Some(_)) => { + tracing::info!( + sequence_number = event.sequence_number, + "Request still open, processing as backup replica" + ); + } + Ok(None) => { + tracing::debug!( + sequence_number = event.sequence_number, + "Request already fulfilled by primary replica, skipping" + ); + return Ok(()); + } + Err(e) => { + tracing::warn!( + sequence_number = event.sequence_number, + error = ?e, + "Error checking request status, skipping" + ); + return Ok(()); + } + } + } + let account_label = AccountLabel { chain_id: chain_state.id.clone(), address: chain_state.provider_address.to_string(), From 48018594629a9f34abbb82c54badbdf7b396252a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 24 Jun 2025 23:37:19 +0000 Subject: [PATCH 2/9] feat(fortuna): add delayed processing for backup replicas - Add configurable time delay before backup replicas check request status - Backup replicas now wait backup_delay_seconds before attempting fulfillment - Add backup_delay_seconds field to ReplicaConfig with default of 30 seconds - Improves reliability by reducing race conditions between replicas Co-Authored-By: Tejas Badadare --- apps/fortuna/src/config.rs | 6 ++++++ apps/fortuna/src/keeper/process_event.rs | 18 +++++++++++++++--- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/apps/fortuna/src/config.rs b/apps/fortuna/src/config.rs index fae28e75d0..ad576a6017 100644 --- a/apps/fortuna/src/config.rs +++ b/apps/fortuna/src/config.rs @@ -346,6 
+346,12 @@ fn default_chain_sample_interval() -> u64 { pub struct ReplicaConfig { pub replica_id: u64, pub total_replicas: u64, + #[serde(default = "default_backup_delay_seconds")] + pub backup_delay_seconds: u64, +} + +fn default_backup_delay_seconds() -> u64 { + 30 } /// Configuration values for the keeper service that are shared across chains. diff --git a/apps/fortuna/src/keeper/process_event.rs b/apps/fortuna/src/keeper/process_event.rs index c12e51e7f3..ee77d70c48 100644 --- a/apps/fortuna/src/keeper/process_event.rs +++ b/apps/fortuna/src/keeper/process_event.rs @@ -54,6 +54,18 @@ pub async fn process_event_with_backoff( }; if !is_primary_replica { + if let Some(replica_config) = &process_param.keeper_config.replica_config { + tracing::info!( + sequence_number = event.sequence_number, + delay_seconds = replica_config.backup_delay_seconds, + "Waiting before processing as backup replica" + ); + tokio::time::sleep(tokio::time::Duration::from_secs( + replica_config.backup_delay_seconds, + )) + .await; + } + match chain_state .contract .get_request(event.provider_address, event.sequence_number) @@ -62,13 +74,13 @@ pub async fn process_event_with_backoff( Ok(Some(_)) => { tracing::info!( sequence_number = event.sequence_number, - "Request still open, processing as backup replica" + "Request still open after delay, processing as backup replica" ); } Ok(None) => { tracing::debug!( sequence_number = event.sequence_number, - "Request already fulfilled by primary replica, skipping" + "Request already fulfilled by primary replica during delay, skipping" ); return Ok(()); } @@ -76,7 +88,7 @@ pub async fn process_event_with_backoff( tracing::warn!( sequence_number = event.sequence_number, error = ?e, - "Error checking request status, skipping" + "Error checking request status after delay, skipping" ); return Ok(()); } From 0c76fa6bfc584e5e73215ce56717ede37cc4e01c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: 
Wed, 25 Jun 2025 18:05:07 +0000 Subject: [PATCH 3/9] docs(fortuna): add comprehensive multi-replica setup documentation - Add Multiple Replica Setup section to README.md with modulo assignment explanation - Add replica_config examples to config.sample.yaml for 2, 3, and 5 replica setups - Include deployment considerations, failover behavior, and wallet separation requirements - Add validation for backup_delay_seconds > 0 to prevent race conditions Co-Authored-By: Tejas Badadare --- apps/fortuna/README.md | 54 +++++++++++++++++++++++++++++++++ apps/fortuna/config.sample.yaml | 28 +++++++++++++++++ apps/fortuna/src/config.rs | 3 ++ 3 files changed, 85 insertions(+) diff --git a/apps/fortuna/README.md b/apps/fortuna/README.md index 416aa5e486..19fe4a4aed 100644 --- a/apps/fortuna/README.md +++ b/apps/fortuna/README.md @@ -40,6 +40,60 @@ Please add the changed files in the `.sqlx` folder to your git commit. The Fortuna binary has a command-line interface to perform useful operations on the contract, such as registering a new randomness provider, or drawing a random value. To see the available commands, simply run `cargo run`. +## Multiple Replica Setup + +Fortuna supports running multiple replica instances for high availability and reliability. This prevents service interruption if one instance goes down and distributes the workload across multiple instances. + +### How Replica Assignment Works + +- Each replica is assigned a unique `replica_id` (0, 1, 2, etc.) +- Requests are distributed using modulo assignment: `sequence_number % total_replicas` +- Each replica primarily handles requests assigned to its ID +- After a configurable delay, replicas will process requests from other replicas as backup (failover) + +### Example Configurations + +**Two Replica Setup (Blue/Green):** +```yaml +# Replica 0 (Blue) - handles even sequence numbers (0, 2, 4, ...) 
+keeper: + replica_config: + replica_id: 0 + total_replicas: 2 + backup_delay_seconds: 30 + +# Replica 1 (Green) - handles odd sequence numbers (1, 3, 5, ...) +keeper: + replica_config: + replica_id: 1 + total_replicas: 2 + backup_delay_seconds: 30 +``` + +**Three Replica Setup:** +```yaml +# Replica 0 - handles sequence numbers 0, 3, 6, 9, ... +keeper: + replica_config: + replica_id: 0 + total_replicas: 3 + backup_delay_seconds: 45 +``` + +### Deployment Considerations + +1. **Separate Wallets**: Each replica MUST use a different private key to avoid nonce conflicts +2. **Backup Delay**: Set `backup_delay_seconds` long enough to allow primary replica to process requests, but short enough for acceptable failover time (recommended: 30-60 seconds) +3. **Monitoring**: Monitor each replica's processing metrics to ensure proper load distribution +4. **Gas Management**: Each replica needs sufficient ETH balance for gas fees + +### Failover Behavior + +- Primary replica processes requests immediately +- Backup replicas wait for `backup_delay_seconds` before checking if request is still unfulfilled +- If request is already fulfilled during the delay, backup replica skips processing +- This prevents duplicate transactions and wasted gas while ensuring reliability + ## Local Development To start an instance of the webserver for local testing, you first need to perform a few setup steps: diff --git a/apps/fortuna/config.sample.yaml b/apps/fortuna/config.sample.yaml index e56dfd3b10..b14757c363 100644 --- a/apps/fortuna/config.sample.yaml +++ b/apps/fortuna/config.sample.yaml @@ -88,3 +88,31 @@ keeper: value: 0xabcd # For production, you can store the private key in a file. 
# file: keeper-key.txt +# Multi-replica configuration documentation + + # Optional: Multi-replica configuration for high availability and load distribution + # Uncomment and configure for production deployments with multiple Fortuna instances + # replica_config: + # replica_id: 0 # Unique identifier for this replica (0, 1, 2, ...) + # total_replicas: 2 # Total number of replica instances running + # backup_delay_seconds: 30 # Seconds to wait before processing other replicas' requests + # + # Example configurations: + # + # Two-replica setup (Blue/Green): + # - Replica 0: handles even sequence numbers (0, 2, 4, ...) + # - Replica 1: handles odd sequence numbers (1, 3, 5, ...) + # + # Three-replica setup: + # - Replica 0: handles sequence numbers 0, 3, 6, 9, ... + # - Replica 1: handles sequence numbers 1, 4, 7, 10, ... + # - Replica 2: handles sequence numbers 2, 5, 8, 11, ... + # + # Five-replica setup: + # - Replica 0: handles sequence numbers 0, 5, 10, 15, ... + # - Replica 1: handles sequence numbers 1, 6, 11, 16, ... + # - Replica 2: handles sequence numbers 2, 7, 12, 17, ... + # - Replica 3: handles sequence numbers 3, 8, 13, 18, ... + # - Replica 4: handles sequence numbers 4, 9, 14, 19, ... + # + # IMPORTANT: Each replica MUST use a different private_key to avoid nonce conflicts! diff --git a/apps/fortuna/src/config.rs b/apps/fortuna/src/config.rs index fae28e75d0..69389e2c75 100644 --- a/apps/fortuna/src/config.rs +++ b/apps/fortuna/src/config.rs @@ -101,6 +101,9 @@ impl Config { if replica_config.replica_id >= replica_config.total_replicas { return Err(anyhow!("Keeper replica configuration is invalid. replica_id must be less than total_replicas.")); } + if replica_config.backup_delay_seconds == 0 { + return Err(anyhow!("Keeper replica configuration is invalid. 
backup_delay_seconds must be greater than 0 to prevent race conditions.")); + } } Ok(config) From 631e71dbbeb1394f549c386e2328ad09dffad626 Mon Sep 17 00:00:00 2001 From: Tejas Badadare <17058023+tejasbadadare@users.noreply.github.com> Date: Wed, 25 Jun 2025 13:25:52 -0700 Subject: [PATCH 4/9] update sample config --- apps/fortuna/config.sample.yaml | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/apps/fortuna/config.sample.yaml b/apps/fortuna/config.sample.yaml index b14757c363..07baa4be74 100644 --- a/apps/fortuna/config.sample.yaml +++ b/apps/fortuna/config.sample.yaml @@ -103,16 +103,4 @@ keeper: # - Replica 0: handles even sequence numbers (0, 2, 4, ...) # - Replica 1: handles odd sequence numbers (1, 3, 5, ...) # - # Three-replica setup: - # - Replica 0: handles sequence numbers 0, 3, 6, 9, ... - # - Replica 1: handles sequence numbers 1, 4, 7, 10, ... - # - Replica 2: handles sequence numbers 2, 5, 8, 11, ... - # - # Five-replica setup: - # - Replica 0: handles sequence numbers 0, 5, 10, 15, ... - # - Replica 1: handles sequence numbers 1, 6, 11, 16, ... - # - Replica 2: handles sequence numbers 2, 7, 12, 17, ... - # - Replica 3: handles sequence numbers 3, 8, 13, 18, ... - # - Replica 4: handles sequence numbers 4, 9, 14, 19, ... - # - # IMPORTANT: Each replica MUST use a different private_key to avoid nonce conflicts! + # IMPORTANT: Each replica must use a different private_key to avoid nonce conflicts! From ac5fb7ab9f0510d608872f1ceaca53710258f6c8 Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Wed, 25 Jun 2025 14:37:39 -0700 Subject: [PATCH 5/9] refactor(fortuna): streamline keeper configuration and improve event processing - Updated config.sample.yaml by removing unnecessary blank lines. - Changed backup_delay_seconds in README.md from 45 to 30 for consistency. - Refactored run_keeper_threads to accept KeeperConfig directly instead of private_key. - Enhanced run function to handle keeper configuration more effectively. 
- Added comments in process_event_with_backoff to clarify primary and backup replica logic. --- apps/fortuna/README.md | 9 +++++++-- apps/fortuna/config.sample.yaml | 6 ++---- apps/fortuna/src/command/run.rs | 17 +++++++++++------ apps/fortuna/src/keeper.rs | 15 +++++++-------- apps/fortuna/src/keeper/block.rs | 3 ++- apps/fortuna/src/keeper/process_event.rs | 7 +++++-- 6 files changed, 34 insertions(+), 23 deletions(-) diff --git a/apps/fortuna/README.md b/apps/fortuna/README.md index 19fe4a4aed..961b1e4ec0 100644 --- a/apps/fortuna/README.md +++ b/apps/fortuna/README.md @@ -17,6 +17,11 @@ a database to be available at build time. Create a `.env` file in the root of th DATABASE_URL="sqlite:fortuna.db?mode=rwc" ``` +Install sqlx for cargo with: +```bash +cargo install sqlx-cli +``` + Next, you need to create the database and apply the schema migrations. You can do this by running: ```bash @@ -62,7 +67,7 @@ keeper: total_replicas: 2 backup_delay_seconds: 30 -# Replica 1 (Green) - handles odd sequence numbers (1, 3, 5, ...) +# Replica 1 (Green) - handles odd sequence numbers (1, 3, 5, ...) keeper: replica_config: replica_id: 1 @@ -77,7 +82,7 @@ keeper: replica_config: replica_id: 0 total_replicas: 3 - backup_delay_seconds: 45 + backup_delay_seconds: 30 ``` ### Deployment Considerations diff --git a/apps/fortuna/config.sample.yaml b/apps/fortuna/config.sample.yaml index 07baa4be74..d7a3bef205 100644 --- a/apps/fortuna/config.sample.yaml +++ b/apps/fortuna/config.sample.yaml @@ -47,8 +47,6 @@ chains: # blocks after 5 blocks, then again after 10 blocks, and finally after 20 blocks. block_delays: [5, 10, 20] - - # Historical commitments -- delete this block for local development purposes commitments: # prettier-ignore @@ -88,8 +86,8 @@ keeper: value: 0xabcd # For production, you can store the private key in a file.
# file: keeper-key.txt -# Multi-replica configuration documentation + # Multi-replica configuration # Optional: Multi-replica configuration for high availability and load distribution # Uncomment and configure for production deployments with multiple Fortuna instances # replica_config: @@ -98,7 +96,7 @@ keeper: # backup_delay_seconds: 30 # Seconds to wait before processing other replicas' requests # # Example configurations: - # + # # Two-replica setup (Blue/Green): # - Replica 0: handles even sequence numbers (0, 2, 4, ...) # - Replica 1: handles odd sequence numbers (1, 3, 5, ...) diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index 704543a986..70dad67775 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -3,7 +3,7 @@ use { api::{self, ApiBlockChainState, BlockchainState, ChainId}, chain::ethereum::InstrumentedPythContract, command::register_provider::CommitmentMetadata, - config::{Commitment, Config, EthereumConfig, ProviderConfig, RunOptions}, + config::{Commitment, Config, EthereumConfig, KeeperConfig, ProviderConfig, RunOptions}, eth_utils::traced_client::RpcMetrics, history::History, keeper::{self, keeper_metrics::KeeperMetrics}, @@ -98,6 +98,11 @@ pub async fn run(opts: &RunOptions) -> Result<()> { if keeper_private_key_option.is_none() { tracing::info!("Not starting keeper service: no keeper private key specified. 
Please add one to the config if you would like to run the keeper service.") } + let keeper_config_option = if keeper_private_key_option.is_some() { + Some(config.keeper.clone()) + } else { + None + }; let chains: Arc>> = Arc::new(RwLock::new( config .chains @@ -109,7 +114,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> { for (chain_id, chain_config) in config.chains.clone() { keeper_metrics.add_chain(chain_id.clone(), config.provider.address); let keeper_metrics = keeper_metrics.clone(); - let keeper_private_key_option = keeper_private_key_option.clone(); + let keeper_config_option = keeper_config_option.clone(); let chains = chains.clone(); let secret_copy = secret.clone(); let rpc_metrics = rpc_metrics.clone(); @@ -122,7 +127,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> { &chain_id, chain_config.clone(), keeper_metrics.clone(), - keeper_private_key_option.clone(), + keeper_config_option.clone(), chains.clone(), &secret_copy, history.clone(), @@ -172,7 +177,7 @@ async fn setup_chain_and_run_keeper( chain_id: &ChainId, chain_config: EthereumConfig, keeper_metrics: Arc, - keeper_private_key_option: Option, + keeper_config: Option, chains: Arc>>, secret_copy: &str, history: Arc, @@ -192,9 +197,9 @@ async fn setup_chain_and_run_keeper( chain_id.clone(), ApiBlockChainState::Initialized(state.clone()), ); - if let Some(keeper_private_key) = keeper_private_key_option { + if let Some(keeper_config) = keeper_config { keeper::run_keeper_threads( - keeper_private_key, + keeper_config, chain_config, state, keeper_metrics.clone(), diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index 28ff6ea6a7..b225f040a4 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -56,7 +56,7 @@ pub enum RequestState { /// handle any events for the new blocks. 
#[tracing::instrument(name = "keeper", skip_all, fields(chain_id = chain_state.id))] pub async fn run_keeper_threads( - private_key: String, + keeper_config: crate::config::KeeperConfig, chain_eth_config: EthereumConfig, chain_state: BlockchainState, metrics: Arc, @@ -67,6 +67,11 @@ pub async fn run_keeper_threads( let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await; tracing::info!("Latest safe block: {}", &latest_safe_block); + let private_key = keeper_config + .private_key + .load()? + .ok_or(anyhow::anyhow!("Keeper private key must be provided"))?; + let contract = Arc::new(InstrumentedSignablePythContract::from_config( &chain_eth_config, &private_key, @@ -85,13 +90,7 @@ pub async fn run_keeper_threads( contract: contract.clone(), gas_limit, escalation_policy: chain_eth_config.escalation_policy.to_policy(), - keeper_config: crate::config::KeeperConfig { - private_key: crate::config::SecretString { - value: None, - file: None, - }, - replica_config: None, - }, + keeper_config, metrics: metrics.clone(), fulfilled_requests_cache, history, diff --git a/apps/fortuna/src/keeper/block.rs b/apps/fortuna/src/keeper/block.rs index ab962cddf4..210a8cc431 100644 --- a/apps/fortuna/src/keeper/block.rs +++ b/apps/fortuna/src/keeper/block.rs @@ -2,6 +2,7 @@ use { crate::{ api::BlockchainState, chain::{ethereum::InstrumentedSignablePythContract, reader::BlockNumber}, + config::KeeperConfig, eth_utils::utils::EscalationPolicy, history::History, keeper::{ @@ -45,7 +46,7 @@ pub struct ProcessParams { pub gas_limit: U256, pub escalation_policy: EscalationPolicy, pub chain_state: BlockchainState, - pub keeper_config: crate::config::KeeperConfig, + pub keeper_config: KeeperConfig, pub metrics: Arc, pub history: Arc, pub fulfilled_requests_cache: Arc>>, diff --git a/apps/fortuna/src/keeper/process_event.rs b/apps/fortuna/src/keeper/process_event.rs index ee77d70c48..25d5d49b71 100644 --- a/apps/fortuna/src/keeper/process_event.rs +++ 
b/apps/fortuna/src/keeper/process_event.rs @@ -35,6 +35,8 @@ pub async fn process_event_with_backoff( return Ok(()); } + // Check if we are the primary replica for this request. + // If not, we will wait for a delay before processing the request. let is_primary_replica = if let Some(replica_config) = &process_param.keeper_config.replica_config { let assigned_replica = event.sequence_number % replica_config.total_replicas; @@ -66,6 +68,8 @@ pub async fn process_event_with_backoff( .await; } + // Check if the request is still open after the delay. + // If it is, we will process it as a backup replica. match chain_state .contract .get_request(event.provider_address, event.sequence_number) @@ -88,9 +92,8 @@ pub async fn process_event_with_backoff( tracing::warn!( sequence_number = event.sequence_number, error = ?e, - "Error checking request status after delay, skipping" + "Error checking request status after delay, processing as backup replica" ); - return Ok(()); } } } From 02f280d92186023449cdc273e8b788c3ea2d5a64 Mon Sep 17 00:00:00 2001 From: Tejas Badadare <17058023+tejasbadadare@users.noreply.github.com> Date: Wed, 25 Jun 2025 17:41:01 -0700 Subject: [PATCH 6/9] feat(fortuna): add provider arg to entropy load testing script --- contract_manager/scripts/load_test_entropy.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/contract_manager/scripts/load_test_entropy.ts b/contract_manager/scripts/load_test_entropy.ts index 860feaff8e..cdb0c9b2e0 100644 --- a/contract_manager/scripts/load_test_entropy.ts +++ b/contract_manager/scripts/load_test_entropy.ts @@ -9,7 +9,7 @@ const parser = yargs(hideBin(process.argv)) .usage( "Load tests the entropy contract using the EntropyTester contract with many requests in a single transaction\n" + "it does not monitor whether the callbacks are actually submitted or not.\n" + - "Usage: $0 --private-key --chain --tester-address ", + "Usage: $0 --private-key --chain --tester-address --provider-address ", ) 
.options({ chain: { @@ -22,6 +22,10 @@ const parser = yargs(hideBin(process.argv)) demandOption: true, desc: "Address of the EntropyTester contract", }, + provider: { + type: "string", + desc: "Address of the entropy provider to use for requests (defaults to default provider)", + }, "success-count": { type: "number", default: 100, @@ -66,7 +70,7 @@ async function main() { const privateKey = toPrivateKey(argv.privateKey); const chain = DefaultStore.getChainOrThrow(argv.chain, EvmChain); const contract = findEntropyContract(chain); - const provider = await contract.getDefaultProvider(); + const provider = argv.provider || (await contract.getDefaultProvider()); const fee = await contract.getFee(provider); const web3 = contract.chain.getWeb3(); const testerContract = new web3.eth.Contract(ABI, argv.testerAddress); From 07d67bc12b39d95157df91b315a5d4e04367b786 Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Wed, 25 Jun 2025 20:19:19 -0700 Subject: [PATCH 7/9] fix(fortuna): improve config and replica logic --- apps/fortuna/src/command/run.rs | 25 ++++--- apps/fortuna/src/keeper.rs | 14 ++-- apps/fortuna/src/keeper/block.rs | 4 +- apps/fortuna/src/keeper/process_event.rs | 91 ++++++++++-------------- 4 files changed, 60 insertions(+), 74 deletions(-) diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index 70dad67775..1a4072b2d8 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -3,7 +3,7 @@ use { api::{self, ApiBlockChainState, BlockchainState, ChainId}, chain::ethereum::InstrumentedPythContract, command::register_provider::CommitmentMetadata, - config::{Commitment, Config, EthereumConfig, KeeperConfig, ProviderConfig, RunOptions}, + config::{Commitment, Config, EthereumConfig, ProviderConfig, ReplicaConfig, RunOptions}, eth_utils::traced_client::RpcMetrics, history::History, keeper::{self, keeper_metrics::KeeperMetrics}, @@ -94,15 +94,14 @@ pub async fn run(opts: &RunOptions) -> Result<()> { let 
keeper_metrics: Arc = Arc::new(KeeperMetrics::new(metrics_registry.clone()).await); + let keeper_private_key_option = config.keeper.private_key.load()?; if keeper_private_key_option.is_none() { tracing::info!("Not starting keeper service: no keeper private key specified. Please add one to the config if you would like to run the keeper service.") } - let keeper_config_option = if keeper_private_key_option.is_some() { - Some(config.keeper.clone()) - } else { - None - }; + + let keeper_replica_config = config.keeper.replica_config.clone(); + let chains: Arc>> = Arc::new(RwLock::new( config .chains @@ -114,7 +113,8 @@ pub async fn run(opts: &RunOptions) -> Result<()> { for (chain_id, chain_config) in config.chains.clone() { keeper_metrics.add_chain(chain_id.clone(), config.provider.address); let keeper_metrics = keeper_metrics.clone(); - let keeper_config_option = keeper_config_option.clone(); + let keeper_private_key = keeper_private_key_option.clone(); + let keeper_replica_config = keeper_replica_config.clone(); let chains = chains.clone(); let secret_copy = secret.clone(); let rpc_metrics = rpc_metrics.clone(); @@ -127,7 +127,8 @@ pub async fn run(opts: &RunOptions) -> Result<()> { &chain_id, chain_config.clone(), keeper_metrics.clone(), - keeper_config_option.clone(), + keeper_private_key.clone(), + keeper_replica_config.clone(), chains.clone(), &secret_copy, history.clone(), @@ -177,7 +178,8 @@ async fn setup_chain_and_run_keeper( chain_id: &ChainId, chain_config: EthereumConfig, keeper_metrics: Arc, - keeper_config: Option, + keeper_private_key: Option, + keeper_replica_config: Option, chains: Arc>>, secret_copy: &str, history: Arc, @@ -197,9 +199,10 @@ async fn setup_chain_and_run_keeper( chain_id.clone(), ApiBlockChainState::Initialized(state.clone()), ); - if let Some(keeper_config) = keeper_config { + if let Some(private_key) = keeper_private_key { keeper::run_keeper_threads( - keeper_config, + private_key, + keeper_replica_config, chain_config, state, 
keeper_metrics.clone(), diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index b225f040a4..e65bc2aaf1 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -2,7 +2,7 @@ use { crate::{ api::{BlockchainState, ChainId}, chain::ethereum::{InstrumentedPythContract, InstrumentedSignablePythContract}, - config::EthereumConfig, + config::{EthereumConfig, ReplicaConfig}, eth_utils::traced_client::RpcMetrics, history::History, keeper::{ @@ -56,7 +56,8 @@ pub enum RequestState { /// handle any events for the new blocks. #[tracing::instrument(name = "keeper", skip_all, fields(chain_id = chain_state.id))] pub async fn run_keeper_threads( - keeper_config: crate::config::KeeperConfig, + keeper_private_key: String, + keeper_replica_config: Option, chain_eth_config: EthereumConfig, chain_state: BlockchainState, metrics: Arc, @@ -67,14 +68,9 @@ pub async fn run_keeper_threads( let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await; tracing::info!("Latest safe block: {}", &latest_safe_block); - let private_key = keeper_config - .private_key - .load()? 
- .ok_or(anyhow::anyhow!("Keeper private key must be provided"))?; - let contract = Arc::new(InstrumentedSignablePythContract::from_config( &chain_eth_config, - &private_key, + &keeper_private_key, chain_state.id.clone(), rpc_metrics.clone(), chain_state.network_id, @@ -90,7 +86,7 @@ pub async fn run_keeper_threads( contract: contract.clone(), gas_limit, escalation_policy: chain_eth_config.escalation_policy.to_policy(), - keeper_config, + replica_config: keeper_replica_config, metrics: metrics.clone(), fulfilled_requests_cache, history, diff --git a/apps/fortuna/src/keeper/block.rs b/apps/fortuna/src/keeper/block.rs index 210a8cc431..10505166d2 100644 --- a/apps/fortuna/src/keeper/block.rs +++ b/apps/fortuna/src/keeper/block.rs @@ -2,7 +2,7 @@ use { crate::{ api::BlockchainState, chain::{ethereum::InstrumentedSignablePythContract, reader::BlockNumber}, - config::KeeperConfig, + config::ReplicaConfig, eth_utils::utils::EscalationPolicy, history::History, keeper::{ @@ -46,7 +46,7 @@ pub struct ProcessParams { pub gas_limit: U256, pub escalation_policy: EscalationPolicy, pub chain_state: BlockchainState, - pub keeper_config: KeeperConfig, + pub replica_config: Option, pub metrics: Arc, pub history: Arc, pub fulfilled_requests_cache: Arc>>, diff --git a/apps/fortuna/src/keeper/process_event.rs b/apps/fortuna/src/keeper/process_event.rs index 25d5d49b71..b8bf5c18a2 100644 --- a/apps/fortuna/src/keeper/process_event.rs +++ b/apps/fortuna/src/keeper/process_event.rs @@ -35,68 +35,55 @@ pub async fn process_event_with_backoff( return Ok(()); } - // Check if we are the primary replica for this request. - // If not, we will wait for a delay before processing the request. 
- let is_primary_replica = - if let Some(replica_config) = &process_param.keeper_config.replica_config { - let assigned_replica = event.sequence_number % replica_config.total_replicas; - if assigned_replica != replica_config.replica_id { - tracing::debug!( - sequence_number = event.sequence_number, - assigned_replica = assigned_replica, - our_replica_id = replica_config.replica_id, - "Processing request as backup replica" - ); - false - } else { - true - } + // If replica config is present, we're running with multiple instances + // The incoming request is assigned by modulo operation on the sequence number + // and the total number of replicas. If our replica_id is the primary for this sequence number, + // we process the request directly. If our replica_id is a backup, we wait for the delay and + // then check if the request is still open. If it is, we process it as a failover. + if let Some(replica_config) = &process_param.replica_config { + let assigned_replica = event.sequence_number % replica_config.total_replicas; + let is_primary_replica = assigned_replica == replica_config.replica_id; + + if is_primary_replica { + tracing::debug!("Processing request as primary replica"); } else { - true // No replica config, process all requests - }; + tracing::debug!("Processing request as backup replica"); - if !is_primary_replica { - if let Some(replica_config) = &process_param.keeper_config.replica_config { - tracing::info!( - sequence_number = event.sequence_number, - delay_seconds = replica_config.backup_delay_seconds, - "Waiting before processing as backup replica" - ); + tracing::info!("Waiting before processing as backup replica"); tokio::time::sleep(tokio::time::Duration::from_secs( replica_config.backup_delay_seconds, )) .await; - } - // Check if the request is still open after the delay. - // If it is, we will process it as a backup replica. 
- match chain_state - .contract - .get_request(event.provider_address, event.sequence_number) - .await - { - Ok(Some(_)) => { - tracing::info!( - sequence_number = event.sequence_number, - "Request still open after delay, processing as backup replica" - ); - } - Ok(None) => { - tracing::debug!( - sequence_number = event.sequence_number, - "Request already fulfilled by primary replica during delay, skipping" - ); - return Ok(()); - } - Err(e) => { - tracing::warn!( - sequence_number = event.sequence_number, - error = ?e, - "Error checking request status after delay, processing as backup replica" - ); + // Check if the request is still open after the delay. + // If it is, we will process it as a backup replica. + match chain_state + .contract + .get_request(event.provider_address, event.sequence_number) + .await + { + Ok(Some(_)) => { + tracing::info!( + delay_seconds = replica_config.backup_delay_seconds, + "Request still open after delay, processing as backup replica" + ); + } + Ok(None) => { + tracing::debug!( + "Request already fulfilled by primary replica during delay, skipping" + ); + return Ok(()); + } + Err(e) => { + tracing::warn!( + error = ?e, + "Error checking request status after delay, processing as backup replica" + ); + } } } } + // If no replica config we are running standalone, process all requests directly let account_label = AccountLabel { chain_id: chain_state.id.clone(), From 3ebfbb9f9566b57b4fcf05d76a806d9af92d44f4 Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Wed, 25 Jun 2025 20:26:23 -0700 Subject: [PATCH 8/9] fix(fortuna): names, config check --- apps/fortuna/src/command/run.rs | 10 +++++----- apps/fortuna/src/config.rs | 5 +++++ 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index 1a4072b2d8..e1933db21f 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -113,7 +113,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> 
{ for (chain_id, chain_config) in config.chains.clone() { keeper_metrics.add_chain(chain_id.clone(), config.provider.address); let keeper_metrics = keeper_metrics.clone(); - let keeper_private_key = keeper_private_key_option.clone(); + let keeper_private_key_option = keeper_private_key_option.clone(); let keeper_replica_config = keeper_replica_config.clone(); let chains = chains.clone(); let secret_copy = secret.clone(); @@ -127,7 +127,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> { &chain_id, chain_config.clone(), keeper_metrics.clone(), - keeper_private_key.clone(), + keeper_private_key_option.clone(), keeper_replica_config.clone(), chains.clone(), &secret_copy, @@ -178,7 +178,7 @@ async fn setup_chain_and_run_keeper( chain_id: &ChainId, chain_config: EthereumConfig, keeper_metrics: Arc, - keeper_private_key: Option, + keeper_private_key_option: Option, keeper_replica_config: Option, chains: Arc>>, secret_copy: &str, @@ -199,9 +199,9 @@ async fn setup_chain_and_run_keeper( chain_id.clone(), ApiBlockChainState::Initialized(state.clone()), ); - if let Some(private_key) = keeper_private_key { + if let Some(keeper_private_key) = keeper_private_key_option { keeper::run_keeper_threads( - private_key, + keeper_private_key, keeper_replica_config, chain_config, state, diff --git a/apps/fortuna/src/config.rs b/apps/fortuna/src/config.rs index 2a96bb34e6..84471ef8fc 100644 --- a/apps/fortuna/src/config.rs +++ b/apps/fortuna/src/config.rs @@ -98,6 +98,11 @@ impl Config { if replica_config.total_replicas == 0 { return Err(anyhow!("Keeper replica configuration is invalid. total_replicas must be greater than 0.")); } + if config.keeper.private_key.load()?.is_none() { + return Err(anyhow!( + "Keeper replica configuration requires a keeper private key to be specified." + )); + } if replica_config.replica_id >= replica_config.total_replicas { return Err(anyhow!("Keeper replica configuration is invalid. 
replica_id must be less than total_replicas.")); } From 4755ce6f3b18ca51afccf8347f608fe980557911 Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Thu, 26 Jun 2025 10:11:50 -0700 Subject: [PATCH 9/9] bump version --- apps/fortuna/Cargo.lock | 2 +- apps/fortuna/Cargo.toml | 7 +++---- apps/fortuna/src/keeper/process_event.rs | 5 ++--- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/apps/fortuna/Cargo.lock b/apps/fortuna/Cargo.lock index ea81a7e2b9..f25cdec857 100644 --- a/apps/fortuna/Cargo.lock +++ b/apps/fortuna/Cargo.lock @@ -1647,7 +1647,7 @@ dependencies = [ [[package]] name = "fortuna" -version = "7.6.4" +version = "7.7.0" dependencies = [ "anyhow", "axum", diff --git a/apps/fortuna/Cargo.toml b/apps/fortuna/Cargo.toml index 340cb4f908..db027d512d 100644 --- a/apps/fortuna/Cargo.toml +++ b/apps/fortuna/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fortuna" -version = "7.6.4" +version = "7.7.0" edition = "2021" [lib] @@ -41,13 +41,12 @@ url = "2.5.0" chrono = { version = "0.4.38", features = [ "clock", "std", - "serde" + "serde", ], default-features = false } backoff = { version = "0.4.0", features = ["futures", "tokio"] } thiserror = "1.0.61" futures-locks = "0.7.1" -sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono" ] } - +sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono"] } [dev-dependencies] diff --git a/apps/fortuna/src/keeper/process_event.rs b/apps/fortuna/src/keeper/process_event.rs index b8bf5c18a2..71ab85a29b 100644 --- a/apps/fortuna/src/keeper/process_event.rs +++ b/apps/fortuna/src/keeper/process_event.rs @@ -35,11 +35,11 @@ pub async fn process_event_with_backoff( return Ok(()); } - // If replica config is present, we're running with multiple instances + // If replica config is present, we're running with multiple instances. // The incoming request is assigned by modulo operation on the sequence number // and the total number of replicas. 
If our replica_id is the primary for this sequence number, // we process the request directly. If our replica_id is a backup, we wait for the delay and - // then check if the request is still open. If it is, we process it as a failover. + // then check if the request is still open. If it is, we process it as a backup replica. if let Some(replica_config) = &process_param.replica_config { let assigned_replica = event.sequence_number % replica_config.total_replicas; let is_primary_replica = assigned_replica == replica_config.replica_id; @@ -83,7 +83,6 @@ pub async fn process_event_with_backoff( } } } - // If no replica config we are running standalone, process all requests directly let account_label = AccountLabel { chain_id: chain_state.id.clone(),