Commit 407eceb

✨ 1. Knowledge base tracing results support downloading.
2. File download requests are made to the backend interface, instead of directly accessing Minio.
1 parent f3f453b commit 407eceb

File tree

1 file changed: +313 -0 lines changed

@@ -0,0 +1,313 @@
import json
import logging
from typing import List, Optional, Union

import httpx
from pydantic import Field
from smolagents.tools import Tool

from ..utils.observer import MessageObserver, ProcessType
from ..utils.tools_common_message import SearchResultTextMessage, ToolCategory, ToolSign


# Get logger instance
logger = logging.getLogger("datamate_search_tool")


class DataMateSearchTool(Tool):
    """DataMate knowledge base search tool."""

    name = "datamate_search_tool"
    description = (
        "Performs a DataMate knowledge base search based on your query, then returns the top search results. "
        "A tool for retrieving domain-specific knowledge, documents, and information stored in the DataMate knowledge base. "
        "Use this tool when users ask questions related to specialized knowledge, technical documentation, "
        "domain expertise, or any information that has been indexed in the DataMate knowledge base. "
        "Suitable for queries requiring access to stored knowledge that may not be publicly available."
    )
    inputs = {
        "query": {
            "type": "string",
            "description": "The search query to perform.",
        },
        "top_k": {
            "type": "integer",
            "description": "Maximum number of search results to return.",
            "default": 10,
            "nullable": True,
        },
        "threshold": {
            "type": "number",
            "description": "Similarity threshold for search results.",
            "default": 0.2,
            "nullable": True,
        },
        "kb_page": {
            "type": "integer",
            "description": "Page index when listing knowledge bases from DataMate.",
            "default": 0,
            "nullable": True,
        },
        "kb_page_size": {
            "type": "integer",
            "description": "Page size when listing knowledge bases from DataMate.",
            "default": 20,
            "nullable": True,
        },
    }
    output_type = "string"
    category = ToolCategory.SEARCH.value

    # Used to distinguish different index sources in summaries
    tool_sign = ToolSign.DATAMATE_KNOWLEDGE_BASE.value

    def __init__(
        self,
        server_ip: str = Field(description="DataMate server IP or hostname"),
        server_port: int = Field(description="DataMate server port"),
        observer: Optional[MessageObserver] = Field(description="Message observer", default=None, exclude=True),
    ):
        """Initialize the DataMateSearchTool.

        Args:
            server_ip (str): DataMate server IP or hostname (without scheme).
            server_port (int): DataMate server port (1-65535).
            observer (MessageObserver, optional): Message observer instance. Defaults to None.
        """
        super().__init__()

        if not server_ip:
            raise ValueError("server_ip is required for DataMateSearchTool")

        if not isinstance(server_port, int) or not (1 <= server_port <= 65535):
            raise ValueError("server_port must be an integer between 1 and 65535")

        # Store raw host and port
        self.server_ip = server_ip.strip()
        self.server_port = server_port

        # Build base URL: http://host:port
        self.server_base_url = f"http://{self.server_ip}:{self.server_port}".rstrip("/")

        self.kb_page = 0
        self.kb_page_size = 20
        self.observer = observer

        self.record_ops = 1  # Running counter used to number citations across searches
        self.running_prompt_zh = "DataMate知识库检索中..."
        self.running_prompt_en = "Searching the DataMate knowledge base..."

    def forward(
        self,
        query: str,
        top_k: int = 10,
        threshold: float = 0.2,
        kb_page: int = 0,
        kb_page_size: int = 20,
    ) -> str:
        """Execute a DataMate search.

        Args:
            query: Search query text.
            top_k: Optional override for the maximum number of search results.
            threshold: Optional override for the similarity threshold.
            kb_page: Optional override for the knowledge base list page index.
            kb_page_size: Optional override for the knowledge base list page size.
        """
        self.kb_page = kb_page
        self.kb_page_size = kb_page_size

        # Send tool run message
        if self.observer:
            running_prompt = self.running_prompt_zh if self.observer.lang == "zh" else self.running_prompt_en
            self.observer.add_message("", ProcessType.TOOL, running_prompt)
            card_content = [{"icon": "search", "text": query}]
            self.observer.add_message("", ProcessType.CARD, json.dumps(card_content, ensure_ascii=False))

        logger.info(
            f"DataMateSearchTool called with query: '{query}', base_url: '{self.server_base_url}', "
            f"top_k: {top_k}, threshold: {threshold}"
        )

        try:
            # Step 1: Get the knowledge base list
            knowledge_base_ids = self._get_knowledge_base_list()
            if not knowledge_base_ids:
                return json.dumps("No knowledge bases found; no relevant information available.", ensure_ascii=False)

            # Step 2: Retrieve knowledge base content
            kb_search_results = self._retrieve_knowledge_base_content(query, knowledge_base_ids, top_k, threshold)

            if not kb_search_results:
                raise Exception("No results found! Try a less restrictive/shorter query.")

            # Format search results
            search_results_json = []  # Search results organized into a unified format
            search_results_return = []  # Format for input to the large model
            for index, single_search_result in enumerate(kb_search_results):
                # Extract fields from the DataMate API response
                entity_data = single_search_result.get("entity", {})
                metadata = self._parse_metadata(entity_data.get("metadata"))
                dataset_id = self._extract_dataset_id(metadata.get("absolute_directory_path", ""))
                file_id = entity_data.get("id")
                download_url = self._build_file_download_url(dataset_id, file_id)

                score_details = entity_data.get("scoreDetails", {}) or {}
                score_details.update({
                    "datamate_dataset_id": dataset_id,
                    "datamate_file_id": file_id,
                    "datamate_download_url": download_url,
                    "datamate_base_url": self.server_base_url.rstrip("/"),
                })

                search_result_message = SearchResultTextMessage(
                    title=metadata.get("file_name", "") or "Untitled",
                    text=entity_data.get("text", ""),
                    source_type="datamate",
                    url=download_url,
                    filename=metadata.get("file_name", ""),
                    published_date=entity_data.get("createTime", ""),
                    score=entity_data.get("score", "0"),
                    score_details=score_details,
                    cite_index=self.record_ops + index,
                    search_type=self.name,
                    tool_sign=self.tool_sign,
                )

                search_results_json.append(search_result_message.to_dict())
                search_results_return.append(search_result_message.to_model_dict())

            self.record_ops += len(search_results_return)

            # Record the detailed content of this search
            if self.observer:
                search_results_data = json.dumps(search_results_json, ensure_ascii=False)
                self.observer.add_message("", ProcessType.SEARCH_CONTENT, search_results_data)
            return json.dumps(search_results_return, ensure_ascii=False)

        except Exception as e:
            error_msg = f"Error during DataMate knowledge base search: {str(e)}"
            logger.error(error_msg)
            raise Exception(error_msg)

    def _get_knowledge_base_list(self) -> List[str]:
        """Get the knowledge base list from the DataMate API.

        Returns:
            List[str]: List of knowledge base IDs.
        """
        try:
            url = f"{self.server_base_url}/api/knowledge-base/list"
            payload = {"page": self.kb_page, "size": self.kb_page_size}

            with httpx.Client(timeout=30) as client:
                response = client.post(url, json=payload)

            if response.status_code != 200:
                error_detail = (
                    response.json().get("detail", "unknown error")
                    if response.headers.get("content-type", "").startswith("application/json")
                    else response.text
                )
                raise Exception(f"Failed to get knowledge base list (status {response.status_code}): {error_detail}")

            result = response.json()
            # Extract knowledge base IDs from the response,
            # assuming the response contains a list of knowledge bases, each with an 'id' field
            data = result.get("data", {})
            knowledge_bases = data.get("content", []) if data else []

            knowledge_base_ids = []
            for kb in knowledge_bases:
                kb_id = kb.get("id")
                chunk_count = kb.get("chunkCount")
                # Skip knowledge bases that contain no chunks
                if kb_id and chunk_count:
                    knowledge_base_ids.append(str(kb_id))

            logger.info(f"Retrieved {len(knowledge_base_ids)} knowledge base(s): {knowledge_base_ids}")
            return knowledge_base_ids

        except httpx.TimeoutException:
            raise Exception("Timeout while getting knowledge base list from DataMate API")
        except httpx.RequestError as e:
            raise Exception(f"Request error while getting knowledge base list: {str(e)}")
        except Exception as e:
            raise Exception(f"Error getting knowledge base list: {str(e)}")

    def _retrieve_knowledge_base_content(
        self, query: str, knowledge_base_ids: List[str], top_k: int, threshold: float
    ) -> List[dict]:
        """Retrieve knowledge base content from the DataMate API.

        Args:
            query (str): Search query.
            knowledge_base_ids (List[str]): List of knowledge base IDs to search.
            top_k (int): Maximum number of results to return.
            threshold (float): Similarity threshold.

        Returns:
            List[dict]: List of search results.
        """
        search_results = []
        for knowledge_base_id in knowledge_base_ids:
            try:
                url = f"{self.server_base_url}/api/knowledge-base/retrieve"
                payload = {
                    "query": query,
                    "topK": top_k,
                    "threshold": threshold,
                    "knowledgeBaseIds": [knowledge_base_id],
                }

                with httpx.Client(timeout=60) as client:
                    response = client.post(url, json=payload)

                if response.status_code != 200:
                    error_detail = (
                        response.json().get("detail", "unknown error")
                        if response.headers.get("content-type", "").startswith("application/json")
                        else response.text
                    )
                    raise Exception(
                        f"Failed to retrieve knowledge base content (status {response.status_code}): {error_detail}"
                    )

                result = response.json()
                # Extract search results from the response (the 'data' field is a list)
                for data in result.get("data", []):
                    search_results.append(data)
            except httpx.TimeoutException:
                raise Exception("Timeout while retrieving knowledge base content from DataMate API")
            except httpx.RequestError as e:
                raise Exception(f"Request error while retrieving knowledge base content: {str(e)}")
            except Exception as e:
                raise Exception(f"Error retrieving knowledge base content: {str(e)}")
        logger.info(f"Retrieved {len(search_results)} search result(s)")
        return search_results

    @staticmethod
    def _parse_metadata(metadata_raw: Union[str, dict, None]) -> dict:
        """Parse the metadata payload safely, accepting a JSON string or an already-parsed dict."""
        if not metadata_raw:
            return {}
        if isinstance(metadata_raw, dict):
            return metadata_raw
        try:
            return json.loads(metadata_raw)
        except (json.JSONDecodeError, TypeError):
            logger.warning("Failed to parse metadata payload, falling back to empty metadata.")
            return {}

    @staticmethod
    def _extract_dataset_id(absolute_path: str) -> str:
        """Extract the dataset identifier (the last path segment) from an absolute directory path."""
        if not absolute_path:
            return ""
        segments = [segment for segment in absolute_path.strip("/").split("/") if segment]
        return segments[-1] if segments else ""

    def _build_file_download_url(self, dataset_id: str, file_id: str) -> str:
        """Build the backend download URL for a dataset file."""
        if not (self.server_ip and dataset_id and file_id):
            return ""
        # Route the download through the backend API rather than accessing Minio directly
        return f"{self.server_base_url}/api/data-management/datasets/{dataset_id}/files/{file_id}/download"
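
Usage note: a minimal sketch of how the new tool might be wired up, assuming a reachable DataMate backend; the host and port values are hypothetical, and the observer argument is omitted since it defaults to None:

    # Hypothetical host/port for illustration only
    tool = DataMateSearchTool(server_ip="192.168.1.10", server_port=8080)
    results = tool.forward(query="deployment architecture", top_k=5, threshold=0.2)
    print(results)  # JSON array of result dicts, each carrying a backend download URL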
