Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 117 additions & 24 deletions hyperactor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,79 +34,161 @@ use std::sync::LazyLock;
use std::sync::RwLock;
use std::time::Duration;

use serde::Deserialize;
use serde::Serialize;
use shell_quote::QuoteRefExt;

use crate as hyperactor;
use crate::attrs::AttrKeyInfo;
use crate::attrs::AttrValue;
use crate::attrs::Attrs;
use crate::attrs::SerializableValue;
use crate::attrs::declare_attrs;
use crate::data::Encoding;
use crate::data::Encoding; // for macros

/// Metadata describing how a configuration key is exposed across
/// environments.
///
/// Each `ConfigAttr` entry defines how a Rust configuration key maps
/// to external representations:
/// - `env_name`: the environment variable consulted by
/// [`init_from_env()`] when loading configuration.
/// - `py_name`: the Python keyword argument accepted by
/// `monarch.configure(...)` and returned by `get_configuration()`.
///
/// Both mappings are optional. A key whose `env_name` is `None` is
/// skipped by the environment-variable loop in `from_env()`.
///
/// The [`AttrValue`] implementation for this type renders and parses
/// the value as JSON.
///
/// All configuration keys should carry this meta-attribute via
/// `@meta(CONFIG = ConfigAttr { ... })`.
#[derive(Clone, Debug, Serialize, Deserialize, hyperactor::Named)]
pub struct ConfigAttr {
/// Environment variable consulted by `init_from_env()`.
///
/// `None` means the key declares no environment variable mapping.
pub env_name: Option<String>,

/// Python kwarg name used by `monarch.configure(...)` and
/// `get_configuration()`.
///
/// `None` means the key declares no Python kwarg name.
// NOTE(review): presumably such keys are not settable from Python;
// the Python bridge is not visible here, so confirm against it.
pub py_name: Option<String>,
}

impl AttrValue for ConfigAttr {
fn display(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| "<invalid ConfigAttr>".into())
}
fn parse(s: &str) -> Result<Self, anyhow::Error> {
Ok(serde_json::from_str(s)?)
}
}

// Declare configuration keys using the new attrs system with defaults
declare_attrs! {
/// This is a meta-attribute specifying the environment variable used by the configuration
/// key.
pub attr CONFIG_ENV_VAR: String;

/// This is a meta-attribute specifying the name of the kwarg to pass to monarch.configure()
/// to set the attribute value in the global config.
pub attr PYTHON_CONFIG_KEY: String;
/// This is a meta-attribute marking a configuration key.
///
/// It carries metadata used to bridge Rust, environment
/// variables, and Python:
/// - `env_name`: environment variable name consulted by
/// `init_from_env()`.
/// - `py_name`: keyword argument name recognized by
/// `monarch.configure(...)`.
///
/// All configuration keys should be annotated with this
/// attribute.
pub attr CONFIG: ConfigAttr;

/// Maximum frame length for codec
@meta(CONFIG_ENV_VAR = "HYPERACTOR_CODEC_MAX_FRAME_LENGTH".to_string())
@meta(CONFIG = ConfigAttr {
env_name: Some("HYPERACTOR_CODEC_MAX_FRAME_LENGTH".to_string()),
py_name: None,
})
pub attr CODEC_MAX_FRAME_LENGTH: usize = 10 * 1024 * 1024 * 1024; // 10 GiB

/// Message delivery timeout
@meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT".to_string())
@meta(CONFIG = ConfigAttr {
env_name: Some("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT".to_string()),
py_name: None,
})
pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);

/// Timeout used by allocator for stopping a proc.
@meta(CONFIG_ENV_VAR = "HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string())
@meta(CONFIG = ConfigAttr {
env_name: Some("HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string()),
py_name: None,
})
pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);

/// Message acknowledgment interval
@meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string())
@meta(CONFIG = ConfigAttr {
env_name: Some("HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string()),
py_name: None,
})
pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);

