88
99if platform .system () != "Darwin" :
1010 import dmPython
11+ import pymysql
1112from sqlalchemy import create_engine , text , Engine
1213from sqlalchemy .orm import sessionmaker
1314
@@ -125,6 +126,19 @@ def check_connection(trans: Trans, ds: CoreDatasource, is_raise: bool = False):
125126 if is_raise :
126127 raise HTTPException (status_code = 500 , detail = trans ('i18n_ds_invalid' ) + f': { e .args } ' )
127128 return False
129+ elif ds .type == 'doris' :
130+ with pymysql .connect (user = conf .username , passwd = conf .password , host = conf .host ,
131+ port = conf .port , db = conf .database , connect_timeout = 10 ,
132+ read_timeout = 10 ) as conn , conn .cursor () as cursor :
133+ try :
134+ cursor .execute ('select 1' )
135+ SQLBotLogUtil .info ("success" )
136+ return True
137+ except Exception as e :
138+ SQLBotLogUtil .error (f"Datasource { ds .id } connection failed: { e } " )
139+ if is_raise :
140+ raise HTTPException (status_code = 500 , detail = trans ('i18n_ds_invalid' ) + f': { e .args } ' )
141+ return False
128142
129143
130144def get_schema (ds : CoreDatasource ):
@@ -171,6 +185,14 @@ def get_tables(ds: CoreDatasource):
171185 res = cursor .fetchall ()
172186 res_list = [TableSchema (* item ) for item in res ]
173187 return res_list
188+ elif ds .type == 'doris' :
189+ with pymysql .connect (user = conf .username , passwd = conf .password , host = conf .host ,
190+ port = conf .port , db = conf .database , connect_timeout = conf .timeout ,
191+ read_timeout = conf .timeout ) as conn , conn .cursor () as cursor :
192+ cursor .execute (sql )
193+ res = cursor .fetchall ()
194+ res_list = [TableSchema (* item ) for item in res ]
195+ return res_list
174196
175197
176198def get_fields (ds : CoreDatasource , table_name : str = None ):
@@ -191,6 +213,14 @@ def get_fields(ds: CoreDatasource, table_name: str = None):
191213 res = cursor .fetchall ()
192214 res_list = [ColumnSchema (* item ) for item in res ]
193215 return res_list
216+ elif ds .type == 'doris' :
217+ with pymysql .connect (user = conf .username , passwd = conf .password , host = conf .host ,
218+ port = conf .port , db = conf .database , connect_timeout = conf .timeout ,
219+ read_timeout = conf .timeout ) as conn , conn .cursor () as cursor :
220+ cursor .execute (sql )
221+ res = cursor .fetchall ()
222+ res_list = [ColumnSchema (* item ) for item in res ]
223+ return res_list
194224
195225
196226def exec_sql (ds : CoreDatasource | AssistantOutDsSchema , sql : str , origin_column = False ):
@@ -233,3 +263,22 @@ def exec_sql(ds: CoreDatasource | AssistantOutDsSchema, sql: str, origin_column=
233263 "sql" : bytes .decode (base64 .b64encode (bytes (sql , 'utf-8' )))}
234264 except Exception as ex :
235265 raise ex
266+ elif ds .type == 'doris' :
267+ with pymysql .connect (user = conf .username , passwd = conf .password , host = conf .host ,
268+ port = conf .port , db = conf .database , connect_timeout = conf .timeout ,
269+ read_timeout = conf .timeout ) as conn , conn .cursor () as cursor :
270+ try :
271+ cursor .execute (sql )
272+ res = cursor .fetchall ()
273+ columns = [field [0 ] for field in cursor .description ] if origin_column else [field [0 ].lower () for
274+ field in
275+ cursor .description ]
276+ result_list = [
277+ {str (columns [i ]): float (value ) if isinstance (value , Decimal ) else value for i , value in
278+ enumerate (tuple_item )}
279+ for tuple_item in res
280+ ]
281+ return {"fields" : columns , "data" : result_list ,
282+ "sql" : bytes .decode (base64 .b64encode (bytes (sql , 'utf-8' )))}
283+ except Exception as ex :
284+ raise ex
0 commit comments