Skip to content

Commit bc2d912

Browse files
embedivermkj
authored andcommitted
Add method to fragment vectored payload
Signed-off-by: Marvin Gudel <[email protected]>
1 parent 3b38ffc commit bc2d912

File tree

2 files changed

+148
-25
lines changed

2 files changed

+148
-25
lines changed

mctp-estack/src/fragment.rs

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,113 @@ impl Fragmenter {
144144
let used = max_total - rest.len();
145145
SendOutput::Packet(&mut out[..used])
146146
}
147+
148+
pub fn fragment_vectored<'f>(
149+
&mut self,
150+
payload: &[&[u8]],
151+
out: &'f mut [u8],
152+
) -> SendOutput<'f> {
153+
let total_payload_len =
154+
payload.iter().fold(0, |acc, part| acc + part.len());
155+
if total_payload_len < self.payload_used {
156+
// Caller is passing varying payload buffers
157+
debug!("varying payload");
158+
return SendOutput::failure(Error::BadArgument, self);
159+
}
160+
161+
// Require at least MTU buffer size, to ensure that all non-end
162+
// fragments are the same size per the spec.
163+
if out.len() < self.mtu {
164+
debug!("small out buffer");
165+
return SendOutput::failure(Error::BadArgument, self);
166+
}
167+
168+
// Reserve header space, the remaining buffer keeps being
169+
// updated in `rest`
170+
let max_total = out.len().min(self.mtu);
171+
let (h, mut rest) = out[..max_total].split_at_mut(MctpHeader::LEN);
172+
173+
// Append type byte
174+
if self.header.som {
175+
rest[0] = mctp::encode_type_ic(self.typ, self.ic);
176+
rest = &mut rest[1..];
177+
}
178+
179+
let remaining_payload_len = total_payload_len - self.payload_used;
180+
let l = remaining_payload_len.min(rest.len());
181+
let (d, rest) = rest.split_at_mut(l);
182+
copy_vectored(payload, self.payload_used, d);
183+
self.payload_used += l;
184+
185+
// Add the header
186+
if self.payload_used == total_payload_len {
187+
self.header.eom = true;
188+
}
189+
// OK unwrap: seq and tag are valid.
190+
h.copy_from_slice(&self.header.encode().unwrap());
191+
192+
self.header.som = false;
193+
self.header.seq = (self.header.seq + 1) & mctp::MCTP_SEQ_MASK;
194+
195+
let used = max_total - rest.len();
196+
SendOutput::Packet(&mut out[..used])
197+
}
198+
}
199+
200+
/// Copy data from a vectored src to dest
201+
///
202+
/// Copies `dest.len()` bytes from payload to dest,
203+
/// starting after `offset` bytes.
204+
///
205+
/// ## Panics
206+
///
207+
/// This function will panic when not enough bytes are available to fill dest.
208+
/// Total size of `payload` has to be `atleast dest.len()` + `offset`.
209+
fn copy_vectored(src: &[&[u8]], offset: usize, dest: &mut [u8]) {
210+
let mut i = 0;
211+
212+
while i < dest.len() {
213+
let payload_index = i + offset;
214+
let next = get_sub_slice(src, payload_index);
215+
let remaining = dest.len() - i;
216+
if remaining > next.len() {
217+
dest[i..(i + next.len())].copy_from_slice(next);
218+
i += next.len();
219+
} else {
220+
dest[i..].copy_from_slice(&next[..remaining]);
221+
return;
222+
}
223+
}
224+
}
225+
226+
/// Get a slice of `vector` indexed by `offset`
227+
///
228+
/// The `offset` is the absolute byte index.
229+
/// The returned slice is the remaining sub slice starting at `offset`.
230+
///
231+
/// ## Panics
232+
///
233+
/// Will panic when offset is larger than the size of `vector`.
234+
///
235+
/// ## Example
236+
/// ```ignore
237+
/// # use mctp_estack::fragment::get_slice;
238+
/// let vector: &[&[u8]] = &[&[1, 2, 3], &[4, 5, 6]];
239+
///
240+
/// let slice = get_slice(vector, 4);
241+
///
242+
/// assert_eq!(slice, &[5, 6]);
243+
/// ```
244+
fn get_sub_slice<'a>(vector: &'a [&[u8]], offset: usize) -> &'a [u8] {
245+
let mut i = offset;
246+
for slice in vector {
247+
if i >= slice.len() {
248+
i -= slice.len();
249+
} else {
250+
return &slice[i..];
251+
}
252+
}
253+
panic!("offset for vector out of bounds");
147254
}
148255

