-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
322 lines (261 loc) · 12.7 KB
/
main.py
File metadata and controls
322 lines (261 loc) · 12.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
from typing import Literal, cast
from fastmcp import FastMCP
import mcp_client
import toolhive_client
from models import PipelineStage
from shell_engine import ShellEngine
# FastMCP server instance named "model-context-shell". The functions in this
# module decorated with @mcp.tool() attach to it. The `instructions` text is
# handed to FastMCP verbatim, so treat it as runtime content, not a comment.
mcp = FastMCP(
    "model-context-shell",
    instructions="""
This MCP server provides pipeline execution for coordinating tool calls and shell commands.
KEY PRINCIPLE: Build complete workflows as SINGLE pipeline calls. Don't make multiple
execute_pipeline calls where you manually copy/paste data between them. Instead, construct
the entire workflow in one pipeline and use jq/grep/sed to transform data between stages.
Only inspect intermediate results to verify correctness, not to manually pass data around.
""",
)
# Create the shell engine with the MCP client's call_tool as the tool caller.
# The callables are injected (dependency injection) so the engine stays
# decoupled from MCP specifics.
# batch_tool_caller enables connection reuse for for_each pipelines (much faster).
engine = ShellEngine(
    tool_caller=mcp_client.call_tool,
    batch_tool_caller=mcp_client.batch_call_tool,
)
@mcp.tool()
def list_available_shell_commands() -> list[str]:
    """
    List basic, safe CLI commands commonly used in shell one-liners.

    These commands are available for use in execute_pipeline for data transformation:
    - jq: JSON processing and transformation (essential for API data)
    - grep: Text filtering and pattern matching
    - sed/awk: Advanced text processing
    - sort/uniq: Data organization

    ⚠️ SECURITY: In pipelines, specify command name and args separately:
        {"type": "command", "command": "jq", "args": ["-c", ".foo"]}
    NOT: {"type": "command", "command": "jq -c .foo"}

    Use these in pipelines for precise data manipulation between tool calls.

    Returns:
        The command names reported by engine.list_available_commands().
    """
    # Thin wrapper: the actual list comes from the shell engine.
    return engine.list_available_commands()
@mcp.tool()
async def execute_pipeline(pipeline: list[PipelineStage]) -> str:
    """
    Execute a pipeline of tool calls and shell commands to coordinate multiple operations.

    ⚠️ IMPORTANT: Before using this tool, call list_all_tools() to discover what tools
    are actually available. Do not assume tools exist - verify them first!

    ⚠️ FOR BATCH OPERATIONS: Use for_each to process multiple items in ONE pipeline call.
    Do NOT make separate execute_pipeline calls for each item - that's inefficient!
    Example: To fetch 50 Pokemon details, use ONE pipeline with for_each, not 50 separate calls.

    A pipeline chains multiple stages where data flows from one to the next:
    - Tool stages: Call external tools (from list_all_tools)
    - Command stages: Transform data with jq, grep, sed, awk, etc.
    - Preview stages: Inspect data structure before processing (recommended!)

    Pipeline Structure:
    Each stage is a dict with:
    - type: "tool" | "command" | "preview"
    - for_each (optional): Process items one-by-one instead of all at once

    Tool Stage:
        {"type": "tool", "name": "tool_name", "server": "server_name", "args": {...}}
    - Calls a tool from an MCP server (get names from list_all_tools)
    - Automatically merges upstream JSON data with args before calling tool
    - ⚠️ If upstream has extra fields the tool doesn't accept, use jq to filter first

    Command Stage:
        {"type": "command", "command": "jq", "args": ["-c", ".field"]}
    - Runs allowed shell commands (see list_available_shell_commands)
    - Command and args MUST be separate (security requirement)

    Preview Stage:
        {"type": "preview", "chars": 3000}
    - Shows a SUMMARIZED view of the data (default: 3000 chars)
    - ⚠️ OUTPUT IS NOT VALID JSON - uses pseudo-format with /* N more */ markers
    - Use this to understand data structure BEFORE writing jq filters
    - Example output:
        === PREVIEW (not valid JSON, showing structure only) ===
        {
          items: [
            { id: 1, name: "First", data: { /* 3 more */ } },
            /* 47 more */
          ]
        }
        === END PREVIEW ===

    Example - Preview data before processing (RECOMMENDED first step):
        [
            {"type": "tool", "name": "fetch", "server": "api", "args": {"url": "..."}},
            {"type": "preview", "chars": 2000}
        ]

    Example - Chain tools with data transformation:
        [
            {"type": "tool", "name": "get_data", "server": "database", "args": {"table": "users"}},
            {"type": "command", "command": "jq", "args": ["-c", ".[] | select(.active == true)"]},
            {"type": "tool", "name": "send_notification", "server": "notifications", "args": {"channel": "admin"}}
        ]

    Example - Process multiple items with for_each:
        [
            {"type": "tool", "name": "list_users", "server": "api", "args": {}},
            {"type": "command", "command": "jq", "args": ["-c", ".users[] | {user_id: .id}"]},
            {"type": "tool", "name": "get_profile", "server": "api", "for_each": true}
        ]

    Example - Get orders, enrich with customer data, format as CSV:
        [
            {"type": "tool", "name": "list_orders", "server": "sales", "args": {"status": "pending"}},
            {"type": "command", "command": "jq", "args": ["-c", ".orders[] | {customer_id: .customer_id, amount: .total}"]},
            {"type": "tool", "name": "get_customer", "server": "crm", "for_each": true},
            {"type": "command", "command": "jq", "args": ["-r", "[.name, .email, .region] | @csv"]},
            {"type": "command", "command": "sort", "args": []},
            {"type": "command", "command": "uniq", "args": ["-c"]}
        ]

    How for_each works:
    - Requires JSONL input (one JSON object per line from jq)
    - Calls the tool once per line, collecting all results into an array
    - IMPORTANT: Use jq to extract ONLY the fields the tool accepts
      Example: If get_profile only accepts {user_id: "..."}, use jq to create that exact structure
      This avoids "unexpected additional properties" errors from automatic merging

    Best Practices:
    - Use preview stages to inspect data BEFORE writing jq filters
    - Build complete workflows as single pipelines (don't split unnecessarily)
    - Check list_all_tools first to see what's available
    - Use get_tool_details(server, tool_name) to see exact tool parameters/schema
    - Use jq to filter data to match the tool's expected fields (prevents schema errors)
    - Use for_each to process collections item-by-item (results collected into array)

    Args:
        pipeline: The stages to run, in order.

    Returns:
        The string returned by engine.execute_pipeline(pipeline).
    """
    # Thin wrapper: all stage handling happens inside the shell engine.
    return await engine.execute_pipeline(pipeline)
