Skip to content

Commit ce73862

Browse files
committed
reconstruct database adapter
1 parent 5206d7e commit ce73862

File tree

6 files changed

+315
-581
lines changed

6 files changed

+315
-581
lines changed

veadk/database/database_adapter.py

Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import time
16+
from typing import BinaryIO, TextIO
17+
18+
from pydantic import BaseModel, ConfigDict
19+
20+
from veadk.database.base_database import BaseDatabase
21+
from veadk.database.kv.redis_database import RedisDatabase
22+
from veadk.database.local_database import LocalDataBase
23+
from veadk.database.relational.mysql_database import MysqlDatabase
24+
from veadk.database.vector.opensearch_vector_database import OpenSearchVectorDatabase
25+
from veadk.database.viking.viking_database import VikingDatabase
26+
from veadk.database.viking.viking_memory_db import VikingDatabaseMemory
27+
from veadk.utils.logger import get_logger
28+
29+
logger = get_logger(__name__)
30+
31+
32+
class KVDatabaseAdapter(BaseModel):
    """Adapter that exposes a Redis client through ``add`` and ``query``."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Redis client that performs the actual reads and writes.
    client: RedisDatabase

    def add(self, data: list[str], index: str):
        """Write every text in ``data`` under the key ``index``.

        Args:
            data: Texts to store; each one is sent in its own client call.
            index: Key passed to the client for every text.

        Raises:
            Exception: Any error raised by the client is logged and re-raised.
        """
        logger.debug(f"Adding documents to Redis database: index={index}")

        try:
            for text in data:
                self.client.add(key=index, data=text)
            logger.debug(f"Added {len(data)} texts to Redis database: index={index}")
        except Exception as err:
            logger.error(
                f"Failed to add data to Redis database: index={index} error={err}"
            )
            raise err

    def query(self, query: str, index: str, top_k: int = 0) -> list[str]:
        """Look up ``query`` under the key ``index``.

        Args:
            query: Query forwarded to the client.
            index: Key forwarded to the client.
            top_k: Accepted for signature parity with the other adapters; it
                is not forwarded to the client.

        Returns:
            Whatever the client's ``query`` call returns.

        Raises:
            Exception: Any error raised by the client is logged and re-raised.
        """
        logger.debug(f"Querying Redis database: index={index} query={query}")

        try:
            return self.client.query(key=index, query=query)
        except Exception as err:
            logger.error(f"Failed to search from Redis: index={index} error={err}")
            raise err
62+
63+
64+
class RelationalDatabaseAdapter(BaseModel):
    """Adapter that stores and reads texts in a MySQL table named by ``index``."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # MySQL client used to execute every statement built by this adapter.
    client: MysqlDatabase

    def _quote_identifier(self, name: str) -> str:
        """Return ``name`` as a backtick-quoted MySQL identifier.

        Embedded backticks are doubled so the name cannot terminate the quoted
        identifier and inject additional SQL.
        """
        return "`" + name.replace("`", "``") + "`"

    def create_table(self, table_name: str):
        """Create the table ``table_name`` with ``id``, ``data`` and ``created_at``.

        Args:
            table_name: Name of the table to create.
        """
        logger.debug(f"Creating table for SQL database: table_name={table_name}")

        table = self._quote_identifier(table_name)
        sql = f"""
            CREATE TABLE {table} (
                `id` BIGINT AUTO_INCREMENT PRIMARY KEY,
                `data` TEXT NOT NULL,
                `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            ) ENGINE=InnoDB DEFAULT CHARSET={self.client.config.charset};
        """
        self.client.add(sql)

    def add(self, data: list[str], index: str):
        """Insert every text in ``data`` as one row of the table ``index``.

        The table is created first when the client reports it does not exist.

        Args:
            data: Texts to insert; each becomes one row.
            index: Table name.
        """
        logger.debug(
            f"Adding documents to SQL database: table_name={index} data_len={len(data)}"
        )

        if not self.client.table_exists(index):
            logger.warning(f"Table {index} does not exist, creating...")
            self.create_table(index)

        # The statement is identical for every row, so build it once; the row
        # value itself is always passed as a bound parameter.
        table = self._quote_identifier(index)
        sql = f"""
            INSERT INTO {table} (`data`)
            VALUES (%s);
        """
        for text in data:
            self.client.add(sql, params=(text,))
        logger.debug(f"Added {len(data)} texts to table {index}.")

    def query(self, query: str, index: str, top_k: int) -> list[str]:
        """Return the ``data`` column of the ``top_k`` newest rows of ``index``.

        ``query`` is only logged; rows are selected purely by ``created_at``
        in descending order.

        Args:
            query: Query text (logged, not used for filtering).
            index: Table name.
            top_k: Maximum number of rows to return.

        Returns:
            The ``data`` values of the selected rows, or an empty list when the
            table does not exist.

        Raises:
            TypeError: If ``top_k`` cannot be converted to an integer.
            ValueError: If ``top_k`` is a string that is not an integer.
        """
        logger.debug(
            f"Querying SQL database: table_name={index} query={query} top_k={top_k}"
        )

        if not self.client.table_exists(index):
            logger.warning(
                f"Querying SQL database, but table `{index}` does not exist, returning empty list."
            )
            return []

        # LIMIT is interpolated into the statement text, so force it to be a
        # plain integer before it gets anywhere near the SQL.
        limit = int(top_k)
        table = self._quote_identifier(index)
        sql = f"""
            SELECT `data` FROM {table} ORDER BY `created_at` DESC LIMIT {limit};
        """
        results = self.client.query(sql)

        return [item["data"] for item in results]
