Skip to content

Commit 330fd2d

Browse files
committed
feat: support kingbase datasource
1 parent 3dc4b8e commit 330fd2d

File tree

4 files changed

+97
-5
lines changed

4 files changed

+97
-5
lines changed

backend/apps/datasource/crud/datasource.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ def preview(session: SessionDep, current_user: CurrentUser, id: int, data: Table
279279
sql = f"""SELECT TOP 100 [{"], [".join(fields)}] FROM [{conf.dbSchema}].[{data.table.table_name}]
280280
{where}
281281
"""
282-
elif ds.type == "pg" or ds.type == "excel" or ds.type == "redshift":
282+
elif ds.type == "pg" or ds.type == "excel" or ds.type == "redshift" or ds.type == "kingbase":
283283
sql = f"""SELECT "{'", "'.join(fields)}" FROM "{conf.dbSchema}"."{data.table.table_name}"
284284
{where}
285285
LIMIT 100"""

backend/apps/db/constant.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class DB(Enum):
2323
doris = ('doris', 'Apache Doris', '`', '`', ConnectType.py_driver)
2424
redshift = ('redshift', 'AWS Redshift', '"', '"', ConnectType.py_driver)
2525
es = ('es', 'Elasticsearch', '"', '"', ConnectType.py_driver)
26+
kingbase = ('kingbase', 'Kingbase', '"', '"', ConnectType.py_driver)
2627

2728
def __init__(self, type, db_name, prefix, suffix, connect_type: ConnectType):
2829
self.type = type

backend/apps/db/db.py

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
from decimal import Decimal
66
from typing import Optional
77

8+
import psycopg2
89
import pymssql
9-
1010
from apps.db.db_sql import get_table_sql, get_field_sql, get_version_sql
1111
from common.error import ParseSQLResultError
1212

@@ -191,6 +191,20 @@ def check_connection(trans: Optional[Trans], ds: CoreDatasource | AssistantOutDs
191191
if is_raise:
192192
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
193193
return False
194+
elif ds.type == 'kingbase':
195+
with psycopg2.connect(host=conf.host, port=conf.port, database=conf.database,
196+
user=conf.username,
197+
password=conf.password,
198+
connect_timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor:
199+
try:
200+
cursor.execute('select 1')
201+
SQLBotLogUtil.info("success")
202+
return True
203+
except Exception as e:
204+
SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
205+
if is_raise:
206+
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
207+
return False
194208
elif ds.type == 'es':
195209
es_conn = get_es_connect(conf)
196210
if es_conn.ping():
@@ -269,8 +283,7 @@ def get_schema(ds: CoreDatasource):
269283
if ds.type == "sqlServer":
270284
sql = """select name from sys.schemas"""
271285
elif ds.type == "pg" or ds.type == "excel":
272-
sql = """SELECT nspname
273-
FROM pg_namespace"""
286+
sql = """SELECT nspname FROM pg_namespace"""
274287
elif ds.type == "oracle":
275288
sql = """select * from all_users"""
276289
with session.execute(text(sql)) as result:
@@ -294,6 +307,15 @@ def get_schema(ds: CoreDatasource):
294307
res = cursor.fetchall()
295308
res_list = [item[0] for item in res]
296309
return res_list
310+
elif ds.type == 'kingbase':
311+
with psycopg2.connect(host=conf.host, port=conf.port, database=conf.database, user=conf.username,
312+
password=conf.password,
313+
options=f"-c statement_timeout={conf.timeout * 1000}",
314+
**extra_config_dict) as conn, conn.cursor() as cursor:
315+
cursor.execute("""SELECT nspname FROM pg_namespace""")
316+
res = cursor.fetchall()
317+
res_list = [item[0] for item in res]
318+
return res_list
297319

298320

299321
def get_tables(ds: CoreDatasource):
@@ -331,6 +353,15 @@ def get_tables(ds: CoreDatasource):
331353
res = cursor.fetchall()
332354
res_list = [TableSchema(*item) for item in res]
333355
return res_list
356+
elif ds.type == 'kingbase':
357+
with psycopg2.connect(host=conf.host, port=conf.port, database=conf.database, user=conf.username,
358+
password=conf.password,
359+
options=f"-c statement_timeout={conf.timeout * 1000}",
360+
**extra_config_dict) as conn, conn.cursor() as cursor:
361+
cursor.execute(sql.format(sql_param))
362+
res = cursor.fetchall()
363+
res_list = [TableSchema(*item) for item in res]
364+
return res_list
334365
elif ds.type == 'es':
335366
res = get_es_index(conf)
336367
res_list = [TableSchema(*item) for item in res]
@@ -372,6 +403,15 @@ def get_fields(ds: CoreDatasource, table_name: str = None):
372403
res = cursor.fetchall()
373404
res_list = [ColumnSchema(*item) for item in res]
374405
return res_list
406+
elif ds.type == 'kingbase':
407+
with psycopg2.connect(host=conf.host, port=conf.port, database=conf.database, user=conf.username,
408+
password=conf.password,
409+
options=f"-c statement_timeout={conf.timeout * 1000}",
410+
**extra_config_dict) as conn, conn.cursor() as cursor:
411+
cursor.execute(sql.format(p1, p2))
412+
res = cursor.fetchall()
413+
res_list = [ColumnSchema(*item) for item in res]
414+
return res_list
375415
elif ds.type == 'es':
376416
res = get_es_fields(conf, table_name)
377417
res_list = [ColumnSchema(*item) for item in res]
@@ -457,6 +497,26 @@ def exec_sql(ds: CoreDatasource | AssistantOutDsSchema, sql: str, origin_column=
457497
"sql": bytes.decode(base64.b64encode(bytes(sql, 'utf-8')))}
458498
except Exception as ex:
459499
raise ParseSQLResultError(str(ex))
500+
elif ds.type == 'kingbase':
501+
with psycopg2.connect(host=conf.host, port=conf.port, database=conf.database, user=conf.username,
502+
password=conf.password,
503+
options=f"-c statement_timeout={conf.timeout * 1000}",
504+
**extra_config_dict) as conn, conn.cursor() as cursor:
505+
try:
506+
cursor.execute(sql)
507+
res = cursor.fetchall()
508+
columns = [field[0] for field in cursor.description] if origin_column else [field[0].lower() for
509+
field in
510+
cursor.description]
511+
result_list = [
512+
{str(columns[i]): float(value) if isinstance(value, Decimal) else value for i, value in
513+
enumerate(tuple_item)}
514+
for tuple_item in res
515+
]
516+
return {"fields": columns, "data": result_list,
517+
"sql": bytes.decode(base64.b64encode(bytes(sql, 'utf-8')))}
518+
except Exception as ex:
519+
raise ParseSQLResultError(str(ex))
460520
elif ds.type == 'es':
461521
try:
462522
res, columns = get_es_data_by_http(conf, sql)

backend/apps/db/db_sql.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ def get_version_sql(ds: CoreDatasource, conf: DatasourceConf):
1212
return """
1313
select SERVERPROPERTY('ProductVersion')
1414
"""
15-
elif ds.type == "pg" or ds.type == "excel":
15+
elif ds.type == "pg" or ds.type == "kingbase" or ds.type == "excel":
1616
return """
1717
SELECT current_setting('server_version')
1818
"""
@@ -140,6 +140,21 @@ def get_table_sql(ds: CoreDatasource, conf: DatasourceConf, db_version: str = ''
140140
WHERE
141141
TABLE_SCHEMA = %s
142142
""", conf.database
143+
elif ds.type == "kingbase":
144+
return """
145+
SELECT c.relname AS TABLE_NAME,
146+
COALESCE(d.description, obj_description(c.oid)) AS TABLE_COMMENT
147+
FROM pg_class c
148+
LEFT JOIN
149+
pg_namespace n ON n.oid = c.relnamespace
150+
LEFT JOIN
151+
pg_description d ON d.objoid = c.oid AND d.objsubid = 0
152+
WHERE n.nspname = '{0}'
153+
AND c.relkind IN ('r', 'v', 'p', 'm')
154+
AND c.relname NOT LIKE 'pg_%'
155+
AND c.relname NOT LIKE 'sql_%'
156+
ORDER BY c.relname \
157+
""", conf.dbSchema
143158
elif ds.type == "es":
144159
return "", None
145160

@@ -275,5 +290,21 @@ def get_field_sql(ds: CoreDatasource, conf: DatasourceConf, table_name: str = No
275290
"""
276291
sql2 = " AND TABLE_NAME = %s" if table_name is not None and table_name != "" else ""
277292
return sql1 + sql2, conf.database, table_name
293+
elif ds.type == "kingbase":
294+
sql1 = """
295+
SELECT a.attname AS COLUMN_NAME,
296+
pg_catalog.format_type(a.atttypid, a.atttypmod) AS DATA_TYPE,
297+
col_description(c.oid, a.attnum) AS COLUMN_COMMENT
298+
FROM pg_catalog.pg_attribute a
299+
JOIN
300+
pg_catalog.pg_class c ON a.attrelid = c.oid
301+
JOIN
302+
pg_catalog.pg_namespace n ON n.oid = c.relnamespace
303+
WHERE n.nspname = '{0}'
304+
AND a.attnum > 0
305+
AND NOT a.attisdropped \
306+
"""
307+
sql2 = " AND c.relname = '{1}'" if table_name is not None and table_name != "" else ""
308+
return sql1 + sql2, conf.dbSchema, table_name
278309
elif ds.type == "es":
279310
return "", None, None

0 commit comments

Comments
 (0)