Skip to content

Commit caa40b8

Browse files
thomasywangfacebook-github-bot
authored andcommitted
Compute resource aware simulation (#857)
Summary: The sim allocator will now register the location (region, dc, zone, rack, host, gpu) of every ProcId upon creation with the simnet. Differential Revision: D80137963
1 parent 4b269d1 commit caa40b8

File tree

3 files changed

+54
-9
lines changed

3 files changed

+54
-9
lines changed

hyperactor/src/simnet.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use async_trait::async_trait;
2626
use dashmap::DashMap;
2727
use dashmap::DashSet;
2828
use enum_as_inner::EnumAsInner;
29+
use ndslice::view::Point;
2930
use serde::Deserialize;
3031
use serde::Deserializer;
3132
use serde::Serialize;
@@ -43,6 +44,7 @@ use tokio::time::interval;
4344
use crate::ActorId;
4445
use crate::Mailbox;
4546
use crate::OncePortRef;
47+
use crate::ProcId;
4648
use crate::channel::ChannelAddr;
4749
use crate::clock::Clock;
4850
use crate::clock::RealClock;
@@ -295,6 +297,7 @@ pub struct SimNetHandle {
295297
training_script_state_tx: tokio::sync::watch::Sender<TrainingScriptState>,
296298
/// Signal to stop the simnet loop
297299
stop_signal: Arc<AtomicBool>,
300+
resources: DashMap<ProcId, Point>,
298301
}
299302

300303
impl SimNetHandle {
@@ -405,6 +408,11 @@ impl SimNetHandle {
405408
"timeout waiting for received events to be scheduled".to_string(),
406409
))
407410
}
411+
412+
/// Register the location in resource space for a Proc
413+
pub fn register_proc(&self, proc_id: ProcId, point: Point) {
414+
self.resources.insert(proc_id, point);
415+
}
408416
}
409417

410418
pub(crate) type Topology = DashMap<SimNetEdge, SimNetEdgeInfo>;
@@ -478,6 +486,7 @@ pub fn start() {
478486
pending_event_count,
479487
training_script_state_tx,
480488
stop_signal,
489+
resources: DashMap::new(),
481490
});
482491
}
483492

hyperactor_mesh/src/alloc/sim.rs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#![allow(dead_code)] // until it is used outside of testing
1212

1313
use async_trait::async_trait;
14+
use hyperactor::ProcId;
1415
use hyperactor::WorldId;
1516
use hyperactor::channel::ChannelAddr;
1617
use hyperactor::channel::ChannelTransport;
@@ -61,12 +62,23 @@ pub struct SimAlloc {
6162

6263
impl SimAlloc {
6364
fn new(spec: AllocSpec) -> Self {
64-
Self {
65-
inner: LocalAlloc::new_with_transport(
66-
spec,
67-
ChannelTransport::Sim(Box::new(ChannelTransport::Unix)),
68-
),
69-
}
65+
let inner = LocalAlloc::new_with_transport(
66+
spec,
67+
ChannelTransport::Sim(Box::new(ChannelTransport::Unix)),
68+
);
69+
let client_proc_id = ProcId::Ranked(WorldId(format!("{}_manager", inner.name())), 0);
70+
71+
let ext = inner.extent();
72+
73+
hyperactor::simnet::simnet_handle()
74+
.expect("simnet event loop not running")
75+
.register_proc(
76+
client_proc_id.clone(),
77+
ext.point(ext.sizes().iter().map(|_| 0).collect())
78+
.expect("should be valid point"),
79+
);
80+
81+
Self { inner }
7082
}
7183
/// A chaos monkey that can be used to stop procs at random.
7284
pub(crate) fn chaos_monkey(&self) -> impl Fn(usize, ProcStopReason) + 'static {
@@ -90,7 +102,13 @@ impl SimAlloc {
90102
#[async_trait]
91103
impl Alloc for SimAlloc {
92104
async fn next(&mut self) -> Option<ProcState> {
93-
self.inner.next().await
105+
let proc_state = self.inner.next().await;
106+
if let Some(ProcState::Created { proc_id, point, .. }) = &proc_state {
107+
hyperactor::simnet::simnet_handle()
108+
.expect("simnet event loop not running")
109+
.register_proc(proc_id.clone(), point.clone());
110+
}
111+
proc_state
94112
}
95113

96114
fn extent(&self) -> &Extent {

python/monarch/_src/actor/proc_mesh.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -486,8 +486,26 @@ def local_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> ProcMesh:
486486
return _proc_mesh_from_allocator(allocator=LocalAllocator(), gpus=gpus, hosts=hosts)
487487

488488

489-
def sim_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> ProcMesh:
490-
return _proc_mesh_from_allocator(allocator=SimAllocator(), gpus=gpus, hosts=hosts)
489+
def sim_proc_mesh(
490+
*,
491+
gpus: int = 1,
492+
hosts: int = 1,
493+
racks: int = 1,
494+
zones: int = 1,
495+
dcs: int = 1,
496+
regions: int = 1,
497+
) -> ProcMesh:
498+
spec: AllocSpec = AllocSpec(
499+
AllocConstraints(),
500+
hosts=hosts,
501+
gpus=gpus,
502+
racks=racks,
503+
zones=zones,
504+
dcs=dcs,
505+
regions=regions,
506+
)
507+
alloc = SimAllocator().allocate(spec)
508+
return ProcMesh.from_alloc(alloc, None, True)
491509

492510

493511
_BOOTSTRAP_MAIN = "monarch._src.actor.bootstrap_main"

0 commit comments

Comments
 (0)