33from apps .datasource .models .datasource import DatasourceConf , CoreDatasource , TableSchema , ColumnSchema
44import urllib .parse
55from typing import Any
6+ import json
7+ from apps .datasource .utils .utils import aes_decrypt
68
79
8- def get_session (conf : DatasourceConf , ds : CoreDatasource ):
10+ def get_session (ds : CoreDatasource ):
11+ conf = DatasourceConf (** json .loads (aes_decrypt (ds .configuration )))
912 db_url : str
1013 if ds .type == "mysql" :
1114 db_url = f"mysql+pymysql://{ urllib .parse .quote (conf .username )} :{ urllib .parse .quote (conf .password )} @{ conf .host } :{ conf .port } /{ urllib .parse .quote (conf .database )} "
@@ -17,39 +20,71 @@ def get_session(conf: DatasourceConf, ds: CoreDatasource):
1720 return session
1821
1922
20- def get_tables (conf : DatasourceConf , ds : CoreDatasource ):
21- session = get_session (conf , ds )
23+ def get_tables (ds : CoreDatasource ):
24+ conf = DatasourceConf (** json .loads (aes_decrypt (ds .configuration )))
25+ session = get_session (ds )
2226 result : Result [Any ]
23- if ds .type == "mysql" :
24- sql = f"""SELECT
25- TABLE_NAME,
26- TABLE_COMMENT
27- FROM
28- information_schema.TABLES
29- WHERE
30- TABLE_SCHEMA = '{ conf .database } ';"""
31- result = session .execute (text (sql ))
32-
33- res = result .fetchall ()
34- res_list = [TableSchema (* item ) for item in res ]
35- return res_list
36-
37-
38- def get_fields (conf : DatasourceConf , ds : CoreDatasource , table_name : str ):
39- session = get_session (conf , ds )
27+ try :
28+ if ds .type == "mysql" :
29+ sql = f"""SELECT
30+ TABLE_NAME,
31+ TABLE_COMMENT
32+ FROM
33+ information_schema.TABLES
34+ WHERE
35+ TABLE_SCHEMA = '{ conf .database } ';"""
36+ result = session .execute (text (sql ))
37+
38+ res = result .fetchall ()
39+ res_list = [TableSchema (* item ) for item in res ]
40+ return res_list
41+ finally :
42+ if result is not None :
43+ result .close ()
44+ if session is not None :
45+ session .close ()
46+
47+
48+ def get_fields (ds : CoreDatasource , table_name : str ):
49+ conf = DatasourceConf (** json .loads (aes_decrypt (ds .configuration )))
50+ session = get_session (ds )
4051 result : Result [Any ]
41- if ds .type == "mysql" :
42- sql = f"""SELECT
43- COLUMN_NAME,
44- DATA_TYPE,
45- COLUMN_COMMENT
46- FROM
47- INFORMATION_SCHEMA.COLUMNS
48- WHERE
49- TABLE_SCHEMA = '{ conf .database } ' AND
50- TABLE_NAME = '{ table_name } ';"""
51- result = session .execute (text (sql ))
52-
53- res = result .fetchall ()
54- res_list = [ColumnSchema (* item ) for item in res ]
55- return res_list
52+ try :
53+ if ds .type == "mysql" :
54+ sql = f"""SELECT
55+ COLUMN_NAME,
56+ DATA_TYPE,
57+ COLUMN_COMMENT
58+ FROM
59+ INFORMATION_SCHEMA.COLUMNS
60+ WHERE
61+ TABLE_SCHEMA = '{ conf .database } ' AND
62+ TABLE_NAME = '{ table_name } ';"""
63+ result = session .execute (text (sql ))
64+
65+ res = result .fetchall ()
66+ res_list = [ColumnSchema (* item ) for item in res ]
67+ return res_list
68+ finally :
69+ if result is not None :
70+ result .close ()
71+ if session is not None :
72+ session .close ()
73+
74+
75+ def exec_sql (ds : CoreDatasource , sql : str ):
76+ session = get_session (ds )
77+ result = session .execute (text (sql ))
78+ try :
79+ columns = result .keys ()._keys
80+ res = result .fetchall ()
81+ result_list = [
82+ {columns [i ]: value for i , value in enumerate (tuple_item )}
83+ for tuple_item in res
84+ ]
85+ return {"fields" : columns , "data" : result_list }
86+ finally :
87+ if result is not None :
88+ result .close ()
89+ if session is not None :
90+ session .close ()
0 commit comments