115+
116+
117+
class VectorDatabaseAdapter(BaseModel):
    """Adapter that maps ``index`` onto an OpenSearch vector collection name."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # OpenSearch vector client that receives every add/query call.
    client: OpenSearchVectorDatabase

    def _validate_index(self, index: str):
        """Placeholder for index validation; currently accepts every value."""
        # TODO
        return None

    def add(self, data: list[str], index: str):
        """Add ``data`` to the collection named ``index``.

        Args:
            data: Texts handed to the client in a single call.
            index: Collection name.
        """
        self._validate_index(index)

        logger.debug(
            f"Adding documents to vector database: index={index} data_len={len(data)}"
        )

        self.client.add(data, collection_name=index)

    def query(self, query: str, index: str, top_k: int) -> list[str]:
        """Query the collection named ``index``.

        Args:
            query: Query text forwarded to the client.
            index: Collection name.
            top_k: Result count forwarded to the client.

        Returns:
            Whatever the client's ``query`` call returns.
        """
        self._validate_index(index)

        logger.debug(
            f"Querying vector database: collection_name={index} query={query} top_k={top_k}"
        )

        hits = self.client.query(query=query, collection_name=index, top_k=top_k)
        return hits
147+
148+
149+
class VikingDatabaseAdapter(BaseModel):
    """Adapter that maps ``index`` onto a Viking database collection name."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Viking client that receives every collection/add/query call.
    client: VikingDatabase

    def _validate_index(self, index: str):
        """Placeholder for index validation; currently accepts every value."""
        # TODO
        pass

    def get_or_create_collection(
        self,
        collection_name: str,
        *,
        timeout: float = 50,
        poll_interval: float = 1,
    ):
        """Ensure ``collection_name`` exists, creating it and waiting if needed.

        Args:
            collection_name: Collection to look up or create.
            timeout: Maximum number of seconds to wait for a newly created
                collection to be reported as existing.
            poll_interval: Seconds to sleep between existence checks.

        Raises:
            TimeoutError: If the collection is still not reported as existing
                once ``timeout`` seconds have elapsed.
        """
        if self.client.collection_exists(collection_name):
            return

        self.client.create_collection(collection_name)

        # A monotonic deadline keeps the wait at ``timeout`` seconds no matter
        # how long each existence check takes, and existence is re-checked
        # after every sleep before the timeout is allowed to fire.
        deadline = time.monotonic() + timeout
        while not self.client.collection_exists(collection_name):
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(
                    f"Collection {collection_name} not created after {timeout} seconds"
                )
            time.sleep(min(poll_interval, remaining))

    def add(
        self, data: str | list[str] | TextIO | BinaryIO | bytes, index: str, **kwargs
    ):
        """Add ``data`` to the collection ``index``, creating it when missing.

        Args:
            data: Payload handed to the client unchanged.
            index: Collection name.
            **kwargs: Extra options forwarded to the client's ``add`` call.
        """
        self._validate_index(index)

        logger.debug(f"Adding documents to Viking database: collection_name={index}")
        self.get_or_create_collection(index)
        self.client.add(data, collection_name=index, **kwargs)

    def query(self, query: str, index: str, top_k: int) -> list[str]:
        """Query the collection ``index``.

        Args:
            query: Query text forwarded to the client.
            index: Collection name.
            top_k: Result count forwarded to the client.

        Returns:
            Whatever the client's ``query`` call returns.

        Raises:
            ValueError: If the collection does not exist.
        """
        self._validate_index(index)

        logger.debug(f"Querying Viking database: collection_name={index} query={query}")

        # FIXME(): maybe do not raise, but just return []
        if not self.client.collection_exists(index):
            raise ValueError(f"Collection {index} does not exist")

        return self.client.query(query, collection_name=index, top_k=top_k)
