-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Slipstream Plugin Integration/Interface #3190
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: staging
Are you sure you want to change the base?
Changes from all commits
c3c2429
0b91215
027a403
2820dfc
c96eba5
9cd55f8
4cccb46
2a2888e
68b234c
f4af5c6
a7af4ce
ca43ff7
5666186
cf0bed8
32dfed1
0c9105d
802c996
17e347c
3cf0ffa
a7b7cac
939d34b
fc27c57
261abcb
bc5039f
0d0aa19
6dd2b78
38f5714
0246766
3ac009c
74e2950
e22c364
bc2c61f
933fbf9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,11 +31,17 @@ use aleo_std_storage::StorageMode; | |
| use anyhow::Result; | ||
| use core::marker::PhantomData; | ||
| use indexmap::IndexSet; | ||
| #[cfg(feature = "history")] | ||
| #[cfg(feature = "slipstream-plugins")] | ||
| use snarkvm_slipstream_plugin_manager::SlipstreamPluginManager; | ||
| #[cfg(feature = "slipstream-plugins")] | ||
| use std::sync::{Arc, OnceLock, RwLock, atomic::AtomicBool}; | ||
| #[cfg(any(feature = "history", feature = "history-staking-rewards"))] | ||
| use std::{ | ||
| borrow::Cow, | ||
| sync::atomic::{AtomicU32, Ordering}, | ||
| }; | ||
| #[cfg(all(feature = "history", feature = "slipstream-plugins"))] | ||
| type SerializedMappingEntries = Option<(Vec<u8>, Vec<u8>, Vec<(Vec<u8>, Vec<u8>)>)>; | ||
|
|
||
| /// TODO (howardwu): Remove this. | ||
| /// Returns the mapping ID for the given `program ID` and `mapping name`. | ||
|
|
@@ -654,6 +660,15 @@ pub struct FinalizeStore<N: Network, P: FinalizeStorage<N>> { | |
| storage: P, | ||
| /// PhantomData. | ||
| _phantom: PhantomData<N>, | ||
| /// Indicates that canonical finalize is currently in progress. | ||
| /// When `true`, storage writes notify registered Slipstream plugins. | ||
| #[cfg(feature = "slipstream-plugins")] | ||
| is_finalize_mode: Arc<AtomicBool>, | ||
| /// Optional plugin manager for streaming canonical mapping and staking updates. | ||
| /// Wrapped in `Arc` so that all clones of `FinalizeStore` share the same cell; the inner | ||
| /// `OnceLock` ensures it can be installed from a shared reference after construction. | ||
| #[cfg(feature = "slipstream-plugins")] | ||
| slipstream_plugin_manager: Arc<OnceLock<Arc<RwLock<SlipstreamPluginManager>>>>, | ||
| } | ||
|
|
||
| impl<N: Network, P: FinalizeStorage<N>> FinalizeStore<N, P> { | ||
|
|
@@ -665,7 +680,14 @@ impl<N: Network, P: FinalizeStorage<N>> FinalizeStore<N, P> { | |
| /// Initializes a finalize store from storage. | ||
| pub fn from(storage: P) -> Result<Self> { | ||
| // Return the finalize store. | ||
| Ok(Self { storage, _phantom: PhantomData }) | ||
| Ok(Self { | ||
| storage, | ||
| _phantom: PhantomData, | ||
| #[cfg(feature = "slipstream-plugins")] | ||
| is_finalize_mode: Arc::new(AtomicBool::new(false)), | ||
| #[cfg(feature = "slipstream-plugins")] | ||
| slipstream_plugin_manager: Arc::new(OnceLock::new()), | ||
| }) | ||
| } | ||
|
|
||
| /// Starts an atomic batch write operation. | ||
|
|
@@ -714,6 +736,76 @@ impl<N: Network, P: FinalizeStorage<N>> FinalizeStore<N, P> { | |
| self.storage.current_block_height() | ||
| } | ||
|
|
||
| /// Returns a reference to the canonical finalize mode flag. | ||
| /// | ||
| /// When `true`, storage writes notify registered Slipstream plugins. | ||
| /// Set to `true` by the VM before canonical finalize runs and reset to `false` afterwards. | ||
| #[cfg(feature = "slipstream-plugins")] | ||
| pub fn is_finalize_mode(&self) -> &Arc<AtomicBool> { | ||
| &self.is_finalize_mode | ||
| } | ||
|
|
||
| /// Installs a Slipstream plugin manager to receive canonical mapping and staking updates. | ||
| /// | ||
| /// May be called from a shared reference. Logs a warning if called more than once. | ||
| #[cfg(feature = "slipstream-plugins")] | ||
| pub fn set_slipstream_plugin_manager(&self, manager: Arc<RwLock<SlipstreamPluginManager>>) { | ||
| if self.slipstream_plugin_manager.set(manager).is_err() { | ||
| tracing::warn!("Slipstream plugin manager is already set; ignoring subsequent call."); | ||
| } | ||
| } | ||
|
|
||
| /// Returns the Slipstream plugin manager, if one has been installed. | ||
| /// | ||
| /// The returned `Arc` is a lightweight additional handle to the same manager instance; | ||
| /// it does not clone the manager itself. | ||
| #[cfg(feature = "slipstream-plugins")] | ||
| pub fn slipstream_plugin_manager(&self) -> Option<Arc<RwLock<SlipstreamPluginManager>>> { | ||
| self.slipstream_plugin_manager.get().cloned() | ||
| } | ||
|
|
||
| /// Notifies all interested plugins of a staking reward, if canonical finalize is active. | ||
| /// | ||
| /// Errors from plugin calls are logged but never propagated. | ||
| #[cfg(all(feature = "history-staking-rewards", feature = "slipstream-plugins"))] | ||
| pub fn notify_staking_reward( | ||
| &self, | ||
| staker: &Address<N>, | ||
| validator: &Address<N>, | ||
| reward: u64, | ||
| new_stake: u64, | ||
| block_height: u32, | ||
| ) { | ||
| if !self.is_finalize_mode.load(Ordering::SeqCst) { | ||
| return; | ||
| } | ||
|
|
||
| if let Some(mgr) = self.slipstream_plugin_manager.get() { | ||
| let staker_bytes = match staker.to_bytes_le() { | ||
| Ok(b) => b, | ||
| Err(e) => { | ||
| tracing::warn!("Slipstream: failed to serialize staker address: {e}"); | ||
| return; | ||
| } | ||
| }; | ||
| let validator_bytes = match validator.to_bytes_le() { | ||
| Ok(b) => b, | ||
| Err(e) => { | ||
| tracing::warn!("Slipstream: failed to serialize validator address: {e}"); | ||
| return; | ||
| } | ||
| }; | ||
| match mgr.read() { | ||
| Ok(plugin_mgr) => { | ||
| plugin_mgr.notify_staking_reward(&staker_bytes, &validator_bytes, reward, new_stake, block_height) | ||
| } | ||
| Err(e) => tracing::warn!( | ||
| "Slipstream: plugin manager lock poisoned, skipping staking reward notification: {e}" | ||
| ), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Returns the historical value of a mapping. | ||
| #[cfg(feature = "history")] | ||
| pub fn get_historical_mapping_value( | ||
|
|
@@ -827,7 +919,51 @@ impl<N: Network, P: FinalizeStorage<N>> FinalizeStoreTrait<N> for FinalizeStore< | |
| key: Plaintext<N>, | ||
| value: Value<N>, | ||
| ) -> Result<FinalizeOperation<N>> { | ||
| self.storage.update_key_value(program_id, mapping_name, key, value) | ||
| // Serialize before moving, if a plugin notification may be needed. | ||
| #[cfg(all(feature = "history", feature = "slipstream-plugins"))] | ||
| let plugin_data = if self.is_finalize_mode.load(Ordering::SeqCst) { | ||
| if let Some(mgr) = self.slipstream_plugin_manager.get() { | ||
| match mgr.read() { | ||
| Ok(plugin_mgr) if plugin_mgr.history_mappings_enabled() => Some(( | ||
| program_id.to_bytes_le()?, | ||
| mapping_name.to_bytes_le()?, | ||
| key.to_bytes_le()?, | ||
| value.to_bytes_le()?, | ||
| )), | ||
| Ok(_) => None, | ||
| Err(e) => { | ||
| tracing::warn!( | ||
| "Slipstream: plugin manager lock poisoned, skipping mapping update serialization: {e}" | ||
| ); | ||
| None | ||
| } | ||
| } | ||
| } else { | ||
| None | ||
| } | ||
| } else { | ||
| None | ||
| }; | ||
|
|
||
| let result = self.storage.update_key_value(program_id, mapping_name, key, value)?; | ||
|
|
||
| // Notify plugins of the update if in canonical finalize mode. | ||
| #[cfg(all(feature = "history", feature = "slipstream-plugins"))] | ||
| if let Some((pid, mname, k, v)) = plugin_data { | ||
| #[cfg(feature = "history")] | ||
| let height = self.storage.current_block_height().load(Ordering::SeqCst); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this used? |
||
| #[cfg(all(not(feature = "history"), feature = "slipstream-plugins"))] | ||
| let height = 0u32; | ||
| if let Some(mgr) = self.slipstream_plugin_manager.get() { | ||
| match mgr.read() { | ||
| Ok(plugin_mgr) => plugin_mgr.notify_mapping_update(&pid, &mname, &k, &v, height), | ||
| Err(e) => tracing::warn!( | ||
| "Slipstream: plugin manager lock poisoned, skipping mapping update notification: {e}" | ||
| ), | ||
| } | ||
| } | ||
| } | ||
| Ok(result) | ||
| } | ||
|
|
||
| /// Removes the key-value pair for the given `program ID`, `mapping name`, and `key` from storage. | ||
|
|
@@ -860,7 +996,58 @@ impl<N: Network, P: FinalizeStorage<N>> FinalizeStore<N, P> { | |
| mapping_name: Identifier<N>, | ||
| entries: Vec<(Plaintext<N>, Value<N>)>, | ||
| ) -> Result<FinalizeOperation<N>> { | ||
| self.storage.replace_mapping(program_id, mapping_name, entries) | ||
| // Serialize mapping identity and all entries before moving them into storage, | ||
| // so they are available for plugin notification after the storage call. | ||
| #[cfg(all(feature = "history", feature = "slipstream-plugins"))] | ||
| let plugin_data: SerializedMappingEntries = if self.is_finalize_mode.load(Ordering::SeqCst) { | ||
| if let Some(mgr) = self.slipstream_plugin_manager.get() { | ||
| match mgr.read() { | ||
| Ok(plugin_mgr) if plugin_mgr.history_mappings_enabled() => { | ||
| let mut serialized_entries = Vec::with_capacity(entries.len()); | ||
| for (key, value) in &entries { | ||
| serialized_entries.push((key.to_bytes_le()?, value.to_bytes_le()?)); | ||
| } | ||
| Some((program_id.to_bytes_le()?, mapping_name.to_bytes_le()?, serialized_entries)) | ||
| } | ||
| Ok(_) => None, | ||
| Err(e) => { | ||
| tracing::warn!( | ||
| "Slipstream: plugin manager lock poisoned, skipping mapping replace serialization: {e}" | ||
| ); | ||
| None | ||
| } | ||
| } | ||
| } else { | ||
| None | ||
| } | ||
| } else { | ||
| None | ||
| }; | ||
|
|
||
| let result = self.storage.replace_mapping(program_id, mapping_name, entries)?; | ||
|
|
||
| // Notify plugins of each updated key-value pair if in canonical finalize mode. | ||
| #[cfg(all(feature = "history", feature = "slipstream-plugins"))] | ||
| if let Some((pid, mname, serialized_entries)) = plugin_data { | ||
| #[cfg(feature = "history")] | ||
| let height = self.storage.current_block_height().load(Ordering::SeqCst); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same note as above |
||
| #[cfg(all(not(feature = "history"), feature = "slipstream-plugins"))] | ||
| let height = 0u32; | ||
| if let Some(mgr) = self.slipstream_plugin_manager.get() { | ||
| match mgr.read() { | ||
| Ok(plugin_mgr) => { | ||
| for (k, v) in &serialized_entries { | ||
| plugin_mgr.notify_mapping_update(&pid, &mname, k, v, height); | ||
| } | ||
| } | ||
| Err(e) => tracing::warn!( | ||
| "Slipstream: plugin manager lock poisoned, skipping mapping update notifications: {e}" | ||
| ), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Ok(result) | ||
| } | ||
|
|
||
| /// Removes the mapping for the given `program ID` and `mapping name` from storage, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| [package] | ||
| name = "snarkvm-slipstream-plugin-interface" | ||
| version = "4.6.0" | ||
| authors = [ "The Aleo Team <hello@aleo.org>" ] | ||
| description = "The SnarkVM Slipstream plugin interface." | ||
| homepage = "https://aleo.org" | ||
| repository = "https://github.com/ProvableHQ/snarkVM" | ||
| license = "Apache-2.0" | ||
| edition = "2024" | ||
|
|
||
| [dependencies.anyhow] | ||
| workspace = true | ||
|
|
||
| [dependencies.tracing] | ||
| workspace = true |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
| # Aleo Slipstream Plugin Interface | ||
|
|
||
| This crate enables a plugin to be added into a SnarkVM runtime to | ||
| take actions at the time of mapping updates at block finalization; | ||
| for example, saving historical mappings state and staking data to an external database. The plugin must | ||
| implement the `SlipstreamPlugin` trait. Please see the details of the | ||
| `slipstream_plugin_interface.rs` for the interface definition. | ||
|
|
||
| # Components | ||
|
|
||
| ### `plugins/slipstream_plugin_interface` | ||
| Defines the `SlipstreamPlugin` trait — the interface all plugins must implement. | ||
|
|
||
| | Method | Description | | ||
| |---|---| | ||
| | `on_load` / `on_unload` | Lifecycle hooks | | ||
| | `notify_mapping_update` | Called when a mapping key-value is inserted/updated during canonical finalize; args are serialized to bytes for object-safety | | ||
| | `notify_staking_reward` | Called once per staker per block during staking reward distribution | | ||
| | `history_enabled` / `history_staking_rewards_enabled` | Flags plugins use to opt in to data streams | | ||
|
|
||
| ### `plugins/slipstream_plugin_manager` | ||
| Manages loaded plugins and their backing `libloading::Library` handles. | ||
|
|
||
| - **`LoadedSlipstreamPlugin`** — wrapper holding a boxed plugin + its name; implements `Deref`/`DerefMut` | ||
| - **`SlipstreamPluginManager`** | ||
| - `unload()` — fires `on_unload()` on each plugin then drops the libraries | ||
| - `history_mappings_enabled()` / `history_staking_rewards_enabled()` — aggregate opt-in checks | ||
| - `notify_mapping_update()` — fan-out broadcast to all interested plugins | ||
| - **`SlipstreamService`** — async service wrapping the manager (separate file) | ||
|
|
||
| --- | ||
|
|
||
| ## Plugin Config File (JSON5) | ||
|
|
||
| Each plugin requires a config file: | ||
| ```json5 | ||
| { | ||
| "libpath": "/path/to/libmy_plugin.so", // required; relative paths resolve from the config file's dir | ||
| "name": "my_plugin" // optional; overrides the plugin's name() return value | ||
| } | ||
| ``` | ||
|
|
||
| --- | ||
|
|
||
| ## Plugin Library Convention | ||
|
|
||
| The shared library (`.so` / `.dylib` / `.dll`) must export a C function: | ||
| ```rust | ||
| #[no_mangle] | ||
| pub extern "C" fn _create_plugin() -> *mut dyn SlipstreamPlugin { | ||
| Box::into_raw(Box::new(MyPlugin::new())) | ||
| } | ||
| ``` | ||
|
|
||
| --- | ||
|
|
||
| ## Startup | ||
|
|
||
| `SlipstreamPluginService::new()` takes a slice of config file paths: | ||
| ```rust | ||
| let service = SlipstreamPluginService::new(&[ | ||
| PathBuf::from("/etc/aleo/plugins/my_plugin.json5"), | ||
| ])?; | ||
| ``` | ||
|
|
||
| > **Note:** Not yet wired up to any CLI flags or environment variables. How/where `SlipstreamPluginService` gets constructed and passed into the VM still needs to be plumbed in (likely in snarkOS or wherever the VM is instantiated). | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we reword/remove? |
||
|
|
||
| > Errors from plugin callbacks (`notify_mapping_update`, `notify_staking_reward`) are logged as warnings and never propagated — a misbehaving plugin will not crash the node. | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I personally like the "fail loudly" philosophy. But proper error handling/metrics is also more work so can be left for the future. |
||
Uh oh!
There was an error while loading. Please reload this page.