Skip to content

Commit fd11b5c

Browse files
committed
refactor: assistant support all sqlbot datasource
1 parent 46f0ca3 commit fd11b5c

File tree

3 files changed

+118
-91
lines changed

3 files changed

+118
-91
lines changed

backend/apps/db/db.py

Lines changed: 100 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from apps.datasource.utils.utils import aes_decrypt
2525
from apps.db.constant import DB, ConnectType
2626
from apps.db.engine import get_engine_config
27-
from apps.system.crud.assistant import get_ds_engine
27+
from apps.system.crud.assistant import get_out_ds_conf
2828
from apps.system.schemas.system_schema import AssistantOutDsSchema
2929
from common.core.deps import Trans
3030
from common.utils.utils import SQLBotLogUtil, equals_ignore_case
@@ -146,92 +146,25 @@ def get_engine(ds: CoreDatasource, timeout: int = 0) -> Engine:
146146

147147

148148
def get_session(ds: CoreDatasource | AssistantOutDsSchema):
149-
engine = get_engine(ds) if isinstance(ds, CoreDatasource) else get_ds_engine(ds)
149+
# engine = get_engine(ds) if isinstance(ds, CoreDatasource) else get_ds_engine(ds)
150+
if isinstance(ds, AssistantOutDsSchema):
151+
out_conf = get_out_ds_conf(ds, 30)
152+
ds.configuration = out_conf
153+
154+
engine = get_engine(ds)
150155
session_maker = sessionmaker(bind=engine)
151156
session = session_maker()
152157
return session
153158

154159

