Skip to content

Commit 279c4b7

Browse files
committed
feat(datasource): Support PostgreSQL
1 parent e61ead2 commit 279c4b7

File tree

7 files changed

+118
-31
lines changed

7 files changed

+118
-31
lines changed

backend/apps/datasource/crud/datasource.py

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
1-
from sqlmodel import select
2-
from ..models.datasource import CoreDatasource, CreateDatasource, CoreTable, CoreField, ColumnSchema, EditObj
31
import datetime
4-
from common.core.deps import SessionDep
5-
from apps.db.db import get_session, get_tables, get_fields, exec_sql
2+
import json
3+
from typing import List
4+
65
from sqlalchemy import text, and_
6+
from sqlmodel import select
7+
8+
from apps.datasource.utils.utils import aes_decrypt
9+
from apps.db.db import get_engine, get_tables, get_fields, exec_sql
10+
from common.core.deps import SessionDep
711
from common.utils.utils import deepcopy_ignore_extra
8-
from typing import List
9-
from ..crud.table import delete_table_by_ds_id, update_table
1012
from ..crud.field import delete_field_by_ds_id, update_field
13+
from ..crud.table import delete_table_by_ds_id, update_table
14+
from ..models.datasource import CoreDatasource, CreateDatasource, CoreTable, CoreField, ColumnSchema, EditObj, \
15+
DatasourceConf
1116

1217

1318
def get_datasource_list(session: SessionDep) -> CoreDatasource:
@@ -17,17 +22,14 @@ def get_datasource_list(session: SessionDep) -> CoreDatasource:
1722

1823

1924
def check_status(session: SessionDep, ds: CoreDatasource):
20-
conn = get_session(ds)
25+
conn = get_engine(ds)
2126
try:
22-
conn.execute(text("SELECT 1")).scalar()
23-
print("success")
24-
return True
27+
with conn.connect() as connection:
28+
print("success")
29+
return True
2530
except Exception as e:
2631
print("Fail:", e)
2732
return False
28-
finally:
29-
conn.close()
30-
return False
3133

3234

3335
def create_ds(session: SessionDep, create_ds: CreateDatasource):
@@ -175,13 +177,14 @@ def update_table_and_fields(session: SessionDep, data: EditObj):
175177

176178
def preview(session: SessionDep, id: int, data: EditObj):
177179
ds = session.query(CoreDatasource).filter(CoreDatasource.id == id).first()
180+
conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration)))
178181
sql: str = ""
179182
if ds.type == "mysql":
180-
sql = f"""SELECT {", ".join([f.field_name for f in data.fields if f.checked])} FROM {data.table.table_name} LIMIT 100"""
183+
sql = f"""SELECT `{"`, `".join([f.field_name for f in data.fields if f.checked])}` FROM `{data.table.table_name}` LIMIT 100"""
181184
elif ds.type == "sqlServer":
182-
sql = f"""
183-
SELECT {", ".join([f.field_name for f in data.fields if f.checked])} FROM {data.table.table_name}
184-
ORDER BY {data.fields[0].field_name}
185-
OFFSET 0 ROWS FETCH NEXT 100 ROWS ONLY
186-
"""
185+
sql = f"""SELECT [{"], [".join([f.field_name for f in data.fields if f.checked])}] FROM [{conf.dbSchema}].[{data.table.table_name}]
186+
ORDER BY [{data.fields[0].field_name}]
187+
OFFSET 0 ROWS FETCH NEXT 100 ROWS ONLY"""
188+
elif ds.type == "pg":
189+
sql = f"""SELECT "{'", "'.join([f.field_name for f in data.fields if f.checked])}" FROM "{conf.dbSchema}"."{data.table.table_name}" LIMIT 100"""
187190
return exec_sql(ds, sql)

backend/apps/db/db.py

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1+
import base64
12
import json
23
import urllib.parse
34
from typing import Any
45

5-
from sqlalchemy import create_engine, text, Result
6+
from sqlalchemy import create_engine, text, Result, Engine
67
from sqlalchemy.orm import sessionmaker
78

89
from apps.datasource.models.datasource import DatasourceConf, CoreDatasource, TableSchema, ColumnSchema
@@ -22,13 +23,28 @@ def get_uri(ds: CoreDatasource):
2223
db_url = f"mssql+pymssql://{urllib.parse.quote(conf.username)}:{urllib.parse.quote(conf.password)}@{conf.host}:{conf.port}/{urllib.parse.quote(conf.database)}?{urllib.parse.quote(conf.extraJdbc)}"
2324
else:
2425
db_url = f"mssql+pymssql://{urllib.parse.quote(conf.username)}:{urllib.parse.quote(conf.password)}@{conf.host}:{conf.port}/{urllib.parse.quote(conf.database)}"
26+
elif ds.type == "pg":
27+
if conf.extraJdbc is not None and conf.extraJdbc != '':
28+
db_url = f"postgresql+psycopg2://{urllib.parse.quote(conf.username)}:{urllib.parse.quote(conf.password)}@{conf.host}:{conf.port}/{urllib.parse.quote(conf.database)}?{urllib.parse.quote(conf.extraJdbc)}"
29+
else:
30+
db_url = f"postgresql+psycopg2://{urllib.parse.quote(conf.username)}:{urllib.parse.quote(conf.password)}@{conf.host}:{conf.port}/{urllib.parse.quote(conf.database)}"
2531
else:
2632
raise 'The datasource type not support.'
2733
return db_url
2834

