Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ package = [
"dep:dotenvy"
]

announce-blocks = [ "snarkvm-synthesizer/announce-blocks" ]
async = [ "snarkvm-ledger/async", "snarkvm-synthesizer/async" ]
cuda = [ "snarkvm-algorithms/cuda" ]
history = [ "snarkvm-synthesizer/history" ]
Expand Down
9 changes: 9 additions & 0 deletions synthesizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ license = "Apache-2.0"
edition = "2024"

[features]
announce-blocks = [ "dep:bincode", "dep:interprocess" ]
locktick = [
"dep:locktick",
"snarkvm-ledger-puzzle/locktick",
Expand Down Expand Up @@ -116,10 +117,18 @@ workspace = true
[dependencies.anyhow]
workspace = true

[dependencies.bincode]
workspace = true
optional = true

[dependencies.indexmap]
workspace = true
features = [ "serde", "rayon" ]

[dependencies.interprocess]
version = "2.2.3"
optional = true

[dependencies.itertools]
workspace = true

Expand Down
56 changes: 56 additions & 0 deletions synthesizer/src/vm/helpers/sequential_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
use crate::vm::*;
use console::network::prelude::Network;

#[cfg(feature = "announce-blocks")]
use interprocess::local_socket::{self, Stream, prelude::*};
use std::{fmt, thread};
use tokio::sync::oneshot;
#[cfg(feature = "announce-blocks")]
use tracing::*;

impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
/// Launches a thread dedicated to the sequential processing of storage-related
Expand All @@ -26,6 +30,9 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
&self,
request_rx: mpsc::Receiver<SequentialOperationRequest<N>>,
) -> thread::JoinHandle<()> {
#[cfg(feature = "announce-blocks")]
let mut stream = start_block_announcement_stream();

// Spawn a dedicated thread.
let vm = self.clone();
thread::spawn(move || {
Expand All @@ -37,7 +44,17 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
// Perform the queued operation.
let ret = match op {
SequentialOperation::AddNextBlock(block) => {
#[cfg(feature = "announce-blocks")]
let ipc_payload = (block.height(), bincode::serialize(&block).unwrap()); // Infallible.
let ret = vm.add_next_block_inner(block);
#[cfg(feature = "announce-blocks")]
if ret.is_ok() {
if let Err(e) = announce_block(&mut stream, ipc_payload) {
error!("IPC error: {e}");
// Attempt to restart the stream.
stream = start_block_announcement_stream();
}
}
SequentialOperationResult::AddNextBlock(ret)
}
SequentialOperation::AtomicSpeculate(a, b, c, d, e, f) => {
Expand Down Expand Up @@ -138,3 +155,42 @@ pub enum SequentialOperationResult<N: Network> {
)>,
),
}

#[cfg(feature = "announce-blocks")]
fn start_block_announcement_stream() -> Option<Stream> {
let path = std::env::var("BLOCK_ANNOUNCE_PATH")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side note: We should probably have a unified source that documenst all the environment variables that are used.

.map_err(|_| {
warn!("BLOCK_ANNOUNCE_PATH env variable must be set in order to publish blocks via IPC");
})
.ok()?
.to_fs_name::<local_socket::GenericFilePath>()
.expect("Invalid path provided as the BLOCK_ANNOUNCE_PATH");

match Stream::connect(path) {
Ok(stream) => {
debug!("Successfully (re)started the IPC stream for block announcements");
Some(stream)
}
Err(e) => {
warn!("Couldn't (re)start the IPC stream for block announcements: {e}");
None
}
}
}

#[cfg(feature = "announce-blocks")]
fn announce_block(stream: &mut Option<Stream>, payload: (u32, Vec<u8>)) -> Result<bool> {
if let Some(stream) = stream {
let (block_height, block_bytes) = payload;
debug!("Announcing block {block_height} to the IPC stream");
let payload_size = u32::try_from(4 + block_bytes.len()).unwrap(); // Safe - blocks are smaller than 4GiB.
stream.write_all(&payload_size.to_le_bytes())?;
stream.write_all(&block_height.to_le_bytes())?;
stream.write_all(&block_bytes)?;

Ok(true)
} else {
*stream = start_block_announcement_stream();
if stream.is_some() { announce_block(stream, payload) } else { Ok(false) }
}
}