diff --git a/hyperactor/src/config.rs b/hyperactor/src/config.rs index 0214da7fa..b9f26a680 100644 --- a/hyperactor/src/config.rs +++ b/hyperactor/src/config.rs @@ -34,79 +34,161 @@ use std::sync::LazyLock; use std::sync::RwLock; use std::time::Duration; +use serde::Deserialize; +use serde::Serialize; use shell_quote::QuoteRefExt; +use crate as hyperactor; use crate::attrs::AttrKeyInfo; +use crate::attrs::AttrValue; use crate::attrs::Attrs; use crate::attrs::SerializableValue; use crate::attrs::declare_attrs; -use crate::data::Encoding; +use crate::data::Encoding; // for macros + +/// Metadata describing how a configuration key is exposed across +/// environments. +/// +/// Each `ConfigAttr` entry defines how a Rust configuration key maps +/// to external representations: +/// - `env_name`: the environment variable consulted by +/// [`init_from_env()`] when loading configuration. +/// - `py_name`: the Python keyword argument accepted by +/// `monarch.configure(...)` and returned by `get_configuration()`. +/// +/// All configuration keys should carry this meta-attribute via +/// `@meta(CONFIG = ConfigAttr { ... })`. +#[derive(Clone, Debug, Serialize, Deserialize, hyperactor::Named)] +pub struct ConfigAttr { + /// Environment variable consulted by `init_from_env()`. + pub env_name: Option, + + /// Python kwarg name used by `monarch.configure(...)` and + /// `get_configuration()`. + pub py_name: Option, +} + +impl AttrValue for ConfigAttr { + fn display(&self) -> String { + serde_json::to_string(self).unwrap_or_else(|_| "".into()) + } + fn parse(s: &str) -> Result { + Ok(serde_json::from_str(s)?) + } +} // Declare configuration keys using the new attrs system with defaults declare_attrs! { - /// This is a meta-attribute specifying the environment variable used by the configuration - /// key. - pub attr CONFIG_ENV_VAR: String; - - /// This is a meta-attribute specifying the name of the kwarg to pass to monarch.configure() - /// to set the attribute value in the global config. - pub attr PYTHON_CONFIG_KEY: String; + /// This is a meta-attribute marking a configuration key. + /// + /// It carries metadata used to bridge Rust, environment + /// variables, and Python: + /// - `env_name`: environment variable name consulted by + /// `init_from_env()`. + /// - `py_name`: keyword argument name recognized by + /// `monarch.configure(...)`. + /// + /// All configuration keys should be annotated with this + /// attribute. + pub attr CONFIG: ConfigAttr; /// Maximum frame length for codec - @meta(CONFIG_ENV_VAR = "HYPERACTOR_CODEC_MAX_FRAME_LENGTH".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_CODEC_MAX_FRAME_LENGTH".to_string()), + py_name: None, + }) pub attr CODEC_MAX_FRAME_LENGTH: usize = 10 * 1024 * 1024 * 1024; // 10 GiB /// Message delivery timeout - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT".to_string()), + py_name: None, + }) pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30); /// Timeout used by allocator for stopping a proc. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string()), + py_name: None, + }) pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10); /// Message acknowledgment interval - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string()), + py_name: None, + }) pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500); /// Number of messages after which to send an acknowledgment - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string()), + py_name: None, + }) pub attr MESSAGE_ACK_EVERY_N_MESSAGES: u64 = 1000; /// Default hop Time-To-Live for message envelopes. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string()), + py_name: None, + }) pub attr MESSAGE_TTL_DEFAULT : u8 = 64; /// Maximum buffer size for split port messages - @meta(CONFIG_ENV_VAR = "HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string()), + py_name: None, + }) pub attr SPLIT_MAX_BUFFER_SIZE: usize = 5; /// The maximum time an update can be buffered before being reduced. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string()), + py_name: None, + }) pub attr SPLIT_MAX_BUFFER_AGE: Duration = Duration::from_millis(50); /// Timeout used by proc mesh for stopping an actor. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string()), + py_name: None, + }) pub attr STOP_ACTOR_TIMEOUT: Duration = Duration::from_secs(1); /// Heartbeat interval for remote allocator - @meta(CONFIG_ENV_VAR = "HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL".to_string()), + py_name: None, + }) pub attr REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); /// The default encoding to be used. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_DEFAULT_ENCODING".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_DEFAULT_ENCODING".to_string()), + py_name: None, + }) pub attr DEFAULT_ENCODING: Encoding = Encoding::Multipart; /// Whether to use multipart encoding for network channel communications. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_CHANNEL_MULTIPART".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_CHANNEL_MULTIPART".to_string()), + py_name: None, + }) pub attr CHANNEL_MULTIPART: bool = true; /// How often to check for full MSPC channel on NetRx. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string()), + py_name: None, + }) pub attr CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL: Duration = Duration::from_secs(5); /// Sampling rate for logging message latency /// Set to 0.01 for 1% sampling, 0.1 for 10% sampling, 0.90 for 90% sampling, etc. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string()), + py_name: None, + }) pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01; /// Whether to enable client sequence assignment. @@ -116,7 +198,10 @@ declare_attrs! { /// /// Default: 10 seconds. If set to zero, disables the timeout and /// waits indefinitely. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_HOST_SPAWN_READY_TIMEOUT".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT".to_string()), + py_name: None, + }) pub attr HOST_SPAWN_READY_TIMEOUT: Duration = Duration::from_secs(10); } @@ -134,9 +219,17 @@ pub fn from_env() -> Attrs { } for key in inventory::iter::() { - let Some(env_var) = key.meta.get(CONFIG_ENV_VAR) else { + // Skip keys that are not marked as CONFIG or that do not + // declare an environment variable mapping. Only CONFIG-marked + // keys with an `env_name` participate in environment + // initialization. + let Some(cfg_meta) = key.meta.get(CONFIG) else { continue; }; + let Some(env_var) = cfg_meta.env_name.as_deref() else { + continue; + }; + let Ok(val) = env::var(env_var) else { // Default value output.push_str("# "); diff --git a/hyperactor/src/config/global.rs b/hyperactor/src/config/global.rs index fd96379eb..8b3a03a68 100644 --- a/hyperactor/src/config/global.rs +++ b/hyperactor/src/config/global.rs @@ -14,16 +14,15 @@ //! //! - Reads (`get`, `get_cloned`) consult layers in that order, falling //! back to defaults if no explicit value is set. -//! - `attrs()` returns a **complete snapshot** of the effective -//! configuration at call time: it materializes defaults for keys not -//! set in any layer, and omits meta-only keys (like `CONFIG_ENV_VAR`) -//! unless explicitly set. +//! - `attrs()` returns a complete snapshot of all CONFIG-marked keys at +//! call time: it materializes defaults for keys not set in any layer. +//! Keys without @meta(CONFIG = …) are excluded. //! - In tests, `lock()` and `override_key` allow temporary overrides //! that are removed automatically when the guard drops. //! - In normal operation, a parent process can capture its effective //! config via `attrs()` and pass that snapshot to a child during //! bootstrap. The child installs it as a `Runtime` layer so the -//! parent’s values take precedence over Env/File/Defaults. +//! parent's values take precedence over Env/File/Defaults. //! //! This design provides flexibility (easy test overrides, runtime //! updates, YAML/Env baselines) while ensuring type safety and @@ -44,11 +43,13 @@ //! // ... test logic here ... //! } //! ``` +use std::collections::HashMap; use std::marker::PhantomData; use super::*; use crate::attrs::AttrValue; use crate::attrs::Key; +use crate::config::CONFIG; /// Configuration source layers in priority order. /// @@ -87,20 +88,6 @@ fn priority(s: Source) -> u8 { } } -/// A single configuration layer in the global store. -/// -/// Each `Layer` wraps a [`Source`] and its associated [`Attrs`] -/// values. Layers are kept in priority order and consulted during -/// resolution. -#[derive(Clone)] -struct Layer { - /// The origin of this layer (File, Env, Runtime, or - /// TestOverride). - source: Source, - /// The set of attributes explicitly provided by this source. - attrs: Attrs, -} - /// The full set of configuration layers in priority order. /// /// `Layers` wraps a vector of [`Layer`]s, always kept sorted by @@ -115,6 +102,209 @@ struct Layers { ordered: Vec, } +/// A single configuration layer in the global configuration model. +/// +/// Layers are consulted in priority order (`TestOverride → Runtime → +/// Env → File → Default`) when resolving configuration values. Each +/// variant holds an [`Attrs`] map of key/value pairs. +/// +/// The `TestOverride` variant additionally maintains per-key override +/// stacks to support nested and out-of-order test overrides. These +/// stacks are currently placeholders for a future refactor; for now, +/// only the `attrs` field is used. +/// +/// Variants: +/// - [`Layer::File`] — Values loaded from configuration files (lowest +/// explicit priority). +/// - [`Layer::Env`] — Values sourced from process environment +/// variables. +/// - [`Layer::Runtime`] — Programmatically set runtime overrides. +/// - [`Layer::TestOverride`] — Temporary in-test overrides applied +/// under [`ConfigLock`]. +/// +/// Layers are stored in [`Layers::ordered`], kept sorted by their +/// effective [`Source`] priority (`TestOverride` first, `File` last). +enum Layer { + /// Values loaded from configuration files. Lowest explicit + /// priority; only overridden by Env, Runtime, or TestOverride. + File(Attrs), + + /// Values read from process environment variables. Typically + /// installed at startup via [`init_from_env`]. + Env(Attrs), + + /// Values set programmatically at runtime. Stable high-priority + /// layer used by parent/child bootstrap and dynamic updates. + Runtime(Attrs), + + /// Ephemeral values inserted during tests via + /// [`ConfigLock::override_key`]. Always takes precedence over all + /// other layers. Currently holds both the active `attrs` map and + /// a per-key `stacks` table (used to support nested or + /// out-of-order test overrides in future refactors). + TestOverride { + attrs: Attrs, + stacks: HashMap<&'static str, OverrideStack>, + }, +} + +/// A per-key stack of test overrides used by the +/// [`Layer::TestOverride`] layer. +/// +/// Each stack tracks the sequence of active overrides applied to a +/// single configuration key. The topmost frame represents the +/// currently effective override; earlier frames represent older +/// (still live) guards that may drop out of order. +/// +/// Fields: +/// - `env_var`: The associated process environment variable name, if +/// any. +/// - `saved_env`: The original environment variable value before the +/// first override was applied (or `None` if it did not exist). +/// - `frames`: The stack of active override frames, with the top +/// being the last element in the vector. +/// +/// The full stack mechanism is not yet active; it is introduced +/// incrementally to prepare for robust out-of-order test override +/// restoration. +struct OverrideStack { + /// The name of the process environment variable associated with + /// this configuration key, if any. + /// + /// Used to mirror changes to the environment when overrides are + /// applied or removed. `None` if the key has no + /// `CONFIG.env_name`. + env_var: Option, + + /// The original value of the environment variable before the + /// first override was applied. + /// + /// Stored so it can be restored once the last frame is dropped. + /// `None` means the variable did not exist prior to overriding. + saved_env: Option, + + /// The sequence of active override frames for this key. + /// + /// Each frame represents one active test override; the last + /// element (`frames.last()`) is the current top-of-stack and + /// defines the effective value seen in the configuration and + /// environment. + frames: Vec, +} + +/// A single entry in a per-key override stack. +/// +/// Each `OverrideFrame` represents one active test override applied +/// via [`ConfigLock::override_key`]. Frames are uniquely identified +/// by a monotonically increasing token and record both the value +/// being overridden and its string form for environment mirroring. +/// +/// When a guard drops, its frame is removed from the stack; if it was +/// the top, the next frame (if any) becomes active, or the original +/// environment is restored when the stack becomes empty. +struct OverrideFrame { + /// A unique, monotonically increasing identifier for this + /// override frame. + /// + /// Used to associate a dropping [`ConfigValueGuard`] with its + /// corresponding entry in the stack, even if drops occur out of + /// order. + token: u64, + + /// The serialized configuration value active while this frame is + /// on top of its stack. + /// + /// Stored as a boxed [`SerializableValue`] to match how values + /// are kept within [`Attrs`]. + value: Box, + + /// Pre-rendered string form of the value, used for environment + /// variable updates when this frame becomes active. + /// + /// Avoids recomputing `value.display()` on every push or pop. + env_str: String, +} + +/// Return the [`Source`] corresponding to a given [`Layer`]. +/// +/// This provides a uniform way to retrieve a layer's logical source +/// (File, Env, Runtime, or TestOverride) regardless of its internal +/// representation. Used for sorting layers by priority and for +/// source-based lookups or removals. +fn layer_source(l: &Layer) -> Source { + match l { + Layer::File(_) => Source::File, + Layer::Env(_) => Source::Env, + Layer::Runtime(_) => Source::Runtime, + Layer::TestOverride { .. } => Source::TestOverride, + } +} + +/// Return an immutable reference to the [`Attrs`] contained in a +/// [`Layer`]. +/// +/// This abstracts over the specific layer variant so callers can read +/// configuration values uniformly without needing to pattern-match on +/// the layer type. For `TestOverride`, this returns the current +/// top-level attributes reflecting the active overrides. +fn layer_attrs(l: &Layer) -> &Attrs { + match l { + Layer::File(a) | Layer::Env(a) | Layer::Runtime(a) => a, + Layer::TestOverride { attrs, .. } => attrs, + } +} + +/// Return a mutable reference to the [`Attrs`] contained in a +/// [`Layer`]. +/// +/// This allows callers to modify configuration values within any +/// layer without needing to pattern-match on its variant. For +/// `TestOverride`, the returned [`Attrs`] always reflect the current +/// top-of-stack overrides for each key. +fn layer_attrs_mut(l: &mut Layer) -> &mut Attrs { + match l { + Layer::File(a) | Layer::Env(a) | Layer::Runtime(a) => a, + Layer::TestOverride { attrs, .. } => attrs, + } +} + +/// Return the index of the [`Layer::TestOverride`] within the +/// [`Layers`] vector. +/// +/// If a TestOverride layer is present, its position in the ordered +/// list is returned; otherwise, `None` is returned. This is used to +/// locate the active test override layer for inserting or restoring +/// temporary configuration values. +fn test_override_index(layers: &Layers) -> Option { + layers + .ordered + .iter() + .position(|l| matches!(l, Layer::TestOverride { .. })) +} + +/// Ensure a [`Layer::TestOverride`] exists and return a mutable +/// reference to its [`Attrs`]. +/// +/// If the TestOverride layer already exists, a mutable reference to +/// its attributes is returned directly. Otherwise, a new TestOverride +/// layer (with empty `Attrs` and stacks) is created, inserted into +/// the ordered layers according to priority, and then returned. +/// +/// This helper is used by [`ConfigLock::override_key`] to guarantee +/// that test overrides always have a dedicated writable layer. +fn ensure_test_override_layer_mut<'a>(layers: &'a mut Layers) -> &'a mut Attrs { + if let Some(i) = test_override_index(layers) { + return layer_attrs_mut(&mut layers.ordered[i]); + } + layers.ordered.push(Layer::TestOverride { + attrs: Attrs::new(), + stacks: HashMap::new(), + }); + layers.ordered.sort_by_key(|l| priority(layer_source(l))); + let i = test_override_index(layers).expect("just inserted TestOverride layer"); + layer_attrs_mut(&mut layers.ordered[i]) +} + /// Global layered configuration store. /// /// This is the single authoritative store for configuration in @@ -138,10 +328,7 @@ struct Layers { static LAYERS: LazyLock>> = LazyLock::new(|| { let env = super::from_env(); let layers = Layers { - ordered: vec![Layer { - source: Source::Env, - attrs: env, - }], + ordered: vec![Layer::Env(env)], }; Arc::new(RwLock::new(layers)) }); @@ -174,19 +361,17 @@ pub fn lock() -> ConfigLock { } } -/// Initialize the global configuration from environment -/// variables. +/// Initialize the global configuration from environment variables. /// -/// Reads values from process environment variables, using the -/// `CONFIG_ENV_VAR` meta-attribute declared on each key to find -/// its mapping. The resulting values are installed as the -/// [`Source::Env`] layer. Keys without a corresponding -/// environment variable fall back to defaults or higher-priority -/// sources. +/// Reads values from process environment variables, using each key's +/// `CONFIG.env_name` (from `@meta(CONFIG = ConfigAttr { … })`) to +/// determine its mapping. The resulting values are installed as the +/// [`Source::Env`] layer. Keys without a corresponding environment +/// variable fall back to defaults or higher-priority sources. /// -/// Typically invoked once at process startup to overlay config -/// values from the environment. Repeated calls replace the -/// existing Env layer. +/// Typically invoked once at process startup to overlay config values +/// from the environment. Repeated calls replace the existing Env +/// layer. pub fn init_from_env() { set(Source::Env, super::from_env()); } @@ -216,8 +401,9 @@ pub fn init_from_yaml>(path: P) -> Result<(), anyhow::Error> { pub fn get(key: Key) -> T { let layers = LAYERS.read().unwrap(); for layer in &layers.ordered { - if layer.attrs.contains_key(key) { - return *layer.attrs.get(key).unwrap(); + let a = layer_attrs(layer); + if let Some(value) = a.get(key) { + return *value; } } *key.default().expect("key must have a default") @@ -242,8 +428,9 @@ pub fn get_cloned(key: Key) -> T { pub fn try_get_cloned(key: Key) -> Option { let layers = LAYERS.read().unwrap(); for layer in &layers.ordered { - if layer.attrs.contains_key(key) { - return layer.attrs.get(key).cloned(); + let a = layer_attrs(layer); + if a.contains_key(key) { + return a.get(key).cloned(); } } key.default().cloned() @@ -263,12 +450,20 @@ pub fn try_get_cloned(key: Key) -> Option { /// overriding configuration values. pub fn set(source: Source, attrs: Attrs) { let mut g = LAYERS.write().unwrap(); - if let Some(l) = g.ordered.iter_mut().find(|l| l.source == source) { - l.attrs = attrs; + if let Some(l) = g.ordered.iter_mut().find(|l| layer_source(l) == source) { + *layer_attrs_mut(l) = attrs; } else { - g.ordered.push(Layer { source, attrs }); + g.ordered.push(match source { + Source::File => Layer::File(attrs), + Source::Env => Layer::Env(attrs), + Source::Runtime => Layer::Runtime(attrs), + Source::TestOverride => Layer::TestOverride { + attrs, + stacks: HashMap::new(), + }, + }); } - g.ordered.sort_by_key(|l| priority(l.source)); // TestOverride < Runtime < Env < File + g.ordered.sort_by_key(|l| priority(layer_source(l))); // TestOverride < Runtime < Env < File } /// Remove the configuration layer for the given [`Source`], if @@ -278,13 +473,14 @@ pub fn set(source: Source, attrs: Attrs) { /// contribute to resolution in [`get`], [`get_cloned`], or /// [`attrs`]. Defaults and any remaining layers continue to apply /// in their normal priority order. +#[allow(dead_code)] pub(crate) fn clear(source: Source) { let mut g = LAYERS.write().unwrap(); - g.ordered.retain(|l| l.source != source); + g.ordered.retain(|l| layer_source(l) != source); } -/// Return a complete, merged snapshot of the effective -/// configuration. +/// Return a complete, merged snapshot of the effective configuration +/// **(only keys marked with `@meta(CONFIG = ...)`)**. /// /// Resolution per key: /// 1) First explicit value found in layers (TestOverride → @@ -292,8 +488,9 @@ pub(crate) fn clear(source: Source) { /// 2) Otherwise, the key's default (if any). /// /// Notes: -/// - This materializes defaults into the returned Attrs so it's -/// self-contained. +/// - This materializes defaults into the returned Attrs for all +/// CONFIG-marked keys, so it's self-contained. +/// - Keys without `CONFIG` meta are excluded. pub fn attrs() -> Attrs { let layers = LAYERS.read().unwrap(); let mut merged = Attrs::new(); @@ -301,12 +498,17 @@ pub fn attrs() -> Attrs { // Iterate all declared keys (registered via `declare_attrs!` // + inventory). for info in inventory::iter::() { + // Skip keys not marked as `CONFIG`. + if info.meta.get(CONFIG).is_none() { + continue; + } + let name = info.name; // Try to resolve from highest -> lowest priority layer. let mut chosen: Option> = None; for layer in &layers.ordered { - if let Some(v) = layer.attrs.get_value_by_name(name) { + if let Some(v) = layer_attrs(layer).get_value_by_name(name) { chosen = Some(v.cloned()); break; } @@ -346,26 +548,6 @@ pub fn reset_to_defaults() { g.ordered.clear(); } -fn test_override_index(layers: &Layers) -> Option { - layers - .ordered - .iter() - .position(|l| matches!(l.source, Source::TestOverride)) -} - -fn ensure_test_override_layer_mut(layers: &mut Layers) -> &mut Attrs { - if let Some(i) = test_override_index(layers) { - return &mut layers.ordered[i].attrs; - } - layers.ordered.push(Layer { - source: Source::TestOverride, - attrs: Attrs::new(), - }); - layers.ordered.sort_by_key(|l| priority(l.source)); - let i = test_override_index(layers).expect("just inserted TestOverride layer"); - &mut layers.ordered[i].attrs -} - /// A guard that holds the global configuration lock and provides /// override functionality. /// @@ -398,21 +580,17 @@ impl ConfigLock { // Set new override value. layer_attrs.set(key, value.clone()); // Mirror env var. - let orig_env = if let Some(env_var) = key.attrs().get(CONFIG_ENV_VAR) { - let orig = std::env::var(env_var).ok(); - // SAFETY: Mutating process-global environment - // variables is not thread-safe. This path is used - // only in tests while holding the global - // ConfigLock, which serializes config mutations - // across the process. Tests are single-threaded - // with respect to env changes, so there are no - // concurrent readers/writers. We also record the - // original value and restore it in - // ConfigValueGuard::drop. - unsafe { - std::env::set_var(env_var, value.display()); + let orig_env = if let Some(cfg) = key.attrs().get(crate::config::CONFIG) { + if let Some(env_var) = &cfg.env_name { + let orig = std::env::var(env_var).ok(); + // SAFETY: this path is used only in tests under ConfigLock + unsafe { + std::env::set_var(env_var, value.display()); + } + Some((env_var.clone(), orig)) + } else { + None } - Some((env_var.clone(), orig)) } else { None }; @@ -442,15 +620,9 @@ impl ConfigLock { impl Drop for ConfigLock { fn drop(&mut self) { let mut guard = LAYERS.write().unwrap(); - if let Some(pos) = guard - .ordered - .iter() - .position(|l| matches!(l.source, Source::TestOverride)) - { + if let Some(pos) = test_override_index(&guard) { guard.ordered.remove(pos); } - // No need to restore anything else; underlying layers - // remain intact. } } @@ -468,23 +640,24 @@ pub struct ConfigValueGuard<'a, T: 'static> { /// /// - If there was a previous override for this key in the /// [`Source::TestOverride`] layer, that value is reinserted. -/// - If this guard was the only override for the key, the entry -/// is removed from the layer entirely (leaving underlying layers -/// or defaults to apply). -/// - If the key declared a `CONFIG_ENV_VAR`, the corresponding +/// - If this guard was the only override for the key, the entry is +/// removed from the layer entirely (leaving underlying layers or +/// defaults to apply). +/// - If the key declared a `CONFIG.env_name` (via `@meta(CONFIG = +/// ConfigAttr { env_name: Some(...), .. })`), the corresponding /// process environment variable is restored to its original value /// (or removed if it didn't exist). /// /// This ensures that overrides applied via -/// [`ConfigLock::override_key`] are always reverted cleanly when -/// the guard is dropped, without leaking state into subsequent -/// tests or callers. +/// [`ConfigLock::override_key`] are always reverted cleanly when the +/// guard is dropped, without leaking state into subsequent tests or +/// callers. impl Drop for ConfigValueGuard<'_, T> { fn drop(&mut self) { let mut guard = LAYERS.write().unwrap(); if let Some(i) = test_override_index(&guard) { - let layer_attrs = &mut guard.ordered[i].attrs; + let layer_attrs = &mut layer_attrs_mut(&mut guard.ordered[i]); if let Some(prev) = self.orig.take() { layer_attrs.insert_value(self.key, prev); @@ -495,11 +668,9 @@ impl Drop for ConfigValueGuard<'_, T> { } if let Some((k, v)) = self.orig_env.take() { - // SAFETY: process-global environment variables are - // not thread-safe to mutate. This override/restore - // path is only ever used in single-threaded test - // code, and is serialized by the global ConfigLock to - // avoid races between tests. + // SAFETY: only ever used in single-threaded test code and + // serialized by the global ConfigLock to avoid races + // between tests. unsafe { if let Some(v) = v { std::env::set_var(k, v); @@ -645,12 +816,12 @@ mod tests { assert_eq!(snap[CODEC_MAX_FRAME_LENGTH], 10 * 1024 * 1024 * 1024); assert_eq!(snap[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30)); - // CONFIG_ENV_VAR has no default and wasn't explicitly set: - // should be omitted. + // CONFIG has no default and wasn't explicitly set: should be + // omitted. let json = serde_json::to_string(&snap).unwrap(); assert!( - !json.contains("config_env_var"), - "CONFIG_ENV_VAR must not appear in snapshot unless explicitly set" + !json.contains("hyperactor::config::config"), + "CONFIG must not appear in snapshot unless explicitly set" ); } @@ -761,4 +932,34 @@ mod tests { let snap = attrs(); assert_eq!(snap[MESSAGE_TTL_DEFAULT], 20); // Env beats File } + + declare_attrs! { + @meta(CONFIG = ConfigAttr { + env_name: None, + py_name: None, + }) + pub attr CONFIG_KEY: bool = true; + + pub attr NON_CONFIG_KEY: bool = true; + } + + #[test] + fn test_attrs_excludes_non_config_keys() { + let _lock = lock(); + reset_to_defaults(); + + let snap = attrs(); + let json = serde_json::to_string(&snap).unwrap(); + + // Expect our CONFIG_KEY to be present. + assert!( + json.contains("hyperactor::config::global::tests::config_key"), + "attrs() should include keys with @meta(CONFIG = ...)" + ); + // Expect our NON_CONFIG_KEY to be omitted. + assert!( + !json.contains("hyperactor::config::global::tests::non_config_key"), + "attrs() should exclude keys without @meta(CONFIG = ...)" + ); + } } diff --git a/hyperactor_mesh/src/bootstrap.rs b/hyperactor_mesh/src/bootstrap.rs index c6231d1a9..656180c52 100644 --- a/hyperactor_mesh/src/bootstrap.rs +++ b/hyperactor_mesh/src/bootstrap.rs @@ -39,7 +39,8 @@ use hyperactor::channel::Rx; use hyperactor::channel::Tx; use hyperactor::clock::Clock; use hyperactor::clock::RealClock; -use hyperactor::config::CONFIG_ENV_VAR; +use hyperactor::config::CONFIG; +use hyperactor::config::ConfigAttr; use hyperactor::config::global as config; use hyperactor::declare_attrs; use hyperactor::host; @@ -71,26 +72,38 @@ declare_attrs! { /// against leaked children; tests usually disable it via /// `std::env::set_var("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG", /// "false")`. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG".to_string()), + py_name: None, + }) pub attr MESH_BOOTSTRAP_ENABLE_PDEATHSIG: bool = true; /// Maximum number of log lines retained in a proc's stderr/stdout /// tail buffer. Used by [`LogTailer::tee`] when wiring child /// pipes. Default: 100 - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_TAIL_LOG_LINES".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESH_TAIL_LOG_LINES".to_string()), + py_name: None, + }) pub attr MESH_TAIL_LOG_LINES: usize = 100; /// Maximum number of child terminations to run concurrently /// during bulk shutdown. Prevents unbounded spawning of /// termination tasks (which could otherwise spike CPU, I/O, or /// file descriptor load). - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_TERMINATE_CONCURRENCY".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESH_TERMINATE_CONCURRENCY".to_string()), + py_name: None, + }) pub attr MESH_TERMINATE_CONCURRENCY: usize = 16; /// Per-child grace window for termination. When a shutdown is /// requested, the manager sends SIGTERM and waits this long for /// the child to exit before escalating to SIGKILL. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_TERMINATE_TIMEOUT".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESH_TERMINATE_TIMEOUT".to_string()), + py_name: None, + }) pub attr MESH_TERMINATE_TIMEOUT: Duration = Duration::from_secs(10); } diff --git a/hyperactor_mesh/src/config.rs b/hyperactor_mesh/src/config.rs index 1e4d34cf3..9aa01a09e 100644 --- a/hyperactor_mesh/src/config.rs +++ b/hyperactor_mesh/src/config.rs @@ -12,7 +12,8 @@ //! the base hyperactor configuration system. use hyperactor::attrs::declare_attrs; -use hyperactor::config::CONFIG_ENV_VAR; +use hyperactor::config::CONFIG; +use hyperactor::config::ConfigAttr; // Declare hyperactor_mesh-specific configuration keys declare_attrs! { @@ -20,6 +21,9 @@ declare_attrs! { /// when reshaping during casting to limit fanout. /// usize::MAX means no reshaping as any shape will always be below /// the limit so no dimension needs to be folded. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_MAX_CAST_DIMENSION_SIZE".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESH_MAX_CAST_DIMENSION_SIZE".to_string()), + py_name: None, + }) pub attr MAX_CAST_DIMENSION_SIZE: usize = usize::MAX; } diff --git a/hyperactor_mesh/src/proc_mesh.rs b/hyperactor_mesh/src/proc_mesh.rs index c06831014..71843b15d 100644 --- a/hyperactor_mesh/src/proc_mesh.rs +++ b/hyperactor_mesh/src/proc_mesh.rs @@ -31,8 +31,8 @@ use hyperactor::channel; use hyperactor::channel::ChannelAddr; use hyperactor::channel::ChannelTransport; use hyperactor::config; -use hyperactor::config::CONFIG_ENV_VAR; -use hyperactor::config::PYTHON_CONFIG_KEY; +use hyperactor::config::CONFIG; +use hyperactor::config::ConfigAttr; use hyperactor::context; use hyperactor::declare_attrs; use hyperactor::mailbox; @@ -87,10 +87,10 @@ use std::sync::RwLock; declare_attrs! { /// Default transport type to use across the application. - @meta( - CONFIG_ENV_VAR = "HYPERACTOR_MESH_DEFAULT_TRANSPORT".to_string(), - PYTHON_CONFIG_KEY = "default_transport".to_string(), - ) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESH_DEFAULT_TRANSPORT".to_string()), + py_name: Some("default_transport".to_string()), + }) pub attr DEFAULT_TRANSPORT: ChannelTransport = ChannelTransport::Unix; } /// Get the default transport type to use across the application. diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs index 9cff5aeae..4da564bd1 100644 --- a/hyperactor_mesh/src/v1/host_mesh.rs +++ b/hyperactor_mesh/src/v1/host_mesh.rs @@ -11,7 +11,8 @@ use hyperactor::channel::ChannelTransport; use hyperactor::clock::Clock; use hyperactor::clock::RealClock; use hyperactor::config; -use hyperactor::config::CONFIG_ENV_VAR; +use hyperactor::config::CONFIG; +use hyperactor::config::ConfigAttr; use hyperactor::declare_attrs; pub mod mesh_agent; @@ -54,8 +55,12 @@ use crate::v1::host_mesh::mesh_agent::ShutdownHostClient; use crate::v1::proc_mesh::ProcRef; declare_attrs! { - /// The maximum idle time between updates while spawning proc meshes. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_PROC_SPAWN_MAX_IDLE".to_string()) + /// The maximum idle time between updates while spawning proc + /// meshes. + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESH_PROC_SPAWN_MAX_IDLE".to_string()), + py_name: None, + }) pub attr PROC_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30); } diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index b27f97e7a..8569cccaf 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -29,7 +29,8 @@ use hyperactor::channel::ChannelAddr; use hyperactor::clock::Clock; use hyperactor::clock::RealClock; use hyperactor::config; -use hyperactor::config::CONFIG_ENV_VAR; +use hyperactor::config::CONFIG; +use hyperactor::config::ConfigAttr; use hyperactor::context; use hyperactor::declare_attrs; use hyperactor::mailbox::DialMailboxRouter; @@ -69,8 +70,12 @@ use crate::v1::Name; use crate::v1::ValueMesh; declare_attrs! { - /// The maximum idle time between updates while spawning actor meshes. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_ACTOR_SPAWN_MAX_IDLE".to_string()) + /// The maximum idle time between updates while spawning actor + /// meshes. + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESH_ACTOR_SPAWN_MAX_IDLE".to_string()), + py_name: None, + }) pub attr ACTOR_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30); } diff --git a/monarch_hyperactor/src/config.rs b/monarch_hyperactor/src/config.rs index 4466263f9..f3215c35f 100644 --- a/monarch_hyperactor/src/config.rs +++ b/monarch_hyperactor/src/config.rs @@ -21,7 +21,8 @@ use hyperactor::attrs::Attrs; use hyperactor::attrs::ErasedKey; use hyperactor::attrs::declare_attrs; use hyperactor::channel::ChannelTransport; -use hyperactor::config::PYTHON_CONFIG_KEY; +use hyperactor::config::CONFIG; +use hyperactor::config::ConfigAttr; use hyperactor::config::global::Source; use pyo3::conversion::IntoPyObjectExt; use pyo3::exceptions::PyTypeError; @@ -46,16 +47,18 @@ pub fn reload_config_from_env() -> PyResult<()> { Ok(()) } -/// Map from name of the kwarg that will be passed to `monarch.configure(...)` -/// to the `Key`` associated with that kwarg. This contains all of the -/// attribute keys with meta-attribute `PYTHON_CONFIG_KEY`. +/// Map from the kwarg name passed to `monarch.configure(...)` to the +/// `Key` associated with that kwarg. This contains all attribute +/// keys whose `@meta(CONFIG = ConfigAttr { py_name: Some(...), .. })` +/// specifies a kwarg name. static KEY_BY_NAME: std::sync::LazyLock> = std::sync::LazyLock::new(|| { inventory::iter::() .filter_map(|info| { info.meta - .get(PYTHON_CONFIG_KEY) - .map(|py_name| (py_name.as_str(), info.erased)) + .get(CONFIG) + .and_then(|cfg: &ConfigAttr| cfg.py_name.as_deref()) + .map(|py_name| (py_name, info.erased)) }) .collect() }); @@ -194,7 +197,7 @@ declare_py_config_type!( /// Iterate over each key-value pair. Attempt to retrieve the `Key` /// associated with the key and convert the value to `T`, then set /// them on the global config. The association between kwarg and -/// `Key` is specified using the `PYTHON_CONFIG_KEY` meta-attribute. +/// `Key` is specified using the `CONFIG` meta-attribute. #[pyfunction] #[pyo3(signature = (**kwargs))] fn configure(py: Python<'_>, kwargs: Option>) -> PyResult<()> { @@ -208,9 +211,10 @@ fn configure(py: Python<'_>, kwargs: Option>) -> PyRes Ok(()) } -/// For all attribute keys with meta-attribute `PYTHON_CONFIG_KEY` defined, return the -/// current associated value in the global config. The key will not be present in the -/// result of it has no value in the global config. +/// For all attribute keys whose `@meta(CONFIG = ConfigAttr { py_name: +/// Some(...), .. })` specifies a kwarg name, return the current +/// associated value in the global config. Keys with no value in the +/// global config are omitted from the result. #[pyfunction] fn get_configuration(py: Python<'_>) -> PyResult> { KEY_BY_NAME