add human_interaction handler using MCP elicitation #507
Changes from 21 commits
+++ b/examples/human_input/temporal/README.md
@@ -0,0 +1,92 @@
# Human interactions in Temporal

This example demonstrates how to implement human interactions in an MCP app running as a Temporal workflow.
Human input can be used for approvals or data entry.
In this case, we ask a human to provide their name, so we can create a personalised greeting.

## Set up

First, clone the repo and navigate to the human_input example:

```bash
git clone https://github.com/lastmile-ai/mcp-agent.git
cd mcp-agent/examples/human_input/temporal
```

Install `uv` (if you don't have it):

```bash
pip install uv
```

## Set up API keys

In `mcp_agent.secrets.yaml`, set your OpenAI `api_key`.
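A minimal secrets file might look like the following sketch. The key names are an assumption based on common mcp_agent configuration; verify them against the secrets template shipped with the repo:

```yaml
# Hypothetical sketch; check the key names against the repo's template.
openai:
  api_key: "sk-..."
```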

## Set up the Temporal server

Before running this example, you need to have a Temporal server running:

1. Install the Temporal CLI by following the instructions at https://docs.temporal.io/cli/

2. Start a local Temporal server:

   ```bash
   temporal server start-dev
   ```

This will start a Temporal server on `localhost:7233` (the default address configured in `mcp_agent.config.yaml`).

You can monitor your workflows in the Temporal Web UI by visiting `http://localhost:8233` in your browser.

## Run locally

In three separate terminal windows, run the following:

```bash
# this runs the MCP app
uv run main.py
```

```bash
# this runs the Temporal worker that will execute the workflows
uv run worker.py
```

```bash
# this runs the client
uv run client.py
```

You will be prompted for input after the agent makes the initial tool call.

## Details

Notice how in `main.py` the `human_input_callback` is set to `elicitation_input_callback`.
This ensures that human input is sought via elicitation.
In `client.py`, on the other hand, it is set to `console_elicitation_callback`.
This way, the client prompts for input in the console whenever an upstream request for human input is made.
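To illustrate why the same workflow code can accept either callback, here is a minimal, self-contained sketch. It does not use the real `mcp_agent` API; the function and type names are hypothetical stand-ins:

```python
import asyncio
from typing import Awaitable, Callable

# A human-input callback takes a prompt and eventually returns the user's answer.
HumanInputCallback = Callable[[str], Awaitable[str]]


async def answer_locally(prompt: str) -> str:
    # Client side: in the real example this wraps a console input() prompt.
    return "Ada"  # canned answer standing in for input(prompt)


async def forward_via_elicitation(prompt: str) -> str:
    # Server side: in the real example this relays the prompt to the upstream
    # MCP client as an elicitation request instead of prompting locally.
    return await answer_locally(prompt)  # stand-in for the upstream round trip


async def greet(ask: HumanInputCallback) -> str:
    # The workflow depends only on the callback's signature,
    # so either implementation can be plugged in.
    name = await ask("What is your name?")
    return f"Hello, {name}!"


print(asyncio.run(greet(answer_locally)))           # Hello, Ada!
print(asyncio.run(greet(forward_via_elicitation)))  # Hello, Ada!
```

The point of the sketch is the shared signature: the workflow code is identical in both processes, and only the wiring at app construction time decides where the prompt is actually answered.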

The following diagram shows the components involved and the flow of requests and responses.

```plaintext
┌──────────┐
│   LLM    │
│          │
└──────────┘
     ▲
     │
     1
     │
     ▼
┌──────────┐       ┌──────────────┐       ┌──────────────┐       ┌──────────────┐
│ Temporal │───2──▶│   MCP App    │◀──3──▶│    Client    │◀──4──▶│     User     │
│  worker  │◀──5───│              │       │              │       │ (via console)│
└──────────┘       └──────────────┘       └──────────────┘       └──────────────┘
```

In the diagram,
- (1) uses the tool-calling mechanism to call a system-provided tool for human input,
- (2) uses an HTTPS request to tell the MCP App that the workflow wants to make a request,
- (3) uses the MCP protocol for sending the request to the client and receiving the response,
- (4) uses a console prompt to get the input from the user, and
- (5) uses a Temporal signal to send the response back to the workflow.
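The numbered flow can be sketched with in-process queues standing in for the real transports (HTTPS, the MCP protocol, and Temporal signals). All names here are hypothetical; this is a simulation of the message flow, not the actual implementation:

```python
import asyncio


async def user(prompt: str) -> str:
    # (4) Console prompt stand-in; the real client calls input(prompt).
    return "Ada"


async def client(requests: asyncio.Queue, responses: asyncio.Queue) -> None:
    prompt = await requests.get()            # (3) elicitation request via MCP
    await responses.put(await user(prompt))  # (3) elicitation response via MCP


async def mcp_app(worker_req: asyncio.Queue, worker_sig: asyncio.Queue,
                  client_req: asyncio.Queue, client_resp: asyncio.Queue) -> None:
    prompt = await worker_req.get()          # (2) HTTPS request from the worker
    await client_req.put(prompt)
    answer = await client_resp.get()
    await worker_sig.put(answer)             # (5) Temporal signal stand-in


async def workflow() -> str:
    worker_req, worker_sig, client_req, client_resp = (
        asyncio.Queue(), asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    )
    app_task = asyncio.create_task(mcp_app(worker_req, worker_sig, client_req, client_resp))
    client_task = asyncio.create_task(client(client_req, client_resp))
    # (1) The LLM's tool call resolves to a human-input request.
    await worker_req.put("What is your name?")
    name = await worker_sig.get()
    await asyncio.gather(app_task, client_task)
    return f"Hello, {name}!"


print(asyncio.run(workflow()))  # Hello, Ada!
```

Each queue plays the role of one hop in the diagram; the workflow blocks on `worker_sig` exactly as the real workflow blocks waiting for the Temporal signal in step (5).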
+++ b/examples/human_input/temporal/client.py
@@ -0,0 +1,200 @@
import asyncio
import time
from mcp_agent.app import MCPApp
from mcp_agent.config import Settings, LoggerSettings, MCPSettings
import yaml
from mcp_agent.elicitation.handler import console_elicitation_callback
from mcp_agent.config import MCPServerSettings
from mcp_agent.core.context import Context
from mcp_agent.mcp.gen_client import gen_client
from datetime import timedelta
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from mcp import ClientSession
from mcp_agent.mcp.mcp_agent_client_session import MCPAgentClientSession
from mcp.types import CallToolResult, LoggingMessageNotificationParams
from mcp_agent.human_input.console_handler import console_input_callback

try:
    from exceptiongroup import ExceptionGroup as _ExceptionGroup  # Python 3.10 backport
except Exception:  # pragma: no cover
    _ExceptionGroup = None  # type: ignore
try:
    from anyio import BrokenResourceError as _BrokenResourceError
except Exception:  # pragma: no cover
    _BrokenResourceError = None  # type: ignore


async def main():
    # Create MCPApp to get the server registry, with console handlers
    # IMPORTANT: This client acts as the "upstream MCP client" for the server.
    # When the server requests sampling (sampling/createMessage), the client-side
    # MCPApp must be able to service that request locally (approval prompts + LLM call).
    # Those client-local flows are not running inside a Temporal workflow, so they
    # must use the asyncio executor. If this were set to "temporal", local sampling
    # would crash with: "TemporalExecutor.execute must be called from within a workflow".
    #
    # We programmatically construct Settings here (mirroring examples/basic/mcp_basic_agent/main.py)
    # so everything is self-contained in this client:
    settings = Settings(
        execution_engine="asyncio",
        logger=LoggerSettings(level="info"),
        mcp=MCPSettings(
            servers={
                "basic_agent_server": MCPServerSettings(
                    name="basic_agent_server",
                    description="Local workflow server running the basic agent example",
                    transport="sse",
                    # Use a routable loopback host; 0.0.0.0 is a bind address, not a client URL
                    url="http://127.0.0.1:8000/sse",
                )
            }
        ),
    )
    # Load secrets (API keys, etc.) if a secrets file is available and merge into settings.
    # We intentionally deep-merge the secrets on top of our base settings so
    # credentials are applied without overriding our executor or server endpoint.
    try:
        secrets_path = Settings.find_secrets()
        if secrets_path and secrets_path.exists():
            with open(secrets_path, "r", encoding="utf-8") as f:
                secrets_dict = yaml.safe_load(f) or {}

            def _deep_merge(base: dict, overlay: dict) -> dict:
                out = dict(base)
                for k, v in (overlay or {}).items():
                    if k in out and isinstance(out[k], dict) and isinstance(v, dict):
                        out[k] = _deep_merge(out[k], v)
                    else:
                        out[k] = v
                return out

            base_dict = settings.model_dump(mode="json")
            merged = _deep_merge(base_dict, secrets_dict)
            settings = Settings(**merged)
    except Exception:
        # Best-effort: continue without secrets if parsing fails
        pass
    app = MCPApp(
        name="workflow_mcp_client",
        # Disable sampling approval prompts entirely to keep flows non-interactive.
        # Elicitation remains interactive via console_elicitation_callback.
        human_input_callback=console_input_callback,
        elicitation_callback=console_elicitation_callback,
        settings=settings,
    )
    async with app.run() as client_app:
        logger = client_app.logger
        context = client_app.context

        # Connect to the workflow server
        try:
            logger.info("Connecting to workflow server...")

            # Server connection is configured via Settings above (no runtime mutation needed)

            # Define a logging callback to receive server-side log notifications
            async def on_server_log(params: LoggingMessageNotificationParams) -> None:
                # Pretty-print server logs locally for demonstration
                level = params.level.upper()
                name = params.logger or "server"
                # params.data can be any JSON-serializable data
                print(f"[SERVER LOG] [{level}] [{name}] {params.data}")

            # Provide a client session factory that installs our logging callback
            # and prints non-logging notifications to the console
            class ConsolePrintingClientSession(MCPAgentClientSession):
                async def _received_notification(self, notification):  # type: ignore[override]
                    try:
                        method = getattr(notification.root, "method", None)
                    except Exception:
                        method = None

                    # Avoid duplicating server log prints (handled by logging_callback)
                    if method and method != "notifications/message":
                        try:
                            data = notification.model_dump()
                        except Exception:
                            data = str(notification)
                        print(f"[SERVER NOTIFY] {method}: {data}")

                    return await super()._received_notification(notification)

            def make_session(
                read_stream: MemoryObjectReceiveStream,
                write_stream: MemoryObjectSendStream,
                read_timeout_seconds: timedelta | None,
                context: Context | None = None,
            ) -> ClientSession:
                return ConsolePrintingClientSession(
                    read_stream=read_stream,
                    write_stream=write_stream,
                    read_timeout_seconds=read_timeout_seconds,
                    logging_callback=on_server_log,
                    context=context,
                )

            # Connect to the workflow server
            async with gen_client(
                "basic_agent_server",
                context.server_registry,
                client_session_factory=make_session,
            ) as server:
                # Ask server to send logs at the requested level (default info)
                level = "info"
                print(f"[client] Setting server logging level to: {level}")
                try:
                    await server.set_logging_level(level)
                except Exception:
                    # Older servers may not support logging capability
                    print("[client] Server does not support logging/setLevel")

                # Call the `greet` tool defined via `@app.tool`
                run_result = await server.call_tool("greet", arguments={})
                print(f"[client] Workflow run result: {run_result}")
        except Exception as e:
            # Tolerate benign shutdown races from SSE client (BrokenResourceError within ExceptionGroup)
            if _ExceptionGroup is not None and isinstance(e, _ExceptionGroup):
                subs = getattr(e, "exceptions", []) or []
                if (
                    _BrokenResourceError is not None
                    and subs
                    and all(isinstance(se, _BrokenResourceError) for se in subs)
                ):
                    logger.debug("Ignored BrokenResourceError from SSE shutdown")
                else:
                    raise
            elif _BrokenResourceError is not None and isinstance(e, _BrokenResourceError):
                logger.debug("Ignored BrokenResourceError from SSE shutdown")
            elif "BrokenResourceError" in str(e):
                logger.debug("Ignored BrokenResourceError from SSE shutdown (string match)")
            else:
                raise


def _tool_result_to_json(tool_result: CallToolResult):
    if tool_result.content and len(tool_result.content) > 0:
        text = tool_result.content[0].text
        try:
            # Try to parse the response as JSON if it's a string
            import json

            return json.loads(text)
        except (json.JSONDecodeError, TypeError):
            # If it's not valid JSON, just use the text
            return None


if __name__ == "__main__":
    start = time.time()
    asyncio.run(main())
    end = time.time()
    t = end - start

    print(f"Total run time: {t:.2f}s")
**Review comment**

Verify documentation consistency with implementation: the README states that `main.py` uses `elicitation_input_callback` (line 64) and `client.py` uses `console_elicitation_callback` (line 66).

Resolution: `main.py` does use `elicitation_input_callback` (examples/human_input/temporal/main.py:11,25), but `client.py` sets `human_input_callback=console_input_callback` and `elicitation_callback=console_elicitation_callback` (examples/human_input/temporal/client.py:79–80). Update the README (examples/human_input/temporal/README.md lines 64–67) to reflect this distinction.