Skip to content

Commit e8673c8

Browse files
committed
refactor: datasource connect support extra config
1 parent 1e99dac commit e8673c8

File tree

2 files changed

+40
-17
lines changed

2 files changed

+40
-17
lines changed

backend/apps/datasource/api/datasource.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ def inner():
193193
return await asyncio.to_thread(inner)
194194

195195

196+
# not used
196197
@router.post("/fieldEnum/{id}")
197198
async def field_enum(session: SessionDep, id: int):
198199
def inner():

backend/apps/db/db.py

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,21 @@ def get_uri_from_config(type: str, conf: DatasourceConf) -> str:
7272
return db_url
7373

7474

75+
def get_extra_config(conf: DatasourceConf):
76+
config_dict = {}
77+
if conf.extraJdbc:
78+
config_arr = conf.extraJdbc.split("&")
79+
for config in config_arr:
80+
kv = config.split("=")
81+
if len(kv) == 2 and kv[0] and kv[1]:
82+
config_dict[kv[0]] = kv[1]
83+
else:
84+
raise Exception(f'param: {config} is error')
85+
return config_dict
86+
87+
7588
def get_origin_connect(type: str, conf: DatasourceConf):
89+
extra_config_dict = get_extra_config(conf)
7690
if type == "sqlServer":
7791
return pymssql.connect(
7892
server=conf.host,
@@ -81,10 +95,12 @@ def get_origin_connect(type: str, conf: DatasourceConf):
8195
password=conf.password,
8296
database=conf.database,
8397
timeout=conf.timeout,
84-
tds_version='7.0' # options: '4.2', '7.0', '8.0' ...
98+
tds_version='7.0', # options: '4.2', '7.0', '8.0' ...,
99+
**extra_config_dict
85100
)
86101

87102