2935

36+
def get_engine(ds: CoreDatasource) -> Engine:
37+
conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration)))
38+
if ds.type == "pg" and (conf.dbSchema is not None and conf.dbSchema != ""):
39+
engine = create_engine(get_uri(ds), connect_args={"options": f"-c search_path={conf.dbSchema}"},
40+
pool_timeout=30, pool_size=20, max_overflow=10)
41+
else:
42+
engine = create_engine(get_uri(ds), pool_timeout=30, pool_size=20, max_overflow=10)
43+
return engine
44+
45+
3046
def get_session(ds: CoreDatasource):
31-
engine = create_engine(get_uri(ds))
47+
engine = get_engine(ds)
3248
session_maker = sessionmaker(bind=engine)
3349
session = session_maker()
3450
return session
@@ -66,6 +82,24 @@ def get_tables(ds: CoreDatasource):
6682
t.TABLE_TYPE IN ('BASE TABLE', 'VIEW')
6783
AND t.TABLE_SCHEMA = '{conf.dbSchema}';
6884
"""
85+
elif ds.type == "pg":
86+
sql = """
87+
SELECT
88+
c.relname AS TABLE_NAME,
89+
COALESCE(d.description, obj_description(c.oid)) AS TABLE_COMMENT
90+
FROM
91+
pg_class c
92+
LEFT JOIN
93+
pg_namespace n ON n.oid = c.relnamespace
94+
LEFT JOIN
95+
pg_description d ON d.objoid = c.oid AND d.objsubid = 0
96+
WHERE
97+
n.nspname = current_schema()
98+
AND c.relkind IN ('r', 'v')
99+
AND c.relname NOT LIKE 'pg_%'
100+
AND c.relname NOT LIKE 'sql_%'
101+
ORDER BY c.relname;
102+
"""
69103

70104
result = session.execute(text(sql))
71105
res = result.fetchall()
@@ -113,7 +147,26 @@ def get_fields(ds: CoreDatasource, table_name: str = None):
113147
WHERE
114148
C.TABLE_SCHEMA = '{conf.dbSchema}'
115149
"""
116-
sql2 = f"AND C.TABLE_NAME = '{table_name}';" if table_name is not None and table_name != "" else ";"
150+
sql2 = f" AND C.TABLE_NAME = '{table_name}';" if table_name is not None and table_name != "" else ";"
151+
sql = sql1 + sql2
152+
elif ds.type == "pg":
153+
sql1 = """
154+
SELECT
155+
a.attname AS COLUMN_NAME,
156+
pg_catalog.format_type(a.atttypid, a.atttypmod) AS DATA_TYPE,
157+
col_description(c.oid, a.attnum) AS COLUMN_COMMENT
158+
FROM
159+
pg_catalog.pg_attribute a
160+
JOIN
161+
pg_catalog.pg_class c ON a.attrelid = c.oid
162+
JOIN
163+
pg_catalog.pg_namespace n ON n.oid = c.relnamespace
164+
WHERE
165+
n.nspname = current_schema()
166+
AND a.attnum > 0
167+
AND NOT a.attisdropped
168+
"""
169+
sql2 = f" AND c.relname = '{table_name}';" if table_name is not None and table_name != "" else ";"
117170
sql = sql1 + sql2
118171

119172
result = session.execute(text(sql))
@@ -137,7 +190,7 @@ def exec_sql(ds: CoreDatasource, sql: str):
137190
{columns[i]: value for i, value in enumerate(tuple_item)}
138191
for tuple_item in res
139192
]
140-
return {"fields": columns, "data": result_list}
193+
return {"fields": columns, "data": result_list, "sql": base64.b64encode(bytes(sql, 'utf-8'))}
141194
finally:
142195
if result is not None:
143196
result.close()

backend/pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ dependencies = [
3131
"llama_index>=0.12.35",
3232
"pymssql (>=2.3.4,<3.0.0)",
3333
"pandas (>=2.2.3,<3.0.0)",
34-
"openpyxl (>=3.1.5,<4.0.0)"
34+
"openpyxl (>=3.1.5,<4.0.0)",
35+
"psycopg2 (>=2.9.10,<3.0.0)"
3536
]
3637
[[tool.uv.index]]
3738
url = "https://pypi.tuna.tsinghua.edu.cn/simple"

0 commit comments

Comments
 (0)