diff --git a/CHANGELOG.md b/CHANGELOG.md index b79ea0d9..e44eb64f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,7 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). - fix(classifiers): add configurable `pii_ner_allowlist` to `ClassifiersConfig` — tokens matching an allowlist entry (case-insensitive) are never redacted by the piiranha NER model, suppressing false positives such as "Zeph" → `[PII:CITY]`; default entries: `["Zeph", "Rust", "OpenAI", "Ollama", "Claude"]`; list is empty-able via config to disable the feature (closes #2537) - fix(classifiers): document that macOS Apple Silicon requires `--features full,metal` for piiranha NER GPU acceleration; without `metal`, the 1.1 GB model exceeds the 30s timeout on CPU and falls back to regex-only PII detection (closes #2538) - +- fix(mcp): elicitation deadlock in `run_inline_tool_loop` (phase 3, closes #2542) — `run_inline_tool_loop` now wraps each `execute_tool_call_erased` call in `tokio::select!` that concurrently drains `elicitation_rx`; `handle_elicitation_event` changed to `pub(super)` for cross-module access; regression test added with a blocking executor that simulates the real MCP deadlock scenario - fix(tools): propagate `claim_source` from `ToolOutput` into the post-execution audit entry in `AdversarialPolicyGateExecutor`; `write_audit` now accepts an explicit `claim_source` parameter so the field is no longer hardcoded to `None` for successful executions (closes #2535) - fix(tools): `extract_paths` now detects relative path tokens that contain `/` but do not start with `/` or `./` (e.g. 
`src/main.rs`, `.local/foo/bar`); URL schemes (`://`) and shell variable assignments (`KEY=value`) are excluded from matching (closes #2536) diff --git a/crates/zeph-core/src/agent/mod.rs b/crates/zeph-core/src/agent/mod.rs index 3b7f74f2..8762973f 100644 --- a/crates/zeph-core/src/agent/mod.rs +++ b/crates/zeph-core/src/agent/mod.rs @@ -1683,7 +1683,7 @@ impl Agent { /// learning engine, sanitizer, metrics). Inline tasks are short-lived orchestration /// sub-tasks that run synchronously inside the scheduler tick loop. async fn run_inline_tool_loop( - &self, + &mut self, prompt: &str, max_iterations: usize, ) -> Result { @@ -1749,6 +1749,9 @@ impl Agent { messages.push(Message::from_parts(Role::Assistant, parts)); // Execute each tool call and collect results. + // Drain elicitation_rx concurrently to avoid deadlock: MCP tool may + // send an elicitation request and block waiting for a response while + // execute_tool_call_erased is blocked waiting for the tool to finish. let mut result_parts: Vec = Vec::new(); for tc in &tool_calls { let call = ToolCall { @@ -1758,11 +1761,24 @@ impl Agent { _ => serde_json::Map::new(), }, }; - let output = match self.tool_executor.execute_tool_call_erased(&call).await - { - Ok(Some(out)) => out.summary, - Ok(None) => "(no output)".to_owned(), - Err(e) => format!("[error] {e}"), + let output = loop { + tokio::select! 
{ + result = self.tool_executor.execute_tool_call_erased(&call) => { + break match result { + Ok(Some(out)) => out.summary, + Ok(None) => "(no output)".to_owned(), + Err(e) => format!("[error] {e}"), + }; + } + Some(event) = async { + match self.mcp.elicitation_rx.as_mut() { + Some(rx) => rx.recv().await, + None => std::future::pending().await, + } + } => { + self.handle_elicitation_event(event).await; + } + } }; let is_error = output.starts_with("[error]"); result_parts.push(MessagePart::ToolResult { diff --git a/crates/zeph-core/src/agent/tests.rs b/crates/zeph-core/src/agent/tests.rs index c221dceb..6e3c98ae 100644 --- a/crates/zeph-core/src/agent/tests.rs +++ b/crates/zeph-core/src/agent/tests.rs @@ -3748,7 +3748,7 @@ mod inline_tool_loop_tests { let registry = create_test_registry(); let executor = CallableToolExecutor::new(vec![]); - let agent = Agent::new(provider, channel, registry, None, 5, executor); + let mut agent = Agent::new(provider, channel, registry, None, 5, executor); let result = agent.run_inline_tool_loop("what is 2+2?", 10).await; assert_eq!(result.unwrap(), "the answer"); @@ -3765,7 +3765,7 @@ mod inline_tool_loop_tests { let registry = create_test_registry(); let executor = CallableToolExecutor::fixed_output("tool result"); - let agent = Agent::new(provider, channel, registry, None, 5, executor); + let mut agent = Agent::new(provider, channel, registry, None, 5, executor); let result = agent.run_inline_tool_loop("run a tool", 10).await; assert_eq!(result.unwrap(), "done"); @@ -3785,7 +3785,7 @@ mod inline_tool_loop_tests { let executor = CallableToolExecutor::fixed_output("ok"); let max_iter = 5usize; - let agent = Agent::new(provider, channel, registry, None, 5, executor); + let mut agent = Agent::new(provider, channel, registry, None, 5, executor); let result = agent.run_inline_tool_loop("loop forever", max_iter).await; // Must return Ok (not panic or hang) and have called the provider exactly max_iter times. 
@@ -3807,7 +3807,7 @@ mod inline_tool_loop_tests { let registry = create_test_registry(); let executor = CallableToolExecutor::failing(); - let agent = Agent::new(provider, channel, registry, None, 5, executor); + let mut agent = Agent::new(provider, channel, registry, None, 5, executor); let result = agent.run_inline_tool_loop("trigger error", 10).await; assert_eq!(result.unwrap(), "recovered"); @@ -3852,7 +3852,7 @@ mod inline_tool_loop_tests { })), ]); - let agent = Agent::new(provider, channel, registry, None, 5, executor); + let mut agent = Agent::new(provider, channel, registry, None, 5, executor); let result = agent .run_inline_tool_loop("two tools then answer", 10) .await; @@ -3869,11 +3869,122 @@ mod inline_tool_loop_tests { let registry = create_test_registry(); let executor = CallableToolExecutor::new(vec![]); - let agent = Agent::new(provider, channel, registry, None, 5, executor); + let mut agent = Agent::new(provider, channel, registry, None, 5, executor); let result = agent.run_inline_tool_loop("this will fail", 10).await; assert!(result.is_err()); } + + // Regression test for issue #2542: elicitation deadlock in run_inline_tool_loop. + // + // The real deadlock scenario: MCP tool sends an elicitation event and then blocks + // waiting for the agent to respond via response_tx. Meanwhile execute_tool_call_erased + // also blocks waiting for the MCP tool — neither side makes progress. + // + // The fix: select! concurrently drains elicitation_rx while awaiting the tool result. + // + // Test design: BlockingElicitingExecutor sends an elicitation event then blocks on + // `unblock_rx` (a oneshot whose sender is never signalled — it stays pending until + // the future is cancelled). When select! picks the elicitation branch it cancels the + // tool future, dropping the receiver taken from `unblock_rx`. On the next invocation `sent` is already true, so + // the executor returns immediately without blocking. This guarantees select!
MUST pick the elicitation + // branch on the first iteration (tool is the only blocking party). If the fix were + // absent, the test would deadlock and time out. + #[tokio::test] + async fn elicitation_event_during_tool_execution_is_handled() { + use std::sync::Arc; + use std::time::Duration; + use tokio::sync::{mpsc, oneshot}; + use zeph_mcp::ElicitationEvent; + + struct BlockingElicitingExecutor { + elic_tx: mpsc::Sender, + // Holds the oneshot rx that the executor awaits on the first call. + // Dropped (None) on re-invocation after select! cancels the first future. + unblock_rx: Arc>>>, + sent: Arc, + } + + impl ToolExecutor for BlockingElicitingExecutor { + async fn execute(&self, _response: &str) -> Result, ToolError> { + Ok(None) + } + + async fn execute_tool_call( + &self, + _call: &ToolCall, + ) -> Result, ToolError> { + if !self.sent.swap(true, std::sync::atomic::Ordering::SeqCst) { + let (response_tx, _response_rx) = oneshot::channel(); + let event = ElicitationEvent { + server_id: "test-server".to_owned(), + request: + rmcp::model::CreateElicitationRequestParams::FormElicitationParams { + meta: None, + message: "please fill in".to_owned(), + requested_schema: rmcp::model::ElicitationSchema::new( + std::collections::BTreeMap::new(), + ), + }, + response_tx, + }; + let _ = self.elic_tx.send(event).await; + // Block until select! cancels this future (simulates the MCP server + // waiting for a response). `_unblock_tx` is never signalled, so this + // await stays pending until the future (and `rx` with it) is dropped. + // On re-invocation `sent` is already true, so this whole branch is skipped.
+ let rx = self.unblock_rx.lock().unwrap().take(); + if let Some(rx) = rx { + let _ = rx.await; + } + } + Ok(Some(ToolOutput { + tool_name: "elicit_tool".into(), + summary: "result".into(), + blocks_executed: 1, + filter_stats: None, + diff: None, + streamed: false, + terminal_id: None, + locations: None, + raw_response: None, + claim_source: None, + })) + } + } + + let (elic_tx, elic_rx) = mpsc::channel::(4); + // Keep _unblock_tx alive for the duration of the test so that unblock_rx.await + // truly blocks (channel not closed) until the future holding it is cancelled. + let (_unblock_tx, unblock_rx) = oneshot::channel::<()>(); + + let (mock, _counter) = MockProvider::default().with_tool_use(vec![ + tool_use_response("call-elic", "elicit_tool"), + ChatResponse::Text("done".into()), + ]); + let provider = AnyProvider::Mock(mock); + let channel = MockChannel::new(vec![]); + let registry = create_test_registry(); + let executor = BlockingElicitingExecutor { + elic_tx, + unblock_rx: Arc::new(std::sync::Mutex::new(Some(unblock_rx))), + sent: Arc::new(std::sync::atomic::AtomicBool::new(false)), + }; + + let mut agent = Agent::new(provider, channel, registry, None, 5, executor) + .with_mcp_elicitation_rx(elic_rx); + + // A 5-second timeout turns a deadlock into a clear test failure instead of a hang. + let result = tokio::time::timeout( + Duration::from_secs(5), + agent.run_inline_tool_loop("trigger elicitation", 10), + ) + .await + .expect("run_inline_tool_loop timed out — elicitation deadlock not fixed") + .unwrap(); + + assert_eq!(result, "done"); + } } #[cfg(test)]