diff --git a/lib-network/src/blockchain_sync/mod.rs b/lib-network/src/blockchain_sync/mod.rs index 89c3af8e..f8f07f8f 100644 --- a/lib-network/src/blockchain_sync/mod.rs +++ b/lib-network/src/blockchain_sync/mod.rs @@ -46,13 +46,20 @@ use anyhow::Result; use lib_crypto::PublicKey; use crate::types::mesh_message::ZhtpMeshMessage; use crate::protocols::NetworkProtocol; +use crate::mtu::{ + BLE_CHUNK_SIZE, BLUETOOTH_CLASSIC_CHUNK_SIZE, WIFI_DIRECT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE, +}; use std::time::Duration; -/// Chunk sizes based on protocol capabilities -pub const BLE_CHUNK_SIZE: usize = 200; // Conservative for BLE GATT (247-byte MTU) -pub const CLASSIC_CHUNK_SIZE: usize = 1000; // Bluetooth Classic RFCOMM (larger MTU) -pub const WIFI_CHUNK_SIZE: usize = 1400; // WiFi Direct (can handle more) -pub const DEFAULT_CHUNK_SIZE: usize = 200; // Safe fallback +/// Re-export chunk sizes for backward compatibility +#[deprecated(since = "0.1.0", note = "Use crate::mtu::BLE_CHUNK_SIZE instead")] +pub const BLE_CHUNK_SIZE_COMPAT: usize = BLE_CHUNK_SIZE; + +#[deprecated(since = "0.1.0", note = "Use crate::mtu::BLUETOOTH_CLASSIC_CHUNK_SIZE instead")] +pub const CLASSIC_CHUNK_SIZE: usize = BLUETOOTH_CLASSIC_CHUNK_SIZE; + +#[deprecated(since = "0.1.0", note = "Use crate::mtu::WIFI_DIRECT_CHUNK_SIZE instead")] +pub const WIFI_CHUNK_SIZE: usize = WIFI_DIRECT_CHUNK_SIZE; /// Security constraints - Original limits pub const MAX_CHUNK_BUFFER_SIZE: usize = 10_000_000; // 10MB max buffer per request @@ -82,11 +89,13 @@ pub const MAX_REQUESTS_PER_PEER: usize = 10; /// Get optimal chunk size for protocol pub fn get_chunk_size_for_protocol(protocol: &NetworkProtocol) -> usize { + use crate::mtu::Protocol; + match protocol { - NetworkProtocol::BluetoothLE => BLE_CHUNK_SIZE, - NetworkProtocol::BluetoothClassic => CLASSIC_CHUNK_SIZE, - NetworkProtocol::WiFiDirect => WIFI_CHUNK_SIZE, - NetworkProtocol::TCP | NetworkProtocol::UDP => WIFI_CHUNK_SIZE, + NetworkProtocol::BluetoothLE => Protocol::BluetoothLE.chunk_size(), + NetworkProtocol::BluetoothClassic => Protocol::BluetoothClassic.chunk_size(), + NetworkProtocol::WiFiDirect => Protocol::WiFiDirect.chunk_size(), + NetworkProtocol::TCP | NetworkProtocol::UDP => Protocol::Udp.chunk_size(), _ => DEFAULT_CHUNK_SIZE, } } diff --git a/lib-network/src/discovery/lorawan_hardware.rs b/lib-network/src/discovery/lorawan_hardware.rs index b2febbcb..c33d1040 100644 --- a/lib-network/src/discovery/lorawan_hardware.rs +++ b/lib-network/src/discovery/lorawan_hardware.rs @@ -346,7 +346,7 @@ async fn test_serial_lorawan_module(port: &str, chip_type: &str) -> Result Result { class_a: true, otaa_support: true, abp_support: true, - max_payload_size: 242, + max_payload_size: crate::mtu::LORAWAN_MAX_PAYLOAD, spreading_factors: vec![7, 8, 9, 10, 11, 12], ..Default::default() }, @@ -414,7 +414,7 @@ async fn detect_raspberry_pi_lorawan_hat() -> Result { class_c: true, otaa_support: true, abp_support: true, - max_payload_size: 242, + max_payload_size: crate::mtu::LORAWAN_MAX_PAYLOAD, spreading_factors: vec![7, 8, 9, 10, 11, 12], ..Default::default() }, @@ -445,7 +445,7 @@ fn create_sx127x_hardware(chip_name: &str, spi_path: &str) -> LoRaWANHardware { class_c: true, otaa_support: true, abp_support: true, - max_payload_size: 242, + max_payload_size: crate::mtu::LORAWAN_MAX_PAYLOAD, spreading_factors: vec![6, 7, 8, 9, 10, 11, 12], ..Default::default() }, @@ -466,7 +466,7 @@ fn create_sx130x_hardware(chip_name: &str, spi_path: &str) -> LoRaWANHardware { class_c: true, otaa_support: true, abp_support: true, - max_payload_size: 242, + max_payload_size: crate::mtu::LORAWAN_MAX_PAYLOAD, spreading_factors: vec![7, 8, 9, 10, 11, 12], ..Default::default() }, @@ -509,7 +509,7 @@ async fn detect_windows_lorawan() -> Result { class_a: true, otaa_support: true, abp_support: true, - max_payload_size: 242, + max_payload_size: crate::mtu::LORAWAN_MAX_PAYLOAD, spreading_factors: vec![7, 8, 9, 10], ..Default::default() }, @@ -559,7 +559,7 @@ async fn test_windows_com_lorawan(port_name: &str) -> Result { class_a: true, otaa_support: true, abp_support: true, - max_payload_size: 242, + max_payload_size: crate::mtu::LORAWAN_MAX_PAYLOAD, spreading_factors: vec![7, 8, 9, 10], ..Default::default() }, @@ -632,7 +632,7 @@ async fn test_macos_usb_lorawan(device_path: &str) -> Result { class_a: true, otaa_support: true, abp_support: true, - max_payload_size: 242, + max_payload_size: crate::mtu::LORAWAN_MAX_PAYLOAD, spreading_factors: vec![7, 8, 9, 10, 11, 12], ..Default::default() }, diff --git a/lib-network/src/fragmentation.rs b/lib-network/src/fragmentation.rs new file mode 100644 index 00000000..a2614264 --- /dev/null +++ b/lib-network/src/fragmentation.rs @@ -0,0 +1,530 @@ +//! Message Fragmentation and Reassembly +//! +//! Unified fragmentation logic for splitting large messages across protocols with +//! limited MTU (Maximum Transmission Unit). +//! +//! ## Features +//! +//! - Protocol-agnostic fragmentation with configurable chunk sizes +//! - Automatic sequence numbering and total fragment tracking +//! - Reassembly with duplicate detection and out-of-order handling +//! - Support for both simple and complex fragmentation schemes +//! +//! ## Usage +//! +//! ```ignore +//! use lib_network::fragmentation::{fragment_message, reassemble_message, FragmentReassembler}; +//! use lib_network::mtu::Protocol; +//! +//! // Fragment a message +//! let payload = vec![0u8; 5000]; +//! let fragments = fragment_message(&payload, Protocol::BluetoothLE.chunk_size()); +//! +//! // Reassemble fragments +//! let mut reassembler = FragmentReassembler::new(); +//! for fragment in fragments { +//! if let Some(complete) = reassembler.add_fragment(fragment)? { +//! println!("Message reassembled: {} bytes", complete.len()); +//! } +//! } +//! ``` + +use anyhow::{anyhow, Result}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// Fragment header structure for sequencing and reassembly +/// +/// This header is prepended to each fragment payload: +/// - 4 bytes: message_id (u32) +/// - 2 bytes: total_fragments (u16) +/// - 2 bytes: fragment_index (u16) +/// +/// Total overhead: 8 bytes per fragment +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct FragmentHeader { + /// Unique message identifier (same for all fragments of a message) + pub message_id: u32, + /// Total number of fragments in this message + pub total_fragments: u16, + /// Zero-based index of this fragment + pub fragment_index: u16, +} + +impl FragmentHeader { + /// Size of the serialized header in bytes + pub const SIZE: usize = 8; + + /// Create a new fragment header + pub fn new(message_id: u32, total_fragments: u16, fragment_index: u16) -> Self { + Self { + message_id, + total_fragments, + fragment_index, + } + } + + /// Serialize header to bytes (8 bytes fixed size) + pub fn to_bytes(&self) -> [u8; Self::SIZE] { + let mut bytes = [0u8; Self::SIZE]; + bytes[0..4].copy_from_slice(&self.message_id.to_le_bytes()); + bytes[4..6].copy_from_slice(&self.total_fragments.to_le_bytes()); + bytes[6..8].copy_from_slice(&self.fragment_index.to_le_bytes()); + bytes + } + + /// Deserialize header from bytes + pub fn from_bytes(bytes: &[u8]) -> Result { + if bytes.len() < Self::SIZE { + return Err(anyhow!("Fragment header too short: {} bytes", bytes.len())); + } + + let message_id = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]); + let total_fragments = u16::from_le_bytes([bytes[4], bytes[5]]); + let fragment_index = u16::from_le_bytes([bytes[6], bytes[7]]); + + Ok(Self { + message_id, + total_fragments, + fragment_index, + }) + } +} + +/// A single message fragment with header and payload +#[derive(Debug, Clone)] +pub struct Fragment { + /// Fragment header (metadata) + pub header: FragmentHeader, + /// Fragment payload (actual data chunk) + pub payload: Vec, +} + +impl Fragment { + /// Create a new fragment + pub fn new(message_id: u32, total_fragments: u16, fragment_index: u16, payload: Vec) -> Self { + Self { + header: FragmentHeader::new(message_id, total_fragments, fragment_index), + payload, + } + } + + /// Serialize fragment to wire format (header + payload) + pub fn to_bytes(&self) -> Vec { + let mut bytes = Vec::with_capacity(FragmentHeader::SIZE + self.payload.len()); + bytes.extend_from_slice(&self.header.to_bytes()); + bytes.extend_from_slice(&self.payload); + bytes + } + + /// Deserialize fragment from wire format + pub fn from_bytes(bytes: &[u8]) -> Result { + if bytes.len() < FragmentHeader::SIZE { + return Err(anyhow!("Fragment too short: {} bytes", bytes.len())); + } + + let header = FragmentHeader::from_bytes(&bytes[0..FragmentHeader::SIZE])?; + let payload = bytes[FragmentHeader::SIZE..].to_vec(); + + Ok(Self { header, payload }) + } + + /// Get the total size of this fragment (header + payload) + pub fn size(&self) -> usize { + FragmentHeader::SIZE + self.payload.len() + } +} + +/// Fragment a message into chunks suitable for transmission +/// +/// ## Arguments +/// +/// - `payload`: The message data to fragment +/// - `chunk_size`: Maximum size of each fragment payload (excluding 8-byte header) +/// +/// ## Returns +/// +/// Vector of fragments with headers and sequencing information. +/// Each fragment is guaranteed to be <= (chunk_size + 8) bytes when serialized. +/// +/// ## Example +/// +/// ```ignore +/// let payload = vec![0u8; 5000]; +/// let fragments = fragment_message(&payload, 200); +/// assert_eq!(fragments.len(), 25); // 5000 / 200 = 25 fragments +/// ``` +pub fn fragment_message(payload: &[u8], chunk_size: usize) -> Vec { + if payload.is_empty() { + return vec![]; + } + + // Generate unique message ID (use hash of payload for determinism) + let message_id = payload.iter().fold(0u32, |acc, &b| acc.wrapping_add(b as u32)); + + let total_fragments = ((payload.len() + chunk_size - 1) / chunk_size) as u16; + let mut fragments = Vec::with_capacity(total_fragments as usize); + + for (index, chunk) in payload.chunks(chunk_size).enumerate() { + let fragment = Fragment::new( + message_id, + total_fragments, + index as u16, + chunk.to_vec(), + ); + fragments.push(fragment); + } + + fragments +} + +/// Reassemble a complete message from fragments +/// +/// ## Arguments +/// +/// - `fragments`: All fragments of a message (can be out of order) +/// +/// ## Returns +/// +/// The complete reassembled message payload (without headers) +/// +/// ## Errors +/// +/// - Missing fragments +/// - Mismatched message IDs +/// - Duplicate fragment indices +/// +/// ## Example +/// +/// ```ignore +/// let fragments = fragment_message(&payload, 200); +/// let reassembled = reassemble_message(&fragments)?; +/// assert_eq!(reassembled, payload); +/// ``` +pub fn reassemble_message(fragments: &[Fragment]) -> Result> { + if fragments.is_empty() { + return Ok(vec![]); + } + + // Verify all fragments have the same message ID + let message_id = fragments[0].header.message_id; + for fragment in fragments { + if fragment.header.message_id != message_id { + return Err(anyhow!("Fragment message ID mismatch")); + } + } + + let total_fragments = fragments[0].header.total_fragments; + + // Check we have all fragments + if fragments.len() != total_fragments as usize { + return Err(anyhow!( + "Missing fragments: expected {}, got {}", + total_fragments, + fragments.len() + )); + } + + // Sort fragments by index + let mut sorted_fragments = fragments.to_vec(); + sorted_fragments.sort_by_key(|f| f.header.fragment_index); + + // Verify no duplicates and correct sequence + for (i, fragment) in sorted_fragments.iter().enumerate() { + if fragment.header.fragment_index != i as u16 { + return Err(anyhow!( + "Fragment sequence error: expected index {}, got {}", + i, + fragment.header.fragment_index + )); + } + } + + // Concatenate payloads + let total_size: usize = sorted_fragments.iter().map(|f| f.payload.len()).sum(); + let mut reassembled = Vec::with_capacity(total_size); + + for fragment in sorted_fragments { + reassembled.extend_from_slice(&fragment.payload); + } + + Ok(reassembled) +} + +/// Stateful fragment reassembler for streaming reassembly +/// +/// Handles out-of-order fragments and multiple concurrent messages. +#[derive(Debug)] +pub struct FragmentReassembler { + /// In-progress messages: message_id -> collected fragments + pending: HashMap>, + /// Maximum number of concurrent messages to track + max_pending: usize, +} + +impl FragmentReassembler { + /// Create a new reassembler + pub fn new() -> Self { + Self { + pending: HashMap::new(), + max_pending: 100, // Prevent memory exhaustion + } + } + + /// Create a reassembler with custom max pending messages + pub fn with_max_pending(max_pending: usize) -> Self { + Self { + pending: HashMap::new(), + max_pending, + } + } + + /// Add a fragment and attempt reassembly + /// + /// ## Returns + /// + /// - `Some(Vec)`: Complete message if all fragments received + /// - `None`: Message still incomplete + /// + /// ## Errors + /// + /// - Duplicate fragment index + /// - Too many pending messages (DoS protection) + pub fn add_fragment(&mut self, fragment: Fragment) -> Result>> { + let message_id = fragment.header.message_id; + let total_fragments = fragment.header.total_fragments; + let fragment_index = fragment.header.fragment_index; + + // Get or create fragment list + let fragments = self.pending.entry(message_id).or_insert_with(Vec::new); + + // Check for duplicate + if fragments.iter().any(|f| f.header.fragment_index == fragment_index) { + return Err(anyhow!("Duplicate fragment index {}", fragment_index)); + } + + // Add fragment + fragments.push(fragment); + + // Check if complete + if fragments.len() == total_fragments as usize { + // Remove from pending and reassemble + let complete_fragments = self.pending.remove(&message_id).unwrap(); + let reassembled = reassemble_message(&complete_fragments)?; + return Ok(Some(reassembled)); + } + + // DoS protection: limit pending messages + if self.pending.len() > self.max_pending { + // Remove oldest message (simple FIFO eviction) + if let Some(&oldest_id) = self.pending.keys().next() { + self.pending.remove(&oldest_id); + } + } + + Ok(None) + } + + /// Clear all pending fragments + pub fn clear(&mut self) { + self.pending.clear(); + } + + /// Get number of pending messages + pub fn pending_count(&self) -> usize { + self.pending.len() + } + + /// Check if a message is being reassembled + pub fn has_message(&self, message_id: u32) -> bool { + self.pending.contains_key(&message_id) + } + + /// Get fragment count for a pending message + pub fn fragment_count(&self, message_id: u32) -> usize { + self.pending.get(&message_id).map(|v| v.len()).unwrap_or(0) + } +} + +impl Default for FragmentReassembler { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_fragment_header_serialization() { + let header = FragmentHeader::new(12345, 10, 5); + let bytes = header.to_bytes(); + let decoded = FragmentHeader::from_bytes(&bytes).unwrap(); + + assert_eq!(header, decoded); + assert_eq!(bytes.len(), FragmentHeader::SIZE); + } + + #[test] + fn test_fragment_serialization() { + let fragment = Fragment::new(999, 5, 2, vec![1, 2, 3, 4, 5]); + let bytes = fragment.to_bytes(); + let decoded = Fragment::from_bytes(&bytes).unwrap(); + + assert_eq!(fragment.header, decoded.header); + assert_eq!(fragment.payload, decoded.payload); + assert_eq!(bytes.len(), FragmentHeader::SIZE + 5); + } + + #[test] + fn test_fragmentation_simple() { + let payload = vec![0u8; 500]; + let fragments = fragment_message(&payload, 100); + + assert_eq!(fragments.len(), 5); + assert_eq!(fragments[0].header.total_fragments, 5); + assert_eq!(fragments[0].header.fragment_index, 0); + assert_eq!(fragments[4].header.fragment_index, 4); + + // All fragments should have same message_id + let msg_id = fragments[0].header.message_id; + assert!(fragments.iter().all(|f| f.header.message_id == msg_id)); + } + + #[test] + fn test_reassembly_in_order() { + let payload = vec![42u8; 1000]; + let fragments = fragment_message(&payload, 200); + + let reassembled = reassemble_message(&fragments).unwrap(); + assert_eq!(reassembled, payload); + } + + #[test] + fn test_reassembly_out_of_order() { + let payload = vec![42u8; 1000]; + let mut fragments = fragment_message(&payload, 200); + + // Shuffle fragments + fragments.reverse(); + + let reassembled = reassemble_message(&fragments).unwrap(); + assert_eq!(reassembled, payload); + } + + #[test] + fn test_reassembler_streaming() { + let payload = vec![123u8; 500]; + let fragments = fragment_message(&payload, 100); + + let mut reassembler = FragmentReassembler::new(); + + // Add fragments one by one + for (i, fragment) in fragments.iter().enumerate() { + let result = reassembler.add_fragment(fragment.clone()).unwrap(); + if i < fragments.len() - 1 { + assert!(result.is_none()); + } else { + assert!(result.is_some()); + assert_eq!(result.unwrap(), payload); + } + } + } + + #[test] + fn test_reassembler_out_of_order() { + let payload = vec![99u8; 300]; + let mut fragments = fragment_message(&payload, 100); + + // Reverse order + fragments.reverse(); + + let mut reassembler = FragmentReassembler::new(); + + for (i, fragment) in fragments.iter().enumerate() { + let result = reassembler.add_fragment(fragment.clone()).unwrap(); + if i < fragments.len() - 1 { + assert!(result.is_none()); + } else { + assert_eq!(result.unwrap(), payload); + } + } + } + + #[test] + fn test_reassembler_duplicate_detection() { + let payload = vec![1u8; 200]; + let fragments = fragment_message(&payload, 100); + + let mut reassembler = FragmentReassembler::new(); + + reassembler.add_fragment(fragments[0].clone()).unwrap(); + let result = reassembler.add_fragment(fragments[0].clone()); + + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Duplicate")); + } + + #[test] + fn test_reassembler_multiple_messages() { + let payload1 = vec![1u8; 200]; + let payload2 = vec![2u8; 300]; + + let fragments1 = fragment_message(&payload1, 100); + let mut fragments2 = fragment_message(&payload2, 100); + + // Ensure different message IDs + fragments2[0].header.message_id = fragments1[0].header.message_id + 1000; + for f in fragments2.iter_mut() { + f.header.message_id = fragments1[0].header.message_id + 1000; + } + + let mut reassembler = FragmentReassembler::new(); + + // Interleave fragments + reassembler.add_fragment(fragments1[0].clone()).unwrap(); + reassembler.add_fragment(fragments2[0].clone()).unwrap(); + reassembler.add_fragment(fragments1[1].clone()).unwrap(); + reassembler.add_fragment(fragments2[1].clone()).unwrap(); + + let result2 = reassembler.add_fragment(fragments2[2].clone()).unwrap(); + assert!(result2.is_some()); + assert_eq!(result2.unwrap(), payload2); + } + + #[test] + fn test_empty_payload() { + let payload = vec![]; + let fragments = fragment_message(&payload, 100); + assert_eq!(fragments.len(), 0); + + let reassembled = reassemble_message(&fragments).unwrap(); + assert_eq!(reassembled, payload); + } + + #[test] + fn test_single_fragment() { + let payload = vec![42u8; 50]; + let fragments = fragment_message(&payload, 100); + + assert_eq!(fragments.len(), 1); + assert_eq!(fragments[0].header.total_fragments, 1); + assert_eq!(fragments[0].header.fragment_index, 0); + + let reassembled = reassemble_message(&fragments).unwrap(); + assert_eq!(reassembled, payload); + } + + #[test] + fn test_fragment_size_limit() { + let payload = vec![0u8; 1000]; + let chunk_size = 200; + let fragments = fragment_message(&payload, chunk_size); + + for fragment in fragments { + // Each fragment should be <= chunk_size + header + assert!(fragment.size() <= chunk_size + FragmentHeader::SIZE); + } + } +} diff --git a/lib-network/src/lib.rs b/lib-network/src/lib.rs index a1d07f66..477a23be 100644 --- a/lib-network/src/lib.rs +++ b/lib-network/src/lib.rs @@ -23,6 +23,10 @@ pub use crate::types::*; pub use crate::discovery::*; pub use crate::relays::*; +// MTU and Fragmentation utilities +pub use crate::mtu::{Protocol, BLE_MIN_MTU, BLE_TYPICAL_MTU, BLE_MAX_MTU, BLE_CHUNK_SIZE}; +pub use crate::fragmentation::{fragment_message, reassemble_message, FragmentReassembler, Fragment, FragmentHeader}; + // Unified Peer Identity System (replaces separate NodeId, PeerId, PublicKey systems) pub use crate::identity::{UnifiedPeerId, PeerIdMapper, PeerMapperConfig}; @@ -84,6 +88,8 @@ pub mod mesh; pub mod messaging; pub mod discovery; pub mod relays; +pub mod mtu; // MTU constants +pub mod fragmentation; // Message fragmentation and reassembly pub mod routing; pub mod protocols; diff --git a/lib-network/src/mtu.rs b/lib-network/src/mtu.rs new file mode 100644 index 00000000..1584687a --- /dev/null +++ b/lib-network/src/mtu.rs @@ -0,0 +1,4 @@ +//! MTU (Maximum Transmission Unit) constants. +//! Re-exported from lib-types to keep shared values in a protocol-neutral crate. + +pub use lib_types::mtu::*; diff --git a/lib-network/src/protocols/bluetooth/classic.rs b/lib-network/src/protocols/bluetooth/classic.rs index 59b63e2a..7f8e9562 100644 --- a/lib-network/src/protocols/bluetooth/classic.rs +++ b/lib-network/src/protocols/bluetooth/classic.rs @@ -12,6 +12,7 @@ use anyhow::{Result, anyhow}; use std::collections::HashMap; use std::sync::Arc; +use std::sync::atomic::{AtomicU32, Ordering}; use tokio::sync::RwLock; use tracing::{info, warn, debug}; use serde::{Serialize, Deserialize}; @@ -26,6 +27,13 @@ use super::common::{ parse_mac_address, get_system_bluetooth_mac, }; +// V2 fragmentation imports for session-scoped reassembly +use crate::fragmentation_v2::{ + FragmentReassemblerV2, FragmentV1, ReassemblerConfig, + fragment_message_v2, Protocol as FragProtocol, +}; +use crate::protocols::types::SessionId; + // Windows-specific imports removed - using local imports in methods @@ -69,6 +77,48 @@ pub struct BluetoothClassicProtocol { pub enabled: Arc, /// Whether discovery is currently active pub discovery_active: Arc, + /// Per-connection RFCOMM sessions for v2 fragmentation (peer_address -> session) + pub rfcomm_sessions: Arc>>, +} + +/// RFCOMM session for v2 fragmentation (per-connection state) +/// +/// Each RFCOMM connection has its own session for proper session-scoped +/// fragment reassembly, preventing collision attacks between connections. +#[derive(Debug)] +pub struct RfcommSession { + /// Unique session identifier (cryptographically random) + pub session_id: SessionId, + /// Per-session message sequence counter (monotonic, never reused) + pub message_seq: AtomicU32, + /// V2 fragment reassembler (session-bound) + pub reassembler: FragmentReassemblerV2, +} + +impl RfcommSession { + /// Create a new RFCOMM session with v2 fragmentation + pub fn new() -> Self { + let session_id = SessionId::generate(); + let config = ReassemblerConfig::from_protocol(FragProtocol::BluetoothClassic); + let reassembler = FragmentReassemblerV2::new(session_id.clone(), config); + + Self { + session_id, + message_seq: AtomicU32::new(0), + reassembler, + } + } + + /// Get next message sequence number (thread-safe, monotonic) + pub fn next_message_seq(&self) -> u32 { + self.message_seq.fetch_add(1, Ordering::SeqCst) + } +} + +impl Default for RfcommSession { + fn default() -> Self { + Self::new() + } } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -525,6 +575,7 @@ impl BluetoothClassicProtocol { message_handler: None, enabled: Arc::new(std::sync::atomic::AtomicBool::new(false)), // Disabled by default discovery_active: Arc::new(std::sync::atomic::AtomicBool::new(false)), + rfcomm_sessions: Arc::new(RwLock::new(HashMap::new())), // V2 fragmentation sessions }) } @@ -1055,6 +1106,9 @@ impl BluetoothClassicProtocol { } /// Send mesh message via RFCOMM + /// + /// Uses v2 fragmentation with session-scoped message sequencing for messages + /// larger than the RFCOMM MTU. pub async fn send_mesh_message(&self, target_address: &str, message: &[u8]) -> Result<()> { info!(" Sending RFCOMM message to {}: {} bytes", target_address, message.len()); @@ -1066,16 +1120,29 @@ impl BluetoothClassicProtocol { let connection = connections.get(target_address).unwrap(); let mtu = connection.mtu as usize; + drop(connections); // RFCOMM has larger MTU (1000 bytes typical) - less fragmentation needed if message.len() <= mtu { self.transmit_rfcomm_packet(message, target_address).await?; } else { - // Fragment message (but with much larger chunks than BLE) - let chunks: Vec<&[u8]> = message.chunks(mtu).collect(); - for (i, chunk) in chunks.iter().enumerate() { - info!("Sending RFCOMM fragment {}/{} ({} bytes)", i + 1, chunks.len(), chunk.len()); - self.transmit_rfcomm_packet(chunk, target_address).await?; + // Get or create session for this connection (v2 fragmentation) + let mut sessions = self.rfcomm_sessions.write().await; + let session = sessions.entry(target_address.to_string()) + .or_insert_with(RfcommSession::new); + + // Fragment message using v2 fragmentation (session-scoped) + use crate::fragmentation_v2::FragmentHeaderV1; + let chunk_size = mtu.saturating_sub(FragmentHeaderV1::SIZE); // Leave room for v2 10-byte header + let message_seq = session.next_message_seq(); + let fragments = fragment_message_v2(message_seq, message, chunk_size)?; + + info!("Fragmenting RFCOMM message into {} parts (msg_seq={})", fragments.len(), message_seq); + + for (i, fragment) in fragments.iter().enumerate() { + let wire_bytes = fragment.to_bytes(); + info!("Sending RFCOMM fragment {}/{} ({} bytes)", i + 1, fragments.len(), wire_bytes.len()); + self.transmit_rfcomm_packet(&wire_bytes, target_address).await?; // Minimal delay for flow control (RFCOMM handles this better than BLE) tokio::time::sleep(tokio::time::Duration::from_micros(500)).await; @@ -1083,7 +1150,6 @@ impl BluetoothClassicProtocol { } // Update connection activity - drop(connections); let mut connections_mut = self.active_connections.write().await; if let Some(conn) = connections_mut.get_mut(target_address) { conn.last_seen = std::time::SystemTime::now() @@ -1095,6 +1161,35 @@ impl BluetoothClassicProtocol { Ok(()) } + /// Add a fragment to the session's v2 reassembler + /// + /// Returns the complete message if all fragments have been received. + pub async fn add_fragment_v2(&self, peer_address: &str, fragment: Vec) -> Result>> { + let mut sessions = self.rfcomm_sessions.write().await; + let session = sessions.entry(peer_address.to_string()) + .or_insert_with(RfcommSession::new); + + // Parse fragment using v2 format (10-byte header) + let parsed = FragmentV1::from_bytes(&fragment)?; + + debug!("RFCOMM fragment received: msg_seq={}, idx={}/{} from {}", + parsed.header.message_seq, + parsed.header.fragment_index, + parsed.header.total_fragments, + peer_address); + + // Delegate to session's v2 reassembler (handles timeout cleanup automatically) + let result = session.reassembler.add_fragment(parsed)?; + + if let Some(ref data) = result { + info!("✅ Reassembled RFCOMM message: {} bytes (session: {})", + data.len(), + session.session_id.to_short_string()); + } + + Ok(result) + } + /// Send mesh message envelope via RFCOMM (NEW - Phase 1) pub async fn send_mesh_envelope( &self, diff --git a/lib-network/src/protocols/bluetooth/gatt.rs b/lib-network/src/protocols/bluetooth/gatt.rs index 952f5950..633e8e22 100644 --- a/lib-network/src/protocols/bluetooth/gatt.rs +++ b/lib-network/src/protocols/bluetooth/gatt.rs @@ -6,10 +6,60 @@ use anyhow::{Result, anyhow}; use tracing::{info, debug, warn}; use serde::{Serialize, Deserialize}; use std::collections::HashMap; +use std::sync::atomic::{AtomicU32, Ordering}; +use crate::mtu::{BLE_MIN_MTU, BLE_MAX_MTU}; +use crate::fragmentation::{fragment_message, FragmentReassembler as CentralizedReassembler, Fragment}; +// V2 fragmentation imports for session-scoped reassembly +use crate::fragmentation_v2::{ + FragmentReassemblerV2, FragmentV1, ReassemblerConfig, + fragment_message_v2, Protocol, +}; +use crate::protocols::types::SessionId; // Placeholder until blockchain integration is relocated. type BlockHeader = Vec; +/// GATT session for tracking per-connection state (v2 fragmentation) +/// +/// Each BLE connection should have its own GattSession for proper +/// session-scoped fragment reassembly. This prevents collision attacks +/// where fragments from different connections could be mixed. +#[derive(Debug)] +pub struct GattSession { + /// Unique session identifier (cryptographically random) + pub session_id: SessionId, + /// Per-session message sequence counter (monotonic, never reused) + pub message_seq: AtomicU32, + /// V2 fragment reassembler (session-bound) + pub reassembler: FragmentReassemblerV2, +} + +impl GattSession { + /// Create a new GATT session with v2 fragmentation + pub fn new() -> Self { + let session_id = SessionId::generate(); + let config = ReassemblerConfig::from_protocol(Protocol::BluetoothLE); + let reassembler = FragmentReassemblerV2::new(session_id.clone(), config); + + Self { + session_id, + message_seq: AtomicU32::new(0), + reassembler, + } + } + + /// Get next message sequence number (thread-safe, monotonic) + pub fn next_message_seq(&self) -> u32 { + self.message_seq.fetch_add(1, Ordering::SeqCst) + } +} + +impl Default for GattSession { + fn default() -> Self { + Self::new() + } +} + /// GATT operation types #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum GattOperation { @@ -59,112 +109,159 @@ pub fn validate_write_size(data: &[u8], mtu: u16) -> Result<()> { } /// Fragment data for GATT transmission +/// +/// **DEPRECATED**: Use crate::fragmentation::fragment_message for new code. +/// This wrapper maintains backward compatibility. pub fn fragment_data(data: &[u8], mtu: u16) -> Vec> { let max_chunk_size = (mtu as usize).saturating_sub(3); + // Simple chunking without headers (backward compatible) data.chunks(max_chunk_size) .map(|chunk| chunk.to_vec()) .collect() } /// Fragment a large message for BLE transmission (with sequencing) -/// Returns Vec of fragments, each containing: [fragment_id:1][total_fragments:1][sequence:1][data...] +/// +/// **REFACTORED**: Now uses centralized fragmentation with standardized 8-byte headers. +/// Returns Vec of wire-format fragments ready for GATT transmission. +/// +/// **DEPRECATED**: Use `fragment_large_message_v2` with a GattSession for new code. pub fn fragment_large_message(message_id: u64, data: &[u8], mtu: u16) -> Vec> { - const HEADER_SIZE: usize = 11; // message_id(8) + total_fragments(2) + sequence(1) - let max_data_per_fragment = (mtu as usize).saturating_sub(3 + HEADER_SIZE); + // ATT overhead is 3 bytes, leave room for our 8-byte header + let chunk_size = (mtu as usize).saturating_sub(3 + 8).max(20); + + // Use centralized fragmentation (produces 8-byte headers) + let fragments = fragment_message(data, chunk_size); + + // Convert to wire format + fragments.into_iter() + .map(|f| f.to_bytes()) + .collect() +} + +/// Fragment a large message for BLE transmission using v2 protocol (session-scoped) +/// +/// This is the preferred method for new code. Uses v2 fragmentation with: +/// - Session-scoped message sequencing (no collisions between sessions) +/// - 10-byte versioned headers +/// - Protocol-specific size limits +/// +/// # Arguments +/// * `session` - The GattSession for this connection +/// * `data` - Message payload to fragment +/// * `mtu` - Negotiated ATT MTU +/// +/// # Returns +/// Vec of wire-format fragments ready for GATT transmission +pub fn fragment_large_message_v2(session: &GattSession, data: &[u8], mtu: u16) -> Result>> { + use crate::fragmentation_v2::FragmentHeaderV1; + + // ATT overhead is 3 bytes, leave room for v2 10-byte header + let chunk_size = (mtu as usize).saturating_sub(3 + FragmentHeaderV1::SIZE).max(20); + + // Get next message sequence from session (monotonic, never reused) + let message_seq = session.next_message_seq(); - let chunks: Vec<&[u8]> = data.chunks(max_data_per_fragment).collect(); - let total_fragments = chunks.len() as u16; + // Use v2 fragmentation (produces 10-byte versioned headers) + let fragments = fragment_message_v2(message_seq, data, chunk_size)?; - chunks.into_iter().enumerate().map(|(index, chunk)| { - let mut fragment = Vec::with_capacity(HEADER_SIZE + chunk.len()); - fragment.extend_from_slice(&message_id.to_le_bytes()); - fragment.extend_from_slice(&total_fragments.to_le_bytes()); - fragment.push(index as u8); - fragment.extend_from_slice(chunk); - fragment - }).collect() + info!("Fragmenting BLE message: msg_seq={}, {} fragments, {} bytes total", + message_seq, fragments.len(), data.len()); + + // Convert to wire format + Ok(fragments.into_iter() + .map(|f| f.to_bytes()) + .collect()) } /// Fragment reassembler for multi-part BLE messages +/// +/// **REFACTORED**: Now delegates to centralized FragmentReassembler. +/// Maintains backward compatibility with existing code. +/// +/// **DEPRECATED**: Use `GattSession.reassembler` for new code (v2 protocol). #[derive(Debug)] pub struct FragmentReassembler { - fragments: HashMap>>, // message_id -> (fragment_index -> data) - total_fragments: HashMap, // message_id -> total count + inner: CentralizedReassembler, } impl FragmentReassembler { pub fn new() -> Self { Self { - fragments: HashMap::new(), - total_fragments: HashMap::new(), + inner: CentralizedReassembler::new(), } } /// Add a fragment and return complete message if all fragments received + /// + /// **UPDATED**: Now uses centralized fragmentation (8-byte headers) + /// **DEPRECATED**: Use `add_fragment_v2` with a GattSession for new code. pub fn add_fragment(&mut self, fragment: Vec) -> Result>> { - if fragment.len() < 11 { - return Err(anyhow!("Fragment too small: {} bytes", fragment.len())); - } + // Parse fragment using centralized format + let parsed = Fragment::from_bytes(&fragment)?; - let message_id = u64::from_le_bytes(fragment[0..8].try_into()?); - let total_fragments = u16::from_le_bytes(fragment[8..10].try_into()?); - let fragment_index = fragment[10]; - let data = fragment[11..].to_vec(); + // Delegate to centralized reassembler + let result = self.inner.add_fragment(parsed)?; - // Store total fragments count - self.total_fragments.insert(message_id, total_fragments); - - // Store this fragment - self.fragments.entry(message_id) - .or_insert_with(HashMap::new) - .insert(fragment_index, data); - - // Check if all fragments received - let received_count = self.fragments.get(&message_id).map(|f| f.len()).unwrap_or(0); - if received_count == total_fragments as usize { - // Reassemble in order - let mut complete_data = Vec::new(); - for i in 0..total_fragments { - if let Some(fragment_data) = self.fragments.get(&message_id).and_then(|f| f.get(&(i as u8))) { - complete_data.extend_from_slice(fragment_data); - } else { - return Err(anyhow!("Missing fragment {} for message {}", i, message_id)); - } - } - - // Clean up - self.fragments.remove(&message_id); - self.total_fragments.remove(&message_id); - - info!(" Reassembled message {} from {} fragments ({} bytes)", - message_id, total_fragments, complete_data.len()); - - return Ok(Some(complete_data)); + if let Some(ref data) = result { + info!("✅ Reassembled message from {} fragments ({} bytes)", + self.inner.pending_count(), data.len()); } - debug!(" Fragment {}/{} received for message {}", - received_count, total_fragments, message_id); - - Ok(None) + Ok(result) } /// Clear stale fragments older than timeout - pub fn cleanup_stale_fragments(&mut self, message_id: u64) { - self.fragments.remove(&message_id); - self.total_fragments.remove(&message_id); - warn!("🗑️ Cleaned up stale fragments for message {}", message_id); + pub fn cleanup_stale_fragments(&mut self, _message_id: u64) { + // Clear all pending (centralized reassembler doesn't track individual message cleanup) + self.inner.clear(); + warn!("🗑️ Cleaned up all stale fragments"); } } +/// Add a fragment using v2 protocol (session-scoped reassembly) +/// +/// This function uses the session's v2 reassembler which provides: +/// - Collision-free reassembly (message_seq disambiguation) +/// - Automatic timeout cleanup +/// - Bounded memory usage +/// - DoS resistance +/// +/// # Arguments +/// * `session` - The GattSession for this connection (must be mutable) +/// * `fragment` - Raw fragment bytes from GATT notification +/// +/// # Returns +/// * `Ok(Some(data))` - Complete message reassembled +/// * `Ok(None)` - Fragment added, message incomplete +/// * `Err(_)` - Invalid fragment, duplicate, or timeout +pub fn add_fragment_v2(session: &mut GattSession, fragment: Vec) -> Result>> { + // Parse fragment using v2 format (10-byte header) + let parsed = FragmentV1::from_bytes(&fragment)?; + + debug!("BLE fragment received: msg_seq={}, idx={}/{}", + parsed.header.message_seq, + parsed.header.fragment_index, + parsed.header.total_fragments); + + // Delegate to session's v2 reassembler (handles timeout cleanup automatically) + let result = session.reassembler.add_fragment(parsed)?; + + if let Some(ref data) = result { + info!("✅ Reassembled BLE message: {} bytes (session: {})", + data.len(), + session.session_id.to_short_string()); + } + + Ok(result) +} + /// Calculate optimal MTU for connection pub fn calculate_optimal_mtu(requested_mtu: u16, max_mtu: u16) -> u16 { - // BLE spec minimum is 23, maximum is typically 512 - const MIN_MTU: u16 = 23; - const MAX_BLE_MTU: u16 = 512; - - let effective_max = max_mtu.min(MAX_BLE_MTU); - requested_mtu.clamp(MIN_MTU, effective_max) + // Use centralized MTU constants + let effective_max = max_mtu.min(BLE_MAX_MTU as u16); + requested_mtu.clamp(BLE_MIN_MTU as u16, effective_max) } /// GATT message types for unified handling diff --git a/lib-network/src/protocols/lorawan.rs b/lib-network/src/protocols/lorawan.rs index 7cc5011d..7c30ee30 100644 --- a/lib-network/src/protocols/lorawan.rs +++ b/lib-network/src/protocols/lorawan.rs @@ -5,13 +5,64 @@ mod gateway_auth; use anyhow::{Result, anyhow}; -use tracing::{info, warn}; +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::atomic::{AtomicU32, Ordering}; +use tokio::sync::RwLock; +use tracing::{info, warn, debug}; use lib_crypto::symmetric::chacha20::{encrypt_data}; pub use gateway_auth::{ GatewayAttestation, LoRaDeviceMessage, LoRaWANGatewayAuth, LoRaWanUhpBinding, }; +// V2 fragmentation imports for session-scoped reassembly +use crate::fragmentation_v2::{ + FragmentReassemblerV2, FragmentV1, ReassemblerConfig, + fragment_message_v2, Protocol as FragProtocol, +}; +use crate::protocols::types::SessionId; + +/// LoRaWAN session for v2 fragmentation (per-device state) +/// +/// Each LoRaWAN device connection has its own session for proper session-scoped +/// fragment reassembly, preventing collision attacks between devices. +#[derive(Debug)] +pub struct LoRaWANSession { + /// Unique session identifier (cryptographically random) + pub session_id: SessionId, + /// Per-session message sequence counter (monotonic, never reused) + pub message_seq: AtomicU32, + /// V2 fragment reassembler (session-bound) + pub reassembler: FragmentReassemblerV2, +} + +impl LoRaWANSession { + /// Create a new LoRaWAN session with v2 fragmentation + pub fn new() -> Self { + let session_id = SessionId::generate(); + let config = ReassemblerConfig::from_protocol(FragProtocol::LoRaWAN); + let reassembler = FragmentReassemblerV2::new(session_id.clone(), config); + + Self { + session_id, + message_seq: AtomicU32::new(0), + reassembler, + } + } + + /// Get next message sequence number (thread-safe, monotonic) + pub fn next_message_seq(&self) -> u32 { + self.message_seq.fetch_add(1, Ordering::SeqCst) + } +} + +impl Default for LoRaWANSession { + fn default() -> Self { + Self::new() + } +} + /// LoRaWAN mesh protocol handler pub struct LoRaWANMeshProtocol { /// Node ID for this mesh node @@ -26,6 +77,8 @@ pub struct LoRaWANMeshProtocol { pub discovery_active: bool, /// Optional trust anchor for gateway-mediated auth (ARCH-D-1.9) pub gateway_auth: Option, + /// Per-device LoRaWAN sessions for v2 fragmentation (dev_addr -> session) + pub lora_sessions: Arc>>, } impl LoRaWANMeshProtocol { @@ -49,6 +102,7 @@ impl LoRaWANMeshProtocol { app_key, discovery_active: false, gateway_auth: Some(LoRaWANGatewayAuth::new()?), + lora_sessions: Arc::new(RwLock::new(HashMap::new())), // V2 fragmentation sessions }) } @@ -455,35 +509,45 @@ impl LoRaWANMeshProtocol { } async fn get_max_payload_size(&self) -> Result { + // Use centralized LoRaWAN MTU constant + use crate::mtu::LORAWAN_MAX_PAYLOAD; + // LoRaWAN payload size depends on spreading factor and region // EU868: SF7=242, SF8=242, SF9=115, SF10=59, SF11=59, SF12=59 // US915: SF7=242, SF8=242, SF9=115, SF10=11 - Ok(242) // Conservative estimate for SF7/SF8 + Ok(LORAWAN_MAX_PAYLOAD) // Conservative estimate for SF7/SF8 } + /// Send fragmented message using v2 protocol (session-scoped) + /// + /// Uses v2 fragmentation with session-scoped message sequencing for + /// messages larger than the LoRaWAN payload limit. async fn send_fragmented_message(&self, target_address: &str, message: &[u8]) -> Result<()> { + use crate::fragmentation_v2::FragmentHeaderV1; + let max_payload = self.get_max_payload_size().await?; - let header_size = 8; // Fragment header - let chunk_size = max_payload - header_size; + let chunk_size = max_payload.saturating_sub(FragmentHeaderV1::SIZE); // Leave room for v2 10-byte header - let total_fragments = (message.len() + chunk_size - 1) / chunk_size; - info!(" Fragmenting message into {} parts", total_fragments); + // Get or create session for this device (v2 fragmentation) + let mut sessions = self.lora_sessions.write().await; + let session = sessions.entry(target_address.to_string()) + .or_insert_with(LoRaWANSession::new); - for (fragment_id, chunk) in message.chunks(chunk_size).enumerate() { - let mut fragment = Vec::new(); - - // Fragment header - fragment.extend_from_slice(&(fragment_id as u16).to_le_bytes()); - fragment.extend_from_slice(&(total_fragments as u16).to_le_bytes()); - fragment.extend_from_slice(&(message.len() as u32).to_le_bytes()); - - // Fragment payload - fragment.extend_from_slice(chunk); - - let frame = self.prepare_lorawan_frame(target_address, &fragment).await?; + // Get next message sequence from session counter + let message_seq = session.next_message_seq(); + + // Fragment using v2 protocol + let fragments = fragment_message_v2(message_seq, message, chunk_size)?; + info!("📡 Fragmenting LoRaWAN message into {} parts (msg_seq={})", fragments.len(), message_seq); + + drop(sessions); // Release lock before transmission + + for (i, fragment) in fragments.iter().enumerate() { + let wire_bytes = fragment.to_bytes(); + let frame = self.prepare_lorawan_frame(target_address, &wire_bytes).await?; self.transmit_frame(&frame).await?; - info!("Fragment {}/{} transmitted", fragment_id + 1, total_fragments); + info!("📡 LoRa fragment {}/{} transmitted", i + 1, fragments.len()); // Delay between fragments to respect duty cycle tokio::time::sleep(std::time::Duration::from_millis(100)).await; @@ -492,6 +556,35 @@ impl LoRaWANMeshProtocol { Ok(()) } + /// Add a received fragment to the session's v2 reassembler + /// + /// Returns the complete message if all fragments have been received. + pub async fn add_fragment_v2(&self, dev_address: &str, fragment: Vec) -> Result>> { + let mut sessions = self.lora_sessions.write().await; + let session = sessions.entry(dev_address.to_string()) + .or_insert_with(LoRaWANSession::new); + + // Parse fragment using v2 format (10-byte header) + let parsed = FragmentV1::from_bytes(&fragment)?; + + debug!("📡 LoRaWAN fragment received: msg_seq={}, idx={}/{} from {}", + parsed.header.message_seq, + parsed.header.fragment_index, + parsed.header.total_fragments, + dev_address); + + // Delegate to session's v2 reassembler (handles timeout cleanup automatically) + let result = session.reassembler.add_fragment(parsed)?; + + if let Some(ref data) = result { + info!("✅ Reassembled LoRaWAN message: {} bytes (session: {})", + data.len(), + session.session_id.to_short_string()); + } + + Ok(result) + } + async fn prepare_lorawan_frame(&self, target_address: &str, payload: &[u8]) -> Result> { let mut frame = Vec::new(); diff --git a/lib-network/src/protocols/quic_mesh.rs b/lib-network/src/protocols/quic_mesh.rs index 600e15e2..8545f070 100644 --- a/lib-network/src/protocols/quic_mesh.rs +++ b/lib-network/src/protocols/quic_mesh.rs @@ -43,6 +43,7 @@ use async_trait::async_trait; use std::net::SocketAddr; use std::path::Path; use std::sync::Arc; +use std::sync::atomic::{AtomicU32, Ordering}; use tokio::sync::RwLock; use tracing::{info, warn, debug, error}; @@ -67,6 +68,13 @@ use super::quic_handshake; use crate::types::mesh_message::ZhtpMeshMessage; use crate::messaging::message_handler::MeshMessageHandler; +// V2 fragmentation imports for session-scoped reassembly +use crate::fragmentation_v2::{ + FragmentReassemblerV2, FragmentV1, ReassemblerConfig, + fragment_message_v2, Protocol as FragProtocol, +}; +use crate::protocols::types::SessionId; + /// Default path for TLS certificate pub const DEFAULT_TLS_CERT_PATH: &str = "./data/tls/server.crt"; /// Default path for TLS private key @@ -112,6 +120,7 @@ pub struct QuicMeshProtocol { /// - **Verified peer identity**: Dilithium signatures verified via UHP /// - **Master key**: Derived from UHP session key + Kyber shared secret /// - **Replay protection**: Nonces checked against shared cache +/// - **V2 fragmentation**: Session-scoped message sequencing for large payloads pub struct PqcQuicConnection { /// Underlying QUIC connection quic_conn: Connection, @@ -133,6 +142,15 @@ pub struct PqcQuicConnection { /// New nodes connecting for first time can only request blockchain data /// NOTE: Even in bootstrap mode, UHP handshake is performed for identity verification pub bootstrap_mode: bool, + + /// V2 fragmentation session ID (derived from UHP session for consistency) + frag_session_id: SessionId, + + /// Per-session message sequence counter for v2 fragmentation + message_seq: AtomicU32, + + /// V2 fragment reassembler (session-bound, for receiving large messages) + reassembler: Option, } // NOTE: PqcHandshakeMessage has been REMOVED - authentication bypass vulnerability @@ -787,6 +805,7 @@ impl PqcQuicConnection { /// Create a new PqcQuicConnection from a verified peer and derived keys. /// /// This is the ONLY way to create an authenticated connection. + /// V2 fragmentation session is automatically initialized from the UHP session ID. pub fn from_verified_peer( quic_conn: Connection, peer_addr: SocketAddr, @@ -795,6 +814,17 @@ impl PqcQuicConnection { session_id: [u8; 16], bootstrap_mode: bool, ) -> Self { + // Derive fragmentation session ID from UHP session for consistency + // Extend 16-byte session_id to 32 bytes for SessionId + let mut frag_session_bytes = [0u8; 32]; + frag_session_bytes[..16].copy_from_slice(&session_id); + frag_session_bytes[16..].copy_from_slice(&session_id); // Mirror for 32 bytes + let frag_session_id = SessionId::from_bytes(frag_session_bytes); + + // Create v2 reassembler for receiving fragmented messages + let config = ReassemblerConfig::from_protocol(FragProtocol::QUIC); + let reassembler = FragmentReassemblerV2::new(frag_session_id.clone(), config); + Self { quic_conn, master_key: Some(master_key), @@ -802,8 +832,21 @@ impl PqcQuicConnection { session_id: Some(session_id), peer_addr, bootstrap_mode, + frag_session_id, + message_seq: AtomicU32::new(0), + reassembler: Some(reassembler), } } + + /// Get next message sequence number for v2 fragmentation (thread-safe, monotonic) + pub fn next_message_seq(&self) -> u32 { + self.message_seq.fetch_add(1, Ordering::SeqCst) + } + + /// Get the v2 fragmentation session ID + pub fn frag_session_id(&self) -> &SessionId { + &self.frag_session_id + } /// Get the underlying QUIC connection pub fn get_connection(&self) -> &Connection { @@ -901,6 +944,79 @@ impl PqcQuicConnection { debug!("📥 Received {} bytes (double-decrypted: TLS 1.3 + UHP+Kyber)", decrypted.len()); Ok(decrypted) } + + /// Send large message with v2 fragmentation (session-scoped) + /// + /// For messages that need to be split across multiple QUIC streams or + /// exceed typical stream buffer sizes, this method provides v2 fragmentation + /// with session-scoped message sequencing. + /// + /// # Arguments + /// * `message` - Message payload to send + /// * `chunk_size` - Maximum fragment payload size (default: 64KB for QUIC) + pub async fn send_fragmented_message(&mut self, message: &[u8], chunk_size: usize) -> Result<()> { + let master_key = self.master_key + .ok_or_else(|| anyhow!("UHP+Kyber handshake not complete"))?; + + // Get next message sequence from session counter + let message_seq = self.next_message_seq(); + + // Fragment using v2 protocol + let fragments = fragment_message_v2(message_seq, message, chunk_size)?; + + info!("📤 Sending fragmented QUIC message: msg_seq={}, {} fragments, {} bytes", + message_seq, fragments.len(), message.len()); + + for (i, fragment) in fragments.iter().enumerate() { + let wire_bytes = fragment.to_bytes(); + + // Encrypt fragment with master key + let encrypted = encrypt_data(&wire_bytes, &master_key)?; + + // Send over QUIC stream + let mut stream = self.quic_conn.open_uni().await?; + stream.write_all(&encrypted).await?; + stream.finish()?; + + debug!("📤 Sent QUIC fragment {}/{} ({} bytes)", i + 1, fragments.len(), wire_bytes.len()); + } + + Ok(()) + } + + /// Add a received fragment to the v2 reassembler + /// + /// Returns the complete message if all fragments have been received. + /// Call this for each incoming fragment from QUIC streams. + pub fn add_fragment_v2(&mut self, encrypted_fragment: &[u8]) -> Result>> { + let master_key = self.master_key + .ok_or_else(|| anyhow!("UHP+Kyber handshake not complete"))?; + + // Decrypt fragment + let wire_bytes = decrypt_data(encrypted_fragment, &master_key)?; + + // Parse v2 fragment + let fragment = FragmentV1::from_bytes(&wire_bytes)?; + + debug!("📥 QUIC fragment received: msg_seq={}, idx={}/{}", + fragment.header.message_seq, + fragment.header.fragment_index, + fragment.header.total_fragments); + + // Add to reassembler + let reassembler = self.reassembler.as_mut() + .ok_or_else(|| anyhow!("V2 reassembler not initialized"))?; + + let result = reassembler.add_fragment(fragment)?; + + if let Some(ref data) = result { + info!("✅ Reassembled QUIC message: {} bytes (session: {})", + data.len(), + self.frag_session_id.to_short_string()); + } + + Ok(result) + } // ======================================================================== // REMOVED: Legacy methods that bypassed authentication diff --git a/lib-network/src/protocols/wifi_direct.rs b/lib-network/src/protocols/wifi_direct.rs index f31bb20e..c9336e30 100644 --- a/lib-network/src/protocols/wifi_direct.rs +++ b/lib-network/src/protocols/wifi_direct.rs @@ -5,6 +5,7 @@ use anyhow::Result; use std::collections::HashMap; use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicU32, Ordering}; use tokio::sync::RwLock; use tracing::{info, warn, error, debug}; use serde::{Serialize, Deserialize}; @@ -18,6 +19,13 @@ use crate::protocols::enhanced_wifi_direct::{ MacOSWiFiDirectManager, AdvancedWPSSecurity, MacOSWiFiInterface, MacOSP2PGroup }; +// V2 fragmentation imports for session-scoped reassembly +use crate::fragmentation_v2::{ + FragmentReassemblerV2, FragmentV1, ReassemblerConfig, + fragment_message_v2, Protocol as FragProtocol, +}; +use crate::protocols::types::SessionId; + // Network and time imports removed - using system commands instead /// P2P Group Owner negotiation parameters @@ -189,6 +197,48 @@ pub struct WiFiDirectMeshProtocol { pub hidden_ssid: bool, /// WiFi Direct enabled state (starts OFF by default for security) pub enabled: Arc>, + /// Per-device WiFi Direct sessions for v2 fragmentation (mac_address -> session) + pub wifi_sessions: Arc>>, +} + +/// WiFi Direct session for v2 fragmentation (per-device state) +/// +/// Each WiFi Direct connection has its own session for proper session-scoped +/// fragment reassembly, preventing collision attacks between devices. +#[derive(Debug)] +pub struct WiFiDirectSession { + /// Unique session identifier (cryptographically random) + pub session_id: SessionId, + /// Per-session message sequence counter (monotonic, never reused) + pub message_seq: AtomicU32, + /// V2 fragment reassembler (session-bound) + pub reassembler: FragmentReassemblerV2, +} + +impl WiFiDirectSession { + /// Create a new WiFi Direct session with v2 fragmentation + pub fn new() -> Self { + let session_id = SessionId::generate(); + let config = ReassemblerConfig::from_protocol(FragProtocol::WiFiDirect); + let reassembler = FragmentReassemblerV2::new(session_id.clone(), config); + + Self { + session_id, + message_seq: AtomicU32::new(0), + reassembler, + } + } + + /// Get next message sequence number (thread-safe, monotonic) + pub fn next_message_seq(&self) -> u32 { + self.message_seq.fetch_add(1, Ordering::SeqCst) + } +} + +impl Default for WiFiDirectSession { + fn default() -> Self { + Self::new() + } } /// Persistent P2P Group information @@ -300,6 +350,7 @@ impl WiFiDirectMeshProtocol { authenticated_peers: Arc::new(RwLock::new(HashMap::new())), hidden_ssid: true, // SECURITY: Hidden SSID by default to prevent non-ZHTP connections enabled: Arc::new(RwLock::new(false)), // SECURITY: WiFi Direct starts OFF for privacy/security + wifi_sessions: Arc::new(RwLock::new(HashMap::new())), // V2 fragmentation sessions }) } @@ -3070,6 +3121,78 @@ impl WiFiDirectMeshProtocol { } } + /// Send large message with v2 fragmentation (session-scoped) + /// + /// For messages that exceed WiFi Direct MTU, this method provides v2 fragmentation + /// with session-scoped message sequencing. + /// + /// # Arguments + /// * `target_address` - Device MAC address or IP + /// * `message` - Message payload to send + /// * `chunk_size` - Maximum fragment payload size (default: 64KB for WiFi Direct) + pub async fn send_fragmented_message_v2(&self, target_address: &str, message: &[u8], chunk_size: usize) -> Result<()> { + use crate::fragmentation_v2::FragmentHeaderV1; + + info!("📶 Sending fragmented WiFi Direct message to {}: {} bytes", target_address, message.len()); + + // Get or create session for this device (v2 fragmentation) + let mut sessions = self.wifi_sessions.write().await; + let session = sessions.entry(target_address.to_string()) + .or_insert_with(WiFiDirectSession::new); + + // Get next message sequence from session counter + let message_seq = session.next_message_seq(); + + // Calculate effective chunk size + let effective_chunk_size = chunk_size.saturating_sub(FragmentHeaderV1::SIZE); + + // Fragment using v2 protocol + let fragments = fragment_message_v2(message_seq, message, effective_chunk_size)?; + info!("📶 Fragmenting WiFi Direct message into {} parts (msg_seq={})", fragments.len(), message_seq); + + drop(sessions); // Release lock before transmission + + for (i, fragment) in fragments.iter().enumerate() { + let wire_bytes = fragment.to_bytes(); + + // Send fragment via existing mesh message mechanism + self.send_mesh_message(target_address, &wire_bytes).await?; + + debug!("📶 WiFi Direct fragment {}/{} sent ({} bytes)", i + 1, fragments.len(), wire_bytes.len()); + } + + Ok(()) + } + + /// Add a received fragment to the session's v2 reassembler + /// + /// Returns the complete message if all fragments have been received. + pub async fn add_fragment_v2(&self, mac_address: &str, fragment: Vec) -> Result>> { + let mut sessions = self.wifi_sessions.write().await; + let session = sessions.entry(mac_address.to_string()) + .or_insert_with(WiFiDirectSession::new); + + // Parse fragment using v2 format (10-byte header) + let parsed = FragmentV1::from_bytes(&fragment)?; + + debug!("📶 WiFi Direct fragment received: msg_seq={}, idx={}/{} from {}", + parsed.header.message_seq, + parsed.header.fragment_index, + parsed.header.total_fragments, + mac_address); + + // Delegate to session's v2 reassembler (handles timeout cleanup automatically) + let result = session.reassembler.add_fragment(parsed)?; + + if let Some(ref data) = result { + info!("✅ Reassembled WiFi Direct message: {} bytes (session: {})", + data.len(), + session.session_id.to_short_string()); + } + + Ok(result) + } + /// Transmit data over established WiFi Direct connection async fn transmit_over_wifi_direct(&self, device: &WiFiDirectConnection, message: &[u8]) -> Result<()> { #[cfg(target_os = "linux")] diff --git a/lib-storage/src/dht/transport.rs b/lib-storage/src/dht/transport.rs index 42fbb366..25d5d21f 100644 --- a/lib-storage/src/dht/transport.rs +++ b/lib-storage/src/dht/transport.rs @@ -104,13 +104,18 @@ pub trait DhtTransport: Send + Sync { /// Get maximum transmission unit for this transport fn mtu(&self) -> usize { + // Use centralized MTU constants from lib-types + use lib_types::mtu::{ + UDP_MTU, BLE_MAX_MTU, WIFI_DIRECT_MTU, LORAWAN_MAX_PAYLOAD, QUIC_MTU, MESH_MTU + }; + match self.local_peer_id() { - PeerId::Udp(_) => 1400, // UDP mesh default - PeerId::Bluetooth(_) => 512, // BLE MTU minus overhead - PeerId::WiFiDirect(_) => 1400, // Similar to UDP - PeerId::LoRaWAN(_) => 242, // LoRaWAN SF7 max payload - PeerId::Quic(_) => 1200, // QUIC recommended MTU - PeerId::Mesh(_) => 65536, // Mesh handles fragmentation + PeerId::Udp(_) => UDP_MTU, + PeerId::Bluetooth(_) => BLE_MAX_MTU, + PeerId::WiFiDirect(_) => WIFI_DIRECT_MTU, + PeerId::LoRaWAN(_) => LORAWAN_MAX_PAYLOAD, + PeerId::Quic(_) => QUIC_MTU, + PeerId::Mesh(_) => MESH_MTU, } } diff --git a/lib-types/src/lib.rs b/lib-types/src/lib.rs index 34557091..7acb3ea2 100644 --- a/lib-types/src/lib.rs +++ b/lib-types/src/lib.rs @@ -5,8 +5,10 @@ pub mod node_id; pub mod dht; pub mod chunk; pub mod errors; +pub mod mtu; pub use node_id::NodeId; pub use dht::*; pub use chunk::*; pub use errors::*; +pub use mtu::*; diff --git a/lib-types/src/mtu.rs b/lib-types/src/mtu.rs new file mode 100644 index 00000000..b99f0798 --- /dev/null +++ b/lib-types/src/mtu.rs @@ -0,0 +1,178 @@ +//! MTU (Maximum Transmission Unit) Constants +//! +//! Centralized MTU values for all network protocols to ensure consistency +//! and prevent hardcoded values scattered across the codebase. +//! +//! ## Protocol-Specific MTU Values +//! +//! - **BLE (Bluetooth Low Energy)**: 512 bytes (max negotiated), 247 bytes (typical) +//! - **Bluetooth Classic (RFCOMM)**: 1000 bytes +//! - **LoRaWAN**: 242 bytes (SF7/SF8), lower for higher spreading factors +//! - **WiFi Direct**: 1400 bytes (safe UDP payload) +//! - **UDP**: 1400 bytes (avoids fragmentation on most networks) +//! - **QUIC**: 1200 bytes (conservative for initial packets) +//! - **Mesh**: 65536 bytes (internal routing, no wire limit) + +use serde::{Deserialize, Serialize}; + +/// BLE minimum MTU (ATT protocol minimum) +pub const BLE_MIN_MTU: usize = 23; + +/// BLE typical MTU (common negotiated value) +pub const BLE_TYPICAL_MTU: usize = 247; + +/// BLE maximum MTU (maximum negotiated value) +pub const BLE_MAX_MTU: usize = 512; + +/// BLE recommended chunk size for fragmentation (leaves room for headers) +pub const BLE_CHUNK_SIZE: usize = 200; + +/// Bluetooth Classic (RFCOMM) MTU +pub const BLUETOOTH_CLASSIC_MTU: usize = 1000; + +/// Bluetooth Classic recommended chunk size +pub const BLUETOOTH_CLASSIC_CHUNK_SIZE: usize = 1000; + +/// LoRaWAN maximum payload size (SF7/SF8) +/// Note: Higher spreading factors have lower maximums: +/// - SF9: 115 bytes +/// - SF10: 51 bytes +/// - SF11: 11 bytes +/// - SF12: 11 bytes +pub const LORAWAN_MAX_PAYLOAD: usize = 242; + +/// WiFi Direct MTU (safe UDP payload size) +pub const WIFI_DIRECT_MTU: usize = 1400; + +/// WiFi Direct recommended chunk size +pub const WIFI_DIRECT_CHUNK_SIZE: usize = 1400; + +/// UDP recommended MTU (avoids IP fragmentation on most networks) +pub const UDP_MTU: usize = 1400; + +/// QUIC conservative MTU (for initial packets before path MTU discovery) +pub const QUIC_MTU: usize = 1200; + +/// Internal mesh routing MTU (no wire constraint) +pub const MESH_MTU: usize = 65536; + +/// Default chunk size for unknown protocols +pub const DEFAULT_CHUNK_SIZE: usize = 200; + +/// Protocol identifier for MTU selection +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum Protocol { + BluetoothLE, + BluetoothClassic, + LoRaWAN, + WiFiDirect, + Udp, + Quic, + Mesh, +} + +impl Protocol { + /// Get the MTU for this protocol + pub fn mtu(&self) -> usize { + match self { + Protocol::BluetoothLE => BLE_MAX_MTU, + Protocol::BluetoothClassic => BLUETOOTH_CLASSIC_MTU, + Protocol::LoRaWAN => LORAWAN_MAX_PAYLOAD, + Protocol::WiFiDirect => WIFI_DIRECT_MTU, + Protocol::Udp => UDP_MTU, + Protocol::Quic => QUIC_MTU, + Protocol::Mesh => MESH_MTU, + } + } + + /// Get the recommended chunk size for this protocol + /// (MTU minus typical header overhead) + pub fn chunk_size(&self) -> usize { + match self { + Protocol::BluetoothLE => BLE_CHUNK_SIZE, + Protocol::BluetoothClassic => BLUETOOTH_CLASSIC_CHUNK_SIZE, + Protocol::LoRaWAN => LORAWAN_MAX_PAYLOAD, // LoRa has no chunking, use full payload + Protocol::WiFiDirect => WIFI_DIRECT_CHUNK_SIZE, + Protocol::Udp => UDP_MTU, + Protocol::Quic => QUIC_MTU, + Protocol::Mesh => MESH_MTU, + } + } + + /// Get the MTU with a custom negotiated value (for BLE) + pub fn negotiated_mtu(&self, negotiated: usize) -> usize { + match self { + Protocol::BluetoothLE => negotiated.min(BLE_MAX_MTU).max(BLE_MIN_MTU), + _ => self.mtu(), + } + } + + /// Get chunk size for a negotiated MTU (leaves room for protocol headers) + pub fn chunk_size_for_mtu(&self, mtu: usize) -> usize { + match self { + Protocol::BluetoothLE => { + // Leave room for fragment header (8 bytes: msg_id + total + index + flags) + mtu.saturating_sub(8).max(20) + } + Protocol::BluetoothClassic => { + // Leave room for RFCOMM overhead + mtu.saturating_sub(10).max(100) + } + Protocol::LoRaWAN => { + // LoRa has fixed payload, no additional headers + mtu + } + _ => { + // General UDP/IP overhead + mtu.saturating_sub(50).max(512) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_protocol_mtu() { + assert_eq!(Protocol::BluetoothLE.mtu(), BLE_MAX_MTU); + assert_eq!(Protocol::BluetoothClassic.mtu(), BLUETOOTH_CLASSIC_MTU); + assert_eq!(Protocol::LoRaWAN.mtu(), LORAWAN_MAX_PAYLOAD); + assert_eq!(Protocol::WiFiDirect.mtu(), WIFI_DIRECT_MTU); + } + + #[test] + fn test_negotiated_mtu() { + // BLE MTU clamping + assert_eq!(Protocol::BluetoothLE.negotiated_mtu(1000), BLE_MAX_MTU); + assert_eq!(Protocol::BluetoothLE.negotiated_mtu(10), BLE_MIN_MTU); + assert_eq!(Protocol::BluetoothLE.negotiated_mtu(247), 247); + + // Other protocols ignore negotiation + assert_eq!(Protocol::Udp.negotiated_mtu(1000), UDP_MTU); + } + + #[test] + fn test_chunk_size_calculation() { + // BLE chunk should leave room for headers + assert!(Protocol::BluetoothLE.chunk_size_for_mtu(512) < 512); + assert!(Protocol::BluetoothLE.chunk_size_for_mtu(512) >= 500); + + // LoRa uses full payload + assert_eq!( + Protocol::LoRaWAN.chunk_size_for_mtu(LORAWAN_MAX_PAYLOAD), + LORAWAN_MAX_PAYLOAD + ); + } + + #[test] + fn test_constants() { + // Sanity checks + assert!(BLE_MIN_MTU < BLE_TYPICAL_MTU); + assert!(BLE_TYPICAL_MTU < BLE_MAX_MTU); + assert!(BLE_CHUNK_SIZE < BLE_TYPICAL_MTU); + assert!(LORAWAN_MAX_PAYLOAD < UDP_MTU); + assert!(QUIC_MTU < UDP_MTU); + } +} diff --git a/zhtp/src/server/protocols/bluetooth_le.rs b/zhtp/src/server/protocols/bluetooth_le.rs index bcd6a078..65445883 100644 --- a/zhtp/src/server/protocols/bluetooth_le.rs +++ b/zhtp/src/server/protocols/bluetooth_le.rs @@ -212,7 +212,7 @@ impl BluetoothRouter { let ble_connection = lib_network::protocols::bluetooth::BluetoothConnection { peer_id: handshake.node_id.to_string(), address: gatt_address.clone(), - mtu: 247, // Default BLE MTU + mtu: lib_network::mtu::BLE_TYPICAL_MTU as u16, // Use centralized MTU constant rssi: -50, // Placeholder RSSI connected_at: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) diff --git a/zhtp/src/web4_stub.rs b/zhtp/src/web4_stub.rs index 5fc02831..1742cab9 100644 --- a/zhtp/src/web4_stub.rs +++ b/zhtp/src/web4_stub.rs @@ -952,7 +952,7 @@ impl DhtTransport for MeshDhtTransport { } fn mtu(&self) -> usize { - 1200 + lib_network::mtu::QUIC_MTU } fn typical_latency_ms(&self) -> u32 {