Skip to content

Commit a66fe6e

Browse files
feat(fortuna): implement multiple replica support with sequence number modulo filtering
Co-Authored-By: Tejas Badadare <[email protected]>
1 parent 53df858 commit a66fe6e

File tree

5 files changed

+76
-0
lines changed

5 files changed

+76
-0
lines changed

apps/fortuna/config.sample.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ chains:
4747
# blocks after 5 blocks, then again after 10 blocks, and finally after 20 blocks.
4848
block_delays: [5, 10, 20]
4949

50+
51+
5052
# Historical commitments -- delete this block for local development purposes
5153
commitments:
5254
# prettier-ignore

apps/fortuna/src/config.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,15 @@ impl Config {
9494
}
9595
}
9696

97+
if let Some(replica_config) = &config.keeper.replica_config {
98+
if replica_config.total_replicas == 0 {
99+
return Err(anyhow!("Keeper replica configuration is invalid. total_replicas must be greater than 0."));
100+
}
101+
if replica_config.replica_id >= replica_config.total_replicas {
102+
return Err(anyhow!("Keeper replica configuration is invalid. replica_id must be less than total_replicas."));
103+
}
104+
}
105+
97106
Ok(config)
98107
}
99108

@@ -333,6 +342,12 @@ fn default_chain_sample_interval() -> u64 {
333342
1
334343
}
335344

345+
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
346+
pub struct ReplicaConfig {
347+
pub replica_id: u64,
348+
pub total_replicas: u64,
349+
}
350+
336351
/// Configuration values for the keeper service that are shared across chains.
337352
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
338353
pub struct KeeperConfig {
@@ -342,6 +357,9 @@ pub struct KeeperConfig {
342357
/// This key *does not need to be a registered provider*. In particular, production deployments
343358
/// should ensure this is a different key in order to reduce the severity of security breaches.
344359
pub private_key: SecretString,
360+
361+
#[serde(default)]
362+
pub replica_config: Option<ReplicaConfig>,
345363
}
346364

347365
// A secret is a string that can be provided either as a literal in the config,

apps/fortuna/src/keeper.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,13 @@ pub async fn run_keeper_threads(
8585
contract: contract.clone(),
8686
gas_limit,
8787
escalation_policy: chain_eth_config.escalation_policy.to_policy(),
88+
keeper_config: crate::config::KeeperConfig {
89+
private_key: crate::config::SecretString {
90+
value: None,
91+
file: None,
92+
},
93+
replica_config: None,
94+
},
8895
metrics: metrics.clone(),
8996
fulfilled_requests_cache,
9097
history,

apps/fortuna/src/keeper/block.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ pub struct ProcessParams {
4545
pub gas_limit: U256,
4646
pub escalation_policy: EscalationPolicy,
4747
pub chain_state: BlockchainState,
48+
pub keeper_config: crate::config::KeeperConfig,
4849
pub metrics: Arc<KeeperMetrics>,
4950
pub history: Arc<History>,
5051
pub fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,

apps/fortuna/src/keeper/process_event.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,54 @@ pub async fn process_event_with_backoff(
3535
return Ok(());
3636
}
3737

38+
let is_primary_replica =
39+
if let Some(replica_config) = &process_param.keeper_config.replica_config {
40+
let assigned_replica = event.sequence_number % replica_config.total_replicas;
41+
if assigned_replica != replica_config.replica_id {
42+
tracing::debug!(
43+
sequence_number = event.sequence_number,
44+
assigned_replica = assigned_replica,
45+
our_replica_id = replica_config.replica_id,
46+
"Processing request as backup replica"
47+
);
48+
false
49+
} else {
50+
true
51+
}
52+
} else {
53+
true // No replica config, process all requests
54+
};
55+
56+
if !is_primary_replica {
57+
match chain_state
58+
.contract
59+
.get_request(event.provider_address, event.sequence_number)
60+
.await
61+
{
62+
Ok(Some(_)) => {
63+
tracing::info!(
64+
sequence_number = event.sequence_number,
65+
"Request still open, processing as backup replica"
66+
);
67+
}
68+
Ok(None) => {
69+
tracing::debug!(
70+
sequence_number = event.sequence_number,
71+
"Request already fulfilled by primary replica, skipping"
72+
);
73+
return Ok(());
74+
}
75+
Err(e) => {
76+
tracing::warn!(
77+
sequence_number = event.sequence_number,
78+
error = ?e,
79+
"Error checking request status, skipping"
80+
);
81+
return Ok(());
82+
}
83+
}
84+
}
85+
3886
let account_label = AccountLabel {
3987
chain_id: chain_state.id.clone(),
4088
address: chain_state.provider_address.to_string(),

0 commit comments

Comments
 (0)