Skip to content

Commit 4fa9304

Browse files
ali-behjatiReisen
authored andcommitted
Move Message type extensions to pyth-client
1 parent c19f519 commit 4fa9304

File tree

8 files changed

+98
-100
lines changed

8 files changed

+98
-100
lines changed

hermes/Cargo.lock

Lines changed: 3 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

hermes/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ serde_qs = { version = "0.12.0", features = ["axum"] }
6464

6565
serde_wormhole = { git = "https://github.com/wormhole-foundation/wormhole", tag = "v2.17.1"}
6666
wormhole-sdk = { git = "https://github.com/wormhole-foundation/wormhole", tag = "v2.17.1" }
67-
pyth-oracle = { git = "https://github.com/pyth-network/pyth-client", rev = "7d593d87e07a1e2486e7ca21597d664ee72be1ec", features = ["library"] }
67+
# pyth-oracle = { git = "https://github.com/pyth-network/pyth-client", rev = "7d593d87e07a1e2486e7ca21597d664ee72be1ec", features = ["library"] }
68+
pyth-oracle = { git = "https://github.com/pyth-network/pyth-client", rev = "319cdc1baade5c4780b830eaf927f9bfef89ee39" , features = ["library"] }
69+
6870

6971
strum = { version = "0.24", features = ["derive"] }
7072
ethabi = { version = "18.0.0", features = ["serde"] }

hermes/src/network/pythnet.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ use {
1515
Result,
1616
},
1717
borsh::BorshDeserialize,
18+
byteorder::BE,
1819
futures::stream::StreamExt,
20+
pyth_oracle::Message,
1921
solana_account_decoder::UiAccountEncoding,
2022
solana_client::{
2123
nonblocking::pubsub_client::PubsubClient,
@@ -74,9 +76,44 @@ pub async fn run(store: Arc<Store>, pythnet_ws_endpoint: String) -> Result<!> {
7476
}
7577
};
7678

