Skip to content

Commit b87a84f

Browse files
authored
feat(rust): add wire protocol codec and types to binary_protocol crate (#2946)
1 parent 68abb60 commit b87a84f

32 files changed

+3550
-18
lines changed

.typos.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ bais = "bais"
2929
Strat = "Strat"
3030
# Same as above
3131
strin = "strin"
32+
# Valid prefix in Unicode escape test strings (e.g. "caf\u{00e9}" = cafe)
33+
caf = "caf"
3234

3335
# Exclude auto-generated/non-editable files from typos check
3436
[files]

Cargo.lock

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/binary_protocol/Cargo.toml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
[package]
1919
name = "iggy_binary_protocol"
2020
version = "0.9.2-edge.1"
21-
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
21+
description = "Wire protocol types and codec for the Iggy binary protocol. Shared between server and SDK."
2222
edition = "2024"
2323
license = "Apache-2.0"
2424
keywords = ["iggy", "messaging", "streaming"]
@@ -29,3 +29,12 @@ repository = "https://github.com/apache/iggy"
2929
readme = "../../README.md"
3030

3131
[dependencies]
32+
bytemuck = { workspace = true }
33+
bytes = { workspace = true }
34+
enumset = { workspace = true }
35+
thiserror = { workspace = true }
36+
37+
[lints.clippy]
38+
enum_glob_use = "deny"
39+
nursery = "warn"
40+
pedantic = "deny"

core/binary_protocol/src/codec.rs

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::WireError;
19+
use bytes::{Bytes, BytesMut};
20+
21+
/// Encode a wire type into a caller-owned buffer.
22+
///
23+
/// Buffer-first design: the caller controls allocation. The `to_bytes()`
24+
/// convenience method allocates when needed, but hot paths can reuse
25+
/// buffers via `encode()` directly.
26+
pub trait WireEncode {
27+
/// Write the encoded representation into `buf`.
28+
fn encode(&self, buf: &mut BytesMut);
29+
30+
/// Return the exact encoded size in bytes.
31+
fn encoded_size(&self) -> usize;
32+
33+
/// Convenience: allocate a new [`Bytes`] and encode into it.
34+
#[must_use]
35+
fn to_bytes(&self) -> Bytes {
36+
let mut buf = BytesMut::with_capacity(self.encoded_size());
37+
self.encode(&mut buf);
38+
buf.freeze()
39+
}
40+
}
41+
42+
/// Decode a wire type from a byte slice.
43+
///
44+
/// Takes `&[u8]` instead of [`Bytes`] to avoid requiring reference-counted
45+
/// ownership at the decode boundary.
46+
pub trait WireDecode: Sized {
47+
/// Decode from `buf`, consuming exactly the bytes needed.
48+
/// Returns the decoded value and the number of bytes consumed.
49+
///
50+
/// # Errors
51+
/// Returns `WireError` if the buffer is too short or contains invalid data.
52+
fn decode(buf: &[u8]) -> Result<(Self, usize), WireError>;
53+
54+
/// Convenience: decode from the entire buffer, ignoring trailing bytes.
55+
///
56+
/// # Errors
57+
/// Returns `WireError` if decoding fails.
58+
fn decode_from(buf: &[u8]) -> Result<Self, WireError> {
59+
Self::decode(buf).map(|(val, _)| val)
60+
}
61+
}
62+
63+
/// Helper to read a `u8` from `buf` at `offset`.
64+
///
65+
/// # Errors
66+
/// Returns `WireError::UnexpectedEof` if `offset` is out of bounds.
67+
#[inline]
68+
pub fn read_u8(buf: &[u8], offset: usize) -> Result<u8, WireError> {
69+
buf.get(offset)
70+
.copied()
71+
.ok_or_else(|| WireError::UnexpectedEof {
72+
offset,
73+
need: 1,
74+
have: buf.len().saturating_sub(offset),
75+
})
76+
}
77+
78+
/// Helper to read a `u32` LE from `buf` at `offset`.
79+
///
80+
/// # Errors
81+
/// Returns `WireError::UnexpectedEof` if fewer than 4 bytes remain.
82+
#[allow(clippy::missing_panics_doc)]
83+
#[inline]
84+
pub fn read_u32_le(buf: &[u8], offset: usize) -> Result<u32, WireError> {
85+
let end = offset
86+
.checked_add(4)
87+
.ok_or_else(|| WireError::UnexpectedEof {
88+
offset,
89+
need: 4,
90+
have: buf.len().saturating_sub(offset),
91+
})?;
92+
let slice = buf
93+
.get(offset..end)
94+
.ok_or_else(|| WireError::UnexpectedEof {
95+
offset,
96+
need: 4,
97+
have: buf.len().saturating_sub(offset),
98+
})?;
99+
Ok(u32::from_le_bytes(
100+
slice.try_into().expect("slice is exactly 4 bytes"),
101+
))
102+
}
103+
104+
/// Helper to read a `u64` LE from `buf` at `offset`.
105+
///
106+
/// # Errors
107+
/// Returns `WireError::UnexpectedEof` if fewer than 8 bytes remain.
108+
#[allow(clippy::missing_panics_doc)]
109+
#[inline]
110+
pub fn read_u64_le(buf: &[u8], offset: usize) -> Result<u64, WireError> {
111+
let end = offset
112+
.checked_add(8)
113+
.ok_or_else(|| WireError::UnexpectedEof {
114+
offset,
115+
need: 8,
116+
have: buf.len().saturating_sub(offset),
117+
})?;
118+
let slice = buf
119+
.get(offset..end)
120+
.ok_or_else(|| WireError::UnexpectedEof {
121+
offset,
122+
need: 8,
123+
have: buf.len().saturating_sub(offset),
124+
})?;
125+
Ok(u64::from_le_bytes(
126+
slice.try_into().expect("slice is exactly 8 bytes"),
127+
))
128+
}
129+
130+
/// Helper to read a UTF-8 string of `len` bytes from `buf` at `offset`.
131+
///
132+
/// # Errors
133+
/// Returns `WireError::UnexpectedEof` or `WireError::InvalidUtf8` on failure.
134+
#[inline]
135+
pub fn read_str(buf: &[u8], offset: usize, len: usize) -> Result<String, WireError> {
136+
let end = offset
137+
.checked_add(len)
138+
.ok_or_else(|| WireError::UnexpectedEof {
139+
offset,
140+
need: len,
141+
have: buf.len().saturating_sub(offset),
142+
})?;
143+
let slice = buf
144+
.get(offset..end)
145+
.ok_or_else(|| WireError::UnexpectedEof {
146+
offset,
147+
need: len,
148+
have: buf.len().saturating_sub(offset),
149+
})?;
150+
std::str::from_utf8(slice)
151+
.map(str::to_string)
152+
.map_err(|_| WireError::InvalidUtf8 { offset })
153+
}
154+
155+
/// Helper to read a byte slice of `len` bytes from `buf` at `offset`.
156+
///
157+
/// # Errors
158+
/// Returns `WireError::UnexpectedEof` if fewer than `len` bytes remain.
159+
#[inline]
160+
pub fn read_bytes(buf: &[u8], offset: usize, len: usize) -> Result<&[u8], WireError> {
161+
let end = offset
162+
.checked_add(len)
163+
.ok_or_else(|| WireError::UnexpectedEof {
164+
offset,
165+
need: len,
166+
have: buf.len().saturating_sub(offset),
167+
})?;
168+
buf.get(offset..end)
169+
.ok_or_else(|| WireError::UnexpectedEof {
170+
offset,
171+
need: len,
172+
have: buf.len().saturating_sub(offset),
173+
})
174+
}

0 commit comments

Comments
 (0)