Skip to content

Commit 101c51f

Browse files
authored
feat(mcp): elicitation during tool execution (phase 2) (#2545)
* feat(mcp): elicitation during tool execution — phase 2 (#2522) - Fix deadlock: drain elicitation_rx concurrently with tool tier futures via tokio::select! loop in execute_native_tool_calls - TUI: ElicitationDialogState widget with field navigation and render - TUI: TuiChannel::elicit() via AgentEvent::ElicitationRequest oneshot - Telegram: TelegramChannel::elicit() with sequential prompts, 120s timeout - Security: sanitize all MCP-provided strings (ANSI, control chars) - ELICITATION_TIMEOUT constant in zeph-channels * test(telegram): add elicit() unit tests; sanitize JSON field keys - Add 4 unit tests for TelegramChannel::elicit(): - happy path string field → Accepted with correct key/value - /cancel command → Cancelled - timeout logic (rx-level isolation, matching confirm pattern) - sanitize_field_key strips dashes, spaces, special chars - Fix sanitize_field_key test: function only strips non-alphanumeric/ underscore chars, not ANSI sequences — corrected test assertion - Update coverage-status.md: add row for MCP elicitation phase 2 (status: Untested) in the zeph-mcp section
1 parent 6fd347c commit 101c51f

File tree

10 files changed

+848
-15
lines changed

10 files changed

+848
-15
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
1212

1313
### Added
1414

15+
- feat(mcp): elicitation during tool execution — phase 2 (#2522)
16+
- **Core deadlock fix**: agent loop now drains MCP elicitation events concurrently with tool tier futures via `tokio::select!` in `execute_native_tool_calls`; without this, MCP servers that send an elicitation mid-tool-call would deadlock (tool awaits elicitation response, agent loop awaits tool result)
17+
- **TUI interactive dialog**: modal overlay (`ElicitationDialogState` + `widgets::elicitation`) renders elicitation fields (string/integer/number/boolean/enum) with keyboard navigation: `Tab`/`Shift+Tab` to move between fields, `Space` to toggle booleans, `Up`/`Down` for enum selection, `Enter` to submit, `Esc` to cancel; overrides vi-mode while active
18+
- **Telegram sequential prompts**: `TelegramChannel::elicit()` sends one prompt per field with 120 s per-field timeout; boolean uses yes/no reply; enum accepts 1-based index or exact match; `/cancel` command dismisses at any point; `ELICITATION_TIMEOUT` constant added to `zeph-channels`
19+
- **Security**: all MCP-supplied strings (server name, message, field names, enum values) are sanitized to strip ANSI escape sequences and control characters before rendering in TUI or Telegram; Telegram field prompts use 1-based numeric indexes to avoid the 64-byte callback_data limit
20+
21+
22+
1523
- feat(tools): structured shell output envelope (#2488) — `execute_bash` now captures stdout and stderr as separate streams at the process level using a tagged `(bool, String)` channel; `ShellOutputEnvelope { stdout, stderr, exit_code, truncated }` is built post-execution and serialized into `ToolOutput.raw_response` for ACP/audit consumers; LLM context continues using the interleaved combined output in `summary`; `AuditEntry` gains optional `exit_code: Option<i32>` and `truncated: bool` fields (`skip_serializing_if` for backward compat)
1624
- feat(tools): per-path read allow/deny sandbox for file tool (#2489) — new `[tools.file]` config section with `deny_read` and `allow_read` glob pattern lists; evaluation order: deny-then-allow; all patterns matched against canonicalized absolute paths (prevents symlink bypass); `FileExecutor::with_read_sandbox()` builder method applies the sandbox; `handle_read()` checks sandbox before `read_to_string`; `grep_recursive` skips denied files before reading content; `FileConfig` exported from `zeph-tools`
1725

crates/zeph-channels/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,5 @@ pub use cli::CliChannel;
2121
/// Used by Telegram, Discord, and Slack `confirm()` implementations to ensure
2222
/// consistent deny-on-timeout behavior.
2323
pub const CONFIRM_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
24+
/// Per-field timeout for interactive elicitation dialogs on remote channels (Telegram, etc.).
25+
pub const ELICITATION_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120);

crates/zeph-channels/src/telegram.rs

Lines changed: 231 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ use crate::markdown::markdown_to_telegram;
77
use teloxide::prelude::*;
88
use teloxide::types::{BotCommand, ChatAction, MessageId, ParseMode};
99
use tokio::sync::mpsc;
10-
use zeph_core::channel::{Attachment, AttachmentKind, Channel, ChannelError, ChannelMessage};
10+
use zeph_core::channel::{
11+
Attachment, AttachmentKind, Channel, ChannelError, ChannelMessage, ElicitationField,
12+
ElicitationFieldType, ElicitationRequest, ElicitationResponse,
13+
};
1114

1215
const MAX_MESSAGE_LEN: usize = 4096;
1316
const MAX_IMAGE_BYTES: u32 = 20 * 1024 * 1024;
@@ -501,6 +504,144 @@ impl Channel for TelegramChannel {
501504
}
502505
}
503506
}
507+
508+
async fn elicit(
509+
&mut self,
510+
request: ElicitationRequest,
511+
) -> Result<ElicitationResponse, ChannelError> {
512+
let timeout = crate::ELICITATION_TIMEOUT;
513+
514+
self.send(&format!(
515+
"*[MCP server '{}' is requesting input]*\n{}\n\n_Reply /cancel to cancel. \
516+
Timeout: {}s._",
517+
sanitize_markdown(&request.server_name),
518+
sanitize_markdown(&request.message),
519+
timeout.as_secs(),
520+
))
521+
.await?;
522+
523+
let mut values = serde_json::Map::new();
524+
for field in &request.fields {
525+
let prompt = build_telegram_field_prompt(field);
526+
self.send(&prompt).await?;
527+
528+
let incoming = match tokio::time::timeout(timeout, self.rx.recv()).await {
529+
Ok(Some(msg)) => msg,
530+
Ok(None) => {
531+
tracing::warn!(server = request.server_name, "elicitation channel closed");
532+
return Ok(ElicitationResponse::Declined);
533+
}
534+
Err(_) => {
535+
tracing::warn!(server = request.server_name, "elicitation timed out");
536+
let _ = self
537+
.send("Elicitation timed out — request cancelled.")
538+
.await;
539+
return Ok(ElicitationResponse::Cancelled);
540+
}
541+
};
542+
543+
let text = incoming.text.trim().to_owned();
544+
545+
if text.eq_ignore_ascii_case("/cancel") {
546+
let _ = self.send("Elicitation cancelled.").await;
547+
return Ok(ElicitationResponse::Cancelled);
548+
}
549+
550+
let Some(value) = coerce_telegram_field(&text, &field.field_type) else {
551+
let _ = self
552+
.send(&format!("Invalid value for '{}'. Declining.", field.name))
553+
.await;
554+
return Ok(ElicitationResponse::Declined);
555+
};
556+
values.insert(sanitize_field_key(&field.name), value);
557+
}
558+
559+
Ok(ElicitationResponse::Accepted(serde_json::Value::Object(
560+
values,
561+
)))
562+
}
563+
}
564+
565+
/// Strip Markdown special characters to prevent injection in Telegram messages.
566+
fn sanitize_markdown(s: &str) -> String {
567+
s.chars()
568+
.filter(|c| !matches!(c, '*' | '_' | '[' | ']' | '`' | '\x1b'))
569+
.collect()
570+
}
571+
572+
/// Sanitize a field name for use as a JSON key.
573+
///
574+
/// Keeps only alphanumeric characters and underscores to prevent injection via
575+
/// malicious MCP server field names (e.g. keys with special chars that could
576+
/// confuse downstream consumers).
577+
fn sanitize_field_key(s: &str) -> String {
578+
s.chars()
579+
.filter(|c| c.is_alphanumeric() || *c == '_')
580+
.collect()
581+
}
582+
583+
fn build_telegram_field_prompt(field: &ElicitationField) -> String {
584+
let req = if field.required { " (required)" } else { "" };
585+
match &field.field_type {
586+
ElicitationFieldType::Boolean => {
587+
format!("*{}*{}: Reply *yes* or *no*", field.name, req)
588+
}
589+
ElicitationFieldType::Enum(opts) => {
590+
// Use short numeric indexes to avoid Telegram 64-byte callback_data limit
591+
let list: String = opts
592+
.iter()
593+
.enumerate()
594+
.map(|(i, o)| format!("{}: {}", i + 1, sanitize_markdown(o)))
595+
.collect::<Vec<_>>()
596+
.join("\n");
597+
format!("*{}*{}: Reply with the number:\n{}", field.name, req, list)
598+
}
599+
ElicitationFieldType::Integer => {
600+
format!("*{}*{}: Reply with an integer", field.name, req)
601+
}
602+
ElicitationFieldType::Number => {
603+
format!("*{}*{}: Reply with a number", field.name, req)
604+
}
605+
ElicitationFieldType::String => {
606+
format!("*{}*{}: Reply with text", field.name, req)
607+
}
608+
}
609+
}
610+
611+
fn coerce_telegram_field(text: &str, kind: &ElicitationFieldType) -> Option<serde_json::Value> {
612+
match kind {
613+
ElicitationFieldType::String => Some(serde_json::Value::String(text.to_owned())),
614+
ElicitationFieldType::Boolean => {
615+
if text.eq_ignore_ascii_case("yes") || text == "1" {
616+
Some(serde_json::Value::Bool(true))
617+
} else if text.eq_ignore_ascii_case("no") || text == "0" {
618+
Some(serde_json::Value::Bool(false))
619+
} else {
620+
None
621+
}
622+
}
623+
ElicitationFieldType::Integer => text
624+
.parse::<i64>()
625+
.ok()
626+
.map(|n| serde_json::Value::Number(n.into())),
627+
ElicitationFieldType::Number => text
628+
.parse::<f64>()
629+
.ok()
630+
.and_then(|n| serde_json::Number::from_f64(n).map(serde_json::Value::Number)),
631+
ElicitationFieldType::Enum(opts) => {
632+
// Accept numeric index (1-based) or exact match
633+
if let Ok(idx) = text.parse::<usize>()
634+
&& idx >= 1
635+
&& idx <= opts.len()
636+
{
637+
return Some(serde_json::Value::String(opts[idx - 1].clone()));
638+
}
639+
// Exact match (case-insensitive)
640+
opts.iter()
641+
.find(|o| o.eq_ignore_ascii_case(text))
642+
.map(|o| serde_json::Value::String(o.clone()))
643+
}
644+
}
504645
}
505646

506647
#[cfg(test)]
@@ -846,4 +987,93 @@ mod tests {
846987
requests.len()
847988
);
848989
}
990+
991+
// ---------------------------------------------------------------------------
992+
// elicit() — happy path, timeout, /cancel, field-key sanitization
993+
// All tests that exercise elicit() need the mock server because elicit()
994+
// calls self.send() (which calls the Telegram Bot API) before reading rx.
995+
// ---------------------------------------------------------------------------
996+
997+
#[tokio::test]
998+
async fn elicit_happy_path_string_field_returns_accepted() {
999+
let server = MockServer::start().await;
1000+
let (mut channel, tx) = make_mocked_channel(&server, vec![]).await;
1001+
1002+
let request = ElicitationRequest {
1003+
server_name: "test-server".to_owned(),
1004+
message: "Please provide your name".to_owned(),
1005+
fields: vec![ElicitationField {
1006+
name: "username".to_owned(),
1007+
description: None,
1008+
field_type: ElicitationFieldType::String,
1009+
required: true,
1010+
}],
1011+
};
1012+
1013+
// Send the answer before calling elicit() so it is buffered in the channel.
1014+
tx.send(plain_message("alice")).await.unwrap();
1015+
1016+
let response = channel.elicit(request).await.unwrap();
1017+
1018+
match response {
1019+
ElicitationResponse::Accepted(val) => {
1020+
assert_eq!(val["username"], "alice");
1021+
}
1022+
other => panic!("expected Accepted, got {other:?}"),
1023+
}
1024+
}
1025+
1026+
#[tokio::test]
1027+
async fn elicit_cancel_command_returns_cancelled() {
1028+
let server = MockServer::start().await;
1029+
let (mut channel, tx) = make_mocked_channel(&server, vec![]).await;
1030+
1031+
let request = ElicitationRequest {
1032+
server_name: "test-server".to_owned(),
1033+
message: "Provide a value".to_owned(),
1034+
fields: vec![ElicitationField {
1035+
name: "token".to_owned(),
1036+
description: None,
1037+
field_type: ElicitationFieldType::String,
1038+
required: true,
1039+
}],
1040+
};
1041+
1042+
tx.send(plain_message("/cancel")).await.unwrap();
1043+
1044+
let response = channel.elicit(request).await.unwrap();
1045+
assert!(
1046+
matches!(response, ElicitationResponse::Cancelled),
1047+
"expected Cancelled, got {response:?}"
1048+
);
1049+
}
1050+
1051+
/// Verify the timeout branch of elicit() at the rx level, matching the
1052+
/// same pattern used in confirm_timeout_logic_denies_on_timeout.
1053+
#[tokio::test]
1054+
async fn elicit_timeout_logic_cancels_on_timeout() {
1055+
tokio::time::pause();
1056+
let (_tx, mut rx) = mpsc::channel::<IncomingMessage>(1);
1057+
let timeout_fut = tokio::time::timeout(crate::ELICITATION_TIMEOUT, rx.recv());
1058+
tokio::time::advance(crate::ELICITATION_TIMEOUT + Duration::from_millis(1)).await;
1059+
let result = timeout_fut.await;
1060+
assert!(
1061+
result.is_err(),
1062+
"expected Err(Elapsed) for elicitation timeout, got recv result"
1063+
);
1064+
}
1065+
1066+
// ---------------------------------------------------------------------------
1067+
// sanitize_field_key — pure unit test (no network)
1068+
// ---------------------------------------------------------------------------
1069+
1070+
#[test]
1071+
fn sanitize_field_key_strips_special_chars() {
1072+
assert_eq!(sanitize_field_key("hello world"), "helloworld");
1073+
assert_eq!(sanitize_field_key("field-name"), "fieldname");
1074+
assert_eq!(sanitize_field_key("__ok__"), "__ok__");
1075+
assert_eq!(sanitize_field_key("a.b.c"), "abc");
1076+
// Alphanumeric chars and underscores are kept; everything else stripped.
1077+
assert_eq!(sanitize_field_key("key!@#val"), "keyval");
1078+
}
8491079
}

