|
1 | 1 | import os |
2 | 2 | import logging |
3 | 3 | import asyncio |
| 4 | +import tempfile |
| 5 | +import json |
| 6 | +import re |
4 | 7 |
|
5 | 8 | from dotenv import load_dotenv |
6 | 9 | from slack_bolt.async_app import AsyncApp |
7 | 10 | from slack_bolt.adapter.socket_mode.aiohttp import AsyncSocketModeHandler |
8 | 11 | from slack_sdk.web.async_client import AsyncWebClient |
9 | 12 | from pydantic_ai import Agent |
10 | 13 | from pydantic_ai.mcp import MCPServerStdio |
| 14 | +import altair as alt |
| 15 | +import vl_convert as vlc |
11 | 16 |
|
12 | 17 | # Load environment variables from .env file |
13 | 18 | load_dotenv() |
|
30 | 35 | logging.basicConfig(level=logging.INFO) |
31 | 36 |
|
32 | 37 | # --- MCP SERVER AND AGENT SETUP --- |
33 | | -mcp_server = MCPServerStdio( |
| 38 | +clickhouse_server = MCPServerStdio( |
34 | 39 | 'uv', |
35 | 40 | args=[ |
36 | 41 | 'run', |
37 | 42 | '--with', 'mcp-clickhouse', |
38 | 43 | '--python', '3.13', |
39 | 44 | 'mcp-clickhouse' |
40 | 45 | ], |
41 | | - env=CLICKHOUSE_ENV |
| 46 | + env=CLICKHOUSE_ENV, |
| 47 | + timeout=30.0 |
42 | 48 | ) |
43 | 49 |
|
44 | 50 | agent = Agent( |
45 | 51 | "anthropic:claude-sonnet-4-0", |
46 | | - mcp_servers=[mcp_server], |
47 | | - system_prompt="You are a data assistant. You have access to a ClickHouse database from which you can answer the user's questions. You have tools available to you that let you explore the database, e.g. to list available databases, tables, etc., and to execute SQL queries against them. Use these tools to answer the user's questions. You must always answer the user's questions by using the available tools. If the database cannot help you, say so. You must include a summary of how you came to your answer: e.g. which data you used and how you queried it." |
| 52 | + mcp_servers=[clickhouse_server], |
| 53 | + system_prompt="""You are a data assistant with visualization capabilities. You have access to a ClickHouse database and can create charts from query results. |
| 54 | +
|
| 55 | +Available capabilities: |
| 56 | +1) ClickHouse tools to explore databases, tables, and execute SQL queries |
| 57 | +2) Chart generation by providing Vega-Lite specifications |
| 58 | +
|
| 59 | +When users ask for data analysis with visualizations: |
| 60 | +1. First query the database using available tools |
| 61 | +2. If a visualization would be helpful, create a Vega-Lite chart specification |
| 62 | +3. Format your Vega-Lite spec as JSON within ```json blocks |
| 63 | +4. Choose appropriate chart types: bar charts for categories, line charts for time series, scatter for correlations, pie for proportions |
| 64 | +
|
| 65 | +Example Vega-Lite specification format: |
| 66 | +```json |
| 67 | +{ |
| 68 | + "$schema": "https://vega.github.io/schema/vega-lite/v5.json", |
| 69 | + "title": "Chart Title", |
| 70 | + "data": {"values": [{"category": "A", "value": 100}, {"category": "B", "value": 200}]}, |
| 71 | + "mark": "bar", |
| 72 | + "encoding": { |
| 73 | + "x": {"field": "category", "type": "nominal"}, |
| 74 | + "y": {"field": "value", "type": "quantitative"} |
| 75 | + } |
| 76 | +} |
| 77 | +``` |
| 78 | +
|
| 79 | +Always include a summary of your approach: what data you used, how you queried it, and why you chose a specific visualization.""" |
48 | 80 | ) |
49 | 81 |
|
50 | 82 | app = AsyncApp(token=SLACK_BOT_TOKEN) |
51 | 83 |
|
async def render_and_upload_chart(client, channel, thread_ts, vega_lite_spec, title="Chart"):
    """Render a Vega-Lite spec to PNG and upload it to a Slack thread.

    Args:
        client: Slack client exposing an awaitable ``files_upload_v2``.
        channel: Channel ID the image is uploaded to.
        thread_ts: Timestamp of the thread the image is attached to.
        vega_lite_spec: The chart specification, either as a JSON string or
            as an already-parsed object.
        title: Title shown for the uploaded file in Slack.

    Returns:
        The response returned by ``client.files_upload_v2``, or ``None`` if
        parsing, rendering, or uploading failed (the error is logged).
    """
    tmp_path = None
    try:
        # Accept both raw JSON text and an already-decoded specification.
        if isinstance(vega_lite_spec, str):
            spec = json.loads(vega_lite_spec)
        else:
            spec = vega_lite_spec

        # Render to PNG using vl-convert.
        png_data = vlc.vegalite_to_png(spec)

        # delete=False so the file survives the `with` block; it is closed
        # (and therefore fully flushed) before the upload reads it by path.
        with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp_file:
            tmp_path = tmp_file.name
            tmp_file.write(png_data)

        return await client.files_upload_v2(
            channel=channel,
            file=tmp_path,
            title=title,
            thread_ts=thread_ts
        )

    except Exception as e:
        # Best-effort: a broken chart must not take down the text reply, so
        # log with the traceback and signal failure with None.
        logging.exception(f"Error rendering and uploading chart: {e}")
        return None

    finally:
        # Always remove the temp file, including when the upload raised.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                logging.warning(f"Could not remove temporary chart file: {tmp_path}")
| 116 | + |
def extract_vega_lite_specs(text):
    """Extract Vega-Lite JSON specifications from fenced ```json blocks.

    Args:
        text: Free-form text that may contain ```json fenced blocks.
            Anything that is not a non-empty string yields no specs.

    Returns:
        A list of parsed specs (dicts), in order of appearance. A block is
        kept only if it is valid JSON and its ``$schema`` value is a string
        containing ``vega-lite``; everything else is skipped silently.
    """
    if not isinstance(text, str) or not text:
        return []

    # Non-greedy body, but the closing fence must follow the final brace, so
    # nested objects inside a spec are captured in full.
    json_pattern = r'```json\s*(\{.*?\})\s*```'

    specs = []
    for candidate in re.findall(json_pattern, text, re.DOTALL):
        try:
            spec = json.loads(candidate)
        except json.JSONDecodeError:
            continue

        schema = spec.get("$schema") if isinstance(spec, dict) else None
        # Require a string schema: `"vega" in <non-string>` would raise
        # TypeError. Match "vega-lite" specifically because the result is
        # rendered with a Vega-Lite converter; a plain Vega schema URL also
        # contains "vega" but is a different format.
        if isinstance(schema, str) and "vega-lite" in schema:
            specs.append(spec)

    return specs
| 134 | + |
52 | 135 | async def handle_slack_query(event, say): |
53 | 136 | user = event["user"] |
54 | 137 | text = event.get("text", "") |
@@ -81,7 +164,25 @@ async def do_agent(): |
81 | 164 |
|
82 | 165 | async with agent.run_mcp_servers(): |
83 | 166 | result = await agent.run(prompt) |
84 | | - await say(text=f"{result.output}", thread_ts=thread_ts) |
| 167 | + |
| 168 | + # Check if the response contains Vega-Lite chart specifications |
| 169 | + response_text = result.output |
| 170 | + client = AsyncWebClient(token=SLACK_BOT_TOKEN) |
| 171 | + |
| 172 | + # Extract Vega-Lite specifications from the response |
| 173 | + vega_specs = extract_vega_lite_specs(response_text) |
| 174 | + |
| 175 | + if vega_specs: |
| 176 | + # Render and upload each chart found |
| 177 | + for i, spec in enumerate(vega_specs): |
| 178 | + chart_title = spec.get("title", f"Chart {i+1}" if len(vega_specs) > 1 else "Chart") |
| 179 | + await render_and_upload_chart(client, channel, thread_ts, spec, chart_title) |
| 180 | + |
| 181 | + # Remove JSON blocks from text response to avoid clutter |
| 182 | + clean_text = re.sub(r'```json\s*\{.*?\}\s*```', '[Chart uploaded above]', response_text, flags=re.DOTALL) |
| 183 | + await say(text=clean_text, thread_ts=thread_ts) |
| 184 | + else: |
| 185 | + await say(text=response_text, thread_ts=thread_ts) |
85 | 186 |
|
86 | 187 | asyncio.create_task(do_agent()) |
87 | 188 |
|
|
0 commit comments