Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ package = [
"dep:dotenvy"
]

announce-blocks = [ "snarkvm-synthesizer/announce-blocks" ]
async = [ "snarkvm-ledger/async", "snarkvm-synthesizer/async" ]
cuda = [ "snarkvm-algorithms/cuda" ]
history = [ "snarkvm-synthesizer/history" ]
Expand Down
9 changes: 9 additions & 0 deletions synthesizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ license = "Apache-2.0"
edition = "2024"

[features]
announce-blocks = [ "dep:bincode", "dep:interprocess" ]
locktick = [
"dep:locktick",
"snarkvm-ledger-puzzle/locktick",
Expand Down Expand Up @@ -116,10 +117,18 @@ workspace = true
[dependencies.anyhow]
workspace = true

[dependencies.bincode]
workspace = true
optional = true

[dependencies.indexmap]
workspace = true
features = [ "serde", "rayon" ]

[dependencies.interprocess]
version = "2.2.3"
optional = true

[dependencies.itertools]
workspace = true

Expand Down
60 changes: 60 additions & 0 deletions synthesizer/src/vm/helpers/sequential_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
use crate::vm::*;
use console::network::prelude::Network;

#[cfg(feature = "announce-blocks")]
use interprocess::local_socket::{self, Stream, prelude::*};
use std::{fmt, thread};
use tokio::sync::oneshot;
#[cfg(feature = "announce-blocks")]
use tracing::*;

impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
/// Launches a thread dedicated to the sequential processing of storage-related
Expand All @@ -26,6 +30,9 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
&self,
request_rx: mpsc::Receiver<SequentialOperationRequest<N>>,
) -> thread::JoinHandle<()> {
#[cfg(feature = "announce-blocks")]
let mut stream = start_block_announcement_stream();

// Spawn a dedicated thread.
let vm = self.clone();
thread::spawn(move || {
Expand All @@ -37,7 +44,17 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
// Perform the queued operation.
let ret = match op {
SequentialOperation::AddNextBlock(block) => {
#[cfg(feature = "announce-blocks")]
let ipc_payload = (block.height(), bincode::serialize(&block));
let ret = vm.add_next_block_inner(block);
#[cfg(feature = "announce-blocks")]
if ret.is_ok() {
if let Err(e) = announce_block(&mut stream, ipc_payload) {
error!("IPC error: {e}");
// Attempt to restart the stream.
stream = start_block_announcement_stream();
}
}
SequentialOperationResult::AddNextBlock(ret)
}
SequentialOperation::AtomicSpeculate(a, b, c, d, e, f) => {
Expand Down Expand Up @@ -138,3 +155,46 @@ pub enum SequentialOperationResult<N: Network> {
)>,
),
}

#[cfg(feature = "announce-blocks")]
fn start_block_announcement_stream() -> Option<Stream> {
let path = std::env::var("BLOCK_ANNOUNCE_PATH")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side note: We should probably have a unified source that documenst all the environment variables that are used.

.map_err(|_| {
warn!("BLOCK_ANNOUNCE_PATH env variable must be set in order to publish blocks via IPC");
})
.ok()?
.to_fs_name::<local_socket::GenericFilePath>()
.expect("Invalid path provided as the BLOCK_ANNOUNCE_PATH");

match Stream::connect(path) {
Ok(stream) => {
debug!("Successfully (re)started the IPC stream for block announcements");
Some(stream)
}
Err(e) => {
warn!("Couldn't (re)start the IPC stream for block announcements: {e}");
None
}
}
}

#[cfg(feature = "announce-blocks")]
fn announce_block(
stream: &mut Option<Stream>,
payload: (u32, Result<Vec<u8>, Box<bincode::ErrorKind>>),
) -> Result<bool> {
if let Some(stream) = stream {
let (block_height, serialized_block) = payload;
let block_bytes = serialized_block?;
debug!("Announcing block {block_height} to the IPC stream");
let payload_size = u32::try_from(std::mem::size_of::<u32>() + block_bytes.len()).unwrap(); // Safe - blocks are smaller than 4GiB.
stream.write_all(&payload_size.to_le_bytes())?;
stream.write_all(&block_height.to_le_bytes())?;
stream.write_all(&block_bytes)?;

Ok(true)
} else {
*stream = start_block_announcement_stream();
if stream.is_some() { announce_block(stream, payload) } else { Ok(false) }
}
}