Skip to content

Commit 8b76d8c

Browse files
authored
refactor(hermes): state->aggregate downcasting (#1479)
1 parent bdc2e96 commit 8b76d8c

25 files changed

+562
-434
lines changed

apps/hermes/src/api.rs

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use {
22
crate::{
3-
aggregate::AggregationEvent,
43
config::RunOptions,
54
state::State,
65
},
@@ -14,7 +13,6 @@ use {
1413
ipnet::IpNet,
1514
serde_qs::axum::QsQueryConfig,
1615
std::sync::Arc,
17-
tokio::sync::broadcast::Sender,
1816
tower_http::cors::CorsLayer,
1917
utoipa::OpenApi,
2018
utoipa_swagger_ui::SwaggerUi,
@@ -27,21 +25,19 @@ pub mod types;
2725
mod ws;
2826

2927
pub struct ApiState<S = State> {
30-
pub state: Arc<S>,
31-
pub ws: Arc<ws::WsState>,
32-
pub metrics: Arc<metrics_middleware::Metrics>,
33-
pub update_tx: Sender<AggregationEvent>,
28+
pub state: Arc<S>,
29+
pub ws: Arc<ws::WsState>,
30+
pub metrics: Arc<metrics_middleware::Metrics>,
3431
}
3532

3633
/// Manually implement `Clone` as the derive macro will try and slap `Clone` on
3734
/// `State` which should not be Clone.
3835
impl<S> Clone for ApiState<S> {
3936
fn clone(&self) -> Self {
4037
Self {
41-
state: self.state.clone(),
42-
ws: self.ws.clone(),
43-
metrics: self.metrics.clone(),
44-
update_tx: self.update_tx.clone(),
38+
state: self.state.clone(),
39+
ws: self.ws.clone(),
40+
metrics: self.metrics.clone(),
4541
}
4642
}
4743
}
@@ -51,7 +47,6 @@ impl ApiState<State> {
5147
state: Arc<State>,
5248
ws_whitelist: Vec<IpNet>,
5349
requester_ip_header_name: String,
54-
update_tx: Sender<AggregationEvent>,
5550
) -> Self {
5651
Self {
5752
metrics: Arc::new(metrics_middleware::Metrics::new(state.clone())),
@@ -61,24 +56,18 @@ impl ApiState<State> {
6156
state.clone(),
6257
)),
6358
state,
64-
update_tx,
6559
}
6660
}
6761
}
6862

69-
#[tracing::instrument(skip(opts, state, update_tx))]
70-
pub async fn spawn(
71-
opts: RunOptions,
72-
state: Arc<State>,
73-
update_tx: Sender<AggregationEvent>,
74-
) -> Result<()> {
63+
#[tracing::instrument(skip(opts, state))]
64+
pub async fn spawn(opts: RunOptions, state: Arc<State>) -> Result<()> {
7565
let state = {
7666
let opts = opts.clone();
7767
ApiState::new(
7868
state,
7969
opts.rpc.ws_whitelist,
8070
opts.rpc.requester_ip_header_name,
81-
update_tx,
8271
)
8372
};
8473

apps/hermes/src/api/doc_examples.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::aggregate::UnixTimestamp;
1+
use crate::state::aggregate::UnixTimestamp;
22

33
// Example values for the utoipa API docs.
44
// Note that each of these expressions is only evaluated once when the documentation is created,

apps/hermes/src/api/rest.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use {
22
super::ApiState,
3+
crate::state::aggregate::Aggregates,
34
axum::{
45
http::StatusCode,
56
response::{
@@ -93,11 +94,15 @@ impl IntoResponse for RestError {
9394
}
9495

9596
/// Verify that the price ids exist in the aggregate state.
96-
pub async fn verify_price_ids_exist(
97-
state: &ApiState,
97+
pub async fn verify_price_ids_exist<S>(
98+
state: &ApiState<S>,
9899
price_ids: &[PriceIdentifier],
99-
) -> Result<(), RestError> {
100-
let all_ids = crate::aggregate::get_price_feed_ids(&*state.state).await;
100+
) -> Result<(), RestError>
101+
where
102+
S: Aggregates,
103+
{
104+
let state = &*state.state;
105+
let all_ids = Aggregates::get_price_feed_ids(state).await;
101106
let missing_ids = price_ids
102107
.iter()
103108
.filter(|id| !all_ids.contains(id))

apps/hermes/src/api/rest/get_price_feed.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
use {
22
super::verify_price_ids_exist,
33
crate::{
4-
aggregate::{
5-
RequestTime,
6-
UnixTimestamp,
7-
},
84
api::{
95
doc_examples,
106
rest::RestError,
117
types::{
128
PriceIdInput,
139
RpcPriceFeed,
1410
},
11+
ApiState,
12+
},
13+
state::aggregate::{
14+
Aggregates,
15+
RequestTime,
16+
UnixTimestamp,
1517
},
1618
},
1719
anyhow::Result,
@@ -60,16 +62,19 @@ pub struct GetPriceFeedQueryParams {
6062
GetPriceFeedQueryParams
6163
)
6264
)]
63-
pub async fn get_price_feed(
64-
State(state): State<crate::api::ApiState>,
65+
pub async fn get_price_feed<S>(
66+
State(state): State<ApiState<S>>,
6567
QsQuery(params): QsQuery<GetPriceFeedQueryParams>,
66-
) -> Result<Json<RpcPriceFeed>, RestError> {
68+
) -> Result<Json<RpcPriceFeed>, RestError>
69+
where
70+
S: Aggregates,
71+
{
6772
let price_id: PriceIdentifier = params.id.into();
68-
6973
verify_price_ids_exist(&state, &[price_id]).await?;
7074

71-
let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data(
72-
&*state.state,
75+
let state = &*state.state;
76+
let price_feeds_with_update_data = Aggregates::get_price_feeds_with_update_data(
77+
state,
7378
&[price_id],
7479
RequestTime::FirstAfter(params.publish_time),
7580
)

apps/hermes/src/api/rest/get_vaa.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
use {
22
super::verify_price_ids_exist,
33
crate::{
4-
aggregate::{
5-
get_price_feeds_with_update_data,
6-
RequestTime,
7-
UnixTimestamp,
8-
},
94
api::{
105
doc_examples,
116
rest::RestError,
127
types::PriceIdInput,
8+
ApiState,
9+
},
10+
state::aggregate::{
11+
Aggregates,
12+
RequestTime,
13+
UnixTimestamp,
1314
},
1415
},
1516
anyhow::Result,
@@ -68,16 +69,19 @@ pub struct GetVaaResponse {
6869
GetVaaQueryParams
6970
)
7071
)]
71-
pub async fn get_vaa(
72-
State(state): State<crate::api::ApiState>,
72+
pub async fn get_vaa<S>(
73+
State(state): State<ApiState<S>>,
7374
QsQuery(params): QsQuery<GetVaaQueryParams>,
74-
) -> Result<Json<GetVaaResponse>, RestError> {
75+
) -> Result<Json<GetVaaResponse>, RestError>
76+
where
77+
S: Aggregates,
78+
{
7579
let price_id: PriceIdentifier = params.id.into();
76-
7780
verify_price_ids_exist(&state, &[price_id]).await?;
7881

79-
let price_feeds_with_update_data = get_price_feeds_with_update_data(
80-
&*state.state,
82+
let state = &*state.state;
83+
let price_feeds_with_update_data = Aggregates::get_price_feeds_with_update_data(
84+
state,
8185
&[price_id],
8286
RequestTime::FirstAfter(params.publish_time),
8387
)

apps/hermes/src/api/rest/get_vaa_ccip.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
use {
22
super::verify_price_ids_exist,
33
crate::{
4-
aggregate::{
4+
api::{
5+
rest::RestError,
6+
ApiState,
7+
},
8+
state::aggregate::{
9+
Aggregates,
510
RequestTime,
611
UnixTimestamp,
712
},
8-
api::rest::RestError,
913
},
1014
anyhow::Result,
1115
axum::{
@@ -56,25 +60,29 @@ pub struct GetVaaCcipResponse {
5660
GetVaaCcipQueryParams
5761
)
5862
)]
59-
pub async fn get_vaa_ccip(
60-
State(state): State<crate::api::ApiState>,
63+
pub async fn get_vaa_ccip<S>(
64+
State(state): State<ApiState<S>>,
6165
QsQuery(params): QsQuery<GetVaaCcipQueryParams>,
62-
) -> Result<Json<GetVaaCcipResponse>, RestError> {
66+
) -> Result<Json<GetVaaCcipResponse>, RestError>
67+
where
68+
S: Aggregates,
69+
{
6370
let price_id: PriceIdentifier = PriceIdentifier::new(
6471
params.data[0..32]
6572
.try_into()
6673
.map_err(|_| RestError::InvalidCCIPInput)?,
6774
);
75+
verify_price_ids_exist(&state, &[price_id]).await?;
76+
6877
let publish_time = UnixTimestamp::from_be_bytes(
6978
params.data[32..40]
7079
.try_into()
7180
.map_err(|_| RestError::InvalidCCIPInput)?,
7281
);
7382

74-
verify_price_ids_exist(&state, &[price_id]).await?;
75-
76-
let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data(
77-
&*state.state,
83+
let state = &*state.state;
84+
let price_feeds_with_update_data = Aggregates::get_price_feeds_with_update_data(
85+
state,
7886
&[price_id],
7987
RequestTime::FirstAfter(publish_time),
8088
)

apps/hermes/src/api/rest/latest_price_feeds.rs

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
use {
22
super::verify_price_ids_exist,
33
crate::{
4-
aggregate::RequestTime,
54
api::{
65
rest::RestError,
76
types::{
87
PriceIdInput,
98
RpcPriceFeed,
109
},
10+
ApiState,
11+
},
12+
state::aggregate::{
13+
Aggregates,
14+
RequestTime,
1115
},
1216
},
1317
anyhow::Result,
@@ -59,28 +63,28 @@ pub struct LatestPriceFeedsQueryParams {
5963
LatestPriceFeedsQueryParams
6064
)
6165
)]
62-
pub async fn latest_price_feeds(
63-
State(state): State<crate::api::ApiState>,
66+
pub async fn latest_price_feeds<S>(
67+
State(state): State<ApiState<S>>,
6468
QsQuery(params): QsQuery<LatestPriceFeedsQueryParams>,
65-
) -> Result<Json<Vec<RpcPriceFeed>>, RestError> {
69+
) -> Result<Json<Vec<RpcPriceFeed>>, RestError>
70+
where
71+
S: Aggregates,
72+
{
6673
let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(|id| id.into()).collect();
67-
6874
verify_price_ids_exist(&state, &price_ids).await?;
6975

70-
let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data(
71-
&*state.state,
72-
&price_ids,
73-
RequestTime::Latest,
74-
)
75-
.await
76-
.map_err(|e| {
77-
tracing::warn!(
78-
"Error getting price feeds {:?} with update data: {:?}",
79-
price_ids,
80-
e
81-
);
82-
RestError::UpdateDataNotFound
83-
})?;
76+
let state = &*state.state;
77+
let price_feeds_with_update_data =
78+
Aggregates::get_price_feeds_with_update_data(state, &price_ids, RequestTime::Latest)
79+
.await
80+
.map_err(|e| {
81+
tracing::warn!(
82+
"Error getting price feeds {:?} with update data: {:?}",
83+
price_ids,
84+
e
85+
);
86+
RestError::UpdateDataNotFound
87+
})?;
8488

8589
Ok(Json(
8690
price_feeds_with_update_data

apps/hermes/src/api/rest/latest_vaas.rs

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
use {
22
super::verify_price_ids_exist,
33
crate::{
4-
aggregate::RequestTime,
54
api::{
65
doc_examples,
76
rest::RestError,
87
types::PriceIdInput,
8+
ApiState,
9+
},
10+
state::aggregate::{
11+
Aggregates,
12+
RequestTime,
913
},
1014
},
1115
anyhow::Result,
@@ -54,28 +58,28 @@ pub struct LatestVaasQueryParams {
5458
(status = 200, description = "VAAs retrieved successfully", body = Vec<String>, example=json!([doc_examples::vaa_example()]))
5559
),
5660
)]
57-
pub async fn latest_vaas(
58-
State(state): State<crate::api::ApiState>,
61+
pub async fn latest_vaas<S>(
62+
State(state): State<ApiState<S>>,
5963
QsQuery(params): QsQuery<LatestVaasQueryParams>,
60-
) -> Result<Json<Vec<String>>, RestError> {
64+
) -> Result<Json<Vec<String>>, RestError>
65+
where
66+
S: Aggregates,
67+
{
6168
let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(|id| id.into()).collect();
62-
6369
verify_price_ids_exist(&state, &price_ids).await?;
6470

65-
let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data(
66-
&*state.state,
67-
&price_ids,
68-
RequestTime::Latest,
69-
)
70-
.await
71-
.map_err(|e| {
72-
tracing::warn!(
73-
"Error getting price feeds {:?} with update data: {:?}",
74-
price_ids,
75-
e
76-
);
77-
RestError::UpdateDataNotFound
78-
})?;
71+
let state = &*state.state;
72+
let price_feeds_with_update_data =
73+
Aggregates::get_price_feeds_with_update_data(state, &price_ids, RequestTime::Latest)
74+
.await
75+
.map_err(|e| {
76+
tracing::warn!(
77+
"Error getting price feeds {:?} with update data: {:?}",
78+
price_ids,
79+
e
80+
);
81+
RestError::UpdateDataNotFound
82+
})?;
7983

8084
Ok(Json(
8185
price_feeds_with_update_data

0 commit comments

Comments
 (0)