Skip to content

Commit 5dad2de

Browse files
net: framed: upgrade test support for multibuf cancel safe testing (#956)
Summary: Pull Request resolved: #956 added vectored write support to the test harness (`SharedWriter`, `BudgetedWriter`). we now gate and truncate across multiple `IoSlices` just like we do for single buffers. correctness is unchanged; the existing bytes-based tests still pass untouched. this just makes the harness realistic for multipart, so the property tests can exercise the actual `write_vectored` path. Reviewed By: mariusae Differential Revision: D80731891 fbshipit-source-id: fab9b65ecc48caa3fb880c4311c0e7bdc004f009
1 parent 49a07ce commit 5dad2de

File tree

1 file changed

+54
-2
lines changed

1 file changed

+54
-2
lines changed

hyperactor/src/channel/net/framed.rs

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ impl<W: AsyncWrite + Unpin, B: Buf> FrameWrite<W, B> {
223223
#[cfg(test)]
224224
mod test_support {
225225
use std::io;
226+
use std::io::IoSlice;
226227
use std::pin::Pin;
227228
use std::sync::Arc;
228229
use std::sync::Mutex;
@@ -341,6 +342,20 @@ mod test_support {
341342
Pin::new(&mut *w).poll_write(cx, buf)
342343
}
343344

345+
fn poll_write_vectored(
346+
self: Pin<&mut Self>,
347+
cx: &mut Context<'_>,
348+
bufs: &[IoSlice<'_>],
349+
) -> Poll<io::Result<usize>> {
350+
let mut w = self.0.lock().unwrap();
351+
Pin::new(&mut *w).poll_write_vectored(cx, bufs)
352+
}
353+
354+
fn is_write_vectored(&self) -> bool {
355+
let mut w = self.0.lock().unwrap();
356+
Pin::new(&mut *w).is_write_vectored()
357+
}
358+
344359
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
345360
let mut w = self.0.lock().unwrap();
346361
Pin::new(&mut *w).poll_flush(cx)
@@ -518,16 +533,53 @@ mod test_support {
518533
cx: &mut Context<'_>,
519534
buf: &[u8],
520535
) -> Poll<io::Result<usize>> {
521-
// Ask the gate how much were allowed to write now.
536+
// Ask the gate how much we're allowed to write now.
522537
let n = self.gate.take_chunk(buf.len(), cx);
523538
if n == 0 {
524539
return Poll::Pending;
525540
}
526-
// Synchronous lock is fine: `poll_*` must not await; we only hold it for the call.
541+
// Synchronous lock is fine: `poll_*` must not await; we
542+
// only hold it for the call.
527543
let mut guard = self.inner.lock_guard();
528544
Pin::new(&mut *guard).poll_write(cx, &buf[..n])
529545
}
530546

547+
fn poll_write_vectored(
548+
self: Pin<&mut Self>,
549+
cx: &mut Context<'_>,
550+
bufs: &[IoSlice<'_>],
551+
) -> Poll<io::Result<usize>> {
552+
// Total bytes we *could* write this call.
553+
let total_len: usize = bufs.iter().map(|b| b.len()).sum();
554+
// Check with the gate how many bytes we're allowed to
555+
// write.
556+
let grant = self.gate.take_chunk(total_len, cx);
557+
if grant == 0 {
558+
return Poll::Pending;
559+
}
560+
// Build a truncated view of `bufs` whose total length ≤
561+
// grant. We may end with a shortened last slice.
562+
let mut left = grant;
563+
let mut granted: Vec<IoSlice<'_>> = Vec::with_capacity(bufs.len());
564+
for s in bufs {
565+
if left == 0 {
566+
break;
567+
}
568+
let take = s.len().min(left);
569+
if take > 0 {
570+
granted.push(IoSlice::new(&s[..take]));
571+
left -= take;
572+
}
573+
}
574+
575+
let mut guard = self.inner.lock_guard();
576+
Pin::new(&mut *guard).poll_write_vectored(cx, &granted)
577+
}
578+
579+
fn is_write_vectored(&self) -> bool {
580+
self.inner.is_write_vectored()
581+
}
582+
531583
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
532584
let mut guard = self.inner.lock_guard();
533585
Pin::new(&mut *guard).poll_flush(cx)

0 commit comments

Comments
 (0)