Skip to content

Commit 7a10f7a

Browse files
andrewjcg authored and facebook-github-bot committed
Add actor to sync conda environments (#860)
Summary: Pull Request resolved: #860 Adds support to conda sync to handle syncing conda environments, via: ``` await proc_mesh.sync_workspace(conda=True) ``` Reviewed By: LucasLLC, highker Differential Revision: D78458897 fbshipit-source-id: 173f9f986a40592872bdbff24223829a5374924f
1 parent 4193374 commit 7a10f7a

File tree

8 files changed

+444
-56
lines changed

8 files changed

+444
-56
lines changed

monarch_extension/src/code_sync.rs

Lines changed: 68 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@ use std::path::PathBuf;
1212

1313
use anyhow::Result;
1414
use futures::TryFutureExt;
15-
use futures::future::try_join_all;
1615
use hyperactor_mesh::Mesh;
1716
use hyperactor_mesh::RootActorMesh;
1817
use hyperactor_mesh::shared_cell::SharedCell;
1918
use monarch_hyperactor::code_sync::WorkspaceLocation;
2019
use monarch_hyperactor::code_sync::manager::CodeSyncManager;
2120
use monarch_hyperactor::code_sync::manager::CodeSyncManagerParams;
21+
use monarch_hyperactor::code_sync::manager::CodeSyncMethod;
2222
use monarch_hyperactor::code_sync::manager::WorkspaceConfig;
2323
use monarch_hyperactor::code_sync::manager::WorkspaceShape;
2424
use monarch_hyperactor::code_sync::manager::code_sync_mesh;
@@ -118,6 +118,44 @@ impl RemoteWorkspace {
118118
}
119119
}
120120

121+
#[pyclass(
122+
frozen,
123+
name = "CodeSyncMethod",
124+
module = "monarch._rust_bindings.monarch_extension.code_sync"
125+
)]
126+
#[derive(Clone, Debug, Serialize, Deserialize)]
127+
enum PyCodeSyncMethod {
128+
Rsync,
129+
CondaSync,
130+
}
131+
132+
impl From<PyCodeSyncMethod> for CodeSyncMethod {
133+
fn from(method: PyCodeSyncMethod) -> CodeSyncMethod {
134+
match method {
135+
PyCodeSyncMethod::Rsync => CodeSyncMethod::Rsync,
136+
PyCodeSyncMethod::CondaSync => CodeSyncMethod::CondaSync,
137+
}
138+
}
139+
}
140+
141+
#[pymethods]
142+
impl PyCodeSyncMethod {
143+
#[staticmethod]
144+
fn from_bytes(bytes: &Bound<'_, PyBytes>) -> PyResult<Self> {
145+
bincode::deserialize(bytes.as_bytes())
146+
.map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))
147+
}
148+
149+
fn __reduce__<'py>(
150+
slf: &Bound<'py, Self>,
151+
) -> PyResult<(Bound<'py, PyAny>, (Bound<'py, PyBytes>,))> {
152+
let bytes = bincode::serialize(&*slf.borrow())
153+
.map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))?;
154+
let py_bytes = PyBytes::new(slf.py(), &bytes);
155+
Ok((slf.as_any().getattr("from_bytes")?, (py_bytes,)))
156+
}
157+
}
158+
121159
#[pyclass(
122160
frozen,
123161
name = "WorkspaceConfig",
@@ -127,14 +165,19 @@ impl RemoteWorkspace {
127165
struct PyWorkspaceConfig {
128166
local: PathBuf,
129167
remote: RemoteWorkspace,
168+
method: PyCodeSyncMethod,
130169
}
131170

132171
#[pymethods]
133172
impl PyWorkspaceConfig {
134173
#[new]
135-
#[pyo3(signature = (*, local, remote))]
136-
fn new(local: PathBuf, remote: RemoteWorkspace) -> Self {
137-
Self { local, remote }
174+
#[pyo3(signature = (*, local, remote, method = PyCodeSyncMethod::Rsync))]
175+
fn new(local: PathBuf, remote: RemoteWorkspace, method: PyCodeSyncMethod) -> Self {
176+
Self {
177+
local,
178+
remote,
179+
method,
180+
}
138181
}
139182
}
140183

@@ -152,6 +195,7 @@ impl CodeSyncMeshClient {
152195
actor_mesh: SharedCell<RootActorMesh<'static, CodeSyncManager>>,
153196
local: PathBuf,
154197
remote: RemoteWorkspace,
198+
method: CodeSyncMethod,
155199
auto_reload: bool,
156200
) -> Result<()> {
157201
let actor_mesh = actor_mesh.borrow()?;
@@ -164,7 +208,9 @@ impl CodeSyncMeshClient {
164208
location: remote.location.into(),
165209
shape,
166210
};
167-
code_sync_mesh(&actor_mesh, local, remote, auto_reload).await?;
211+
code_sync_mesh(&actor_mesh, local, remote, method, auto_reload)
212+
.await
213+
.map_err(|err| PyRuntimeError::new_err(format!("{:#?}", err)))?;
168214
Ok(())
169215
}
170216
}
@@ -183,12 +229,13 @@ impl CodeSyncMeshClient {
183229
})?
184230
}
185231

