@@ -60,8 +60,13 @@ impl<'a> From<&'a State> for &'a WormholeState {
60
60
61
61
#[ async_trait:: async_trait]
62
62
pub trait Wormhole : Aggregates {
63
- async fn store_vaa ( & self , sequence : u64 , vaa_bytes : Vec < u8 > ) ;
64
- async fn process_message ( & self , vaa_bytes : Vec < u8 > ) -> Result < ( ) > ;
63
+ async fn store_vaa ( & self , sequence : u64 , vaa_bytes : Vec < u8 > ) -> bool ;
64
+ /// Process a Wormhole message, extracting the VAA and storing it in the state.
65
+ /// If `always_verify` is false, it will check if the VAA has been seen before verifying it.
66
+ /// If true, it will verify the VAA even if it has been seen before.
67
+ /// Returns true if the message was processed successfully, false if it was already seen.
68
+ /// Throws an error if the VAA is invalid or cannot be processed.
69
+ async fn process_message ( & self , vaa_bytes : Vec < u8 > , always_verify : bool ) -> Result < bool > ;
65
70
async fn update_guardian_set ( & self , id : u32 , guardian_set : GuardianSet ) ;
66
71
}
67
72
@@ -80,27 +85,35 @@ where
80
85
}
81
86
82
87
#[ tracing:: instrument( skip( self , vaa_bytes) ) ]
83
- async fn store_vaa ( & self , sequence : u64 , vaa_bytes : Vec < u8 > ) {
88
+ async fn store_vaa ( & self , sequence : u64 , vaa_bytes : Vec < u8 > ) -> bool {
84
89
// Check VAA hasn't already been seen, this may have been checked previously
85
90
// but due to async nature it's possible other threads have mutated the state
86
91
// since this VAA started processing.
87
92
let mut observed_vaa_seqs = self . into ( ) . observed_vaa_seqs . write ( ) . await ;
88
93
if observed_vaa_seqs. contains ( & sequence) {
89
- return ;
94
+ return false ;
90
95
}
91
96
92
97
// Clear old cached VAA sequences.
93
98
while observed_vaa_seqs. len ( ) > OBSERVED_CACHE_SIZE {
94
99
observed_vaa_seqs. pop_first ( ) ;
95
100
}
96
101
102
+ observed_vaa_seqs. insert ( sequence) ;
103
+ // Drop the lock to allow other threads to access the state.
104
+ drop ( observed_vaa_seqs) ;
105
+
97
106
// Hand the VAA to the aggregate store.
98
- if let Err ( e) = Aggregates :: store_update ( self , Update :: Vaa ( vaa_bytes) ) . await {
99
- tracing:: error!( error = ?e, "Failed to store VAA in aggregate store." ) ;
107
+ match Aggregates :: store_update ( self , Update :: Vaa ( vaa_bytes) ) . await {
108
+ Ok ( is_stored) => is_stored,
109
+ Err ( e) => {
110
+ tracing:: error!( error = ?e, "Failed to store VAA in aggregate store." ) ;
111
+ false
112
+ }
100
113
}
101
114
}
102
115
103
- async fn process_message ( & self , vaa_bytes : Vec < u8 > ) -> Result < ( ) > {
116
+ async fn process_message ( & self , vaa_bytes : Vec < u8 > , always_verify : bool ) -> Result < bool > {
104
117
let vaa = serde_wormhole:: from_slice :: < Vaa < & RawMessage > > ( & vaa_bytes) ?;
105
118
106
119
// Log VAA Processing.
@@ -114,17 +127,19 @@ where
114
127
} ;
115
128
tracing:: info!( slot = slot, vaa_timestamp = vaa_timestamp, "Observed VAA" ) ;
116
129
117
- // Check VAA hasn't already been seen.
118
- ensure ! (
119
- !self
120
- . into( )
121
- . observed_vaa_seqs
122
- . read( )
123
- . await
124
- . contains( & vaa. sequence) ,
125
- "Previously observed VAA: {}" ,
126
- vaa. sequence
127
- ) ;
130
+ if !always_verify {
131
+ // Check VAA hasn't already been seen.
132
+ ensure ! (
133
+ !self
134
+ . into( )
135
+ . observed_vaa_seqs
136
+ . read( )
137
+ . await
138
+ . contains( & vaa. sequence) ,
139
+ "Previously observed VAA: {}" ,
140
+ vaa. sequence
141
+ ) ;
142
+ }
128
143
129
144
// Check VAA source is valid, we don't want to process other protocols VAAs.
130
145
validate_vaa_source ( & vaa) ?;
@@ -140,9 +155,7 @@ where
140
155
vaa,
141
156
) ?;
142
157
143
- // Finally, store the resulting VAA in Hermes.
144
- self . store_vaa ( vaa. sequence , vaa_bytes) . await ;
145
- Ok ( ( ) )
158
+ Ok ( self . store_vaa ( vaa. sequence , vaa_bytes) . await )
146
159
}
147
160
}
148
161
// Rejects VAAs from invalid sources.
0 commit comments