Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

- fix(classifiers): add configurable `pii_ner_allowlist` to `ClassifiersConfig` — tokens matching an allowlist entry (case-insensitive) are never redacted by the piiranha NER model, suppressing false positives such as "Zeph" → `[PII:CITY]`; default entries: `["Zeph", "Rust", "OpenAI", "Ollama", "Claude"]`; set the list to empty in config to disable the feature (closes #2537)
- fix(classifiers): document that macOS Apple Silicon requires `--features full,metal` for piiranha NER GPU acceleration; without `metal`, the 1.1 GB model exceeds the 30s timeout on CPU and falls back to regex-only PII detection (closes #2538)

- fix(mcp): elicitation deadlock in `run_inline_tool_loop` (phase 3, closes #2542) — `run_inline_tool_loop` now wraps each `execute_tool_call_erased` call in `tokio::select!` that concurrently drains `elicitation_rx`; `handle_elicitation_event` changed to `pub(super)` for cross-module access; regression test added with a blocking executor that simulates the real MCP deadlock scenario
- fix(tools): propagate `claim_source` from `ToolOutput` into the post-execution audit entry in `AdversarialPolicyGateExecutor`; `write_audit` now accepts an explicit `claim_source` parameter so the field is no longer hardcoded to `None` for successful executions (closes #2535)
- fix(tools): `extract_paths` now detects relative path tokens that contain `/` but do not start with `/` or `./` (e.g. `src/main.rs`, `.local/foo/bar`); URL schemes (`://`) and shell variable assignments (`KEY=value`) are excluded from matching (closes #2536)

Expand Down
28 changes: 22 additions & 6 deletions crates/zeph-core/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1683,7 +1683,7 @@ impl<C: Channel> Agent<C> {
/// learning engine, sanitizer, metrics). Inline tasks are short-lived orchestration
/// sub-tasks that run synchronously inside the scheduler tick loop.
async fn run_inline_tool_loop(
&self,
&mut self,
prompt: &str,
max_iterations: usize,
) -> Result<String, zeph_llm::LlmError> {
Expand Down Expand Up @@ -1749,6 +1749,9 @@ impl<C: Channel> Agent<C> {
messages.push(Message::from_parts(Role::Assistant, parts));

// Execute each tool call and collect results.
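// Drain elicitation_rx concurrently to avoid deadlock: an MCP tool may
// send an elicitation request and block waiting for a response while
// execute_tool_call_erased is blocked waiting for the tool to finish.
// NOTE(review): the tool future is created inside `select!`, so when the
// elicitation branch wins that future is dropped and the call is issued
// again on the next loop iteration — confirm tool calls tolerate being
// cancelled and restarted.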
let mut result_parts: Vec<MessagePart> = Vec::new();
for tc in &tool_calls {
let call = ToolCall {
Expand All @@ -1758,11 +1761,24 @@ impl<C: Channel> Agent<C> {
_ => serde_json::Map::new(),
},
};
let output = match self.tool_executor.execute_tool_call_erased(&call).await
{
Ok(Some(out)) => out.summary,
Ok(None) => "(no output)".to_owned(),
Err(e) => format!("[error] {e}"),
let output = loop {
tokio::select! {
result = self.tool_executor.execute_tool_call_erased(&call) => {
break match result {
Ok(Some(out)) => out.summary,
Ok(None) => "(no output)".to_owned(),
Err(e) => format!("[error] {e}"),
};
}
Some(event) = async {
match self.mcp.elicitation_rx.as_mut() {
Some(rx) => rx.recv().await,
None => std::future::pending().await,
}
} => {
self.handle_elicitation_event(event).await;
}
}
};
let is_error = output.starts_with("[error]");
result_parts.push(MessagePart::ToolResult {
Expand Down
123 changes: 117 additions & 6 deletions crates/zeph-core/src/agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3748,7 +3748,7 @@ mod inline_tool_loop_tests {
let registry = create_test_registry();
let executor = CallableToolExecutor::new(vec![]);

let agent = Agent::new(provider, channel, registry, None, 5, executor);
let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
let result = agent.run_inline_tool_loop("what is 2+2?", 10).await;

assert_eq!(result.unwrap(), "the answer");
Expand All @@ -3765,7 +3765,7 @@ mod inline_tool_loop_tests {
let registry = create_test_registry();
let executor = CallableToolExecutor::fixed_output("tool result");

let agent = Agent::new(provider, channel, registry, None, 5, executor);
let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
let result = agent.run_inline_tool_loop("run a tool", 10).await;

assert_eq!(result.unwrap(), "done");
Expand All @@ -3785,7 +3785,7 @@ mod inline_tool_loop_tests {
let executor = CallableToolExecutor::fixed_output("ok");

let max_iter = 5usize;
let agent = Agent::new(provider, channel, registry, None, 5, executor);
let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
let result = agent.run_inline_tool_loop("loop forever", max_iter).await;

// Must return Ok (not panic or hang) and have called the provider exactly max_iter times.
Expand All @@ -3807,7 +3807,7 @@ mod inline_tool_loop_tests {
let registry = create_test_registry();
let executor = CallableToolExecutor::failing();

let agent = Agent::new(provider, channel, registry, None, 5, executor);
let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
let result = agent.run_inline_tool_loop("trigger error", 10).await;

assert_eq!(result.unwrap(), "recovered");
Expand Down Expand Up @@ -3852,7 +3852,7 @@ mod inline_tool_loop_tests {
})),
]);

let agent = Agent::new(provider, channel, registry, None, 5, executor);
let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
let result = agent
.run_inline_tool_loop("two tools then answer", 10)
.await;
Expand All @@ -3869,11 +3869,122 @@ mod inline_tool_loop_tests {
let registry = create_test_registry();
let executor = CallableToolExecutor::new(vec![]);

let agent = Agent::new(provider, channel, registry, None, 5, executor);
let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
let result = agent.run_inline_tool_loop("this will fail", 10).await;

assert!(result.is_err());
}

// Regression test for issue #2542: elicitation deadlock in run_inline_tool_loop.
//
// The real deadlock scenario: an MCP tool sends an elicitation event and then blocks
// waiting for the agent to respond via response_tx. Meanwhile execute_tool_call_erased
// also blocks waiting for the MCP tool — neither side makes progress.
//
// The fix: select! concurrently drains elicitation_rx while awaiting the tool result.
//
// Test design: on its first call BlockingElicitingExecutor sets `sent`, sends one
// elicitation event, then takes the oneshot receiver out of `unblock_rx` and awaits it.
// The matching sender (`_unblock_tx`) is kept alive and never signalled, so that await
// can never complete; the first call only ends when its future is dropped.
// Any later call sees `sent == true`, skips the whole elicitation branch and returns
// its output immediately. The loop can therefore only finish if the elicitation event
// is received while the first tool call is still pending. If elicitation_rx is not
// drained concurrently, the first call never returns and the timeout below fires.
#[tokio::test]
async fn elicitation_event_during_tool_execution_is_handled() {
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use zeph_mcp::ElicitationEvent;

// Test double: a tool executor whose first tool call emits an elicitation event
// and then blocks indefinitely.
struct BlockingElicitingExecutor {
// Sending half of the elicitation channel; the receiving half is handed to the agent.
elic_tx: mpsc::Sender<ElicitationEvent>,
// Holds the oneshot rx that the executor awaits on the first call.
// `take()` moves it out on that call, leaving None behind.
unblock_rx: Arc<std::sync::Mutex<Option<oneshot::Receiver<()>>>>,
// Set by the first call so the elicitation event is sent (and the block entered) only once.
sent: Arc<std::sync::atomic::AtomicBool>,
}

impl ToolExecutor for BlockingElicitingExecutor {
// Always returns `Ok(None)`.
async fn execute(&self, _response: &str) -> Result<Option<ToolOutput>, ToolError> {
Ok(None)
}

// First call: send one elicitation event, then block until this future is dropped.
// Subsequent calls: return the fixed "result" output right away.
async fn execute_tool_call(
&self,
_call: &ToolCall,
) -> Result<Option<ToolOutput>, ToolError> {
// `swap` returns the previous value, so only the very first call enters this branch.
if !self.sent.swap(true, std::sync::atomic::Ordering::SeqCst) {
// The receiving half is held for the rest of this branch but never read;
// the test does not inspect the elicitation response.
let (response_tx, _response_rx) = oneshot::channel();
let event = ElicitationEvent {
server_id: "test-server".to_owned(),
request:
rmcp::model::CreateElicitationRequestParams::FormElicitationParams {
meta: None,
message: "please fill in".to_owned(),
requested_schema: rmcp::model::ElicitationSchema::new(
std::collections::BTreeMap::new(),
),
},
response_tx,
};
// A send failure is deliberately ignored.
let _ = self.elic_tx.send(event).await;
// Block here to simulate the MCP server waiting for a response. The
// sender half is never signalled, so this await cannot complete; the
// call ends only when the caller drops this future. `sent` is already
// true at this point, so a later call never reaches this code.
// The mutex guard is a temporary released at the end of the `let`
// statement, so no std mutex is held across the await.
let rx = self.unblock_rx.lock().unwrap().take();
if let Some(rx) = rx {
let _ = rx.await;
}
}
Ok(Some(ToolOutput {
tool_name: "elicit_tool".into(),
summary: "result".into(),
blocks_executed: 1,
filter_stats: None,
diff: None,
streamed: false,
terminal_id: None,
locations: None,
raw_response: None,
claim_source: None,
}))
}
}

let (elic_tx, elic_rx) = mpsc::channel::<ElicitationEvent>(4);
// Keep _unblock_tx alive for the duration of the test so that unblock_rx.await
// truly blocks (channel not closed) until the future holding it is cancelled.
let (_unblock_tx, unblock_rx) = oneshot::channel::<()>();

// Scripted provider: first a tool-use response for `elicit_tool`, then the final text.
let (mock, _counter) = MockProvider::default().with_tool_use(vec![
tool_use_response("call-elic", "elicit_tool"),
ChatResponse::Text("done".into()),
]);
let provider = AnyProvider::Mock(mock);
let channel = MockChannel::new(vec![]);
let registry = create_test_registry();
let executor = BlockingElicitingExecutor {
elic_tx,
unblock_rx: Arc::new(std::sync::Mutex::new(Some(unblock_rx))),
sent: Arc::new(std::sync::atomic::AtomicBool::new(false)),
};

// Hand the receiving half of the elicitation channel to the agent under test.
let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
.with_mcp_elicitation_rx(elic_rx);

// A 5-second timeout turns a deadlock into a clear test failure instead of a hang.
let result = tokio::time::timeout(
Duration::from_secs(5),
agent.run_inline_tool_loop("trigger elicitation", 10),
)
.await
.expect("run_inline_tool_loop timed out — elicitation deadlock not fixed")
.unwrap();

// The loop must reach the provider's final text response.
assert_eq!(result, "done");
}
}

#[cfg(test)]
Expand Down
Loading