diff --git a/hyperactor/src/attrs.rs b/hyperactor/src/attrs.rs index 801784993..f9f204dc8 100644 --- a/hyperactor/src/attrs.rs +++ b/hyperactor/src/attrs.rs @@ -153,6 +153,13 @@ pub struct Key { attrs: &'static LazyLock, } +impl Key { + /// Returns the name of this key. + pub fn name(&self) -> &'static str { + self.name + } +} + impl Key { /// Creates a new key with the given name. pub const fn new( @@ -167,11 +174,6 @@ impl Key { } } - /// Returns the name of this key. - pub fn name(&self) -> &'static str { - self.name - } - /// Returns a reference to the default value for this key, if one exists. pub fn default(&self) -> Option<&'static T> { self.default_value diff --git a/hyperactor/src/config.rs b/hyperactor/src/config.rs index 0214da7fa..b9f26a680 100644 --- a/hyperactor/src/config.rs +++ b/hyperactor/src/config.rs @@ -34,79 +34,161 @@ use std::sync::LazyLock; use std::sync::RwLock; use std::time::Duration; +use serde::Deserialize; +use serde::Serialize; use shell_quote::QuoteRefExt; +use crate as hyperactor; use crate::attrs::AttrKeyInfo; +use crate::attrs::AttrValue; use crate::attrs::Attrs; use crate::attrs::SerializableValue; use crate::attrs::declare_attrs; -use crate::data::Encoding; +use crate::data::Encoding; // for macros + +/// Metadata describing how a configuration key is exposed across +/// environments. +/// +/// Each `ConfigAttr` entry defines how a Rust configuration key maps +/// to external representations: +/// - `env_name`: the environment variable consulted by +/// [`init_from_env()`] when loading configuration. +/// - `py_name`: the Python keyword argument accepted by +/// `monarch.configure(...)` and returned by `get_configuration()`. +/// +/// All configuration keys should carry this meta-attribute via +/// `@meta(CONFIG = ConfigAttr { ... })`. +#[derive(Clone, Debug, Serialize, Deserialize, hyperactor::Named)] +pub struct ConfigAttr { + /// Environment variable consulted by `init_from_env()`. + pub env_name: Option, + + /// Python kwarg name used by `monarch.configure(...)` and + /// `get_configuration()`. + pub py_name: Option, +} + +impl AttrValue for ConfigAttr { + fn display(&self) -> String { + serde_json::to_string(self).unwrap_or_else(|_| "".into()) + } + fn parse(s: &str) -> Result { + Ok(serde_json::from_str(s)?) + } +} // Declare configuration keys using the new attrs system with defaults declare_attrs! { - /// This is a meta-attribute specifying the environment variable used by the configuration - /// key. - pub attr CONFIG_ENV_VAR: String; - - /// This is a meta-attribute specifying the name of the kwarg to pass to monarch.configure() - /// to set the attribute value in the global config. - pub attr PYTHON_CONFIG_KEY: String; + /// This is a meta-attribute marking a configuration key. + /// + /// It carries metadata used to bridge Rust, environment + /// variables, and Python: + /// - `env_name`: environment variable name consulted by + /// `init_from_env()`. + /// - `py_name`: keyword argument name recognized by + /// `monarch.configure(...)`. + /// + /// All configuration keys should be annotated with this + /// attribute. + pub attr CONFIG: ConfigAttr; /// Maximum frame length for codec - @meta(CONFIG_ENV_VAR = "HYPERACTOR_CODEC_MAX_FRAME_LENGTH".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_CODEC_MAX_FRAME_LENGTH".to_string()), + py_name: None, + }) pub attr CODEC_MAX_FRAME_LENGTH: usize = 10 * 1024 * 1024 * 1024; // 10 GiB /// Message delivery timeout - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT".to_string()), + py_name: None, + }) pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30); /// Timeout used by allocator for stopping a proc. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string()), + py_name: None, + }) pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10); /// Message acknowledgment interval - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string()), + py_name: None, + }) pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500); /// Number of messages after which to send an acknowledgment - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string()), + py_name: None, + }) pub attr MESSAGE_ACK_EVERY_N_MESSAGES: u64 = 1000; /// Default hop Time-To-Live for message envelopes. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string()), + py_name: None, + }) pub attr MESSAGE_TTL_DEFAULT : u8 = 64; /// Maximum buffer size for split port messages - @meta(CONFIG_ENV_VAR = "HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string()), + py_name: None, + }) pub attr SPLIT_MAX_BUFFER_SIZE: usize = 5; /// The maximum time an update can be buffered before being reduced. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string()), + py_name: None, + }) pub attr SPLIT_MAX_BUFFER_AGE: Duration = Duration::from_millis(50); /// Timeout used by proc mesh for stopping an actor. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string()), + py_name: None, + }) pub attr STOP_ACTOR_TIMEOUT: Duration = Duration::from_secs(1); /// Heartbeat interval for remote allocator - @meta(CONFIG_ENV_VAR = "HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL".to_string()), + py_name: None, + }) pub attr REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); /// The default encoding to be used. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_DEFAULT_ENCODING".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_DEFAULT_ENCODING".to_string()), + py_name: None, + }) pub attr DEFAULT_ENCODING: Encoding = Encoding::Multipart; /// Whether to use multipart encoding for network channel communications. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_CHANNEL_MULTIPART".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_CHANNEL_MULTIPART".to_string()), + py_name: None, + }) pub attr CHANNEL_MULTIPART: bool = true; /// How often to check for full MSPC channel on NetRx. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string()), + py_name: None, + }) pub attr CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL: Duration = Duration::from_secs(5); /// Sampling rate for logging message latency /// Set to 0.01 for 1% sampling, 0.1 for 10% sampling, 0.90 for 90% sampling, etc. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string()), + py_name: None, + }) pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01; /// Whether to enable client sequence assignment. @@ -116,7 +198,10 @@ declare_attrs! { /// /// Default: 10 seconds. If set to zero, disables the timeout and /// waits indefinitely. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_HOST_SPAWN_READY_TIMEOUT".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT".to_string()), + py_name: None, + }) pub attr HOST_SPAWN_READY_TIMEOUT: Duration = Duration::from_secs(10); } @@ -134,9 +219,17 @@ pub fn from_env() -> Attrs { } for key in inventory::iter::() { - let Some(env_var) = key.meta.get(CONFIG_ENV_VAR) else { + // Skip keys that are not marked as CONFIG or that do not + // declare an environment variable mapping. Only CONFIG-marked + // keys with an `env_name` participate in environment + // initialization. + let Some(cfg_meta) = key.meta.get(CONFIG) else { continue; }; + let Some(env_var) = cfg_meta.env_name.as_deref() else { + continue; + }; + let Ok(val) = env::var(env_var) else { // Default value output.push_str("# "); diff --git a/hyperactor/src/config/global.rs b/hyperactor/src/config/global.rs index fd96379eb..3e0ee633d 100644 --- a/hyperactor/src/config/global.rs +++ b/hyperactor/src/config/global.rs @@ -14,16 +14,15 @@ //! //! - Reads (`get`, `get_cloned`) consult layers in that order, falling //! back to defaults if no explicit value is set. -//! - `attrs()` returns a **complete snapshot** of the effective -//! configuration at call time: it materializes defaults for keys not -//! set in any layer, and omits meta-only keys (like `CONFIG_ENV_VAR`) -//! unless explicitly set. +//! - `attrs()` returns a complete snapshot of all CONFIG-marked keys at +//! call time: it materializes defaults for keys not set in any layer. +//! Keys without @meta(CONFIG = …) are excluded. //! - In tests, `lock()` and `override_key` allow temporary overrides //! that are removed automatically when the guard drops. //! - In normal operation, a parent process can capture its effective //! config via `attrs()` and pass that snapshot to a child during //! bootstrap. The child installs it as a `Runtime` layer so the -//! parent’s values take precedence over Env/File/Defaults. +//! parent's values take precedence over Env/File/Defaults. //! //! This design provides flexibility (easy test overrides, runtime //! updates, YAML/Env baselines) while ensuring type safety and @@ -44,11 +43,15 @@ //! // ... test logic here ... //! } //! ``` +use std::collections::HashMap; use std::marker::PhantomData; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; use super::*; use crate::attrs::AttrValue; use crate::attrs::Key; +use crate::config::CONFIG; /// Configuration source layers in priority order. /// @@ -87,20 +90,6 @@ fn priority(s: Source) -> u8 { } } -/// A single configuration layer in the global store. -/// -/// Each `Layer` wraps a [`Source`] and its associated [`Attrs`] -/// values. Layers are kept in priority order and consulted during -/// resolution. -#[derive(Clone)] -struct Layer { - /// The origin of this layer (File, Env, Runtime, or - /// TestOverride). - source: Source, - /// The set of attributes explicitly provided by this source. - attrs: Attrs, -} - /// The full set of configuration layers in priority order. /// /// `Layers` wraps a vector of [`Layer`]s, always kept sorted by @@ -115,6 +104,186 @@ struct Layers { ordered: Vec, } +/// A single configuration layer in the global configuration model. +/// +/// Layers are consulted in priority order (`TestOverride → Runtime → +/// Env → File → Default`) when resolving configuration values. Each +/// variant holds an [`Attrs`] map of key/value pairs. +/// +/// The `TestOverride` variant additionally maintains per-key override +/// stacks to support nested and out-of-order test overrides. These +/// stacks are currently placeholders for a future refactor; for now, +/// only the `attrs` field is used. +/// +/// Variants: +/// - [`Layer::File`] — Values loaded from configuration files (lowest +/// explicit priority). +/// - [`Layer::Env`] — Values sourced from process environment +/// variables. +/// - [`Layer::Runtime`] — Programmatically set runtime overrides. +/// - [`Layer::TestOverride`] — Temporary in-test overrides applied +/// under [`ConfigLock`]. +/// +/// Layers are stored in [`Layers::ordered`], kept sorted by their +/// effective [`Source`] priority (`TestOverride` first, `File` last). +enum Layer { + /// Values loaded from configuration files. Lowest explicit + /// priority; only overridden by Env, Runtime, or TestOverride. + File(Attrs), + + /// Values read from process environment variables. Typically + /// installed at startup via [`init_from_env`]. + Env(Attrs), + + /// Values set programmatically at runtime. Stable high-priority + /// layer used by parent/child bootstrap and dynamic updates. + Runtime(Attrs), + + /// Ephemeral values inserted during tests via + /// [`ConfigLock::override_key`]. Always takes precedence over all + /// other layers. Currently holds both the active `attrs` map and + /// a per-key `stacks` table (used to support nested or + /// out-of-order test overrides in future refactors). + TestOverride { + attrs: Attrs, + stacks: HashMap<&'static str, OverrideStack>, + }, +} + +/// A per-key stack of test overrides used by the +/// [`Layer::TestOverride`] layer. +/// +/// Each stack tracks the sequence of active overrides applied to a +/// single configuration key. The topmost frame represents the +/// currently effective override; earlier frames represent older +/// (still live) guards that may drop out of order. +/// +/// Fields: +/// - `env_var`: The associated process environment variable name, if +/// any. +/// - `saved_env`: The original environment variable value before the +/// first override was applied (or `None` if it did not exist). +/// - `frames`: The stack of active override frames, with the top +/// being the last element in the vector. +/// +/// The full stack mechanism is not yet active; it is introduced +/// incrementally to prepare for robust out-of-order test override +/// restoration. +struct OverrideStack { + /// The name of the process environment variable associated with + /// this configuration key, if any. + /// + /// Used to mirror changes to the environment when overrides are + /// applied or removed. `None` if the key has no + /// `CONFIG.env_name`. + env_var: Option, + + /// The original value of the environment variable before the + /// first override was applied. + /// + /// Stored so it can be restored once the last frame is dropped. + /// `None` means the variable did not exist prior to overriding. + saved_env: Option, + + /// The sequence of active override frames for this key. + /// + /// Each frame represents one active test override; the last + /// element (`frames.last()`) is the current top-of-stack and + /// defines the effective value seen in the configuration and + /// environment. + frames: Vec, +} + +/// A single entry in a per-key override stack. +/// +/// Each `OverrideFrame` represents one active test override applied +/// via [`ConfigLock::override_key`]. Frames are uniquely identified +/// by a monotonically increasing token and record both the value +/// being overridden and its string form for environment mirroring. +/// +/// When a guard drops, its frame is removed from the stack; if it was +/// the top, the next frame (if any) becomes active, or the original +/// environment is restored when the stack becomes empty. +struct OverrideFrame { + /// A unique, monotonically increasing identifier for this + /// override frame. + /// + /// Used to associate a dropping [`ConfigValueGuard`] with its + /// corresponding entry in the stack, even if drops occur out of + /// order. + token: u64, + + /// The serialized configuration value active while this frame is + /// on top of its stack. + /// + /// Stored as a boxed [`SerializableValue`] to match how values + /// are kept within [`Attrs`]. + value: Box, + + /// Pre-rendered string form of the value, used for environment + /// variable updates when this frame becomes active. + /// + /// Avoids recomputing `value.display()` on every push or pop. + env_str: String, +} + +/// Return the [`Source`] corresponding to a given [`Layer`]. +/// +/// This provides a uniform way to retrieve a layer's logical source +/// (File, Env, Runtime, or TestOverride) regardless of its internal +/// representation. Used for sorting layers by priority and for +/// source-based lookups or removals. +fn layer_source(l: &Layer) -> Source { + match l { + Layer::File(_) => Source::File, + Layer::Env(_) => Source::Env, + Layer::Runtime(_) => Source::Runtime, + Layer::TestOverride { .. } => Source::TestOverride, + } +} + +/// Return an immutable reference to the [`Attrs`] contained in a +/// [`Layer`]. +/// +/// This abstracts over the specific layer variant so callers can read +/// configuration values uniformly without needing to pattern-match on +/// the layer type. For `TestOverride`, this returns the current +/// top-level attributes reflecting the active overrides. +fn layer_attrs(l: &Layer) -> &Attrs { + match l { + Layer::File(a) | Layer::Env(a) | Layer::Runtime(a) => a, + Layer::TestOverride { attrs, .. } => attrs, + } +} + +/// Return a mutable reference to the [`Attrs`] contained in a +/// [`Layer`]. +/// +/// This allows callers to modify configuration values within any +/// layer without needing to pattern-match on its variant. For +/// `TestOverride`, the returned [`Attrs`] always reflect the current +/// top-of-stack overrides for each key. +fn layer_attrs_mut(l: &mut Layer) -> &mut Attrs { + match l { + Layer::File(a) | Layer::Env(a) | Layer::Runtime(a) => a, + Layer::TestOverride { attrs, .. } => attrs, + } +} + +/// Return the index of the [`Layer::TestOverride`] within the +/// [`Layers`] vector. +/// +/// If a TestOverride layer is present, its position in the ordered +/// list is returned; otherwise, `None` is returned. This is used to +/// locate the active test override layer for inserting or restoring +/// temporary configuration values. +fn test_override_index(layers: &Layers) -> Option { + layers + .ordered + .iter() + .position(|l| matches!(l, Layer::TestOverride { .. })) +} + /// Global layered configuration store. /// /// This is the single authoritative store for configuration in @@ -138,14 +307,21 @@ struct Layers { static LAYERS: LazyLock>> = LazyLock::new(|| { let env = super::from_env(); let layers = Layers { - ordered: vec![Layer { - source: Source::Env, - attrs: env, - }], + ordered: vec![Layer::Env(env)], }; Arc::new(RwLock::new(layers)) }); +/// Monotonically increasing sequence used to assign unique tokens to +/// each test override frame. +/// +/// Tokens identify individual [`ConfigValueGuard`] instances within a +/// key's override stack, allowing frames to be removed safely even +/// when guards are dropped out of order. The counter starts at 1 and +/// uses relaxed atomic ordering since exact sequencing across threads +/// is not required—only uniqueness. +static OVERRIDE_TOKEN_SEQ: AtomicU64 = AtomicU64::new(1); + /// Acquire the global configuration lock. /// /// This lock serializes all mutations of the global @@ -174,19 +350,17 @@ pub fn lock() -> ConfigLock { } } -/// Initialize the global configuration from environment -/// variables. +/// Initialize the global configuration from environment variables. /// -/// Reads values from process environment variables, using the -/// `CONFIG_ENV_VAR` meta-attribute declared on each key to find -/// its mapping. The resulting values are installed as the -/// [`Source::Env`] layer. Keys without a corresponding -/// environment variable fall back to defaults or higher-priority -/// sources. +/// Reads values from process environment variables, using each key's +/// `CONFIG.env_name` (from `@meta(CONFIG = ConfigAttr { … })`) to +/// determine its mapping. The resulting values are installed as the +/// [`Source::Env`] layer. Keys without a corresponding environment +/// variable fall back to defaults or higher-priority sources. /// -/// Typically invoked once at process startup to overlay config -/// values from the environment. Repeated calls replace the -/// existing Env layer. +/// Typically invoked once at process startup to overlay config values +/// from the environment. Repeated calls replace the existing Env +/// layer. pub fn init_from_env() { set(Source::Env, super::from_env()); } @@ -216,8 +390,9 @@ pub fn init_from_yaml>(path: P) -> Result<(), anyhow::Error> { pub fn get(key: Key) -> T { let layers = LAYERS.read().unwrap(); for layer in &layers.ordered { - if layer.attrs.contains_key(key) { - return *layer.attrs.get(key).unwrap(); + let a = layer_attrs(layer); + if let Some(value) = a.get(key) { + return *value; } } *key.default().expect("key must have a default") @@ -242,8 +417,9 @@ pub fn get_cloned(key: Key) -> T { pub fn try_get_cloned(key: Key) -> Option { let layers = LAYERS.read().unwrap(); for layer in &layers.ordered { - if layer.attrs.contains_key(key) { - return layer.attrs.get(key).cloned(); + let a = layer_attrs(layer); + if a.contains_key(key) { + return a.get(key).cloned(); } } key.default().cloned() @@ -263,12 +439,20 @@ pub fn try_get_cloned(key: Key) -> Option { /// overriding configuration values. pub fn set(source: Source, attrs: Attrs) { let mut g = LAYERS.write().unwrap(); - if let Some(l) = g.ordered.iter_mut().find(|l| l.source == source) { - l.attrs = attrs; + if let Some(l) = g.ordered.iter_mut().find(|l| layer_source(l) == source) { + *layer_attrs_mut(l) = attrs; } else { - g.ordered.push(Layer { source, attrs }); + g.ordered.push(match source { + Source::File => Layer::File(attrs), + Source::Env => Layer::Env(attrs), + Source::Runtime => Layer::Runtime(attrs), + Source::TestOverride => Layer::TestOverride { + attrs, + stacks: HashMap::new(), + }, + }); } - g.ordered.sort_by_key(|l| priority(l.source)); // TestOverride < Runtime < Env < File + g.ordered.sort_by_key(|l| priority(layer_source(l))); // TestOverride < Runtime < Env < File } /// Remove the configuration layer for the given [`Source`], if @@ -278,13 +462,14 @@ pub fn set(source: Source, attrs: Attrs) { /// contribute to resolution in [`get`], [`get_cloned`], or /// [`attrs`]. Defaults and any remaining layers continue to apply /// in their normal priority order. +#[allow(dead_code)] pub(crate) fn clear(source: Source) { let mut g = LAYERS.write().unwrap(); - g.ordered.retain(|l| l.source != source); + g.ordered.retain(|l| layer_source(l) != source); } -/// Return a complete, merged snapshot of the effective -/// configuration. +/// Return a complete, merged snapshot of the effective configuration +/// **(only keys marked with `@meta(CONFIG = ...)`)**. /// /// Resolution per key: /// 1) First explicit value found in layers (TestOverride → @@ -292,8 +477,9 @@ pub(crate) fn clear(source: Source) { /// 2) Otherwise, the key's default (if any). /// /// Notes: -/// - This materializes defaults into the returned Attrs so it's -/// self-contained. +/// - This materializes defaults into the returned Attrs for all +/// CONFIG-marked keys, so it's self-contained. +/// - Keys without `CONFIG` meta are excluded. pub fn attrs() -> Attrs { let layers = LAYERS.read().unwrap(); let mut merged = Attrs::new(); @@ -301,12 +487,17 @@ pub fn attrs() -> Attrs { // Iterate all declared keys (registered via `declare_attrs!` // + inventory). for info in inventory::iter::() { + // Skip keys not marked as `CONFIG`. + if info.meta.get(CONFIG).is_none() { + continue; + } + let name = info.name; // Try to resolve from highest -> lowest priority layer. let mut chosen: Option> = None; for layer in &layers.ordered { - if let Some(v) = layer.attrs.get_value_by_name(name) { + if let Some(v) = layer_attrs(layer).get_value_by_name(name) { chosen = Some(v.cloned()); break; } @@ -346,31 +537,11 @@ pub fn reset_to_defaults() { g.ordered.clear(); } -fn test_override_index(layers: &Layers) -> Option { - layers - .ordered - .iter() - .position(|l| matches!(l.source, Source::TestOverride)) -} - -fn ensure_test_override_layer_mut(layers: &mut Layers) -> &mut Attrs { - if let Some(i) = test_override_index(layers) { - return &mut layers.ordered[i].attrs; - } - layers.ordered.push(Layer { - source: Source::TestOverride, - attrs: Attrs::new(), - }); - layers.ordered.sort_by_key(|l| priority(l.source)); - let i = test_override_index(layers).expect("just inserted TestOverride layer"); - &mut layers.ordered[i].attrs -} - /// A guard that holds the global configuration lock and provides /// override functionality. /// -/// This struct acts as both a lock guard (preventing other tests -/// from modifying global config) and as the only way to create +/// This struct acts as both a lock guard (preventing other tests from +/// modifying global config) and as the only way to create /// configuration overrides. Override guards cannot outlive this /// ConfigLock, ensuring proper synchronization. pub struct ConfigLock { @@ -378,51 +549,84 @@ pub struct ConfigLock { } impl ConfigLock { - /// Create a configuration override that will be restored when - /// the guard is dropped. + /// Create a configuration override that is active until the + /// returned guard is dropped. + /// + /// Each call pushes a new frame onto a per-key override stack + /// within the [`Source::TestOverride`] layer. The topmost frame + /// defines the effective value seen by `get()` and in the + /// mirrored environment variable (if any). When a guard is + /// dropped, its frame is removed: if it was the top, the previous + /// frame (if any) becomes active or the key and env var are + /// restored to their prior state. /// - /// The returned guard must not outlive this ConfigLock. + /// The returned guard must not outlive this [`ConfigLock`]. pub fn override_key<'a, T: AttrValue>( &'a self, key: crate::attrs::Key, value: T, ) -> ConfigValueGuard<'a, T> { - // Write into the single TestOverride layer (create if - // needed). - let (prev_in_layer, orig_env) = { - let mut guard = LAYERS.write().unwrap(); - let layer_attrs = ensure_test_override_layer_mut(&mut guard); - // Save any previous override for this key in the the - // TestOverride layer. - let prev = layer_attrs.remove_value(key); - // Set new override value. - layer_attrs.set(key, value.clone()); - // Mirror env var. - let orig_env = if let Some(env_var) = key.attrs().get(CONFIG_ENV_VAR) { - let orig = std::env::var(env_var).ok(); - // SAFETY: Mutating process-global environment - // variables is not thread-safe. This path is used - // only in tests while holding the global - // ConfigLock, which serializes config mutations - // across the process. Tests are single-threaded - // with respect to env changes, so there are no - // concurrent readers/writers. We also record the - // original value and restore it in - // ConfigValueGuard::drop. - unsafe { - std::env::set_var(env_var, value.display()); - } - Some((env_var.clone(), orig)) + let token = OVERRIDE_TOKEN_SEQ.fetch_add(1, Ordering::Relaxed); + + let mut g = LAYERS.write().unwrap(); + + // Ensure TestOverride layer exists. + let idx = if let Some(i) = test_override_index(&g) { + i + } else { + g.ordered.push(Layer::TestOverride { + attrs: Attrs::new(), + stacks: HashMap::new(), + }); + g.ordered.sort_by_key(|l| priority(layer_source(l))); + test_override_index(&g).expect("just inserted TestOverride layer") + }; + + // Mutably access TestOverride's attrs + stacks. + let (attrs, stacks) = match &mut g.ordered[idx] { + Layer::TestOverride { attrs, stacks } => (attrs, stacks), + _ => unreachable!(), + }; + + // Compute env var (if any) for this key once. + let (env_var, env_str) = if let Some(cfg) = key.attrs().get(crate::config::CONFIG) { + if let Some(name) = &cfg.env_name { + (Some(name.clone()), value.display()) } else { - None - }; - (prev, orig_env) + (None, String::new()) + } + } else { + (None, String::new()) }; + // Get per-key stack (by declared name). + let key_name = key.name(); + let stack = stacks.entry(key_name).or_insert_with(|| OverrideStack { + env_var: env_var.clone(), + saved_env: env_var.as_ref().and_then(|n| std::env::var(n).ok()), + frames: Vec::new(), + }); + + // Push the new frame. + let boxed: Box = Box::new(value.clone()); + stack.frames.push(OverrideFrame { + token, + value: boxed, + env_str, + }); + + // Make this frame the active value in TestOverride attrs. + attrs.set(key, value.clone()); + + // Update process env to reflect new top-of-stack. + if let (Some(var), Some(top)) = (stack.env_var.as_ref(), stack.frames.last()) { + // SAFETY: Under global ConfigLock during tests. + unsafe { std::env::set_var(var, &top.env_str) } + } + ConfigValueGuard { key, - orig: prev_in_layer, // previous value for this key *inside* TestOverride layer - orig_env, + token, _phantom: PhantomData, } } @@ -442,71 +646,119 @@ impl ConfigLock { impl Drop for ConfigLock { fn drop(&mut self) { let mut guard = LAYERS.write().unwrap(); - if let Some(pos) = guard - .ordered - .iter() - .position(|l| matches!(l.source, Source::TestOverride)) - { + if let Some(pos) = test_override_index(&guard) { guard.ordered.remove(pos); } - // No need to restore anything else; underlying layers - // remain intact. } } /// A guard that restores a single configuration value when dropped pub struct ConfigValueGuard<'a, T: 'static> { key: crate::attrs::Key, - orig: Option>, - orig_env: Option<(String, Option)>, + token: u64, // This is here so we can hold onto a 'a lifetime. _phantom: PhantomData<&'a ()>, } -/// When a [`ConfigValueGuard`] is dropped, it restores the -/// configuration state for the key it was guarding: -/// -/// - If there was a previous override for this key in the -/// [`Source::TestOverride`] layer, that value is reinserted. -/// - If this guard was the only override for the key, the entry -/// is removed from the layer entirely (leaving underlying layers -/// or defaults to apply). -/// - If the key declared a `CONFIG_ENV_VAR`, the corresponding -/// process environment variable is restored to its original value -/// (or removed if it didn't exist). -/// -/// This ensures that overrides applied via -/// [`ConfigLock::override_key`] are always reverted cleanly when -/// the guard is dropped, without leaking state into subsequent -/// tests or callers. +/// When a [`ConfigValueGuard`] is dropped, it restores configuration +/// state for the key it was guarding. +/// +/// Behavior: +/// - Each key maintains a stack of override frames. The most recent +/// frame (top of stack) defines the effective value in +/// [`Source::TestOverride`]. +/// - Dropping a guard removes its frame. If it was the top frame, the +/// next frame (if any) becomes active and both the config and +/// mirrored env var are updated accordingly. +/// - If the dropped frame was not on top, no changes occur until the +/// active frame is dropped. +/// - When the last frame for a key is removed, the key is deleted +/// from the TestOverride layer and its associated environment +/// variable (if any) is restored to its original value or removed +/// if it did not exist. +/// +/// This guarantees that nested or out-of-order test overrides are +/// restored deterministically and without leaking state into +/// subsequent tests. impl Drop for ConfigValueGuard<'_, T> { fn drop(&mut self) { - let mut guard = LAYERS.write().unwrap(); - - if let Some(i) = test_override_index(&guard) { - let layer_attrs = &mut guard.ordered[i].attrs; + let mut g = LAYERS.write().unwrap(); + let i = if let Some(i) = test_override_index(&g) { + i + } else { + return; + }; - if let Some(prev) = self.orig.take() { - layer_attrs.insert_value(self.key, prev); - } else { - // remove without needing T: AttrValue - let _ = layer_attrs.remove_value(self.key); - } - } + // Access TestOverride internals + let (attrs, stacks) = match &mut g.ordered[i] { + Layer::TestOverride { attrs, stacks } => (attrs, stacks), + _ => unreachable!("TestOverride index points to non-TestOverride layer"), + }; - if let Some((k, v)) = self.orig_env.take() { - // SAFETY: process-global environment variables are - // not thread-safe to mutate. This override/restore - // path is only ever used in single-threaded test - // code, and is serialized by the global ConfigLock to - // avoid races between tests. - unsafe { - if let Some(v) = v { - std::env::set_var(k, v); + let key_name = self.key.name(); + + // We need a tiny scope for the &mut borrow of the stack so we + // can call `stacks.remove(key_name)` afterward if it becomes + // empty. + let mut remove_empty_stack = false; + let mut restore_env_var: Option = None; + let mut restore_env_to: Option = None; + + if let Some(stack) = stacks.get_mut(key_name) { + // Find this guard's frame by token. + if let Some(pos) = stack.frames.iter().position(|f| f.token == self.token) { + let is_top = pos + 1 == stack.frames.len(); + + if is_top { + // Pop the active frame + stack.frames.pop(); + + if let Some(new_top) = stack.frames.last() { + // New top becomes active: update attrs and env. + attrs.insert_value(self.key, (*new_top.value).cloned()); + if let Some(var) = stack.env_var.as_ref() { + // SAFETY: Under global ConfigLock during tests. + unsafe { std::env::set_var(var, &new_top.env_str) } + } + } else { + // Stack empty: remove the key now, then after + // releasing the &mut borrow of the stack, + // restore the env var and remove the stack + // entry. + let _ = attrs.remove_value(self.key); + + // Capture restoration details while we still + // have access to the stack. + if let Some(var) = stack.env_var.as_ref() { + restore_env_var = Some(var.clone()); + restore_env_to = stack.saved_env.clone(); // None => unset + } + remove_empty_stack = true + } } else { - std::env::remove_var(&k); + // Out-of-order drop: remove only that frame: + // active top stays + stack.frames.remove(pos); + // No changes to attrs or env here. + } + } // else: token already handled; nothing to do + } // &must stack borrow ends here + + // If we emptied the stack for this key, restore env and drop + // the stack entry. + if remove_empty_stack { + if let Some(var) = restore_env_var.as_ref() { + // SAFETY: Under global ConfigLock during tests. + unsafe { + if let Some(val) = restore_env_to.as_ref() { + std::env::set_var(var, val); + } else { + std::env::remove_var(var); + } } } + // Now it's safe to remove the stack from the map. + let _ = stacks.remove(key_name); } } } @@ -645,12 +897,12 @@ mod tests { assert_eq!(snap[CODEC_MAX_FRAME_LENGTH], 10 * 1024 * 1024 * 1024); assert_eq!(snap[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30)); - // CONFIG_ENV_VAR has no default and wasn't explicitly set: - // should be omitted. + // CONFIG has no default and wasn't explicitly set: should be + // omitted. let json = serde_json::to_string(&snap).unwrap(); assert!( - !json.contains("config_env_var"), - "CONFIG_ENV_VAR must not appear in snapshot unless explicitly set" + !json.contains("hyperactor::config::config"), + "CONFIG must not appear in snapshot unless explicitly set" ); } @@ -761,4 +1013,143 @@ mod tests { let snap = attrs(); assert_eq!(snap[MESSAGE_TTL_DEFAULT], 20); // Env beats File } + + declare_attrs! { + @meta(CONFIG = ConfigAttr { + env_name: None, + py_name: None, + }) + pub attr CONFIG_KEY: bool = true; + + pub attr NON_CONFIG_KEY: bool = true; + } + + #[test] + fn test_attrs_excludes_non_config_keys() { + let _lock = lock(); + reset_to_defaults(); + + let snap = attrs(); + let json = serde_json::to_string(&snap).unwrap(); + + // Expect our CONFIG_KEY to be present. + assert!( + json.contains("hyperactor::config::global::tests::config_key"), + "attrs() should include keys with @meta(CONFIG = ...)" + ); + // Expect our NON_CONFIG_KEY to be omitted. + assert!( + !json.contains("hyperactor::config::global::tests::non_config_key"), + "attrs() should exclude keys without @meta(CONFIG = ...)" + ); + } + + #[test] + fn test_testoverride_multiple_stacked_overrides_lifo() { + let lock = lock(); + reset_to_defaults(); + + // Baseline sanity. + assert_eq!(get(MESSAGE_DELIVERY_TIMEOUT), Duration::from_secs(30)); + + // Start from a clean env so we can assert restoration to "unset". + // SAFETY: single-threaded tests. + unsafe { + std::env::remove_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT"); + } + assert!(std::env::var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT").is_err()); + + // Stack A: 40s (becomes top) + let guard_a = lock.override_key(MESSAGE_DELIVERY_TIMEOUT, Duration::from_secs(40)); + assert_eq!(get(MESSAGE_DELIVERY_TIMEOUT), Duration::from_secs(40)); + { + let s = std::env::var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT").unwrap(); + assert_eq!( + humantime::parse_duration(&s).unwrap(), + Duration::from_secs(40) + ); + } + + // Stack B: 50s (new top) + let guard_b = lock.override_key(MESSAGE_DELIVERY_TIMEOUT, Duration::from_secs(50)); + assert_eq!(get(MESSAGE_DELIVERY_TIMEOUT), Duration::from_secs(50)); + { + let s = std::env::var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT").unwrap(); + assert_eq!( + humantime::parse_duration(&s).unwrap(), + Duration::from_secs(50) + ); + } + + // Drop B first → should reveal A (LIFO) + std::mem::drop(guard_b); + assert_eq!(get(MESSAGE_DELIVERY_TIMEOUT), Duration::from_secs(40)); + { + let s = std::env::var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT").unwrap(); + assert_eq!( + humantime::parse_duration(&s).unwrap(), + Duration::from_secs(40) + ); + } + + // Drop A → should restore default and unset env. + std::mem::drop(guard_a); + assert_eq!(get(MESSAGE_DELIVERY_TIMEOUT), Duration::from_secs(30)); + assert!(std::env::var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT").is_err()); + } + + #[test] + fn test_testoverride_out_of_order_drop_keeps_top_stable() { + let lock = lock(); + reset_to_defaults(); + + // Clean env baseline. + // SAFETY: single-threaded tests. + unsafe { + std::env::remove_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT"); + } + assert!(std::env::var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT").is_err()); + + // Push three frames in order: A=40s, B=50s, C=70s (C is top). + let guard_a = lock.override_key(MESSAGE_DELIVERY_TIMEOUT, Duration::from_secs(40)); + let guard_b = lock.override_key(MESSAGE_DELIVERY_TIMEOUT, Duration::from_secs(50)); + let guard_c = lock.override_key(MESSAGE_DELIVERY_TIMEOUT, Duration::from_secs(70)); + + // Top is C. + assert_eq!(get(MESSAGE_DELIVERY_TIMEOUT), Duration::from_secs(70)); + { + let s = std::env::var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT").unwrap(); + assert_eq!( + humantime::parse_duration(&s).unwrap(), + Duration::from_secs(70) + ); + } + + // Drop the *middle* frame (B) first → top must remain C, env unchanged. + std::mem::drop(guard_b); + assert_eq!(get(MESSAGE_DELIVERY_TIMEOUT), Duration::from_secs(70)); + { + let s = std::env::var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT").unwrap(); + assert_eq!( + humantime::parse_duration(&s).unwrap(), + Duration::from_secs(70) + ); + } + + // Now drop C → A becomes top, env follows A. + std::mem::drop(guard_c); + assert_eq!(get(MESSAGE_DELIVERY_TIMEOUT), Duration::from_secs(40)); + { + let s = std::env::var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT").unwrap(); + assert_eq!( + humantime::parse_duration(&s).unwrap(), + Duration::from_secs(40) + ); + } + + // Drop A → restore default and clear env. + std::mem::drop(guard_a); + assert_eq!(get(MESSAGE_DELIVERY_TIMEOUT), Duration::from_secs(30)); + assert!(std::env::var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT").is_err()); + } } diff --git a/hyperactor_mesh/src/bootstrap.rs b/hyperactor_mesh/src/bootstrap.rs index c6231d1a9..656180c52 100644 --- a/hyperactor_mesh/src/bootstrap.rs +++ b/hyperactor_mesh/src/bootstrap.rs @@ -39,7 +39,8 @@ use hyperactor::channel::Rx; use hyperactor::channel::Tx; use hyperactor::clock::Clock; use hyperactor::clock::RealClock; -use hyperactor::config::CONFIG_ENV_VAR; +use hyperactor::config::CONFIG; +use hyperactor::config::ConfigAttr; use hyperactor::config::global as config; use hyperactor::declare_attrs; use hyperactor::host; @@ -71,26 +72,38 @@ declare_attrs! { /// against leaked children; tests usually disable it via /// `std::env::set_var("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG", /// "false")`. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG".to_string()), + py_name: None, + }) pub attr MESH_BOOTSTRAP_ENABLE_PDEATHSIG: bool = true; /// Maximum number of log lines retained in a proc's stderr/stdout /// tail buffer. Used by [`LogTailer::tee`] when wiring child /// pipes. Default: 100 - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_TAIL_LOG_LINES".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESH_TAIL_LOG_LINES".to_string()), + py_name: None, + }) pub attr MESH_TAIL_LOG_LINES: usize = 100; /// Maximum number of child terminations to run concurrently /// during bulk shutdown. Prevents unbounded spawning of /// termination tasks (which could otherwise spike CPU, I/O, or /// file descriptor load). - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_TERMINATE_CONCURRENCY".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESH_TERMINATE_CONCURRENCY".to_string()), + py_name: None, + }) pub attr MESH_TERMINATE_CONCURRENCY: usize = 16; /// Per-child grace window for termination. When a shutdown is /// requested, the manager sends SIGTERM and waits this long for /// the child to exit before escalating to SIGKILL. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_TERMINATE_TIMEOUT".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESH_TERMINATE_TIMEOUT".to_string()), + py_name: None, + }) pub attr MESH_TERMINATE_TIMEOUT: Duration = Duration::from_secs(10); } diff --git a/hyperactor_mesh/src/config.rs b/hyperactor_mesh/src/config.rs index 1e4d34cf3..9aa01a09e 100644 --- a/hyperactor_mesh/src/config.rs +++ b/hyperactor_mesh/src/config.rs @@ -12,7 +12,8 @@ //! the base hyperactor configuration system. use hyperactor::attrs::declare_attrs; -use hyperactor::config::CONFIG_ENV_VAR; +use hyperactor::config::CONFIG; +use hyperactor::config::ConfigAttr; // Declare hyperactor_mesh-specific configuration keys declare_attrs! { @@ -20,6 +21,9 @@ declare_attrs! { /// when reshaping during casting to limit fanout. /// usize::MAX means no reshaping as any shape will always be below /// the limit so no dimension needs to be folded. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_MAX_CAST_DIMENSION_SIZE".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESH_MAX_CAST_DIMENSION_SIZE".to_string()), + py_name: None, + }) pub attr MAX_CAST_DIMENSION_SIZE: usize = usize::MAX; } diff --git a/hyperactor_mesh/src/proc_mesh.rs b/hyperactor_mesh/src/proc_mesh.rs index c06831014..71843b15d 100644 --- a/hyperactor_mesh/src/proc_mesh.rs +++ b/hyperactor_mesh/src/proc_mesh.rs @@ -31,8 +31,8 @@ use hyperactor::channel; use hyperactor::channel::ChannelAddr; use hyperactor::channel::ChannelTransport; use hyperactor::config; -use hyperactor::config::CONFIG_ENV_VAR; -use hyperactor::config::PYTHON_CONFIG_KEY; +use hyperactor::config::CONFIG; +use hyperactor::config::ConfigAttr; use hyperactor::context; use hyperactor::declare_attrs; use hyperactor::mailbox; @@ -87,10 +87,10 @@ use std::sync::RwLock; declare_attrs! { /// Default transport type to use across the application. - @meta( - CONFIG_ENV_VAR = "HYPERACTOR_MESH_DEFAULT_TRANSPORT".to_string(), - PYTHON_CONFIG_KEY = "default_transport".to_string(), - ) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESH_DEFAULT_TRANSPORT".to_string()), + py_name: Some("default_transport".to_string()), + }) pub attr DEFAULT_TRANSPORT: ChannelTransport = ChannelTransport::Unix; } /// Get the default transport type to use across the application. diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs index 9cff5aeae..4da564bd1 100644 --- a/hyperactor_mesh/src/v1/host_mesh.rs +++ b/hyperactor_mesh/src/v1/host_mesh.rs @@ -11,7 +11,8 @@ use hyperactor::channel::ChannelTransport; use hyperactor::clock::Clock; use hyperactor::clock::RealClock; use hyperactor::config; -use hyperactor::config::CONFIG_ENV_VAR; +use hyperactor::config::CONFIG; +use hyperactor::config::ConfigAttr; use hyperactor::declare_attrs; pub mod mesh_agent; @@ -54,8 +55,12 @@ use crate::v1::host_mesh::mesh_agent::ShutdownHostClient; use crate::v1::proc_mesh::ProcRef; declare_attrs! { - /// The maximum idle time between updates while spawning proc meshes. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_PROC_SPAWN_MAX_IDLE".to_string()) + /// The maximum idle time between updates while spawning proc + /// meshes. + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESH_PROC_SPAWN_MAX_IDLE".to_string()), + py_name: None, + }) pub attr PROC_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30); } diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index b27f97e7a..8569cccaf 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -29,7 +29,8 @@ use hyperactor::channel::ChannelAddr; use hyperactor::clock::Clock; use hyperactor::clock::RealClock; use hyperactor::config; -use hyperactor::config::CONFIG_ENV_VAR; +use hyperactor::config::CONFIG; +use hyperactor::config::ConfigAttr; use hyperactor::context; use hyperactor::declare_attrs; use hyperactor::mailbox::DialMailboxRouter; @@ -69,8 +70,12 @@ use crate::v1::Name; use crate::v1::ValueMesh; declare_attrs! { - /// The maximum idle time between updates while spawning actor meshes. - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_ACTOR_SPAWN_MAX_IDLE".to_string()) + /// The maximum idle time between updates while spawning actor + /// meshes. + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESH_ACTOR_SPAWN_MAX_IDLE".to_string()), + py_name: None, + }) pub attr ACTOR_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30); } diff --git a/monarch_hyperactor/src/config.rs b/monarch_hyperactor/src/config.rs index 4466263f9..f3215c35f 100644 --- a/monarch_hyperactor/src/config.rs +++ b/monarch_hyperactor/src/config.rs @@ -21,7 +21,8 @@ use hyperactor::attrs::Attrs; use hyperactor::attrs::ErasedKey; use hyperactor::attrs::declare_attrs; use hyperactor::channel::ChannelTransport; -use hyperactor::config::PYTHON_CONFIG_KEY; +use hyperactor::config::CONFIG; +use hyperactor::config::ConfigAttr; use hyperactor::config::global::Source; use pyo3::conversion::IntoPyObjectExt; use pyo3::exceptions::PyTypeError; @@ -46,16 +47,18 @@ pub fn reload_config_from_env() -> PyResult<()> { Ok(()) } -/// Map from name of the kwarg that will be passed to `monarch.configure(...)` -/// to the `Key`` associated with that kwarg. This contains all of the -/// attribute keys with meta-attribute `PYTHON_CONFIG_KEY`. +/// Map from the kwarg name passed to `monarch.configure(...)` to the +/// `Key` associated with that kwarg. This contains all attribute +/// keys whose `@meta(CONFIG = ConfigAttr { py_name: Some(...), .. })` +/// specifies a kwarg name. static KEY_BY_NAME: std::sync::LazyLock> = std::sync::LazyLock::new(|| { inventory::iter::() .filter_map(|info| { info.meta - .get(PYTHON_CONFIG_KEY) - .map(|py_name| (py_name.as_str(), info.erased)) + .get(CONFIG) + .and_then(|cfg: &ConfigAttr| cfg.py_name.as_deref()) + .map(|py_name| (py_name, info.erased)) }) .collect() }); @@ -194,7 +197,7 @@ declare_py_config_type!( /// Iterate over each key-value pair. Attempt to retrieve the `Key` /// associated with the key and convert the value to `T`, then set /// them on the global config. The association between kwarg and -/// `Key` is specified using the `PYTHON_CONFIG_KEY` meta-attribute. +/// `Key` is specified using the `CONFIG` meta-attribute. #[pyfunction] #[pyo3(signature = (**kwargs))] fn configure(py: Python<'_>, kwargs: Option>) -> PyResult<()> { @@ -208,9 +211,10 @@ fn configure(py: Python<'_>, kwargs: Option>) -> PyRes Ok(()) } -/// For all attribute keys with meta-attribute `PYTHON_CONFIG_KEY` defined, return the -/// current associated value in the global config. The key will not be present in the -/// result of it has no value in the global config. +/// For all attribute keys whose `@meta(CONFIG = ConfigAttr { py_name: +/// Some(...), .. })` specifies a kwarg name, return the current +/// associated value in the global config. Keys with no value in the +/// global config are omitted from the result. #[pyfunction] fn get_configuration(py: Python<'_>) -> PyResult> { KEY_BY_NAME