Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions node/router/messages/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ test = [ ]
[dependencies.bytes]
workspace = true

[dependencies.sha2]
version = "0.10"
default-features = false

[dependencies.snarkos-node-network]
workspace = true

Expand Down
97 changes: 97 additions & 0 deletions node/router/messages/src/chunk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) 2019-2025 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::MessageTrait;

use snarkvm::prelude::{FromBytes, ToBytes};
use std::{
borrow::Cow,
fmt::{self, Write},
io::{self, Read},
};

#[derive(Clone)]
pub struct MessageChunk {
/// The hash of the original message.
pub hash: [u8; 32],
/// The index of the chunk.
pub idx: u16,
/// A flag indicating whether this is the final chunk.
pub last: bool,
/// The bytes representing the chunk.
pub blob: Box<[u8]>,
}

impl fmt::Debug for MessageChunk {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut hash = String::with_capacity(8);
for byte in &self.hash[..4] {
write!(hash, "{byte:02x}")?;
}

f.debug_struct("MessageChunk").field("hash", &hash).field("idx", &self.idx).field("last", &self.last).finish()
}
}

impl PartialEq for MessageChunk {
fn eq(&self, other: &Self) -> bool {
self.hash == other.hash && self.idx == other.idx
}
}
impl Eq for MessageChunk {}

impl MessageChunk {
pub fn new(hash: [u8; 32], idx: u16, last: bool, blob: Box<[u8]>) -> Self {
Self { hash, idx, last, blob }
}
}

impl MessageTrait for MessageChunk {
/// Returns the message name.
#[inline]
fn name(&self) -> Cow<'static, str> {
let mut hash = String::with_capacity(8);
for byte in &self.hash[..4] {
let _ = write!(hash, "{byte:02x}"); // in-memory, shouldn't fail
}
format!("Chunk {}{} of Message {}", self.idx, if self.last { " (final)" } else { "" }, hash).into()
}
}

impl ToBytes for MessageChunk {
fn write_le<W: io::Write>(&self, mut writer: W) -> io::Result<()> {
self.hash.write_le(&mut writer)?;
self.idx.write_le(&mut writer)?;
self.last.write_le(&mut writer)?;
(self.blob.len() as u32).write_le(&mut writer)?;
writer.write_all(&self.blob)?;

Ok(())
}
}

impl FromBytes for MessageChunk {
fn read_le<R: io::Read>(mut reader: R) -> io::Result<Self> {
let hash = <[u8; 32]>::read_le(&mut reader)?;
let idx = u16::read_le(&mut reader)?;
let last = bool::read_le(&mut reader)?;
let blob_len = u32::read_le(&mut reader)?;
let mut blob = Vec::new();
(&mut reader).take(blob_len as u64).read_to_end(&mut blob)?;
let blob = blob.into();

Ok(Self { hash, idx, last, blob })
}
}
3 changes: 2 additions & 1 deletion node/router/messages/src/helpers/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
const MAXIMUM_HANDSHAKE_MESSAGE_SIZE: usize = 1024 * 1024; // 1 MiB

/// The maximum size of a message that can be transmitted in the network.
pub(crate) const MAXIMUM_MESSAGE_SIZE: usize = 128 * 1024 * 1024; // 128 MiB
// TODO: with message chunking in place, it can be greatly reduced.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What we surely need is message/event-specific limits. For the ones with a compile-time sizeof you could already set it a sensible lower value. We could also set consts whose correctness is checked in a unit test...

pub const MAXIMUM_MESSAGE_SIZE: usize = 128 * 1024 * 1024; // 128 MiB

