Skip to content

Commit ba728c5

Browse files
committed
refactor/fix: overhaul BeeSerde and msg buffers
* Serializer now requires a preallocated buffer in form of a &mut [u8]. This moves the responsibility to allocate the required memory to the user. Before it used a dynamically growing buffer (which would reallocate internally if needed). This change is crucial for future RDMA support. * Since we now no longer use `BytesMut`, remove the bytes crate. * The buffers handled by the store have a fixed size of 4 MiB. This is apparently the maximum size of a BeeMsg (see `WORKER_BUF(IN|OUT)_SIZE` in `Worker.h`). C++ server code also uses these fixed size. * Generalize `msg_feature_flags` to a `Header` that can be modified from within the `Serializable` implementation and can be read out from within the `Deserializable` implementation. * Collect all BeeMsg (de)serialization functions in one module and provide functions for header, body and both combined. The split is required because depending on where the data comes from / goes to different actions need to be taken. This also provides an easy interface for potential external users to handle BeeMsges. * Remove the MsgBuf struct, instead just pass a `&mut [u8]` into the dispatcher. * Add documentation * Various small code cleanups in BeeSerde and other locations
1 parent d1710f5 commit ba728c5

File tree

16 files changed

+489
-508
lines changed

16 files changed

+489
-508
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ publish = false
1111

1212
[workspace.dependencies]
1313
anyhow = "1"
14-
bytes = "1"
1514
clap = { version = "4", features = ["derive"] }
1615
env_logger = "0"
1716
itertools = "0"

mgmtd/src/context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use crate::bee_msg::dispatch_request;
44
use crate::license::LicenseVerifier;
55
use crate::{ClientPulledStateNotification, StaticInfo};
66
use anyhow::Result;
7-
use shared::conn::Pool;
87
use shared::conn::msg_dispatch::*;
8+
use shared::conn::outgoing::Pool;
99
use shared::run_state::WeakRunStateHandle;
1010
use shared::types::{NodeId, NodeType};
1111
use std::ops::Deref;

