Skip to content

Commit 1691ae0

Browse files
channel: net: update unit-tests 2/N (#911)
Summary: Pull Request resolved: #911 replace `tokio_util::codec::LengthDelimitedCodec`/`Framed` in `test_ack_in_net_tx_basic` and `test_persistent_net_tx` with `hyperactor::channel::net`’s `FrameReader`/`FrameWrite`. introduce `take_receiver2`, `verify_message2`, `verify_stream2`, and `send_frame2` helpers. behavior is unchanged; tests now exercise the production framer (zero-copy, cancellation-safe) directly. Reviewed By: mariusae Differential Revision: D80477756 fbshipit-source-id: 4cb8e3d22f89d47109cd7300444659940741896c
1 parent aaac251 commit 1691ae0

File tree

1 file changed

+130
-53
lines changed

1 file changed

+130
-53
lines changed

hyperactor/src/channel/net.rs

Lines changed: 130 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -2607,6 +2607,15 @@ mod tests {
26072607
(join_handle, framed, rx, cancel_token)
26082608
}
26092609

2610+
async fn send_frame2<W>(writer: W, bytes: bytes::Bytes) -> W
2611+
where
2612+
W: AsyncWrite + Unpin,
2613+
{
2614+
let mut fw = FrameWrite::new(writer, bytes);
2615+
fw.send().await.unwrap();
2616+
fw.complete()
2617+
}
2618+
26102619
async fn write_stream2<M, W>(
26112620
mut writer: W,
26122621
session_id: u64,
@@ -2838,6 +2847,15 @@ mod tests {
28382847
verify_tx_closed(&mut tx_receiver, "failed to deliver message within timeout").await;
28392848
}
28402849

2850+
async fn take_receiver2(
2851+
receiver_storage: &MVar<DuplexStream>,
2852+
) -> (FrameReader<ReadHalf<DuplexStream>>, WriteHalf<DuplexStream>) {
2853+
let receiver = receiver_storage.take().await;
2854+
let (r, writer) = tokio::io::split(receiver);
2855+
let reader = FrameReader::new(r, config::global::get(config::CODEC_MAX_FRAME_LENGTH));
2856+
(reader, writer)
2857+
}
2858+
28412859
async fn take_receiver(
28422860
receiver_storage: &MVar<DuplexStream>,
28432861
) -> (
@@ -2849,6 +2867,18 @@ mod tests {
28492867
futures::StreamExt::split(framed)
28502868
}
28512869

2870+
async fn verify_message2<M: RemoteMessage + PartialEq>(
2871+
reader: &mut FrameReader<ReadHalf<DuplexStream>>,
2872+
expect: (u64, M),
2873+
loc: u32,
2874+
) {
2875+
let expected = Frame::Message(expect.0, expect.1);
2876+
let bytes = reader.next().await.unwrap().expect("unexpected EOF");
2877+
let frame: Frame<M> = bincode::deserialize(bytes.as_ref()).unwrap();
2878+
2879+
assert_eq!(frame, expected, "from ln={loc}");
2880+
}
2881+
28522882
async fn verify_message<M: RemoteMessage + std::cmp::PartialEq>(
28532883
stream: &mut SplitStream<Framed<DuplexStream, LengthDelimitedCodec>>,
28542884
expect: (u64, M),
@@ -2864,6 +2894,32 @@ mod tests {
28642894
assert_eq!(frame, expected, "from ln={loc}");
28652895
}
28662896

2897+
async fn verify_stream2<M: RemoteMessage + PartialEq + Clone>(
2898+
reader: &mut FrameReader<ReadHalf<DuplexStream>>,
2899+
expects: &[(u64, M)],
2900+
expect_session_id: Option<u64>,
2901+
loc: u32,
2902+
) -> u64 {
2903+
let session_id = {
2904+
let bytes = reader.next().await.unwrap().expect("unexpected EOF");
2905+
let frame: Frame<M> = bincode::deserialize(bytes.as_ref()).unwrap();
2906+
match frame {
2907+
Frame::Init(session_id) => session_id,
2908+
_ => panic!("the 1st frame is not Init: {:?}. from ln={loc}", frame),
2909+
}
2910+
};
2911+
2912+
if let Some(expected_id) = expect_session_id {
2913+
assert_eq!(session_id, expected_id, "from ln={loc}");
2914+
}
2915+
2916+
for expect in expects {
2917+
verify_message2(reader, expect.clone(), loc).await;
2918+
}
2919+
2920+
session_id
2921+
}
2922+
28672923
async fn verify_stream<M: RemoteMessage + std::cmp::PartialEq + Clone>(
28682924
stream: &mut SplitStream<Framed<DuplexStream, LengthDelimitedCodec>>,
28692925
expects: &[(u64, M)],
@@ -2910,30 +2966,39 @@ mod tests {
29102966
// Send some messages, but not acking any of them.
29112967
net_tx_send(&tx, &[100, 101, 102, 103, 104]).await;
29122968
let session_id = {
2913-
let (mut sink, mut stream) = take_receiver(&receiver_storage).await;
2914-
let id = verify_stream(
2915-
&mut stream,
2916-
&[(0, 100), (1, 101), (2, 102), (3, 103), (4, 104)],
2969+
let (mut reader, mut writer) = take_receiver2(&receiver_storage).await;
2970+
let id = verify_stream2(
2971+
&mut reader,
2972+
&[
2973+
(0u64, 100u64),
2974+
(1u64, 101u64),
2975+
(2u64, 102u64),
2976+
(3u64, 103u64),
2977+
(4u64, 104u64),
2978+
],
29172979
None,
29182980
line!(),
29192981
)
29202982
.await;
29212983

2922-
for i in 0..5 {
2923-
sink.send(serialize_ack(i)).await.unwrap();
2984+
for i in 0u64..5u64 {
2985+
writer = send_frame2(writer, serialize_ack(i)).await;
29242986
}
29252987
// Wait for the acks to be processed by NetTx.
29262988
RealClock.sleep(Duration::from_secs(3)).await;
2927-
// client DuplexStream is dropped here. This breaks the connection.
2989+
// Drop both halves to break the in-memory connection (parity with old drop of DuplexStream).
2990+
drop(reader);
2991+
drop(writer);
2992+
29282993
id
29292994
};
29302995

29312996
// Sent a new message to verify all sent messages will not be resent.
2932-
net_tx_send(&tx, &[105]).await;
2997+
net_tx_send(&tx, &[105u64]).await;
29332998
{
2934-
let (_sink, mut stream) = take_receiver(&receiver_storage).await;
2935-
verify_stream(&mut stream, &[(5, 105)], Some(session_id), line!()).await;
2936-
// client DuplexStream is dropped here. This breaks the connection.
2999+
let (mut reader, _writer) = take_receiver2(&receiver_storage).await;
3000+
verify_stream2(&mut reader, &[(5u64, 105u64)], Some(session_id), line!()).await;
3001+
// Reader/writer dropped here. This breaks the connection.
29373002
};
29383003
}
29393004

@@ -2956,10 +3021,16 @@ mod tests {
29563021
// because none of them is acked.
29573022
for i in 0..n {
29583023
{
2959-
let (mut sink, mut stream) = take_receiver(&receiver_storage).await;
2960-
let id = verify_stream(
2961-
&mut stream,
2962-
&[(0, 100), (1, 101), (2, 102), (3, 103), (4, 104)],
3024+
let (mut reader, mut writer) = take_receiver2(&receiver_storage).await;
3025+
let id = verify_stream2(
3026+
&mut reader,
3027+
&[
3028+
(0u64, 100u64),
3029+
(1u64, 101u64),
3030+
(2u64, 102u64),
3031+
(3u64, 103u64),
3032+
(4u64, 104u64),
3033+
],
29633034
session_id,
29643035
line!(),
29653036
)
@@ -2972,51 +3043,51 @@ mod tests {
29723043
// In the last iteration, ack part of the messages. This should
29733044
// prune them from future resent.
29743045
if i == n - 1 {
2975-
sink.send(serialize_ack(1)).await.unwrap();
3046+
writer = send_frame2(writer, serialize_ack(1)).await;
29763047
// Wait for the acks to be processed by NetTx.
29773048
RealClock.sleep(Duration::from_secs(3)).await;
29783049
}
29793050
// client DuplexStream is dropped here. This breaks the connection.
3051+
drop(reader);
3052+
drop(writer);
29803053
};
29813054
}
29823055

29833056
// Verify only unacked are resent.
29843057
for _ in 0..n {
29853058
{
2986-
let client = receiver_storage.take().await;
2987-
let framed = Framed::new(client, build_codec());
2988-
let (_sink, mut stream) = futures::StreamExt::split(framed);
2989-
verify_stream(
2990-
&mut stream,
2991-
&[(2, 102), (3, 103), (4, 104)],
3059+
let (mut reader, mut _writer) = take_receiver2(&receiver_storage).await;
3060+
verify_stream2(
3061+
&mut reader,
3062+
&[(2u64, 102u64), (3u64, 103u64), (4u64, 104u64)],
29923063
session_id,
29933064
line!(),
29943065
)
29953066
.await;
2996-
// client DuplexStream is dropped here. This breaks the connection.
3067+
// drop(reader/_writer) at scope end
29973068
};
29983069
}
29993070

30003071
// Now send more messages.
3001-
net_tx_send(&tx, &[105, 106, 107, 108, 109]).await;
3072+
net_tx_send(&tx, &[105u64, 106u64, 107u64, 108u64, 109u64]).await;
30023073
// Verify the unacked messages from the 1st send will be grouped with
30033074
// the 2nd send.
30043075
for i in 0..n {
30053076
{
3006-
let (mut sink, mut stream) = take_receiver(&receiver_storage).await;
3007-
verify_stream(
3008-
&mut stream,
3077+
let (mut reader, mut writer) = take_receiver2(&receiver_storage).await;
3078+
verify_stream2(
3079+
&mut reader,
30093080
&[
30103081
// From the 1st send.
3011-
(2, 102),
3012-
(3, 103),
3013-
(4, 104),
3082+
(2u64, 102u64),
3083+
(3u64, 103u64),
3084+
(4u64, 104u64),
30143085
// From the 2nd send.
3015-
(5, 105),
3016-
(6, 106),
3017-
(7, 107),
3018-
(8, 108),
3019-
(9, 109),
3086+
(5u64, 105u64),
3087+
(6u64, 106u64),
3088+
(7u64, 107u64),
3089+
(8u64, 108u64),
3090+
(9u64, 109u64),
30203091
],
30213092
session_id,
30223093
line!(),
@@ -3028,30 +3099,32 @@ mod tests {
30283099
if i == n - 1 {
30293100
// Intentionally ack 1 again to verify it is okay to ack
30303101
// messages that was already acked.
3031-
sink.send(serialize_ack(1)).await.unwrap();
3032-
sink.send(serialize_ack(2)).await.unwrap();
3033-
sink.send(serialize_ack(3)).await.unwrap();
3102+
writer = send_frame2(writer, serialize_ack(1)).await;
3103+
writer = send_frame2(writer, serialize_ack(2)).await;
3104+
writer = send_frame2(writer, serialize_ack(3)).await;
30343105
// Wait for the acks to be processed by NetTx.
30353106
RealClock.sleep(Duration::from_secs(3)).await;
30363107
}
30373108
// client DuplexStream is dropped here. This breaks the connection.
3109+
drop(reader);
3110+
drop(writer);
30383111
};
30393112
}
30403113

30413114
for i in 0..n {
30423115
{
3043-
let (mut sink, mut stream) = take_receiver(&receiver_storage).await;
3044-
verify_stream(
3045-
&mut stream,
3116+
let (mut reader, mut writer) = take_receiver2(&receiver_storage).await;
3117+
verify_stream2(
3118+
&mut reader,
30463119
&[
30473120
// From the 1st send.
3048-
(4, 104),
3121+
(4u64, 104),
30493122
// From the 2nd send.
3050-
(5, 105),
3051-
(6, 106),
3052-
(7, 107),
3053-
(8, 108),
3054-
(9, 109),
3123+
(5u64, 105u64),
3124+
(6u64, 106u64),
3125+
(7u64, 107u64),
3126+
(8u64, 108u64),
3127+
(9u64, 109u64),
30553128
],
30563129
session_id,
30573130
line!(),
@@ -3060,29 +3133,33 @@ mod tests {
30603133

30613134
// In the last iteration, ack part of the messages from the 2nd send.
30623135
if i == n - 1 {
3063-
sink.send(serialize_ack(7)).await.unwrap();
3136+
writer = send_frame2(writer, serialize_ack(7)).await;
30643137
// Wait for the acks to be processed by NetTx.
30653138
RealClock.sleep(Duration::from_secs(3)).await;
30663139
}
30673140
// client DuplexStream is dropped here. This breaks the connection.
3141+
drop(reader);
3142+
drop(writer);
30683143
};
30693144
}
30703145

30713146
for _ in 0..n {
30723147
{
3073-
let (_sink, mut stream) = take_receiver(&receiver_storage).await;
3074-
verify_stream(
3075-
&mut stream,
3148+
let (mut reader, writer) = take_receiver2(&receiver_storage).await;
3149+
verify_stream2(
3150+
&mut reader,
30763151
&[
30773152
// From the 2nd send.
3078-
(8, 108),
3079-
(9, 109),
3153+
(8u64, 108u64),
3154+
(9u64, 109u64),
30803155
],
30813156
session_id,
30823157
line!(),
30833158
)
30843159
.await;
30853160
// client DuplexStream is dropped here. This breaks the connection.
3161+
drop(reader);
3162+
drop(writer);
30863163
};
30873164
}
30883165
}

0 commit comments

Comments
 (0)