/// The codec used to decode and encode network `Message`s.
pub struct MessageCodec<N: Network> {
Expand Down
2 changes: 1 addition & 1 deletion node/router/messages/src/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// limitations under the License.

mod codec;
pub use codec::MessageCodec;
pub use codec::{MAXIMUM_MESSAGE_SIZE, MessageCodec};

mod disconnect;
pub use disconnect::DisconnectReason;
98 changes: 96 additions & 2 deletions node/router/messages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub use challenge_request::ChallengeRequest;
mod challenge_response;
pub use challenge_response::ChallengeResponse;

mod chunk;
pub use chunk::MessageChunk;

mod disconnect;
pub use disconnect::Disconnect;

Expand Down Expand Up @@ -72,10 +75,12 @@ use snarkvm::prelude::{
ToBytes,
block::{Header, Transaction},
error,
io_error,
puzzle::{Solution, SolutionID},
};

use std::{borrow::Cow, io, net::SocketAddr};
use sha2::{Digest, Sha256};
use std::{borrow::Cow, io, marker::PhantomData, mem, net::SocketAddr};

pub trait MessageTrait: ToBytes + FromBytes {
/// Returns the message name.
Expand All @@ -97,6 +102,58 @@ pub enum Message<N: Network> {
PuzzleResponse(PuzzleResponse<N>),
UnconfirmedSolution(UnconfirmedSolution<N>),
UnconfirmedTransaction(UnconfirmedTransaction<N>),
Chunk(MessageChunk),
}

/// An iterator that produces chunks from a Message.
#[doc(hidden)]
pub struct ChunkIter<N: Network> {
/// The hash of the complete Message.
hash: [u8; 32],
/// The remaining serialized bytes to build chunks from.
serialized: Vec<u8>,
/// The chunk index.
idx: u16,
_phantom: PhantomData<N>,
}

impl<N: Network> ChunkIter<N> {
pub fn new(message: &Message<N>) -> io::Result<Self> {
let serialized =
message.to_bytes_le().map_err(|_| io_error(format!("failed to serialize {}", message.name())))?;
let digest = Sha256::digest(&serialized);
let mut hash = [0u8; 32];
hash.copy_from_slice(&digest);

Ok(Self { hash, serialized, idx: 0, _phantom: Default::default() })
}
}

impl<N: Network> Iterator for ChunkIter<N> {
type Item = MessageChunk;

fn next(&mut self) -> Option<Self::Item> {
let remaining_len = self.serialized.len();

// If the serialized bytes are exhausted, there is nothing more to chunk.
if remaining_len == 0 {
return None;
}

// Isolate a single chunk.
let chunk_len = Message::<N>::MAX_CHUNK_LEN.min(remaining_len);
let chunk = if remaining_len <= Message::<N>::MAX_CHUNK_LEN {
let chunk_blob = mem::take(&mut self.serialized).into();
MessageChunk::new(self.hash, self.idx, true, chunk_blob)
} else {
let chunk_blob: Box<[u8]> = self.serialized.drain(0..chunk_len).collect();
MessageChunk::new(self.hash, self.idx, false, chunk_blob)
};

self.idx += 1;

Some(chunk)
}
}

impl<N: Network> From<DisconnectReason> for Message<N> {
Expand All @@ -105,7 +162,40 @@ impl<N: Network> From<DisconnectReason> for Message<N> {
}
}

impl<N: Network> TryFrom<Vec<MessageChunk>> for Message<N> {
type Error = io::Error;

fn try_from(chunks: Vec<MessageChunk>) -> io::Result<Self> {
if chunks.is_empty() {
return Err(error("A message can't be reconstructed from an empty list of chunks"));
}

// Recreate the full Message from the chunks.
let mut full = Vec::new();
for chunk in &chunks {
full.extend_from_slice(&chunk.blob);
}
let full_len = full.len();

// Verify the hash of the recreated message.
let expected_hash = chunks.first().unwrap().hash;
let recreated_hash = Sha256::digest(&full);
if *recreated_hash != expected_hash {
return Err(error("The hash of a chunked message doesn't match its expected value"));
}

// Parse the full Message.
let mut reader = io::Cursor::new(full);
let message = Message::read_le(&mut reader)?;
debug!("recreated a {}B {} from {} chunks", full_len, message.name(), chunks.len());

Ok(message)
}
}

impl<N: Network> Message<N> {
/// The maximum size of a chunk in a chunked message.
pub const MAX_CHUNK_LEN: usize = 64 * 1024;
/// The version of the network protocol; this is incremented for breaking changes between migration versions.
// Note. This should be incremented for each new `ConsensusVersion` that is added.
pub const VERSIONS: [(ConsensusVersion, u32); 7] = [
Expand Down Expand Up @@ -167,6 +257,7 @@ impl<N: Network> Message<N> {
Self::PuzzleResponse(message) => message.name(),
Self::UnconfirmedSolution(message) => message.name(),
Self::UnconfirmedTransaction(message) => message.name(),
Self::Chunk(message) => message.name(),
}
}

Expand All @@ -187,6 +278,7 @@ impl<N: Network> Message<N> {
Self::PuzzleResponse(..) => 10,
Self::UnconfirmedSolution(..) => 11,
Self::UnconfirmedTransaction(..) => 12,
Self::Chunk(..) => 13,
}
}

Expand Down Expand Up @@ -231,6 +323,7 @@ impl<N: Network> ToBytes for Message<N> {
Self::PuzzleResponse(message) => message.write_le(writer),
Self::UnconfirmedSolution(message) => message.write_le(writer),
Self::UnconfirmedTransaction(message) => message.write_le(writer),
Self::Chunk(message) => message.write_le(writer),
}
}
}
Expand All @@ -257,7 +350,8 @@ impl<N: Network> FromBytes for Message<N> {
10 => Self::PuzzleResponse(PuzzleResponse::read_le(&mut reader)?),
11 => Self::UnconfirmedSolution(UnconfirmedSolution::read_le(&mut reader)?),
12 => Self::UnconfirmedTransaction(UnconfirmedTransaction::read_le(&mut reader)?),
13.. => return Err(error("Unknown message ID {id}")),
13 => Self::Chunk(MessageChunk::read_le(&mut reader)?),
14.. => return Err(error(format!("Unknown message ID {id}"))),
};

