Skip to content

Commit 9adcf3a

Browse files
authored
fix(mcp): drain elicitation_rx concurrently in run_inline_tool_loop (#2548)
* fix(mcp): drain elicitation_rx concurrently in run_inline_tool_loop (#2542) Phase 3 fix for elicitation deadlock. run_inline_tool_loop called execute_tool_call_erased sequentially without draining elicitation_rx, causing deadlock when an MCP tool sent an elicitation event and blocked waiting for a response. - Change run_inline_tool_loop to &mut self - Wrap each execute_tool_call_erased call in tokio::select! that concurrently drains elicitation_rx and calls handle_elicitation_event - Change handle_elicitation_event to pub(super) for cross-module access - Add regression test with blocking executor that simulates the real MCP deadlock scenario; test times out in 5s if deadlock recurs * docs: update CHANGELOG for #2542 elicitation deadlock fix
1 parent 101c51f commit 9adcf3a

File tree

3 files changed

+140
-13
lines changed

3 files changed

+140
-13
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
3131

3232
- fix(classifiers): add configurable `pii_ner_allowlist` to `ClassifiersConfig` — tokens matching an allowlist entry (case-insensitive) are never redacted by the piiranha NER model, suppressing false positives such as "Zeph" → `[PII:CITY]`; default entries: `["Zeph", "Rust", "OpenAI", "Ollama", "Claude"]`; list is empty-able via config to disable the feature (closes #2537)
3333
- fix(classifiers): document that macOS Apple Silicon requires `--features full,metal` for piiranha NER GPU acceleration; without `metal`, the 1.1 GB model exceeds the 30s timeout on CPU and falls back to regex-only PII detection (closes #2538)
34-
34+
- fix(mcp): elicitation deadlock in `run_inline_tool_loop` (phase 3, closes #2542) — `run_inline_tool_loop` now wraps each `execute_tool_call_erased` call in `tokio::select!` that concurrently drains `elicitation_rx`; `handle_elicitation_event` changed to `pub(super)` for cross-module access; regression test added with a blocking executor that simulates the real MCP deadlock scenario
3535
- fix(tools): propagate `claim_source` from `ToolOutput` into the post-execution audit entry in `AdversarialPolicyGateExecutor`; `write_audit` now accepts an explicit `claim_source` parameter so the field is no longer hardcoded to `None` for successful executions (closes #2535)
3636
- fix(tools): `extract_paths` now detects relative path tokens that contain `/` but do not start with `/` or `./` (e.g. `src/main.rs`, `.local/foo/bar`); URL schemes (`://`) and shell variable assignments (`KEY=value`) are excluded from matching (closes #2536)
3737

crates/zeph-core/src/agent/mod.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1683,7 +1683,7 @@ impl<C: Channel> Agent<C> {
16831683
/// learning engine, sanitizer, metrics). Inline tasks are short-lived orchestration
16841684
/// sub-tasks that run synchronously inside the scheduler tick loop.
16851685
async fn run_inline_tool_loop(
1686-
&self,
1686+
&mut self,
16871687
prompt: &str,
16881688
max_iterations: usize,
16891689
) -> Result<String, zeph_llm::LlmError> {
@@ -1749,6 +1749,9 @@ impl<C: Channel> Agent<C> {
17491749
messages.push(Message::from_parts(Role::Assistant, parts));
17501750

17511751
// Execute each tool call and collect results.
1752+
// Drain elicitation_rx concurrently to avoid deadlock: MCP tool may
1753+
// send an elicitation request and block waiting for a response while
1754+
// execute_tool_call_erased is blocked waiting for the tool to finish.
17521755
let mut result_parts: Vec<MessagePart> = Vec::new();
17531756
for tc in &tool_calls {
17541757
let call = ToolCall {
@@ -1758,11 +1761,24 @@ impl<C: Channel> Agent<C> {
17581761
_ => serde_json::Map::new(),
17591762
},
17601763
};
1761-
let output = match self.tool_executor.execute_tool_call_erased(&call).await
1762-
{
1763-
Ok(Some(out)) => out.summary,
1764-
Ok(None) => "(no output)".to_owned(),
1765-
Err(e) => format!("[error] {e}"),
1764+
let output = loop {
1765+
tokio::select! {
1766+
result = self.tool_executor.execute_tool_call_erased(&call) => {
1767+
break match result {
1768+
Ok(Some(out)) => out.summary,
1769+
Ok(None) => "(no output)".to_owned(),
1770+
Err(e) => format!("[error] {e}"),
1771+
};
1772+
}
1773+
Some(event) = async {
1774+
match self.mcp.elicitation_rx.as_mut() {
1775+
Some(rx) => rx.recv().await,
1776+
None => std::future::pending().await,
1777+
}
1778+
} => {
1779+
self.handle_elicitation_event(event).await;
1780+
}
1781+
}
17661782
};
17671783
let is_error = output.starts_with("[error]");
17681784
result_parts.push(MessagePart::ToolResult {

crates/zeph-core/src/agent/tests.rs

Lines changed: 117 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3748,7 +3748,7 @@ mod inline_tool_loop_tests {
37483748
let registry = create_test_registry();
37493749
let executor = CallableToolExecutor::new(vec![]);
37503750

3751-
let agent = Agent::new(provider, channel, registry, None, 5, executor);
3751+
let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
37523752
let result = agent.run_inline_tool_loop("what is 2+2?", 10).await;
37533753

37543754
assert_eq!(result.unwrap(), "the answer");
@@ -3765,7 +3765,7 @@ mod inline_tool_loop_tests {
37653765
let registry = create_test_registry();
37663766
let executor = CallableToolExecutor::fixed_output("tool result");
37673767

3768-
let agent = Agent::new(provider, channel, registry, None, 5, executor);
3768+
let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
37693769
let result = agent.run_inline_tool_loop("run a tool", 10).await;
37703770

37713771
assert_eq!(result.unwrap(), "done");
@@ -3785,7 +3785,7 @@ mod inline_tool_loop_tests {
37853785
let executor = CallableToolExecutor::fixed_output("ok");
37863786

37873787
let max_iter = 5usize;
3788-
let agent = Agent::new(provider, channel, registry, None, 5, executor);
3788+
let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
37893789
let result = agent.run_inline_tool_loop("loop forever", max_iter).await;
37903790

37913791
// Must return Ok (not panic or hang) and have called the provider exactly max_iter times.
@@ -3807,7 +3807,7 @@ mod inline_tool_loop_tests {
38073807
let registry = create_test_registry();
38083808
let executor = CallableToolExecutor::failing();
38093809

3810-
let agent = Agent::new(provider, channel, registry, None, 5, executor);
3810+
let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
38113811
let result = agent.run_inline_tool_loop("trigger error", 10).await;
38123812

38133813
assert_eq!(result.unwrap(), "recovered");
@@ -3852,7 +3852,7 @@ mod inline_tool_loop_tests {
38523852
})),
38533853
]);
38543854

3855-
let agent = Agent::new(provider, channel, registry, None, 5, executor);
3855+
let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
38563856
let result = agent
38573857
.run_inline_tool_loop("two tools then answer", 10)
38583858
.await;
@@ -3869,11 +3869,122 @@ mod inline_tool_loop_tests {
38693869
let registry = create_test_registry();
38703870
let executor = CallableToolExecutor::new(vec![]);
38713871

3872-
let agent = Agent::new(provider, channel, registry, None, 5, executor);
3872+
let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
38733873
let result = agent.run_inline_tool_loop("this will fail", 10).await;
38743874

38753875
assert!(result.is_err());
38763876
}
3877+
3878+
// Regression test for issue #2542: elicitation deadlock in run_inline_tool_loop.
3879+
//
3880+
// The real deadlock scenario: MCP tool sends an elicitation event and then blocks
3881+
// waiting for the agent to respond via response_tx. Meanwhile execute_tool_call_erased
3882+
// also blocks waiting for the MCP tool — neither side makes progress.
3883+
//
3884+
// The fix: select! concurrently drains elicitation_rx while awaiting the tool result.
3885+
//
3886+
// Test design: BlockingElicitingExecutor sends an elicitation event then blocks on
3887+
// `unblock_rx` (a oneshot whose sender is never signalled — it stays pending until
3888+
// the future is cancelled). When select! picks the elicitation branch it cancels the
3889+
// tool future, dropping `unblock_rx`. On the next invocation `unblock_rx` is None so
3890+
// the executor returns immediately. This guarantees select! MUST pick the elicitation
3891+
// branch on the first iteration (tool is the only blocking party). If the fix were
3892+
// absent, the test would deadlock and time out.
3893+
#[tokio::test]
3894+
async fn elicitation_event_during_tool_execution_is_handled() {
3895+
use std::sync::Arc;
3896+
use std::time::Duration;
3897+
use tokio::sync::{mpsc, oneshot};
3898+
use zeph_mcp::ElicitationEvent;
3899+
3900+
struct BlockingElicitingExecutor {
3901+
elic_tx: mpsc::Sender<ElicitationEvent>,
3902+
// Holds the oneshot rx that the executor awaits on the first call.
3903+
// Dropped (None) on re-invocation after select! cancels the first future.
3904+
unblock_rx: Arc<std::sync::Mutex<Option<oneshot::Receiver<()>>>>,
3905+
sent: Arc<std::sync::atomic::AtomicBool>,
3906+
}
3907+
3908+
impl ToolExecutor for BlockingElicitingExecutor {
3909+
async fn execute(&self, _response: &str) -> Result<Option<ToolOutput>, ToolError> {
3910+
Ok(None)
3911+
}
3912+
3913+
async fn execute_tool_call(
3914+
&self,
3915+
_call: &ToolCall,
3916+
) -> Result<Option<ToolOutput>, ToolError> {
3917+
if !self.sent.swap(true, std::sync::atomic::Ordering::SeqCst) {
3918+
let (response_tx, _response_rx) = oneshot::channel();
3919+
let event = ElicitationEvent {
3920+
server_id: "test-server".to_owned(),
3921+
request:
3922+
rmcp::model::CreateElicitationRequestParams::FormElicitationParams {
3923+
meta: None,
3924+
message: "please fill in".to_owned(),
3925+
requested_schema: rmcp::model::ElicitationSchema::new(
3926+
std::collections::BTreeMap::new(),
3927+
),
3928+
},
3929+
response_tx,
3930+
};
3931+
let _ = self.elic_tx.send(event).await;
3932+
// Block until select! cancels this future (simulates the MCP server
3933+
// waiting for a response). Cancellation drops unblock_rx, causing
3934+
// this await to resolve with Err — but the future is already dropped
3935+
// by then. On re-invocation unblock_rx is None, so we skip blocking.
3936+
let rx = self.unblock_rx.lock().unwrap().take();
3937+
if let Some(rx) = rx {
3938+
let _ = rx.await;
3939+
}
3940+
}
3941+
Ok(Some(ToolOutput {
3942+
tool_name: "elicit_tool".into(),
3943+
summary: "result".into(),
3944+
blocks_executed: 1,
3945+
filter_stats: None,
3946+
diff: None,
3947+
streamed: false,
3948+
terminal_id: None,
3949+
locations: None,
3950+
raw_response: None,
3951+
claim_source: None,
3952+
}))
3953+
}
3954+
}
3955+
3956+
let (elic_tx, elic_rx) = mpsc::channel::<ElicitationEvent>(4);
3957+
// Keep _unblock_tx alive for the duration of the test so that unblock_rx.await
3958+
// truly blocks (channel not closed) until the future holding it is cancelled.
3959+
let (_unblock_tx, unblock_rx) = oneshot::channel::<()>();
3960+
3961+
let (mock, _counter) = MockProvider::default().with_tool_use(vec![
3962+
tool_use_response("call-elic", "elicit_tool"),
3963+
ChatResponse::Text("done".into()),
3964+
]);
3965+
let provider = AnyProvider::Mock(mock);
3966+
let channel = MockChannel::new(vec![]);
3967+
let registry = create_test_registry();
3968+
let executor = BlockingElicitingExecutor {
3969+
elic_tx,
3970+
unblock_rx: Arc::new(std::sync::Mutex::new(Some(unblock_rx))),
3971+
sent: Arc::new(std::sync::atomic::AtomicBool::new(false)),
3972+
};
3973+
3974+
let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3975+
.with_mcp_elicitation_rx(elic_rx);
3976+
3977+
// A 5-second timeout turns a deadlock into a clear test failure instead of a hang.
3978+
let result = tokio::time::timeout(
3979+
Duration::from_secs(5),
3980+
agent.run_inline_tool_loop("trigger elicitation", 10),
3981+
)
3982+
.await
3983+
.expect("run_inline_tool_loop timed out — elicitation deadlock not fixed")
3984+
.unwrap();
3985+
3986+
assert_eq!(result, "done");
3987+
}
38773988
}
38783989

38793990
#[cfg(test)]

0 commit comments

Comments
 (0)