190+
191+
192+
class VikingDatabaseMemoryAdapter(BaseModel):
    """Adapter that maps ``index`` onto a Viking memory collection name."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Viking memory client that receives every add/query call.
    client: VikingDatabaseMemory

    def _validate_index(self, index: str):
        """Placeholder for index validation; currently accepts every value."""
        # TODO
        return None

    def add(self, data: list[str], index: str):
        """Add ``data`` to the collection named ``index``.

        Args:
            data: Texts handed to the client in a single call.
            index: Collection name.
        """
        self._validate_index(index)

        logger.debug(
            f"Adding documents to Viking database memory: collection_name={index} data_len={len(data)}"
        )

        self.client.add(data, collection_name=index)

    def query(self, query: str, index: str, top_k: int):
        """Query the collection named ``index``.

        Args:
            query: Query text forwarded to the client.
            index: Collection name.
            top_k: Result count forwarded to the client.

        Returns:
            Whatever the client's ``query`` call returns.
        """
        self._validate_index(index)

        logger.debug(
            f"Querying Viking database memory: collection_name={index} query={query} top_k={top_k}"
        )

        return self.client.query(query, collection_name=index, top_k=top_k)
219+
220+
221+
class LocalDatabaseAdapter(BaseModel):
    """Adapter that forwards ``add`` and ``query`` to a local database client."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Local database client that receives every add/query call.
    client: LocalDataBase

    def add(self, data: list[str], **kwargs):
        """Add ``data`` through the client.

        Args:
            data: Texts handed to the client in a single call.
            **kwargs: Accepted for signature parity with the other adapters;
                none of them are forwarded to the client.
        """
        self.client.add(data)

    def query(self, query: str, **kwargs):
        """Run ``query`` through the client.

        Args:
            query: Query text forwarded to the client.
            **kwargs: Extra options forwarded to the client unchanged.

        Returns:
            Whatever the client's ``query`` call returns.
        """
        found = self.client.query(query, **kwargs)
        return found
231+
232+
233+
# Registry pairing each concrete database client class with the adapter class
# that wraps it. The factory functions in this module index it with the exact
# type of the client instance.
MAPPING: dict[type, type] = {
    RedisDatabase: KVDatabaseAdapter,
    MysqlDatabase: RelationalDatabaseAdapter,
    LocalDataBase: LocalDatabaseAdapter,
    VikingDatabase: VikingDatabaseAdapter,
    OpenSearchVectorDatabase: VectorDatabaseAdapter,
    VikingDatabaseMemory: VikingDatabaseMemoryAdapter,
}
241+
242+
243+
def get_knowledgebase_database_adapter(database_client: BaseDatabase):
    """Wrap ``database_client`` in the adapter registered for its exact type.

    Args:
        database_client: Database client instance to wrap.

    Returns:
        An adapter instance whose ``client`` field is ``database_client``.

    Raises:
        KeyError: If no adapter is registered in ``MAPPING`` for
            ``type(database_client)``.
    """
    adapter_cls = MAPPING[type(database_client)]
    # Every adapter declares its field as ``client``; passing any other keyword
    # leaves that required field unset and fails model validation.
    return adapter_cls(client=database_client)