186-
#[pyo3(signature = (*, local, remote, auto_reload = false))]
232+
#[pyo3(signature = (*, local, remote, method = PyCodeSyncMethod::Rsync, auto_reload = false))]
187233
fn sync_workspace<'py>(
188234
&self,
189235
py: Python<'py>,
190236
local: PathBuf,
191237
remote: RemoteWorkspace,
238+
method: PyCodeSyncMethod,
192239
auto_reload: bool,
193240
) -> PyResult<Bound<'py, PyAny>> {
194241
monarch_hyperactor::runtime::future_into_py(
@@ -197,6 +244,7 @@ impl CodeSyncMeshClient {
197244
self.actor_mesh.clone(),
198245
local,
199246
remote,
247+
method.into(),
200248
auto_reload,
201249
)
202250
.err_into(),
@@ -213,21 +261,27 @@ impl CodeSyncMeshClient {
213261
let actor_mesh = self.actor_mesh.clone();
214262
monarch_hyperactor::runtime::future_into_py(
215263
py,
216-
try_join_all(workspaces.into_iter().map(|workspace| {
217-
CodeSyncMeshClient::sync_workspace_(
218-
actor_mesh.clone(),
219-
workspace.local,
220-
workspace.remote,
221-
auto_reload,
222-
)
223-
}))
264+
async move {
265+
for workspace in workspaces.into_iter() {
266+
CodeSyncMeshClient::sync_workspace_(
267+
actor_mesh.clone(),
268+
workspace.local,
269+
workspace.remote,
270+
workspace.method.into(),
271+
auto_reload,
272+
)
273+
.await?
274+
}
275+
anyhow::Ok(())
276+
}
224277
.err_into(),
225278
)
226279
}
227280
}
228281

229282
pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
230283
module.add_class::<CodeSyncMeshClient>()?;
284+
module.add_class::<PyCodeSyncMethod>()?;
231285
module.add_class::<PyWorkspaceConfig>()?;
232286
module.add_class::<PyWorkspaceLocation>()?;
233287
module.add_class::<PyWorkspaceShape>()?;

monarch_hyperactor/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ hyperactor_mesh = { version = "0.0.0", path = "../hyperactor_mesh" }
2525
hyperactor_multiprocess = { version = "0.0.0", path = "../hyperactor_multiprocess" }
2626
hyperactor_telemetry = { version = "0.0.0", path = "../hyperactor_telemetry" }
2727
inventory = "0.3.8"
28+
lazy_errors = "0.10.1"
2829
lazy_static = "1.5"
2930
libc = "0.2.139"
31+
monarch_conda = { version = "0.0.0", path = "../monarch_conda" }
3032
monarch_types = { version = "0.0.0", path = "../monarch_types" }
3133
ndslice = { version = "0.0.0", path = "../ndslice" }
3234
nix = { version = "0.30.1", features = ["dir", "event", "hostname", "inotify", "ioctl", "mman", "mount", "net", "poll", "ptrace", "reboot", "resource", "sched", "signal", "term", "time", "user", "zerocopy"] }

monarch_hyperactor/src/code_sync.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
*/
88

