diff --git a/apps/fortuna/Cargo.lock b/apps/fortuna/Cargo.lock index 9aab3ed27b..161dae51a3 100644 --- a/apps/fortuna/Cargo.lock +++ b/apps/fortuna/Cargo.lock @@ -1647,7 +1647,7 @@ dependencies = [ [[package]] name = "fortuna" -version = "7.6.5" +version = "7.7.0" dependencies = [ "anyhow", "axum", diff --git a/apps/fortuna/Cargo.toml b/apps/fortuna/Cargo.toml index acf3594d72..dcba606baf 100644 --- a/apps/fortuna/Cargo.toml +++ b/apps/fortuna/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fortuna" -version = "7.6.5" +version = "7.7.0" edition = "2021" [lib] @@ -41,13 +41,12 @@ url = "2.5.0" chrono = { version = "0.4.38", features = [ "clock", "std", - "serde" + "serde", ], default-features = false } backoff = { version = "0.4.0", features = ["futures", "tokio"] } thiserror = "1.0.61" futures-locks = "0.7.1" -sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono" ] } - +sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono"] } [dev-dependencies] diff --git a/apps/fortuna/README.md b/apps/fortuna/README.md index 416aa5e486..961b1e4ec0 100644 --- a/apps/fortuna/README.md +++ b/apps/fortuna/README.md @@ -17,6 +17,11 @@ a database to be available at build time. Create a `.env` file in the root of th DATABASE_URL="sqlite:fortuna.db?mode=rwc" ``` +Install sqlx for cargo with: +```bash +cargo install sqlx +``` + Next, you need to create the database and apply the schema migrations. You can do this by running: ```bash @@ -40,6 +45,60 @@ Please add the changed files in the `.sqlx` folder to your git commit. The Fortuna binary has a command-line interface to perform useful operations on the contract, such as registering a new randomness provider, or drawing a random value. To see the available commands, simply run `cargo run`. +## Multiple Replica Setup + +Fortuna supports running multiple replica instances for high availability and reliability. This prevents service interruption if one instance goes down and distributes the workload across multiple instances. + +### How Replica Assignment Works + +- Each replica is assigned a unique `replica_id` (0, 1, 2, etc.) +- Requests are distributed using modulo assignment: `sequence_number % total_replicas` +- Each replica primarily handles requests assigned to its ID +- After a configurable delay, replicas will process requests from other replicas as backup (failover) + +### Example Configurations + +**Two Replica Setup (Blue/Green):** +```yaml +# Replica 0 (Blue) - handles even sequence numbers (0, 2, 4, ...) +keeper: + replica_config: + replica_id: 0 + total_replicas: 2 + backup_delay_seconds: 30 + +# Replica 1 (Green) - handles odd sequence numbers (1, 3, 5, ...) +keeper: + replica_config: + replica_id: 1 + total_replicas: 2 + backup_delay_seconds: 30 +``` + +**Three Replica Setup:** +```yaml +# Replica 0 - handles sequence numbers 0, 3, 6, 9, ... +keeper: + replica_config: + replica_id: 0 + total_replicas: 3 + backup_delay_seconds: 30 +``` + +### Deployment Considerations + +1. **Separate Wallets**: Each replica MUST use a different private key to avoid nonce conflicts +2. **Backup Delay**: Set `backup_delay_seconds` long enough to allow primary replica to process requests, but short enough for acceptable failover time (recommended: 30-60 seconds) +3. **Monitoring**: Monitor each replica's processing metrics to ensure proper load distribution +4. **Gas Management**: Each replica needs sufficient ETH balance for gas fees + +### Failover Behavior + +- Primary replica processes requests immediately +- Backup replicas wait for `backup_delay_seconds` before checking if request is still unfulfilled +- If request is already fulfilled during the delay, backup replica skips processing +- This prevents duplicate transactions and wasted gas while ensuring reliability + ## Local Development To start an instance of the webserver for local testing, you first need to perform a few setup steps: diff --git a/apps/fortuna/config.sample.yaml b/apps/fortuna/config.sample.yaml index 6b42e97b43..d7a3bef205 100644 --- a/apps/fortuna/config.sample.yaml +++ b/apps/fortuna/config.sample.yaml @@ -86,3 +86,19 @@ keeper: value: 0xabcd # For production, you can store the private key in a file. # file: keeper-key.txt + + # Multi-replica configuration + # Optional: Multi-replica configuration for high availability and load distribution + # Uncomment and configure for production deployments with multiple Fortuna instances + # replica_config: + # replica_id: 0 # Unique identifier for this replica (0, 1, 2, ...) + # total_replicas: 2 # Total number of replica instances running + # backup_delay_seconds: 30 # Seconds to wait before processing other replicas' requests + # + # Example configurations: + # + # Two-replica setup (Blue/Green): + # - Replica 0: handles even sequence numbers (0, 2, 4, ...) + # - Replica 1: handles odd sequence numbers (1, 3, 5, ...) + # + # IMPORTANT: Each replica must use a different private_key to avoid nonce conflicts! diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index 704543a986..e1933db21f 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -3,7 +3,7 @@ use { api::{self, ApiBlockChainState, BlockchainState, ChainId}, chain::ethereum::InstrumentedPythContract, command::register_provider::CommitmentMetadata, - config::{Commitment, Config, EthereumConfig, ProviderConfig, RunOptions}, + config::{Commitment, Config, EthereumConfig, ProviderConfig, ReplicaConfig, RunOptions}, eth_utils::traced_client::RpcMetrics, history::History, keeper::{self, keeper_metrics::KeeperMetrics}, @@ -94,10 +94,14 @@ pub async fn run(opts: &RunOptions) -> Result<()> { let keeper_metrics: Arc = Arc::new(KeeperMetrics::new(metrics_registry.clone()).await); + let keeper_private_key_option = config.keeper.private_key.load()?; if keeper_private_key_option.is_none() { tracing::info!("Not starting keeper service: no keeper private key specified. Please add one to the config if you would like to run the keeper service.") } + + let keeper_replica_config = config.keeper.replica_config.clone(); + let chains: Arc>> = Arc::new(RwLock::new( config .chains @@ -110,6 +114,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> { keeper_metrics.add_chain(chain_id.clone(), config.provider.address); let keeper_metrics = keeper_metrics.clone(); let keeper_private_key_option = keeper_private_key_option.clone(); + let keeper_replica_config = keeper_replica_config.clone(); let chains = chains.clone(); let secret_copy = secret.clone(); let rpc_metrics = rpc_metrics.clone(); @@ -123,6 +128,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> { chain_config.clone(), keeper_metrics.clone(), keeper_private_key_option.clone(), + keeper_replica_config.clone(), chains.clone(), &secret_copy, history.clone(), @@ -173,6 +179,7 @@ async fn setup_chain_and_run_keeper( chain_config: EthereumConfig, keeper_metrics: Arc, keeper_private_key_option: Option, + keeper_replica_config: Option, chains: Arc>>, secret_copy: &str, history: Arc, @@ -195,6 +202,7 @@ async fn setup_chain_and_run_keeper( if let Some(keeper_private_key) = keeper_private_key_option { keeper::run_keeper_threads( keeper_private_key, + keeper_replica_config, chain_config, state, keeper_metrics.clone(), diff --git a/apps/fortuna/src/config.rs b/apps/fortuna/src/config.rs index 1d96a69c61..84471ef8fc 100644 --- a/apps/fortuna/src/config.rs +++ b/apps/fortuna/src/config.rs @@ -94,6 +94,23 @@ impl Config { } } + if let Some(replica_config) = &config.keeper.replica_config { + if replica_config.total_replicas == 0 { + return Err(anyhow!("Keeper replica configuration is invalid. total_replicas must be greater than 0.")); + } + if config.keeper.private_key.load()?.is_none() { + return Err(anyhow!( + "Keeper replica configuration requires a keeper private key to be specified." + )); + } + if replica_config.replica_id >= replica_config.total_replicas { + return Err(anyhow!("Keeper replica configuration is invalid. replica_id must be less than total_replicas.")); + } + if replica_config.backup_delay_seconds == 0 { + return Err(anyhow!("Keeper replica configuration is invalid. backup_delay_seconds must be greater than 0 to prevent race conditions.")); + } + } + Ok(config) } @@ -333,6 +350,18 @@ fn default_chain_sample_interval() -> u64 { 1 } +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct ReplicaConfig { + pub replica_id: u64, + pub total_replicas: u64, + #[serde(default = "default_backup_delay_seconds")] + pub backup_delay_seconds: u64, +} + +fn default_backup_delay_seconds() -> u64 { + 30 +} + /// Configuration values for the keeper service that are shared across chains. #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct KeeperConfig { @@ -342,6 +371,9 @@ pub struct KeeperConfig { /// This key *does not need to be a registered provider*. In particular, production deployments /// should ensure this is a different key in order to reduce the severity of security breaches. pub private_key: SecretString, + + #[serde(default)] + pub replica_config: Option, } // A secret is a string that can be provided either as a literal in the config, diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index b08c217aa9..e65bc2aaf1 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -2,7 +2,7 @@ use { crate::{ api::{BlockchainState, ChainId}, chain::ethereum::{InstrumentedPythContract, InstrumentedSignablePythContract}, - config::EthereumConfig, + config::{EthereumConfig, ReplicaConfig}, eth_utils::traced_client::RpcMetrics, history::History, keeper::{ @@ -56,7 +56,8 @@ pub enum RequestState { /// handle any events for the new blocks. #[tracing::instrument(name = "keeper", skip_all, fields(chain_id = chain_state.id))] pub async fn run_keeper_threads( - private_key: String, + keeper_private_key: String, + keeper_replica_config: Option, chain_eth_config: EthereumConfig, chain_state: BlockchainState, metrics: Arc, @@ -69,7 +70,7 @@ pub async fn run_keeper_threads( let contract = Arc::new(InstrumentedSignablePythContract::from_config( &chain_eth_config, - &private_key, + &keeper_private_key, chain_state.id.clone(), rpc_metrics.clone(), chain_state.network_id, @@ -85,6 +86,7 @@ pub async fn run_keeper_threads( contract: contract.clone(), gas_limit, escalation_policy: chain_eth_config.escalation_policy.to_policy(), + replica_config: keeper_replica_config, metrics: metrics.clone(), fulfilled_requests_cache, history, diff --git a/apps/fortuna/src/keeper/block.rs b/apps/fortuna/src/keeper/block.rs index 6df6e32cb6..10505166d2 100644 --- a/apps/fortuna/src/keeper/block.rs +++ b/apps/fortuna/src/keeper/block.rs @@ -2,6 +2,7 @@ use { crate::{ api::BlockchainState, chain::{ethereum::InstrumentedSignablePythContract, reader::BlockNumber}, + config::ReplicaConfig, eth_utils::utils::EscalationPolicy, history::History, keeper::{ @@ -45,6 +46,7 @@ pub struct ProcessParams { pub gas_limit: U256, pub escalation_policy: EscalationPolicy, pub chain_state: BlockchainState, + pub replica_config: Option, pub metrics: Arc, pub history: Arc, pub fulfilled_requests_cache: Arc>>, diff --git a/apps/fortuna/src/keeper/process_event.rs b/apps/fortuna/src/keeper/process_event.rs index 707fcc3413..71ab85a29b 100644 --- a/apps/fortuna/src/keeper/process_event.rs +++ b/apps/fortuna/src/keeper/process_event.rs @@ -35,6 +35,55 @@ pub async fn process_event_with_backoff( return Ok(()); } + // If replica config is present, we're running with multiple instances. + // The incoming request is assigned by modulo operation on the sequence number + // and the total number of replicas. If our replica_id is the primary for this sequence number, + // we process the request directly. If our replica_id is a backup, we wait for the delay and + // then check if the request is still open. If it is, we process it as a backup replica. + if let Some(replica_config) = &process_param.replica_config { + let assigned_replica = event.sequence_number % replica_config.total_replicas; + let is_primary_replica = assigned_replica == replica_config.replica_id; + + if is_primary_replica { + tracing::debug!("Processing request as primary replica"); + } else { + tracing::debug!("Processing request as backup replica"); + + tracing::info!("Waiting before processing as backup replica"); + tokio::time::sleep(tokio::time::Duration::from_secs( + replica_config.backup_delay_seconds, + )) + .await; + + // Check if the request is still open after the delay. + // If it is, we will process it as a backup replica. + match chain_state + .contract + .get_request(event.provider_address, event.sequence_number) + .await + { + Ok(Some(_)) => { + tracing::info!( + delay_seconds = replica_config.backup_delay_seconds, + "Request still open after delay, processing as backup replica" + ); + } + Ok(None) => { + tracing::debug!( + "Request already fulfilled by primary replica during delay, skipping" + ); + return Ok(()); + } + Err(e) => { + tracing::warn!( + error = ?e, + "Error checking request status after delay, processing as backup replica" + ); + } + } + } + } + let account_label = AccountLabel { chain_id: chain_state.id.clone(), address: chain_state.provider_address.to_string(), diff --git a/contract_manager/scripts/load_test_entropy.ts b/contract_manager/scripts/load_test_entropy.ts index 860feaff8e..cdb0c9b2e0 100644 --- a/contract_manager/scripts/load_test_entropy.ts +++ b/contract_manager/scripts/load_test_entropy.ts @@ -9,7 +9,7 @@ const parser = yargs(hideBin(process.argv)) .usage( "Load tests the entropy contract using the EntropyTester contract with many requests in a single transaction\n" + "it does not monitor whether the callbacks are actually submitted or not.\n" + - "Usage: $0 --private-key --chain --tester-address ", + "Usage: $0 --private-key --chain --tester-address --provider-address ", ) .options({ chain: { @@ -22,6 +22,10 @@ const parser = yargs(hideBin(process.argv)) demandOption: true, desc: "Address of the EntropyTester contract", }, + provider: { + type: "string", + desc: "Address of the entropy provider to use for requests (defaults to default provider)", + }, "success-count": { type: "number", default: 100, @@ -66,7 +70,7 @@ async function main() { const privateKey = toPrivateKey(argv.privateKey); const chain = DefaultStore.getChainOrThrow(argv.chain, EvmChain); const contract = findEntropyContract(chain); - const provider = await contract.getDefaultProvider(); + const provider = argv.provider || (await contract.getDefaultProvider()); const fee = await contract.getFee(provider); const web3 = contract.chain.getWeb3(); const testerContract = new web3.eth.Contract(ABI, argv.testerAddress);