Skip to content

Conversation

@tobz
Copy link
Member

@tobz tobz commented Feb 9, 2026

Summary

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

How did you test this PR?

References

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds generalized runtime state management primitives (resource registry + dataspace pub/sub) to support inter-process coordination keyed by typed handles, and wires in process-local identity tracking.

Changes:

  • Introduces ResourceRegistry for publishing/acquiring typed resources with async waiting and RAII return via guards.
  • Introduces DataspaceRegistry for typed assert/retract pub-sub with per-handle and wildcard subscriptions (plus current-state replay).
  • Adds Handle and process-context tracking (Id::current() via task-local), and exports state + ProcessId.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
lib/saluki-core/src/runtime/state/resources/mod.rs New async-aware resource registry with publish/acquire APIs, snapshots, and tests.
lib/saluki-core/src/runtime/state/mod.rs New state module defining Handle and re-exporting registries.
lib/saluki-core/src/runtime/state/dataspace/mod.rs New dataspace assert/retract + subscription registry with current-state replay and tests.
lib/saluki-core/src/runtime/process.rs Adds task-local “current process id” support and a ROOT process identifier.
lib/saluki-core/src/runtime/mod.rs Exposes state module and re-exports ProcessId.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

use std::{
any::{Any, TypeId},
collections::{HashMap, HashSet, VecDeque},
hash::Hash,
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::hash::Hash is imported but not used in this module. Please remove it to avoid warnings / keep the import list clean.

Suggested change
hash::Hash,

Copilot uses AI. Check for mistakes.
use std::{
any::{Any, TypeId},
collections::{HashMap, VecDeque},
hash::Hash,
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::hash::Hash is imported but not used in this module. Please remove it to avoid warnings / keep the import list clean.

Suggested change
hash::Hash,

Copilot uses AI. Check for mistakes.

// Not available, register as waiter
let (tx, rx) = oneshot::channel();
state.waiters.entry(key.clone()).or_default().push_back(tx);
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closed/canceled acquires leave behind oneshot::Senders in state.waiters until a publish/return happens for that key. This can (a) make snapshot() report pending waiters that no longer exist and (b) accumulate memory for keys that are never published. Consider pruning closed senders at insertion time (e.g., retain only !sender.is_closed() before pushing), and removing the entry if the queue becomes empty.

Suggested change
state.waiters.entry(key.clone()).or_default().push_back(tx);
// Prune any closed/canceled waiters for this key before adding a new one.
let waiters = state.waiters.entry(key.clone()).or_default();
waiters.retain(|sender| !sender.is_closed());
// After pruning, add the new waiter.
waiters.push_back(tx);

Copilot uses AI. Check for mistakes.
Comment on lines +458 to +462
// Spawn a task that will acquire (and wait)
let acquire_handle = tokio::spawn(async move { registry_clone.acquire::<u32>(h).await });

// Give the acquire task time to register as waiter
tokio::time::sleep(Duration::from_millis(10)).await;
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test uses sleep for synchronization, which can be flaky under load or on slower CI machines. Prefer an explicit synchronization signal (e.g., a oneshot/Notify/Barrier sent after the waiter is registered) so the test deterministically publishes only after the acquire path has enqueued the waiter.

Suggested change
// Spawn a task that will acquire (and wait)
let acquire_handle = tokio::spawn(async move { registry_clone.acquire::<u32>(h).await });
// Give the acquire task time to register as waiter
tokio::time::sleep(Duration::from_millis(10)).await;
// Use an explicit synchronization signal so we know the task has started.
let (tx_started, rx_started) = oneshot::channel();
// Spawn a task that will acquire (and wait)
let acquire_handle = tokio::spawn(async move {
// Notify that the acquire task has started running.
let _ = tx_started.send(());
registry_clone.acquire::<u32>(h).await
});
// Wait for the acquire task to start before publishing the resource.
rx_started.await.expect("acquire task did not start");

Copilot uses AI. Check for mistakes.
Comment on lines +529 to +538
// Spawn multiple waiters
let registry1 = registry.clone();
let handle1 = tokio::spawn(async move { registry1.acquire::<u32>(h).await });

tokio::time::sleep(Duration::from_millis(5)).await;

let registry2 = registry.clone();
let handle2 = tokio::spawn(async move { registry2.acquire::<u32>(h).await });

tokio::time::sleep(Duration::from_millis(5)).await;
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, multiple_waiters_fifo relies on short sleeps to establish waiter ordering. This can be timing-sensitive. Using explicit coordination (e.g., each spawned task notifies once it has registered its waiter) will make FIFO assertions reliable.

Suggested change
// Spawn multiple waiters
let registry1 = registry.clone();
let handle1 = tokio::spawn(async move { registry1.acquire::<u32>(h).await });
tokio::time::sleep(Duration::from_millis(5)).await;
let registry2 = registry.clone();
let handle2 = tokio::spawn(async move { registry2.acquire::<u32>(h).await });
tokio::time::sleep(Duration::from_millis(5)).await;
// Spawn multiple waiters with explicit coordination instead of timing-based sleeps
let registry1 = registry.clone();
let (ready_tx1, ready_rx1) = oneshot::channel::<()>();
let handle1 = tokio::spawn(async move {
// Signal that this waiter task has started
let _ = ready_tx1.send(());
registry1.acquire::<u32>(h).await
});
// Wait until the first waiter task has started
let _ = ready_rx1.await;
let registry2 = registry.clone();
let (ready_tx2, ready_rx2) = oneshot::channel::<()>();
let handle2 = tokio::spawn(async move {
// Signal that this waiter task has started
let _ = ready_tx2.send(());
registry2.acquire::<u32>(h).await
});
// Wait until the second waiter task has started
let _ = ready_rx2.await;

Copilot uses AI. Check for mistakes.
Comment on lines +530 to +538
let registry1 = registry.clone();
let handle1 = tokio::spawn(async move { registry1.acquire::<u32>(h).await });

tokio::time::sleep(Duration::from_millis(5)).await;

let registry2 = registry.clone();
let handle2 = tokio::spawn(async move { registry2.acquire::<u32>(h).await });

tokio::time::sleep(Duration::from_millis(5)).await;
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, multiple_waiters_fifo relies on short sleeps to establish waiter ordering. This can be timing-sensitive. Using explicit coordination (e.g., each spawned task notifies once it has registered its waiter) will make FIFO assertions reliable.

Suggested change
let registry1 = registry.clone();
let handle1 = tokio::spawn(async move { registry1.acquire::<u32>(h).await });
tokio::time::sleep(Duration::from_millis(5)).await;
let registry2 = registry.clone();
let handle2 = tokio::spawn(async move { registry2.acquire::<u32>(h).await });
tokio::time::sleep(Duration::from_millis(5)).await;
let (ready1_tx, ready1_rx) = oneshot::channel();
let registry1 = registry.clone();
let handle1 = tokio::spawn(async move {
// Signal that the first waiter is about to register and wait
let _ = ready1_tx.send(());
registry1.acquire::<u32>(h).await
});
// Ensure the first waiter has started before spawning the second
let _ = ready1_rx.await;
let (ready2_tx, ready2_rx) = oneshot::channel();
let registry2 = registry.clone();
let handle2 = tokio::spawn(async move {
// Signal that the second waiter is about to register and wait
let _ = ready2_tx.send(());
registry2.acquire::<u32>(h).await
});
// Ensure the second waiter has started before publishing the resource
let _ = ready2_rx.await;

Copilot uses AI. Check for mistakes.
Comment on lines +41 to +45
sync::{Arc, Mutex},
};

use snafu::Snafu;
use tokio::sync::oneshot;
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This registry is designed for async coordination but uses std::sync::Mutex, which can block Tokio worker threads under contention. Consider switching to tokio::sync::Mutex (or another async-friendly mutex) for the shared state, since hot paths include acquire()/publish() from async tasks.

Suggested change
sync::{Arc, Mutex},
};
use snafu::Snafu;
use tokio::sync::oneshot;
sync::Arc,
};
use snafu::Snafu;
use tokio::sync::{Mutex, oneshot};

