Skip to content

Commit c2c7854

Browse files
thomasywangfacebook-github-bot
authored andcommitted
Compute resource aware simulation (#857)
Summary: The sim allocator will now register the location (region, dc, zone, rack, host, gpu) of every ProcId upon creation with the simnet. Differential Revision: D80137963
1 parent 446770f commit c2c7854

File tree

3 files changed

+54
-9
lines changed

3 files changed

+54
-9
lines changed

hyperactor/src/simnet.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use async_trait::async_trait;
2626
use dashmap::DashMap;
2727
use dashmap::DashSet;
2828
use enum_as_inner::EnumAsInner;
29+
use ndslice::view::Point;
2930
use serde::Deserialize;
3031
use serde::Deserializer;
3132
use serde::Serialize;
@@ -43,6 +44,7 @@ use tokio::time::interval;
4344
use crate::ActorId;
4445
use crate::Mailbox;
4546
use crate::OncePortRef;
47+
use crate::ProcId;
4648
use crate::channel::ChannelAddr;
4749
use crate::clock::Clock;
4850
use crate::clock::RealClock;
@@ -295,6 +297,7 @@ pub struct SimNetHandle {
295297
training_script_state_tx: tokio::sync::watch::Sender<TrainingScriptState>,
296298
/// Signal to stop the simnet loop
297299
stop_signal: Arc<AtomicBool>,
300+
resources: DashMap<ProcId, Point>,
298301
}
299302

300303
impl SimNetHandle {
@@ -405,6 +408,11 @@ impl SimNetHandle {
405408
"timeout waiting for received events to be scheduled".to_string(),
406409
))
407410
}
411+
412+
/// Register the location in resource space for a Proc
413+
pub fn register_proc(&self, proc_id: ProcId, point: Point) {
414+
self.resources.insert(proc_id, point);
415+
}
408416
}
409417

410418
pub(crate) type Topology = DashMap<SimNetEdge, SimNetEdgeInfo>;
@@ -478,6 +486,7 @@ pub fn start() {
478486
pending_event_count,
479487
training_script_state_tx,
480488
stop_signal,
489+
resources: DashMap::new(),
481490
});
482491
}
483492

hyperactor_mesh/src/alloc/sim.rs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#![allow(dead_code)] // until it is used outside of testing
1212

1313
use async_trait::async_trait;
14+
use hyperactor::ProcId;
1415
use hyperactor::WorldId;
1516
use hyperactor::channel::ChannelAddr;
1617
use hyperactor::channel::ChannelTransport;
@@ -61,12 +62,23 @@ pub struct SimAlloc {
6162

6263
impl SimAlloc {
6364
fn new(spec: AllocSpec) -> Self {
64-
Self {
65-
inner: LocalAlloc::new_with_transport(
66-
spec,
67-
ChannelTransport::Sim(Box::new(ChannelTransport::Unix)),
68-
),
69-
}
65+
let inner = LocalAlloc::new_with_transport(
66+
spec,
67+
ChannelTransport::Sim(Box::new(ChannelTransport::Unix)),
68+
);
69+
let client_proc_id = ProcId::Ranked(WorldId(format!("{}_manager", inner.name())), 0);
70+
71+
let ext = inner.extent();
72+
73+
hyperactor::simnet::simnet_handle()
74+
.expect("simnet event loop not running")
75+
.register_proc(
76+
client_proc_id.clone(),
77+
ext.point(ext.sizes().iter().map(|_| 0).collect())
78+
.expect("should be valid point"),
79+
);
80+
81+
Self { inner }
7082
}
7183
/// A chaos monkey that can be used to stop procs at random.
7284
pub(crate) fn chaos_monkey(&self) -> impl Fn(usize, ProcStopReason) + 'static {
@@ -90,7 +102,13 @@ impl SimAlloc {
90102
#[async_trait]
91103
impl Alloc for SimAlloc {
92104
async fn next(&mut self) -> Option<ProcState> {
93-
self.inner.next().await
105+
let proc_state = self.inner.next().await;
106+
if let Some(ProcState::Created { proc_id, point, .. }) = &proc_state {
107+
hyperactor::simnet::simnet_handle()
108+
.expect("simnet event loop not running")
109+
.register_proc(proc_id.clone(), point.clone());
110+
}
111+
proc_state
94112
}
95113

96114
fn extent(&self) -> &Extent {

python/monarch/_src/actor/proc_mesh.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -454,8 +454,26 @@ def local_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> ProcMesh:
454454
return _proc_mesh_from_allocator(allocator=LocalAllocator(), gpus=gpus, hosts=hosts)
455455

456456

457-
def sim_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> ProcMesh:
458-
return _proc_mesh_from_allocator(allocator=SimAllocator(), gpus=gpus, hosts=hosts)
457+
def sim_proc_mesh(
458+
*,
459+
gpus: int = 1,
460+
hosts: int = 1,
461+
racks: int = 1,
462+
zones: int = 1,
463+
dcs: int = 1,
464+
regions: int = 1,
465+
) -> ProcMesh:
466+
spec: AllocSpec = AllocSpec(
467+
AllocConstraints(),
468+
hosts=hosts,
469+
gpus=gpus,
470+
racks=racks,
471+
zones=zones,
472+
dcs=dcs,
473+
regions=regions,
474+
)
475+
alloc = SimAllocator().allocate(spec)
476+
return ProcMesh.from_alloc(alloc, None, True)
459477

460478

461479
_BOOTSTRAP_MAIN = "monarch._src.actor.bootstrap_main"

0 commit comments

Comments
 (0)