diff --git a/Cargo.lock b/Cargo.lock index b97e470ba8..1d0c8131c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4459,6 +4459,7 @@ version = "4.4.0" dependencies = [ "bytes", "proptest", + "sha2", "snarkos-node-bft-events", "snarkos-node-network", "snarkos-node-sync-locators", diff --git a/node/router/messages/Cargo.toml b/node/router/messages/Cargo.toml index 63f276d722..220d5563cd 100644 --- a/node/router/messages/Cargo.toml +++ b/node/router/messages/Cargo.toml @@ -23,6 +23,10 @@ test = [ ] [dependencies.bytes] workspace = true +[dependencies.sha2] +version = "0.10" +default-features = false + [dependencies.snarkos-node-network] workspace = true diff --git a/node/router/messages/src/chunk.rs b/node/router/messages/src/chunk.rs new file mode 100644 index 0000000000..a2980305a1 --- /dev/null +++ b/node/router/messages/src/chunk.rs @@ -0,0 +1,97 @@ +// Copyright (c) 2019-2025 Provable Inc. +// This file is part of the snarkOS library. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::MessageTrait; + +use snarkvm::prelude::{FromBytes, ToBytes}; +use std::{ + borrow::Cow, + fmt::{self, Write}, + io::{self, Read}, +}; + +#[derive(Clone)] +pub struct MessageChunk { + /// The hash of the original message. + pub hash: [u8; 32], + /// The index of the chunk. + pub idx: u16, + /// A flag indicating whether this is the final chunk. + pub last: bool, + /// The bytes representing the chunk. + pub blob: Box<[u8]>, +} + +impl fmt::Debug for MessageChunk { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut hash = String::with_capacity(8); + for byte in &self.hash[..4] { + write!(hash, "{byte:02x}")?; + } + + f.debug_struct("MessageChunk").field("hash", &hash).field("idx", &self.idx).field("last", &self.last).finish() + } +} + +impl PartialEq for MessageChunk { + fn eq(&self, other: &Self) -> bool { + self.hash == other.hash && self.idx == other.idx + } +} +impl Eq for MessageChunk {} + +impl MessageChunk { + pub fn new(hash: [u8; 32], idx: u16, last: bool, blob: Box<[u8]>) -> Self { + Self { hash, idx, last, blob } + } +} + +impl MessageTrait for MessageChunk { + /// Returns the message name. + #[inline] + fn name(&self) -> Cow<'static, str> { + let mut hash = String::with_capacity(8); + for byte in &self.hash[..4] { + let _ = write!(hash, "{byte:02x}"); // in-memory, shouldn't fail + } + format!("Chunk {}{} of Message {}", self.idx, if self.last { " (final)" } else { "" }, hash).into() + } +} + +impl ToBytes for MessageChunk { + fn write_le(&self, mut writer: W) -> io::Result<()> { + self.hash.write_le(&mut writer)?; + self.idx.write_le(&mut writer)?; + self.last.write_le(&mut writer)?; + (self.blob.len() as u32).write_le(&mut writer)?; + writer.write_all(&self.blob)?; + + Ok(()) + } +} + +impl FromBytes for MessageChunk { + fn read_le(mut reader: R) -> io::Result { + let hash = <[u8; 32]>::read_le(&mut reader)?; + let idx = u16::read_le(&mut reader)?; + let last = bool::read_le(&mut reader)?; + let blob_len = u32::read_le(&mut reader)?; + let mut blob = Vec::new(); + (&mut reader).take(blob_len as u64).read_to_end(&mut blob)?; + let blob = blob.into(); + + Ok(Self { hash, idx, last, blob }) + } +} diff --git a/node/router/messages/src/helpers/codec.rs b/node/router/messages/src/helpers/codec.rs index c83b56105e..a734dbe2a8 100644 --- a/node/router/messages/src/helpers/codec.rs +++ b/node/router/messages/src/helpers/codec.rs @@ -24,7 +24,8 @@ use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; const MAXIMUM_HANDSHAKE_MESSAGE_SIZE: usize = 1024 * 1024; // 1 MiB /// The maximum size of a message that can be transmitted in the network. -pub(crate) const MAXIMUM_MESSAGE_SIZE: usize = 128 * 1024 * 1024; // 128 MiB +// TODO: with message chunking in place, it can be greatly reduced. +pub const MAXIMUM_MESSAGE_SIZE: usize = 128 * 1024 * 1024; // 128 MiB /// The codec used to decode and encode network `Message`s. pub struct MessageCodec { diff --git a/node/router/messages/src/helpers/mod.rs b/node/router/messages/src/helpers/mod.rs index 4e944c7c72..9b03dc33d7 100644 --- a/node/router/messages/src/helpers/mod.rs +++ b/node/router/messages/src/helpers/mod.rs @@ -14,7 +14,7 @@ // limitations under the License. mod codec; -pub use codec::MessageCodec; +pub use codec::{MAXIMUM_MESSAGE_SIZE, MessageCodec}; mod disconnect; pub use disconnect::DisconnectReason; diff --git a/node/router/messages/src/lib.rs b/node/router/messages/src/lib.rs index 144be28538..bebc0010b8 100644 --- a/node/router/messages/src/lib.rs +++ b/node/router/messages/src/lib.rs @@ -33,6 +33,9 @@ pub use challenge_request::ChallengeRequest; mod challenge_response; pub use challenge_response::ChallengeResponse; +mod chunk; +pub use chunk::MessageChunk; + mod disconnect; pub use disconnect::Disconnect; @@ -72,10 +75,12 @@ use snarkvm::prelude::{ ToBytes, block::{Header, Transaction}, error, + io_error, puzzle::{Solution, SolutionID}, }; -use std::{borrow::Cow, io, net::SocketAddr}; +use sha2::{Digest, Sha256}; +use std::{borrow::Cow, io, marker::PhantomData, mem, net::SocketAddr}; pub trait MessageTrait: ToBytes + FromBytes { /// Returns the message name. @@ -97,6 +102,58 @@ pub enum Message { PuzzleResponse(PuzzleResponse), UnconfirmedSolution(UnconfirmedSolution), UnconfirmedTransaction(UnconfirmedTransaction), + Chunk(MessageChunk), +} + +/// An iterator that produces chunks from a Message. +#[doc(hidden)] +pub struct ChunkIter { + /// The hash of the complete Message. + hash: [u8; 32], + /// The remaining serialized bytes to build chunks from. + serialized: Vec, + /// The chunk index. + idx: u16, + _phantom: PhantomData, +} + +impl ChunkIter { + pub fn new(message: &Message) -> io::Result { + let serialized = + message.to_bytes_le().map_err(|_| io_error(format!("failed to serialize {}", message.name())))?; + let digest = Sha256::digest(&serialized); + let mut hash = [0u8; 32]; + hash.copy_from_slice(&digest); + + Ok(Self { hash, serialized, idx: 0, _phantom: Default::default() }) + } +} + +impl Iterator for ChunkIter { + type Item = MessageChunk; + + fn next(&mut self) -> Option { + let remaining_len = self.serialized.len(); + + // If the serialized bytes are exhausted, there is nothing more to chunk. + if remaining_len == 0 { + return None; + } + + // Isolate a single chunk. + let chunk_len = Message::::MAX_CHUNK_LEN.min(remaining_len); + let chunk = if remaining_len <= Message::::MAX_CHUNK_LEN { + let chunk_blob = mem::take(&mut self.serialized).into(); + MessageChunk::new(self.hash, self.idx, true, chunk_blob) + } else { + let chunk_blob: Box<[u8]> = self.serialized.drain(0..chunk_len).collect(); + MessageChunk::new(self.hash, self.idx, false, chunk_blob) + }; + + self.idx += 1; + + Some(chunk) + } } impl From for Message { @@ -105,7 +162,40 @@ impl From for Message { } } +impl TryFrom> for Message { + type Error = io::Error; + + fn try_from(chunks: Vec) -> io::Result { + if chunks.is_empty() { + return Err(error("A message can't be reconstructed from an empty list of chunks")); + } + + // Recreate the full Message from the chunks. + let mut full = Vec::new(); + for chunk in &chunks { + full.extend_from_slice(&chunk.blob); + } + let full_len = full.len(); + + // Verify the hash of the recreated message. + let expected_hash = chunks.first().unwrap().hash; + let recreated_hash = Sha256::digest(&full); + if *recreated_hash != expected_hash { + return Err(error("The hash of a chunked message doesn't match its expected value")); + } + + // Parse the full Message. + let mut reader = io::Cursor::new(full); + let message = Message::read_le(&mut reader)?; + debug!("recreated a {}B {} from {} chunks", full_len, message.name(), chunks.len()); + + Ok(message) + } +} + impl Message { + /// The maximum size of a chunk in a chunked message. + pub const MAX_CHUNK_LEN: usize = 64 * 1024; /// The version of the network protocol; this is incremented for breaking changes between migration versions. // Note. This should be incremented for each new `ConsensusVersion` that is added. pub const VERSIONS: [(ConsensusVersion, u32); 7] = [ @@ -167,6 +257,7 @@ impl Message { Self::PuzzleResponse(message) => message.name(), Self::UnconfirmedSolution(message) => message.name(), Self::UnconfirmedTransaction(message) => message.name(), + Self::Chunk(message) => message.name(), } } @@ -187,6 +278,7 @@ impl Message { Self::PuzzleResponse(..) => 10, Self::UnconfirmedSolution(..) => 11, Self::UnconfirmedTransaction(..) => 12, + Self::Chunk(..) => 13, } } @@ -231,6 +323,7 @@ impl ToBytes for Message { Self::PuzzleResponse(message) => message.write_le(writer), Self::UnconfirmedSolution(message) => message.write_le(writer), Self::UnconfirmedTransaction(message) => message.write_le(writer), + Self::Chunk(message) => message.write_le(writer), } } } @@ -257,7 +350,8 @@ impl FromBytes for Message { 10 => Self::PuzzleResponse(PuzzleResponse::read_le(&mut reader)?), 11 => Self::UnconfirmedSolution(UnconfirmedSolution::read_le(&mut reader)?), 12 => Self::UnconfirmedTransaction(UnconfirmedTransaction::read_le(&mut reader)?), - 13.. => return Err(error("Unknown message ID {id}")), + 13 => Self::Chunk(MessageChunk::read_le(&mut reader)?), + 14.. => return Err(error(format!("Unknown message ID {id}"))), }; // Ensure that there are no "dangling" bytes. diff --git a/node/router/src/inbound.rs b/node/router/src/inbound.rs index 22cbd53226..5e8d6b660a 100644 --- a/node/router/src/inbound.rs +++ b/node/router/src/inbound.rs @@ -20,6 +20,7 @@ use crate::{ BlockRequest, BlockResponse, DataBlocks, + MAXIMUM_MESSAGE_SIZE, Message, PeerResponse, Ping, @@ -31,13 +32,14 @@ use crate::{ use snarkos_node_tcp::protocols::Reading; use snarkvm::prelude::{ ConsensusVersion, + FromBytes, Network, block::{Block, Header, Transaction}, puzzle::Solution, }; use anyhow::{Result, anyhow, bail}; -use std::net::SocketAddr; +use std::{collections::hash_map::Entry, io, net::SocketAddr}; use tokio::task::spawn_blocking; /// The max number of peers to send in a `PeerResponse` message. @@ -303,6 +305,59 @@ pub trait Inbound: Reading + Outbound { false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed transaction"), } } + Message::Chunk(chunk) => { + // Single-chunk messages don't require any lookups. + let message = if chunk.idx == 0 && chunk.last { + let mut reader = io::Cursor::new(chunk.blob); + Some(Message::read_le(&mut reader)?) + } else { + let mut message_chunks = self.router().message_chunks.lock(); + let peers_chunks = message_chunks.entry(peer_ip).or_default(); + + match peers_chunks.entry(chunk.hash) { + Entry::Vacant(entry) => { + // A new chunked message - start saving the chunks. + let mut chunks = Vec::with_capacity(8); + chunks.push(chunk); + entry.insert(chunks); + + None + } + Entry::Occupied(mut entry) => { + // Ensure correct chunk ordering. + let previous_idx = entry.get().last().map(|c| c.idx).unwrap_or_default(); + if chunk.idx != previous_idx + 1 { + entry.remove(); + bail!("Peer '{peer_ip}' sent an out-of-order message chunk"); + } + + // Ensure that the overall network message size hasn't been breached. + if (chunk.idx as usize + 1) * Message::::MAX_CHUNK_LEN > MAXIMUM_MESSAGE_SIZE { + entry.remove(); + bail!("Peer '{peer_ip}' sent an oversized message"); + } + + if chunk.last { + // Final chunk; remove the entry and recreate the full message. + let mut chunks = entry.remove(); + chunks.push(chunk); + let message = Message::::try_from(chunks)?; + + Some(message) + } else { + // Save this chunk; more will follow. + entry.get_mut().push(chunk); + + None + } + } + } + }; + match message { + Some(message) => self.inbound(peer_addr, message).await, + None => Ok(true), + } + } } } diff --git a/node/router/src/lib.rs b/node/router/src/lib.rs index 7f50f9cb56..3eca8e4d9a 100644 --- a/node/router/src/lib.rs +++ b/node/router/src/lib.rs @@ -44,7 +44,7 @@ pub use routing::*; mod writing; -use crate::messages::{BlockRequest, Message, MessageCodec}; +use crate::messages::{BlockRequest, Message, MessageChunk, MessageCodec}; use snarkos_account::Account; use snarkos_node_bft_ledger_service::LedgerService; @@ -117,6 +117,9 @@ impl PeerPoolHandling for Router { } } +// A map from hashed of chunked Messages to their corresponding chunks. +type ChunkedMessageMap = HashMap<[u8; 32], Vec>; + pub struct InnerRouter { /// The TCP stack. tcp: Tcp, @@ -128,6 +131,9 @@ pub struct InnerRouter { ledger: Arc>, /// The cache. cache: Cache, + /// Chunks of messages that were split for delivery. + // TODO: decide how to clean it up and what to do about users leaving partial messages. + message_chunks: Mutex>, /// The resolver. resolver: RwLock>, /// The collection of both candidate and connected peers. @@ -193,6 +199,7 @@ impl Router { account, ledger, cache: Default::default(), + message_chunks: Default::default(), resolver: Default::default(), peer_pool: RwLock::new(initial_peers), handles: Default::default(), diff --git a/node/router/src/writing.rs b/node/router/src/writing.rs index 95ac21c746..6a77a1dcec 100644 --- a/node/router/src/writing.rs +++ b/node/router/src/writing.rs @@ -15,6 +15,7 @@ use super::*; +use snarkos_node_router_messages::{ChunkIter, Message}; use snarkos_node_sync_locators::BlockLocators; use snarkos_node_tcp::protocols::Writing; @@ -37,7 +38,8 @@ impl Router { /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`] /// which can be used to determine when and whether the message has been delivered. /// - /// This returns None, if the peer does not exist or disconnected. + /// This returns None if the given Message wasn't delivered, which may be caused + /// by the peer having disconnected, or the Message being split into chunks. pub fn send(&self, peer_ip: SocketAddr, message: Message) -> Option>> { // Determine whether to send the message. if !self.can_send(peer_ip, &message) { @@ -63,6 +65,17 @@ impl Router { if matches!(message, Message::PeerRequest(_)) { self.cache.increment_outbound_peer_requests(peer_ip); } + + // TODO: decide which messages to chunk + if matches!(message, Message::BlockResponse(_)) { + let chunk_iter = ChunkIter::::new(&message).ok()?; + for chunk in chunk_iter { + self.send(peer_ip, Message::Chunk(chunk))?; + } + + return None; + } + // Retrieve the message name. let name = message.name(); // Send the message to the peer.