Skip to content

Distance based latency #858

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions hyperactor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ nix = { version = "0.30.1", features = ["dir", "event", "hostname", "inotify", "
opentelemetry = "0.29"
paste = "1.0.14"
rand = { version = "0.8", features = ["small_rng"] }
rand_distr = "0.4"
regex = "1.11.1"
rustls-pemfile = "1.0.0"
serde = { version = "1.0.219", features = ["derive", "rc"] }
Expand Down
238 changes: 138 additions & 100 deletions hyperactor/src/channel/sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
// SimRx contains a way to receive messages.

//! Local simulated channel implementation.
use std::any::Any;
// send leads to add to network.
use std::marker::PhantomData;
use std::sync::Arc;

use dashmap::DashMap;
use regex::Regex;
use tokio::sync::Mutex;

use super::*;
use crate::channel;
Expand All @@ -25,12 +25,9 @@ use crate::clock::RealClock;
use crate::clock::SimClock;
use crate::data::Serialized;
use crate::mailbox::MessageEnvelope;
use crate::simnet;
use crate::simnet::Dispatcher;
use crate::simnet::Event;
use crate::simnet::ScheduledEvent;
use crate::simnet::SimNetConfig;
use crate::simnet::SimNetEdge;
use crate::simnet::SimNetError;
use crate::simnet::simnet_handle;

Expand Down Expand Up @@ -126,65 +123,38 @@ impl fmt::Display for SimAddr {
/// Message Event that can be passed around in the simnet.
#[derive(Debug)]
pub(crate) struct MessageDeliveryEvent {
src_addr: Option<ChannelAddr>,
dest_addr: ChannelAddr,
data: Serialized,
duration_ms: u64,
latency: u64,
}

impl MessageDeliveryEvent {
/// Creates a new MessageDeliveryEvent.
pub fn new(src_addr: Option<ChannelAddr>, dest_addr: ChannelAddr, data: Serialized) -> Self {
pub fn new(dest_addr: ChannelAddr, data: Serialized, latency: u64) -> Self {
Self {
src_addr,
dest_addr,
data,
duration_ms: 100,
latency,
}
}
}

#[async_trait]
impl Event for MessageDeliveryEvent {
async fn handle(&self) -> Result<(), SimNetError> {
async fn handle(&mut self) -> Result<(), SimNetError> {
// Send the message to the correct receiver.
SENDER
.send(
self.src_addr.clone(),
self.dest_addr.clone(),
self.data.clone(),
)
.send(self.dest_addr.clone(), self.data.clone())
.await?;
Ok(())
}

fn duration_ms(&self) -> u64 {
self.duration_ms
self.latency
}

fn summary(&self) -> String {
format!(
"Sending message from {} to {}",
self.src_addr
.as_ref()
.map_or("unknown".to_string(), |addr| addr.to_string()),
self.dest_addr.clone()
)
}

async fn read_simnet_config(&mut self, topology: &Arc<Mutex<SimNetConfig>>) {
if let Some(src_addr) = &self.src_addr {
let edge = SimNetEdge {
src: src_addr.clone(),
dst: self.dest_addr.clone(),
};
self.duration_ms = topology
.lock()
.await
.topology
.get(&edge)
.map_or_else(|| 1, |v| v.latency.as_millis() as u64);
}
format!("Sending message to {}", self.dest_addr.clone())
}
}

Expand All @@ -194,12 +164,6 @@ pub async fn bind(addr: ChannelAddr) -> anyhow::Result<(), SimNetError> {
simnet_handle()?.bind(addr)
}

/// Update the configuration for simnet.
pub async fn update_config(config: simnet::NetworkConfig) -> anyhow::Result<(), SimNetError> {
// Only update network config for now, will add host config in the future.
simnet_handle()?.update_network_config(config).await
}

/// Returns a simulated channel address that is bound to "any" channel address.
pub(crate) fn any(transport: ChannelTransport) -> ChannelAddr {
ChannelAddr::Sim(SimAddr {
Expand Down Expand Up @@ -274,12 +238,7 @@ fn create_egress_sender(

#[async_trait]
impl Dispatcher<ChannelAddr> for SimDispatcher {
async fn send(
&self,
_src_addr: Option<ChannelAddr>,
addr: ChannelAddr,
data: Serialized,
) -> Result<(), SimNetError> {
async fn send(&self, addr: ChannelAddr, data: Serialized) -> Result<(), SimNetError> {
self.dispatchers
.get(&addr)
.ok_or_else(|| {
Expand Down Expand Up @@ -318,27 +277,34 @@ pub(crate) struct SimRx<M: RemoteMessage> {
}

#[async_trait]
impl<M: RemoteMessage> Tx<M> for SimTx<M> {
impl<M: RemoteMessage + Any> Tx<M> for SimTx<M> {
fn try_post(&self, message: M, _return_handle: oneshot::Sender<M>) -> Result<(), SendError<M>> {
let data = match Serialized::serialize(&message) {
Ok(data) => data,
Err(err) => return Err(SendError(err.into(), message)),
};

let envelope = (&message as &dyn Any)
.downcast_ref::<MessageEnvelope>()
.expect("RemoteMessage should always be a MessageEnvelope");

let (sender, dest) = (envelope.sender().clone(), envelope.dest().0.clone());

match simnet_handle() {
Ok(handle) => match &self.src_addr {
Some(_) if self.client => handle.send_scheduled_event(ScheduledEvent {
event: Box::new(MessageDeliveryEvent::new(
self.src_addr.clone(),
self.dst_addr.clone(),
data,
)),
time: SimClock.millis_since_start(RealClock.now()),
}),
_ => handle.send_event(Box::new(MessageDeliveryEvent::new(
self.src_addr.clone(),
Ok(handle) => {
let event = Box::new(MessageDeliveryEvent::new(
self.dst_addr.clone(),
data,
))),
handle.sample_latency(sender.proc_id(), dest.proc_id()),
));

match &self.src_addr {
Some(_) if self.client => handle.send_scheduled_event(ScheduledEvent {
event,
time: SimClock.millis_since_start(RealClock.now()),
}),
_ => handle.send_event(event),
}
}
.map_err(|err: SimNetError| SendError(ChannelError::from(err), message)),
Err(err) => Err(SendError(ChannelError::from(err), message)),
Expand Down Expand Up @@ -410,19 +376,27 @@ impl<M: RemoteMessage> Rx<M> for SimRx<M> {
mod tests {
use std::iter::zip;

use ndslice::extent;

use super::*;
use crate::PortId;
use crate::attrs::Attrs;
use crate::clock::Clock;
use crate::clock::RealClock;
use crate::clock::SimClock;
use crate::simnet::NetworkConfig;
use crate::id;
use crate::simnet;
use crate::simnet::LatencyConfig;
use crate::simnet::start;
use crate::simnet::start_with_config;

#[tokio::test]
async fn test_sim_basic() {
let dst_ok = vec!["tcp:[::1]:1234", "tcp:127.0.0.1:8080", "local:123"];
let srcs_ok = vec!["tcp:[::2]:1234", "tcp:127.0.0.2:8080", "local:124"];

start();
let handle = simnet_handle().unwrap();

// TODO: New NodeAdd event should do this for you..
for addr in dst_ok.iter().chain(srcs_ok.iter()) {
Expand All @@ -439,10 +413,24 @@ mod tests {
)
.unwrap();

let (_, mut rx) = sim::serve::<u64>(dst_addr.clone()).unwrap();
let tx = sim::dial::<u64>(dst_addr).unwrap();
tx.try_post(123, oneshot::channel().0).unwrap();
assert_eq!(rx.recv().await.unwrap(), 123);
let (_, mut rx) = sim::serve::<MessageEnvelope>(dst_addr.clone()).unwrap();
let tx = sim::dial::<MessageEnvelope>(dst_addr).unwrap();
let data = Serialized::serialize(&456).unwrap();
let sender = id!(world[0].hello);
let dest = id!(world[1].hello);
let ext = extent!(region = 1, dc = 1, rack = 4, host = 4, gpu = 8);
handle.register_proc(
sender.proc_id().clone(),
ext.point(vec![0, 0, 0, 0, 0]).unwrap(),
);
handle.register_proc(
dest.proc_id().clone(),
ext.point(vec![0, 0, 0, 1, 0]).unwrap(),
);

let msg = MessageEnvelope::new(sender, PortId(dest, 0), data.clone(), Attrs::new());
tx.try_post(msg, oneshot::channel().0).unwrap();
assert_eq!(*rx.recv().await.unwrap().data(), data);
}

let records = sim::simnet_handle().unwrap().close().await.unwrap();
Expand Down Expand Up @@ -481,30 +469,47 @@ mod tests {

#[tokio::test]
async fn test_realtime_frontier() {
start();

tokio::time::pause();
// 1 second of latency
start_with_config(LatencyConfig {
inter_host: (100, 100),
..Default::default()
});

let sim_addr = SimAddr::new("unix:@dst".parse::<ChannelAddr>().unwrap()).unwrap();
let sim_addr_with_src = SimAddr::new_with_src(
"unix:@src".parse::<ChannelAddr>().unwrap(),
"unix:@dst".parse::<ChannelAddr>().unwrap(),
)
.unwrap();
let (_, mut rx) = sim::serve::<()>(sim_addr.clone()).unwrap();
let tx = sim::dial::<()>(sim_addr_with_src).unwrap();
let simnet_config_yaml = r#"
edges:
- src: unix:@src
dst: unix:@dst
metadata:
latency: 100
"#;
update_config(NetworkConfig::from_yaml(simnet_config_yaml).unwrap())
.await
.unwrap();
let (_, mut rx) = sim::serve::<MessageEnvelope>(sim_addr.clone()).unwrap();
let tx = sim::dial::<MessageEnvelope>(sim_addr_with_src).unwrap();

let controller = id!(world[0].controller);
let dest = id!(world[1].dest);
let handle = simnet::simnet_handle().unwrap();

let ext = extent!(region = 1, dc = 1, zone = 1, rack = 4, host = 4, gpu = 8);
handle.register_proc(
controller.proc_id().clone(),
ext.point(vec![0, 0, 0, 0, 0, 0]).unwrap(),
);
handle.register_proc(
dest.proc_id().clone(),
ext.point(vec![0, 0, 0, 0, 1, 0]).unwrap(),
);

// This message will be delievered at simulator time = 100 seconds
tx.try_post((), oneshot::channel().0).unwrap();
tx.try_post(
MessageEnvelope::new(
controller,
PortId(dest, 0),
Serialized::serialize(&456).unwrap(),
Attrs::new(),
),
oneshot::channel().0,
)
.unwrap();
{
// Allow simnet to run
tokio::task::yield_now().await;
Expand All @@ -524,41 +529,74 @@ mod tests {
#[tokio::test]
async fn test_client_message_scheduled_realtime() {
tokio::time::pause();
start();
// 1 second of latency
start_with_config(LatencyConfig {
inter_host: (1000, 1000),
..Default::default()
});

let controller_to_dst = SimAddr::new_with_src(
"unix:@controller".parse::<ChannelAddr>().unwrap(),
"unix:@dst".parse::<ChannelAddr>().unwrap(),
)
.unwrap();
let controller_tx = sim::dial::<()>(controller_to_dst.clone()).unwrap();

let controller_tx = sim::dial::<MessageEnvelope>(controller_to_dst.clone()).unwrap();

let client_to_dst = SimAddr::new_with_client_src(
"unix:@client".parse::<ChannelAddr>().unwrap(),
"unix:@dst".parse::<ChannelAddr>().unwrap(),
)
.unwrap();
let client_tx = sim::dial::<()>(client_to_dst).unwrap();

// 1 second of latency
let simnet_config_yaml = r#"
edges:
- src: unix:@controller
dst: unix:@dst
metadata:
latency: 1
"#;
update_config(NetworkConfig::from_yaml(simnet_config_yaml).unwrap())
.await
.unwrap();
let client_tx = sim::dial::<MessageEnvelope>(client_to_dst).unwrap();

let controller = id!(world[0].controller);
let dest = id!(world[1].dest);
let client = id!(world[2].client);

let handle = simnet::simnet_handle().unwrap();
let ext = extent!(region = 1, dc = 1, zone = 1, rack = 4, host = 4, gpu = 8);
handle.register_proc(
controller.proc_id().clone(),
ext.point(vec![0, 0, 0, 0, 0, 0]).unwrap(),
);
handle.register_proc(
client.proc_id().clone(),
ext.point(vec![0, 0, 0, 0, 0, 0]).unwrap(),
);
handle.register_proc(
dest.proc_id().clone(),
ext.point(vec![0, 0, 0, 0, 1, 0]).unwrap(),
);

assert_eq!(SimClock.millis_since_start(RealClock.now()), 0);
// Fast forward real time to 5 seconds
tokio::time::advance(tokio::time::Duration::from_secs(5)).await;
{
// Send client message
client_tx.try_post((), oneshot::channel().0).unwrap();
client_tx
.try_post(
MessageEnvelope::new(
client.clone(),
PortId(dest.clone(), 0),
Serialized::serialize(&456).unwrap(),
Attrs::new(),
),
oneshot::channel().0,
)
.unwrap();
// Send system message
controller_tx.try_post((), oneshot::channel().0).unwrap();
controller_tx
.try_post(
MessageEnvelope::new(
controller.clone(),
PortId(dest.clone(), 0),
Serialized::serialize(&456).unwrap(),
Attrs::new(),
),
oneshot::channel().0,
)
.unwrap();
// Allow some time for simnet to run
RealClock.sleep(tokio::time::Duration::from_secs(1)).await;
}
Expand Down
Loading
Loading