@@ -26,12 +26,13 @@ use {
26
26
ProofSet ,
27
27
UnixTimestamp ,
28
28
} ,
29
- wormhole:: parse_and_verify_vaa ,
29
+ wormhole:: verify_vaa ,
30
30
} ,
31
31
anyhow:: {
32
32
anyhow,
33
33
Result ,
34
34
} ,
35
+ moka:: future:: Cache ,
35
36
pyth_oracle:: {
36
37
Message ,
37
38
MessageType ,
45
46
collections:: HashSet ,
46
47
sync:: Arc ,
47
48
time:: {
49
+ Duration ,
48
50
SystemTime ,
49
51
UNIX_EPOCH ,
50
52
} ,
57
59
Address ,
58
60
Chain ,
59
61
GuardianAddress ,
62
+ Vaa ,
60
63
} ,
61
64
} ;
62
65
@@ -66,15 +69,20 @@ pub mod types;
66
69
pub mod wormhole;
67
70
68
71
pub struct Store {
69
- pub storage : StorageInstance ,
70
- pub guardian_set : RwLock < Option < Vec < GuardianAddress > > > ,
71
- pub update_tx : Sender < ( ) > ,
72
+ pub storage : StorageInstance ,
73
+ pub observed_vaa_seqs : Cache < u64 , bool > ,
74
+ pub guardian_set : RwLock < Option < Vec < GuardianAddress > > > ,
75
+ pub update_tx : Sender < ( ) > ,
72
76
}
73
77
74
78
impl Store {
75
79
pub fn new_with_local_cache ( update_tx : Sender < ( ) > , cache_size : u64 ) -> Arc < Self > {
76
80
Arc :: new ( Self {
77
81
storage : storage:: local_storage:: LocalStorage :: new_instance ( cache_size) ,
82
+ observed_vaa_seqs : Cache :: builder ( )
83
+ . max_capacity ( cache_size)
84
+ . time_to_live ( Duration :: from_secs ( 60 * 5 ) )
85
+ . build ( ) ,
78
86
guardian_set : RwLock :: new ( None ) ,
79
87
update_tx,
80
88
} )
@@ -84,22 +92,32 @@ impl Store {
84
92
pub async fn store_update ( & self , update : Update ) -> Result < ( ) > {
85
93
let slot = match update {
86
94
Update :: Vaa ( vaa_bytes) => {
87
- let body = parse_and_verify_vaa ( self , & vaa_bytes) . await ;
88
- let body = match body {
89
- Ok ( body) => body,
95
+ let vaa =
96
+ serde_wormhole:: from_slice :: < Vaa < & serde_wormhole:: RawMessage > > ( & vaa_bytes) ?;
97
+
98
+ if vaa. emitter_chain != Chain :: Pythnet
99
+ || vaa. emitter_address != Address ( pythnet_sdk:: ACCUMULATOR_EMITTER_ADDRESS )
100
+ {
101
+ return Ok ( ( ) ) ; // Ignore VAA from other emitters
102
+ }
103
+
104
+ if self . observed_vaa_seqs . get ( & vaa. sequence ) . is_some ( ) {
105
+ return Ok ( ( ) ) ; // Ignore VAA if we have already seen it
106
+ }
107
+
108
+ let vaa = verify_vaa ( self , vaa) . await ;
109
+
110
+ let vaa = match vaa {
111
+ Ok ( vaa) => vaa,
90
112
Err ( err) => {
91
113
log:: info!( "Ignoring invalid VAA: {:?}" , err) ;
92
114
return Ok ( ( ) ) ;
93
115
}
94
116
} ;
95
117
96
- if body. emitter_chain != Chain :: Pythnet
97
- || body. emitter_address != Address ( pythnet_sdk:: ACCUMULATOR_EMITTER_ADDRESS )
98
- {
99
- return Ok ( ( ) ) ; // Ignore VAA from other emitters
100
- }
118
+ self . observed_vaa_seqs . insert ( vaa. sequence , true ) . await ;
101
119
102
- match WormholeMessage :: try_from_bytes ( body . payload ) ?. payload {
120
+ match WormholeMessage :: try_from_bytes ( vaa . payload ) ?. payload {
103
121
WormholePayload :: Merkle ( proof) => {
104
122
log:: info!( "Storing merkle proof for slot {:?}" , proof. slot, ) ;
105
123
store_wormhole_merkle_verified_message ( self , proof. clone ( ) , vaa_bytes)
0 commit comments