diff --git a/Cargo.lock b/Cargo.lock index 2be48c6eb0..9b230a67be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -800,6 +800,12 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8975ffdaa0ef3661bfe02dbdcc06c9f829dfafe6a3c474de366a8d5e44276921" +[[package]] +name = "doctest-file" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aac81fa3e28d21450aa4d2ac065992ba96a1d7303efbce51a95f4fd175b67562" + [[package]] name = "document-features" version = "0.2.12" @@ -1582,6 +1588,19 @@ dependencies = [ "serde_core", ] +[[package]] +name = "interprocess" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d941b405bd2322993887859a8ee6ac9134945a24ec5ec763a8a962fc64dfec2d" +dependencies = [ + "doctest-file", + "libc", + "recvmsg", + "widestring", + "windows-sys 0.52.0", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -2326,6 +2345,12 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "recvmsg" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3edd4d5d42c92f0a659926464d4cce56b562761267ecf0f469d85b7de384175" + [[package]] name = "redox_syscall" version = "0.5.18" @@ -3669,6 +3694,7 @@ dependencies = [ "criterion", "hex", "indexmap 2.12.0", + "interprocess", "itertools 0.14.0", "k256", "locktick", @@ -4605,6 +4631,12 @@ dependencies = [ "winsafe", ] +[[package]] +name = "widestring" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72069c3113ab32ab29e5584db3c6ec55d416895e60715417b5b883a357c3e471" + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 9bc3dbc45f..a8978827c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -124,6 +124,7 @@ package = [ "dep:dotenvy" ] +announce-blocks = [ "snarkvm-synthesizer/announce-blocks" ] async = [ "snarkvm-ledger/async", "snarkvm-synthesizer/async" ] cuda = [ "snarkvm-algorithms/cuda" ] history = [ "snarkvm-synthesizer/history" ] diff --git a/synthesizer/Cargo.toml b/synthesizer/Cargo.toml index 1f05f241f7..2bd71d82fc 100644 --- a/synthesizer/Cargo.toml +++ b/synthesizer/Cargo.toml @@ -24,6 +24,7 @@ license = "Apache-2.0" edition = "2024" [features] +announce-blocks = [ "dep:bincode", "dep:interprocess" ] locktick = [ "dep:locktick", "snarkvm-ledger-puzzle/locktick", @@ -116,10 +117,18 @@ workspace = true [dependencies.anyhow] workspace = true +[dependencies.bincode] +workspace = true +optional = true + [dependencies.indexmap] workspace = true features = [ "serde", "rayon" ] +[dependencies.interprocess] +version = "2.2.3" +optional = true + [dependencies.itertools] workspace = true diff --git a/synthesizer/src/vm/helpers/sequential_op.rs b/synthesizer/src/vm/helpers/sequential_op.rs index 8b99dfab2e..269de5cb4b 100644 --- a/synthesizer/src/vm/helpers/sequential_op.rs +++ b/synthesizer/src/vm/helpers/sequential_op.rs @@ -16,8 +16,12 @@ use crate::vm::*; use console::network::prelude::Network; +#[cfg(feature = "announce-blocks")] +use interprocess::local_socket::{self, Stream, prelude::*}; use std::{fmt, thread}; use tokio::sync::oneshot; +#[cfg(feature = "announce-blocks")] +use tracing::*; impl> VM { /// Launches a thread dedicated to the sequential processing of storage-related @@ -26,6 +30,9 @@ impl> VM { &self, request_rx: mpsc::Receiver>, ) -> thread::JoinHandle<()> { + #[cfg(feature = "announce-blocks")] + let mut stream = start_block_announcement_stream(); + // Spawn a dedicated thread. let vm = self.clone(); thread::spawn(move || { @@ -37,7 +44,17 @@ impl> VM { // Perform the queued operation. let ret = match op { SequentialOperation::AddNextBlock(block) => { + #[cfg(feature = "announce-blocks")] + let ipc_payload = (block.height(), bincode::serialize(&block)); let ret = vm.add_next_block_inner(block); + #[cfg(feature = "announce-blocks")] + if ret.is_ok() { + if let Err(e) = announce_block(&mut stream, ipc_payload) { + error!("IPC error: {e}"); + // Attempt to restart the stream. + stream = start_block_announcement_stream(); + } + } SequentialOperationResult::AddNextBlock(ret) } SequentialOperation::AtomicSpeculate(a, b, c, d, e, f) => { @@ -138,3 +155,46 @@ pub enum SequentialOperationResult { )>, ), } + +#[cfg(feature = "announce-blocks")] +fn start_block_announcement_stream() -> Option { + let path = std::env::var("BLOCK_ANNOUNCE_PATH") + .map_err(|_| { + warn!("BLOCK_ANNOUNCE_PATH env variable must be set in order to publish blocks via IPC"); + }) + .ok()? + .to_fs_name::() + .expect("Invalid path provided as the BLOCK_ANNOUNCE_PATH"); + + match Stream::connect(path) { + Ok(stream) => { + debug!("Successfully (re)started the IPC stream for block announcements"); + Some(stream) + } + Err(e) => { + warn!("Couldn't (re)start the IPC stream for block announcements: {e}"); + None + } + } +} + +#[cfg(feature = "announce-blocks")] +fn announce_block( + stream: &mut Option, + payload: (u32, Result, Box>), +) -> Result { + if let Some(stream) = stream { + let (block_height, serialized_block) = payload; + let block_bytes = serialized_block?; + debug!("Announcing block {block_height} to the IPC stream"); + let payload_size = u32::try_from(std::mem::size_of::() + block_bytes.len()).unwrap(); // Safe - blocks are smaller than 4GiB. + stream.write_all(&payload_size.to_le_bytes())?; + stream.write_all(&block_height.to_le_bytes())?; + stream.write_all(&block_bytes)?; + + Ok(true) + } else { + *stream = start_block_announcement_stream(); + if stream.is_some() { announce_block(stream, payload) } else { Ok(false) } + } +}