Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions apps/fortuna/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fortuna"
version = "7.6.5"
version = "7.7.0"
edition = "2021"

[lib]
Expand Down Expand Up @@ -41,13 +41,12 @@ url = "2.5.0"
chrono = { version = "0.4.38", features = [
"clock",
"std",
"serde"
"serde",
], default-features = false }
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
thiserror = "1.0.61"
futures-locks = "0.7.1"
sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono" ] }

sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono"] }


[dev-dependencies]
Expand Down
59 changes: 59 additions & 0 deletions apps/fortuna/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ a database to be available at build time. Create a `.env` file in the root of th
DATABASE_URL="sqlite:fortuna.db?mode=rwc"
```

Install sqlx for cargo with:
```bash
cargo install sqlx
```

Next, you need to create the database and apply the schema migrations. You can do this by running:

```bash
Expand All @@ -40,6 +45,60 @@ Please add the changed files in the `.sqlx` folder to your git commit.
The Fortuna binary has a command-line interface to perform useful operations on the contract, such as
registering a new randomness provider, or drawing a random value. To see the available commands, simply run `cargo run`.

## Multiple Replica Setup

Fortuna supports running multiple replica instances for high availability and reliability. This prevents service interruption if one instance goes down and distributes the workload across multiple instances.

### How Replica Assignment Works

- Each replica is assigned a unique `replica_id` (0, 1, 2, etc.)
- Requests are distributed using modulo assignment: `sequence_number % total_replicas`
- Each replica primarily handles requests assigned to its ID
- After a configurable delay, replicas will process requests from other replicas as backup (failover)

### Example Configurations

**Two Replica Setup (Blue/Green):**
```yaml
# Replica 0 (Blue) - handles even sequence numbers (0, 2, 4, ...)
keeper:
replica_config:
replica_id: 0
total_replicas: 2
backup_delay_seconds: 30

# Replica 1 (Green) - handles odd sequence numbers (1, 3, 5, ...)
keeper:
replica_config:
replica_id: 1
total_replicas: 2
backup_delay_seconds: 30
```

**Three Replica Setup:**
```yaml
# Replica 0 - handles sequence numbers 0, 3, 6, 9, ...
keeper:
replica_config:
replica_id: 0
total_replicas: 3
backup_delay_seconds: 30
```

### Deployment Considerations

1. **Separate Wallets**: Each replica MUST use a different private key to avoid nonce conflicts
2. **Backup Delay**: Set `backup_delay_seconds` long enough to allow primary replica to process requests, but short enough for acceptable failover time (recommended: 30-60 seconds)
3. **Monitoring**: Monitor each replica's processing metrics to ensure proper load distribution
4. **Gas Management**: Each replica needs sufficient ETH balance for gas fees

### Failover Behavior

- Primary replica processes requests immediately
- Backup replicas wait for `backup_delay_seconds` before checking if request is still unfulfilled
- If request is already fulfilled during the delay, backup replica skips processing
- This prevents duplicate transactions and wasted gas while ensuring reliability

## Local Development

To start an instance of the webserver for local testing, you first need to perform a few setup steps:
Expand Down
16 changes: 16 additions & 0 deletions apps/fortuna/config.sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,19 @@ keeper:
value: 0xabcd
# For production, you can store the private key in a file.
# file: keeper-key.txt

