Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

# Whitelist valid technical terms to avoid typos false positives
[default.extend-words]
# French for coffee, used in UTF-8 test strings (cafe with accent)
caf = "caf"
# Valid bi-directional map data structure term
bimap = "bimap"
# Valid compression algorithm abbreviation
Expand All @@ -29,8 +31,6 @@ bais = "bais"
Strat = "Strat"
# Same as above
strin = "strin"
# Valid prefix in Unicode escape test strings (e.g. "caf\u{00e9}" = cafe)
caf = "caf"

# Exclude auto-generated/non-editable files from typos check
[files]
Expand Down
52 changes: 52 additions & 0 deletions core/binary_protocol/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,58 @@ pub fn read_str(buf: &[u8], offset: usize, len: usize) -> Result<String, WireErr
.map_err(|_| WireError::InvalidUtf8 { offset })
}

/// Helper to read a `u128` LE from `buf` at `offset`.
///
/// # Errors
/// Returns `WireError::UnexpectedEof` if fewer than 16 bytes remain.
#[allow(clippy::missing_panics_doc)]
#[inline]
pub fn read_u128_le(buf: &[u8], offset: usize) -> Result<u128, WireError> {
let end = offset
.checked_add(16)
.ok_or_else(|| WireError::UnexpectedEof {
offset,
need: 16,
have: buf.len().saturating_sub(offset),
})?;
let slice = buf
.get(offset..end)
.ok_or_else(|| WireError::UnexpectedEof {
offset,
need: 16,
have: buf.len().saturating_sub(offset),
})?;
Ok(u128::from_le_bytes(
slice.try_into().expect("slice is exactly 16 bytes"),
))
}

/// Helper to read an `f32` LE from `buf` at `offset`.
///
/// # Errors
/// Returns `WireError::UnexpectedEof` if fewer than 4 bytes remain.
#[allow(clippy::missing_panics_doc)]
#[inline]
pub fn read_f32_le(buf: &[u8], offset: usize) -> Result<f32, WireError> {
let end = offset
.checked_add(4)
.ok_or_else(|| WireError::UnexpectedEof {
offset,
need: 4,
have: buf.len().saturating_sub(offset),
})?;
let slice = buf
.get(offset..end)
.ok_or_else(|| WireError::UnexpectedEof {
offset,
need: 4,
have: buf.len().saturating_sub(offset),
})?;
Ok(f32::from_le_bytes(
slice.try_into().expect("slice is exactly 4 bytes"),
))
}

/// Helper to read a byte slice of `len` bytes from `buf` at `offset`.
///
/// # Errors
Expand Down
20 changes: 0 additions & 20 deletions core/binary_protocol/src/consensus/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ pub trait ConsensusHeader: Sized + CheckedBitPattern + NoUninit {
fn size(&self) -> u32;
}

// ---------------------------------------------------------------------------
// GenericHeader - type-erased dispatch
// ---------------------------------------------------------------------------

/// Type-erased 256-byte header for initial message dispatch.
#[repr(C)]
Expand Down Expand Up @@ -84,9 +82,7 @@ impl ConsensusHeader for GenericHeader {
}
}

// ---------------------------------------------------------------------------
// RequestHeader - client -> primary
// ---------------------------------------------------------------------------

/// Client -> primary request header. 256 bytes.
#[repr(C)]
Expand Down Expand Up @@ -167,9 +163,7 @@ impl ConsensusHeader for RequestHeader {
}
}

// ---------------------------------------------------------------------------
// ReplyHeader - primary -> client
// ---------------------------------------------------------------------------

/// Primary -> client reply header. 256 bytes.
#[repr(C)]
Expand Down Expand Up @@ -251,9 +245,7 @@ impl ConsensusHeader for ReplyHeader {
}
}

// ---------------------------------------------------------------------------
// PrepareHeader - primary -> replicas (replication)
// ---------------------------------------------------------------------------

/// Primary -> replicas: replicate this operation.
#[repr(C)]
Expand Down Expand Up @@ -340,9 +332,7 @@ impl ConsensusHeader for PrepareHeader {
}
}

// ---------------------------------------------------------------------------
// PrepareOkHeader - replica -> primary (acknowledgement)
// ---------------------------------------------------------------------------

/// Replica -> primary: acknowledge a Prepare.
#[repr(C)]
Expand Down Expand Up @@ -427,9 +417,7 @@ impl ConsensusHeader for PrepareOkHeader {
}
}

// ---------------------------------------------------------------------------
// CommitHeader - primary -> replicas (commit, header-only)
// ---------------------------------------------------------------------------

/// Primary -> replicas: commit up to this op. Header-only (no body).
#[repr(C)]
Expand Down Expand Up @@ -484,9 +472,7 @@ impl ConsensusHeader for CommitHeader {
}
}

// ---------------------------------------------------------------------------
// StartViewChangeHeader - failure detection (header-only)
// ---------------------------------------------------------------------------

/// Replica suspects primary failure. Header-only.
#[derive(Debug, Clone, Copy, PartialEq, Eq, CheckedBitPattern, NoUninit)]
Expand Down Expand Up @@ -540,9 +526,7 @@ impl ConsensusHeader for StartViewChangeHeader {
}
}

// ---------------------------------------------------------------------------
// DoViewChangeHeader - view change vote (header-only)
// ---------------------------------------------------------------------------

/// Replica -> primary candidate: vote for view change. Header-only.
#[derive(Debug, Clone, Copy, PartialEq, Eq, CheckedBitPattern, NoUninit)]
Expand Down Expand Up @@ -614,9 +598,7 @@ impl ConsensusHeader for DoViewChangeHeader {
}
}

