Skip to content

Commit 5bff727

Browse files
channel: net: framed: new fn FrameWrite::write_frame (#925)
Summary: Pull Request resolved: #925 replace ad-hoc `send_frame` helper with a new `FrameWrite::write_frame` utility method (per https://www.internalfb.com/diff/D80477756?dst_version_fbid=1325594515669182&transaction_fbid=942604394698850). update net.rs tests to call the method directly (returning the writer). add a smoke test in framed.rs to verify round-trip behavior. Reviewed By: mariusae Differential Revision: D80541567 fbshipit-source-id: c573f01a259a705271f32d8dac8d774d24b7d432
1 parent 46cdad9 commit 5bff727

File tree

2 files changed

+84
-18
lines changed

2 files changed

+84
-18
lines changed

hyperactor/src/channel/net.rs

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2577,15 +2577,6 @@ mod tests {
25772577
(join_handle, reader, writer, rx, cancel_token)
25782578
}
25792579

2580-
async fn send_frame<W>(writer: W, bytes: bytes::Bytes) -> W
2581-
where
2582-
W: AsyncWrite + Unpin,
2583-
{
2584-
let mut fw = FrameWrite::new(writer, bytes);
2585-
fw.send().await.unwrap();
2586-
fw.complete()
2587-
}
2588-
25892580
async fn write_stream<M, W>(
25902581
mut writer: W,
25912582
session_id: u64,
@@ -2866,7 +2857,9 @@ mod tests {
28662857
.await;
28672858

28682859
for i in 0u64..5u64 {
2869-
writer = send_frame(writer, serialize_ack(i)).await;
2860+
writer = FrameWrite::write_frame(writer, serialize_ack(i))
2861+
.await
2862+
.unwrap();
28702863
}
28712864
// Wait for the acks to be processed by NetTx.
28722865
RealClock.sleep(Duration::from_secs(3)).await;
@@ -2927,7 +2920,9 @@ mod tests {
29272920
// In the last iteration, ack part of the messages. This should
29282921
// prune them from future resent.
29292922
if i == n - 1 {
2930-
writer = send_frame(writer, serialize_ack(1)).await;
2923+
writer = FrameWrite::write_frame(writer, serialize_ack(1))
2924+
.await
2925+
.unwrap();
29312926
// Wait for the acks to be processed by NetTx.
29322927
RealClock.sleep(Duration::from_secs(3)).await;
29332928
}
@@ -2983,9 +2978,15 @@ mod tests {
29832978
if i == n - 1 {
29842979
// Intentionally ack 1 again to verify it is okay to ack
29852980
// messages that was already acked.
2986-
writer = send_frame(writer, serialize_ack(1)).await;
2987-
writer = send_frame(writer, serialize_ack(2)).await;
2988-
writer = send_frame(writer, serialize_ack(3)).await;
2981+
writer = FrameWrite::write_frame(writer, serialize_ack(1))
2982+
.await
2983+
.unwrap();
2984+
writer = FrameWrite::write_frame(writer, serialize_ack(2))
2985+
.await
2986+
.unwrap();
2987+
writer = FrameWrite::write_frame(writer, serialize_ack(3))
2988+
.await
2989+
.unwrap();
29892990
// Wait for the acks to be processed by NetTx.
29902991
RealClock.sleep(Duration::from_secs(3)).await;
29912992
}
@@ -3017,7 +3018,9 @@ mod tests {
30173018

30183019
// In the last iteration, ack part of the messages from the 2nd send.
30193020
if i == n - 1 {
3020-
writer = send_frame(writer, serialize_ack(7)).await;
3021+
writer = FrameWrite::write_frame(writer, serialize_ack(7))
3022+
.await
3023+
.unwrap();
30213024
// Wait for the acks to be processed by NetTx.
30223025
RealClock.sleep(Duration::from_secs(3)).await;
30233026
}
@@ -3061,7 +3064,9 @@ mod tests {
30613064
let (mut reader, mut writer) = take_receiver(&receiver_storage).await;
30623065
verify_stream(&mut reader, &[(0u64, 100u64)], None, line!()).await;
30633066
// ack it
3064-
writer = send_frame(writer, serialize_ack(0)).await;
3067+
writer = FrameWrite::write_frame(writer, serialize_ack(0))
3068+
.await
3069+
.unwrap();
30653070
// confirm Tx received ack
30663071
//
30673072
// Using `is_err` to confirm the message is delivered/acked is confusing,
@@ -3072,7 +3077,9 @@ mod tests {
30723077
// Although Tx did not actually send seq=1, we still ack it from Rx to
30733078
// pretend Tx already sent it, just it did not know it was sent
30743079
// successfully.
3075-
let _ = send_frame(writer, serialize_ack(1)).await;
3080+
let _ = FrameWrite::write_frame(writer, serialize_ack(1))
3081+
.await
3082+
.unwrap();
30763083

30773084
let (return_channel_tx, return_channel_rx) = oneshot::channel();
30783085
net_tx.try_post(101, return_channel_tx).unwrap();
@@ -3104,7 +3111,9 @@ mod tests {
31043111
// Confirm message is sent to rx.
31053112
verify_stream(&mut reader, &[(0u64, 100u64)], None, line!()).await;
31063113
// ack it
3107-
let _ = send_frame(writer, serialize_ack(0)).await;
3114+
let _ = FrameWrite::write_frame(writer, serialize_ack(0))
3115+
.await
3116+
.unwrap();
31083117
RealClock.sleep(Duration::from_secs(3)).await;
31093118
// Channel should be still alive because ack was sent.
31103119
assert!(!tx_status.has_changed().unwrap());

hyperactor/src/channel/net/framed.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,41 @@ impl<W: AsyncWrite + Unpin> FrameWrite<W> {
170170
let Self { writer, .. } = self;
171171
writer
172172
}
173+
174+
/// Writes a single frame into the underlying writer and returns
175+
/// it.
176+
///
177+
/// This is a convenience for the common pattern:
178+
/// `FrameWrite::new(writer, bytes).send().await?.complete()`.
179+
///
180+
/// Frame writes are atomic: either the entire frame is sent, or
181+
/// an error is returned. No partial frames are observed by the
182+
/// receiver.
183+
///
184+
/// # Arguments
185+
///
186+
/// * `writer` — the `AsyncWrite` sink to write into.
187+
/// * `bytes` — the serialized frame body to send.
188+
///
189+
/// # Returns
190+
///
191+
/// On success, returns the underlying writer so the caller can
192+
/// continue using it for further frames. On error, returns the
193+
/// I/O error from the underlying write.
194+
///
195+
/// # Examples
196+
///
197+
/// ```ignore
198+
/// use bytes::Bytes;
199+
///
200+
/// // `writer` is any AsyncWrite + Unpin (e.g. a tokio `WriteHalf`)
201+
/// let writer = FrameWrite::write_frame(writer, Bytes::from_static(b"hello")).await?;
202+
/// ```
203+
pub async fn write_frame(writer: W, bytes: Bytes) -> std::io::Result<W> {
204+
let mut fw = FrameWrite::new(writer, bytes);
205+
fw.send().await?;
206+
Ok(fw.complete())
207+
}
173208
}
174209

175210
#[cfg(test)]
@@ -208,4 +243,26 @@ mod tests {
208243
}
209244

210245
// todo: test cancellation, frame size
246+
247+
#[tokio::test]
248+
async fn test_write_frame_smoke() {
249+
let (a, b) = tokio::io::duplex(4096);
250+
let (r, _w_unused) = tokio::io::split(a);
251+
let (_r_unused, w) = tokio::io::split(b);
252+
253+
let mut reader = FrameReader::new(r, 1024);
254+
255+
let w = FrameWrite::write_frame(w, Bytes::from_static(b"hello"))
256+
.await
257+
.unwrap();
258+
let _ = FrameWrite::write_frame(w, Bytes::from_static(b"world"))
259+
.await
260+
.unwrap();
261+
262+
let f1 = reader.next().await.unwrap().unwrap();
263+
let f2 = reader.next().await.unwrap().unwrap();
264+
265+
assert_eq!(f1.as_ref(), b"hello");
266+
assert_eq!(f2.as_ref(), b"world");
267+
}
211268
}

0 commit comments

Comments
 (0)