@@ -18,6 +18,7 @@ use monarch_hyperactor::instance_dispatch;
18
18
use monarch_hyperactor:: proc_mesh:: PyProcMesh ;
19
19
use monarch_hyperactor:: pytokio:: PyPythonTask ;
20
20
use monarch_hyperactor:: runtime:: signal_safe_block_on;
21
+ use monarch_hyperactor:: v1:: proc_mesh:: PyProcMesh as PyProcMeshV1 ;
21
22
use monarch_rdma:: RdmaBuffer ;
22
23
use monarch_rdma:: RdmaManagerActor ;
23
24
use monarch_rdma:: RdmaManagerMessageClient ;
@@ -289,31 +290,52 @@ impl PyRdmaManager {
289
290
#[ classmethod]
290
291
fn create_rdma_manager_nonblocking (
291
292
_cls : & Bound < ' _ , PyType > ,
292
- proc_mesh : & PyProcMesh ,
293
+ proc_mesh : & Bound < ' _ , PyAny > ,
293
294
client : PyInstance ,
294
295
) -> PyResult < PyPythonTask > {
295
296
tracing:: debug!( "spawning RDMA manager on target proc_mesh nodes" ) ;
296
297
297
- let tracked_proc_mesh = proc_mesh. try_inner ( ) ?;
298
+ if let Ok ( v0) = proc_mesh. downcast :: < PyProcMesh > ( ) {
299
+ let tracked_proc_mesh = v0. borrow ( ) . try_inner ( ) ?;
300
+ PyPythonTask :: new ( async move {
301
+ // Spawns the `RdmaManagerActor` on the target proc_mesh.
302
+ // This allows the `RdmaController` to run on any node while real RDMA operations occur on appropriate hardware.
303
+ let actor_mesh = instance_dispatch ! ( client, |cx| {
304
+ tracked_proc_mesh
305
+ // Pass None to use default config - RdmaManagerActor will use default IbverbsConfig
306
+ // TODO - make IbverbsConfig configurable
307
+ . spawn:: <RdmaManagerActor >( cx, "rdma_manager" , & None )
308
+ . await
309
+ . map_err( |err| PyException :: new_err( err. to_string( ) ) ) ?
310
+ } ) ;
298
311
299
- PyPythonTask :: new ( async move {
300
- // Spawns the `RdmaManagerActor` on the target proc_mesh.
301
- // This allows the `RdmaController` to run on any node while real RDMA operations occur on appropriate hardware.
302
- let actor_mesh = instance_dispatch ! ( client, |cx| {
303
- tracked_proc_mesh
304
- // Pass None to use default config - RdmaManagerActor will use default IbverbsConfig
305
- // TODO - make IbverbsConfig configurable
306
- . spawn:: <RdmaManagerActor >( cx, "rdma_manager" , & None )
307
- . await
308
- . map_err( |err| PyException :: new_err( err. to_string( ) ) ) ?
309
- } ) ;
312
+ // Use placeholder device name since actual device is determined on remote node
313
+ Ok ( Some ( PyRdmaManager {
314
+ inner : actor_mesh,
315
+ device : "remote_rdma_device" . to_string ( ) ,
316
+ } ) )
317
+ } )
318
+ } else {
319
+ let proc_mesh = proc_mesh. downcast :: < PyProcMeshV1 > ( ) ?. borrow ( ) . mesh_ref ( ) ?;
320
+ PyPythonTask :: new ( async move {
321
+ let actor_mesh = instance_dispatch ! ( client, |cx| {
322
+ proc_mesh
323
+ // Pass None to use default config - RdmaManagerActor will use default IbverbsConfig
324
+ // TODO - make IbverbsConfig configurable
325
+ . spawn:: <RdmaManagerActor >( cx, "rdma_manager" , & None )
326
+ . await
327
+ . map_err( |err| PyException :: new_err( err. to_string( ) ) ) ?
328
+ } ) ;
310
329
311
- // Use placeholder device name since actual device is determined on remote node
312
- Ok ( Some ( PyRdmaManager {
313
- inner : actor_mesh,
314
- device : "remote_rdma_device" . to_string ( ) ,
315
- } ) )
316
- } )
330
+ let actor_mesh = RootActorMesh :: from ( actor_mesh) ;
331
+ let actor_mesh = SharedCell :: from ( actor_mesh) ;
332
+
333
+ Ok ( Some ( PyRdmaManager {
334
+ inner : actor_mesh,
335
+ device : "remote_rdma_device" . to_string ( ) ,
336
+ } ) )
337
+ } )
338
+ }
317
339
}
318
340
}
319
341
0 commit comments