103+
# use sqlalchemy
88104
def get_engine(ds: CoreDatasource, timeout: int = 0) -> Engine:
89105
conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration))) if ds.type != "excel" else get_engine_config()
90106
if conf.timeout is None:
@@ -135,9 +151,10 @@ def check_connection(trans: Optional[Trans], ds: CoreDatasource | AssistantOutDs
135151
return False
136152
else:
137153
conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration)))
154+
extra_config_dict = get_extra_config(conf)
138155
if ds.type == 'dm':
139156
with dmPython.connect(user=conf.username, password=conf.password, server=conf.host,
140-
port=conf.port) as conn, conn.cursor() as cursor:
157+
port=conf.port, **extra_config_dict) as conn, conn.cursor() as cursor:
141158
try:
142159
cursor.execute('select 1', timeout=10).fetchall()
143160
SQLBotLogUtil.info("success")
@@ -150,7 +167,7 @@ def check_connection(trans: Optional[Trans], ds: CoreDatasource | AssistantOutDs
150167
elif ds.type == 'doris':
151168
with pymysql.connect(user=conf.username, passwd=conf.password, host=conf.host,
152169
port=conf.port, db=conf.database, connect_timeout=10,
153-
read_timeout=10) as conn, conn.cursor() as cursor:
170+
read_timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor:
154171
try:
155172
cursor.execute('select 1')
156173
SQLBotLogUtil.info("success")
@@ -164,7 +181,7 @@ def check_connection(trans: Optional[Trans], ds: CoreDatasource | AssistantOutDs
164181
with redshift_connector.connect(host=conf.host, port=conf.port, database=conf.database,
165182
user=conf.username,
166183
password=conf.password,
167-
timeout=10) as conn, conn.cursor() as cursor:
184+
timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor:
168185
try:
169186
cursor.execute('select 1')
170187
SQLBotLogUtil.info("success")
@@ -221,16 +238,17 @@ def get_version(ds: CoreDatasource | AssistantOutDsSchema):
221238
res = result.fetchall()
222239
version = res[0][0]
223240
else:
241+
extra_config_dict = get_extra_config(conf)
224242
if ds.type == 'dm':
225243
with dmPython.connect(user=conf.username, password=conf.password, server=conf.host,
226244
port=conf.port) as conn, conn.cursor() as cursor:
227-
cursor.execute(sql, timeout=10)
245+
cursor.execute(sql, timeout=10, **extra_config_dict)
228246
res = cursor.fetchall()
229247
version = res[0][0]
230248
elif ds.type == 'doris':
231249
with pymysql.connect(user=conf.username, passwd=conf.password, host=conf.host,
232250
port=conf.port, db=conf.database, connect_timeout=10,
233-
read_timeout=10) as conn, conn.cursor() as cursor:
251+
read_timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor:
234252
cursor.execute(sql)
235253
res = cursor.fetchall()
236254
version = res[0][0]
@@ -260,17 +278,18 @@ def get_schema(ds: CoreDatasource):
260278
res_list = [item[0] for item in res]
261279
return res_list
262280
else:
281+
extra_config_dict = get_extra_config(conf)
263282
if ds.type == 'dm':
264283
with dmPython.connect(user=conf.username, password=conf.password, server=conf.host,
265-
port=conf.port) as conn, conn.cursor() as cursor:
284+
port=conf.port, **extra_config_dict) as conn, conn.cursor() as cursor:
266285
cursor.execute("""select OBJECT_NAME from dba_objects where object_type='SCH'""", timeout=conf.timeout)
267286
res = cursor.fetchall()
268287
res_list = [item[0] for item in res]
269288
return res_list
270289
elif ds.type == 'redshift':
271290
with redshift_connector.connect(host=conf.host, port=conf.port, database=conf.database, user=conf.username,
272291
password=conf.password,
273-
timeout=conf.timeout) as conn, conn.cursor() as cursor:
292+
timeout=conf.timeout, **extra_config_dict) as conn, conn.cursor() as cursor:
274293
cursor.execute("""SELECT nspname FROM pg_namespace""")
275294
res = cursor.fetchall()
276295
res_list = [item[0] for item in res]
@@ -288,25 +307,26 @@ def get_tables(ds: CoreDatasource):
288307
res_list = [TableSchema(*item) for item in res]
289308
return res_list
290309
else:
310+
extra_config_dict = get_extra_config(conf)
291311
if ds.type == 'dm':
292312
with dmPython.connect(user=conf.username, password=conf.password, server=conf.host,
293-
port=conf.port) as conn, conn.cursor() as cursor:
313+
port=conf.port, **extra_config_dict) as conn, conn.cursor() as cursor:
294314
cursor.execute(sql, {"param": sql_param}, timeout=conf.timeout)
295315
res = cursor.fetchall()
296316
res_list = [TableSchema(*item) for item in res]
297317
return res_list
298318
elif ds.type == 'doris':
299319
with pymysql.connect(user=conf.username, passwd=conf.password, host=conf.host,
300320
port=conf.port, db=conf.database, connect_timeout=conf.timeout,
301-
read_timeout=conf.timeout) as conn, conn.cursor() as cursor:
321+
read_timeout=conf.timeout, **extra_config_dict) as conn, conn.cursor() as cursor:
302322
cursor.execute(sql, (sql_param,))
303323
res = cursor.fetchall()
304324
res_list = [TableSchema(*item) for item in res]
305325
return res_list
306326
elif ds.type == 'redshift':
307327
with redshift_connector.connect(host=conf.host, port=conf.port, database=conf.database, user=conf.username,
308328
password=conf.password,
309-
timeout=conf.timeout) as conn, conn.cursor() as cursor:
329+
timeout=conf.timeout, **extra_config_dict) as conn, conn.cursor() as cursor:
310330
cursor.execute(sql, (sql_param,))
311331
res = cursor.fetchall()
312332
res_list = [TableSchema(*item) for item in res]
@@ -328,25 +348,26 @@ def get_fields(ds: CoreDatasource, table_name: str = None):
328348
res_list = [ColumnSchema(*item) for item in res]
329349
return res_list
330350
else:
351+
extra_config_dict = get_extra_config(conf)
331352
if ds.type == 'dm':
332353
with dmPython.connect(user=conf.username, password=conf.password, server=conf.host,
333-
port=conf.port) as conn, conn.cursor() as cursor:
354+
port=conf.port, **extra_config_dict) as conn, conn.cursor() as cursor:
334355
cursor.execute(sql, {"param1": p1, "param2": p2}, timeout=conf.timeout)
335356
res = cursor.fetchall()
336357
res_list = [ColumnSchema(*item) for item in res]
337358
return res_list
338359
elif ds.type == 'doris':
339360
with pymysql.connect(user=conf.username, passwd=conf.password, host=conf.host,
340361
port=conf.port, db=conf.database, connect_timeout=conf.timeout,
341-
read_timeout=conf.timeout) as conn, conn.cursor() as cursor:
362+
read_timeout=conf.timeout, **extra_config_dict) as conn, conn.cursor() as cursor:
342363
cursor.execute(sql, (p1, p2))
343364
res = cursor.fetchall()
344365
res_list = [ColumnSchema(*item) for item in res]
345366
return res_list
346367
elif ds.type == 'redshift':
347368
with redshift_connector.connect(host=conf.host, port=conf.port, database=conf.database, user=conf.username,
348369
password=conf.password,
349-
timeout=conf.timeout) as conn, conn.cursor() as cursor:
370+
timeout=conf.timeout, **extra_config_dict) as conn, conn.cursor() as cursor:
350371
cursor.execute(sql, (p1, p2))
351372
res = cursor.fetchall()
352373
res_list = [ColumnSchema(*item) for item in res]
@@ -379,9 +400,10 @@ def exec_sql(ds: CoreDatasource | AssistantOutDsSchema, sql: str, origin_column=
379400
raise ParseSQLResultError(str(ex))
380401
else:
381402
conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration)))
403+
extra_config_dict = get_extra_config(conf)
382404
if ds.type == 'dm':
383405
with dmPython.connect(user=conf.username, password=conf.password, server=conf.host,
384-
port=conf.port) as conn, conn.cursor() as cursor:
406+
port=conf.port, **extra_config_dict) as conn, conn.cursor() as cursor:
385407
try:
386408
cursor.execute(sql, timeout=conf.timeout)
387409
res = cursor.fetchall()
@@ -400,7 +422,7 @@ def exec_sql(ds: CoreDatasource | AssistantOutDsSchema, sql: str, origin_column=
400422
elif ds.type == 'doris':
401423
with pymysql.connect(user=conf.username, passwd=conf.password, host=conf.host,
402424
port=conf.port, db=conf.database, connect_timeout=conf.timeout,
403-
read_timeout=conf.timeout) as conn, conn.cursor() as cursor:
425+
read_timeout=conf.timeout, **extra_config_dict) as conn, conn.cursor() as cursor:
404426
try:
405427
cursor.execute(sql)
406428
res = cursor.fetchall()
@@ -419,7 +441,7 @@ def exec_sql(ds: CoreDatasource | AssistantOutDsSchema, sql: str, origin_column=
419441
elif ds.type == 'redshift':
420442
with redshift_connector.connect(host=conf.host, port=conf.port, database=conf.database, user=conf.username,
421443
password=conf.password,
422-
timeout=conf.timeout) as conn, conn.cursor() as cursor:
444+
timeout=conf.timeout, **extra_config_dict) as conn, conn.cursor() as cursor:
423445
try:
424446
cursor.execute(sql)
425447
res = cursor.fetchall()

0 commit comments

Comments
 (0)