Skip to content

Commit ce4019b

Browse files
committed
refactor(hermes): state->price_feed_metadata downcasting
1 parent d1c5d93 commit ce4019b

File tree

6 files changed

+134
-75
lines changed

6 files changed

+134
-75
lines changed

hermes/src/aggregate.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ where
426426

427427
pub async fn is_ready(state: &State) -> bool {
428428
let metadata = state.aggregate_state.read().await;
429-
let price_feeds_metadata = state.price_feeds_metadata.read().await;
429+
let price_feeds_metadata = state.price_feed_meta.data.read().await;
430430

431431
let has_completed_recently = match metadata.latest_completed_update_at.as_ref() {
432432
Some(latest_completed_update_time) => {
@@ -456,7 +456,7 @@ mod test {
456456
super::*,
457457
crate::{
458458
api::types::PriceFeedMetadata,
459-
price_feeds_metadata::store_price_feeds_metadata,
459+
price_feeds_metadata::PriceFeedMeta,
460460
state::test::setup_state,
461461
},
462462
futures::future::join_all,
@@ -809,15 +809,13 @@ mod test {
809809

810810

811811
// Add a dummy price feeds metadata
812-
store_price_feeds_metadata(
813-
&state,
814-
&[PriceFeedMetadata {
812+
state
813+
.store_price_feeds_metadata(&[PriceFeedMetadata {
815814
id: PriceIdentifier::new([100; 32]),
816815
attributes: Default::default(),
817-
}],
818-
)
819-
.await
820-
.unwrap();
816+
}])
817+
.await
818+
.unwrap();
821819

822820
// Check the state is ready
823821
assert!(is_ready(&state).await);

hermes/src/api.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,27 @@ mod rest;
2626
pub mod types;
2727
mod ws;
2828

29-
#[derive(Clone)]
30-
pub struct ApiState {
31-
pub state: Arc<State>,
29+
pub struct ApiState<S = State> {
30+
pub state: Arc<S>,
3231
pub ws: Arc<ws::WsState>,
3332
pub metrics: Arc<metrics_middleware::Metrics>,
3433
pub update_tx: Sender<AggregationEvent>,
3534
}
3635

37-
impl ApiState {
36+
/// Manually implement `Clone` as the derive macro will try and slap `Clone` on
37+
/// `State` which should not be Clone.
38+
impl<S> Clone for ApiState<S> {
39+
fn clone(&self) -> Self {
40+
Self {
41+
state: self.state.clone(),
42+
ws: self.ws.clone(),
43+
metrics: self.metrics.clone(),
44+
update_tx: self.update_tx.clone(),
45+
}
46+
}
47+
}
48+
49+
impl ApiState<State> {
3850
pub fn new(
3951
state: Arc<State>,
4052
ws_whitelist: Vec<IpNet>,

hermes/src/api/rest/v2/price_feeds_metadata.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ use {
66
AssetType,
77
PriceFeedMetadata,
88
},
9+
ApiState,
910
},
10-
price_feeds_metadata::get_price_feeds_metadata,
11+
price_feeds_metadata::PriceFeedMeta,
1112
},
1213
anyhow::Result,
1314
axum::{
@@ -46,19 +47,23 @@ pub struct PriceFeedsMetadataQueryParams {
4647
PriceFeedsMetadataQueryParams
4748
)
4849
)]
49-
pub async fn price_feeds_metadata(
50-
State(state): State<crate::api::ApiState>,
50+
pub async fn price_feeds_metadata<S>(
51+
State(state): State<ApiState<S>>,
5152
QsQuery(params): QsQuery<PriceFeedsMetadataQueryParams>,
52-
) -> Result<Json<Vec<PriceFeedMetadata>>, RestError> {
53-
let price_feeds_metadata =
54-
get_price_feeds_metadata(&state.state, params.query, params.asset_type)
55-
.await
56-
.map_err(|e| {
57-
tracing::warn!("RPC connection error: {}", e);
58-
RestError::RpcConnectionError {
59-
message: format!("RPC connection error: {}", e),
60-
}
61-
})?;
53+
) -> Result<Json<Vec<PriceFeedMetadata>>, RestError>
54+
where
55+
S: PriceFeedMeta,
56+
{
57+
let state = &state.state;
58+
let price_feeds_metadata = state
59+
.get_price_feeds_metadata(params.query, params.asset_type)
60+
.await
61+
.map_err(|e| {
62+
tracing::warn!("RPC connection error: {}", e);
63+
RestError::RpcConnectionError {
64+
message: format!("RPC connection error: {}", e),
65+
}
66+
})?;
6267

6368
Ok(Json(price_feeds_metadata))
6469
}

hermes/src/network/pythnet.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use {
1717
GuardianSetData,
1818
},
1919
price_feeds_metadata::{
20-
store_price_feeds_metadata,
20+
PriceFeedMeta,
2121
DEFAULT_PRICE_FEEDS_CACHE_UPDATE_INTERVAL,
2222
},
2323
state::State,
@@ -353,13 +353,18 @@ pub async fn spawn(opts: RunOptions, state: Arc<State>) -> Result<()> {
353353
}
354354

355355

356-
pub async fn fetch_and_store_price_feeds_metadata(
357-
state: &State,
356+
pub async fn fetch_and_store_price_feeds_metadata<S>(
357+
state: &S,
358358
mapping_address: &Pubkey,
359359
rpc_client: &RpcClient,
360-
) -> Result<Vec<PriceFeedMetadata>> {
360+
) -> Result<Vec<PriceFeedMetadata>>
361+
where
362+
S: PriceFeedMeta,
363+
{
361364
let price_feeds_metadata = fetch_price_feeds_metadata(mapping_address, rpc_client).await?;
362-
store_price_feeds_metadata(state, &price_feeds_metadata).await?;
365+
state
366+
.store_price_feeds_metadata(&price_feeds_metadata)
367+
.await?;
363368
Ok(price_feeds_metadata)
364369
}
365370

hermes/src/price_feeds_metadata.rs

Lines changed: 71 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,49 +7,88 @@ use {
77
state::State,
88
},
99
anyhow::Result,
10+
tokio::sync::RwLock,
1011
};
1112

1213
pub const DEFAULT_PRICE_FEEDS_CACHE_UPDATE_INTERVAL: u64 = 600;
1314

14-
pub async fn retrieve_price_feeds_metadata(state: &State) -> Result<Vec<PriceFeedMetadata>> {
15-
let price_feeds_metadata = state.price_feeds_metadata.read().await;
16-
Ok(price_feeds_metadata.clone())
15+
pub struct PriceFeedMetaState {
16+
pub data: RwLock<Vec<PriceFeedMetadata>>,
1717
}
1818

19-
pub async fn store_price_feeds_metadata(
20-
state: &State,
21-
price_feeds_metadata: &[PriceFeedMetadata],
22-
) -> Result<()> {
23-
let mut price_feeds_metadata_write_guard = state.price_feeds_metadata.write().await;
24-
*price_feeds_metadata_write_guard = price_feeds_metadata.to_vec();
25-
Ok(())
19+
impl PriceFeedMetaState {
20+
pub fn new() -> Self {
21+
Self {
22+
data: RwLock::new(Vec::new()),
23+
}
24+
}
2625
}
2726

27+
/// Allow downcasting State into CacheState for functions that depend on the `Cache` service.
28+
impl<'a> From<&'a State> for &'a PriceFeedMetaState {
29+
fn from(state: &'a State) -> &'a PriceFeedMetaState {
30+
&state.price_feed_meta
31+
}
32+
}
33+
34+
pub trait PriceFeedMeta {
35+
async fn retrieve_price_feeds_metadata(&self) -> Result<Vec<PriceFeedMetadata>>;
36+
async fn store_price_feeds_metadata(
37+
&self,
38+
price_feeds_metadata: &[PriceFeedMetadata],
39+
) -> Result<()>;
40+
async fn get_price_feeds_metadata(
41+
&self,
42+
query: Option<String>,
43+
asset_type: Option<AssetType>,
44+
) -> Result<Vec<PriceFeedMetadata>>;
45+
}
2846

29-
pub async fn get_price_feeds_metadata(
30-
state: &State,
31-
query: Option<String>,
32-
asset_type: Option<AssetType>,
33-
) -> Result<Vec<PriceFeedMetadata>> {
34-
let mut price_feeds_metadata = retrieve_price_feeds_metadata(state).await?;
35-
36-
// Filter by query if provided
37-
if let Some(query_str) = &query {
38-
price_feeds_metadata.retain(|feed| {
39-
feed.attributes.get("symbol").map_or(false, |symbol| {
40-
symbol.to_lowercase().contains(&query_str.to_lowercase())
41-
})
42-
});
47+
impl<T> PriceFeedMeta for T
48+
where
49+
for<'a> &'a T: Into<&'a PriceFeedMetaState>,
50+
T: Sync,
51+
{
52+
async fn retrieve_price_feeds_metadata(&self) -> Result<Vec<PriceFeedMetadata>> {
53+
let price_feeds_metadata = self.into().data.read().await;
54+
Ok(price_feeds_metadata.clone())
4355
}
4456

45-
// Filter by asset_type if provided
46-
if let Some(asset_type) = &asset_type {
47-
price_feeds_metadata.retain(|feed| {
48-
feed.attributes.get("asset_type").map_or(false, |type_str| {
49-
type_str.to_lowercase() == asset_type.to_string().to_lowercase()
50-
})
51-
});
57+
async fn store_price_feeds_metadata(
58+
&self,
59+
price_feeds_metadata: &[PriceFeedMetadata],
60+
) -> Result<()> {
61+
let mut price_feeds_metadata_write_guard = self.into().data.write().await;
62+
*price_feeds_metadata_write_guard = price_feeds_metadata.to_vec();
63+
Ok(())
5264
}
5365

54-
Ok(price_feeds_metadata)
66+
67+
async fn get_price_feeds_metadata(
68+
&self,
69+
query: Option<String>,
70+
asset_type: Option<AssetType>,
71+
) -> Result<Vec<PriceFeedMetadata>> {
72+
let mut price_feeds_metadata = self.retrieve_price_feeds_metadata().await?;
73+
74+
// Filter by query if provided
75+
if let Some(query_str) = &query {
76+
price_feeds_metadata.retain(|feed| {
77+
feed.attributes.get("symbol").map_or(false, |symbol| {
78+
symbol.to_lowercase().contains(&query_str.to_lowercase())
79+
})
80+
});
81+
}
82+
83+
// Filter by asset_type if provided
84+
if let Some(asset_type) = &asset_type {
85+
price_feeds_metadata.retain(|feed| {
86+
feed.attributes.get("asset_type").map_or(false, |type_str| {
87+
type_str.to_lowercase() == asset_type.to_string().to_lowercase()
88+
})
89+
});
90+
}
91+
92+
Ok(price_feeds_metadata)
93+
}
5594
}

hermes/src/state.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ use {
1010
AggregateState,
1111
AggregationEvent,
1212
},
13-
api::types::PriceFeedMetadata,
1413
network::wormhole::GuardianSet,
14+
price_feeds_metadata::PriceFeedMetaState,
1515
},
1616
prometheus_client::registry::Registry,
1717
reqwest::Url,
@@ -38,6 +38,9 @@ pub struct State {
3838
/// State for the `Benchmarks` service for looking up historical updates.
3939
pub benchmarks: BenchmarksState,
4040

41+
/// State for the `PriceFeedMeta` service for looking up metadata related to Pyth price feeds.
42+
pub price_feed_meta: PriceFeedMetaState,
43+
4144
/// Sequence numbers of lately observed Vaas. Store uses this set
4245
/// to ignore the previously observed Vaas as a performance boost.
4346
pub observed_vaa_seqs: RwLock<BTreeSet<u64>>,
@@ -53,9 +56,6 @@ pub struct State {
5356

5457
/// Metrics registry
5558
pub metrics_registry: RwLock<Registry>,
56-
57-
/// Price feeds metadata
58-
pub price_feeds_metadata: RwLock<Vec<PriceFeedMetadata>>,
5959
}
6060

6161
impl State {
@@ -66,14 +66,14 @@ impl State {
6666
) -> Arc<Self> {
6767
let mut metrics_registry = Registry::default();
6868
Arc::new(Self {
69-
cache: CacheState::new(cache_size),
70-
benchmarks: BenchmarksState::new(benchmarks_endpoint),
71-
observed_vaa_seqs: RwLock::new(Default::default()),
72-
guardian_set: RwLock::new(Default::default()),
73-
api_update_tx: update_tx,
74-
aggregate_state: RwLock::new(AggregateState::new(&mut metrics_registry)),
75-
metrics_registry: RwLock::new(metrics_registry),
76-
price_feeds_metadata: RwLock::new(Default::default()),
69+
cache: CacheState::new(cache_size),
70+
benchmarks: BenchmarksState::new(benchmarks_endpoint),
71+
price_feed_meta: PriceFeedMetaState::new(),
72+
observed_vaa_seqs: RwLock::new(Default::default()),
73+
guardian_set: RwLock::new(Default::default()),
74+
api_update_tx: update_tx,
75+
aggregate_state: RwLock::new(AggregateState::new(&mut metrics_registry)),
76+
metrics_registry: RwLock::new(metrics_registry),
7777
})
7878
}
7979
}

0 commit comments

Comments
 (0)