Skip to content

Commit bb922c3

Browse files
committed
fix(hermes): add lock for the entire message state
Before this change, there was a lock for each message and it could cause the updateData for multiple ids have 2 updates (because of the race with the thread updating the states). This change adds a RwLock which makes sure that when the entire message state is updating, no one can read from it while allowing concurrent reads in other occasions.
1 parent a224199 commit bb922c3

File tree

3 files changed

+34
-30
lines changed

3 files changed

+34
-30
lines changed

hermes/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

hermes/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hermes"
3-
version = "0.3.1"
3+
version = "0.3.2"
44
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
55
edition = "2021"
66

hermes/src/state/cache.rs

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@ use {
1212
anyhow,
1313
Result,
1414
},
15-
dashmap::DashMap,
15+
futures::future::join_all,
1616
pythnet_sdk::messages::{
1717
FeedId,
1818
Message,
1919
MessageType,
2020
},
2121
std::{
22-
collections::BTreeMap,
22+
collections::{
23+
BTreeMap,
24+
HashMap,
25+
},
2326
ops::Bound,
2427
sync::Arc,
2528
},
@@ -103,16 +106,16 @@ pub struct Cache {
103106
/// We do not write to this cache much, so we can use a simple RwLock instead of a DashMap.
104107
wormhole_merkle_state_cache: Arc<RwLock<BTreeMap<Slot, WormholeMerkleState>>>,
105108

106-
message_cache: Arc<DashMap<MessageStateKey, BTreeMap<MessageStateTime, MessageState>>>,
109+
message_cache: Arc<RwLock<HashMap<MessageStateKey, BTreeMap<MessageStateTime, MessageState>>>>,
107110
cache_size: u64,
108111
}
109112

110-
fn retrieve_message_state(
113+
async fn retrieve_message_state(
111114
cache: &Cache,
112115
key: MessageStateKey,
113116
request_time: RequestTime,
114117
) -> Option<MessageState> {
115-
match cache.message_cache.get(&key) {
118+
match cache.message_cache.read().await.get(&key) {
116119
Some(key_cache) => {
117120
match request_time {
118121
RequestTime::Latest => key_cache.last_key_value().map(|(_, v)| v).cloned(),
@@ -154,7 +157,7 @@ fn retrieve_message_state(
154157
impl Cache {
155158
pub fn new(cache_size: u64) -> Self {
156159
Self {
157-
message_cache: Arc::new(DashMap::new()),
160+
message_cache: Arc::new(RwLock::new(HashMap::new())),
158161
accumulator_messages_cache: Arc::new(RwLock::new(BTreeMap::new())),
159162
wormhole_merkle_state_cache: Arc::new(RwLock::new(BTreeMap::new())),
160163
cache_size,
@@ -189,20 +192,20 @@ impl AggregateCache for crate::state::State {
189192
async fn message_state_keys(&self) -> Vec<MessageStateKey> {
190193
self.cache
191194
.message_cache
195+
.read()
196+
.await
192197
.iter()
193-
.map(|entry| entry.key().clone())
198+
.map(|entry| entry.0.clone())
194199
.collect::<Vec<_>>()
195200
}
196201

197202
async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()> {
203+
let mut message_cache = self.cache.message_cache.write().await;
204+
198205
for message_state in message_states {
199206
let key = message_state.key();
200207
let time = message_state.time();
201-
let mut cache = self
202-
.cache
203-
.message_cache
204-
.entry(key)
205-
.or_insert_with(BTreeMap::new);
208+
let cache = message_cache.entry(key).or_insert_with(BTreeMap::new);
206209

207210
cache.insert(time, message_state);
208211

@@ -220,24 +223,25 @@ impl AggregateCache for crate::state::State {
220223
request_time: RequestTime,
221224
filter: MessageStateFilter,
222225
) -> Result<Vec<MessageState>> {
223-
ids.into_iter()
224-
.flat_map(|id| {
225-
let request_time = request_time.clone();
226-
let message_types: Vec<MessageType> = match filter {
227-
MessageStateFilter::All => MessageType::iter().collect(),
228-
MessageStateFilter::Only(t) => vec![t],
226+
join_all(ids.into_iter().flat_map(|id| {
227+
let request_time = request_time.clone();
228+
let message_types: Vec<MessageType> = match filter {
229+
MessageStateFilter::All => MessageType::iter().collect(),
230+
MessageStateFilter::Only(t) => vec![t],
231+
};
232+
233+
message_types.into_iter().map(move |message_type| {
234+
let key = MessageStateKey {
235+
feed_id: id,
236+
type_: message_type,
229237
};
230-
231-
message_types.into_iter().map(move |message_type| {
232-
let key = MessageStateKey {
233-
feed_id: id,
234-
type_: message_type,
235-
};
236-
retrieve_message_state(&self.cache, key, request_time.clone())
237-
.ok_or(anyhow!("Message not found"))
238-
})
238+
retrieve_message_state(&self.cache, key, request_time.clone())
239239
})
240-
.collect()
240+
}))
241+
.await
242+
.into_iter()
243+
.collect::<Option<Vec<_>>>()
244+
.ok_or(anyhow!("Message not found"))
241245
}
242246

243247
async fn store_accumulator_messages(

0 commit comments

Comments
 (0)