mgmtd/src/db/import_v7.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ targetStates=1
8282
fn check_target_states(f: &Path) -> Result<()> {
8383
let s = std::fs::read(f)?;
8484

85-
let mut des = Deserializer::new(&s, 0);
85+
let mut des = Deserializer::new(&s);
8686
let states = des.map(
8787
false,
8888
|des| TargetId::deserialize(des),
@@ -184,7 +184,7 @@ struct ReadNodesResult {
184184
fn read_nodes(f: &Path) -> Result<ReadNodesResult> {
185185
let s = std::fs::read(f)?;
186186

187-
let mut des = Deserializer::new(&s, 0);
187+
let mut des = Deserializer::new(&s);
188188
let version = des.u32()?;
189189
let root_id = des.u32()?;
190190
let root_mirrored = des.u8()?;
@@ -286,7 +286,7 @@ fn storage_targets(tx: &Transaction, targets_path: &Path) -> Result<()> {
286286
fn storage_pools(tx: &Transaction, f: &Path) -> Result<()> {
287287
let s = std::fs::read(f)?;
288288

289-
let mut des = Deserializer::new(&s, 0);
289+
let mut des = Deserializer::new(&s);
290290
// Serialized as size_t, which should usually be 64 bit.
291291
let count = des.i64()?;
292292
let mut used_aliases = vec![];
@@ -412,7 +412,7 @@ fn quota_default_limits(tx: &Transaction, f: &Path, pool_id: PoolId) -> Result<(
412412
Err(err) => return Err(err.into()),
413413
};
414414

415-
let mut des = Deserializer::new(&s, 0);
415+
let mut des = Deserializer::new(&s);
416416
let user_inode_limit = des.u64()?;
417417
let user_space_limit = des.u64()?;
418418
let group_inode_limit = des.u64()?;
@@ -494,7 +494,7 @@ fn quota_limits(
494494
Err(err) => return Err(err.into()),
495495
};
496496

497-
let mut des = Deserializer::new(&s, 0);
497+
let mut des = Deserializer::new(&s);
498498
let limits = des.seq(false, |des| QuotaEntry::deserialize(des))?;
499499
des.finish()?;
500500

mgmtd/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ use bee_msg::notify_nodes;
1919
use db::node_nic::ReplaceNic;
2020
use license::LicenseVerifier;
2121
use shared::bee_msg::target::RefreshTargetStates;
22-
use shared::conn::{Pool, incoming};
22+
use shared::conn::incoming;
23+
use shared::conn::outgoing::Pool;
2324
use shared::nic::Nic;
2425
use shared::run_state::{self, RunStateControl};
2526
use shared::types::{AuthSecret, MGMTD_UID, NicType, NodeId, NodeType};

shared/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ publish.workspace = true
1111
bee_serde_derive = { path = "../bee_serde_derive" }
1212

1313
anyhow = { workspace = true }
14-
bytes = { workspace = true }
1514
log = { workspace = true }
1615
pnet_datalink = "0"
1716
protobuf = { workspace = true, optional = true }

shared/src/bee_msg.rs

Lines changed: 166 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@
22
33
use crate::bee_serde::*;
44
use crate::types::*;
5-
use anyhow::Result;
5+
use anyhow::{Context, Result, anyhow};
66
use bee_serde_derive::BeeSerde;
77
use std::collections::{HashMap, HashSet};
88

99
pub mod buddy_group;
10-
pub mod header;
1110
pub mod misc;
1211
pub mod node;
1312
pub mod quota;
@@ -41,3 +40,168 @@ impl OpsErr {
4140
pub const AGAIN: Self = Self(22);
4241
pub const UNKNOWN_POOL: Self = Self(30);
4342
}
43+
44+
/// The BeeMsg header
45+
#[derive(Clone, Debug, PartialEq, Eq, BeeSerde)]
46+
pub struct Header {
47+
/// Total length of the serialized message, including the header itself
48+
msg_len: u32,
49+
/// Sometimes used for additional message specific payload and/or serialization info
50+
pub msg_feature_flags: u16,
51+
/// Sometimes used for additional message specific payload and/or serialization info
52+
pub msg_compat_feature_flags: u8,
53+
/// Sometimes used for additional message specific payload and/or serialization info
54+
pub msg_flags: u8,
55+
/// Fixed value to identify a BeeMsg header (see MSG_PREFIX below)
56+
msg_prefix: u64,
57+
/// Uniquely identifies the message type as defined in the C++ codebase in NetMessageTypes.h
58+
msg_id: MsgId,
59+
/// Sometimes used for additional message specific payload and/or serialization info
60+
pub msg_target_id: TargetId,
61+
/// Sometimes used for additional message specific payload and/or serialization info
62+
pub msg_user_id: u32,
63+
/// Mirroring related information
64+
pub msg_seq: u64,
65+
/// Mirroring related information
66+
pub msg_seq_done: u64,
67+
}
68+
69+
impl Header {
70+
/// The serialized length of the header
71+
pub const LEN: usize = 40;
72+
/// Fixed value for identifying BeeMsges. In theory, this has some kind of version modifier
73+
/// (thus the + 0), but it is unused
74+
#[allow(clippy::identity_op)]
75+
pub const MSG_PREFIX: u64 = (0x42474653 << 32) + 0;
76+
77+
/// The total length of the serialized message
78+
pub fn msg_len(&self) -> usize {
79+
self.msg_len as usize
80+
}
81+
82+
/// The messages id
83+
pub fn msg_id(&self) -> MsgId {
84+
self.msg_id
85+
}
86+
}
87+
88+
impl Default for Header {
89+
fn default() -> Self {
90+
Self {
91+
msg_len: 0,
92+
msg_feature_flags: 0,
93+
msg_compat_feature_flags: 0,
94+
msg_flags: 0,
95+
msg_prefix: Self::MSG_PREFIX,
96+
msg_id: 0,
97+
msg_target_id: 0,
98+
msg_user_id: 0,
99+
msg_seq: 0,
100+
msg_seq_done: 0,
101+
}
102+
}
103+
}
104+
105+
/// Serializes a BeeMsg body into the provided buffer.
106+
///
107+
/// The data is written from the beginning of the slice, it's up to the caller to pass the correct
108+
/// sub slice if space for the header should be reserved.
109+
///
110+
/// # Return value
111+
/// Returns the number of bytes written and the header modified by serialization function.
112+
pub fn serialize_body<M: Msg + Serializable>(msg: &M, buf: &mut [u8]) -> Result<(usize, Header)> {
113+
let mut ser = Serializer::new(buf);
114+
msg.serialize(&mut ser)
115+
.context("BeeMsg body serialization failed")?;
116+
117+
Ok((ser.bytes_written(), ser.finish()))
118+
}
119+
120+
/// Serializes a BeeMsg header into the provided buffer.
121+
///
122+
/// # Return value
123+
/// Returns the number of bytes written.
124+
pub fn serialize_header(header: &Header, buf: &mut [u8]) -> Result<usize> {
125+
let mut ser_header = Serializer::new(buf);
126+
header
127+
.serialize(&mut ser_header)
128+
.context("BeeMsg header serialization failed")?;
129+
130+
Ok(ser_header.bytes_written())
131+
}
132+
133+
/// Serializes a complete BeeMsg (header + body) into the provided buffer.
134+
///
135+
/// # Return value
136+
/// Returns the number of bytes written.
137+
pub fn serialize<M: Msg + Serializable>(msg: &M, buf: &mut [u8]) -> Result<usize> {
138+
let (written, mut header) = serialize_body(msg, &mut buf[Header::LEN..])?;
139+
140+
header.msg_len = (written + Header::LEN) as u32;
141+
header.msg_id = M::ID;
142+
143+
let _ = serialize_header(&header, &mut buf[0..Header::LEN])?;
144+
145+
Ok(header.msg_len())
146+
}
147+
148+
/// Deserializes a BeeMsg header from the provided buffer.
149+
///
150+
/// # Return value
151+
/// Returns the deserialized header.
152+
pub fn deserialize_header(buf: &[u8]) -> Result<Header> {
153+
const CTX: &str = "BeeMsg header deserialization failed";
154+
155+
let header_buf = buf
156+
.get(..Header::LEN)
157+
.ok_or_else(|| {
158+
anyhow!(
159+
"Header buffer must be at least {} bytes big, got {}",
160+
Header::LEN,
161+
buf.len()
162+
)
163+
})
164+
.context(CTX)?;
165+
166+
let mut des = Deserializer::new(header_buf);
167+
let header = Header::deserialize(&mut des).context(CTX)?;
168+
des.finish().context(CTX)?;
169+
170+
if header.msg_prefix != Header::MSG_PREFIX {
171+
return Err(anyhow!(
172+
"Invalid BeeMsg prefix: Must be {}, got {}",
173+
Header::MSG_PREFIX,
174+
header.msg_prefix
175+
))
176+
.context(CTX);
177+
}
178+
179+
Ok(header)
180+
}
181+
182+
/// Deserializes a BeeMsg body from the provided buffer.
183+
///
184+
/// The data is read from the beginning of the slice, it's up to the caller to pass the correct
185+
/// sub slice if space for the header should be excluded from the source.
186+
///
187+
/// # Return value
188+
/// Returns the deserialized message.
189+
pub fn deserialize_body<M: Msg + Deserializable>(header: &Header, buf: &[u8]) -> Result<M> {
190+
const CTX: &str = "BeeMsg body deserialization failed";
191+
192+
let mut des = Deserializer::with_header(&buf[0..(header.msg_len() - Header::LEN)], header);
193+
let des_msg = M::deserialize(&mut des).context(CTX)?;
194+
des.finish().context(CTX)?;
195+
196+
Ok(des_msg)
197+
}
198+
199+
/// Deserializes a complete BeeMsg (header + body) from the provided buffer.
200+
///
201+
/// # Return value
202+
/// Returns the deserialized message.
203+
pub fn deserialize<M: Msg + Deserializable>(buf: &[u8]) -> Result<M> {
204+
let header = deserialize_header(&buf[0..Header::LEN])?;
205+
let msg = deserialize_body(&header, &buf[Header::LEN..])?;
206+
Ok(msg)
207+
}

shared/src/bee_msg/header.rs

Lines changed: 0 additions & 66 deletions
This file was deleted.

0 commit comments

Comments
 (0)