Skip to content

Commit 3750fce

Browse files
pzhan9meta-codesync[bot]
authored andcommitted
Assign seq for PortHandle/PortRef
Differential Revision: D83591305
1 parent 597e07b commit 3750fce

File tree

6 files changed

+400
-38
lines changed

6 files changed

+400
-38
lines changed

hyperactor/src/actor.rs

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -716,9 +716,12 @@ pub trait RemoteHandles<M: RemoteMessage>: Referable {}
716716

717717
#[cfg(test)]
718718
mod tests {
719+
use std::collections::HashMap;
719720
use std::sync::Mutex;
720721
use std::time::Duration;
721722

723+
use timed_test::async_timed_test;
724+
use tokio::sync::mpsc;
722725
use tokio::time::timeout;
723726

724727
use super::*;
@@ -728,6 +731,13 @@ mod tests {
728731
use crate::PortRef;
729732
use crate::checkpoint::CheckpointError;
730733
use crate::checkpoint::Checkpointable;
734+
use crate::config;
735+
use crate::id;
736+
use crate::mailbox::BoxedMailboxSender;
737+
use crate::mailbox::MailboxSender;
738+
use crate::mailbox::monitored_return_handle;
739+
use crate::proc::SEQ_INFO;
740+
use crate::proc::SeqInfo;
731741
use crate::test_utils::pingpong::PingPongActor;
732742
use crate::test_utils::pingpong::PingPongActorParams;
733743
use crate::test_utils::pingpong::PingPongMessage;
@@ -1072,4 +1082,210 @@ mod tests {
10721082
handle.drain_and_stop().unwrap();
10731083
handle.await;
10741084
}
1085+
1086+
// Returning the sequence number assigned to the message.
1087+
#[derive(Debug)]
1088+
#[hyperactor::export(handlers = [String])]
1089+
struct GetSeqActor(PortRef<(String, SeqInfo)>);
1090+
1091+
#[async_trait]
1092+
impl Actor for GetSeqActor {
1093+
type Params = PortRef<(String, SeqInfo)>;
1094+
1095+
async fn new(params: PortRef<(String, SeqInfo)>) -> Result<Self, anyhow::Error> {
1096+
Ok(Self(params))
1097+
}
1098+
}
1099+
1100+
#[async_trait]
1101+
impl Handler<String> for GetSeqActor {
1102+
async fn handle(
1103+
&mut self,
1104+
cx: &Context<Self>,
1105+
message: String,
1106+
) -> Result<(), anyhow::Error> {
1107+
let Self(port) = self;
1108+
let seq_info = cx.headers().get(SEQ_INFO).unwrap();
1109+
port.send(cx, (message, seq_info.clone()))?;
1110+
Ok(())
1111+
}
1112+
}
1113+
1114+
#[async_timed_test(timeout_secs = 30)]
1115+
async fn test_sequencing_actor_handle_basic() {
1116+
let proc = Proc::local();
1117+
let (client, _) = proc.instance("client").unwrap();
1118+
let (tx, mut rx) = client.open_port();
1119+
1120+
let actor_handle = proc
1121+
.spawn::<GetSeqActor>("get_seq", tx.bind())
1122+
.await
1123+
.unwrap();
1124+
let actor_ref: ActorRef<GetSeqActor> = actor_handle.bind();
1125+
1126+
let session_id = client.sequencer().session_id();
1127+
let mut expected_seq = 0;
1128+
// Interleave messages sent through the handle and the reference.
1129+
for _ in 0..10 {
1130+
actor_handle.send(&client, "".to_string()).unwrap();
1131+
expected_seq += 1;
1132+
assert_eq!(
1133+
rx.recv().await.unwrap().1,
1134+
SeqInfo {
1135+
session_id,
1136+
seq: expected_seq,
1137+
}
1138+
);
1139+
1140+
for _ in 0..2 {
1141+
actor_ref.port().send(&client, "".to_string()).unwrap();
1142+
expected_seq += 1;
1143+
assert_eq!(
1144+
rx.recv().await.unwrap().1,
1145+
SeqInfo {
1146+
session_id,
1147+
seq: expected_seq,
1148+
}
1149+
);
1150+
}
1151+
}
1152+
}
1153+
1154+
// Adding a delay before sending the destination proc. Useful for tests
1155+
// requiring latency injection.
1156+
#[derive(Debug)]
1157+
struct DelayedMailboxSender {
1158+
relay_tx: mpsc::UnboundedSender<MessageEnvelope>,
1159+
}
1160+
1161+
impl DelayedMailboxSender {
1162+
// Use a random latency between 0 and 1 second if the plan is empty.
1163+
fn boxed(dest_proc: Proc, latency_plan: HashMap<u64, Duration>) -> BoxedMailboxSender {
1164+
let (relay_tx, mut relay_rx) = mpsc::unbounded_channel();
1165+
tokio::spawn(async move {
1166+
let mut count = 0;
1167+
while let Some(envelope) = relay_rx.recv().await {
1168+
count += 1;
1169+
1170+
let latency = if latency_plan.is_empty() {
1171+
Duration::from_millis(1000)
1172+
} else {
1173+
latency_plan.get(&count).unwrap().clone()
1174+
};
1175+
1176+
let dest_proc_clone = dest_proc.clone();
1177+
tokio::spawn(async move {
1178+
// Need Clock::sleep is an async function.
1179+
#[allow(clippy::disallowed_methods)]
1180+
tokio::time::sleep(latency).await;
1181+
dest_proc_clone.post(envelope, monitored_return_handle());
1182+
});
1183+
}
1184+
});
1185+
1186+
BoxedMailboxSender::new(Self { relay_tx })
1187+
}
1188+
}
1189+
1190+
impl MailboxSender for DelayedMailboxSender {
1191+
fn post_unchecked(
1192+
&self,
1193+
envelope: MessageEnvelope,
1194+
_return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1195+
) {
1196+
self.relay_tx.send(envelope).unwrap();
1197+
}
1198+
}
1199+
1200+
async fn assert_out_of_order_delivery(
1201+
expected: Vec<(String, u64)>,
1202+
latency_plan: HashMap<u64, Duration>,
1203+
) {
1204+
let local_proc: Proc = Proc::local();
1205+
let (client, _) = local_proc.instance("local").unwrap();
1206+
let (tx, mut rx) = client.open_port();
1207+
1208+
let handle = local_proc
1209+
.spawn::<GetSeqActor>("get_seq", tx.bind())
1210+
.await
1211+
.unwrap();
1212+
1213+
let actor_ref: ActorRef<GetSeqActor> = handle.bind();
1214+
1215+
let remote_proc = Proc::new(
1216+
id!(remote[0]),
1217+
DelayedMailboxSender::boxed(local_proc.clone(), latency_plan),
1218+
);
1219+
let (remote_client, _) = remote_proc.instance("remote").unwrap();
1220+
// Send the messages out in the order of their expected sequence numbers.
1221+
let mut messages = expected.clone();
1222+
messages.sort_by_key(|v| v.1);
1223+
for (message, _seq) in messages {
1224+
actor_ref.send(&remote_client, message).unwrap();
1225+
}
1226+
let session_id = remote_client.sequencer().session_id();
1227+
for expect in expected {
1228+
let expected = (
1229+
expect.0,
1230+
SeqInfo {
1231+
session_id,
1232+
seq: expect.1,
1233+
},
1234+
);
1235+
assert_eq!(rx.recv().await.unwrap(), expected);
1236+
}
1237+
1238+
handle.drain_and_stop().unwrap();
1239+
handle.await;
1240+
}
1241+
1242+
// Send several messages, use DelayedMailboxSender and the latency plan to
1243+
// ensure these messages will arrive at handler's workq in a determinstic
1244+
// out-of-order way. Then verify the actor handler will still process these
1245+
// messages based on their sending order if reordering buffer is enabled.
1246+
#[async_timed_test(timeout_secs = 30)]
1247+
async fn test_sequencing_actor_ref_out_of_order_deterministic() {
1248+
let config = config::global::lock();
1249+
1250+
let latency_plan = maplit::hashmap! {
1251+
1 => Duration::from_millis(1000),
1252+
2 => Duration::from_millis(0),
1253+
};
1254+
1255+
// By disabling the actor side re-ordering buffer, the mssages will
1256+
// be processed in the same order as they sent out.
1257+
let _guard = config.override_key(config::ENABLE_CLIENT_SEQ_ASSIGNMENT, false);
1258+
assert_out_of_order_delivery(
1259+
vec![("second".to_string(), 2), ("first".to_string(), 1)],
1260+
latency_plan.clone(),
1261+
)
1262+
.await;
1263+
1264+
// By enabling the actor side re-ordering buffer, the mssages will
1265+
// be re-ordered before being processed.
1266+
let _guard = config.override_key(config::ENABLE_CLIENT_SEQ_ASSIGNMENT, true);
1267+
assert_out_of_order_delivery(
1268+
vec![("first".to_string(), 1), ("second".to_string(), 2)],
1269+
latency_plan.clone(),
1270+
)
1271+
.await;
1272+
}
1273+
1274+
// Send a large nubmer of messages, use DelayedMailboxSender to ensure these
1275+
// messages will arrive at handler's workq in a random order. Then verify the
1276+
// actor handler will still process these messages based on their sending
1277+
// order with reordering buffer enabled.
1278+
#[async_timed_test(timeout_secs = 30)]
1279+
async fn test_sequencing_actor_ref_out_of_order_random() {
1280+
let config = config::global::lock();
1281+
1282+
// By enabling the actor side re-ordering buffer, the mssages will
1283+
// be re-ordered before being processed.
1284+
let _guard = config.override_key(config::ENABLE_CLIENT_SEQ_ASSIGNMENT, true);
1285+
let expected = (1..10000)
1286+
.map(|i| (format!("msg{i}"), i))
1287+
.collect::<Vec<_>>();
1288+
1289+
assert_out_of_order_delivery(expected, HashMap::new()).await;
1290+
}
10751291
}

hyperactor/src/data.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ use serde::de::DeserializeOwned;
2323
use crate as hyperactor;
2424
use crate::config;
2525

26+
/// Actor handler port should have its most significant bit set to 1.
27+
pub(crate) static ACTOR_PORT_BIT: u64 = 1 << 63;
28+
2629
/// A [`Named`] type is a type that has a globally unique name.
2730
pub trait Named: Sized + 'static {
2831
/// The globally unique type name for the type.
@@ -46,7 +49,7 @@ pub trait Named: Sized + 'static {
4649
/// The globally unique port for this type. Typed ports are in the range
4750
/// of 1<<63..1<<64-1.
4851
fn port() -> u64 {
49-
Self::typehash() | (1 << 63)
52+
Self::typehash() | ACTOR_PORT_BIT
5053
}
5154

5255
/// If the named type is an enum, this returns the name of the arm

hyperactor/src/mailbox.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ use crate::context;
121121
use crate::data::Serialized;
122122
use crate::id;
123123
use crate::metrics;
124+
use crate::proc::SEQ_INFO;
125+
use crate::proc::SeqInfo;
124126
use crate::reference::ActorId;
125127
use crate::reference::PortId;
126128
use crate::reference::Reference;
@@ -1575,20 +1577,35 @@ impl<M: Message> PortHandle<M> {
15751577
}
15761578
}
15771579

