Skip to content

Commit 454cd03

Browse files
committed
updates
1 parent 065fba2 commit 454cd03

File tree

7 files changed

+179
-124
lines changed

7 files changed

+179
-124
lines changed

hermes/src/api/rest.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ impl IntoResponse for RestError {
6565
pub async fn price_feed_ids(
6666
State(state): State<super::State>,
6767
) -> Result<Json<HashSet<PriceIdentifier>>, RestError> {
68-
let price_feeds = state.store.get_price_feed_ids();
68+
let price_feeds = state.store.get_price_feed_ids().await;
6969
Ok(Json(price_feeds))
7070
}
7171

@@ -83,6 +83,7 @@ pub async fn latest_vaas(
8383
let price_feeds_with_update_data = state
8484
.store
8585
.get_price_feeds_with_update_data(price_ids, RequestTime::Latest)
86+
.await
8687
.map_err(|_| RestError::UpdateDataNotFound)?;
8788
Ok(Json(
8889
price_feeds_with_update_data
@@ -111,6 +112,7 @@ pub async fn latest_price_feeds(
111112
let price_feeds_with_update_data = state
112113
.store
113114
.get_price_feeds_with_update_data(price_ids, RequestTime::Latest)
115+
.await
114116
.map_err(|_| RestError::UpdateDataNotFound)?;
115117
Ok(Json(
116118
price_feeds_with_update_data
@@ -148,6 +150,7 @@ pub async fn get_vaa(
148150
vec![price_id],
149151
RequestTime::FirstAfter(params.publish_time),
150152
)
153+
.await
151154
.map_err(|_| RestError::UpdateDataNotFound)?;
152155

153156
let vaa = price_feeds_with_update_data
@@ -198,6 +201,7 @@ pub async fn get_vaa_ccip(
198201
let price_feeds_with_update_data = state
199202
.store
200203
.get_price_feeds_with_update_data(vec![price_id], RequestTime::FirstAfter(publish_time))
204+
.await
201205
.map_err(|_| RestError::CcipUpdateDataNotFound)?;
202206

203207
let bytes = price_feeds_with_update_data

hermes/src/api/ws.rs

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -161,31 +161,34 @@ impl Subscriber {
161161
}
162162

163163
async fn handle_price_feeds_update(&mut self) -> Result<()> {
164-
let messages = self
165-
.price_feeds_with_config
166-
.iter()
167-
.map(|(price_feed_id, config)| {
168-
let price_feeds_with_update_data = self
169-
.store
170-
.get_price_feeds_with_update_data(vec![*price_feed_id], RequestTime::Latest)?;
171-
let price_feed = price_feeds_with_update_data
172-
.price_feeds
173-
.into_iter()
174-
.next()
175-
.ok_or_else(|| {
176-
anyhow::anyhow!("Price feed {} not found.", price_feed_id.to_string())
177-
})?;
178-
let price_feed =
179-
RpcPriceFeed::from_price_feed_update(price_feed, config.verbose, config.binary);
180-
181-
Ok(Message::Text(serde_json::to_string(
182-
&ServerMessage::PriceUpdate { price_feed },
164+
let price_feed_ids = self.price_feeds_with_config.keys().cloned().collect();
165+
for update in self
166+
.store
167+
.get_price_feeds_with_update_data(price_feed_ids, RequestTime::Latest)
168+
.await?
169+
.price_feeds
170+
{
171+
let config = self
172+
.price_feeds_with_config
173+
.get(&PriceIdentifier::new(update.price_feed.id))
174+
.ok_or(anyhow::anyhow!(
175+
"Config missing, price feed list was poisoned during iteration."
176+
))?;
177+
178+
self.sender
179+
.feed(Message::Text(serde_json::to_string(
180+
&ServerMessage::PriceUpdate {
181+
price_feed: RpcPriceFeed::from_price_feed_update(
182+
update,
183+
config.verbose,
184+
config.binary,
185+
),
186+
},
183187
)?))
184-
})
185-
.collect::<Result<Vec<Message>>>()?;
186-
self.sender
187-
.send_all(&mut iter(messages.into_iter().map(Ok)))
188-
.await?;
188+
.await?;
189+
}
190+
191+
self.sender.flush().await?;
189192
Ok(())
190193
}
191194

hermes/src/store.rs

Lines changed: 54 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use {
22
self::{
33
proof::wormhole_merkle::construct_update_data,
4-
storage::StorageInstance,
4+
storage::{
5+
MessageStateFilter,
6+
StorageInstance,
7+
},
58
types::{
69
AccumulatorMessages,
710
MessageType,
@@ -17,6 +20,7 @@ use {
1720
construct_message_states_proofs,
1821
store_wormhole_merkle_verified_message,
1922
},
23+
storage::AccumulatorState,
2024
types::{
2125
MessageState,
2226
ProofSet,
@@ -41,7 +45,6 @@ use {
4145
collections::HashSet,
4246
sync::Arc,
4347
time::{
44-
Duration,
4548
SystemTime,
4649
UNIX_EPOCH,
4750
},
@@ -62,28 +65,16 @@ pub mod storage;
6265
pub mod types;
6366
pub mod wormhole;
6467

65-
#[derive(Clone, PartialEq, Debug, Builder)]
66-
#[builder(derive(Debug), pattern = "immutable")]
67-
pub struct AccumulatorState {
68-
pub accumulator_messages: AccumulatorMessages,
69-
pub wormhole_merkle_proof: (WormholeMerkleRoot, Vec<u8>),
70-
}
71-
7268
pub struct Store {
73-
pub storage: StorageInstance,
74-
pub pending_accumulations: Cache<Slot, AccumulatorStateBuilder>,
75-
pub guardian_set: RwLock<Option<Vec<GuardianAddress>>>,
76-
pub update_tx: Sender<()>,
69+
pub storage: StorageInstance,
70+
pub guardian_set: RwLock<Option<Vec<GuardianAddress>>>,
71+
pub update_tx: Sender<()>,
7772
}
7873

7974
impl Store {
80-
pub fn new_with_local_cache(update_tx: Sender<()>, max_size_per_key: usize) -> Arc<Self> {
75+
pub fn new_with_local_cache(update_tx: Sender<()>, cache_size: u64) -> Arc<Self> {
8176
Arc::new(Self {
82-
storage: storage::local_storage::LocalStorage::new_instance(max_size_per_key),
83-
pending_accumulations: Cache::builder()
84-
.max_capacity(10_000)
85-
.time_to_live(Duration::from_secs(60 * 5))
86-
.build(), // FIXME: Make this configurable
77+
storage: storage::local_storage::LocalStorage::new_instance(cache_size),
8778
guardian_set: RwLock::new(None),
8879
update_tx,
8980
})
@@ -117,44 +108,47 @@ impl Store {
117108
}
118109
}
119110
}
111+
120112
Update::AccumulatorMessages(accumulator_messages) => {
121113
let slot = accumulator_messages.slot;
122-
123114
log::info!("Storing accumulator messages for slot {:?}.", slot,);
124-
125-
let pending_acc = self
126-
.pending_accumulations
127-
.entry(slot)
128-
.or_default()
129-
.await
130-
.into_value();
131-
self.pending_accumulations
132-
.insert(slot, pending_acc.accumulator_messages(accumulator_messages))
133-
.await;
134-
115+
let mut accumulator_state = self
116+
.storage
117+
.fetch_accumulator_state(slot)
118+
.await?
119+
.unwrap_or(AccumulatorState {
120+
slot,
121+
accumulator_messages: None,
122+
wormhole_merkle_proof: None,
123+
});
124+
accumulator_state.accumulator_messages = Some(accumulator_messages);
125+
self.storage
126+
.store_accumulator_state(accumulator_state)
127+
.await?;
135128
slot
136129
}
137130
};
138131

139-
let pending_state = self.pending_accumulations.get(&slot);
140-
let pending_state = match pending_state {
141-
Some(pending_state) => pending_state,
142-
// Due to some race conditions this might happen when it's processed before
132+
let state = match self.storage.fetch_accumulator_state(slot).await? {
133+
Some(state) => state,
143134
None => return Ok(()),
144135
};
145136

146-
let state = match pending_state.build() {
147-
Ok(state) => state,
148-
Err(_) => return Ok(()),
149-
};
137+
let (accumulator_messages, wormhole_merkle_proof) =
138+
match (state.accumulator_messages, state.wormhole_merkle_proof) {
139+
(Some(accumulator_messages), Some(wormhole_merkle_proof)) => {
140+
(accumulator_messages, wormhole_merkle_proof)
141+
}
142+
_ => return Ok(()),
143+
};
150144

151-
let wormhole_merkle_message_states_proofs = construct_message_states_proofs(state.clone())?;
145+
let wormhole_merkle_message_states_proofs =
146+
construct_message_states_proofs(&accumulator_messages, &wormhole_merkle_proof)?;
152147

153148
let current_time: UnixTimestamp =
154149
SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as _;
155150

156-
let message_states = state
157-
.accumulator_messages
151+
let message_states = accumulator_messages
158152
.messages
159153
.iter()
160154
.enumerate()
@@ -170,17 +164,15 @@ impl Store {
170164
.ok_or(anyhow!("Missing proof for message"))?
171165
.clone(),
172166
},
173-
state.accumulator_messages.slot,
167+
accumulator_messages.slot,
174168
current_time,
175169
))
176170
})
177171
.collect::<Result<Vec<_>>>()?;
178172

179173
log::info!("Message states len: {:?}", message_states.len());
180174

181-
self.storage.store_message_states(message_states)?;
182-
183-
self.pending_accumulations.invalidate(&slot).await;
175+
self.storage.store_message_states(message_states).await?;
184176

185177
self.update_tx.send(()).await?;
186178

@@ -191,16 +183,19 @@ impl Store {
191183
self.guardian_set.write().await.replace(guardian_set);
192184
}
193185

194-
pub fn get_price_feeds_with_update_data(
186+
pub async fn get_price_feeds_with_update_data(
195187
&self,
196188
price_ids: Vec<PriceIdentifier>,
197189
request_time: RequestTime,
198190
) -> Result<PriceFeedsWithUpdateData> {
199-
let messages = self.storage.retrieve_message_states(
200-
price_ids,
201-
request_time,
202-
Some(&|message_type| *message_type == MessageType::PriceFeedMessage),
203-
)?;
191+
let messages = self
192+
.storage
193+
.fetch_message_states(
194+
price_ids,
195+
request_time,
196+
MessageStateFilter::Only(MessageType::PriceFeedMessage),
197+
)
198+
.await?;
204199

205200
let price_feeds = messages
206201
.iter()
@@ -226,7 +221,12 @@ impl Store {
226221
})
227222
}
228223

229-
pub fn get_price_feed_ids(&self) -> HashSet<PriceIdentifier> {
230-
self.storage.keys().iter().map(|key| key.price_id).collect()
224+
pub async fn get_price_feed_ids(&self) -> HashSet<PriceIdentifier> {
225+
self.storage
226+
.message_state_keys()
227+
.await
228+
.iter()
229+
.map(|key| key.price_id)
230+
.collect()
231231
}
232232
}

hermes/src/store/proof/wormhole_merkle.rs

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use {
22
crate::store::{
3-
types::MessageState,
4-
AccumulatorState,
3+
storage::AccumulatorState,
4+
types::{
5+
AccumulatorMessages,
6+
MessageState,
7+
},
58
Store,
69
},
710
anyhow::{
@@ -40,45 +43,43 @@ pub async fn store_wormhole_merkle_verified_message(
4043
proof: WormholeMerkleRoot,
4144
vaa_bytes: Vec<u8>,
4245
) -> Result<()> {
43-
let pending_acc = store
44-
.pending_accumulations
45-
.entry(proof.slot)
46-
.or_default()
47-
.await
48-
.into_value();
46+
let mut accumulator_state = store
47+
.storage
48+
.fetch_accumulator_state(proof.slot)
49+
.await?
50+
.unwrap_or(AccumulatorState {
51+
slot: proof.slot,
52+
accumulator_messages: None,
53+
wormhole_merkle_proof: None,
54+
});
55+
56+
accumulator_state.wormhole_merkle_proof = Some((proof, vaa_bytes));
4957
store
50-
.pending_accumulations
51-
.insert(
52-
proof.slot,
53-
pending_acc.wormhole_merkle_proof((proof, vaa_bytes)),
54-
)
55-
.await;
58+
.storage
59+
.store_accumulator_state(accumulator_state)
60+
.await?;
5661
Ok(())
5762
}
5863

5964
pub fn construct_message_states_proofs(
60-
state: AccumulatorState,
65+
accumulator_messages: &AccumulatorMessages,
66+
wormhole_merkle_proof: &(WormholeMerkleRoot, Vec<u8>),
6167
) -> Result<Vec<WormholeMerkleMessageProof>> {
6268
// Check whether the state is valid
6369
let merkle_acc = match MerkleAccumulator::<Keccak160>::from_set(
64-
state
65-
.accumulator_messages
66-
.messages
67-
.iter()
68-
.map(|m| m.as_ref()),
70+
accumulator_messages.messages.iter().map(|m| m.as_ref()),
6971
) {
7072
Some(merkle_acc) => merkle_acc,
7173
None => return Ok(vec![]), // It only happens when the message set is empty
7274
};
7375

74-
let (proof, vaa) = &state.wormhole_merkle_proof;
76+
let (proof, vaa) = &wormhole_merkle_proof;
7577

7678
if merkle_acc.root != proof.root {
7779
return Err(anyhow!("Invalid merkle root"));
7880
}
7981

80-
state
81-
.accumulator_messages
82+
accumulator_messages
8283
.messages
8384
.iter()
8485
.map(|m| {

0 commit comments

Comments
 (0)