async def _list_all_tools_impl() -> str:
    """Build the text listing of every workload and its tools.

    Implementation of list_all_tools (extracted for testing).

    The ToolHive host/port are discovered first so default ports are not
    assumed. If discovery, or the listing against the discovered address,
    raises, the listing is retried with mcp_client.list_tools() defaults.

    Returns:
        "No MCP servers found" when there is nothing to show (no servers were
        reported, or every reported workload was a skipped orchestrator
        self-reference); otherwise one section per workload, joined with
        newlines.
    """
    try:
        from toolhive_client import discover_toolhive_async

        host, port = await discover_toolhive_async()
        tools_list = await mcp_client.list_tools(host=host, port=port)
    except Exception:
        # Deliberate best-effort: any failure falls back to the client defaults.
        tools_list = await mcp_client.list_tools()

    if not tools_list:
        return "No MCP servers found"

    lines: list[str] = []
    # Each entry is read with .get(), i.e. presumably a dict per workload.
    for server in tools_list:
        status = server.get("status", "unknown")
        error = server.get("error")

        # Skip self-reference workloads (orchestrator)
        if status == "skipped" and error and "orchestrator" in error:
            continue

        workload = server.get("workload", "unknown")
        lines.append(f"\n**{workload}**")
        lines.append(f" Status: {status}")
        lines.extend(_format_tool_entry(tool) for tool in server.get("tools") or [])
        if error:
            lines.append(f" Error: {error}")

    if not lines:
        # Everything was filtered out; an empty reply would tell the caller nothing.
        return "No MCP servers found"
    return "\n".join(lines)


def _format_tool_entry(tool: object) -> str:
    """Render one tool as a single bullet line for the tools listing."""
    if not isinstance(tool, dict):
        # Backwards compatibility: tools may be just a list of names
        return f" - {tool}"

    tool_name = tool.get("name", "unknown")
    description = tool.get("description", "")
    if not description:
        return f" - {tool_name}"

    # Keep each tool on one line and cap the description at 200 chars.
    description = description.replace("\n", " ").replace("\r", " ")
    if len(description) > 200:
        description = description[:200] + "..."
    return f" - {tool_name}: {description}"
@mcp.tool()
async def list_all_tools() -> str:
    """
    List all tools available from all MCP servers running through ToolHive.

    Shows tool names with brief descriptions. Use get_tool_details() to see full descriptions
    and parameter schemas for a specific tool.

    Use execute_pipeline to coordinate multiple tool calls for data processing workflows.

    Returns:
        The text listing produced by _list_all_tools_impl().
    """
    # The logic lives in _list_all_tools_impl, which is kept separate for testing.
    return await _list_all_tools_impl()