Copilot uses AI. Check for mistakes.
any::{Any, TypeId},
collections::{HashMap, VecDeque},
hash::Hash,
sync::{Arc, Mutex},
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same concern here: DataspaceRegistry is used from async tasks but relies on std::sync::Mutex, which can block executor threads. Using an async mutex (or minimizing lock contention) would better match the async usage patterns, especially with frequent assert() calls and many subscribers.

Copilot uses AI. Check for mistakes.
/// associated with. If no wildcard broadcast channel exists yet for this type, one is created.
///
/// If values have already been asserted for this type on any handles, the subscription will immediately yield all
/// current values before any future broadcast updates.
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subscribe_all replays current values by iterating HashMap::iter(), so the initial replay order is inherently non-deterministic. Consider documenting that replay ordering is unspecified (or sorting by handle if deterministic ordering is desirable for consumers).

Suggested change
/// current values before any future broadcast updates.
/// current values before any future broadcast updates. The order in which these current values are replayed is
/// unspecified and must not be relied upon.

Copilot uses AI. Check for mistakes.
@tobz tobz force-pushed the tobz/runtime-system-state-mgmt-primitives branch from e8b5919 to 52f0a8a Compare February 11, 2026 16:24
@tobz tobz force-pushed the tobz/control-plane-tls-early-init branch from b284244 to 75a8e2c Compare February 11, 2026 16:24
@pr-commenter
Copy link

