@@ -18,6 +18,7 @@ use monarch_hyperactor::instance_dispatch;
18
18
use monarch_hyperactor:: proc_mesh:: PyProcMesh ;
19
19
use monarch_hyperactor:: pytokio:: PyPythonTask ;
20
20
use monarch_hyperactor:: runtime:: signal_safe_block_on;
21
+ use monarch_hyperactor:: v1:: proc_mesh:: PyProcMesh as PyProcMeshV1 ;
21
22
use monarch_rdma:: RdmaBuffer ;
22
23
use monarch_rdma:: RdmaManagerActor ;
23
24
use monarch_rdma:: RdmaManagerMessageClient ;
@@ -36,6 +37,7 @@ fn setup_rdma_context(
36
37
local_proc_id : String ,
37
38
) -> ( ActorRef < RdmaManagerActor > , RdmaBuffer ) {
38
39
let proc_id: ProcId = local_proc_id. parse ( ) . unwrap ( ) ;
40
+ // TODO: find some better way to look this up, or else formally define "service names"
39
41
let local_owner_id = ActorId ( proc_id, "rdma_manager" . to_string ( ) , 0 ) ;
40
42
let local_owner_ref: ActorRef < RdmaManagerActor > = ActorRef :: attest ( local_owner_id) ;
41
43
let buffer = rdma_buffer. buffer . clone ( ) ;
@@ -56,6 +58,7 @@ async fn create_rdma_buffer(
56
58
client : PyInstance ,
57
59
) -> PyResult < PyRdmaBuffer > {
58
60
// Get the owning RdmaManagerActor's ActorRef
61
+ // TODO: find some better way to look this up, or else formally define "service names"
59
62
let owner_id = ActorId ( proc_id, "rdma_manager" . to_string ( ) , 0 ) ;
60
63
let owner_ref: ActorRef < RdmaManagerActor > = ActorRef :: attest ( owner_id) ;
61
64
@@ -289,31 +292,54 @@ impl PyRdmaManager {
289
292
#[ classmethod]
290
293
fn create_rdma_manager_nonblocking (
291
294
_cls : & Bound < ' _ , PyType > ,
292
- proc_mesh : & PyProcMesh ,
295
+ proc_mesh : & Bound < ' _ , PyAny > ,
293
296
client : PyInstance ,
294
297
) -> PyResult < PyPythonTask > {
295
298
tracing:: debug!( "spawning RDMA manager on target proc_mesh nodes" ) ;
296
299
297
- let tracked_proc_mesh = proc_mesh. try_inner ( ) ?;
300
+ if let Ok ( v0) = proc_mesh. downcast :: < PyProcMesh > ( ) {
301
+ let tracked_proc_mesh = v0. borrow ( ) . try_inner ( ) ?;
302
+ PyPythonTask :: new ( async move {
303
+ // Spawns the `RdmaManagerActor` on the target proc_mesh.
304
+ // This allows the `RdmaController` to run on any node while real RDMA operations occur on appropriate hardware.
305
+ let actor_mesh = instance_dispatch ! ( client, |cx| {
306
+ tracked_proc_mesh
307
+ // Pass None to use default config - RdmaManagerActor will use default IbverbsConfig
308
+ // TODO - make IbverbsConfig configurable
309
+ . spawn:: <RdmaManagerActor >( cx, "rdma_manager" , & None )
310
+ . await
311
+ . map_err( |err| PyException :: new_err( err. to_string( ) ) ) ?
312
+ } ) ;
298
313
299
- PyPythonTask :: new ( async move {
300
- // Spawns the `RdmaManagerActor` on the target proc_mesh.
301
- // This allows the `RdmaController` to run on any node while real RDMA operations occur on appropriate hardware.
302
- let actor_mesh = instance_dispatch ! ( client, |cx| {
303
- tracked_proc_mesh
304
- // Pass None to use default config - RdmaManagerActor will use default IbverbsConfig
305
- // TODO - make IbverbsConfig configurable
306
- . spawn:: <RdmaManagerActor >( cx, "rdma_manager" , & None )
307
- . await
308
- . map_err( |err| PyException :: new_err( err. to_string( ) ) ) ?
309
- } ) ;
314
+ // Use placeholder device name since actual device is determined on remote node
315
+ Ok ( Some ( PyRdmaManager {
316
+ inner : actor_mesh,
317
+ device : "remote_rdma_device" . to_string ( ) ,
318
+ } ) )
319
+ } )
320
+ } else {
321
+ let proc_mesh = proc_mesh. downcast :: < PyProcMeshV1 > ( ) ?. borrow ( ) . mesh_ref ( ) ?;
322
+ PyPythonTask :: new ( async move {
323
+ let actor_mesh = instance_dispatch ! ( client, |cx| {
324
+ proc_mesh
325
+ // Pass None to use default config - RdmaManagerActor will use default IbverbsConfig
326
+ // TODO - make IbverbsConfig configurable
327
+ . spawn_service:: <RdmaManagerActor >( cx, "rdma_manager" , & None )
328
+ . await
329
+ . map_err( |err| PyException :: new_err( err. to_string( ) ) ) ?
330
+ } ) ;
310
331
311
- // Use placeholder device name since actual device is determined on remote node
312
- Ok ( Some ( PyRdmaManager {
313
- inner : actor_mesh,
314
- device : "remote_rdma_device" . to_string ( ) ,
315
- } ) )
316
- } )
332
+ eprintln ! ( "spawned rdma_manager: {:?}" , actor_mesh) ;
333
+
334
+ let actor_mesh = RootActorMesh :: from ( actor_mesh) ;
335
+ let actor_mesh = SharedCell :: from ( actor_mesh) ;
336
+
337
+ Ok ( Some ( PyRdmaManager {
338
+ inner : actor_mesh,
339
+ device : "remote_rdma_device" . to_string ( ) ,
340
+ } ) )
341
+ } )
342
+ }
317
343
}
318
344
}
319
345
0 commit comments