Skip to content

Commit c3b61f2

Browse files
authored
feat(hermes): make readiness thresholds configurable (#1687)
1 parent 045cfee commit c3b61f2

File tree

7 files changed

+102
-37
lines changed

7 files changed

+102
-37
lines changed

apps/hermes/server/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/hermes/server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hermes"
3-
version = "0.5.13"
3+
version = "0.5.14"
44
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
55
edition = "2021"
66

apps/hermes/server/src/config.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use clap::{
77
Parser,
88
};
99

10+
mod aggregate;
1011
mod benchmarks;
1112
mod metrics;
1213
mod pythnet;
@@ -30,9 +31,17 @@ pub enum Options {
3031

3132
#[derive(Args, Clone, Debug)]
3233
pub struct RunOptions {
33-
/// Wormhole Options.
34+
/// Aggregate Options
3435
#[command(flatten)]
35-
pub wormhole: wormhole::Options,
36+
pub aggregate: aggregate::Options,
37+
38+
/// Benchmarks Options
39+
#[command(flatten)]
40+
pub benchmarks: benchmarks::Options,
41+
42+
/// Metrics Options
43+
#[command(flatten)]
44+
pub metrics: metrics::Options,
3645

3746
/// PythNet Options
3847
#[command(flatten)]
@@ -42,13 +51,9 @@ pub struct RunOptions {
4251
#[command(flatten)]
4352
pub rpc: rpc::Options,
4453

45-
/// Benchmarks Options
46-
#[command(flatten)]
47-
pub benchmarks: benchmarks::Options,
48-
49-
/// Metrics Options
54+
/// Wormhole Options.
5055
#[command(flatten)]
51-
pub metrics: metrics::Options,
56+
pub wormhole: wormhole::Options,
5257
}
5358

5459
#[derive(Args, Clone, Debug)]
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
use {
2+
clap::Args,
3+
humantime::Duration,
4+
};
5+
6+
#[derive(Args, Clone, Debug)]
7+
#[command(next_help_heading = "Aggregate Options")]
8+
#[group(id = "Aggregate")]
9+
pub struct Options {
10+
/// The duration of no aggregation after which the readiness of the state is considered stale.
11+
#[arg(long = "aggregate-readiness-staleness-threshold")]
12+
#[arg(env = "AGGREGATE_READINESS_STALENESS_THRESHOLD")]
13+
#[arg(default_value = "30s")]
14+
pub readiness_staleness_threshold: Duration,
15+
16+
/// The maximum allowed slot lag between the latest observed slot and the latest completed slot.
17+
#[arg(long = "aggregate-readiness-max-allowed-slot-lag")]
18+
#[arg(env = "AGGREGATE_READINESS_MAX_ALLOWED_SLOT_LAG")]
19+
#[arg(default_value = "10")]
20+
pub readiness_max_allowed_slot_lag: u64,
21+
}

apps/hermes/server/src/main.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,13 @@ async fn init() -> Result<()> {
5151
let (update_tx, _) = tokio::sync::broadcast::channel(1000);
5252

5353
// Initialize a cache store with a 1000 element circular buffer.
54-
let state = state::new(update_tx.clone(), 1000, opts.benchmarks.endpoint.clone());
54+
let state = state::new(
55+
update_tx.clone(),
56+
1000,
57+
opts.benchmarks.endpoint.clone(),
58+
opts.aggregate.readiness_staleness_threshold.into(),
59+
opts.aggregate.readiness_max_allowed_slot_lag,
60+
);
5561

5662
// Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown.
5763
spawn(async move {

apps/hermes/server/src/state.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,13 @@ use {
1212
price_feeds_metadata::PriceFeedMetaState,
1313
wormhole::WormholeState,
1414
},
15+
aggregate::Slot,
1516
prometheus_client::registry::Registry,
1617
reqwest::Url,
17-
std::sync::Arc,
18+
std::{
19+
sync::Arc,
20+
time::Duration,
21+
},
1822
tokio::sync::broadcast::Sender,
1923
};
2024

@@ -64,13 +68,20 @@ pub fn new(
6468
update_tx: Sender<AggregationEvent>,
6569
cache_size: u64,
6670
benchmarks_endpoint: Option<Url>,
71+
readiness_staleness_threshold: Duration,
72+
readiness_max_allowed_slot_lag: Slot,
6773
) -> Arc<impl Metrics + Wormhole> {
6874
let mut metrics_registry = Registry::default();
6975
Arc::new(State {
7076
cache: CacheState::new(cache_size),
7177
benchmarks: BenchmarksState::new(benchmarks_endpoint),
7278
price_feed_meta: PriceFeedMetaState::new(),
73-
aggregates: AggregateState::new(update_tx, &mut metrics_registry),
79+
aggregates: AggregateState::new(
80+
update_tx,
81+
readiness_staleness_threshold,
82+
readiness_max_allowed_slot_lag,
83+
&mut metrics_registry,
84+
),
7485
wormhole: WormholeState::new(),
7586
metrics: MetricsState::new(metrics_registry),
7687
})
@@ -85,15 +96,18 @@ pub mod test {
8596
Wormhole,
8697
},
8798
crate::network::wormhole::GuardianSet,
88-
std::sync::Arc,
99+
std::{
100+
sync::Arc,
101+
time::Duration,
102+
},
89103
tokio::sync::broadcast::Receiver,
90104
};
91105

92106
pub async fn setup_state(
93107
cache_size: u64,
94108
) -> (Arc<impl Aggregates>, Receiver<AggregationEvent>) {
95109
let (update_tx, update_rx) = tokio::sync::broadcast::channel(1000);
96-
let state = super::new(update_tx, cache_size, None);
110+
let state = super::new(update_tx, cache_size, None, Duration::from_secs(30), 10);
97111

98112
// Add an initial guardian set with public key 0
99113
Wormhole::update_guardian_set(

apps/hermes/server/src/state/aggregate.rs

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use {
1919
WormholeMerkleState,
2020
},
2121
crate::{
22-
api::types::RpcPriceIdentifier,
2322
network::wormhole::VaaBytes,
2423
state::{
2524
benchmarks::Benchmarks,
@@ -123,17 +122,29 @@ pub struct AggregateStateData {
123122
/// probes.
124123
pub latest_observed_slot: Option<Slot>,
125124

125+
/// The duration of no aggregation after which the readiness of the state is considered stale.
126+
pub readiness_staleness_threshold: Duration,
127+
128+
/// The maximum allowed slot lag between the latest observed slot and the latest completed slot.
129+
pub readiness_max_allowed_slot_lag: Slot,
130+
126131
/// Aggregate Specific Metrics
127132
pub metrics: metrics::Metrics,
128133
}
129134

130135
impl AggregateStateData {
131-
pub fn new(metrics_registry: &mut Registry) -> Self {
136+
pub fn new(
137+
readiness_staleness_threshold: Duration,
138+
readiness_max_allowed_slot_lag: Slot,
139+
metrics_registry: &mut Registry,
140+
) -> Self {
132141
Self {
133-
latest_completed_slot: None,
142+
latest_completed_slot: None,
134143
latest_completed_update_at: None,
135-
latest_observed_slot: None,
136-
metrics: metrics::Metrics::new(metrics_registry),
144+
latest_observed_slot: None,
145+
metrics: metrics::Metrics::new(metrics_registry),
146+
readiness_staleness_threshold,
147+
readiness_max_allowed_slot_lag,
137148
}
138149
}
139150
}
@@ -144,9 +155,18 @@ pub struct AggregateState {
144155
}
145156

146157
impl AggregateState {
147-
pub fn new(update_tx: Sender<AggregationEvent>, metrics_registry: &mut Registry) -> Self {
158+
pub fn new(
159+
update_tx: Sender<AggregationEvent>,
160+
readiness_staleness_threshold: Duration,
161+
readiness_max_allowed_slot_lag: Slot,
162+
metrics_registry: &mut Registry,
163+
) -> Self {
148164
Self {
149-
data: RwLock::new(AggregateStateData::new(metrics_registry)),
165+
data: RwLock::new(AggregateStateData::new(
166+
readiness_staleness_threshold,
167+
readiness_max_allowed_slot_lag,
168+
metrics_registry,
169+
)),
150170
api_update_tx: update_tx,
151171
}
152172
}
@@ -193,12 +213,6 @@ pub struct PriceFeedsWithUpdateData {
193213
pub update_data: Vec<Vec<u8>>,
194214
}
195215

196-
const READINESS_STALENESS_THRESHOLD: Duration = Duration::from_secs(30);
197-
198-
/// The maximum allowed slot lag between the latest observed slot and the latest completed slot.
199-
/// 10 slots is almost 5 seconds.
200-
const READINESS_MAX_ALLOWED_SLOT_LAG: Slot = 10;
201-
202216
#[async_trait::async_trait]
203217
pub trait Aggregates
204218
where
@@ -388,24 +402,25 @@ where
388402
}
389403

390404
async fn is_ready(&self) -> bool {
391-
let metadata = self.into().data.read().await;
405+
let state_data = self.into().data.read().await;
392406
let price_feeds_metadata = PriceFeedMeta::retrieve_price_feeds_metadata(self)
393407
.await
394408
.unwrap();
395409

396-
let has_completed_recently = match metadata.latest_completed_update_at.as_ref() {
410+
let has_completed_recently = match state_data.latest_completed_update_at.as_ref() {
397411
Some(latest_completed_update_time) => {
398-
latest_completed_update_time.elapsed() < READINESS_STALENESS_THRESHOLD
412+
latest_completed_update_time.elapsed() < state_data.readiness_staleness_threshold
399413
}
400414
None => false,
401415
};
402416

403417
let is_not_behind = match (
404-
metadata.latest_completed_slot,
405-
metadata.latest_observed_slot,
418+
state_data.latest_completed_slot,
419+
state_data.latest_observed_slot,
406420
) {
407421
(Some(latest_completed_slot), Some(latest_observed_slot)) => {
408-
latest_observed_slot - latest_completed_slot <= READINESS_MAX_ALLOWED_SLOT_LAG
422+
latest_observed_slot - latest_completed_slot
423+
<= state_data.readiness_max_allowed_slot_lag
409424
}
410425
_ => false,
411426
};
@@ -512,7 +527,10 @@ mod test {
512527
use {
513528
super::*,
514529
crate::{
515-
api::types::PriceFeedMetadata,
530+
api::types::{
531+
PriceFeedMetadata,
532+
RpcPriceIdentifier,
533+
},
516534
state::test::setup_state,
517535
},
518536
futures::future::join_all,
@@ -881,8 +899,9 @@ mod test {
881899
assert!(state.is_ready().await);
882900

883901
// Advance the clock to make the prices stale
884-
MockClock::advance_system_time(READINESS_STALENESS_THRESHOLD);
885-
MockClock::advance(READINESS_STALENESS_THRESHOLD);
902+
let staleness_threshold = Duration::from_secs(30);
903+
MockClock::advance_system_time(staleness_threshold);
904+
MockClock::advance(staleness_threshold);
886905
// Check the state is not ready
887906
assert!(!state.is_ready().await);
888907
}

0 commit comments

Comments
 (0)