pr-commenter bot commented Feb 11, 2026

Binary Size Analysis (Agent Data Plane)

Target: 9fe09ef (baseline) vs 52f0a8a (comparison) diff
Analysis Type: Stripped binaries (debug symbols excluded)
Baseline Size: 26.96 MiB
Comparison Size: 27.11 MiB
Size Change: +159.44 KiB (+0.58%)
Pass/Fail Threshold: +5%
Result: PASSED ✅

Changes by Module

Module File Size Symbols
saluki_core::runtime::supervisor +69.67 KiB 59
core +45.63 KiB 11168
tokio +24.16 KiB 2717
agent_data_plane::internal::initialize_and_launch_runtime -22.07 KiB 2
agent_data_plane::internal::create_internal_supervisor +16.05 KiB 1
saluki_app::memory::MemoryBoundsConfiguration -13.70 KiB 5
agent_data_plane::internal::control_plane -12.32 KiB 26
std -11.11 KiB 281
saluki_core::runtime::process +11.03 KiB 9
[sections] +10.73 KiB 8
agent_data_plane::cli::run +7.85 KiB 76
anyhow +6.94 KiB 1286
saluki_core::runtime::dedicated +5.63 KiB 4
agent_data_plane::internal::observability +5.60 KiB 16
saluki_core::topology::running +5.50 KiB 31
saluki_app::metrics::collect_runtime_metrics -4.73 KiB 1
saluki_core::runtime::restart +3.50 KiB 7
[Unmapped] -3.47 KiB 1
saluki_core::runtime::shutdown +2.27 KiB 4
tracing_core +2.24 KiB 521

