Skip to content

Commit 1731e62

Browse files
pzhan9meta-codesync[bot]
authored andcommitted
tighten the usage of bind_to (meta-pytorch#1437)
Summary: Pull Request resolved: meta-pytorch#1437 `PortHandle.bind_to` is not meant to be a public method. This diff makes it `pub(crate)`, rename it to `bind_to_actor_port` and remove its `port_index` parameter. https://www.internalfb.com/code/fbsource/[5e07de58da822924da9c2108a46ba4cd173a75c8]/fbcode/monarch/hyperactor/src/mailbox.rs?lines=1600%2C1613 However, this method is already used outside the `hyperactor` crate in a couple of places. This diff hides those usage behind a new method `Instance::<()>::bind_actor_port`. This diff also removes `Ports::bind_to` and fold its logic to where it is currently used. https://www.internalfb.com/code/fbsource/[5e07de58da822924da9c2108a46ba4cd173a75c8]/fbcode/monarch/hyperactor/src/proc.rs?lines=1862%2C1958 Reviewed By: mariusae Differential Revision: D83931993 fbshipit-source-id: 35dce72c235ec54b883df4c12c44d6141446db2c
1 parent 6ca383a commit 1731e62

File tree

10 files changed

+61
-67
lines changed

10 files changed

+61
-67
lines changed

controller/src/lib.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -634,7 +634,6 @@ mod tests {
634634
use hyperactor::clock::Clock;
635635
use hyperactor::clock::RealClock;
636636
use hyperactor::context::Mailbox as _;
637-
use hyperactor::data::Named;
638637
use hyperactor::id;
639638
use hyperactor::mailbox::BoxedMailboxSender;
640639
use hyperactor::mailbox::DialMailboxRouter;
@@ -1129,8 +1128,7 @@ mod tests {
11291128

11301129
// Build a supervisor.
11311130
let sup_mail = system.attach().await.unwrap();
1132-
let (sup_tx, _sup_rx) = sup_mail.open_port::<ProcSupervisionMessage>();
1133-
sup_tx.bind_to(ProcSupervisionMessage::port());
1131+
let (_sup_tx, _sup_rx) = sup_mail.bind_actor_port::<ProcSupervisionMessage>();
11341132
let sup_ref = ActorRef::<ProcSupervisor>::attest(sup_mail.self_id().clone());
11351133

11361134
// Construct a system sender.
@@ -1360,8 +1358,7 @@ mod tests {
13601358

13611359
// Build a supervisor.
13621360
let sup_mail = system.attach().await.unwrap();
1363-
let (sup_tx, _sup_rx) = sup_mail.open_port::<ProcSupervisionMessage>();
1364-
sup_tx.bind_to(ProcSupervisionMessage::port());
1361+
let (_sup_tx, _sup_rx) = sup_mail.bind_actor_port::<ProcSupervisionMessage>();
13651362
let sup_ref = ActorRef::<ProcSupervisor>::attest(sup_mail.self_id().clone());
13661363

13671364
// Construct a system sender.
@@ -1665,9 +1662,8 @@ mod tests {
16651662
.await
16661663
.unwrap();
16671664

1668-
let (client_supervision_tx, mut client_supervision_rx) =
1669-
client_mailbox.open_port::<ClientMessage>();
1670-
client_supervision_tx.bind_to(ClientMessage::port());
1665+
let (_client_supervision_tx, mut client_supervision_rx) =
1666+
client_mailbox.bind_actor_port::<ClientMessage>();
16711667

16721668
// mock a proc actor that doesn't update supervision state
16731669
let (
@@ -1726,9 +1722,8 @@ mod tests {
17261722
// Client actor.
17271723
let mut system = System::new(server_handle.local_addr().clone());
17281724
let client_mailbox = system.attach().await.unwrap();
1729-
let (client_supervision_tx, mut client_supervision_rx) =
1730-
client_mailbox.open_port::<ClientMessage>();
1731-
client_supervision_tx.bind_to(ClientMessage::port());
1725+
let (_client_supervision_tx, mut client_supervision_rx) =
1726+
client_mailbox.bind_actor_port::<ClientMessage>();
17321727

17331728
// Bootstrap the controller
17341729
let controller_id = id!(controller[0].root);
@@ -1865,9 +1860,8 @@ mod tests {
18651860
// Client actor.
18661861
let mut system = System::new(server_handle.local_addr().clone());
18671862
let client_mailbox = system.attach().await.unwrap();
1868-
let (client_supervision_tx, mut client_supervision_rx) =
1863+
let (_client_supervision_tx, mut client_supervision_rx) =
18691864
client_mailbox.open_port::<ClientMessage>();
1870-
client_supervision_tx.bind_to(ClientMessage::port());
18711865

18721866
// Bootstrap the controller
18731867
let controller_id = id!(controller[0].root);

hyperactor/src/clock.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use serde::Serialize;
2222

2323
use crate::Mailbox;
2424
use crate::channel::ChannelAddr;
25-
use crate::data::Named;
2625
use crate::id;
2726
use crate::mailbox::DeliveryError;
2827
use crate::mailbox::MailboxSender;
@@ -261,9 +260,8 @@ impl SimClock {
261260
static SIMCLOCK_MAILBOX: OnceLock<Mailbox> = OnceLock::new();
262261
SIMCLOCK_MAILBOX.get_or_init(|| {
263262
let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
264-
let (undeliverable_messages, mut rx) =
265-
mailbox.open_port::<Undeliverable<MessageEnvelope>>();
266-
undeliverable_messages.bind_to(Undeliverable::<MessageEnvelope>::port());
263+
let (_undeliverable_messages, mut rx) =
264+
mailbox.bind_actor_port::<Undeliverable<MessageEnvelope>>();
267265
tokio::spawn(async move {
268266
while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
269267
envelope.set_error(DeliveryError::BrokenLink(

hyperactor/src/mailbox.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1244,6 +1244,18 @@ impl Mailbox {
12441244
)
12451245
}
12461246

1247+
/// Bind this message's actor port to this actor's mailbox. This method is
1248+
/// normally used:
1249+
/// 1. when we need to intercept a message sent to a handler, and re-route
1250+
/// that message to the returned receiver;
1251+
/// 2. mock this message's handler when it is not implemented for this actor
1252+
/// type, with the returned receiver.
1253+
pub(crate) fn bind_actor_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1254+
let (handle, receiver) = self.open_port();
1255+
handle.bind_actor_port();
1256+
(handle, receiver)
1257+
}
1258+
12471259
/// Open a new port with an accumulator with default reduce options.
12481260
/// See [`open_accum_port_opts`] for more details.
12491261
pub fn open_accum_port<A>(&self, accum: A) -> (PortHandle<A::Update>, PortReceiver<A::State>)
@@ -1389,13 +1401,14 @@ impl Mailbox {
13891401
PortRef::attest(port_id)
13901402
}
13911403

1392-
fn bind_to<M: RemoteMessage>(&self, handle: &PortHandle<M>, port_index: u64) {
1404+
fn bind_to_actor_port<M: RemoteMessage>(&self, handle: &PortHandle<M>) {
13931405
assert_eq!(
13941406
handle.mailbox.actor_id(),
13951407
self.actor_id(),
13961408
"port does not belong to mailbox"
13971409
);
13981410

1411+
let port_index = M::port();
13991412
let port_id = self.actor_id().port_id(port_index);
14001413
match self.inner.ports.entry(port_index) {
14011414
Entry::Vacant(entry) => {
@@ -1625,10 +1638,13 @@ impl<M: RemoteMessage> PortHandle<M> {
16251638
)
16261639
}
16271640

1628-
/// Bind to a specific port index. This is used by [`actor::Binder`] implementations to
1629-
/// bind actor refs. This is not intended for general use.
1630-
pub fn bind_to(&self, port_index: u64) {
1631-
self.mailbox.bind_to(self, port_index);
1641+
/// Bind to this message's actor port. This method will panic if the handle
1642+
/// is already bound.
1643+
///
1644+
/// This is used by [`actor::Binder`] implementations to bind actor refs.
1645+
/// This is not intended for general use.
1646+
pub(crate) fn bind_actor_port(&self) {
1647+
self.mailbox.bind_to_actor_port(self);
16321648
}
16331649
}
16341650

@@ -3648,8 +3664,7 @@ mod tests {
36483664
actor_id.clone(),
36493665
BoxedMailboxSender::new(AsyncLoopForwarder),
36503666
);
3651-
let (ret_port, mut ret_rx) = mailbox.open_port::<Undeliverable<MessageEnvelope>>();
3652-
ret_port.bind_to(Undeliverable::<MessageEnvelope>::port());
3667+
let (ret_port, mut ret_rx) = mailbox.bind_actor_port::<Undeliverable<MessageEnvelope>>();
36533668

36543669
// Create a destination not owned by this mailbox to force
36553670
// forwarding.
@@ -3696,9 +3711,8 @@ mod tests {
36963711
actor_id.clone(),
36973712
BoxedMailboxSender::new(PanickingMailboxSender),
36983713
);
3699-
let (undeliverable_tx, mut undeliverable_rx) =
3700-
mailbox.open_port::<Undeliverable<MessageEnvelope>>();
3701-
undeliverable_tx.bind_to(Undeliverable::<MessageEnvelope>::port());
3714+
let (_undeliverable_tx, mut undeliverable_rx) =
3715+
mailbox.bind_actor_port::<Undeliverable<MessageEnvelope>>();
37023716

37033717
// Open a local user u64 port.
37043718
let (user_port, mut user_rx) = mailbox.open_port::<u64>();

hyperactor/src/message.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ impl<M: Bind> IndexedErasedUnbound<M> {
249249
Ok(())
250250
}
251251
});
252-
port_handle.bind_to(IndexedErasedUnbound::<M>::port());
252+
port_handle.bind_actor_port();
253253
Ok(())
254254
}
255255
}

hyperactor/src/proc.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -477,8 +477,7 @@ impl Proc {
477477
R: Referable + RemoteHandles<M>,
478478
{
479479
let (instance, _handle) = self.instance(name)?;
480-
let (handle, rx) = instance.open_port::<M>();
481-
handle.bind_to(M::port());
480+
let (_handle, rx) = instance.bind_actor_port::<M>();
482481
let actor_ref = ActorRef::attest(instance.self_id().clone());
483482
Ok((instance, actor_ref, rx))
484483
}
@@ -1509,6 +1508,17 @@ impl<A: Actor> context::Actor for &Context<'_, A> {
15091508
}
15101509
}
15111510

1511+
impl Instance<()> {
1512+
/// See [Mailbox::bind_actor_port] for details.
1513+
pub fn bind_actor_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1514+
assert!(
1515+
self.actor_task_handle().is_none(),
1516+
"can only bind actor port on instance with no running actor task"
1517+
);
1518+
self.mailbox.bind_actor_port()
1519+
}
1520+
}
1521+
15121522
#[derive(Debug)]
15131523
enum ActorType {
15141524
Named(&'static TypeInfo),
@@ -1947,24 +1957,15 @@ impl<A: Actor> Ports<A> {
19471957
}
19481958
}
19491959

1950-
/// Bind the given message type to its default port.
1960+
/// Bind the given message type to its actor port.
19511961
pub fn bind<M: RemoteMessage>(&self)
19521962
where
19531963
A: Handler<M>,
19541964
{
1955-
self.bind_to::<M>(M::port());
1956-
}
1957-
1958-
/// Bind the given message type to the provided port.
1959-
/// Ports cannot be rebound to different message types;
1960-
/// and attempting to do so will result in a panic.
1961-
pub fn bind_to<M: RemoteMessage>(&self, port_index: u64)
1962-
where
1963-
A: Handler<M>,
1964-
{
1965+
let port_index = M::port();
19651966
match self.bound.entry(port_index) {
19661967
Entry::Vacant(entry) => {
1967-
self.get::<M>().bind_to(port_index);
1968+
self.get::<M>().bind_actor_port();
19681969
entry.insert(M::typename());
19691970
}
19701971
Entry::Occupied(entry) => {

hyperactor_mesh/src/proc_mesh.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use hyperactor::ActorHandle;
2222
use hyperactor::ActorId;
2323
use hyperactor::ActorRef;
2424
use hyperactor::Instance;
25-
use hyperactor::Named;
2625
use hyperactor::RemoteMessage;
2726
use hyperactor::WorldId;
2827
use hyperactor::actor::ActorStatus;
@@ -188,9 +187,8 @@ pub fn global_root_client() -> &'static Instance<()> {
188187
// The hook logs each undeliverable, along with whether a sink
189188
// was present at the time of receipt, which helps diagnose
190189
// lost or misrouted events.
191-
let (undeliverable_tx, undeliverable_rx) =
192-
client.open_port::<Undeliverable<MessageEnvelope>>();
193-
undeliverable_tx.bind_to(Undeliverable::<MessageEnvelope>::port());
190+
let (_undeliverable_tx, undeliverable_rx) =
191+
client.bind_actor_port::<Undeliverable<MessageEnvelope>>();
194192
hyperactor::mailbox::supervise_undeliverable_messages_with(
195193
undeliverable_rx,
196194
crate::proc_mesh::get_global_supervision_sink,
@@ -346,9 +344,8 @@ impl ProcMesh {
346344
// `global_root_client()`.
347345
let (client, _handle) = client_proc.instance("client")?;
348346
// Bind an undeliverable message port in the client.
349-
let (undeliverable_messages, client_undeliverable_receiver) =
350-
client.open_port::<Undeliverable<MessageEnvelope>>();
351-
undeliverable_messages.bind_to(Undeliverable::<MessageEnvelope>::port());
347+
let (_undeliverable_messages, client_undeliverable_receiver) =
348+
client.bind_actor_port::<Undeliverable<MessageEnvelope>>();
352349
hyperactor::mailbox::supervise_undeliverable_messages(
353350
supervision_port.clone(),
354351
client_undeliverable_receiver,

hyperactor_multiprocess/src/proc_actor.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -878,7 +878,6 @@ mod tests {
878878
use hyperactor::channel::ChannelTransport;
879879
use hyperactor::clock::Clock;
880880
use hyperactor::clock::RealClock;
881-
use hyperactor::data::Named;
882881
use hyperactor::forward;
883882
use hyperactor::id;
884883
use hyperactor::reference::ActorRef;
@@ -1205,9 +1204,8 @@ mod tests {
12051204
// A test supervisor.
12061205
let mut system = System::new(server_handle.local_addr().clone());
12071206
let supervisor = system.attach().await.unwrap();
1208-
let (supervisor_supervision_tx, mut supervisor_supervision_receiver) =
1209-
supervisor.open_port::<ProcSupervisionMessage>();
1210-
supervisor_supervision_tx.bind_to(ProcSupervisionMessage::port());
1207+
let (_supervisor_supervision_tx, mut supervisor_supervision_receiver) =
1208+
supervisor.bind_actor_port::<ProcSupervisionMessage>();
12111209
let supervisor_actor_ref: ActorRef<ProcSupervisor> =
12121210
ActorRef::attest(supervisor.self_id().clone());
12131211

@@ -1388,8 +1386,7 @@ mod tests {
13881386

13891387
// Build a supervisor.
13901388
let supervisor = system.attach().await.unwrap();
1391-
let (sup_tx, _sup_rx) = supervisor.open_port::<ProcSupervisionMessage>();
1392-
sup_tx.bind_to(ProcSupervisionMessage::port());
1389+
let (_sup_tx, _sup_rx) = supervisor.bind_actor_port::<ProcSupervisionMessage>();
13931390
let sup_ref = ActorRef::<ProcSupervisor>::attest(supervisor.self_id().clone());
13941391

13951392
// Construct a system sender.
@@ -1512,8 +1509,7 @@ mod tests {
15121509

15131510
// Build a supervisor.
15141511
let supervisor = system.attach().await.unwrap();
1515-
let (sup_tx, _sup_rx) = supervisor.open_port::<ProcSupervisionMessage>();
1516-
sup_tx.bind_to(ProcSupervisionMessage::port());
1512+
let (_sup_tx, _sup_rx) = supervisor.bind_actor_port::<ProcSupervisionMessage>();
15171513
let sup_ref = ActorRef::<ProcSupervisor>::attest(supervisor.self_id().clone());
15181514

15191515
// Construct a system sender.

hyperactor_multiprocess/src/system_actor.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2242,8 +2242,7 @@ mod tests {
22422242

22432243
// Build a supervisor.
22442244
let supervisor = system.attach().await.unwrap();
2245-
let (sup_tx, _sup_rx) = supervisor.open_port::<ProcSupervisionMessage>();
2246-
sup_tx.bind_to(ProcSupervisionMessage::port());
2245+
let (_sup_tx, _sup_rx) = supervisor.bind_actor_port::<ProcSupervisionMessage>();
22472246
let sup_ref = ActorRef::<ProcSupervisor>::attest(supervisor.self_id().clone());
22482247

22492248
// Construct a system sender.

monarch_hyperactor/src/proc.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use std::time::SystemTime;
2525

2626
use anyhow::Result;
2727
use hyperactor::ActorRef;
28-
use hyperactor::Named;
2928
use hyperactor::RemoteMessage;
3029
use hyperactor::actor::Signal;
3130
use hyperactor::channel;
@@ -453,11 +452,9 @@ impl<M: RemoteMessage> InstanceWrapper<M> {
453452

454453
fn new_with_instance_and_clock(instance: Instance<()>, clock: ClockKind) -> Result<Self> {
455454
// TEMPORARY: remove after using fixed message ports.
456-
let (message_port, message_receiver) = instance.open_port::<M>();
457-
message_port.bind_to(M::port());
455+
let (_message_port, message_receiver) = instance.bind_actor_port::<M>();
458456

459-
let (signal_port, signal_receiver) = instance.open_port::<Signal>();
460-
signal_port.bind_to(<Signal as Named>::port());
457+
let (signal_port, signal_receiver) = instance.bind_actor_port::<Signal>();
461458

462459
let (controller_error_sender, controller_error_receiver) = watch::channel("".to_string());
463460
let actor_id = instance.self_id().clone();

monarch_tensor_worker/src/lib.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1174,7 +1174,6 @@ mod tests {
11741174

11751175
use anyhow::Result;
11761176
use hyperactor::Instance;
1177-
use hyperactor::Named;
11781177
use hyperactor::WorldId;
11791178
use hyperactor::actor::ActorStatus;
11801179
use hyperactor::channel::ChannelAddr;
@@ -2348,8 +2347,7 @@ mod tests {
23482347

23492348
// Create a fake controller for the workers to talk to.
23502349
let client = System::new(system_addr.clone()).attach().await?;
2351-
let (handle, mut controller_rx) = client.open_port::<ControllerMessage>();
2352-
handle.bind_to(ControllerMessage::port());
2350+
let (_handle, mut controller_rx) = client.bind_actor_port::<ControllerMessage>();
23532351
let controller_ref: ActorRef<ControllerActor> = ActorRef::attest(client.self_id().clone());
23542352

23552353
// Create the worker world

0 commit comments

Comments
 (0)