Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 31 additions & 10 deletions monarch_extension/src/code_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use monarch_hyperactor::context::PyInstance;
use monarch_hyperactor::instance_dispatch;
use monarch_hyperactor::proc_mesh::PyProcMesh;
use monarch_hyperactor::runtime::signal_safe_block_on;
use monarch_hyperactor::v1::proc_mesh::PyProcMesh as PyProcMeshV1;
use pyo3::Bound;
use pyo3::exceptions::PyException;
use pyo3::exceptions::PyRuntimeError;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
Expand Down Expand Up @@ -264,16 +266,35 @@ impl CodeSyncMeshClient {
impl CodeSyncMeshClient {
#[staticmethod]
#[pyo3(signature = (*, client, proc_mesh))]
fn spawn_blocking(py: Python, client: PyInstance, proc_mesh: &PyProcMesh) -> PyResult<Self> {
let proc_mesh = proc_mesh.try_inner()?;
signal_safe_block_on(py, async move {
let actor_mesh = instance_dispatch!(client, |cx| {
proc_mesh
.spawn(cx, "code_sync_manager", &CodeSyncManagerParams {})
.await?
});
Ok(Self { actor_mesh })
})?
fn spawn_blocking(
py: Python,
client: PyInstance,
proc_mesh: &Bound<'_, PyAny>,
) -> PyResult<Self> {
if let Ok(v0) = proc_mesh.downcast::<PyProcMesh>() {
let proc_mesh = v0.borrow().try_inner()?;
signal_safe_block_on(py, async move {
let actor_mesh = instance_dispatch!(client, |cx| {
proc_mesh
.spawn(cx, "code_sync_manager", &CodeSyncManagerParams {})
.await?
});
Ok(Self { actor_mesh })
})?
} else {
let proc_mesh = proc_mesh.downcast::<PyProcMeshV1>()?.borrow().mesh_ref()?;
signal_safe_block_on(py, async move {
let actor_mesh = instance_dispatch!(client, |cx| {
proc_mesh
.spawn_service(cx, "code_sync_manager", &CodeSyncManagerParams {})
.await
.map_err(|err| PyException::new_err(err.to_string()))?
});
Ok(Self {
actor_mesh: SharedCell::from(RootActorMesh::from(actor_mesh)),
})
})?
}
}

#[pyo3(signature = (*, local, remote, method = PyCodeSyncMethod::Rsync {}, auto_reload = false))]
Expand Down
7 changes: 5 additions & 2 deletions python/monarch/_rust_bindings/monarch_extension/code_sync.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
from pathlib import Path
from typing import Any, Dict, final

from monarch._rust_bindings.monarch_hyperactor.proc_mesh import ProcMesh
from monarch._rust_bindings.monarch_hyperactor.proc_mesh import ProcMesh as ProcMeshV0

from monarch._rust_bindings.monarch_hyperactor.shape import Shape
from monarch._rust_bindings.monarch_hyperactor.v1.proc_mesh import (
ProcMesh as ProcMeshV1,
)

class WorkspaceLocation:
"""
Expand Down Expand Up @@ -84,7 +87,7 @@ class CodeSyncMeshClient:
@staticmethod
def spawn_blocking(
client: Any,
proc_mesh: ProcMesh,
proc_mesh: ProcMeshV0 | ProcMeshV1,
) -> CodeSyncMeshClient: ...
async def sync_workspace(
self,
Expand Down
Loading