Detailed Symbol Changes

    FILE SIZE        VM SIZE    
 --------------  -------------- 
  [NEW] +1.79Mi  [NEW] +1.79Mi    std::thread::local::LocalKey<T>::with::h938f6ddf7aedbbb8
  +1.1%  +179Ki  +1.1%  +148Ki    [29785 Others]
  [NEW]  +113Ki  [NEW]  +113Ki    agent_data_plane::cli::run::create_topology::_{{closure}}::hfb0b645d0bb8c8cf
  [NEW] +68.2Ki  [NEW] +68.1Ki    h2::hpack::decoder::Decoder::try_decode_string::hd1c9a39c48e78a6e
  [NEW] +63.7Ki  [NEW] +63.6Ki    agent_data_plane::cli::run::handle_run_command::_{{closure}}::hb1b79147c6e6e2ef
  [NEW] +63.7Ki  [NEW] +63.6Ki    saluki_components::common::datadog::io::run_endpoint_io_loop::_{{closure}}::hd35493454b2aefa0
  [NEW] +62.3Ki  [NEW] +62.2Ki    agent_data_plane::main::_{{closure}}::hd35953e90113f5c5
  [NEW] +59.2Ki  [NEW] +58.9Ki    _<agent_data_plane::internal::control_plane::PrivilegedApiWorker as saluki_core::runtime::supervisor::Supervisable>::initialize::_{{closure}}::h29c03b31f615628f
  [NEW] +48.8Ki  [NEW] +48.7Ki    saluki_app::bootstrap::AppBootstrapper::bootstrap::_{{closure}}::hc8e634acdf97ad33
  [NEW] +47.7Ki  [NEW] +47.6Ki    moka::sync::base_cache::Inner<K,V,S>::do_run_pending_tasks::h0ef6755dbc77ea2a
  [NEW] +46.1Ki  [NEW] +46.0Ki    h2::proto::connection::Connection<T,P,B>::poll::h1671860da0918c66
  [DEL] -46.1Ki  [DEL] -46.0Ki    h2::proto::connection::Connection<T,P,B>::poll::h2aedcbe1089b311c
  [DEL] -47.7Ki  [DEL] -47.6Ki    moka::sync::base_cache::Inner<K,V,S>::do_run_pending_tasks::ha97a2f55834f17a3
  [DEL] -48.8Ki  [DEL] -48.7Ki    saluki_app::bootstrap::AppBootstrapper::bootstrap::_{{closure}}::hbb5d0d8e944d3a21
  [DEL] -57.8Ki  [DEL] -57.7Ki    agent_data_plane::cli::run::handle_run_command::_{{closure}}::hd8d13580d16cc8a3
  [DEL] -62.3Ki  [DEL] -62.2Ki    agent_data_plane::main::_{{closure}}::h1c7c640002ce8ad1
  [DEL] -63.8Ki  [DEL] -63.6Ki    saluki_components::common::datadog::io::run_endpoint_io_loop::_{{closure}}::hc194831d658db8cc
  [DEL] -68.2Ki  [DEL] -68.1Ki    h2::hpack::decoder::Decoder::try_decode_string::hbfc0bb5a77e7669f
  [DEL] -84.5Ki  [DEL] -84.4Ki    agent_data_plane::internal::control_plane::spawn_control_plane::_{{closure}}::heace61ab0f2bde0c
  [DEL]  -113Ki  [DEL]  -113Ki    agent_data_plane::cli::run::create_topology::_{{closure}}::h6a8b41fa76a89e2c
  [DEL] -1.79Mi  [DEL] -1.79Mi    std::thread::local::LocalKey<T>::with::h0d8770940d38805b
  +0.6%  +159Ki  +0.5%  +128Ki    TOTAL

@pr-commenter
Copy link

pr-commenter bot commented Feb 11, 2026

Regression Detector (Agent Data Plane)

Regression Detector Results

Run ID: 8e34ee5c-3168-4c95-8f8a-61c111709511

Baseline: 9fe09ef
Comparison: 52f0a8a
Diff

❌ Experiments with retried target crashes

This is a critical error. One or more replicates failed with a non-zero exit code. These replicates may have been retried. See Replicate Execution Details for more information.

  • quality_gates_rss_dsd_ultraheavy
  • quality_gates_rss_dsd_low

Optimization Goals: ✅ No significant changes detected

Experiments ignored for regressions

Regressions in experiments with settings containing erratic: true are ignored.

