Skip to content

Commit e911ff0

Browse files
channel: net: update unit-tests 3/N (#912)
Summary: Pull Request resolved: #912 replace `tokio_util::codec::LengthDelimitedCodec`/`Framed` in `test_ack_before_redelivery_in_net_tx`, `test_ack_exceeded_limit_with_connected_link`, and `test_ack_exceeded_limit_with_broken_link` with `hyperactor::channel::net`’s `FrameReader`/`FrameWrite`. behavior is unchanged; tests now exercise the production framer (zero-copy, cancellation-safe) directly. Reviewed By: mariusae Differential Revision: D80482767 fbshipit-source-id: bddca27f6a180f3903ad05c0db5de5a11a3d81f7
1 parent 4c2c4cb commit e911ff0

File tree

1 file changed

+9
-9
lines changed

1 file changed

+9
-9
lines changed

hyperactor/src/channel/net.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3174,10 +3174,10 @@ mod tests {
31743174
// trigger a connection.
31753175
let (return_channel_tx, return_channel_rx) = oneshot::channel();
31763176
net_tx.try_post(100, return_channel_tx).unwrap();
3177-
let (mut sink, mut stream) = take_receiver(&receiver_storage).await;
3178-
verify_stream(&mut stream, &[(0, 100)], None, line!()).await;
3177+
let (mut reader, mut writer) = take_receiver2(&receiver_storage).await;
3178+
verify_stream2(&mut reader, &[(0u64, 100u64)], None, line!()).await;
31793179
// ack it
3180-
sink.send(serialize_ack(0)).await.unwrap();
3180+
writer = send_frame2(writer, serialize_ack(0)).await;
31813181
// confirm Tx received ack
31823182
//
31833183
// Using `is_err` to confirm the message is delivered/acked is confusing,
@@ -3188,12 +3188,12 @@ mod tests {
31883188
// Although Tx did not actually send seq=1, we still ack it from Rx to
31893189
// pretend Tx already sent it, just it did not know it was sent
31903190
// successfully.
3191-
sink.send(serialize_ack(1)).await.unwrap();
3191+
writer = send_frame2(writer, serialize_ack(1)).await;
31923192

31933193
let (return_channel_tx, return_channel_rx) = oneshot::channel();
31943194
net_tx.try_post(101, return_channel_tx).unwrap();
31953195
// Verify the message is sent to Rx.
3196-
verify_message(&mut stream, (1, 101), line!()).await;
3196+
verify_message2(&mut reader, (1u64, 101u64), line!()).await;
31973197
// although we did not ack the message after it is sent, since we already
31983198
// acked it previously, Tx will treat it as acked, and considered the
31993199
// message delivered successfully.
@@ -3216,19 +3216,19 @@ mod tests {
32163216
let mut tx_status = tx.status().clone();
32173217
// send a message
32183218
tx.try_post(100, unused_return_channel()).unwrap();
3219-
let (mut sink, mut stream) = take_receiver(&receiver_storage).await;
3219+
let (mut reader, mut writer) = take_receiver2(&receiver_storage).await;
32203220
// Confirm message is sent to rx.
3221-
verify_stream(&mut stream, &[(0, 100)], None, line!()).await;
3221+
verify_stream2(&mut reader, &[(0u64, 100u64)], None, line!()).await;
32223222
// ack it
3223-
sink.send(serialize_ack(0)).await.unwrap();
3223+
writer = send_frame2(writer, serialize_ack(0)).await;
32243224
RealClock.sleep(Duration::from_secs(3)).await;
32253225
// Channel should be still alive because ack was sent.
32263226
assert!(!tx_status.has_changed().unwrap());
32273227
assert_eq!(*tx_status.borrow(), TxStatus::Active);
32283228

32293229
tx.try_post(101, unused_return_channel()).unwrap();
32303230
// Confirm message is sent to rx.
3231-
verify_message(&mut stream, (1, 101), line!()).await;
3231+
verify_message2(&mut reader, (1u64, 101u64), line!()).await;
32323232

32333233
if disconnect_before_ack {
32343234
// Prevent link from reconnect

0 commit comments

Comments
 (0)