Skip to content

Commit bc9607f

Browse files
committed
feat: announce new blocks via IPC (feature-gated)
Signed-off-by: ljedrz <ljedrz@users.noreply.github.com>
1 parent b04dc9b commit bc9607f

File tree

3 files changed

+62
-0
lines changed

3 files changed

+62
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ package = [
124124
"dep:dotenvy"
125125
]
126126

127+
announce-blocks = [ "snarkvm-synthesizer/announce-blocks" ]
127128
async = [ "snarkvm-ledger/async", "snarkvm-synthesizer/async" ]
128129
cuda = [ "snarkvm-algorithms/cuda" ]
129130
dev-print = [

synthesizer/Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ license = "Apache-2.0"
2424
edition = "2024"
2525

2626
[features]
27+
announce-blocks = [ "dep:bincode", "dep:interprocess" ]
2728
locktick = [
2829
"dep:locktick",
2930
"snarkvm-ledger-puzzle/locktick",
@@ -115,10 +116,18 @@ workspace = true
115116
[dependencies.anyhow]
116117
workspace = true
117118

119+
[dependencies.bincode]
120+
workspace = true
121+
optional = true
122+
118123
[dependencies.indexmap]
119124
workspace = true
120125
features = [ "serde", "rayon" ]
121126

127+
[dependencies.interprocess]
128+
version = "2.2.3"
129+
optional = true
130+
122131
[dependencies.itertools]
123132
workspace = true
124133

synthesizer/src/vm/helpers/sequential_op.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@
1616
use crate::vm::*;
1717
use console::network::prelude::Network;
1818

19+
#[cfg(feature = "announce-blocks")]
20+
use interprocess::local_socket::{self, Stream, prelude::*};
1921
use std::{fmt, thread};
2022
use tokio::sync::oneshot;
23+
#[cfg(feature = "announce-blocks")]
24+
use tracing::*;
2125

2226
impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
2327
/// Launches a thread dedicated to the sequential processing of storage-related
@@ -26,6 +30,9 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
2630
&self,
2731
request_rx: mpsc::Receiver<SequentialOperationRequest<N>>,
2832
) -> thread::JoinHandle<()> {
33+
#[cfg(feature = "announce-blocks")]
34+
let mut stream = start_block_announcement_stream();
35+
2936
// Spawn a dedicated thread.
3037
let vm = self.clone();
3138
thread::spawn(move || {
@@ -37,7 +44,13 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
3744
// Perform the queued operation.
3845
let ret = match op {
3946
SequentialOperation::AddNextBlock(block) => {
47+
#[cfg(feature = "announce-blocks")]
48+
let ipc_payload = (block.height(), bincode::serialize(&block).unwrap()); // Infallible.
4049
let ret = vm.add_next_block_inner(block);
50+
#[cfg(feature = "announce-blocks")]
51+
if ret.is_ok() {
52+
let _ = announce_block(&mut stream, ipc_payload);
53+
}
4154
SequentialOperationResult::AddNextBlock(ret)
4255
}
4356
SequentialOperation::AtomicSpeculate(a, b, c, d, e, f) => {
@@ -138,3 +151,42 @@ pub enum SequentialOperationResult<N: Network> {
138151
)>,
139152
),
140153
}
154+
155+
#[cfg(feature = "announce-blocks")]
156+
fn start_block_announcement_stream() -> Option<Stream> {
157+
let path = std::env::var("BLOCK_ANNOUNCE_PATH")
158+
.map_err(|_| {
159+
warn!("BLOCK_ANNOUNCE_PATH env variable must be set in order to publish blocks via IPC");
160+
})
161+
.ok()?
162+
.to_fs_name::<local_socket::GenericFilePath>()
163+
.expect("Invalid path provided as the BLOCK_ANNOUNCE_PATH");
164+
165+
match Stream::connect(path) {
166+
Ok(stream) => {
167+
debug!("Successfully started the IPC stream for block announcements");
168+
Some(stream)
169+
}
170+
Err(e) => {
171+
warn!("Couldn't start the IPC stream for block announcements: {e}");
172+
None
173+
}
174+
}
175+
}
176+
177+
#[cfg(feature = "announce-blocks")]
178+
fn announce_block(stream: &mut Option<Stream>, payload: (u32, Vec<u8>)) -> Result<bool> {
179+
if let Some(stream) = stream {
180+
let (block_height, block_bytes) = payload;
181+
debug!("Announcing block {block_height} to the IPC stream");
182+
let payload_size = u32::try_from(4 + block_bytes.len()).unwrap(); // Safe - blocks are smaller than 4GiB.
183+
stream.write_all(&payload_size.to_le_bytes())?;
184+
stream.write_all(&block_height.to_le_bytes())?;
185+
stream.write_all(&block_bytes)?;
186+
187+
Ok(true)
188+
} else {
189+
*stream = start_block_announcement_stream();
190+
if stream.is_some() { announce_block(stream, payload) } else { Ok(false) }
191+
}
192+
}

0 commit comments

Comments
 (0)