Commit 91c6cae

feat(tools,orchestration): utility-guided tool dispatch and cascade-aware DAG routing (#2424, #2425) (#2470)
Utility-guided tool dispatch (zeph-tools, zeph-core):
- Add UtilityScorer in zeph-tools/src/utility.rs scoring tool calls on estimated gain, step cost, redundancy, and uncertainty
- Insert utility gate in tool_execution/native.rs: fail-closed on scoring errors, LLM-requested tools always go through the gate
- Config: [tools.utility] enabled, threshold, and per-signal weights with startup validation rejecting negative and non-finite values
- 21 unit tests covering edge cases, fail-closed behavior, and turn reset

Cascade-aware DAG routing (zeph-orchestration):
- Add CascadeDetector in zeph-orchestration/src/cascade.rs tracking per-region failure rates with bounded region_health map
- Extend TopologyClassifier with TreeOptimized and CascadeAware dispatch strategy variants; merge strategy() and strategy_with_config() into single method
- Integrate cascade reordering in DagScheduler::tick(); Sequential tasks skipped from reordering; region health reset on inject_tasks()
- Config: [orchestration] cascade_routing, cascade_failure_threshold, tree_optimized_dispatch; cascade_routing requires topology_selection=true
- 14 unit tests covering inject_tasks reset, sequential exclusion, and config guard

Closes #2424, #2425
1 parent 00b8644 commit 91c6cae
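
The scoring rule behind the utility gate is not visible in this part of the diff, so the following is only a minimal sketch of how a weighted score over the four signals named in the commit message could be combined with fail-closed gating. The `Weights` and `Signals` types, the linear formula, and the free functions are assumptions made for illustration; they are not the `zeph-tools` API.

```rust
/// Hypothetical per-signal weights; the real field names are not shown in this diff.
#[derive(Clone, Copy)]
pub struct Weights {
    pub gain: f32,
    pub cost: f32,
    pub redundancy: f32,
    pub uncertainty: f32,
}

impl Weights {
    /// Mirrors the startup rule from the commit message:
    /// negative and non-finite weights are rejected.
    pub fn validate(&self) -> Result<(), String> {
        for (name, w) in [
            ("gain", self.gain),
            ("cost", self.cost),
            ("redundancy", self.redundancy),
            ("uncertainty", self.uncertainty),
        ] {
            if !w.is_finite() || w < 0.0 {
                return Err(format!("weight `{name}` must be finite and non-negative, got {w}"));
            }
        }
        Ok(())
    }
}

/// Hypothetical per-call signal values, each assumed to lie in [0.0, 1.0].
#[derive(Clone, Copy)]
pub struct Signals {
    pub gain: f32,
    pub cost: f32,
    pub redundancy: f32,
    pub uncertainty: f32,
}

/// Assumed formula: reward gain and uncertainty, penalise cost and redundancy.
/// Returns `None` when the result is not a finite number (a scoring error).
pub fn total_score(w: &Weights, s: &Signals) -> Option<f32> {
    let total = w.gain * s.gain + w.uncertainty * s.uncertainty
        - w.cost * s.cost
        - w.redundancy * s.redundancy;
    total.is_finite().then_some(total)
}

/// Fail-closed gate: a missing score blocks the call unless the user asked for it.
pub fn should_execute(score: Option<f32>, threshold: f32, user_requested: bool) -> bool {
    if user_requested {
        return true;
    }
    match score {
        Some(total) => total >= threshold,
        None => false,
    }
}
```

Under these assumptions a `NaN` signal yields `None` from `total_score`, which `should_execute` treats as a block rather than a pass. That is the fail-closed behavior the commit message describes.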

14 files changed: +1553 -13 lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -13,6 +13,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
 - feat(memory): D-MEM RPE-based tiered graph extraction routing — `RpeRouter` computes heuristic surprise score from context similarity and entity novelty; low-RPE turns skip the MAGMA LLM extraction pipeline; `consecutive_skips` safety valve forces extraction after `max_skip_turns` consecutive skips; `extract_candidate_entities()` helper for cheap regex+keyword entity detection; controlled by `[memory.graph.rpe] enabled = false, threshold = 0.3, max_skip_turns = 5` (closes #2442)
 - feat(llm): BaRP cost-weight dial in bandit router — `cost_weight` now penalises UCB arm scores during provider selection in addition to the existing reward-signal penalty; higher values bias the bandit toward cheaper providers at inference time; static cost tier heuristics based on provider name and model identifier; `cost_weight` is clamped to [0.0, 1.0] at bootstrap (#2415)
 - feat(llm): MAR (Memory-Augmented Routing) — new `[llm.routing.bandit] memory_confidence_threshold` (default 0.9); when the top-1 semantic recall score for the current query meets or exceeds the threshold the bandit biases toward fast/cheap providers; signal propagated from `SemanticMemory::recall` through `ContextSlot::SemanticRecall` to `RouterProvider`; no routing change when `cost_weight = 0.0` (operator intent respected) (#2443)
+- feat(tools): utility-guided tool dispatch gate — `UtilityScorer` scores each candidate tool call before execution using heuristic components (estimated gain, token cost, redundancy, exploration bonus); calls below `[tools.utility] threshold` are skipped with fail-closed semantics on scoring errors; user-requested tools bypass the gate unconditionally; disabled by default (`[tools.utility] enabled = false`) (closes #2424)
+- feat(orchestration): cascade-aware DAG routing — `CascadeDetector` tracks failure rates per root-anchored region; when a region's failure rate exceeds `[orchestration] cascade_failure_threshold`, tasks in that region are deprioritized in the ready queue so healthy branches run first; reset on `inject_tasks()`; disabled by default (closes #2425)
+- feat(orchestration): tree-optimized dispatch — `DispatchStrategy::TreeOptimized` sorts the ready queue by critical-path distance (deepest tasks first) for `FanOut`/`FanIn` topologies when `[orchestration] tree_optimized_dispatch = true`; disabled by default
+- feat(orchestration): `DispatchStrategy::CascadeAware` for `Mixed` topology when `cascade_routing = true`; requires `topology_selection = true` (startup warning emitted otherwise)
 - feat(acp): expose current model in `session/list` and emit `SessionInfoUpdate` on model change — each in-memory `SessionInfo` now carries `meta.currentModel`; after `session/set_config_option` with `configId=model` a `SessionInfoUpdate` notification with `meta.currentModel` is sent in addition to the existing `ConfigOptionUpdate`; same notification is sent after `session/set_session_model` (closes #2435)
 - feat(tools): adversarial policy agent — LLM-based pre-execution tool call validation against plain-language policies; configurable fail-closed/fail-open behavior (`fail_open = false` default); prompt injection hardening via code-fence param quoting; strict allow/deny response parsing; full `ToolExecutor` trait delegation; audit log `adversarial_policy_decision` field; executor chain order `PolicyGateExecutor → AdversarialPolicyGateExecutor → TrustGateExecutor`; gated on `policy-enforcer` feature; config `[tools.adversarial_policy]` (closes #2447)
 - feat(memory): Memex tool output archive — before compaction, `ToolOutput` bodies in the compaction range are saved to `tool_overflow` with `archive_type = 'archive'`; archived UUIDs are appended as a postfix after LLM summarization so references survive compaction; controlled by `[memory.compression] archive_tool_outputs = false`; archives are excluded from the short-lived cleanup job via `archive_type` column (migration 054, closes #2432)
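
The cascade-aware entry above describes per-region failure tracking and ready-queue deprioritization, but `zeph-orchestration/src/cascade.rs` is not part of this excerpt. The following is a rough sketch of that idea, assuming a region is identified by the id of its root task and that the bounded map simply stops admitting new regions at capacity. `CascadeSketch`, `RegionId`, and every method name are hypothetical.

```rust
use std::collections::HashMap;

/// Hypothetical region identifier: the id of the root task that anchors a subtree.
pub type RegionId = u32;

#[derive(Default, Clone, Copy)]
struct RegionHealth {
    failures: u32,
    total: u32,
}

impl RegionHealth {
    fn failure_rate(&self) -> f32 {
        if self.total == 0 {
            0.0
        } else {
            self.failures as f32 / self.total as f32
        }
    }
}

pub struct CascadeSketch {
    threshold: f32,
    max_regions: usize,
    health: HashMap<RegionId, RegionHealth>,
}

impl CascadeSketch {
    pub fn new(threshold: f32, max_regions: usize) -> Self {
        Self { threshold, max_regions, health: HashMap::new() }
    }

    /// Record one task outcome. New regions are ignored once the cap is reached
    /// (one possible bounding policy; the real one is not shown in the diff).
    pub fn record(&mut self, region: RegionId, failed: bool) {
        if !self.health.contains_key(&region) && self.health.len() >= self.max_regions {
            return;
        }
        let h = self.health.entry(region).or_default();
        h.total += 1;
        if failed {
            h.failures += 1;
        }
    }

    /// A region is cascading when its failure rate strictly exceeds the threshold.
    pub fn is_cascading(&self, region: RegionId) -> bool {
        self.health
            .get(&region)
            .is_some_and(|h| h.failure_rate() > self.threshold)
    }

    /// Stable sort of `(task_id, region)` pairs: healthy regions first, cascading
    /// regions last, original order preserved inside each group.
    pub fn reorder(&self, ready: &mut [(u32, RegionId)]) {
        ready.sort_by_key(|&(_, region)| self.is_cascading(region));
    }

    /// Forget all region health, as the changelog describes for `inject_tasks()`.
    pub fn reset(&mut self) {
        self.health.clear();
    }
}
```

A stable sort keeps tasks inside each group in their original order, so enabling the feature only moves tasks in failing regions to the back of the queue. The exclusion of Sequential tasks mentioned in the commit message is left out of this sketch.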

crates/zeph-config/src/experiment.rs

Lines changed: 21 additions & 0 deletions
@@ -55,6 +55,10 @@ fn default_completeness_threshold() -> f32 {
     0.7
 }

+fn default_cascade_failure_threshold() -> f32 {
+    0.5
+}
+
 fn default_plan_cache_similarity_threshold() -> f32 {
     0.90
 }
@@ -199,6 +203,20 @@ pub struct OrchestrationConfig {
     /// Values outside [0.0, 1.0] are rejected at startup by `Config::validate()`.
     #[serde(default = "default_completeness_threshold")]
     pub completeness_threshold: f32,
+    /// Enable cascade-aware routing for Mixed-topology DAGs. Requires `topology_selection = true`.
+    /// When enabled, tasks in failing subtrees are deprioritized in favour of healthy branches.
+    /// Default: false (opt-in).
+    #[serde(default)]
+    pub cascade_routing: bool,
+    /// Failure rate threshold (0.0–1.0) above which a DAG region is considered "cascading".
+    /// Must be in (0.0, 1.0]. Default: 0.5.
+    #[serde(default = "default_cascade_failure_threshold")]
+    pub cascade_failure_threshold: f32,
+    /// Enable tree-optimized dispatch for FanOut/FanIn topologies.
+    /// Sorts the ready queue by critical-path distance (deepest tasks first) to minimize
+    /// end-to-end latency. Default: false (opt-in).
+    #[serde(default)]
+    pub tree_optimized_dispatch: bool,
 }

 impl Default for OrchestrationConfig {
@@ -224,6 +242,9 @@ impl Default for OrchestrationConfig {
             verify_completeness: false,
             completeness_threshold: default_completeness_threshold(),
             tool_provider: String::new(),
+            cascade_routing: false,
+            cascade_failure_threshold: default_cascade_failure_threshold(),
+            tree_optimized_dispatch: false,
         }
     }
 }
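
The doc comments above state two constraints: `cascade_failure_threshold` must lie in (0.0, 1.0], and `cascade_routing` requires `topology_selection = true`. The validation code itself is not in this excerpt, so the following is only a sketch of how those rules might be checked. It assumes an out-of-range threshold is a hard error and the missing `topology_selection` is a warning (the changelog mentions a startup warning). `OrchestrationSketch` and `validate_orchestration` are hypothetical names.

```rust
/// Hypothetical subset of the orchestration config, using the field names from the diff.
pub struct OrchestrationSketch {
    pub topology_selection: bool,
    pub cascade_routing: bool,
    pub cascade_failure_threshold: f32,
    pub tree_optimized_dispatch: bool,
}

/// Returns the list of startup warnings, or an error for an invalid threshold.
pub fn validate_orchestration(cfg: &OrchestrationSketch) -> Result<Vec<String>, String> {
    let t = cfg.cascade_failure_threshold;
    // Half-open interval (0.0, 1.0]; the negated comparison also rejects NaN.
    if !(t > 0.0 && t <= 1.0) {
        return Err(format!(
            "cascade_failure_threshold must be in (0.0, 1.0], got {t}"
        ));
    }

    let mut warnings = Vec::new();
    if cfg.cascade_routing && !cfg.topology_selection {
        // Assumed behavior: warn and leave the feature inert rather than abort startup.
        warnings.push(
            "cascade_routing = true has no effect without topology_selection = true".to_owned(),
        );
    }
    Ok(warnings)
}
```

Writing the range check as a negated conjunction is deliberate. Every comparison against `NaN` is false, so a `NaN` threshold falls into the error branch without a separate `is_nan()` test.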

crates/zeph-core/src/agent/builder.rs

Lines changed: 2 additions & 0 deletions
@@ -1325,6 +1325,7 @@ impl<C: Channel> Agent<C> {
             graph_config,
             anomaly_config,
             result_cache_config,
+            utility_config,
             orchestration_config,
             // Not applied here: caller clones this before `apply_session_config` and applies
             // it per-session (e.g. `spawn_acp_agent` passes it to `with_debug_config`).
@@ -1381,6 +1382,7 @@ impl<C: Channel> Agent<C> {
         self.runtime.semantic_cache_threshold = semantic_cache_threshold;
         self.runtime.semantic_cache_max_candidates = semantic_cache_max_candidates;
         self = self.with_result_cache_config(&result_cache_config);
+        self.tool_orchestrator.set_utility_config(utility_config);

         self
     }

crates/zeph-core/src/agent/session_config.rs

Lines changed: 2 additions & 0 deletions
@@ -86,6 +86,7 @@ pub struct AgentSessionConfig {
     pub graph_config: GraphConfig,
     pub anomaly_config: zeph_tools::AnomalyConfig,
     pub result_cache_config: zeph_tools::ResultCacheConfig,
+    pub utility_config: zeph_tools::UtilityScoringConfig,
     pub orchestration_config: OrchestrationConfig,
     pub debug_config: DebugConfig,
     pub server_compaction: bool,
@@ -138,6 +139,7 @@ impl AgentSessionConfig {
             graph_config: config.memory.graph.clone(),
             anomaly_config: config.tools.anomaly.clone(),
             result_cache_config: config.tools.result_cache.clone(),
+            utility_config: config.tools.utility.clone(),
             orchestration_config: config.orchestration.clone(),
             debug_config: config.debug.clone(),
             server_compaction: config.llm.providers.iter().any(|e| e.server_compaction),

crates/zeph-core/src/agent/tool_execution/native.rs

Lines changed: 164 additions & 0 deletions
@@ -76,6 +76,7 @@ impl<C: Channel> Agent<C> {
     ) -> Result<(), super::super::error::AgentError> {
         self.tool_orchestrator.clear_doom_history();
         self.tool_orchestrator.clear_recent_tool_calls();
+        self.tool_orchestrator.clear_utility_state();

         // `mut` required when context-compression is enabled to inject focus tool definitions.
         #[cfg_attr(not(feature = "context-compression"), allow(unused_mut))]
@@ -791,6 +792,57 @@ impl<C: Channel> Agent<C> {
             }
         }

+        // Utility gate: score each call before dispatch. Calls below the threshold are skipped.
+        // Fail-closed on scoring errors (None when scoring produces invalid result).
+        // user_requested is only true for explicit /tool slash commands — never set from
+        // LLM-requested calls to prevent prompt-injection bypass (C2 fix).
+        let utility_blocked: Vec<bool> = {
+            #[allow(clippy::cast_possible_truncation)]
+            let tokens_consumed =
+                usize::try_from(self.providers.cached_prompt_tokens).unwrap_or(usize::MAX);
+            // token_budget = 0 signals "unknown" to UtilityContext — cost component is zeroed.
+            let token_budget: usize = 0;
+            let tool_calls_this_turn = self.tool_orchestrator.recent_tool_calls.len();
+            calls
+                .iter()
+                .enumerate()
+                .map(|(idx, call)| {
+                    if pre_exec_blocked[idx] {
+                        return false; // already blocked, no need to score
+                    }
+                    let ctx = zeph_tools::UtilityContext {
+                        tool_calls_this_turn: tool_calls_this_turn + idx,
+                        tokens_consumed,
+                        token_budget,
+                        // Never set from LLM call content to prevent prompt-injection bypass.
+                        user_requested: false,
+                    };
+                    let score = self.tool_orchestrator.utility_scorer.score(call, &ctx);
+                    tracing::debug!(
+                        tool = %call.tool_id,
+                        score = ?score.as_ref().map(|s| s.total),
+                        threshold = self.tool_orchestrator.utility_scorer.threshold(),
+                        "utility gate: scored tool call"
+                    );
+                    let execute = self
+                        .tool_orchestrator
+                        .utility_scorer
+                        .should_execute(score.as_ref(), false);
+                    if !execute {
+                        tracing::warn!(
+                            tool = %call.tool_id,
+                            score = ?score.as_ref().map(|s| s.total),
+                            threshold = self.tool_orchestrator.utility_scorer.threshold(),
+                            "utility gate: skipping low-utility tool call"
+                        );
+                    }
+                    // Record call regardless so subsequent calls in this batch see it as prior.
+                    self.tool_orchestrator.utility_scorer.record_call(call);
+                    !execute
+                })
+                .collect()
+        };
+
         // Repeat-detection (CRIT-3): record LLM-initiated calls BEFORE execution.
         // Retry re-executions must NOT be pushed here — they are handled inside the retry loop.
         // Build args hashes and check for repeats. Blocked calls get a pre-built error result.
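
The hunk above makes a single pass over the batch: calls that are already blocked are not scored, a missing score blocks the call, and every scored call is recorded so that later calls in the same batch see it as prior. The sketch below isolates that control flow with a toy scorer. `RedundancyScorer`, its 1.0 / 0.0 scoring rule, and the use of an empty tool name to simulate a scoring error are all assumptions for illustration, not the `zeph-tools` implementation.

```rust
use std::collections::HashSet;

/// Toy stand-in for the real scorer: only tracks which tools were already called.
pub struct RedundancyScorer {
    threshold: f32,
    seen: HashSet<String>,
}

impl RedundancyScorer {
    pub fn new(threshold: f32) -> Self {
        Self { threshold, seen: HashSet::new() }
    }

    /// Assumed rule: 1.0 for a first-time call, 0.0 for a repeat.
    /// An empty tool name stands in for a scoring error and yields `None`.
    pub fn score(&self, tool: &str) -> Option<f32> {
        if tool.is_empty() {
            return None;
        }
        Some(if self.seen.contains(tool) { 0.0 } else { 1.0 })
    }

    pub fn record_call(&mut self, tool: &str) {
        self.seen.insert(tool.to_owned());
    }

    /// Per-turn reset, analogous to `clear_utility_state()` in the diff.
    pub fn clear(&mut self) {
        self.seen.clear();
    }
}

/// Returns one flag per call: `true` means "blocked by the utility gate".
pub fn utility_blocked(
    scorer: &mut RedundancyScorer,
    calls: &[&str],
    pre_exec_blocked: &[bool],
) -> Vec<bool> {
    calls
        .iter()
        .enumerate()
        .map(|(idx, &call)| {
            // Already rejected by an earlier stage: do not score or record it.
            if pre_exec_blocked.get(idx).copied().unwrap_or(false) {
                return false;
            }
            // Fail-closed: `None` never matches, so a scoring error blocks the call.
            let execute = matches!(scorer.score(call), Some(s) if s >= scorer.threshold);
            // Record regardless of the outcome so later calls in this batch see it.
            scorer.record_call(call);
            !execute
        })
        .collect()
}
```

With a threshold of 0.5, a batch of `["bash", "bash"]` would let the first call through and block the second, because the first call is recorded before the second one is scored.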
@@ -1041,6 +1093,30 @@ impl<C: Channel> Agent<C> {
                 continue;
             }

+            if utility_blocked[idx] {
+                let threshold = self.tool_orchestrator.utility_scorer.threshold();
+                let msg = format!(
+                    "[skipped] Tool call to {} was skipped by the utility gate \
+                    (score below threshold {threshold:.2}). \
+                    Try a different approach or disable the utility gate in config.",
+                    tc.name
+                );
+                let out = zeph_tools::ToolOutput {
+                    tool_name: tc.name.clone(),
+                    summary: msg,
+                    blocks_executed: 0,
+                    filter_stats: None,
+                    diff: None,
+                    streamed: false,
+                    terminal_id: None,
+                    locations: None,
+                    raw_response: None,
+                    claim_source: None,
+                };
+                tier_futs.push((idx, Box::pin(std::future::ready(Ok(Some(out))))));
+                continue;
+            }
+
             if repeat_blocked[idx] {
                 let msg = format!(
                     "[error] Repeated identical call to {} detected. \
@@ -2326,4 +2402,92 @@ mod tests {
             "tool must not enforce min_messages_per_focus: {result}"
         );
     }
+
+    // --- utility gate integration ---
+
+    #[test]
+    fn utility_gate_disabled_by_default_scorer_is_not_enabled() {
+        // The default ToolOrchestrator has scoring disabled — no calls are gated.
+        let agent = make_agent();
+        assert!(
+            !agent.tool_orchestrator.utility_scorer.is_enabled(),
+            "utility scorer must be disabled by default"
+        );
+    }
+
+    #[test]
+    fn set_utility_config_enables_scorer_on_agent() {
+        // set_utility_config wires the scorer into the tool orchestrator (integration path).
+        let mut agent = make_agent();
+        agent
+            .tool_orchestrator
+            .set_utility_config(zeph_tools::UtilityScoringConfig {
+                enabled: true,
+                threshold: 0.5,
+                ..zeph_tools::UtilityScoringConfig::default()
+            });
+        assert!(
+            agent.tool_orchestrator.utility_scorer.is_enabled(),
+            "scorer must be enabled after set_utility_config"
+        );
+        assert!(
+            (agent.tool_orchestrator.utility_scorer.threshold() - 0.5).abs() < f32::EPSILON,
+            "threshold must match config"
+        );
+    }
+
+    #[test]
+    fn clear_utility_state_resets_per_turn_redundancy_tracking() {
+        // Verify that clear_utility_state() clears the redundancy state so the
+        // next turn treats all calls as fresh (no stale redundancy carry-over).
+        use zeph_tools::{ToolCall, UtilityContext};
+
+        let mut agent = make_agent();
+        agent
+            .tool_orchestrator
+            .set_utility_config(zeph_tools::UtilityScoringConfig {
+                enabled: true,
+                threshold: 0.0,
+                ..zeph_tools::UtilityScoringConfig::default()
+            });
+
+        let call = ToolCall {
+            tool_id: "bash".to_owned(),
+            params: serde_json::Map::new(),
+        };
+        let ctx = UtilityContext {
+            tool_calls_this_turn: 0,
+            tokens_consumed: 0,
+            token_budget: 1000,
+            user_requested: false,
+        };
+
+        // Record the call to create redundancy state.
+        agent.tool_orchestrator.utility_scorer.record_call(&call);
+
+        // Before clear: redundancy is 1.0.
+        let score_before = agent
+            .tool_orchestrator
+            .utility_scorer
+            .score(&call, &ctx)
+            .unwrap();
+        assert!(
+            (score_before.redundancy - 1.0).abs() < f32::EPSILON,
+            "redundancy must be 1.0 before clear"
+        );
+
+        // clear_utility_state simulates turn start.
+        agent.tool_orchestrator.clear_utility_state();
+
+        // After clear: redundancy is 0.0.
+        let score_after = agent
+            .tool_orchestrator
+            .utility_scorer
+            .score(&call, &ctx)
+            .unwrap();
+        assert!(
+            score_after.redundancy.abs() < f32::EPSILON,
+            "redundancy must be 0.0 after clear_utility_state"
+        );
+    }
 }

crates/zeph-core/src/agent/tool_execution/tests.rs

Lines changed: 108 additions & 0 deletions
@@ -4820,3 +4820,111 @@ async fn sanitize_tool_output_non_acp_session_normal_path() {
         "non-ACP session must NOT emit CrossBoundaryMcpToAcp"
     );
 }
+
+// --- utility gate integration tests ---
+
+#[tokio::test]
+async fn utility_gate_blocks_call_and_produces_skipped_output() {
+    // When threshold = 1.0, no realistic tool call can pass the gate.
+    // handle_native_tool_calls must produce a ToolResult with "[skipped]" content.
+    use super::super::agent_tests::{
+        MockChannel, MockToolExecutor, create_test_registry, mock_provider,
+    };
+    use zeph_llm::provider::{Message, MessagePart, Role, ToolUseRequest};
+
+    let provider = mock_provider(vec![]);
+    let channel = MockChannel::new(vec![]);
+    let registry = create_test_registry();
+    let executor = MockToolExecutor::no_tools();
+    let mut agent = super::super::Agent::new(provider, channel, registry, None, 5, executor);
+
+    // Push a system prompt so the assistant message has a valid preceding context.
+    agent
+        .msg
+        .messages
+        .push(Message::from_legacy(Role::System, "system"));
+
+    // Enable utility gate with threshold = 1.0 (blocks every call).
+    agent
+        .tool_orchestrator
+        .set_utility_config(zeph_tools::UtilityScoringConfig {
+            enabled: true,
+            threshold: 1.0,
+            ..zeph_tools::UtilityScoringConfig::default()
+        });
+
+    let tool_calls = vec![ToolUseRequest {
+        id: "call-1".to_owned(),
+        name: "bash".to_owned(),
+        input: serde_json::json!({"command": "ls"}),
+    }];
+
+    agent
+        .handle_native_tool_calls(None, &tool_calls)
+        .await
+        .unwrap();
+
+    // Find the ToolResult message injected by the utility gate.
+    let skipped = agent.msg.messages.iter().any(|m| {
+        m.parts.iter().any(|p| {
+            if let MessagePart::ToolResult { content, .. } = p {
+                content.contains("[skipped]")
+            } else {
+                false
+            }
+        })
+    });
+    assert!(
+        skipped,
+        "utility gate must produce [skipped] ToolResult when score < threshold"
+    );
+}
+
+#[tokio::test]
+async fn utility_gate_disabled_does_not_produce_skipped_output() {
+    // Default config has scoring disabled — calls must not produce [skipped] ToolResult.
+    use super::super::agent_tests::{
+        MockChannel, MockToolExecutor, create_test_registry, mock_provider,
+    };
+    use zeph_llm::provider::{Message, MessagePart, Role, ToolUseRequest};
+
+    let provider = mock_provider(vec![]);
+    let channel = MockChannel::new(vec![]);
+    let registry = create_test_registry();
+    let executor = MockToolExecutor::no_tools();
+    let mut agent = super::super::Agent::new(provider, channel, registry, None, 5, executor);
+
+    agent
+        .msg
+        .messages
+        .push(Message::from_legacy(Role::System, "system"));
+
+    // Utility scorer is disabled by default (enabled = false).
+    assert!(!agent.tool_orchestrator.utility_scorer.is_enabled());
+
+    let tool_calls = vec![ToolUseRequest {
+        id: "call-2".to_owned(),
+        name: "bash".to_owned(),
+        input: serde_json::json!({"command": "ls"}),
+    }];
+
+    agent
+        .handle_native_tool_calls(None, &tool_calls)
+        .await
+        .unwrap();
+
+    // No ToolResult must contain [skipped] — gate is disabled.
+    let has_skipped = agent.msg.messages.iter().any(|m| {
+        m.parts.iter().any(|p| {
+            if let MessagePart::ToolResult { content, .. } = p {
+                content.contains("[skipped]")
+            } else {
+                false
+            }
+        })
+    });
+    assert!(
+        !has_skipped,
+        "disabled utility gate must not produce [skipped] ToolResult"
+    );
+}