77-
let accumulator_messages = AccumulatorMessages::try_from_slice(&account.data);
79+
// The validators writes the accumulator messages using Borsh with
80+
// the following struct. We cannot directly have messages as Vec<Messages>
81+
// because they are serialized using big-endian byte order and Borsh
82+
// uses little-endian byte order.
83+
#[derive(BorshDeserialize)]
84+
struct RawAccumulatorMessages {
85+
pub magic: [u8; 4],
86+
pub slot: u64,
87+
pub ring_size: u32,
88+
pub raw_messages: Vec<Vec<u8>>,
89+
}
90+
91+
let accumulator_messages = RawAccumulatorMessages::try_from_slice(&account.data);
7892
match accumulator_messages {
7993
Ok(accumulator_messages) => {
94+
let messages = accumulator_messages
95+
.raw_messages
96+
.iter()
97+
.map(|message| {
98+
pythnet_sdk::wire::from_slice::<BE, Message>(message.as_slice())
99+
})
100+
.collect::<Result<Vec<Message>, _>>();
101+
102+
let messages = match messages {
103+
Ok(messages) => messages,
104+
Err(err) => {
105+
log::error!("Failed to parse messages: {:?}", err);
106+
continue;
107+
}
108+
};
109+
110+
let accumulator_messages = AccumulatorMessages {
111+
magic: accumulator_messages.magic,
112+
slot: accumulator_messages.slot,
113+
ring_size: accumulator_messages.ring_size,
114+
messages,
115+
};
116+
80117
let (candidate, _) = Pubkey::find_program_address(
81118
&[
82119
b"AccumulatorState",

hermes/src/store.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use {
77
StorageInstance,
88
},
99
types::{
10-
MessageType,
1110
PriceFeedUpdate,
1211
PriceFeedsWithUpdateData,
1312
RequestTime,
@@ -33,7 +32,10 @@ use {
3332
anyhow,
3433
Result,
3534
},
36-
pyth_oracle::Message,
35+
pyth_oracle::{
36+
Message,
37+
MessageType,
38+
},
3739
pyth_sdk::PriceIdentifier,
3840
pythnet_sdk::wire::v1::{
3941
WormholeMessage,
@@ -160,12 +162,9 @@ impl Store {
160162
.messages
161163
.iter()
162164
.enumerate()
163-
.map(|(idx, raw_message)| {
164-
let message = Message::try_from_bytes(raw_message)?;
165-
165+
.map(|(idx, message)| {
166166
Ok(MessageState::new(
167-
message,
168-
raw_message.clone(),
167+
message.clone(),
169168
ProofSet {
170169
wormhole_merkle_proof: wormhole_merkle_message_states_proofs
171170
.get(idx)
@@ -232,7 +231,7 @@ impl Store {
232231
.message_state_keys()
233232
.await
234233
.iter()
235-
.map(|key| key.price_id)
234+
.map(|key| PriceIdentifier::new(key.id))
236235
.collect()
237236
}
238237
}

hermes/src/store/proof/wormhole_merkle.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,20 +76,24 @@ pub fn construct_message_states_proofs(
7676
let accumulator_messages = &completed_accumulator_state.accumulator_messages;
7777
let wormhole_merkle_state = &completed_accumulator_state.wormhole_merkle_state;
7878

79+
let raw_messages = accumulator_messages
80+
.messages
81+
.iter()
82+
.map(|m| m.to_bytes())
83+
.collect::<Vec<Vec<u8>>>();
84+
7985
// Check whether the state is valid
80-
let merkle_acc = match MerkleAccumulator::<Keccak160>::from_set(
81-
accumulator_messages.messages.iter().map(|m| m.as_ref()),
82-
) {
83-
Some(merkle_acc) => merkle_acc,
84-
None => return Ok(vec![]), // It only happens when the message set is empty
85-
};
86+
let merkle_acc =
87+
match MerkleAccumulator::<Keccak160>::from_set(raw_messages.iter().map(|m| m.as_ref())) {
88+
Some(merkle_acc) => merkle_acc,
89+
None => return Ok(vec![]), // It only happens when the message set is empty
90+
};
8691

8792
if merkle_acc.root != wormhole_merkle_state.root.root {
8893
return Err(anyhow!("Invalid merkle root"));
8994
}
9095

91-
accumulator_messages
92-
.messages
96+
raw_messages
9397
.iter()
9498
.map(|m| {
9599
Ok(WormholeMerkleMessageProof {
@@ -126,7 +130,7 @@ pub fn construct_update_data(mut message_states: Vec<&MessageState>) -> Result<V
126130
updates: messages
127131
.iter()
128132
.map(|message| MerklePriceUpdate {
129-
message: message.raw_message.clone().into(),
133+
message: message.message.to_bytes().into(),
130134
proof: message.proof_set.wormhole_merkle_proof.proof.clone(),
131135
})
132136
.collect(),

hermes/src/store/storage.rs

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,7 @@ use {
33
proof::wormhole_merkle::WormholeMerkleState,
44
types::{
55
AccumulatorMessages,
6-
MessageExt,
7-
MessageIdentifier,
8-
MessageType,
96
ProofSet,
10-
RawMessage,
117
RequestTime,
128
Slot,
139
UnixTimestamp,
@@ -18,7 +14,10 @@ use {
1814
Result,
1915
},
2016
async_trait::async_trait,
21-
pyth_oracle::Message,
17+
pyth_oracle::{
18+
Message,
19+
MessageType,
20+
},
2221
pyth_sdk::PriceIdentifier,
2322
};
2423

@@ -56,6 +55,12 @@ impl TryFrom<AccumulatorState> for CompletedAccumulatorState {
5655
}
5756
}
5857

58+
#[derive(Clone, PartialEq, Eq, Debug, Hash)]
59+
pub struct MessageStateKey {
60+
pub id: [u8; 32],
61+
pub type_: MessageType,
62+
}
63+
5964
#[derive(Clone, PartialEq, Eq, Debug, PartialOrd, Ord)]
6065
pub struct MessageStateTime {
6166
pub publish_time: UnixTimestamp,
@@ -64,40 +69,36 @@ pub struct MessageStateTime {
6469

6570
#[derive(Clone, PartialEq, Debug)]
6671
pub struct MessageState {
67-
pub publish_time: UnixTimestamp,
68-
pub slot: Slot,
69-
pub id: MessageIdentifier,
70-
pub message: Message,
71-
pub raw_message: RawMessage,
72-
pub proof_set: ProofSet,
73-
pub received_at: UnixTimestamp,
72+
pub slot: Slot,
73+
pub message: Message,
74+
pub proof_set: ProofSet,
75+
pub received_at: UnixTimestamp,
7476
}
7577

7678
impl MessageState {
7779
pub fn time(&self) -> MessageStateTime {
7880
MessageStateTime {
79-
publish_time: self.publish_time,
81+
publish_time: self.message.publish_time(),
8082
slot: self.slot,
8183
}
8284
}
8385

84-
pub fn key(&self) -> MessageIdentifier {
85-
self.id.clone()
86+
pub fn key(&self) -> MessageStateKey {
87+
MessageStateKey {
88+
id: self.message.id(),
89+
type_: self.message.into(),
90+
}
8691
}
8792

8893
pub fn new(
8994
message: Message,
90-
raw_message: RawMessage,
9195
proof_set: ProofSet,
9296
slot: Slot,
9397
received_at: UnixTimestamp,
9498
) -> Self {
9599
Self {
96-
publish_time: message.publish_time(),
97100
slot,
98-
id: message.id(),
99101
message,
100-
raw_message,
101102
proof_set,
102103
received_at,
103104
}
@@ -119,7 +120,7 @@ pub enum MessageStateFilter {
119120
/// key for the update data they wish to access.
120121
#[async_trait]
121122
pub trait Storage: Send + Sync {
122-
async fn message_state_keys(&self) -> Vec<MessageIdentifier>;
123+
async fn message_state_keys(&self) -> Vec<MessageStateKey>;
123124
async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()>;
124125
async fn fetch_message_states(
125126
&self,

hermes/src/store/storage/local_storage.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,22 @@
11
use {
22
super::{
33
AccumulatorState,
4-
MessageIdentifier,
54
MessageState,
65
MessageStateFilter,
6+
MessageStateKey,
77
RequestTime,
88
Storage,
99
StorageInstance,
1010
},
11-
crate::store::types::{
12-
MessageType,
13-
Slot,
14-
},
11+
crate::store::types::Slot,
1512
anyhow::{
1613
anyhow,
1714
Result,
1815
},
1916
async_trait::async_trait,
2017
dashmap::DashMap,
2118
moka::sync::Cache,
19+
pyth_oracle::MessageType,
2220
pyth_sdk::PriceIdentifier,
2321
std::{
2422
collections::VecDeque,
@@ -29,7 +27,7 @@ use {
2927

3028
#[derive(Clone)]
3129
pub struct LocalStorage {
32-
message_cache: Arc<DashMap<MessageIdentifier, VecDeque<MessageState>>>,
30+
message_cache: Arc<DashMap<MessageStateKey, VecDeque<MessageState>>>,
3331
accumulator_cache: Cache<Slot, AccumulatorState>,
3432
cache_size: u64,
3533
}
@@ -48,7 +46,7 @@ impl LocalStorage {
4846

4947
fn retrieve_message_state(
5048
&self,
51-
key: MessageIdentifier,
49+
key: MessageStateKey,
5250
request_time: RequestTime,
5351
) -> Option<MessageState> {
5452
match self.message_cache.get(&key) {
@@ -135,9 +133,9 @@ impl Storage for LocalStorage {
135133
};
136134

137135
message_types.into_iter().map(move |message_type| {
138-
let key = MessageIdentifier {
139-
price_id: id,
140-
type_: message_type,
136+
let key = MessageStateKey {
137+
id: id.to_bytes(),
138+
type_: message_type,
141139
};
142140
self.retrieve_message_state(key, request_time.clone())
143141
.ok_or(anyhow!("Message not found"))
@@ -146,7 +144,7 @@ impl Storage for LocalStorage {
146144
.collect()
147145
}
148146

149-
async fn message_state_keys(&self) -> Vec<MessageIdentifier> {
147+
async fn message_state_keys(&self) -> Vec<MessageStateKey> {
150148
self.message_cache
151149
.iter()
152150
.map(|entry| entry.key().clone())

0 commit comments

Comments
 (0)