6
6
crate :: store:: {
7
7
types:: {
8
8
AccumulatorMessages ,
9
- RawMessage ,
10
9
Update ,
11
10
} ,
12
11
Store ,
@@ -21,18 +20,19 @@ use {
21
20
RpcAccountInfoConfig ,
22
21
RpcProgramAccountsConfig ,
23
22
} ,
23
+ rpc_filter:: {
24
+ Memcmp ,
25
+ RpcFilterType ,
26
+ } ,
24
27
} ,
25
28
solana_sdk:: {
26
29
account:: Account ,
27
30
commitment_config:: CommitmentConfig ,
28
31
pubkey:: Pubkey ,
29
32
system_program,
30
33
} ,
31
- std:: ops:: Rem ,
32
34
} ;
33
35
34
- const RING_SIZE : u32 = 10_000 ;
35
-
36
36
pub async fn spawn ( pythnet_ws_endpoint : String , store : Store ) -> Result < ( ) > {
37
37
let client = PubsubClient :: new ( pythnet_ws_endpoint. as_ref ( ) ) . await ?;
38
38
@@ -42,6 +42,10 @@ pub async fn spawn(pythnet_ws_endpoint: String, store: Store) -> Result<()> {
42
42
encoding : Some ( UiAccountEncoding :: Base64Zstd ) ,
43
43
..Default :: default ( )
44
44
} ,
45
+ filters : Some ( vec ! [ RpcFilterType :: Memcmp ( Memcmp :: new_raw_bytes(
46
+ 0 ,
47
+ b"PAS1" . to_vec( ) ,
48
+ ) ) ] ) ,
45
49
with_context : Some ( true ) ,
46
50
..Default :: default ( )
47
51
} ;
@@ -55,38 +59,36 @@ pub async fn spawn(pythnet_ws_endpoint: String, store: Store) -> Result<()> {
55
59
log:: debug!( "Received Pythnet update: {:?}" , update) ;
56
60
57
61
if let Some ( update) = update {
58
- // Check whether this account matches the state for this slot
59
- // FIXME this is hardcoded for localnet, we need to remove it from the code
60
- let pyth = Pubkey :: try_from ( "7th6GdMuo4u1zNLzFAyMY6psunHNsGjPjo8hXvcTgKei" ) . unwrap ( ) ;
61
-
62
- let accumulator_slot = update. context . slot - 1 ;
63
-
64
- // Apparently we get the update for the previous slot, so we need to subtract 1
65
- let ring_index = accumulator_slot. rem ( RING_SIZE as u64 ) as u32 ;
66
-
67
- let ( candidate, _) = Pubkey :: find_program_address (
68
- & [
69
- b"AccumulatorState" ,
70
- & pyth. to_bytes ( ) ,
71
- & ring_index. to_be_bytes ( ) ,
72
- ] ,
73
- & system_program:: id ( ) ,
74
- ) ;
75
-
76
- if candidate. to_string ( ) != update. value . pubkey {
77
- continue ;
78
- }
79
-
80
62
let account: Account = update. value . account . decode ( ) . unwrap ( ) ;
81
63
log:: debug!( "Received Accumulator update: {:?}" , account) ;
82
- let accumulator_messages = AccumulatorMessages {
83
- slot : accumulator_slot,
84
- messages : Vec :: < RawMessage > :: try_from_slice ( account. data . as_ref ( ) ) ?,
85
- } ;
86
64
87
- store
88
- . store_update ( Update :: AccumulatorMessages ( accumulator_messages) )
89
- . await ?;
65
+ let accumulator_messages = AccumulatorMessages :: try_from_slice ( & account. data ) ;
66
+ match accumulator_messages {
67
+ Ok ( accumulator_messages) => {
68
+ let ( candidate, _) = Pubkey :: find_program_address (
69
+ & [
70
+ b"AccumulatorState" ,
71
+ & accumulator_messages. ring_index ( ) . to_be_bytes ( ) ,
72
+ ] ,
73
+ & system_program:: id ( ) ,
74
+ ) ;
75
+
76
+ if candidate. to_string ( ) == update. value . pubkey {
77
+ store
78
+ . store_update ( Update :: AccumulatorMessages ( accumulator_messages) )
79
+ . await ?;
80
+ } else {
81
+ log:: error!(
82
+ "Failed to verify the messages public key: {:?} != {:?}" ,
83
+ candidate,
84
+ update. value. pubkey
85
+ ) ;
86
+ }
87
+ }
88
+ Err ( err) => {
89
+ log:: error!( "Failed to parse AccumulatorMessages: {:?}" , err) ;
90
+ }
91
+ } ;
90
92
}
91
93
}
92
94
}
0 commit comments