99
pub mod auto_reload;
10+
pub mod conda_sync;
1011
pub mod manager;
1112
pub mod rsync;
1213
mod workspace;
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use std::collections::HashMap;
10+
use std::path::PathBuf;
11+
12+
use anyhow::Result;
13+
use async_trait::async_trait;
14+
use futures::FutureExt;
15+
use futures::StreamExt;
16+
use futures::TryStreamExt;
17+
use hyperactor::Actor;
18+
use hyperactor::Bind;
19+
use hyperactor::Handler;
20+
use hyperactor::Named;
21+
use hyperactor::PortRef;
22+
use hyperactor::Unbind;
23+
use hyperactor_mesh::actor_mesh::ActorMesh;
24+
use hyperactor_mesh::connect::Connect;
25+
use hyperactor_mesh::connect::accept;
26+
use hyperactor_mesh::sel;
27+
use lazy_errors::ErrorStash;
28+
use lazy_errors::OrStash;
29+
use lazy_errors::StashedResult;
30+
use lazy_errors::TryCollectOrStash;
31+
use monarch_conda::sync::Action;
32+
use monarch_conda::sync::receiver;
33+
use monarch_conda::sync::sender;
34+
use ndslice::Selection;
35+
use serde::Deserialize;
36+
use serde::Serialize;
37+
use tokio::io::AsyncReadExt;
38+
use tokio::io::AsyncWriteExt;
39+
40+
use crate::code_sync::WorkspaceLocation;
41+
42+
/// Represents the result of an conda sync operation with details about what was transferred
43+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Named)]
44+
pub struct CondaSyncResult {
45+
/// All changes that occurred during the sync operation
46+
pub changes: HashMap<PathBuf, Action>,
47+
}
48+
49+
#[derive(Debug, Clone, Named, Serialize, Deserialize, Bind, Unbind)]
50+
pub struct CondaSyncMessage {
51+
/// The connect message to create a duplex bytestream with the client.
52+
pub connect: PortRef<Connect>,
53+
/// A port to send back the result or any errors.
54+
pub result: PortRef<Result<CondaSyncResult, String>>,
55+
/// The location of the workspace to sync.
56+
pub workspace: WorkspaceLocation,
57+
}
58+
59+
#[derive(Debug, Named, Serialize, Deserialize)]
60+
pub struct CondaSyncParams {}
61+
62+
#[derive(Debug)]
63+
#[hyperactor::export(spawn = true, handlers = [CondaSyncMessage { cast = true }])]
64+
pub struct CondaSyncActor {}
65+
66+
#[async_trait]
67+
impl Actor for CondaSyncActor {
68+
type Params = CondaSyncParams;
69+
70+
async fn new(CondaSyncParams {}: Self::Params) -> Result<Self> {
71+
Ok(Self {})
72+
}
73+
}
74+
75+
#[async_trait]
76+
impl Handler<CondaSyncMessage> for CondaSyncActor {
77+
async fn handle(
78+
&mut self,
79+
cx: &hyperactor::Context<Self>,
80+
CondaSyncMessage {
81+
workspace,
82+
connect,
83+
result,
84+
}: CondaSyncMessage,
85+
) -> Result<(), anyhow::Error> {
86+
let res = async {
87+
let workspace = workspace.resolve()?;
88+
let (connect_msg, completer) = Connect::allocate(cx.self_id().clone(), cx);
89+
connect.send(cx, connect_msg)?;
90+
let (mut read, mut write) = completer.complete().await?.into_split();
91+
let changes_result = receiver(&workspace, &mut read, &mut write).await;
92+
93+
// Shutdown our end, then read from the other end till exhaustion to avoid undeliverable
94+
// message spam.
95+
write.shutdown().await?;
96+
let mut buf = vec![];
97+
read.read_to_end(&mut buf).await?;
98+
99+
anyhow::Ok(CondaSyncResult {
100+
changes: changes_result?,
101+
})
102+
}
103+
.await;
104+
result.send(cx, res.map_err(|e| format!("{:#?}", e)))?;
105+
Ok(())
106+
}
107+
}
108+
109+
pub async fn conda_sync_mesh<M>(
110+
actor_mesh: &M,
111+
local_workspace: PathBuf,
112+
remote_workspace: WorkspaceLocation,
113+
) -> Result<Vec<CondaSyncResult>>
114+
where
115+
M: ActorMesh<Actor = CondaSyncActor>,
116+
{
117+
let mailbox = actor_mesh.proc_mesh().client();
118+
let (conns_tx, conns_rx) = mailbox.open_port::<Connect>();
119+
120+
let (res1, res2) = futures::future::join(
121+
conns_rx
122+
.take(actor_mesh.shape().slice().len())
123+
.err_into::<anyhow::Error>()
124+
.try_for_each_concurrent(None, |connect| async {
125+
let (mut read, mut write) = accept(mailbox, mailbox.actor_id().clone(), connect)
126+
.await?
127+
.into_split();
128+
let res = sender(&local_workspace, &mut read, &mut write).await;
129+
130+
// Shutdown our end, then read from the other end till exhaustion to avoid undeliverable
131+
// message spam.
132+
write.shutdown().await?;
133+
let mut buf = vec![];
134+
read.read_to_end(&mut buf).await?;
135+
136+
res
137+
})
138+
.boxed(),
139+
async move {
140+
let (result_tx, result_rx) = mailbox.open_port::<Result<CondaSyncResult, String>>();
141+
actor_mesh.cast(
142+
mailbox,
143+
sel!(*),
144+
CondaSyncMessage {
145+
connect: conns_tx.bind(),
146+
result: result_tx.bind(),
147+
workspace: remote_workspace,
148+
},
149+
)?;
150+
151+
// Wait for all actors to report result.
152+
let results = result_rx
153+
.take(actor_mesh.shape().slice().len())
154+
.try_collect::<Vec<_>>()
155+
.await?;
156+
157+
// Combine all errors into one.
158+
let mut errs = ErrorStash::<_, _, anyhow::Error>::new(|| "remote failures");
159+
match results
160+
.into_iter()
161+
.map(|res| res.map_err(anyhow::Error::msg))
162+
.try_collect_or_stash::<Vec<_>>(&mut errs)
163+
{
164+
StashedResult::Ok(results) => anyhow::Ok(results),
165+
StashedResult::Err(_) => Err(errs.into_result().unwrap_err().into()),
166+
}
167+
},
168+
)
169+
.await;
170+
171+
// Combine code sync handler and cast errors into one.
172+
let mut errs = ErrorStash::<_, _, anyhow::Error>::new(|| "code sync failed");
173+
res1.or_stash(&mut errs);
174+
if let StashedResult::Ok(results) = res2.or_stash(&mut errs) {
175+
errs.into_result()?;
176+
return Ok(results);
177+
}
178+
Err(errs.into_result().unwrap_err().into())
179+
}

0 commit comments

Comments
 (0)