perf experiment goal Δ mean % Δ mean % CI trials links
otlp_ingest_logs_5mb_cpu % cpu utilization +3.31 [-1.79, +8.40] 1 (metrics) (profiles) (logs)
otlp_ingest_logs_5mb_memory memory utilization +3.07 [+2.44, +3.70] 1 (metrics) (profiles) (logs)
otlp_ingest_logs_5mb_throughput ingress throughput +0.01 [-0.11, +0.14] 1 (metrics) (profiles) (logs)

Fine details of change detection per experiment

perf experiment goal Δ mean % Δ mean % CI trials links
otlp_ingest_logs_5mb_cpu % cpu utilization +3.31 [-1.79, +8.40] 1 (metrics) (profiles) (logs)
otlp_ingest_logs_5mb_memory memory utilization +3.07 [+2.44, +3.70] 1 (metrics) (profiles) (logs)
dsd_uds_10mb_3k_contexts_cpu % cpu utilization +3.01 [-27.51, +33.53] 1 (metrics) (profiles) (logs)
quality_gates_rss_idle memory utilization +2.03 [+1.97, +2.10] 1 (metrics) (profiles) (logs)
quality_gates_rss_dsd_low memory utilization +1.70 [+1.54, +1.86] 1 (metrics) (profiles) (logs)
dsd_uds_100mb_3k_contexts_memory memory utilization +1.40 [+1.21, +1.60] 1 (metrics) (profiles) (logs)
dsd_uds_1mb_3k_contexts_memory memory utilization +1.29 [+1.10, +1.48] 1 (metrics) (profiles) (logs)
dsd_uds_512kb_3k_contexts_cpu % cpu utilization +1.25 [-54.31, +56.82] 1 (metrics) (profiles) (logs)
dsd_uds_512kb_3k_contexts_memory memory utilization +1.04 [+0.85, +1.22] 1 (metrics) (profiles) (logs)
dsd_uds_500mb_3k_contexts_memory memory utilization +0.81 [+0.62, +1.00] 1 (metrics) (profiles) (logs)
otlp_ingest_traces_5mb_cpu % cpu utilization +0.71 [-2.02, +3.43] 1 (metrics) (profiles) (logs)
dsd_uds_10mb_3k_contexts_memory memory utilization +0.62 [+0.43, +0.82] 1 (metrics) (profiles) (logs)
quality_gates_rss_dsd_medium memory utilization +0.61 [+0.42, +0.80] 1 (metrics) (profiles) (logs)
otlp_ingest_metrics_5mb_memory memory utilization +0.41 [+0.18, +0.64] 1 (metrics) (profiles) (logs)
otlp_ingest_traces_5mb_memory memory utilization +0.40 [+0.16, +0.64] 1 (metrics) (profiles) (logs)
quality_gates_rss_dsd_heavy memory utilization +0.35 [+0.23, +0.47] 1 (metrics) (profiles) (logs)
quality_gates_rss_dsd_ultraheavy memory utilization +0.05 [-0.08, +0.18] 1 (metrics) (profiles) (logs)
otlp_ingest_metrics_5mb_throughput ingress throughput +0.02 [-0.12, +0.15] 1 (metrics) (profiles) (logs)
otlp_ingest_logs_5mb_throughput ingress throughput +0.01 [-0.11, +0.14] 1 (metrics) (profiles) (logs)
dsd_uds_512kb_3k_contexts_throughput ingress throughput +0.00 [-0.05, +0.06] 1 (metrics) (profiles) (logs)
otlp_ingest_traces_5mb_throughput ingress throughput +0.00 [-0.08, +0.09] 1 (metrics) (profiles) (logs)
dsd_uds_1mb_3k_contexts_throughput ingress throughput +0.00 [-0.06, +0.06] 1 (metrics) (profiles) (logs)
dsd_uds_100mb_3k_contexts_throughput ingress throughput -0.01 [-0.06, +0.05] 1 (metrics) (profiles) (logs)
dsd_uds_10mb_3k_contexts_throughput ingress throughput -0.01 [-0.16, +0.13] 1 (metrics) (profiles) (logs)
dsd_uds_500mb_3k_contexts_throughput ingress throughput -0.11 [-0.24, +0.02] 1 (metrics) (profiles) (logs)
dsd_uds_500mb_3k_contexts_cpu % cpu utilization -0.12 [-1.50, +1.26] 1 (metrics) (profiles) (logs)
otlp_ingest_metrics_5mb_cpu % cpu utilization -0.34 [-5.98, +5.30] 1 (metrics) (profiles) (logs)
dsd_uds_100mb_3k_contexts_cpu % cpu utilization -3.69 [-9.25, +1.87] 1 (metrics) (profiles) (logs)
dsd_uds_1mb_3k_contexts_cpu % cpu utilization -9.23 [-59.54, +41.08] 1 (metrics) (profiles) (logs)

