1
1
use {
2
2
borsh:: BorshDeserialize ,
3
3
clap:: Parser ,
4
- core:: panic,
5
- observation:: {
6
- Body ,
7
- SignedBody ,
8
- } ,
9
4
posted_message:: PostedMessageUnreliableData ,
10
5
secp256k1:: SecretKey ,
6
+ signed_body:: SignedBody ,
11
7
solana_account_decoder:: UiAccountEncoding ,
12
8
solana_client:: {
13
9
nonblocking:: pubsub_client:: PubsubClient ,
16
12
RpcAccountInfoConfig ,
17
13
RpcProgramAccountsConfig ,
18
14
} ,
15
+ rpc_filter:: {
16
+ Memcmp ,
17
+ RpcFilterType ,
18
+ } ,
19
19
} ,
20
20
solana_sdk:: pubkey:: Pubkey ,
21
21
std:: {
@@ -25,14 +25,16 @@ use {
25
25
} ,
26
26
tokio:: time:: sleep,
27
27
tokio_stream:: StreamExt ,
28
+ wormhole_sdk:: {
29
+ vaa:: Body ,
30
+ Address ,
31
+ Chain ,
32
+ } ,
28
33
} ;
29
34
30
35
mod config;
31
- mod observation;
32
36
mod posted_message;
33
- mod serde_array;
34
-
35
- const PYTHNET_CHAIN_ID : u16 = 26 ;
37
+ mod signed_body;
36
38
37
39
struct ListenerConfig {
38
40
ws_url : String ,
@@ -41,7 +43,8 @@ struct ListenerConfig {
41
43
accumulator_address : Pubkey ,
42
44
}
43
45
44
- fn find_message_pda ( wormhole_pid : & Pubkey , ring_index : u32 ) -> Pubkey {
46
+ fn find_message_pda ( wormhole_pid : & Pubkey , slot : u64 ) -> Pubkey {
47
+ let ring_index = ( slot % 10_000 ) as u32 ;
45
48
Pubkey :: find_program_address (
46
49
& [ b"AccumulatorMessage" , & ring_index. to_be_bytes ( ) ] ,
47
50
wormhole_pid,
@@ -55,7 +58,10 @@ async fn run_listener(config: ListenerConfig) -> Result<(), PubsubClientError> {
55
58
. program_subscribe (
56
59
& config. wormhole_pid ,
57
60
Some ( RpcProgramAccountsConfig {
58
- filters : None ,
61
+ filters : Some ( vec ! [ RpcFilterType :: Memcmp ( Memcmp :: new(
62
+ 0 ,
63
+ solana_client:: rpc_filter:: MemcmpEncodedBytes :: Bytes ( b"msu" . to_vec( ) ) ,
64
+ ) ) ] ) ,
59
65
account_config : RpcAccountInfoConfig {
60
66
encoding : Some ( UiAccountEncoding :: Base64 ) ,
61
67
data_slice : None ,
@@ -71,55 +77,54 @@ async fn run_listener(config: ListenerConfig) -> Result<(), PubsubClientError> {
71
77
. await ?;
72
78
73
79
while let Some ( update) = stream. next ( ) . await {
74
- let message_pda =
75
- find_message_pda ( & config . wormhole_pid , ( update. context . slot % 10_000 ) as u32 ) ;
76
- if message_pda . to_string ( ) != update . value . pubkey {
80
+ if find_message_pda ( & config . wormhole_pid , update . context . slot ) . to_string ( )
81
+ != update. value . pubkey
82
+ {
77
83
continue ; // Skip updates that are not for the expected PDA
78
84
}
79
85
80
- let unreliable_data: Option < PostedMessageUnreliableData > = update
81
- . value
82
- . account
83
- . data
84
- . decode ( )
85
- . and_then ( |data| BorshDeserialize :: deserialize ( & mut data. as_slice ( ) ) . ok ( ) ) ;
86
-
87
- if let Some ( unreliable_data) = unreliable_data {
88
- if PYTHNET_CHAIN_ID != unreliable_data. emitter_chain {
89
- continue ;
90
- }
91
- if config. accumulator_address != Pubkey :: from ( unreliable_data. emitter_address ) {
92
- continue ;
93
- }
94
-
95
- let body = Body {
96
- timestamp : unreliable_data. submission_time ,
97
- nonce : unreliable_data. nonce ,
98
- emitter_chain : unreliable_data. emitter_chain ,
99
- emitter_address : unreliable_data. emitter_address ,
100
- sequence : unreliable_data. sequence ,
101
- consistency_level : unreliable_data. consistency_level ,
102
- payload : unreliable_data. payload . clone ( ) ,
86
+ let unreliable_data: PostedMessageUnreliableData = {
87
+ let data = match update. value . account . data . decode ( ) {
88
+ Some ( data) => data,
89
+ None => {
90
+ tracing:: error!( "Failed to decode account data" ) ;
91
+ continue ;
92
+ }
103
93
} ;
104
94
105
- match body. sign ( config. secret_key . secret_bytes ( ) ) {
106
- Ok ( signature) => {
107
- let signed_body = SignedBody {
108
- version : unreliable_data. vaa_version ,
109
- signature,
110
- body,
111
- } ;
112
- println ! ( "Signed Body: {:?}" , signed_body) ;
95
+ match BorshDeserialize :: deserialize ( & mut data. as_slice ( ) ) {
96
+ Ok ( data) => data,
97
+ Err ( e) => {
98
+ tracing:: error!( error = ?e, "Invalid unreliable data format" ) ;
99
+ continue ;
113
100
}
114
- Err ( e) => tracing:: error!( error = ?e, "Failed to sign body" ) ,
115
101
}
102
+ } ;
103
+
104
+ if Chain :: Pythnet != unreliable_data. emitter_chain . into ( ) {
105
+ continue ;
106
+ }
107
+ if config. accumulator_address != Pubkey :: from ( unreliable_data. emitter_address ) {
108
+ continue ;
116
109
}
110
+
111
+ let body = Body {
112
+ timestamp : unreliable_data. submission_time ,
113
+ nonce : unreliable_data. nonce ,
114
+ emitter_chain : unreliable_data. emitter_chain . into ( ) ,
115
+ emitter_address : Address ( unreliable_data. emitter_address ) ,
116
+ sequence : unreliable_data. sequence ,
117
+ consistency_level : unreliable_data. consistency_level ,
118
+ payload : unreliable_data. payload . clone ( ) ,
119
+ } ;
120
+
121
+ match SignedBody :: try_new ( body, config. secret_key ) {
122
+ Ok ( signed_body) => println ! ( "Signed Body: {:?}" , signed_body) ,
123
+ Err ( e) => tracing:: error!( error = ?e, "Failed to sign body" ) ,
124
+ } ;
117
125
}
118
126
119
- tokio:: spawn ( async move {
120
- // Wait for the stream to finish
121
- unsubscribe ( ) . await
122
- } ) ;
127
+ tokio:: spawn ( async move { unsubscribe ( ) . await } ) ;
123
128
124
129
Err ( PubsubClientError :: ConnectionClosed (
125
130
"Stream ended" . to_string ( ) ,
@@ -137,11 +142,7 @@ fn load_secret_key(path: String) -> SecretKey {
137
142
. expect ( "Invalid secret key file" )
138
143
. trim ( )
139
144
. to_string ( ) ;
140
- if let Ok ( secret_key) = SecretKey :: from_str ( & content) {
141
- return secret_key;
142
- }
143
-
144
- panic ! ( "Invalid secret key" ) ;
145
+ SecretKey :: from_str ( & content) . expect ( "Invalid secret key" )
145
146
}
146
147
147
148
#[ tokio:: main]
@@ -152,8 +153,8 @@ async fn main() {
152
153
. await
153
154
. expect ( "Invalid WebSocket URL" ) ;
154
155
drop ( client) ; // Drop the client to avoid holding the connection open
155
- let accumulator_address =
156
- Pubkey :: from_str ( & run_options . accumulator_address ) . expect ( "Invalid accumulator address" ) ;
156
+ let accumulator_address = Pubkey :: from_str ( "G9LV2mp9ua1znRAfYwZz5cPiJMAbo1T6mbjdQsDZuMJg" )
157
+ . expect ( "Invalid accumulator address" ) ;
157
158
let wormhole_pid =
158
159
Pubkey :: from_str ( & run_options. wormhole_pid ) . expect ( "Invalid Wormhole program ID" ) ;
159
160
0 commit comments