245+
246+
247+
def get_long_term_memory_database_adapter(database_client: BaseDatabase):
    """Wrap ``database_client`` in the adapter registered for its exact type.

    Args:
        database_client: Database client instance to wrap.

    Returns:
        An adapter instance whose ``client`` field is ``database_client``.

    Raises:
        KeyError: If no adapter is registered in ``MAPPING`` for
            ``type(database_client)``.
    """
    adapter_cls = MAPPING[type(database_client)]
    # Every adapter declares its field as ``client``; passing any other keyword
    # leaves that required field unset and fails model validation.
    return adapter_cls(client=database_client)

veadk/database/viking/viking_database.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,12 @@ def _add_doc(self, collection_name: str, tos_url: str, doc_id: str, **kwargs: An
215215

216216
return doc_id
217217

218-
def add(self, data: str | list[str] | TextIO | BinaryIO | bytes, **kwargs: Any):
218+
def add(
219+
self,
220+
data: str | list[str] | TextIO | BinaryIO | bytes,
221+
collection_name: str,
222+
**kwargs,
223+
):
219224
"""
220225
Args:
221226
data: str, file path or file stream: Both file or file.read() are acceptable.
@@ -226,8 +231,6 @@ def add(self, data: str | list[str] | TextIO | BinaryIO | bytes, **kwargs: Any):
226231
"doc_id": "<doc_id>",
227232
}
228233
"""
229-
collection_name = kwargs.get("collection_name")
230-
assert collection_name is not None, "collection_name is required"
231234

232235
status, tos_url = self._upload_to_tos(data=data, **kwargs)
233236
if status != 200:

veadk/knowledgebase/knowledgebase.py

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,17 @@
1414

1515
from typing import BinaryIO, Literal, TextIO
1616

17+
from veadk.database.database_adapter import get_knowledgebase_database_adapter
1718
from veadk.database.database_factory import DatabaseFactory
1819
from veadk.utils.logger import get_logger
1920

20-
from .knowledgebase_database_adapter import get_knowledgebase_adapter
21-
2221
logger = get_logger(__name__)
2322

2423

24+
def build_knowledgebase_index(app_name: str):
    """Return the knowledgebase index name for ``app_name``.

    The index is the application name formatted as a string, with nothing
    added or removed.
    """
    index = f"{app_name}"
    return index
26+
27+
2528
class KnowledgeBase:
2629
def __init__(
2730
self,
@@ -37,9 +40,7 @@ def __init__(
3740
self.top_k = top_k
3841

3942
self.db_client = DatabaseFactory.create(backend=backend, config=db_config)
40-
self.adapter = get_knowledgebase_adapter(backend)(
41-
database_client=self.db_client
42-
)
43+
self.adapter = get_knowledgebase_database_adapter(self.db_client)
4344

4445
logger.info(
4546
f"Initialized knowledgebase: db_client={self.db_client} adapter={self.adapter}"
@@ -53,23 +54,18 @@ def add(
5354
):
5455
"""
5556
Add documents to the vector database.
56-
You can only upload files or file characters when the adapter type you use is vikingdb.
57+
You can only upload files or file characters when the adapter type used is vikingdb.
5758
In addition, if you upload data of the bytes type,
5859
for example, if you read the file stream of a pdf, then you need to pass an additional parameter file_ext = '.pdf'.
5960
"""
60-
kwargs.pop("session_id", None) # remove session_id
61-
logger.info(f"Adding documents to knowledgebase: app_name={app_name}")
62-
self.adapter.add(data, app_name=app_name, **kwargs)
61+
# TODO: add check for data type
62+
...
6363

64-
def search(self, query: str, app_name: str, top_k: int = None) -> list[str]:
65-
"""Retrieve documents similar to the query text in the vector database.
64+
index = build_knowledgebase_index(app_name)
65+
logger.info(f"Adding documents to knowledgebase: index={index}")
66+
self.adapter.add(data=data, index=index)
6667

67-
Args:
68-
query (str): The query text to be retrieved (e.g., "Who proposed the Turing machine model?")
69-
70-
Returns:
71-
list[str]: A list of the top most similar document contents retrieved (sorted by vector similarity)
72-
"""
68+
def search(self, query: str, app_name: str, top_k: int = None) -> list[str]:
7369
top_k = self.top_k if top_k is None else top_k
7470

7571
logger.info(

0 commit comments

Comments
 (0)