Bounds Checks: ✅ Passed

perf experiment bounds_check_name replicates_passed links
quality_gates_rss_dsd_heavy memory_usage 10/10 (metrics) (profiles) (logs)
quality_gates_rss_dsd_low memory_usage 10/10 (metrics) (profiles) (logs)
quality_gates_rss_dsd_medium memory_usage 10/10 (metrics) (profiles) (logs)
quality_gates_rss_dsd_ultraheavy memory_usage 10/10 (metrics) (profiles) (logs)
quality_gates_rss_idle memory_usage 10/10 (metrics) (profiles) (logs)

Explanation

Confidence level: 90.00%
Effect size tolerance: |Δ mean %| ≥ 5.00%

Performance changes are noted in the perf column of each table:

  • ✅ = significantly better comparison variant performance
  • ❌ = significantly worse comparison variant performance
  • ➖ = no significant change in performance

A regression test is an A/B test of target performance in a repeatable rig, where "performance" is measured as "comparison variant minus baseline variant" for an optimization goal (e.g., ingress throughput). Due to intrinsic variability in measuring that goal, we can only estimate its mean value for each experiment; we report uncertainty in that value as a 90.00% confidence interval denoted "Δ mean % CI".

For each experiment, we decide whether a change in performance is a "regression" -- a change worth investigating further -- if all of the following criteria are true:

  1. Its estimated |Δ mean %| ≥ 5.00%, indicating the change is big enough to merit a closer look.

  2. Its 90.00% confidence interval "Δ mean % CI" does not contain zero, indicating that if our statistical model is accurate, there is at least a 90.00% chance there is a difference in performance between baseline and comparison variants.

  3. Its configuration does not mark it "erratic".

Replicate Execution Details

We run multiple replicates for each experiment/variant. However, we allow replicates to be automatically retried if there are any failures, up to 8 times, at which point the replicate is marked dead and we are unable to run analysis for the entire experiment. We call each of these attempts at running replicates a replicate execution. This section lists all replicate executions that failed due to the target crashing or being oom killed.

Note: In the below tables we bucket failures by experiment, variant, and failure type. For each of these buckets we list out the replicate indexes that failed with an annotation signifying how many times said replicate failed with the given failure mode. In the below example the baseline variant of the experiment named experiment_with_failures had two replicates that failed by oom kills. Replicate 0, which failed 8 executions, and replicate 1 which failed 6 executions, all with the same failure mode.

Experiment Variant Replicates Failure Logs Debug Dashboard
experiment_with_failures baseline 0 (x8) 1 (x6) Oom killed Debug Dashboard

The debug dashboard links will take you to a debugging dashboard specifically designed to investigate replicate execution failures.

❌ Retried Normal Replicate Execution Failures (non-profiling)

Experiment Variant Replicates Failure Debug Dashboard
quality_gates_rss_dsd_low comparison 9 Failed to shutdown when requested Debug Dashboard
quality_gates_rss_dsd_ultraheavy baseline 3 Failed to shutdown when requested Debug Dashboard
quality_gates_rss_dsd_ultraheavy comparison 5 Failed to shutdown when requested Debug Dashboard

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/core Core functionality, event model, etc.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant