Skip to content

Commit a04a812

Browse files
zdevito authored and facebook-github-bot committed
Move supervision directly to PortReceiver object (#771)
Summary: Pull Request resolved: #771 To de-futureify actor spawning, I have to be able to create a port before the Rust actormesh exists. So to monitor that port the monitor has to first wait for the actormesh and then wait for the monitor. This refactors how monitors are handled to make it possible to write this. As an added benefit, it removes a bunch of copy/paste for different receiver types. ghstack-source-id: 301241546 exported-using-ghexport Reviewed By: mariusae Differential Revision: D79695335 fbshipit-source-id: d14cb519ea00702e1de7f29f63ad3a038380eb8a
1 parent d64085b commit a04a812

File tree

7 files changed

+105
-174
lines changed

7 files changed

+105
-174
lines changed

monarch_hyperactor/src/actor_mesh.rs

Lines changed: 16 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -194,29 +194,24 @@ impl PythonActorMesh {
194194
.map(ActorRef::into_actor_id)
195195
.map(PyActorId::from))
196196
}
197-
198-
fn supervise(&self, py: Python<'_>, receiver: Bound<'_, PyAny>) -> PyResult<PyObject> {
199-
if let Ok(r) = receiver.extract::<PyRef<PythonPortReceiver>>() {
200-
let rx = SupervisedPythonPortReceiver {
201-
inner: r.inner(),
202-
monitor: ActorMeshMonitor {
203-
receiver: SharedCell::from(Mutex::new(self.user_monitor_sender.subscribe())),
204-
},
205-
};
206-
rx.into_py_any(py)
207-
} else if let Ok(r) = receiver.extract::<PyRef<PythonOncePortReceiver>>() {
208-
let rx = SupervisedPythonOncePortReceiver {
209-
inner: r.inner(),
210-
monitor: ActorMeshMonitor {
211-
receiver: SharedCell::from(Mutex::new(self.user_monitor_sender.subscribe())),
197+
fn supervision_event(&self) -> PyResult<PyPythonTask> {
198+
let mut receiver = self.user_monitor_sender.subscribe();
199+
PyPythonTask::new(async move {
200+
let event = receiver.recv().await;
201+
let event = match event {
202+
Ok(Some(event)) => PyActorSupervisionEvent::from(event.clone()),
203+
Ok(None) | Err(_) => PyActorSupervisionEvent {
204+
// Dummy actor as placeholder to indicate the whole mesh is stopped
205+
// TODO(albertli): remove this when pushing all supervision logic to rust.
206+
actor_id: id!(default[0].actor[0]).into(),
207+
actor_status: "actor mesh is stopped due to proc mesh shutdown".to_string(),
212208
},
213209
};
214-
rx.into_py_any(py)
215-
} else {
216-
Err(PyTypeError::new_err(
217-
"Expected a PortReceiver or OncePortReceiver",
218-
))
219-
}
210+
Ok(PyErr::new::<SupervisionError, _>(format!(
211+
"supervision error: {:?}",
212+
event
213+
)))
214+
})
220215
}
221216

222217
#[pyo3(signature = (**kwargs))]
@@ -406,106 +401,6 @@ impl Drop for PythonActorMesh {
406401
}
407402
}
408403

409-
#[derive(Debug, Clone)]
410-
struct ActorMeshMonitor {
411-
receiver: SharedCell<Mutex<tokio::sync::broadcast::Receiver<Option<ActorSupervisionEvent>>>>,
412-
}
413-
414-
impl ActorMeshMonitor {
415-
pub async fn next(&self) -> Result<PyActorSupervisionEvent, PyErr> {
416-
let receiver = self.receiver.clone();
417-
let receiver = receiver
418-
.borrow()
419-
.expect("`Actor mesh receiver` is shutdown");
420-
let mut receiver = receiver.lock().await;
421-
let event = receiver.recv().await;
422-
Ok(match event {
423-
Ok(Some(event)) => PyActorSupervisionEvent::from(event.clone()),
424-
Ok(None) | Err(_) => PyActorSupervisionEvent {
425-
// Dummy actor as placeholder to indicate the whole mesh is stopped
426-
// TODO(albertli): remove this when pushing all supervision logic to rust.
427-
actor_id: id!(default[0].actor[0]).into(),
428-
actor_status: "actor mesh is stopped due to proc mesh shutdown".to_string(),
429-
},
430-
})
431-
}
432-
}
433-
434-
// Values of this type can only be created by calling
435-
// `PythonActorMesh::supervise()`.
436-
#[pyclass(
437-
name = "SupervisedPortReceiver",
438-
module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh"
439-
)]
440-
struct SupervisedPythonPortReceiver {
441-
inner: Arc<tokio::sync::Mutex<PortReceiver<PythonMessage>>>,
442-
monitor: ActorMeshMonitor,
443-
}
444-
445-
#[pymethods]
446-
impl SupervisedPythonPortReceiver {
447-
fn __repr__(&self) -> &'static str {
448-
"<SupervisedPortReceiver>"
449-
}
450-
451-
fn recv_task(&mut self) -> PyPythonTask {
452-
let receiver = self.inner.clone();
453-
let monitor = self.monitor.clone();
454-
PythonTask::new(async move {
455-
let mut receiver = receiver.lock().await;
456-
let result = tokio::select! {
457-
result = receiver.recv() => {
458-
result.map_err(|err| PyErr::new::<PyEOFError, _>(format!("port closed: {}", err)))
459-
}
460-
event = monitor.next() => {
461-
Python::with_gil(|_py| {
462-
Err(PyErr::new::<SupervisionError, _>(format!("supervision error: {:?}", event)))
463-
})
464-
}
465-
};
466-
result.and_then(|message: PythonMessage| Python::with_gil(|py| message.into_py_any(py)))
467-
}).into()
468-
}
469-
}
470-
471-
// Values of this type can only be created by calling
472-
// `PythonActorMesh::supervise()`.
473-
#[pyclass(
474-
name = "SupervisedOncePortReceiver",
475-
module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh"
476-
)]
477-
struct SupervisedPythonOncePortReceiver {
478-
inner: Arc<std::sync::Mutex<Option<OncePortReceiver<PythonMessage>>>>,
479-
monitor: ActorMeshMonitor,
480-
}
481-
482-
#[pymethods]
483-
impl SupervisedPythonOncePortReceiver {
484-
fn __repr__(&self) -> &'static str {
485-
"<SupervisedOncePortReceiver>"
486-
}
487-
488-
fn recv_task(&mut self) -> PyResult<PyPythonTask> {
489-
let Some(receiver) = self.inner.lock().unwrap().take() else {
490-
return Err(PyErr::new::<PyValueError, _>("OncePort is already used"));
491-
};
492-
let monitor = self.monitor.clone();
493-
Ok(PythonTask::new(async move {
494-
let result = tokio::select! {
495-
result = receiver.recv() => {
496-
result.map_err(|err| PyErr::new::<PyEOFError, _>(format!("port closed: {}", err)))
497-
}
498-
event = monitor.next() => {
499-
Python::with_gil(|_py| {
500-
Err(PyErr::new::<SupervisionError, _>(format!("supervision error: {:?}", event)))
501-
})
502-
}
503-
};
504-
result.and_then(|message: PythonMessage| Python::with_gil(|py| message.into_py_any(py)))
505-
}).into())
506-
}
507-
}
508-
509404
#[pyclass(
510405
name = "ActorSupervisionEvent",
511406
module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh"
@@ -543,8 +438,6 @@ impl From<ActorSupervisionEvent> for PyActorSupervisionEvent {
543438
pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
544439
hyperactor_mod.add_class::<PythonActorMesh>()?;
545440
hyperactor_mod.add_class::<PythonActorMeshRef>()?;
546-
hyperactor_mod.add_class::<SupervisedPythonPortReceiver>()?;
547-
hyperactor_mod.add_class::<SupervisedPythonOncePortReceiver>()?;
548441
hyperactor_mod.add_class::<PyActorSupervisionEvent>()?;
549442
Ok(())
550443
}

monarch_hyperactor/src/pytokio.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,23 @@ impl PyPythonTask {
244244
});
245245
Ok(PyShared { rx })
246246
}
247+
248+
#[staticmethod]
249+
fn select_one(mut tasks: Vec<PyRefMut<'_, PyPythonTask>>) -> PyResult<PyPythonTask> {
250+
if tasks.is_empty() {
251+
return Err(PyValueError::new_err("Cannot select from empty task list"));
252+
}
253+
254+
let mut futures = Vec::new();
255+
for task_ref in tasks.iter_mut() {
256+
futures.push(task_ref.take_task()?);
257+
}
258+
259+
PyPythonTask::new(async move {
260+
let (result, index, _remaining) = futures::future::select_all(futures).await;
261+
result.map(|r| (r, index))
262+
})
263+
}
247264
}
248265

249266
#[pyclass(

python/monarch/_rust_bindings/monarch_hyperactor/actor_mesh.pyi

Lines changed: 7 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@
66

77
# pyre-strict
88

9-
from typing import AsyncIterator, final
9+
from typing import AsyncIterator, final, NoReturn
1010

1111
from monarch._rust_bindings.monarch_hyperactor.actor import PythonMessage
1212
from monarch._rust_bindings.monarch_hyperactor.mailbox import (
1313
Mailbox,
1414
OncePortReceiver,
1515
PortReceiver,
16-
PortReceiverBase,
1716
)
1817
from monarch._rust_bindings.monarch_hyperactor.proc import ActorId
18+
from monarch._rust_bindings.monarch_hyperactor.pytokio import PythonTask
1919
from monarch._rust_bindings.monarch_hyperactor.selection import Selection
2020
from monarch._rust_bindings.monarch_hyperactor.shape import Shape
2121
from typing_extensions import Self
@@ -97,23 +97,15 @@ class PythonActorMesh:
9797
"""
9898
...
9999

100-
def get(self, rank: int) -> ActorId | None:
100+
def supervision_event(self) -> PythonTask[Exception]:
101101
"""
102-
Get the actor id for the actor at the given rank.
102+
Completes with an exception when there is a supervision error.
103103
"""
104104
...
105105

106-
def supervise(
107-
self, r: PortReceiver | OncePortReceiver
108-
) -> SupervisedPortReceiver | SupervisedOncePortReceiver:
109-
"""Return a monitored port receiver.
110-
111-
A monitored port receiver behaves like a regular port receiver
112-
but also observes the health of the actor mesh associated with
113-
the sender. If the actor mesh becomes unhealthy, the receiver
114-
will yield a supervision error instead of waiting indefinitely
115-
for a message.
116-
106+
def get(self, rank: int) -> ActorId | None:
107+
"""
108+
Get the actor id for the actor at the given rank.
117109
"""
118110
...
119111

@@ -149,18 +141,6 @@ class PythonActorMesh:
149141
"""
150142
...
151143

152-
@final
153-
class SupervisedPortReceiver(PortReceiverBase):
154-
"""A monitored receiver to which PythonMessages are sent. Values
155-
of this type cannot be constructed directly in Python.
156-
"""
157-
158-
@final
159-
class SupervisedOncePortReceiver(PortReceiverBase):
160-
"""A monitored once receiver to which PythonMessages are sent.
161-
Values of this type cannot be constructed directly in Python.
162-
"""
163-
164144
@final
165145
class ActorSupervisionEvent:
166146
@property

python/monarch/_rust_bindings/monarch_hyperactor/pytokio.pyi

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,18 @@
77
# pyre-strict
88

99
import asyncio
10-
from typing import Any, Awaitable, Callable, Coroutine, Generator, Generic, TypeVar
10+
from typing import (
11+
Any,
12+
Awaitable,
13+
Callable,
14+
Coroutine,
15+
Generator,
16+
Generic,
17+
NoReturn,
18+
Sequence,
19+
Tuple,
20+
TypeVar,
21+
)
1122

1223
T = TypeVar("T")
1324

@@ -62,6 +73,15 @@ class PythonTask(Generic[T], Awaitable[T]):
6273
"""
6374
...
6475

76+
@staticmethod
77+
def select_one(
78+
tasks: "Sequence[PythonTask[T]]",
79+
) -> "PythonTask[Tuple[T, int]]":
80+
"""
81+
Run the tasks concurrently and return the result of the first one to finish, along with the index of that task.
82+
"""
83+
...
84+
6585
class Shared(Generic[T]):
6686
"""
6787
The result of a spawned PythonTask, which can be awaited on multiple times like Python Futures.

0 commit comments

Comments
 (0)