async def _get_tool_details_impl(server: str, tool_name: str) -> str:
    """Fetch one tool's details and render them as plain text.

    Implementation of get_tool_details (extracted for testing).

    Args:
        server: Name passed through to mcp_client.get_tool_details_from_server.
        tool_name: Name of the tool to look up on that server.

    Returns:
        "Error: <message>" when the lookup result carries an "error" key;
        otherwise the tool name, its description and its input schema
        (pretty-printed JSON), joined with newlines.
    """
    import json

    info = await mcp_client.get_tool_details_from_server(server, tool_name)

    # An "error" key short-circuits the report.
    if "error" in info:
        return f"Error: {info['error']}"

    name = info.get("name", "unknown")
    description = info.get("description", "No description available")
    schema_text = json.dumps(info.get("inputSchema", {}), indent=2)

    sections = (
        f"Tool: {name}",
        f"\nDescription:\n{description}",
        "\nInput Schema:",
        schema_text,
    )
    return "\n".join(sections)
@mcp.tool()
async def get_tool_details(server: str, tool_name: str) -> str:
    """
    Get detailed information about a specific tool including its full description and parameter schema.

    Args:
        server: The MCP server/workload name (e.g., "fetch", "filesystem")
        tool_name: The name of the tool to get details for

    Returns detailed information including:
    - Full description
    - Input schema (JSON Schema describing required and optional parameters)
    """
    # The logic lives in _get_tool_details_impl, which is kept separate for testing.
    return await _get_tool_details_impl(server, tool_name)
if __name__ == "__main__":
    # Script entry point: check prerequisites, resolve transport/host/port from
    # defaults, CLI flags and environment, then start the MCP server.
    import os
    import shutil
    import sys

    # Require bubblewrap (bwrap) for sandboxing child processes.
    # Refuse to start the server if it's not available.
    if shutil.which("bwrap") is None:
        sys.stderr.write(
            "Error: bubblewrap (bwrap) is required but was not found in PATH.\n"
            "Please install bubblewrap. On Debian/Ubuntu: 'apt-get install bubblewrap'.\n"
            "On macOS, run inside a Linux VM/container with bubblewrap installed.\n"
        )
        sys.exit(1)

    # Check if running in container (ToolHive will manage thv serve).
    # If TOOLHIVE_HOST is set, we're in container mode and shouldn't start thv serve.
    in_container = os.environ.get("TOOLHIVE_HOST") is not None

    if not in_container:
        # Local development mode: Initialize ToolHive client - starts thv serve and lists workloads
        # NOTE(review): confirm initialize() does not write to stdout when the
        # stdio transport is selected.
        toolhive_client.initialize()
    else:
        # Container mode: Skip ToolHive discovery during startup.
        # Tools will be discovered dynamically when execute_pipeline is called.
        # Written to stderr, not stdout: with --transport stdio, stdout carries
        # the MCP protocol stream and must not receive free-form text.
        print(
            "Running in container mode - ToolHive connection will be established on first tool use\n",
            file=sys.stderr,
        )

    # Defaults; may be overridden by CLI or env.
    transport = "streamable-http"  # Default to streamable-http for HTTP access
    port = 8000
    # Bind to 0.0.0.0 in container for external access
    host = "0.0.0.0" if in_container else "127.0.0.1"

    # CLI args take precedence over env. Walk (flag, value) pairs so a trailing
    # flag with no value is simply ignored.
    for flag, value in zip(sys.argv, sys.argv[1:]):
        if flag == "--transport":
            transport = value
        elif flag == "--port":
            try:
                port = int(value)
            except ValueError:
                # Fail with a clear message instead of an uncaught traceback.
                sys.stderr.write(
                    f"Error: --port must be an integer, got '{value}'.\n"
                )
                sys.exit(1)
        elif flag == "--host":
            host = value

    # If not set via CLI, allow env overrides
    if "--port" not in sys.argv:
        mcp_port_env = os.environ.get("MCP_PORT")
        if mcp_port_env:
            try:
                port = int(mcp_port_env)
            except ValueError:
                sys.stderr.write(
                    f"Warning: MCP_PORT must be an integer, got '{mcp_port_env}'. Using default/CLI value {port}.\n"
                )

    if "--host" not in sys.argv:
        mcp_host_env = os.environ.get("MCP_HOST")
        # In container mode the bind address stays as chosen above.
        if mcp_host_env and not in_container:
            host = mcp_host_env

    if transport == "stdio":
        mcp.run(transport="stdio")
    else:
        endpoint = "/sse" if transport == "sse" else "/mcp"
        print(f"\n🚀 Starting Model Context Shell on http://{host}:{port}{endpoint}")
        print(f" Transport: {transport}")
        print(f" Bind address: {host}")
        print(f" Connect via: http://localhost:{port}{endpoint}\n")

        # cast() only satisfies the type checker; the value is not validated here.
        Transport = Literal["stdio", "sse", "streamable-http"]
        mcp.run(transport=cast(Transport, transport), host=host, port=port)