/// Number of messages after which to send an acknowledgment
@meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string())
@meta(CONFIG = ConfigAttr {
env_name: Some("HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string()),
py_name: None,
})
pub attr MESSAGE_ACK_EVERY_N_MESSAGES: u64 = 1000;

/// Default hop Time-To-Live for message envelopes.
@meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string())
@meta(CONFIG = ConfigAttr {
env_name: Some("HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string()),
py_name: None,
})
pub attr MESSAGE_TTL_DEFAULT : u8 = 64;

/// Maximum buffer size for split port messages
@meta(CONFIG_ENV_VAR = "HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string())
@meta(CONFIG = ConfigAttr {
env_name: Some("HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string()),
py_name: None,
})
pub attr SPLIT_MAX_BUFFER_SIZE: usize = 5;

/// The maximum time an update can be buffered before being reduced.
@meta(CONFIG_ENV_VAR = "HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string())
@meta(CONFIG = ConfigAttr {
env_name: Some("HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string()),
py_name: None,
})
pub attr SPLIT_MAX_BUFFER_AGE: Duration = Duration::from_millis(50);

/// Timeout used by proc mesh for stopping an actor.
@meta(CONFIG_ENV_VAR = "HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string())
@meta(CONFIG = ConfigAttr {
env_name: Some("HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string()),
py_name: None,
})
pub attr STOP_ACTOR_TIMEOUT: Duration = Duration::from_secs(1);

/// Heartbeat interval for remote allocator
@meta(CONFIG_ENV_VAR = "HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL".to_string())
@meta(CONFIG = ConfigAttr {
env_name: Some("HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL".to_string()),
py_name: None,
})
pub attr REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);

/// The default encoding to be used.
@meta(CONFIG_ENV_VAR = "HYPERACTOR_DEFAULT_ENCODING".to_string())
@meta(CONFIG = ConfigAttr {
env_name: Some("HYPERACTOR_DEFAULT_ENCODING".to_string()),
py_name: None,
})
pub attr DEFAULT_ENCODING: Encoding = Encoding::Multipart;

/// Whether to use multipart encoding for network channel communications.
@meta(CONFIG_ENV_VAR = "HYPERACTOR_CHANNEL_MULTIPART".to_string())
@meta(CONFIG = ConfigAttr {
env_name: Some("HYPERACTOR_CHANNEL_MULTIPART".to_string()),
py_name: None,
})
pub attr CHANNEL_MULTIPART: bool = true;

/// How often to check for full MPSC channel on NetRx.
@meta(CONFIG_ENV_VAR = "HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string())
@meta(CONFIG = ConfigAttr {
env_name: Some("HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string()),
py_name: None,
})
pub attr CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL: Duration = Duration::from_secs(5);

/// Sampling rate for logging message latency
/// Set to 0.01 for 1% sampling, 0.1 for 10% sampling, 0.90 for 90% sampling, etc.
@meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string())
@meta(CONFIG = ConfigAttr {
env_name: Some("HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string()),
py_name: None,
})
pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01;

/// Whether to enable client sequence assignment.
Expand All @@ -116,7 +198,10 @@ declare_attrs! {
///
/// Default: 10 seconds. If set to zero, disables the timeout and
/// waits indefinitely.
@meta(CONFIG_ENV_VAR = "HYPERACTOR_HOST_SPAWN_READY_TIMEOUT".to_string())
@meta(CONFIG = ConfigAttr {
env_name: Some("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT".to_string()),
py_name: None,
})
pub attr HOST_SPAWN_READY_TIMEOUT: Duration = Duration::from_secs(10);
}

Expand All @@ -134,9 +219,17 @@ pub fn from_env() -> Attrs {
}

for key in inventory::iter::<AttrKeyInfo>() {
let Some(env_var) = key.meta.get(CONFIG_ENV_VAR) else {
// Skip keys that are not marked as CONFIG or that do not
// declare an environment variable mapping. Only CONFIG-marked
// keys with an `env_name` participate in environment
// initialization.
let Some(cfg_meta) = key.meta.get(CONFIG) else {
continue;
};
let Some(env_var) = cfg_meta.env_name.as_deref() else {
continue;
};

let Ok(val) = env::var(env_var) else {
// Default value
output.push_str("# ");
Expand Down
Loading
Loading