Skip to content

Commit 7fd1028

Browse files
dstaay-fb authored and facebook-github-bot committed
Move Parameter Server Example (meta-pytorch#966)
Summary: Pull Request resolved: meta-pytorch#966 Move the PS example into its own directory, and swap out println! for proper logging. Reviewed By: allenwang28 Differential Revision: D80756352 fbshipit-source-id: c8ffc1444fde688d9bc76eddafb304bbab02680f
1 parent 4525030 commit 7fd1028

File tree

4 files changed

+30
-27
lines changed

4 files changed

+30
-27
lines changed

monarch_rdma/examples/Cargo.toml renamed to monarch_rdma/examples/parameter_server/Cargo.toml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# @generated by autocargo from //monarch/monarch_rdma/examples:[parameter_server,parameter_server_bootstrap,parameter_server_example]
1+
# @generated by autocargo from //monarch/monarch_rdma/examples/parameter_server:[parameter_server,parameter_server_bootstrap,parameter_server_example]
22

33
[package]
44
name = "parameter_server"
@@ -24,14 +24,14 @@ test = false
2424
anyhow = "1.0.98"
2525
async-trait = "0.1.86"
2626
buck-resources = "1"
27-
hyperactor = { version = "0.0.0", path = "../../hyperactor" }
28-
hyperactor_mesh = { version = "0.0.0", path = "../../hyperactor_mesh" }
29-
monarch_rdma = { version = "0.0.0", path = ".." }
30-
ndslice = { version = "0.0.0", path = "../../ndslice" }
27+
hyperactor = { version = "0.0.0", path = "../../../hyperactor" }
28+
hyperactor_mesh = { version = "0.0.0", path = "../../../hyperactor_mesh" }
29+
monarch_rdma = { version = "0.0.0", path = "../.." }
30+
ndslice = { version = "0.0.0", path = "../../../ndslice" }
3131
serde = { version = "1.0.219", features = ["derive", "rc"] }
3232
tokio = { version = "1.46.1", features = ["full", "test-util", "tracing"] }
3333
tracing = { version = "0.1.41", features = ["attributes", "valuable"] }
3434
tracing-subscriber = { version = "0.3.19", features = ["chrono", "env-filter", "json", "local-time", "parking_lot", "registry"] }
3535

3636
[dev-dependencies]
37-
timed_test = { version = "0.0.0", path = "../../timed_test" }
37+
timed_test = { version = "0.0.0", path = "../../../timed_test" }

monarch_rdma/examples/parameter_server.rs renamed to monarch_rdma/examples/parameter_server/parameter_server.rs

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ impl Actor for ParameterServerActor {
112112

113113
async fn new(_params: Self::Params) -> Result<Self, anyhow::Error> {
114114
let (owner_ref, worker_world_size) = _params;
115-
println!("creating parameter server actor");
115+
tracing::info!("creating parameter server actor");
116116
let weights_data = vec![0u8; BUFFER_SIZE].into_boxed_slice();
117117
let grad_buffer_data =
118118
vec![vec![0u8; BUFFER_SIZE].into_boxed_slice(); worker_world_size].into_boxed_slice();
@@ -203,7 +203,7 @@ impl Handler<PsUpdate> for ParameterServerActor {
203203
}
204204
grad.fill(0);
205205
}
206-
println!("[parameter server actor] updated");
206+
tracing::info!("[parameter server actor] updated");
207207
reply.send(cx, true)?;
208208
Ok(())
209209
}
@@ -213,9 +213,10 @@ impl Handler<PsUpdate> for ParameterServerActor {
213213
impl Handler<Log> for ParameterServerActor {
214214
/// Logs the server's weights and gradient buffer
215215
async fn handle(&mut self, _this_: &Context<Self>, _msg_: Log) -> Result<(), anyhow::Error> {
216-
println!(
216+
tracing::info!(
217217
"[parameter server actor] weights: {:?}, grad_buffer: {:?}",
218-
self.weights_data, self.grad_buffer_data,
218+
self.weights_data,
219+
self.grad_buffer_data,
219220
);
220221
Ok(())
221222
}
@@ -305,7 +306,7 @@ impl Handler<WorkerInit> for WorkerActor {
305306
) -> Result<(), anyhow::Error> {
306307
let (rank, _) = cx.cast_info();
307308

308-
println!("[worker_actor_{}] initializing", rank);
309+
tracing::info!("[worker_actor_{}] initializing", rank);
309310

310311
let client = cx.mailbox_for_py();
311312
let (handle, receiver) = client.open_once_port::<(RdmaBuffer, RdmaBuffer)>();
@@ -345,9 +346,10 @@ impl Handler<WorkerStep> for WorkerActor {
345346
{
346347
*grad_value = grad_value.wrapping_add(*weight + 1);
347348
}
348-
println!(
349+
tracing::info!(
349350
"[worker_actor_{}] pushing gradients {:?}",
350-
rank, self.local_gradients
351+
rank,
352+
self.local_gradients
351353
);
352354

353355
let owner_ref = self
@@ -387,9 +389,10 @@ impl Handler<WorkerUpdate> for WorkerActor {
387389
) -> Result<(), anyhow::Error> {
388390
let (rank, _) = cx.cast_info();
389391

390-
println!(
392+
tracing::info!(
391393
"[worker_actor_{}] pulling new weights from parameter server (before: {:?})",
392-
rank, self.weights_data,
394+
rank,
395+
self.weights_data,
393396
);
394397
let /*mut*/ lbuffer = self
395398
.rdma_manager
@@ -419,7 +422,7 @@ impl Handler<Log> for WorkerActor {
419422
/// Logs the worker's weights
420423
async fn handle(&mut self, cx: &Context<Self>, _: Log) -> Result<(), anyhow::Error> {
421424
let (rank, _) = cx.cast_info();
422-
println!("[worker_actor_{}] weights: {:?}", rank, self.weights_data);
425+
tracing::info!("[worker_actor_{}] weights: {:?}", rank, self.weights_data);
423426
Ok(())
424427
}
425428
}
@@ -456,7 +459,7 @@ pub async fn run(num_workers: usize, num_steps: usize) -> Result<(), anyhow::Err
456459
};
457460
} else {
458461
// For other configurations, use default settings (parameter server + workers all use the same ibv device)
459-
println!(
462+
tracing::info!(
460463
"using default IbverbsConfig as {} devices were found (expected > 4 for H100)",
461464
devices.len()
462465
);
@@ -465,10 +468,10 @@ pub async fn run(num_workers: usize, num_steps: usize) -> Result<(), anyhow::Err
465468
}
466469

467470
// As normal, create a proc mesh for the parameter server.
468-
println!("creating parameter server proc mesh...");
471+
tracing::info!("creating parameter server proc mesh...");
469472

470473
let mut alloc = ProcessAllocator::new(Command::new(
471-
buck_resources::get("monarch/monarch_rdma/examples/bootstrap").unwrap(),
474+
buck_resources::get("monarch/monarch_rdma/examples/parameter_server/bootstrap").unwrap(),
472475
));
473476

474477
let ps_proc_mesh = ProcMesh::allocate(
@@ -481,7 +484,7 @@ pub async fn run(num_workers: usize, num_steps: usize) -> Result<(), anyhow::Err
481484
)
482485
.await?;
483486

484-
println!(
487+
tracing::info!(
485488
"creating parameter server's RDMA manager with config: {}",
486489
ps_ibv_config
487490
);
@@ -496,7 +499,7 @@ pub async fn run(num_workers: usize, num_steps: usize) -> Result<(), anyhow::Err
496499
.unwrap();
497500

498501
// Create a proc mesh for workers, where each worker is assigned to its own GPU.
499-
println!("creating worker proc mesh ({} workers)...", num_workers);
502+
tracing::info!("creating worker proc mesh ({} workers)...", num_workers);
500503
let worker_proc_mesh = ProcMesh::allocate(
501504
alloc
502505
.allocate(AllocSpec {
@@ -507,7 +510,7 @@ pub async fn run(num_workers: usize, num_steps: usize) -> Result<(), anyhow::Err
507510
)
508511
.await?;
509512

510-
println!(
513+
tracing::info!(
511514
"creating worker's RDMA manager with config: {}",
512515
worker_ibv_config
513516
);
@@ -517,7 +520,7 @@ pub async fn run(num_workers: usize, num_steps: usize) -> Result<(), anyhow::Err
517520
.await
518521
.unwrap();
519522

520-
println!("spawning parameter server");
523+
tracing::info!("spawning parameter server");
521524
let ps_actor_mesh: RootActorMesh<'_, ParameterServerActor> = ps_proc_mesh
522525
.spawn(
523526
"parameter_server",
@@ -529,7 +532,7 @@ pub async fn run(num_workers: usize, num_steps: usize) -> Result<(), anyhow::Err
529532
// The parameter server is a single actor, we can just grab it and call it directly.
530533
let ps_actor = ps_actor_mesh.iter().next().unwrap();
531534

532-
println!("spawning worker actors");
535+
tracing::info!("spawning worker actors");
533536
let worker_actor_mesh: RootActorMesh<'_, WorkerActor> =
534537
worker_proc_mesh.spawn("worker_actors", &()).await.unwrap();
535538

@@ -539,7 +542,7 @@ pub async fn run(num_workers: usize, num_steps: usize) -> Result<(), anyhow::Err
539542
// We intentionally decouple spawning with initialization, which is fairly common in Ray workloads
540543
// In this case, we use it for dual purpose - be able to use the cast APIs to assign rank (Monarch specific) and
541544
// to get access to return values for error messaging (applies to both Monarch and Ray)
542-
println!("initializing worker actor mesh");
545+
tracing::info!("initializing worker actor mesh");
543546
worker_actor_mesh
544547
.cast(
545548
worker_proc_mesh.client(),
@@ -549,9 +552,9 @@ pub async fn run(num_workers: usize, num_steps: usize) -> Result<(), anyhow::Err
549552
)
550553
.unwrap();
551554

552-
println!("starting training loop");
555+
tracing::info!("starting training loop");
553556
for step in 0..num_steps {
554-
println!("===== starting step {} =====", step);
557+
tracing::info!("===== starting step {} =====", step);
555558
worker_actor_mesh
556559
.cast(
557560
worker_proc_mesh.client(),

0 commit comments

Comments
 (0)