Skip to content

Commit f7c80fa

Browse files
pzhan9meta-codesync[bot]
authored and committed
Add native v1 castings (#1304)
Summary: Pull Request resolved: #1304 This diff adds native v1 casting implementation, instead of piggybacking v0's implementation. Specifically, this diff: 1. Add V1 handlers in comm actors, i.e. `CastMessageV1` and `ForwardMessageV1`. 2. On the v1 ActorMesh side, use `Sequencers` to generate seq numbers, and send to comm actors through `CastMessageV1`. 3. Update several direct send call sites to use `ActorRef::seq_send`, so seq numbers are always assigned. Note that we still need to wait for all `&impl cap::CanSend` callsites to be updated with `&impl context::Actor` before we can turn on the native v1 casting in production, since we need to ensure all `direct send`s are also getting the sequence number assigned. Differential Revision: D82537988
1 parent 57ac46f commit f7c80fa

File tree

15 files changed

+673
-88
lines changed

15 files changed

+673
-88
lines changed

hyperactor/src/actor.rs

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1085,7 +1085,7 @@ mod tests {
10851085

10861086
// Returning the sequence number assigned to the message.
10871087
#[derive(Debug)]
1088-
#[hyperactor::export(handlers = [String])]
1088+
#[hyperactor::export(handlers = [String, Callback])]
10891089
struct GetSeqActor(PortRef<(String, SeqInfo)>);
10901090

10911091
#[async_trait]
@@ -1111,6 +1111,28 @@ mod tests {
11111111
}
11121112
}
11131113

1114+
// Unlike Handler<String>, where the sender provides the string message
1115+
// directly, in Handler<Callback>, sender needs to provide a port, and
1116+
// handler will reply to that port with its own callback port. Then sender can
1117+
// send the string message through this callback port.
1118+
#[derive(Clone, Debug, Serialize, Deserialize, Named)]
1119+
struct Callback(PortRef<PortRef<String>>);
1120+
1121+
#[async_trait]
1122+
impl Handler<Callback> for GetSeqActor {
1123+
async fn handle(
1124+
&mut self,
1125+
cx: &Context<Self>,
1126+
message: Callback,
1127+
) -> Result<(), anyhow::Error> {
1128+
let (handle, mut receiver) = cx.open_port::<String>();
1129+
let callback_ref = handle.bind();
1130+
message.0.send(cx, callback_ref).unwrap();
1131+
let msg = receiver.recv().await.unwrap();
1132+
self.handle(cx, msg).await
1133+
}
1134+
}
1135+
11141136
#[async_timed_test(timeout_secs = 30)]
11151137
async fn test_sequencing_actor_handle_basic() {
11161138
let proc = Proc::local();
@@ -1151,6 +1173,37 @@ mod tests {
11511173
}
11521174
}
11531175

1176+
// Verify that we can pass port refs between sender and destination actors
1177+
// back and forth, and send messages through them without being deadlocked.
1178+
#[async_timed_test(timeout_secs = 30)]
1179+
async fn test_sequencing_actor_handle_callback() {
1180+
let config = config::global::lock();
1181+
let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1182+
1183+
let proc = Proc::local();
1184+
let (client, _) = proc.instance("client").unwrap();
1185+
let (tx, mut rx) = client.open_port();
1186+
1187+
let actor_handle = proc
1188+
.spawn::<GetSeqActor>("get_seq", tx.bind())
1189+
.await
1190+
.unwrap();
1191+
let actor_ref: ActorRef<GetSeqActor> = actor_handle.bind();
1192+
1193+
let (callback_tx, mut callback_rx) = client.open_port();
1194+
actor_ref
1195+
.send(&client, Callback(callback_tx.bind()))
1196+
.unwrap();
1197+
let msg_port_ref = callback_rx.recv().await.unwrap();
1198+
msg_port_ref.send(&client, "finally".to_string()).unwrap();
1199+
1200+
let session_id = client.sequencer().session_id();
1201+
assert_eq!(
1202+
rx.recv().await.unwrap(),
1203+
("finally".to_string(), SeqInfo { session_id, seq: 1 })
1204+
);
1205+
}
1206+
11541207
// Adding a delay before sending to the destination proc. Useful for tests
11551208
// requiring latency injection.
11561209
#[derive(Debug)]
@@ -1254,7 +1307,7 @@ mod tests {
12541307

12551308
// By disabling the actor side re-ordering buffer, the messages will
12561309
// be processed in the same order as they sent out.
1257-
let _guard = config.override_key(config::ENABLE_CLIENT_SEQ_ASSIGNMENT, false);
1310+
let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, false);
12581311
assert_out_of_order_delivery(
12591312
vec![("second".to_string(), 2), ("first".to_string(), 1)],
12601313
latency_plan.clone(),
@@ -1263,7 +1316,7 @@ mod tests {
12631316

12641317
// By enabling the actor side re-ordering buffer, the messages will
12651318
// be re-ordered before being processed.
1266-
let _guard = config.override_key(config::ENABLE_CLIENT_SEQ_ASSIGNMENT, true);
1319+
let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
12671320
assert_out_of_order_delivery(
12681321
vec![("first".to_string(), 1), ("second".to_string(), 2)],
12691322
latency_plan.clone(),
@@ -1281,7 +1334,7 @@ mod tests {
12811334

12821335
// By enabling the actor side re-ordering buffer, the messages will
12831336
// be re-ordered before being processed.
1284-
let _guard = config.override_key(config::ENABLE_CLIENT_SEQ_ASSIGNMENT, true);
1337+
let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
12851338
let expected = (1..10000)
12861339
.map(|i| (format!("msg{i}"), i))
12871340
.collect::<Vec<_>>();

hyperactor/src/config.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,12 @@ declare_attrs! {
110110
pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01;
111111

112112
/// Whether to enable the destination actor side re-ordering buffer.
113-
pub attr ENABLE_CLIENT_SEQ_ASSIGNMENT: bool = false;
113+
@meta(CONFIG_ENV_VAR = "HYPERACTOR_ENABLE_DEST_ACTOR_REORDERING_BUFFER".to_string())
114+
pub attr ENABLE_DEST_ACTOR_REORDERING_BUFFER: bool = false;
115+
116+
/// Whether to use native v1 casting in v1 ActorMesh.
117+
@meta(CONFIG_ENV_VAR = "HYPERACTOR_ENABLE_NATIVE_V1_CASTING".to_string())
118+
pub attr ENABLE_NATIVE_V1_CASTING: bool = false;
114119

115120
/// Timeout for [`Host::spawn`] to await proc readiness.
116121
///

hyperactor/src/proc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -946,7 +946,7 @@ impl<A: Actor> Instance<A> {
946946
let mailbox = Mailbox::new(actor_id.clone(), BoxedMailboxSender::new(proc.downgrade()));
947947
let (work_tx, work_rx) = ordered_channel(
948948
actor_id.to_string(),
949-
config::global::get(config::ENABLE_CLIENT_SEQ_ASSIGNMENT),
949+
config::global::get(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER),
950950
);
951951
let ports: Arc<Ports<A>> = Arc::new(Ports::new(mailbox.clone(), work_tx));
952952
proc.state().proc_muxer.bind_mailbox(mailbox.clone());

hyperactor/src/reference.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -899,9 +899,21 @@ impl PortId {
899899
cx: &impl context::Actor,
900900
serialized: Serialized,
901901
mut headers: Attrs,
902+
) {
903+
self.send_with_headers_with_option(cx, serialized, headers, true);
904+
}
905+
906+
/// Similar to [`PortId::send_with_headers`], but allows the caller to
907+
/// decide whether to set the sequence info header with this method.
908+
pub fn send_with_headers_with_option(
909+
&self,
910+
cx: &impl context::Actor,
911+
serialized: Serialized,
912+
mut headers: Attrs,
913+
set_seq_info: bool,
902914
) {
903915
crate::mailbox::headers::set_send_timestamp(&mut headers);
904-
if self.is_actor_port() {
916+
if set_seq_info && self.is_actor_port() {
905917
// This method is infallible so is okay to assign the sequence number
906918
// without worrying about rollback.
907919
let sequencer = cx.instance().sequencer();

hyperactor_mesh/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,11 @@ tokio-stream = { version = "0.1.17", features = ["fs", "io-util", "net", "signal
8181
tokio-util = { version = "0.7.15", features = ["full"] }
8282
tracing = { version = "0.1.41", features = ["attributes", "valuable"] }
8383
tracing-subscriber = { version = "0.3.20", features = ["chrono", "env-filter", "json", "local-time", "parking_lot", "registry"] }
84+
uuid = { version = "1.2", features = ["serde", "v4", "v5", "v6", "v7", "v8"] }
8485

8586
[dev-dependencies]
8687
bytes = { version = "1.10", features = ["serde"] }
88+
fastrand = "2.1.1"
8789
itertools = "0.14.0"
8890
maplit = "1.0"
8991
proptest = "1.5"

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1092,17 +1092,17 @@ mod tests {
10921092
let actor_mesh: RootActorMesh<TestActor> = mesh.spawn("test", &()).await.unwrap();
10931093
let actor_ref = actor_mesh.get(0).unwrap();
10941094
let mut headers = Attrs::new();
1095-
set_cast_info_on_headers(&mut headers, extent.point_of_rank(0).unwrap(), mesh.client().self_id().clone());
1095+
set_cast_info_on_headers(&mut headers, extent.point_of_rank(0).unwrap(), mesh.client().self_id().clone(), None);
10961096
actor_ref.send_with_headers(mesh.client(), headers.clone(), GetRank(true, reply_port.clone())).unwrap();
10971097
assert_eq!(0, reply_port_receiver.recv().await.unwrap());
10981098

1099-
set_cast_info_on_headers(&mut headers, extent.point_of_rank(1).unwrap(), mesh.client().self_id().clone());
1099+
set_cast_info_on_headers(&mut headers, extent.point_of_rank(1).unwrap(), mesh.client().self_id().clone(), None);
11001100
actor_ref.port()
11011101
.send_with_headers(mesh.client(), headers.clone(), GetRank(true, reply_port.clone()))
11021102
.unwrap();
11031103
assert_eq!(1, reply_port_receiver.recv().await.unwrap());
11041104

1105-
set_cast_info_on_headers(&mut headers, extent.point_of_rank(2).unwrap(), mesh.client().self_id().clone());
1105+
set_cast_info_on_headers(&mut headers, extent.point_of_rank(2).unwrap(), mesh.client().self_id().clone(), None);
11061106
actor_ref.actor_id()
11071107
.port_id(GetRank::port())
11081108
.send_with_headers(

hyperactor_mesh/src/bootstrap.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3322,7 +3322,7 @@ mod tests {
33223322
actor_mesh
33233323
.cast(&instance, testactor::GetActorId(port.bind()))
33243324
.unwrap();
3325-
let got_id = rx.recv().await.unwrap();
3325+
let (got_id, _seq) = rx.recv().await.unwrap();
33263326
assert_eq!(
33273327
got_id,
33283328
actor_mesh.values().next().unwrap().actor_id().clone()

0 commit comments

Comments
 (0)