149256
pub enum SendOutput<'p> {
@@ -194,3 +301,41 @@ impl SendOutput<'_> {
194301
Self::Error { err, cookie: None }
195302
}
196303
}
304+
305+
#[cfg(test)]
306+
mod tests {
307+
#[test]
308+
fn test_get_slice() {
309+
use super::get_sub_slice;
310+
let vector: &[&[u8]] = &[&[1, 2, 3], &[4, 5, 6], &[7, 8, 9]];
311+
let slice = get_sub_slice(vector, 4);
312+
assert_eq!(slice, &[5, 6]);
313+
let slice = get_sub_slice(vector, 0);
314+
assert_eq!(slice, &[1, 2, 3]);
315+
let slice = get_sub_slice(vector, 3);
316+
assert_eq!(slice, &[4, 5, 6]);
317+
}
318+
#[test]
319+
fn test_copy_vectored() {
320+
use super::copy_vectored;
321+
let vector: &[&[u8]] = &[&[1, 2, 3], &[4, 5], &[6, 7, 8, 9]];
322+
323+
let mut dest = [0; 6];
324+
copy_vectored(vector, 1, &mut dest);
325+
assert_eq!(&dest, &[2, 3, 4, 5, 6, 7]);
326+
327+
let mut dest = [0; 5];
328+
copy_vectored(vector, 4, &mut dest);
329+
assert_eq!(&dest, &[5, 6, 7, 8, 9]);
330+
331+
let mut dest = [0; 9];
332+
copy_vectored(vector, 0, &mut dest);
333+
assert_eq!(&dest, &[1, 2, 3, 4, 5, 6, 7, 8, 9]);
334+
335+
let vector: &[&[u8]] = &[&[1, 2, 3]];
336+
337+
let mut dest = [0; 1];
338+
copy_vectored(vector, 2, &mut dest);
339+
assert_eq!(&dest, &[3]);
340+
}
341+
}

mctp-estack/src/router.rs

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use core::task::{Poll, Waker};
1717

1818
use crate::{
1919
config, AppCookie, Fragmenter, MctpHeader, MctpMessage, SendOutput, Stack,
20-
MAX_MTU, MAX_PAYLOAD,
20+
MAX_MTU,
2121
};
2222
use mctp::{Eid, Error, MsgIC, MsgType, Result, Tag, TagValue};
2323

@@ -180,23 +180,8 @@ impl PortTop {
180180
&self,
181181
fragmenter: &mut Fragmenter,
182182
pkt: &[&[u8]],
183-
work_msg: &mut Vec<u8, MAX_PAYLOAD>,
184183
) -> Result<Tag> {
185184
trace!("send_message");
186-
let payload = if pkt.len() == 1 {
187-
// Avoid the copy when sending a single slice
188-
pkt[0]
189-
} else {
190-
work_msg.clear();
191-
for p in pkt {
192-
work_msg.extend_from_slice(p).map_err(|_| {
193-
debug!("Message too large");
194-
Error::NoSpace
195-
})?;
196-
}
197-
work_msg
198-
};
199-
200185
// send_message() needs to wait for packets to get enqueued to the PortTop channel.
201186
// It shouldn't hold the send_mutex() across an await, since that would block
202187
// forward_packet().
@@ -215,7 +200,7 @@ impl PortTop {
215200
};
216201

217202
qpkt.len = 0;
218-
match fragmenter.fragment(payload, &mut qpkt.data) {
203+
match fragmenter.fragment_vectored(pkt, &mut qpkt.data) {
219204
SendOutput::Packet(p) => {
220205
qpkt.len = p.len();
221206
sender.send_done();
@@ -452,10 +437,6 @@ pub struct Router<'r> {
452437
BlockingMutex<RefCell<Vec<(MsgType, AppCookie), MAX_LISTENERS>>>,
453438

454439
recv_wakers: WakerPool,
455-
456-
/// Temporary storage to flatten vectorised local sent messages
457-
// prior to fragmentation and queueing.
458-
work_msg: AsyncMutex<Vec<u8, MAX_PAYLOAD>>,
459440
}
460441

461442
pub struct RouterInner<'r> {
@@ -497,7 +478,6 @@ impl<'r> Router<'r> {
497478
app_listeners,
498479
ports: Vec::new(),
499480
recv_wakers: Default::default(),
500-
work_msg: AsyncMutex::new(Vec::new()),
501481
}
502482
}
503483

@@ -776,9 +756,7 @@ impl<'r> Router<'r> {
776756
// release to allow other ports to continue work
777757
drop(inner);
778758

779-
// lock the shared work buffer against other app_send_message()
780-
let mut work_msg = self.work_msg.lock().await;
781-
top.send_message(&mut fragmenter, buf, &mut work_msg).await
759+
top.send_message(&mut fragmenter, buf).await
782760
}
783761

784762
/// Create a `AsyncReqChannel` instance.

0 commit comments

Comments
 (0)