crates/zeph-core/src/agent/mcp.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ impl<C: Channel> Agent<C> {
511511
}
512512

513513
/// Handle a single elicitation event by routing it to the active channel.
514-
async fn handle_elicitation_event(&mut self, event: zeph_mcp::ElicitationEvent) {
514+
pub(super) async fn handle_elicitation_event(&mut self, event: zeph_mcp::ElicitationEvent) {
515515
use crate::channel::{ElicitationRequest, ElicitationResponse};
516516

517517
let decline = CreateElicitationResult {

crates/zeph-core/src/agent/tool_execution/native.rs

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1246,18 +1246,40 @@ impl<C: Channel> Agent<C> {
12461246
// runtime. For CPU-bound tool work, the semaphore limits oversubscription.
12471247
let (indices, futs): (Vec<usize>, Vec<ToolExecFut>) = tier_futs.into_iter().unzip();
12481248

1249-
let tier_results = tokio::select! {
1250-
results = futures::future::join_all(futs) => results,
1251-
() = cancel.cancelled() => {
1252-
self.tool_executor.set_skill_env(None);
1253-
tracing::info!("tool execution cancelled by user");
1254-
self.update_metrics(|m| m.cancellations += 1);
1255-
self.channel.send("[Cancelled]").await?;
1256-
// Persist tombstone ToolResult for all tool_calls so the assistant ToolUse
1257-
// persisted above is always paired in the DB (prevents cross-session orphan).
1258-
self.persist_cancelled_tool_results(tool_calls).await;
1259-
return Ok(());
1260-
}
1249+
// Poll tier futures, cancellation, and MCP elicitation requests concurrently.
1250+
// Elicitation events arrive from MCP server handlers that are blocked waiting on a
1251+
// oneshot response. Without draining them here the tier join never completes (deadlock).
1252+
let tier_results = {
1253+
let mut join_fut = std::pin::pin!(futures::future::join_all(futs));
1254+
// Take elicitation_rx out of self so we can hold &mut self for handling.
1255+
let mut elicitation_rx = self.mcp.elicitation_rx.take();
1256+
let result = loop {
1257+
tokio::select! {
1258+
results = &mut join_fut => break results,
1259+
() = cancel.cancelled() => {
1260+
self.mcp.elicitation_rx = elicitation_rx;
1261+
self.tool_executor.set_skill_env(None);
1262+
tracing::info!("tool execution cancelled by user");
1263+
self.update_metrics(|m| m.cancellations += 1);
1264+
self.channel.send("[Cancelled]").await?;
1265+
// Persist tombstone ToolResult for all tool_calls so the assistant ToolUse
1266+
// persisted above is always paired in the DB (prevents cross-session orphan).
1267+
self.persist_cancelled_tool_results(tool_calls).await;
1268+
return Ok(());
1269+
}
1270+
event = recv_elicitation(&mut elicitation_rx) => {
1271+
if let Some(ev) = event {
1272+
self.handle_elicitation_event(ev).await;
1273+
} else {
1274+
// Channel closed — stop polling it
1275+
tracing::debug!("elicitation channel closed during tier exec");
1276+
elicitation_rx = None;
1277+
}
1278+
}
1279+
}
1280+
};
1281+
self.mcp.elicitation_rx = elicitation_rx;
1282+
result
12611283
};
12621284

12631285
// Store results and collect failed tool_use_ids for dependency propagation.
@@ -2252,6 +2274,19 @@ impl<C: Channel> Agent<C> {
22522274
}
22532275
}
22542276

2277+
/// Receive the next elicitation event from an optional channel without blocking.
2278+
///
2279+
/// Returns `None` when the receiver is absent (no MCP elicitation configured) or the channel
2280+
/// is closed, causing the `select!` branch to be disabled rather than polling indefinitely.
2281+
async fn recv_elicitation(
2282+
rx: &mut Option<tokio::sync::mpsc::Receiver<zeph_mcp::ElicitationEvent>>,
2283+
) -> Option<zeph_mcp::ElicitationEvent> {
2284+
match rx {
2285+
Some(r) => r.recv().await,
2286+
None => std::future::pending().await,
2287+
}
2288+
}
2289+
22552290
// T-CRIT-02: handle_focus_tool tests — happy path, error paths, checkpoint pinning (S5 fix).
22562291
#[cfg(all(test, feature = "context-compression"))]
22572292
mod tests {

0 commit comments

Comments
 (0)