1
- use borsh:: BorshDeserialize ;
2
- use observation:: { Body , SignedBody } ;
3
- use posted_message:: PostedMessageUnreliableData ;
4
- use secp256k1:: SecretKey ;
5
- use solana_account_decoder:: UiAccountEncoding ;
6
- use solana_client:: {
7
- nonblocking:: pubsub_client:: PubsubClient , pubsub_client:: PubsubClientError , rpc_config:: { RpcAccountInfoConfig , RpcProgramAccountsConfig }
1
+ use {
2
+ borsh:: BorshDeserialize ,
3
+ clap:: Parser ,
4
+ core:: panic,
5
+ observation:: {
6
+ Body ,
7
+ SignedBody ,
8
+ } ,
9
+ posted_message:: PostedMessageUnreliableData ,
10
+ secp256k1:: SecretKey ,
11
+ solana_account_decoder:: UiAccountEncoding ,
12
+ solana_client:: {
13
+ nonblocking:: pubsub_client:: PubsubClient ,
14
+ pubsub_client:: PubsubClientError ,
15
+ rpc_config:: {
16
+ RpcAccountInfoConfig ,
17
+ RpcProgramAccountsConfig ,
18
+ } ,
19
+ } ,
20
+ solana_sdk:: pubkey:: Pubkey ,
21
+ std:: {
22
+ fs,
23
+ str:: FromStr ,
24
+ time:: Duration ,
25
+ } ,
26
+ tokio:: time:: sleep,
27
+ tokio_stream:: StreamExt ,
8
28
} ;
9
- use solana_sdk:: pubkey:: Pubkey ;
10
- use tokio:: time:: sleep;
11
- use core:: panic;
12
- use std:: { fs, path:: PathBuf , str:: FromStr , time:: Duration } ;
13
- use tokio_stream:: StreamExt ;
14
- use clap:: Parser ;
15
29
16
- mod posted_message ;
30
+ mod config ;
17
31
mod observation;
32
+ mod posted_message;
18
33
mod serde_array;
19
- mod config;
20
34
21
35
const PYTHNET_CHAIN_ID : u16 = 26 ;
22
36
23
37
struct ListenerConfig {
24
- ws_url : String ,
25
- secret_key : SecretKey ,
26
- wormhole_pid : Pubkey ,
38
+ ws_url : String ,
39
+ secret_key : SecretKey ,
40
+ wormhole_pid : Pubkey ,
27
41
accumulator_address : Pubkey ,
28
42
}
29
43
30
- fn find_message_pda (
31
- wormhole_pid : & Pubkey ,
32
- ring_index : u32 ,
33
- ) -> Pubkey {
44
+ fn find_message_pda ( wormhole_pid : & Pubkey , ring_index : u32 ) -> Pubkey {
34
45
Pubkey :: find_program_address (
35
46
& [ b"AccumulatorMessage" , & ring_index. to_be_bytes ( ) ] ,
36
47
wormhole_pid,
37
- ) . 0
48
+ )
49
+ . 0
38
50
}
39
51
40
52
async fn run_listener ( config : ListenerConfig ) -> Result < ( ) , PubsubClientError > {
41
53
let client = PubsubClient :: new ( config. ws_url . as_str ( ) ) . await ?;
42
- let ( mut stream, unsubscribe) = client. program_subscribe (
43
- & config. wormhole_pid ,
44
- Some ( RpcProgramAccountsConfig {
45
- filters : None ,
46
- account_config : RpcAccountInfoConfig {
47
- encoding : Some ( UiAccountEncoding :: Base64 ) ,
48
- data_slice : None ,
49
- commitment : Some ( solana_sdk:: commitment_config:: CommitmentConfig :: confirmed ( ) ) ,
50
- min_context_slot : None ,
51
- } ,
52
- with_context : None ,
53
- sort_results : None
54
- } ) ,
55
- )
56
- . await ?;
54
+ let ( mut stream, unsubscribe) = client
55
+ . program_subscribe (
56
+ & config. wormhole_pid ,
57
+ Some ( RpcProgramAccountsConfig {
58
+ filters : None ,
59
+ account_config : RpcAccountInfoConfig {
60
+ encoding : Some ( UiAccountEncoding :: Base64 ) ,
61
+ data_slice : None ,
62
+ commitment : Some (
63
+ solana_sdk:: commitment_config:: CommitmentConfig :: confirmed ( ) ,
64
+ ) ,
65
+ min_context_slot : None ,
66
+ } ,
67
+ with_context : None ,
68
+ sort_results : None ,
69
+ } ) ,
70
+ )
71
+ . await ?;
57
72
58
73
while let Some ( update) = stream. next ( ) . await {
59
- let message_pda = find_message_pda (
60
- & config. wormhole_pid ,
61
- ( update. context . slot % 10_000 ) as u32
62
- ) ;
74
+ let message_pda =
75
+ find_message_pda ( & config. wormhole_pid , ( update. context . slot % 10_000 ) as u32 ) ;
63
76
if message_pda. to_string ( ) != update. value . pubkey {
64
77
continue ; // Skip updates that are not for the expected PDA
65
78
}
66
79
67
- let unreliable_data: Option < PostedMessageUnreliableData > = update. value . account . data . decode ( ) . map ( |data| {
68
- BorshDeserialize :: deserialize ( & mut data. as_slice ( ) ) . ok ( )
69
- } ) . flatten ( ) ;
80
+ let unreliable_data: Option < PostedMessageUnreliableData > = update
81
+ . value
82
+ . account
83
+ . data
84
+ . decode ( )
85
+ . and_then ( |data| BorshDeserialize :: deserialize ( & mut data. as_slice ( ) ) . ok ( ) ) ;
70
86
71
87
if let Some ( unreliable_data) = unreliable_data {
72
88
if PYTHNET_CHAIN_ID != unreliable_data. emitter_chain {
@@ -77,13 +93,13 @@ async fn run_listener(config: ListenerConfig) -> Result<(), PubsubClientError> {
77
93
}
78
94
79
95
let body = Body {
80
- timestamp : unreliable_data. submission_time ,
81
- nonce : unreliable_data. nonce ,
82
- emitter_chain : unreliable_data. emitter_chain ,
83
- emitter_address : unreliable_data. emitter_address ,
84
- sequence : unreliable_data. sequence ,
96
+ timestamp : unreliable_data. submission_time ,
97
+ nonce : unreliable_data. nonce ,
98
+ emitter_chain : unreliable_data. emitter_chain ,
99
+ emitter_address : unreliable_data. emitter_address ,
100
+ sequence : unreliable_data. sequence ,
85
101
consistency_level : unreliable_data. consistency_level ,
86
- payload : unreliable_data. payload . clone ( ) ,
102
+ payload : unreliable_data. payload . clone ( ) ,
87
103
} ;
88
104
89
105
match body. sign ( config. secret_key . secret_bytes ( ) ) {
@@ -105,7 +121,9 @@ async fn run_listener(config: ListenerConfig) -> Result<(), PubsubClientError> {
105
121
unsubscribe ( ) . await
106
122
} ) ;
107
123
108
- Err ( PubsubClientError :: ConnectionClosed ( "Stream ended" . to_string ( ) ) )
124
+ Err ( PubsubClientError :: ConnectionClosed (
125
+ "Stream ended" . to_string ( ) ,
126
+ ) )
109
127
}
110
128
111
129
fn load_secret_key ( path : String ) -> SecretKey {
@@ -115,7 +133,10 @@ fn load_secret_key(path: String) -> SecretKey {
115
133
return SecretKey :: from_byte_array ( byte_array) . expect ( "Invalid secret key length" ) ;
116
134
}
117
135
118
- let content = fs:: read_to_string ( path) . expect ( "Invalid secret key file" ) . trim ( ) . to_string ( ) ;
136
+ let content = fs:: read_to_string ( path)
137
+ . expect ( "Invalid secret key file" )
138
+ . trim ( )
139
+ . to_string ( ) ;
119
140
if let Ok ( secret_key) = SecretKey :: from_str ( & content) {
120
141
return secret_key;
121
142
}
@@ -127,18 +148,24 @@ fn load_secret_key(path: String) -> SecretKey {
127
148
async fn main ( ) {
128
149
let run_options = config:: RunOptions :: parse ( ) ;
129
150
let secret_key = load_secret_key ( run_options. secret_key_path ) ;
130
- let client = PubsubClient :: new ( & run_options. pythnet_url ) . await . expect ( "Invalid WebSocket URL" ) ;
151
+ let client = PubsubClient :: new ( & run_options. pythnet_url )
152
+ . await
153
+ . expect ( "Invalid WebSocket URL" ) ;
131
154
drop ( client) ; // Drop the client to avoid holding the connection open
132
- let accumulator_address = Pubkey :: from_str ( & run_options. accumulator_address ) . expect ( "Invalid accumulator address" ) ;
133
- let wormhole_pid = Pubkey :: from_str ( & run_options. wormhole_pid ) . expect ( "Invalid Wormhole program ID" ) ;
155
+ let accumulator_address =
156
+ Pubkey :: from_str ( & run_options. accumulator_address ) . expect ( "Invalid accumulator address" ) ;
157
+ let wormhole_pid =
158
+ Pubkey :: from_str ( & run_options. wormhole_pid ) . expect ( "Invalid Wormhole program ID" ) ;
134
159
135
160
loop {
136
- if let Err ( e) = run_listener ( ListenerConfig {
161
+ if let Err ( e) = run_listener ( ListenerConfig {
137
162
ws_url : run_options. pythnet_url . clone ( ) ,
138
- secret_key,
139
- wormhole_pid,
163
+ secret_key,
164
+ wormhole_pid,
140
165
accumulator_address,
141
- } ) . await {
166
+ } )
167
+ . await
168
+ {
142
169
tracing:: error!( error = ?e, "Error listening to messages" ) ;
143
170
sleep ( Duration :: from_millis ( 200 ) ) . await ; // Wait before retrying
144
171
}
0 commit comments