// Ensure that there are no "dangling" bytes.
Expand Down
57 changes: 56 additions & 1 deletion node/router/src/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{
BlockRequest,
BlockResponse,
DataBlocks,
MAXIMUM_MESSAGE_SIZE,
Message,
PeerResponse,
Ping,
Expand All @@ -31,13 +32,14 @@ use crate::{
use snarkos_node_tcp::protocols::Reading;
use snarkvm::prelude::{
ConsensusVersion,
FromBytes,
Network,
block::{Block, Header, Transaction},
puzzle::Solution,
};

use anyhow::{Result, anyhow, bail};
use std::net::SocketAddr;
use std::{collections::hash_map::Entry, io, net::SocketAddr};
use tokio::task::spawn_blocking;

/// The max number of peers to send in a `PeerResponse` message.
Expand Down Expand Up @@ -303,6 +305,59 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed transaction"),
}
}
Message::Chunk(chunk) => {
// Single-chunk messages don't require any lookups.
let message = if chunk.idx == 0 && chunk.last {
let mut reader = io::Cursor::new(chunk.blob);
Some(Message::read_le(&mut reader)?)
} else {
let mut message_chunks = self.router().message_chunks.lock();
let peers_chunks = message_chunks.entry(peer_ip).or_default();

match peers_chunks.entry(chunk.hash) {
Entry::Vacant(entry) => {
// A new chunked message - start saving the chunks.
let mut chunks = Vec::with_capacity(8);
chunks.push(chunk);
entry.insert(chunks);

None
}
Entry::Occupied(mut entry) => {
// Ensure correct chunk ordering.
let previous_idx = entry.get().last().map(|c| c.idx).unwrap_or_default();
if chunk.idx != previous_idx + 1 {
entry.remove();
bail!("Peer '{peer_ip}' sent an out-of-order message chunk");
}

// Ensure that the overall network message size hasn't been breached.
if (chunk.idx as usize + 1) * Message::<N>::MAX_CHUNK_LEN > MAXIMUM_MESSAGE_SIZE {
entry.remove();
bail!("Peer '{peer_ip}' sent an oversized message");
}

if chunk.last {
// Final chunk; remove the entry and recreate the full message.
let mut chunks = entry.remove();
chunks.push(chunk);
let message = Message::<N>::try_from(chunks)?;

Some(message)
} else {
// Save this chunk; more will follow.
entry.get_mut().push(chunk);

None
}
}
}
};
match message {
Some(message) => self.inbound(peer_addr, message).await,
None => Ok(true),
}
}
}
}

Expand Down
Loading