-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathagent_example.py
More file actions
104 lines (83 loc) · 3.11 KB
/
agent_example.py
File metadata and controls
104 lines (83 loc) · 3.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""Example: OrionBelt Analytics agent using Google ADK with MCP.
Prerequisites:
pip install google-adk
Start OrionBelt Analytics MCP server first:
uv run server.py
Then run this script:
export GOOGLE_API_KEY=...
python agent_example.py
"""
from __future__ import annotations
import asyncio
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.mcp_tool import MCPToolset
from google.genai import types
INSTRUCTIONS = """\
You are a database intelligence assistant powered by OrionBelt Analytics.
You help users analyze database schemas, generate ontologies, and write
safe SQL queries with fan-trap prevention.
Workflow:
1. Call connect_database to establish a connection (postgresql, snowflake,
dremio, clickhouse, or mysql).
2. Call list_schemas to discover available schemas.
3. Call analyze_schema to get the schema structure with relationships.
4. Use get_table_details for deep-dive into specific tables.
5. Call generate_ontology to create RDF/OWL ontology with SQL mappings.
6. Use validate_sql_syntax before running queries.
7. Use execute_sql_query to run validated SQL (set checklist_completed=true).
8. Use generate_chart to visualize query results.
9. Use graphrag_query_context for intelligent schema discovery.
Rules:
- Always validate SQL before execution using validate_sql_syntax.
- Set checklist_completed=true when calling execute_sql_query after validation.
- Fan-trap warnings must be resolved before executing multi-fact queries.
- Present SQL in code blocks. Explain ontology triples in plain language.
"""
MCP_SERVER_URL = "http://localhost:9000/mcp"
async def main() -> None:
tools, cleanup = await MCPToolset.from_server(
connection_params={"url": MCP_SERVER_URL},
)
agent = Agent(
name="orionbelt_analyst",
model="gemini-2.0-flash",
instruction=INSTRUCTIONS,
tools=tools,
)
session_service = InMemorySessionService()
session = await session_service.create_session(
app_name="orionbelt_analyst",
user_id="user",
)
runner = Runner(
agent=agent,
app_name="orionbelt_analyst",
session_service=session_service,
)
print("OrionBelt Analytics Agent (type 'quit' to exit)\n")
try:
while True:
user_input = input("You: ").strip()
if user_input.lower() in ("quit", "exit", "q"):
break
if not user_input:
continue
content = types.Content(
role="user",
parts=[types.Part(text=user_input)],
)
final_text = ""
async for event in runner.run_async(
user_id="user",
session_id=session.id,
new_message=content,
):
if event.is_final_response() and event.content and event.content.parts:
final_text = event.content.parts[0].text
print(f"\nAssistant: {final_text}\n")
finally:
await cleanup()
if __name__ == "__main__":
asyncio.run(main())