// ---------------------------------------------------------------------------
// StartViewHeader - new view announcement (header-only)
// ---------------------------------------------------------------------------

/// New primary -> all replicas: start new view. Header-only.
#[derive(Debug, Clone, Copy, PartialEq, Eq, CheckedBitPattern, NoUninit)]
Expand Down Expand Up @@ -681,9 +663,7 @@ impl ConsensusHeader for StartViewHeader {
}
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
Expand Down
2 changes: 1 addition & 1 deletion core/binary_protocol/src/consensus/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl<H: ConsensusHeader> Message<H> {
});
}

// TODO: bytemuck::checked::try_from_bytes requires the buffer to be aligned
// TODO(hubcio): bytemuck::checked::try_from_bytes requires the buffer to be aligned
// to the target type's alignment. Consensus headers contain u128 fields (16-byte
// alignment), but Bytes from Vec<u8> only guarantees 8-byte alignment. The checked
// variant returns Err on misalignment (no UB), but production code can fail on
Expand Down
6 changes: 6 additions & 0 deletions core/binary_protocol/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
// specific language governing permissions and limitations
// under the License.

// TODO(hubcio): Legacy framing constants for the current binary protocol.
// Once VSR consensus is integrated, both client-server and
// replica-replica traffic will use the unified 256-byte
// consensus header (`consensus::header::HEADER_SIZE`).
// These constants will be removed at that point.

/// Request frame: `[length:4 LE][code:4 LE][payload:N]`
/// `length` = size of code + payload = 4 + N
pub const REQUEST_HEADER_SIZE: usize = 4;
Expand Down
15 changes: 12 additions & 3 deletions core/binary_protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
//!
//! # Design
//!
//! Protocol types are independent from domain types (`iggy_common`).
//! Protocol types are independent from domain types.
//! Conversion between wire types and domain types happens at the boundary
//! (SDK client impls, server handlers).
//!
Expand All @@ -47,12 +47,21 @@ pub mod codes;
pub mod consensus;
pub mod error;
pub mod frame;
pub mod identifier;
pub mod message_layout;
pub mod message_view;
pub mod primitives;
pub mod requests;
pub mod responses;

pub use codec::{WireDecode, WireEncode};
pub use codes::*;
pub use error::WireError;
pub use frame::*;
pub use identifier::{MAX_WIRE_NAME_LENGTH, WireIdentifier, WireName};
pub use message_layout::*;
pub use primitives::consumer::WireConsumer;
pub use primitives::identifier::{MAX_WIRE_NAME_LENGTH, WireIdentifier, WireName};
pub use primitives::partitioning::{MAX_MESSAGES_KEY_LENGTH, WirePartitioning};
pub use primitives::permissions::{
WireGlobalPermissions, WirePermissions, WireStreamPermissions, WireTopicPermissions,
};
pub use primitives::polling_strategy::WirePollingStrategy;
56 changes: 56 additions & 0 deletions core/binary_protocol/src/message_layout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! 64-byte message frame layout constants.
//!
//! This module is the single source of truth for the on-wire message
//! header layout shared between encoders, decoders, and zero-copy views.
//!
//! ```text
//! [checksum:8][id:16][offset:8][timestamp:8][origin_timestamp:8]
//! [user_headers_length:4][payload_length:4][reserved:8]
//! ```

/// Fixed-size message header on the wire (64 bytes).
pub const WIRE_MESSAGE_HEADER_SIZE: usize = 64;

/// Fixed-size index entry per message (16 bytes).
///
/// Layout: `[0x00000000:4][cumulative_size:u32_le:4][0x0000000000000000:8]`
pub const WIRE_MESSAGE_INDEX_SIZE: usize = 16;

// Field offsets within the 64-byte message header.
pub const MSG_CHECKSUM_OFFSET: usize = 0;
pub const MSG_ID_OFFSET: usize = 8;
pub const MSG_OFFSET_OFFSET: usize = 24;
pub const MSG_TIMESTAMP_OFFSET: usize = 32;
pub const MSG_ORIGIN_TIMESTAMP_OFFSET: usize = 40;
pub const MSG_USER_HEADERS_LEN_OFFSET: usize = 48;
pub const MSG_PAYLOAD_LEN_OFFSET: usize = 52;
pub const MSG_RESERVED_OFFSET: usize = 56;

// Compile-time verification that field offsets are contiguous and sum to the header size.
const _: () = {
assert!(MSG_ID_OFFSET == MSG_CHECKSUM_OFFSET + 8);
assert!(MSG_OFFSET_OFFSET == MSG_ID_OFFSET + 16);
assert!(MSG_TIMESTAMP_OFFSET == MSG_OFFSET_OFFSET + 8);
assert!(MSG_ORIGIN_TIMESTAMP_OFFSET == MSG_TIMESTAMP_OFFSET + 8);
assert!(MSG_USER_HEADERS_LEN_OFFSET == MSG_ORIGIN_TIMESTAMP_OFFSET + 8);
assert!(MSG_PAYLOAD_LEN_OFFSET == MSG_USER_HEADERS_LEN_OFFSET + 4);
assert!(MSG_RESERVED_OFFSET == MSG_PAYLOAD_LEN_OFFSET + 4);
assert!(MSG_RESERVED_OFFSET + 8 == WIRE_MESSAGE_HEADER_SIZE);
};
Loading
Loading