Skip to content

Commit ad8dc62

Browse files
chore: refine knowledgebase and memory structure / logs (#38)
* refine knowledgebase and memory logs * fix index error * reconstruct database adapter * update naming * fix: fix todo * fix: fix database dpendency issues --------- Co-authored-by: hanzhi.421 <[email protected]>
1 parent eb35f54 commit ad8dc62

File tree

10 files changed

+436
-594
lines changed

10 files changed

+436
-594
lines changed

tests/test_knowledgebase.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,3 @@ async def test_knowledgebase():
3333
)
3434
res = "".join(res_list)
3535
assert key in res, f"Test failed for backend local res is {res}"
36-
assert key in res, f"Test failed for backend local res is {res}"
37-
assert key in res, f"Test failed for backend local res is {res}"

tests/test_long_term_memory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ async def test_long_term_memory():
4949
events=[
5050
Event(
5151
invocation_id="test_invocation_id",
52-
author=agent.name,
52+
author="user",
5353
branch=None,
5454
content=types.Content(
5555
parts=[types.Part(text="My name is Alice.")],

veadk/database/database_adapter.py

Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import re
15+
import time
16+
from typing import BinaryIO, TextIO
17+
from veadk.database.base_database import BaseDatabase
18+
19+
from veadk.utils.logger import get_logger
20+
21+
logger = get_logger(__name__)
22+
23+
24+
class KVDatabaseAdapter:
25+
def __init__(self, client):
26+
from veadk.database.kv.redis_database import RedisDatabase
27+
28+
self.client: RedisDatabase = client
29+
30+
def add(self, data: list[str], index: str):
31+
logger.debug(f"Adding documents to Redis database: index={index}")
32+
33+
try:
34+
for _data in data:
35+
self.client.add(key=index, value=_data)
36+
logger.debug(f"Added {len(data)} texts to Redis database: index={index}")
37+
except Exception as e:
38+
logger.error(
39+
f"Failed to add data to Redis database: index={index} error={e}"
40+
)
41+
raise e
42+
43+
def query(self, query: str, index: str, top_k: int = 0) -> list[str]:
44+
logger.debug(f"Querying Redis database: index={index} query={query}")
45+
46+
# ignore top_k, as KV search only return one result
47+
_ = top_k
48+
49+
try:
50+
result = self.client.query(key=index, query=query)
51+
return result
52+
except Exception as e:
53+
logger.error(f"Failed to search from Redis: index={index} error={e}")
54+
raise e
55+
56+
57+
class RelationalDatabaseAdapter:
58+
def __init__(self, client):
59+
from veadk.database.relational.mysql_database import MysqlDatabase
60+
61+
self.client: MysqlDatabase = client
62+
63+
def create_table(self, table_name: str):
64+
logger.debug(f"Creating table for SQL database: table_name={table_name}")
65+
66+
sql = f"""
67+
CREATE TABLE `{table_name}` (
68+
`id` BIGINT AUTO_INCREMENT PRIMARY KEY,
69+
`data` TEXT NOT NULL,
70+
`created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP
71+
) ENGINE=InnoDB DEFAULT CHARSET={self.client.config.charset};
72+
"""
73+
self.client.add(sql)
74+
75+
def add(self, data: list[str], index: str):
76+
logger.debug(
77+
f"Adding documents to SQL database: table_name={index} data_len={len(data)}"
78+
)
79+
80+
if not self.client.table_exists(index):
81+
logger.warning(f"Table {index} does not exist, creating a new table.")
82+
self.create_table(index)
83+
84+
for _data in data:
85+
sql = f"""
86+
INSERT INTO `{index}` (`data`)
87+
VALUES (%s);
88+
"""
89+
self.client.add(sql, params=(_data,))
90+
logger.debug(f"Added {len(data)} texts to table {index}.")
91+
92+
def query(self, query: str, index: str, top_k: int) -> list[str]:
93+
logger.debug(
94+
f"Querying SQL database: table_name={index} query={query} top_k={top_k}"
95+
)
96+
97+
if not self.client.table_exists(index):
98+
logger.warning(
99+
f"Querying SQL database, but table `{index}` does not exist, returning empty list."
100+
)
101+
return []
102+
103+
sql = f"""
104+
SELECT `data` FROM `{index}` ORDER BY `created_at` DESC LIMIT {top_k};
105+
"""
106+
results = self.client.query(sql)
107+
108+
return [item["data"] for item in results]
109+
110+
111+
class VectorDatabaseAdapter:
112+
def __init__(self, client):
113+
from veadk.database.vector.opensearch_vector_database import (
114+
OpenSearchVectorDatabase,
115+
)
116+
117+
self.client: OpenSearchVectorDatabase = client
118+
119+
def _validate_index(self, index: str):
120+
"""
121+
Verify whether the string conforms to the naming rules of index_name in OpenSearch.
122+
https://docs.opensearch.org/2.8/api-reference/index-apis/create-index/
123+
"""
124+
if not (
125+
isinstance(index, str)
126+
and not index.startswith(("_", "-"))
127+
and index.islower()
128+
and re.match(r"^[a-z0-9_\-.]+$", index)
129+
):
130+
raise ValueError(
131+
"The index name does not conform to the naming rules of OpenSearch"
132+
)
133+
134+
def add(self, data: list[str], index: str):
135+
self._validate_index(index)
136+
137+
logger.debug(
138+
f"Adding documents to vector database: index={index} data_len={len(data)}"
139+
)
140+
141+
self.client.add(data, collection_name=index)
142+
143+
def query(self, query: str, index: str, top_k: int) -> list[str]:
144+
logger.debug(
145+
f"Querying vector database: collection_name={index} query={query} top_k={top_k}"
146+
)
147+
148+
return self.client.query(
149+
query=query,
150+
collection_name=index,
151+
top_k=top_k,
152+
)
153+
154+
155+
class VikingDatabaseAdapter:
156+
def __init__(self, client):
157+
from veadk.database.viking.viking_database import VikingDatabase
158+
159+
self.client: VikingDatabase = client
160+
161+
def _validate_index(self, index: str):
162+
"""
163+
Only English letters, numbers, and underscores (_) are allowed.
164+
It must start with an English letter and cannot be empty. Length requirement: [1, 128].
165+
For details, please see: https://www.volcengine.com/docs/84313/1254542?lang=zh
166+
"""
167+
if not (
168+
isinstance(index, str)
169+
and 0 < len(index) <= 128
170+
and re.fullmatch(r"^[a-zA-Z][a-zA-Z0-9_]*$", index)
171+
):
172+
raise ValueError(
173+
"The index name does not conform to the rules: it must start with an English letter, contain only letters, numbers, and underscores, and have a length of 1-128."
174+
)
175+
176+
def get_or_create_collection(self, collection_name: str):
177+
if not self.client.collection_exists(collection_name):
178+
logger.warning(
179+
f"Collection {collection_name} does not exist, creating a new collection."
180+
)
181+
self.client.create_collection(collection_name)
182+
183+
# After creation, it is necessary to wait for a while.
184+
count = 0
185+
while not self.client.collection_exists(collection_name):
186+
print("here")
187+
time.sleep(1)
188+
count += 1
189+
if count > 60:
190+
raise TimeoutError(
191+
f"Collection {collection_name} not created after 50 seconds"
192+
)
193+
194+
def add(
195+
self, data: str | list[str] | TextIO | BinaryIO | bytes, index: str, **kwargs
196+
):
197+
self._validate_index(index)
198+
199+
logger.debug(f"Adding documents to Viking database: collection_name={index}")
200+
201+
self.get_or_create_collection(index)
202+
self.client.add(data, collection_name=index, **kwargs)
203+
204+
def query(self, query: str, index: str, top_k: int) -> list[str]:
205+
self._validate_index(index)
206+
207+
logger.debug(f"Querying Viking database: collection_name={index} query={query}")
208+
209+
if not self.client.collection_exists(index):
210+
return []
211+
212+
return self.client.query(query, collection_name=index, top_k=top_k)
213+
214+
215+
class VikingMemoryDatabaseAdapter:
216+
def __init__(self, client):
217+
from veadk.database.viking.viking_memory_db import VikingMemoryDatabase
218+
219+
self.client: VikingMemoryDatabase = client
220+
221+
def _validate_index(self, index: str):
222+
if not (
223+
isinstance(index, str)
224+
and 1 <= len(index) <= 128
225+
and re.fullmatch(r"^[a-zA-Z][a-zA-Z0-9_]*$", index)
226+
):
227+
raise ValueError(
228+
"The index name does not conform to the rules: it must start with an English letter, contain only letters, numbers, and underscores, and have a length of 1-128."
229+
)
230+
231+
def add(self, data: list[str], index: str, **kwargs):
232+
self._validate_index(index)
233+
234+
logger.debug(
235+
f"Adding documents to Viking database memory: collection_name={index} data_len={len(data)}"
236+
)
237+
238+
self.client.add(data, collection_name=index, **kwargs)
239+
240+
def query(self, query: str, index: str, top_k: int, **kwargs):
241+
self._validate_index(index)
242+
243+
logger.debug(
244+
f"Querying Viking database memory: collection_name={index} query={query} top_k={top_k}"
245+
)
246+
247+
result = self.client.query(query, collection_name=index, top_k=top_k, **kwargs)
248+
return result
249+
250+
251+
class LocalDatabaseAdapter:
252+
def __init__(self, client):
253+
from veadk.database.local_database import LocalDataBase
254+
255+
self.client: LocalDataBase = client
256+
257+
def add(self, data: list[str], **kwargs):
258+
self.client.add(data)
259+
260+
def query(self, query: str, **kwargs):
261+
return self.client.query(query, **kwargs)
262+
263+
264+
MAPPING = {
265+
"RedisDatabase": KVDatabaseAdapter,
266+
"MysqlDatabase": RelationalDatabaseAdapter,
267+
"LocalDataBase": LocalDatabaseAdapter,
268+
"VikingDatabase": VikingDatabaseAdapter,
269+
"OpenSearchVectorDatabase": VectorDatabaseAdapter,
270+
"VikingMemoryDatabase": VikingMemoryDatabaseAdapter,
271+
}
272+
273+
274+
def get_knowledgebase_database_adapter(database_client: BaseDatabase):
275+
return MAPPING[type(database_client).__name__](client=database_client)
276+
277+
278+
def get_long_term_memory_database_adapter(database_client: BaseDatabase):
279+
return MAPPING[type(database_client).__name__](client=database_client)

veadk/database/database_factory.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,12 @@ def create(backend: str, config=None) -> BaseDatabase:
6969
return VikingDatabase() if config is None else VikingDatabase(config=config)
7070

7171
if backend == DatabaseBackend.VIKING_MEM:
72-
from .viking.viking_memory_db import VikingDatabaseMemory
72+
from .viking.viking_memory_db import VikingMemoryDatabase
7373

7474
return (
75-
VikingDatabaseMemory()
75+
VikingMemoryDatabase()
7676
if config is None
77-
else VikingDatabaseMemory(config=config)
77+
else VikingMemoryDatabase(config=config)
7878
)
7979
else:
8080
raise ValueError(f"Unsupported database type: {backend}")

veadk/database/viking/viking_database.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
get_collections_path = "/api/knowledge/collection/info"
4141
doc_add_path = "/api/knowledge/doc/add"
4242
doc_info_path = "/api/knowledge/doc/info"
43+
doc_del_path = "/api/collection/drop"
4344

4445

4546
class VolcengineTOSConfig(BaseModel):
@@ -215,7 +216,12 @@ def _add_doc(self, collection_name: str, tos_url: str, doc_id: str, **kwargs: An
215216

216217
return doc_id
217218

218-
def add(self, data: str | list[str] | TextIO | BinaryIO | bytes, **kwargs: Any):
219+
def add(
220+
self,
221+
data: str | list[str] | TextIO | BinaryIO | bytes,
222+
collection_name: str,
223+
**kwargs,
224+
):
219225
"""
220226
Args:
221227
data: str, file path or file stream: Both file or file.read() are acceptable.
@@ -226,8 +232,6 @@ def add(self, data: str | list[str] | TextIO | BinaryIO | bytes, **kwargs: Any):
226232
"doc_id": "<doc_id>",
227233
}
228234
"""
229-
collection_name = kwargs.get("collection_name")
230-
assert collection_name is not None, "collection_name is required"
231235

232236
status, tos_url = self._upload_to_tos(data=data, **kwargs)
233237
if status != 200:
@@ -243,9 +247,23 @@ def add(self, data: str | list[str] | TextIO | BinaryIO | bytes, **kwargs: Any):
243247
}
244248

245249
def delete(self, **kwargs: Any):
246-
# collection_name = kwargs.get("collection_name")
247-
# todo: delete vikingdb
248-
...
250+
collection_name = kwargs.get("collection_name")
251+
resource_id = kwargs.get("resource_id")
252+
request_param = {"collection_name": collection_name, "resource_id": resource_id}
253+
doc_del_req = prepare_request(
254+
method="POST", path=doc_del_path, config=self.config, data=request_param
255+
)
256+
rsp = requests.request(
257+
method=doc_del_req.method,
258+
url="http://{}{}".format(g_knowledge_base_domain, doc_del_req.path),
259+
headers=doc_del_req.headers,
260+
data=doc_del_req.body,
261+
)
262+
result = rsp.json()
263+
if result["code"] != 0:
264+
logger.error(f"Error in add_doc: {result['message']}")
265+
return {"error": result["message"]}
266+
return {}
249267

250268
def query(self, query: str, **kwargs: Any) -> list[str]:
251269
"""

0 commit comments

Comments
 (0)