155160
def check_connection(trans: Optional[Trans], ds: CoreDatasource | AssistantOutDsSchema, is_raise: bool = False):
156-
if isinstance(ds, CoreDatasource):
157-
db = DB.get_db(ds.type)
158-
if db.connect_type == ConnectType.sqlalchemy:
159-
conn = get_engine(ds, 10)
160-
try:
161-
with conn.connect() as connection:
162-
SQLBotLogUtil.info("success")
163-
return True
164-
except Exception as e:
165-
SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
166-
if is_raise:
167-
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
168-
return False
169-
else:
170-
conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration)))
171-
extra_config_dict = get_extra_config(conf)
172-
if equals_ignore_case(ds.type, 'dm'):
173-
with dmPython.connect(user=conf.username, password=conf.password, server=conf.host,
174-
port=conf.port, **extra_config_dict) as conn, conn.cursor() as cursor:
175-
try:
176-
cursor.execute('select 1', timeout=10).fetchall()
177-
SQLBotLogUtil.info("success")
178-
return True
179-
except Exception as e:
180-
SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
181-
if is_raise:
182-
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
183-
return False
184-
elif equals_ignore_case(ds.type, 'doris', 'starrocks'):
185-
with pymysql.connect(user=conf.username, passwd=conf.password, host=conf.host,
186-
port=conf.port, db=conf.database, connect_timeout=10,
187-
read_timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor:
188-
try:
189-
cursor.execute('select 1')
190-
SQLBotLogUtil.info("success")
191-
return True
192-
except Exception as e:
193-
SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
194-
if is_raise:
195-
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
196-
return False
197-
elif equals_ignore_case(ds.type, 'redshift'):
198-
with redshift_connector.connect(host=conf.host, port=conf.port, database=conf.database,
199-
user=conf.username,
200-
password=conf.password,
201-
timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor:
202-
try:
203-
cursor.execute('select 1')
204-
SQLBotLogUtil.info("success")
205-
return True
206-
except Exception as e:
207-
SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
208-
if is_raise:
209-
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
210-
return False
211-
elif equals_ignore_case(ds.type, 'kingbase'):
212-
with psycopg2.connect(host=conf.host, port=conf.port, database=conf.database,
213-
user=conf.username,
214-
password=conf.password,
215-
connect_timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor:
216-
try:
217-
cursor.execute('select 1')
218-
SQLBotLogUtil.info("success")
219-
return True
220-
except Exception as e:
221-
SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
222-
if is_raise:
223-
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
224-
return False
225-
elif equals_ignore_case(ds.type, 'es'):
226-
es_conn = get_es_connect(conf)
227-
if es_conn.ping():
228-
SQLBotLogUtil.info("success")
229-
return True
230-
else:
231-
SQLBotLogUtil.info("failed")
232-
return False
233-
else:
234-
conn = get_ds_engine(ds)
161+
if isinstance(ds, AssistantOutDsSchema):
162+
out_conf = get_out_ds_conf(ds, 10)
163+
ds.configuration = out_conf
164+
165+
db = DB.get_db(ds.type)
166+
if db.connect_type == ConnectType.sqlalchemy:
167+
conn = get_engine(ds, 10)
235168
try:
236169
with conn.connect() as connection:
237170
SQLBotLogUtil.info("success")
@@ -241,26 +174,102 @@ def check_connection(trans: Optional[Trans], ds: CoreDatasource | AssistantOutDs
241174
if is_raise:
242175
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
243176
return False
177+
else:
178+
conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration)))
179+
extra_config_dict = get_extra_config(conf)
180+
if equals_ignore_case(ds.type, 'dm'):
181+
with dmPython.connect(user=conf.username, password=conf.password, server=conf.host,
182+
port=conf.port, **extra_config_dict) as conn, conn.cursor() as cursor:
183+
try:
184+
cursor.execute('select 1', timeout=10).fetchall()
185+
SQLBotLogUtil.info("success")
186+
return True
187+
except Exception as e:
188+
SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
189+
if is_raise:
190+
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
191+
return False
192+
elif equals_ignore_case(ds.type, 'doris', 'starrocks'):
193+
with pymysql.connect(user=conf.username, passwd=conf.password, host=conf.host,
194+
port=conf.port, db=conf.database, connect_timeout=10,
195+
read_timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor:
196+
try:
197+
cursor.execute('select 1')
198+
SQLBotLogUtil.info("success")
199+
return True
200+
except Exception as e:
201+
SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
202+
if is_raise:
203+
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
204+
return False
205+
elif equals_ignore_case(ds.type, 'redshift'):
206+
with redshift_connector.connect(host=conf.host, port=conf.port, database=conf.database,
207+
user=conf.username,
208+
password=conf.password,
209+
timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor:
210+
try:
211+
cursor.execute('select 1')
212+
SQLBotLogUtil.info("success")
213+
return True
214+
except Exception as e:
215+
SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
216+
if is_raise:
217+
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
218+
return False
219+
elif equals_ignore_case(ds.type, 'kingbase'):
220+
with psycopg2.connect(host=conf.host, port=conf.port, database=conf.database,
221+
user=conf.username,
222+
password=conf.password,
223+
connect_timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor:
224+
try:
225+
cursor.execute('select 1')
226+
SQLBotLogUtil.info("success")
227+
return True
228+
except Exception as e:
229+
SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
230+
if is_raise:
231+
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
232+
return False
233+
elif equals_ignore_case(ds.type, 'es'):
234+
es_conn = get_es_connect(conf)
235+
if es_conn.ping():
236+
SQLBotLogUtil.info("success")
237+
return True
238+
else:
239+
SQLBotLogUtil.info("failed")
240+
return False
241+
# else:
242+
# conn = get_ds_engine(ds)
243+
# try:
244+
# with conn.connect() as connection:
245+
# SQLBotLogUtil.info("success")
246+
# return True
247+
# except Exception as e:
248+
# SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
249+
# if is_raise:
250+
# raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
251+
# return False
244252

245253
return False
246254

247255

248256
def get_version(ds: CoreDatasource | AssistantOutDsSchema):
249257
version = ''
250-
conf = None
251258
if isinstance(ds, CoreDatasource):
252259
conf = DatasourceConf(
253260
**json.loads(aes_decrypt(ds.configuration))) if not equals_ignore_case(ds.type,
254261
"excel") else get_engine_config()
255-
if isinstance(ds, AssistantOutDsSchema):
256-
conf = DatasourceConf()
257-
conf.host = ds.host
258-
conf.port = ds.port
259-
conf.username = ds.user
260-
conf.password = ds.password
261-
conf.database = ds.dataBase
262-
conf.dbSchema = ds.db_schema
263-
conf.timeout = 10
262+
else:
263+
conf = DatasourceConf(**json.loads(aes_decrypt(get_out_ds_conf(ds, 10))))
264+
# if isinstance(ds, AssistantOutDsSchema):
265+
# conf = DatasourceConf()
266+
# conf.host = ds.host
267+
# conf.port = ds.port
268+
# conf.username = ds.user
269+
# conf.password = ds.password
270+
# conf.database = ds.dataBase
271+
# conf.dbSchema = ds.db_schema
272+
# conf.timeout = 10
264273
db = DB.get_db(ds.type)
265274
sql = get_version_sql(ds, conf)
266275
try:

backend/apps/system/crud/assistant.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
# from apps.datasource.embedding.table_embedding import get_table_embedding
1212
from apps.datasource.models.datasource import CoreDatasource, DatasourceConf
13+
from apps.datasource.utils.utils import aes_encrypt
1314
from apps.system.models.system_model import AssistantModel
1415
from apps.system.schemas.auth import CacheName, CacheNamespace
1516
from apps.system.schemas.system_schema import AssistantHeader, AssistantOutDsSchema, UserInfoDTO
@@ -266,3 +267,19 @@ def get_ds_engine(ds: AssistantOutDsSchema) -> Engine:
266267
else:
267268
engine = create_engine(uri, connect_args={"connect_timeout": timeout}, pool_timeout=timeout)
268269
return engine
270+
271+
272+
def get_out_ds_conf(ds: AssistantOutDsSchema, timeout:int=30) -> str:
273+
conf = {
274+
"host":ds.host,
275+
"port":ds.port,
276+
"username":ds.user,
277+
"password":ds.password,
278+
"database":ds.dataBase,
279+
"driver":'',
280+
"extraJdbc":ds.extraParams or '',
281+
"dbSchema":ds.db_schema or '',
282+
"timeout":timeout
283+
}
284+
conf.extraJdbc = ''
285+
return aes_encrypt(json.dumps(conf))

backend/apps/system/schemas/system_schema.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ class AssistantOutDsBase(BaseModel):
178178
type_name: Optional[str] = None
179179
comment: Optional[str] = None
180180
description: Optional[str] = None
181+
configuration: Optional[str] = None
181182

182183

183184
class AssistantOutDsSchema(AssistantOutDsBase):

0 commit comments

Comments
 (0)