-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
344 lines (276 loc) · 13.2 KB
/
main.py
File metadata and controls
344 lines (276 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
"""
MCP Code Execution Example
Implements the efficient pattern from:
"Code execution with MCP: Building more efficient agents"
Key concepts demonstrated:
1. Progressive Disclosure - Agent explores filesystem to find tools
2. On-Demand Loading - Agent reads only the tool definitions it needs
3. Code Generation - LLM generates code that imports specific tools
4. Local Data Processing - Data stays in execution environment
5. Summary Return - Only results flow back to model context
Trace structure:
Trace: efficient-mcp-workflow
├─ Span: explore_servers ← Agent discovers available servers
├─ Span: read_tool_definition (x2) ← Agent reads only needed tools
├─ Generation: generate_code ← LLM generates tool-calling code
└─ Span: code_execution ← Code runs in sandbox
   ├─ Span: mcp.google_drive.get_sheet
   └─ Span: mcp.salesforce.batch_update
"""
import os
import logging
import argparse
from anthropic import Anthropic
from dotenv import load_dotenv
load_dotenv()
from langfuse import get_client, observe
from mcp_client import (
explore_servers,
read_tool,
execute_agent_code,
set_mcp_tracing,
)
# Initialize clients
# Module-level singletons shared by run_efficient_workflow() and main().
# They are created after load_dotenv() has run (see the import section above);
# presumably both pick their credentials up from the environment - confirm
# against the Langfuse / Anthropic SDK configuration in use.
langfuse = get_client()
# NOTE: this instance is named `anthropic`, the same as the package the
# `Anthropic` class is imported from; here it always refers to the client.
anthropic = Anthropic()
# Register tool implementations
# Each server package exposes a `register` function that is called once at
# import time. NOTE(review): presumably this is what makes the tools reachable
# from generated code via call_mcp_tool() - confirm in servers/.
from servers.google_drive import register as register_gdrive
from servers.salesforce import register as register_salesforce
register_gdrive()
register_salesforce()
# Root logging configuration: INFO level, "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module logger; it is not referenced elsewhere in the visible part of this file.
logger = logging.getLogger(__name__)
def _print_banner(title: str) -> None:
    """Print *title* between two 60-character rules, preceded by a blank line."""
    print("\n" + "=" * 60)
    print(title)
    print("=" * 60)


def _preview(text: str, limit: int = 500) -> str:
    """Return *text*, cut to *limit* characters with "..." appended if it was longer."""
    return text[:limit] + "..." if len(text) > limit else text


def _strip_code_fences(text: str) -> str:
    """Return the code inside the first Markdown fence of *text*, stripped.

    If *text* contains no triple-backtick fence it is returned stripped and
    otherwise unchanged. Any language tag on the opening fence line (python,
    py, python3, ...) is dropped, and a missing closing fence (for example a
    response cut off by max_tokens) is tolerated.
    """
    _, fence, after_open = text.partition("```")
    if not fence:
        return text.strip()
    body = after_open.partition("```")[0]
    # Whatever sits between the opening fence and the first newline is the
    # fence's language tag; only drop it when it actually looks like one, so
    # inline code written directly after the fence is preserved.
    tag, newline, remainder = body.partition("\n")
    if newline and all(ch.isalnum() or ch in "_+-." for ch in tag.strip()):
        body = remainder
    return body.strip()


def _usage_details(usage) -> dict:
    """Build the usage mapping reported on the generation observation.

    Always contains "input" and "output"; the Anthropic prompt-caching
    counters are added only when the attribute exists and is non-zero.
    """
    details = {
        "input": usage.input_tokens,
        "output": usage.output_tokens,
    }
    for counter in ("cache_creation_input_tokens", "cache_read_input_tokens"):
        count = getattr(usage, counter, None)
        if count:
            details[counter] = count
    return details


@observe(name="efficient-mcp-workflow", as_type="agent")
def run_efficient_workflow() -> dict:
    """
    Demonstrates the efficient MCP pattern.

    Instead of loading all tool definitions upfront (expensive),
    the agent:
    1. Explores the filesystem to see what's available
    2. Reads only the specific tools it needs
    3. Generates code that processes data locally
    4. Returns only a summary to the model

    Progress and a token-cost comparison are printed to stdout along the way.

    Returns:
        A dict with keys:
        - "status": "success" or "error", from the code execution result
        - "result": the "result" entry of the execution result (may be None)
        - "tokens": {"our_approach", "traditional", "savings_percent"}
        - "trace_url": whatever langfuse.get_trace_url() returned
    """
    # Single source of truth for the model id: it is used both for the actual
    # API call and for the model recorded on the Langfuse generation.
    model_name = "claude-sonnet-4-20250514"

    # ==========================================================================
    # STEP 1: PROGRESSIVE DISCLOSURE - Explore available tools
    # ==========================================================================
    # The agent first explores what MCP servers are available.
    # This is like running `ls servers/` - very cheap in tokens.
    _print_banner("STEP 1: PROGRESSIVE DISCLOSURE")
    print("\nAgent explores available MCP servers...")
    server_structure = explore_servers()
    print(f"\n{server_structure}")
    langfuse.flush()  # Ensure correct trace ordering
    # Token cost: ~50 tokens (just the directory listing)
    # vs ~10,000+ tokens if we loaded all tool definitions upfront

    # ==========================================================================
    # STEP 2: ON-DEMAND TOOL LOADING
    # ==========================================================================
    # The agent reads ONLY the tools it needs for this specific task.
    # Not all 4 tools - just 2.
    _print_banner("STEP 2: ON-DEMAND TOOL LOADING")
    print("\nAgent reads only the tools it needs...")
    # Read get_sheet definition (~200 tokens)
    get_sheet_def = read_tool("google_drive", "get_sheet")
    print("\n--- google_drive/get_sheet.py ---")
    print(_preview(get_sheet_def))
    # Read batch_update definition (~200 tokens)
    batch_update_def = read_tool("salesforce", "batch_update")
    print("\n--- salesforce/batch_update.py ---")
    print(_preview(batch_update_def))
    langfuse.flush()  # Ensure correct trace ordering
    # Token cost: ~400 tokens for 2 tools
    # vs ~800+ tokens if we loaded all 4 tools

    # ==========================================================================
    # STEP 3: LLM GENERATES CODE
    # ==========================================================================
    # The model generates code that:
    # - Imports only the tools it needs
    # - Processes data locally (filtering, transforming)
    # - Returns only a summary
    _print_banner("STEP 3: LLM CODE GENERATION")
    task = """
Fetch Q4 sales data from Google Sheets (sheet_id: "sales-q4-2024").
Filter to only "completed" orders under $10,000.
Update these records in Salesforce as "Lead" objects.
Return a summary with counts.
"""
    code_gen_prompt = f"""You have access to MCP tools via the servers/ directory.
Available tools (from filesystem exploration):
{server_structure}
Tool definitions you need:
{get_sheet_def}
{batch_update_def}
Task: {task}
Write Python code that:
1. Calls MCP tools using call_mcp_tool("server.tool_name", input_data)
- For google_drive tools: call_mcp_tool("google_drive.get_sheet", {{"sheet_id": "..."}})
- For salesforce tools: call_mcp_tool("salesforce.batch_update", {{"object_type": "...", "records": [...]}})
2. Processes data LOCALLY (filtering happens in code, not through model)
3. Sets a `result` variable with ONLY the summary (counts, not raw data)
The code runs in a sandbox with call_mcp_tool() available.
IMPORTANT: call_mcp_tool() is SYNCHRONOUS - do NOT use async/await.
Return ONLY the code, no markdown."""
    messages = [{"role": "user", "content": code_gen_prompt}]
    with langfuse.start_as_current_observation(
        as_type="generation",
        name="generate_code",
        model=model_name,
        input=messages,  # Full prompt for proper display in Langfuse UI
        metadata={"task": task, "tools_loaded": 2},
    ) as gen:
        response = anthropic.messages.create(
            model=model_name,
            max_tokens=1000,
            messages=messages,
        )
        # The prompt asks for bare code, but models often wrap it in a
        # Markdown fence anyway; unwrap it before it reaches the sandbox.
        generated_code = _strip_code_fences(response.content[0].text)
        gen.update(
            output=generated_code,
            usage_details=_usage_details(response.usage),
        )
    code_gen_tokens = response.usage.input_tokens + response.usage.output_tokens
    print(f"\nGenerated code ({code_gen_tokens} tokens):")
    print("-" * 40)
    print(generated_code)
    print("-" * 40)
    langfuse.flush()  # Ensure correct trace ordering

    # ==========================================================================
    # STEP 4: CODE EXECUTION - Data stays local
    # ==========================================================================
    # The generated code runs in a sandbox.
    # Data is fetched, filtered, and processed WITHOUT flowing through model.
    # Only the summary result returns.
    _print_banner("STEP 4: CODE EXECUTION (data stays local)")
    execution_result = execute_agent_code(generated_code)
    if execution_result["success"]:
        print("\nExecution successful!")
        print(f"Logs:\n{execution_result['logs']}")
        print(f"\nResult (what model sees): {execution_result['result']}")
    else:
        print(f"\nExecution failed: {execution_result['error']}")

    # ==========================================================================
    # TOKEN EFFICIENCY COMPARISON
    # ==========================================================================
    _print_banner("TOKEN EFFICIENCY COMPARISON")
    # Our approach. Every row of the "CODE EXECUTION APPROACH" table below is
    # counted, including the summary that flows back to the model, so the
    # printed TOTAL equals the sum of the rows above it.
    our_tokens = (
        50 +              # Step 1: Directory listing
        400 +             # Step 2: 2 tool definitions
        code_gen_tokens + # Step 3: Code generation
        50                # Step 4: Summary returned to the model
    )
    # Traditional approach (load all tools, pass data through model)
    traditional_tokens = (
        800 +    # All tool definitions upfront
        10000 +  # 100 rows of data through model
        500      # Response
    )
    savings = (1 - our_tokens / traditional_tokens) * 100
    print(f"""
┌─────────────────────────────────────────────────────────────┐
│ TRADITIONAL APPROACH │
├─────────────────────────────────────────────────────────────┤
│ Load all tool definitions upfront → ~800 tokens │
│ Fetch 100 rows, pass through model → ~10,000 tokens │
│ Model filters and processes → ~500 tokens │
│ TOTAL: ~11,300 tokens │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ CODE EXECUTION APPROACH │
├─────────────────────────────────────────────────────────────┤
│ Explore filesystem (ls servers/) → ~50 tokens │
│ Read 2 needed tool definitions → ~400 tokens │
│ Generate code → ~{code_gen_tokens} tokens │
│ Execute code (data stays local) → 0 model tokens │
│ Return summary only → ~50 tokens │
│ TOTAL: ~{our_tokens} tokens │
└─────────────────────────────────────────────────────────────┘
Savings: {savings:.1f}%
Key insight: 100 rows of data were fetched and filtered,
but ZERO rows flowed through the model context.
Only the summary returned.
""")
    trace_url = langfuse.get_trace_url()
    return {
        "status": "success" if execution_result["success"] else "error",
        "result": execution_result.get("result"),
        "tokens": {
            "our_approach": our_tokens,
            "traditional": traditional_tokens,
            "savings_percent": round(savings, 1),
        },
        "trace_url": trace_url,
    }
def main():
    """Command-line entry point for the demo.

    Parses the ``--no-mcp-tracing`` flag, switches MCP tool tracing on or off
    accordingly via set_mcp_tracing(), runs run_efficient_workflow(), flushes
    Langfuse, and prints the trace structure to expect followed by the trace
    URL (or a notice when no URL is available).
    """
    parser = argparse.ArgumentParser(description="Efficient MCP Code Execution Demo")
    parser.add_argument(
        "--no-mcp-tracing",
        action="store_true",
        help="Disable MCP tool tracing to show what observability looks like without it",
    )
    args = parser.parse_args()
    # Configure MCP tracing based on flag
    mcp_tracing_enabled = not args.no_mcp_tracing
    set_mcp_tracing(mcp_tracing_enabled)
    print("=" * 60)
    print("EFFICIENT MCP CODE EXECUTION")
    print("=" * 60)
    print("\nDemonstrating the pattern from:")
    print('"Code execution with MCP: Building more efficient agents"')
    if not mcp_tracing_enabled:
        print("\n⚠️ MCP TRACING DISABLED - Tool calls will be invisible in trace")
    result = run_efficient_workflow()
    langfuse.flush()
    print("\n" + "=" * 60)
    print("TRACE STRUCTURE")
    print("=" * 60)
    if mcp_tracing_enabled:
        print("""
Agent: efficient-mcp-workflow
├─ Tool: explore_servers ← Progressive disclosure
├─ Tool: read_tool_definition ← On-demand loading
├─ Tool: read_tool_definition ← On-demand loading
├─ Generation: generate_code ← LLM generates code
└─ Span: code_execution ← Sandbox execution
├─ Tool: mcp.google_drive.get_sheet ← MCP tool call
└─ Tool: mcp.salesforce.batch_update ← MCP tool call
""")
    else:
        print("""
Agent: efficient-mcp-workflow
├─ Tool: explore_servers ← Progressive disclosure
├─ Tool: read_tool_definition ← On-demand loading
├─ Tool: read_tool_definition ← On-demand loading
├─ Generation: generate_code ← LLM generates code
└─ Span: code_execution ← Sandbox execution
└─ (MCP tool calls NOT visible - tracing disabled)
""")
    # The workflow forwards whatever langfuse.get_trace_url() returned, which
    # may be empty; avoid printing a literal "None" as if it were a link.
    trace_url = result.get("trace_url")
    if trace_url:
        print(f"\nView trace: {trace_url}")
    else:
        print("\nView trace: unavailable (no trace URL was returned)")


if __name__ == "__main__":
    main()