# Multi-replica configuration
# Optional: Multi-replica configuration for high availability and load distribution
# Uncomment and configure for production deployments with multiple Fortuna instances
# replica_config:
# replica_id: 0 # Unique identifier for this replica (0, 1, 2, ...)
# total_replicas: 2 # Total number of replica instances running
# backup_delay_seconds: 30 # Seconds to wait before processing other replicas' requests
#
# Example configurations:
#
# Two-replica setup (Blue/Green):
# - Replica 0: handles even sequence numbers (0, 2, 4, ...)
# - Replica 1: handles odd sequence numbers (1, 3, 5, ...)
#
# IMPORTANT: Each replica must use a different private_key to avoid nonce conflicts!
10 changes: 9 additions & 1 deletion apps/fortuna/src/command/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use {
api::{self, ApiBlockChainState, BlockchainState, ChainId},
chain::ethereum::InstrumentedPythContract,
command::register_provider::CommitmentMetadata,
config::{Commitment, Config, EthereumConfig, ProviderConfig, RunOptions},
config::{Commitment, Config, EthereumConfig, ProviderConfig, ReplicaConfig, RunOptions},
eth_utils::traced_client::RpcMetrics,
history::History,
keeper::{self, keeper_metrics::KeeperMetrics},
Expand Down Expand Up @@ -94,10 +94,14 @@ pub async fn run(opts: &RunOptions) -> Result<()> {

let keeper_metrics: Arc<KeeperMetrics> =
Arc::new(KeeperMetrics::new(metrics_registry.clone()).await);

let keeper_private_key_option = config.keeper.private_key.load()?;
if keeper_private_key_option.is_none() {
tracing::info!("Not starting keeper service: no keeper private key specified. Please add one to the config if you would like to run the keeper service.")
}

let keeper_replica_config = config.keeper.replica_config.clone();

let chains: Arc<RwLock<HashMap<ChainId, ApiBlockChainState>>> = Arc::new(RwLock::new(
config
.chains
Expand All @@ -110,6 +114,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
keeper_metrics.add_chain(chain_id.clone(), config.provider.address);
let keeper_metrics = keeper_metrics.clone();
let keeper_private_key_option = keeper_private_key_option.clone();
let keeper_replica_config = keeper_replica_config.clone();
let chains = chains.clone();
let secret_copy = secret.clone();
let rpc_metrics = rpc_metrics.clone();
Expand All @@ -123,6 +128,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
chain_config.clone(),
keeper_metrics.clone(),
keeper_private_key_option.clone(),
keeper_replica_config.clone(),
chains.clone(),
&secret_copy,
history.clone(),
Expand Down Expand Up @@ -173,6 +179,7 @@ async fn setup_chain_and_run_keeper(
chain_config: EthereumConfig,
keeper_metrics: Arc<KeeperMetrics>,
keeper_private_key_option: Option<String>,
keeper_replica_config: Option<ReplicaConfig>,
chains: Arc<RwLock<HashMap<ChainId, ApiBlockChainState>>>,
secret_copy: &str,
history: Arc<History>,
Expand All @@ -195,6 +202,7 @@ async fn setup_chain_and_run_keeper(
if let Some(keeper_private_key) = keeper_private_key_option {
keeper::run_keeper_threads(
keeper_private_key,
keeper_replica_config,
chain_config,
state,
keeper_metrics.clone(),
Expand Down
32 changes: 32 additions & 0 deletions apps/fortuna/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,23 @@ impl Config {
}
}

if let Some(replica_config) = &config.keeper.replica_config {
if replica_config.total_replicas == 0 {
return Err(anyhow!("Keeper replica configuration is invalid. total_replicas must be greater than 0."));
}
if config.keeper.private_key.load()?.is_none() {
return Err(anyhow!(
"Keeper replica configuration requires a keeper private key to be specified."
));
}
if replica_config.replica_id >= replica_config.total_replicas {
return Err(anyhow!("Keeper replica configuration is invalid. replica_id must be less than total_replicas."));
}
if replica_config.backup_delay_seconds == 0 {
return Err(anyhow!("Keeper replica configuration is invalid. backup_delay_seconds must be greater than 0 to prevent race conditions."));
}
}

Ok(config)
}

Expand Down Expand Up @@ -333,6 +350,18 @@ fn default_chain_sample_interval() -> u64 {
1
}

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct ReplicaConfig {
pub replica_id: u64,
pub total_replicas: u64,
#[serde(default = "default_backup_delay_seconds")]
pub backup_delay_seconds: u64,
}

fn default_backup_delay_seconds() -> u64 {
30
}

/// Configuration values for the keeper service that are shared across chains.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct KeeperConfig {
Expand All @@ -342,6 +371,9 @@ pub struct KeeperConfig {
/// This key *does not need to be a registered provider*. In particular, production deployments
/// should ensure this is a different key in order to reduce the severity of security breaches.
pub private_key: SecretString,

#[serde(default)]
pub replica_config: Option<ReplicaConfig>,
}

// A secret is a string that can be provided either as a literal in the config,
Expand Down
8 changes: 5 additions & 3 deletions apps/fortuna/src/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
crate::{
api::{BlockchainState, ChainId},
chain::ethereum::{InstrumentedPythContract, InstrumentedSignablePythContract},
config::EthereumConfig,
config::{EthereumConfig, ReplicaConfig},
eth_utils::traced_client::RpcMetrics,
history::History,
keeper::{
Expand Down Expand Up @@ -56,7 +56,8 @@ pub enum RequestState {
/// handle any events for the new blocks.
#[tracing::instrument(name = "keeper", skip_all, fields(chain_id = chain_state.id))]
pub async fn run_keeper_threads(
private_key: String,
keeper_private_key: String,
keeper_replica_config: Option<ReplicaConfig>,
chain_eth_config: EthereumConfig,
chain_state: BlockchainState,
metrics: Arc<KeeperMetrics>,
Expand All @@ -69,7 +70,7 @@ pub async fn run_keeper_threads(

let contract = Arc::new(InstrumentedSignablePythContract::from_config(
&chain_eth_config,
&private_key,
&keeper_private_key,
chain_state.id.clone(),
rpc_metrics.clone(),
chain_state.network_id,
Expand All @@ -85,6 +86,7 @@ pub async fn run_keeper_threads(
contract: contract.clone(),
gas_limit,
escalation_policy: chain_eth_config.escalation_policy.to_policy(),
replica_config: keeper_replica_config,
metrics: metrics.clone(),
fulfilled_requests_cache,
history,
Expand Down
2 changes: 2 additions & 0 deletions apps/fortuna/src/keeper/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use {
crate::{
api::BlockchainState,
chain::{ethereum::InstrumentedSignablePythContract, reader::BlockNumber},
config::ReplicaConfig,
eth_utils::utils::EscalationPolicy,
history::History,
keeper::{
Expand Down Expand Up @@ -45,6 +46,7 @@ pub struct ProcessParams {
pub gas_limit: U256,
pub escalation_policy: EscalationPolicy,
pub chain_state: BlockchainState,
pub replica_config: Option<ReplicaConfig>,
pub metrics: Arc<KeeperMetrics>,
pub history: Arc<History>,
pub fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
Expand Down
49 changes: 49 additions & 0 deletions apps/fortuna/src/keeper/process_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,55 @@ pub async fn process_event_with_backoff(
return Ok(());
}

// If replica config is present, we're running with multiple instances.
// The incoming request is assigned by modulo operation on the sequence number
// and the total number of replicas. If our replica_id is the primary for this sequence number,
// we process the request directly. If our replica_id is a backup, we wait for the delay and
// then check if the request is still open. If it is, we process it as a backup replica.
if let Some(replica_config) = &process_param.replica_config {
let assigned_replica = event.sequence_number % replica_config.total_replicas;
let is_primary_replica = assigned_replica == replica_config.replica_id;

if is_primary_replica {
tracing::debug!("Processing request as primary replica");
} else {
tracing::debug!("Processing request as backup replica");

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can merge these 2 logs

tracing::info!("Waiting before processing as backup replica");
tokio::time::sleep(tokio::time::Duration::from_secs(
replica_config.backup_delay_seconds,
))
.await;

// Check if the request is still open after the delay.
// If it is, we will process it as a backup replica.
match chain_state
.contract
.get_request(event.provider_address, event.sequence_number)
.await
{
Ok(Some(_)) => {
tracing::info!(
delay_seconds = replica_config.backup_delay_seconds,
"Request still open after delay, processing as backup replica"
);
}
Ok(None) => {
tracing::debug!(
"Request already fulfilled by primary replica during delay, skipping"
);
return Ok(());
}
Err(e) => {
tracing::warn!(
error = ?e,
"Error checking request status after delay, processing as backup replica"
);
}
}
}
}

let account_label = AccountLabel {
chain_id: chain_state.id.clone(),
address: chain_state.provider_address.to_string(),
Expand Down
8 changes: 6 additions & 2 deletions contract_manager/scripts/load_test_entropy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const parser = yargs(hideBin(process.argv))
.usage(
"Load tests the entropy contract using the EntropyTester contract with many requests in a single transaction\n" +
"it does not monitor whether the callbacks are actually submitted or not.\n" +
"Usage: $0 --private-key <private-key> --chain <chain-id> --tester-address <tester-address>",
"Usage: $0 --private-key <private-key> --chain <chain-id> --tester-address <tester-address> --provider-address <provider-address>",
)
.options({
chain: {
Expand All @@ -22,6 +22,10 @@ const parser = yargs(hideBin(process.argv))
demandOption: true,
desc: "Address of the EntropyTester contract",
},
provider: {
type: "string",
desc: "Address of the entropy provider to use for requests (defaults to default provider)",
},
"success-count": {
type: "number",
default: 100,
Expand Down Expand Up @@ -66,7 +70,7 @@ async function main() {
const privateKey = toPrivateKey(argv.privateKey);
const chain = DefaultStore.getChainOrThrow(argv.chain, EvmChain);
const contract = findEntropyContract(chain);
const provider = await contract.getDefaultProvider();
const provider = argv.provider || (await contract.getDefaultProvider());
const fee = await contract.getFee(provider);
const web3 = contract.chain.getWeb3();
const testerContract = new web3.eth.Contract(ABI, argv.testerAddress);
Expand Down
Loading