@@ -36,6 +36,10 @@ use std::{
3636 borrow:: Cow ,
3737 sync:: atomic:: { AtomicU32 , Ordering } ,
3838} ;
39+ #[ cfg( any( feature = "history" , feature = "history-staking-rewards" ) ) ]
40+ use std:: sync:: { Arc , OnceLock , RwLock , atomic:: AtomicBool } ;
41+ #[ cfg( any( feature = "history" , feature = "history-staking-rewards" ) ) ]
42+ use snarkvm_slipstream_plugin_manager:: SlipstreamPluginManager ;
3943
4044/// TODO (howardwu): Remove this.
4145/// Returns the mapping ID for the given `program ID` and `mapping name`.
@@ -249,6 +253,7 @@ pub trait FinalizeStorage<N: Network>: 'static + Clone + Send + Sync {
249253 Ok ( FinalizeOperation :: InitializeMapping ( to_mapping_id ( & program_id, & mapping_name) ?) )
250254 }
251255
256+ // NOTE: THIS IS NEVER USED IN PROD
252257 /// Stores the given `(key, value)` pair at the given `program ID` and `mapping name` in storage.
253258 /// If the `mapping name` is not initialized, an error is returned.
254259 /// If the `key` already exists, the method returns an error.
@@ -279,6 +284,9 @@ pub trait FinalizeStorage<N: Network>: 'static + Clone + Send + Sync {
279284 // Update the historical maps.
280285 #[ cfg( feature = "history" ) ]
281286 {
287+ // TODO: TODO: Is here where we would want to stream the data?
288+ // NOTE: MAYBE WE WOULD WANT TO PUSH TO A BUFFER HERE AND THEN STREAM IT
289+ // IN ATOMIC_POST_RATIFY()
282290 let current_height = self . current_block_height( ) . load( Ordering :: SeqCst ) ;
283291
284292 // Insert the initial value as the first historical update.
@@ -329,9 +337,10 @@ pub trait FinalizeStorage<N: Network>: 'static + Clone + Send + Sync {
329337 let value_id = N :: hash_bhp1024 ( & ( key_id, N :: hash_bhp1024 ( & value. to_bits_le ( ) ) ?) . to_bits_le ( ) ) ?;
330338
331339 atomic_batch_scope ! ( self , {
332- // Update the historical maps.
340+ // Update the historical maps. // NOTE: THIS GETS CALLED IN vm/finalize.rs, TODO: STREAM HERE
333341 #[ cfg( feature = "history" ) ]
334342 {
343+ // TODO: Add to buffer here for optional streaming?
335344 let current_height = self . current_block_height( ) . load( Ordering :: SeqCst ) ;
336345
337346 // Register the updated value at the current height.
@@ -654,6 +663,14 @@ pub struct FinalizeStore<N: Network, P: FinalizeStorage<N>> {
654663 storage : P ,
655664 /// PhantomData.
656665 _phantom : PhantomData < N > ,
666+ /// Indicates that canonical finalize is currently in progress.
667+ /// When `true`, storage writes notify registered Slipstream plugins.
668+ #[ cfg( any( feature = "history" , feature = "history-staking-rewards" ) ) ]
669+ is_finalize_mode : Arc < AtomicBool > ,
670+ /// Optional plugin manager for streaming canonical mapping and staking updates.
671+ /// Uses `OnceLock` so it can be installed from a shared reference after construction.
672+ #[ cfg( any( feature = "history" , feature = "history-staking-rewards" ) ) ]
673+ slipstream_plugin_manager : OnceLock < Arc < RwLock < SlipstreamPluginManager > > > ,
657674}
658675
659676impl < N : Network , P : FinalizeStorage < N > > FinalizeStore < N , P > {
@@ -665,7 +682,14 @@ impl<N: Network, P: FinalizeStorage<N>> FinalizeStore<N, P> {
665682 /// Initializes a finalize store from storage.
666683 pub fn from ( storage : P ) -> Result < Self > {
667684 // Return the finalize store.
668- Ok ( Self { storage, _phantom : PhantomData } )
685+ Ok ( Self {
686+ storage,
687+ _phantom : PhantomData ,
688+ #[ cfg( any( feature = "history" , feature = "history-staking-rewards" ) ) ]
689+ is_finalize_mode : Arc :: new ( AtomicBool :: new ( false ) ) ,
690+ #[ cfg( any( feature = "history" , feature = "history-staking-rewards" ) ) ]
691+ slipstream_plugin_manager : OnceLock :: new ( ) ,
692+ } )
669693 }
670694
671695 /// Starts an atomic batch write operation.
@@ -714,6 +738,61 @@ impl<N: Network, P: FinalizeStorage<N>> FinalizeStore<N, P> {
714738 self . storage . current_block_height ( )
715739 }
716740
741+ /// Returns a reference to the canonical finalize mode flag.
742+ ///
743+ /// When `true`, storage writes notify registered Slipstream plugins.
744+ /// Set to `true` by the VM before canonical finalize runs and reset to `false` afterwards.
745+ #[ cfg( any( feature = "history" , feature = "history-staking-rewards" ) ) ]
746+ pub fn is_finalize_mode ( & self ) -> & Arc < AtomicBool > {
747+ & self . is_finalize_mode
748+ }
749+
750+ /// Installs a Slipstream plugin manager to receive canonical mapping and staking updates.
751+ ///
752+ /// May be called from a shared reference. Logs a warning if called more than once.
753+ #[ cfg( any( feature = "history" , feature = "history-staking-rewards" ) ) ]
754+ pub fn set_slipstream_plugin_manager ( & self , manager : Arc < RwLock < SlipstreamPluginManager > > ) {
755+ if self . slipstream_plugin_manager . set ( manager) . is_err ( ) {
756+ tracing:: warn!( "Slipstream plugin manager is already set; ignoring subsequent call." ) ;
757+ }
758+ }
759+
760+ /// Notifies all interested plugins of a staking reward, if canonical finalize is active.
761+ ///
762+ /// Errors from plugin calls are logged but never propagated.
763+ #[ cfg( feature = "history-staking-rewards" ) ]
764+ pub fn notify_staking_reward (
765+ & self ,
766+ staker : & Address < N > ,
767+ validator : & Address < N > ,
768+ reward : u64 ,
769+ new_stake : u64 ,
770+ block_height : u32 ,
771+ ) {
772+ if !self . is_finalize_mode . load ( std:: sync:: atomic:: Ordering :: SeqCst ) {
773+ return ;
774+ }
775+ if let Some ( mgr) = self . slipstream_plugin_manager . get ( ) {
776+ let staker_bytes = match staker. to_bytes_le ( ) {
777+ Ok ( b) => b,
778+ Err ( e) => {
779+ tracing:: warn!( "Slipstream: failed to serialize staker address: {e}" ) ;
780+ return ;
781+ }
782+ } ;
783+ let validator_bytes = match validator. to_bytes_le ( ) {
784+ Ok ( b) => b,
785+ Err ( e) => {
786+ tracing:: warn!( "Slipstream: failed to serialize validator address: {e}" ) ;
787+ return ;
788+ }
789+ } ;
790+ mgr. read ( )
791+ . unwrap ( )
792+ . notify_staking_reward ( & staker_bytes, & validator_bytes, reward, new_stake, block_height) ;
793+ }
794+ }
795+
717796 /// Returns the historical value of a mapping.
718797 #[ cfg( feature = "history" ) ]
719798 pub fn get_historical_mapping_value (
@@ -827,7 +906,32 @@ impl<N: Network, P: FinalizeStorage<N>> FinalizeStoreTrait<N> for FinalizeStore<
827906 key : Plaintext < N > ,
828907 value : Value < N > ,
829908 ) -> Result < FinalizeOperation < N > > {
830- self . storage . update_key_value ( program_id, mapping_name, key, value)
909+ // Serialize before moving, if a plugin notification may be needed.
910+ #[ cfg( feature = "history" ) ]
911+ let plugin_data =
912+ if self . is_finalize_mode . load ( Ordering :: SeqCst ) && self . slipstream_plugin_manager . get ( ) . is_some ( ) {
913+ Some ( (
914+ program_id. to_bytes_le ( ) ?,
915+ mapping_name. to_bytes_le ( ) ?,
916+ key. to_bytes_le ( ) ?,
917+ value. to_bytes_le ( ) ?,
918+ ) )
919+ } else {
920+ None
921+ } ;
922+
923+ let result = self . storage . update_key_value ( program_id, mapping_name, key, value) ?;
924+
925+ // Notify plugins of the update if in canonical finalize mode.
926+ #[ cfg( feature = "history" ) ]
927+ if let Some ( ( pid, mname, k, v) ) = plugin_data {
928+ let height = self . storage . current_block_height ( ) . load ( Ordering :: SeqCst ) ;
929+ if let Some ( mgr) = self . slipstream_plugin_manager . get ( ) {
930+ mgr. read ( ) . unwrap ( ) . notify_mapping_update ( & pid, & mname, & k, & v, height) ;
931+ }
932+ }
933+
934+ Ok ( result)
831935 }
832936
833937 /// Removes the key-value pair for the given `program ID`, `mapping name`, and `key` from storage.
@@ -860,7 +964,35 @@ impl<N: Network, P: FinalizeStorage<N>> FinalizeStore<N, P> {
860964 mapping_name : Identifier < N > ,
861965 entries : Vec < ( Plaintext < N > , Value < N > ) > ,
862966 ) -> Result < FinalizeOperation < N > > {
863- self . storage . replace_mapping ( program_id, mapping_name, entries)
967+ // Serialize mapping identity and all entries before moving them into storage,
968+ // so they are available for plugin notification after the storage call.
969+ #[ cfg( feature = "history" ) ]
970+ let plugin_data: Option < ( Vec < u8 > , Vec < u8 > , Vec < ( Vec < u8 > , Vec < u8 > ) > ) > =
971+ if self . is_finalize_mode . load ( Ordering :: SeqCst ) && self . slipstream_plugin_manager . get ( ) . is_some ( ) {
972+ let mut serialized_entries = Vec :: with_capacity ( entries. len ( ) ) ;
973+ for ( key, value) in & entries {
974+ serialized_entries. push ( ( key. to_bytes_le ( ) ?, value. to_bytes_le ( ) ?) ) ;
975+ }
976+ Some ( ( program_id. to_bytes_le ( ) ?, mapping_name. to_bytes_le ( ) ?, serialized_entries) )
977+ } else {
978+ None
979+ } ;
980+
981+ let result = self . storage . replace_mapping ( program_id, mapping_name, entries) ?;
982+
983+ // Notify plugins of each updated key-value pair if in canonical finalize mode.
984+ #[ cfg( feature = "history" ) ]
985+ if let Some ( ( pid, mname, serialized_entries) ) = plugin_data {
986+ let height = self . storage . current_block_height ( ) . load ( Ordering :: SeqCst ) ;
987+ if let Some ( mgr) = self . slipstream_plugin_manager . get ( ) {
988+ let mgr_guard = mgr. read ( ) . unwrap ( ) ;
989+ for ( k, v) in & serialized_entries {
990+ mgr_guard. notify_mapping_update ( & pid, & mname, k, v, height) ;
991+ }
992+ }
993+ }
994+
995+ Ok ( result)
864996 }
865997
866998 /// Removes the mapping for the given `program ID` and `mapping name` from storage,
0 commit comments