1578-
fn location(&self) -> PortLocation {
1580+
pub(crate) fn location(&self) -> PortLocation {
15791581
match self.bound.get() {
15801582
Some(port_id) => PortLocation::Bound(port_id.clone()),
15811583
None => PortLocation::new_unbound::<M>(self.mailbox.actor_id().clone()),
15821584
}
15831585
}
15841586

15851587
/// Send a message to this port.
1586-
pub fn send(&self, _cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1588+
pub fn send(&self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
15871589
let mut headers = Attrs::new();
15881590

15891591
crate::mailbox::headers::set_send_timestamp(&mut headers);
1590-
// TODO(pzhang) Use cx to add SEQ_INFO header.
1591-
1592+
// Message sent from handle is delivered immediately. It could race with
1593+
// messages from refs. So we need to assign seq if the handle is bound.
1594+
if let Some(bound_port) = self.bound.get()
1595+
&& bound_port.is_actor_port()
1596+
{
1597+
let sequencer = cx.instance().sequencer();
1598+
let seq = sequencer.assign_seq(self.mailbox.actor_id());
1599+
let seq_info = SeqInfo {
1600+
session_id: sequencer.session_id(),
1601+
seq,
1602+
};
1603+
headers.set(SEQ_INFO, seq_info);
1604+
}
1605+
// Encountering error means the port is closed. So we do not need to
1606+
// rollback the seq, because no message can be delivered to it, and
1607+
// subsequently do not need to worry about out-of-sequence for messages
1608+
// after this seq.
15921609
self.sender.send(headers, message).map_err(|err| {
15931610
MailboxSenderError::new_unbound::<M>(
15941611
self.mailbox.actor_id().clone(),
@@ -1602,7 +1619,6 @@ impl<M: Message> PortHandle<M> {
16021619
pub fn anon_send(&self, message: M) -> Result<(), MailboxSenderError> {
16031620
let mut headers = Attrs::new();
16041621
crate::mailbox::headers::set_send_timestamp(&mut headers);
1605-
16061622
self.sender.send(headers, message).map_err(|err| {
16071623
MailboxSenderError::new_unbound::<M>(
16081624
self.mailbox.actor_id().clone(),

hyperactor/src/proc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1846,7 +1846,7 @@ pub struct Ports<A: Actor> {
18461846
}
18471847

18481848
/// A message's sequencer number infomation.
1849-
#[derive(Serialize, Deserialize, Clone, Named, AttrValue)]
1849+
#[derive(Debug, Serialize, Deserialize, Clone, Named, AttrValue, PartialEq)]
18501850
pub struct SeqInfo {
18511851
/// Message's session ID
18521852
pub session_id: Uuid,

0 commit comments

Comments
 (0)