Skip to content

Commit 688df97

Browse files
authored
feat: data analysis demo (#29)
2 parents ab0df6c + 52480e1 commit 688df97

File tree

14 files changed

+1138
-0
lines changed

14 files changed

+1138
-0
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# AgentKit configuration
2+
agentkit.yaml
3+
agentkit*.yaml
4+
5+
# Python cache
6+
__pycache__/
7+
*.py[cod]
8+
*$py.class
9+
10+
# Virtual environments
11+
.venv/
12+
venv/
13+
ENV/
14+
env/
15+
16+
# IDE
17+
.vscode/
18+
.idea/
19+
.windsurf/
20+
21+
# Git
22+
.git/
23+
.gitignore
24+
25+
# Docker
26+
Dockerfile*
27+
.dockerignore
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
# Data Analysis with Code Project
2+
3+
## 项目概述
4+
5+
这是一个基于 LanceDB 构建的数据分析框架,使用 IMDB 数据集,支持结构化数据查询、非结构化混合检索、元数据搜索和非结构化数据处理等多种功能。
6+
7+
## 架构
8+
9+
### 技术栈
10+
11+
- **LanceDB**: 用于高效元数据搜数、向量和结构化数据存储与检索
12+
- **DuckDB**: 用于结构化数据的 SQL 查询
13+
- **LAS**: 用于非结构化数据的处理和生成,如视频生成
14+
15+
### 核心组件
16+
17+
#### 1. 结构化数据查询 (`duckdb_sql_execution.py`)
18+
19+
- **功能**: 允许用户通过 SQL 语句查询结构化数据
20+
- **技术**: 基于 DuckDB 数据库引擎
21+
- **应用场景**: 执行传统的结构化数据查询、过滤和聚合操作
22+
23+
#### 2. 非结构化混合检索 (`lancedb_hybrid_execution.py`)
24+
25+
- **功能**: 支持将结构化查询与向量检索相结合,实现混合查询
26+
- **技术**: 基于 LanceDB 的向量检索能力
27+
- **应用场景**: 处理同时包含结构化属性和非结构化内容的查询需求
28+
29+
#### 3. 元数据搜索 (`catalog_discovery.py`)
30+
31+
- **功能**: 提供数据集元数据的搜索和发现功能
32+
- **技术**: 基于目录结构的元数据管理
33+
- **应用场景**: 帮助用户了解可用数据集的结构和内容
34+
35+
#### 4. 非结构化数据处理 (`video_generation.py`)
36+
37+
- **功能**: 支持将非结构化数据(如图片)转换为视频
38+
- **技术**: 基于视频生成算法
39+
- **应用场景**: 实现图片到视频的转换功能
40+
41+
## 数据集说明
42+
43+
本项目使用 IMDB 数据集,包含以下两个主要组成部分:
44+
45+
### 1. 元数据表
46+
47+
提供数据集的整体描述和结构信息,帮助用户了解可用的数据资源,其中包含每一列的描述、数据类型、样例值和可能的取值范围。
48+
49+
### 2. IMDB 电影数据表 (`imdb_top_1000`)
50+
51+
包含 1000 部电影的详细信息,主要字段包括:
52+
53+
| 字段名 | 类型 | 描述 |
54+
| ----------------------- | ------ | ------------------------------------------------------------------------ |
55+
| `series_title` | 字符串 | 电影标题 |
56+
| `released_year` | 字符串 | 上映年份(注意:虽然是年份数字,但为字符串类型,比较操作需用单引号包裹) |
57+
| `director` | 字符串 | 导演 |
58+
| `genre` | 字符串 | 电影类型 |
59+
| `imdb_rating` | 浮点数 | IMDB 评分 |
60+
| `poster_curde_link` | 字符串 | 电影缩略图海报链接 |
61+
| `poster_precision_link` | 字符串 | 电影海报高清链接 |
62+
63+
## 配置流程
64+
65+
### 1. 配置文件设置
66+
67+
编辑 `data_analysis_with_code/agentkit_deploy/settings.txt` 文件,可选配置以下:
68+
69+
```
70+
MODEL_AGENT_API_KEY=your_api_key_here
71+
VOLCENGINE_ACCESS_KEY=your_ak
72+
VOLCENGINE_SECRET_KEY=your_sk
73+
74+
```
75+
76+
### 2. 项目部署
77+
78+
```bash
79+
uv python install 3.12
80+
uv venv -p 3.12 .venv
81+
source .venv/bin/activate
82+
uv pip install -r requirements.txt
83+
84+
# veadk运行
85+
veadk web
86+
87+
# 在agentkit上运行
88+
agentkit config --tos_bucket <your bucket name>
89+
agentkit launch
90+
```
91+
92+
## 客户端连接
93+
94+
### 方式一:使用 Python 客户端
95+
96+
当使用 agentkit 运行时可以通过 client 进行连接
97+
98+
```bash
99+
python client.py
100+
```
101+
102+
### 方式二:使用 Web 界面
103+
104+
```bash
105+
streamlit run web/app.py
106+
```
107+
108+
## 示例问题
109+
110+
1. **Q1: 你有哪些数据?**
111+
2. **Q2: 给我一些样例数据?**
112+
3. **Q3: Ang Lee 评分超过 7 分的有哪些电影?**
113+
4. **Q4: Ang Lee 评分超过 7 分的电影中,有哪个电影海报中含有动物?**
114+
5. **Q5: Life of Pi 的电影海报,变成视频**
115+
6. **Q6: 帮我找一张海报里有红色机车的电影,并把它做成视频**
116+
117+
## 运行流程
118+
119+
当用户提出问题时,系统将遵循以下流程处理:
120+
121+
1. **搜数阶段 (Discovery)**:调用 `catalog_discovery` 工具确认可用的表名和字段信息。
122+
2. **数据分析阶段 (Query)**
123+
- 对于结构化统计或过滤查询,调用 `duckdb_sql_execution` 工具执行 SQL 查询
124+
- 对于语义、视觉或混合检索查询,调用 `lancedb_hybrid_execution` 工具执行向量检索
125+
- 对于图生视频等非结构化数据处理,调用 `video_generation` 工具执行相应操作
126+
3. **结果处理阶段 (Result Handling)**
127+
- 如果结果为空 `[]`,直接回答用户"未找到"
128+
- 如果结果正常,立即返回最终答案
129+
130+
## 文件结构
131+
132+
```
133+
data_analysis_with_code/
134+
├── agent.py
135+
├── prompts.py
136+
├── requirements.txt
137+
├── settings.txt
138+
└── tools/
139+
├── catalog_discovery.py
140+
├── duckdb_sql_execution.py
141+
├── lancedb_hybrid_execution.py
142+
└── video_generation.py
143+
├── client.py
144+
└── web/
145+
└── app.py
146+
```
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
# 导入所有必要的模块
2+
import os
3+
import sys
4+
import logging
5+
from pathlib import Path
6+
from dotenv import load_dotenv
7+
8+
9+
# 将当前目录添加到sys.path以便本地模块导入
10+
sys.path.append(str(Path(__file__).resolve().parent))
11+
# 加载 settings.txt(dotenv 格式)
12+
load_dotenv(
13+
dotenv_path=str(Path(__file__).resolve().parent / "settings.txt"), override=False
14+
)
15+
16+
# 导入veadk和agentkit相关模块
17+
from veadk import Agent, Runner # noqa: E402
18+
from veadk.auth.veauth.ark_veauth import get_ark_token # noqa: E402
19+
from veadk.memory.short_term_memory import ShortTermMemory # noqa: E402
20+
from veadk.tools.builtin_tools.video_generate import video_generate # noqa: E402
21+
from agentkit.apps import AgentkitAgentServerApp # noqa: E402
22+
23+
# 导入本地模块
24+
from tools.catalog_discovery import catalog_discovery # noqa: E402
25+
from tools.duckdb_sql_execution import duckdb_sql_execution # noqa: E402
26+
from tools.lancedb_hybrid_execution import lancedb_hybrid_execution # noqa: E402
27+
from prompts import SYSTEM_PROMPT # noqa: E402
28+
29+
# Check if MODEL_AGENT_API_KEY environment variable exists and is not empty
30+
if "MODEL_AGENT_API_KEY" not in os.environ or not os.environ["MODEL_AGENT_API_KEY"]:
31+
os.environ["MODEL_AGENT_API_KEY"] = get_ark_token()
32+
# Optionally assign to a variable for easier use in the file
33+
MODEL_AGENT_API_KEY = os.environ["MODEL_AGENT_API_KEY"]
34+
35+
short_term_memory = ShortTermMemory(backend="local")
36+
37+
# 设置日志
38+
logging.basicConfig(
39+
level=logging.INFO,
40+
format="%(asctime)s - %(levelname)s - %(message)s",
41+
)
42+
43+
# --- Logging Configuration ---
44+
logger = logging.getLogger(__name__)
45+
46+
tools = [
47+
catalog_discovery,
48+
duckdb_sql_execution,
49+
lancedb_hybrid_execution,
50+
video_generate,
51+
]
52+
53+
# 创建带记忆的 Agent
54+
model_name = os.getenv(
55+
"MODEL_AGENT_NAME", "doubao-seed-1-6-251015"
56+
) # 默认使用更主流的豆包模型
57+
root_agent = Agent(
58+
description="基于LanceDB的数据检索Agent,支持结构化和向量查询。典型问题包括:1.你有哪些数据?2.给我一些样例数据?3.Ang Lee 评分超过7分的有哪些电影?4.Ang Lee 评分超过7分的电影中,有哪个电影海报中含有动物?5.Life of Pi 的电影海报,变成视频",
59+
instruction=SYSTEM_PROMPT,
60+
model_name=model_name,
61+
tools=tools,
62+
short_term_memory=short_term_memory,
63+
)
64+
65+
runner = Runner(agent=root_agent)
66+
67+
# a2a_app = AgentkitA2aApp()
68+
69+
# @a2a_app.agent_executor(runner=runner)
70+
# class MyAgentExecutor(A2aAgentExecutor):
71+
# pass
72+
73+
# # 当直接运行此文件时,启动本地服务
74+
# if __name__ == "__main__":
75+
# logger.info("🚀 正在启动 A2A Agent 服务...")
76+
# a2a_app.run(
77+
# agent_card=get_agent_card(agent=root_agent, url="http://127.0.0.1:8000"),
78+
# host="0.0.0.0",
79+
# port=8000,
80+
# )
81+
82+
agent_server_app = AgentkitAgentServerApp(
83+
agent=root_agent,
84+
short_term_memory=short_term_memory,
85+
)
86+
87+
if __name__ == "__main__":
88+
agent_server_app.run(host="0.0.0.0", port=8000)
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import requests
2+
import httpx
3+
import random
4+
5+
from google.adk.cli.adk_web_server import CreateSessionRequest, RunAgentRequest
6+
from google.genai.types import Content, Part
7+
import asyncio
8+
9+
10+
if __name__ == "__main__":
11+
# Step 0: setup running configs
12+
app_name = "data_analysis_with_code"
13+
user_id = "agentkit_user"
14+
session_id = "agentkit_sample_session"
15+
base_url = ""
16+
api_key = ""
17+
18+
task_num = 1
19+
20+
# Step 1: create a session
21+
def create_session():
22+
create_session_request = CreateSessionRequest(
23+
session_id=session_id + f"_{random.randint(1, 9999)}",
24+
)
25+
26+
response = requests.post(
27+
url=f"{base_url}/apps/{app_name}/users/{user_id}/sessions/{create_session_request.session_id}",
28+
headers={"Authorization": f"Bearer {api_key}"},
29+
)
30+
31+
print(f"[create session] Response from server: {response.json()}")
32+
33+
return create_session_request.session_id
34+
35+
# Step 2: run agent with SSE
36+
run_agent_request = RunAgentRequest(
37+
app_name=app_name,
38+
user_id=user_id,
39+
session_id=create_session(),
40+
new_message=Content(
41+
parts=[Part(text="Ang Lee的电影评分超过7分,有哪些电影海报包含动物")],
42+
role="user",
43+
),
44+
stream=True,
45+
)
46+
47+
print("[run agent] Event from server:")
48+
49+
# 3. Handle streaming events
50+
async def send_request(message: str):
51+
run_agent_request = RunAgentRequest(
52+
app_name=app_name,
53+
user_id=user_id,
54+
session_id=create_session(),
55+
new_message=Content(parts=[Part(text=message)], role="user"),
56+
stream=True,
57+
)
58+
59+
with httpx.stream(
60+
"POST",
61+
f"{base_url}/run_sse",
62+
json=run_agent_request.model_dump(exclude_none=True),
63+
timeout=120,
64+
headers={"Authorization": f"Bearer {api_key}"},
65+
) as r:
66+
for line in r.iter_lines():
67+
print(line)
68+
69+
async def send_request_parallel():
70+
tasks = [
71+
send_request("Ang Lee的电影评分超过7分,有哪些电影海报包含动物")
72+
for _ in range(task_num)
73+
]
74+
await asyncio.gather(*tasks)
75+
76+
asyncio.run(send_request_parallel())

0 commit comments

Comments
 (0)