Skip to content

Commit 7786ff1

Browse files
committed
feat(datasource): Support Excel
1 parent 279c4b7 commit 7786ff1

File tree

8 files changed

+185
-43
lines changed

8 files changed

+185
-43
lines changed

backend/apps/datasource/api/datasource.py

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
1+
import hashlib
2+
import uuid
3+
from typing import List
4+
5+
import pandas as pd
16
from fastapi import APIRouter, File, UploadFile, HTTPException
7+
from numba.core.cgutils import if_zero
8+
9+
from apps.db.engine import create_table, get_data_engine, insert_data
10+
from common.core.deps import SessionDep
211
from ..crud.datasource import get_datasource_list, check_status, create_ds, update_ds, delete_ds, getTables, getFields, \
312
execSql, update_table_and_fields, getTablesByDs, chooseTables, preview
4-
from common.core.deps import SessionDep
5-
from ..models.datasource import CoreDatasource, CreateDatasource, EditObj, CoreTable
6-
from ..crud.table import get_tables_by_ds_id
713
from ..crud.field import get_fields_by_table_id
8-
from typing import List
9-
import uuid
10-
import pandas as pd
11-
import hashlib
14+
from ..crud.table import get_tables_by_ds_id
15+
from ..models.datasource import CoreDatasource, CreateDatasource, EditObj, CoreTable
1216

1317
router = APIRouter(tags=["datasource"], prefix="/datasource")
1418

@@ -92,18 +96,24 @@ async def upload_excel(session: SessionDep, file: UploadFile = File(...)):
9296
contents = await file.read()
9397
df_sheets = pd.read_excel(contents, sheet_name=None)
9498
# build columns and data to insert db
95-
# todo
9699
sheets = []
100+
conn = get_data_engine()
97101
for sheet_name, df in df_sheets.items():
98-
print("--------------------")
99-
print(f"Sheet: {sheet_name}, {hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]}")
100-
sheets.append({"name": f"{sheet_name}",
101-
"uniqueName": f"{sheet_name}_{hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]}"})
102+
tableName = f"{sheet_name}_{hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]}"
103+
sheets.append({"tableName": tableName, "tableComment": ""})
102104
column_len = len(df.dtypes)
105+
fields = []
103106
for i in range(column_len):
104-
print(f"{df.columns[i]} , {df.dtypes[i]}")
105-
106-
for row in df.values:
107-
print(row)
108-
107+
# build fields
108+
fields.append({"name": df.columns[i], "type": str(df.dtypes[i]), "relType": ""})
109+
# create table
110+
create_table(conn, tableName, fields)
111+
112+
data = [
113+
{df.columns[i]: int(row[i]) if "int" in str(df.dtypes[i]) else row[i] for i in range(len(row))}
114+
for row in df.values
115+
]
116+
# insert data
117+
insert_data(conn, tableName, fields, data)
118+
conn.close()
109119
return {"filename": file.filename, "sheets": sheets}

backend/apps/datasource/crud/datasource.py

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22
import json
33
from typing import List
44

5-
from sqlalchemy import text, and_
5+
from sqlalchemy import and_, text
66
from sqlmodel import select
77

88
from apps.datasource.utils.utils import aes_decrypt
99
from apps.db.db import get_engine, get_tables, get_fields, exec_sql
10+
from apps.db.engine import get_engine_config
11+
from apps.db.engine import get_engine_conn
1012
from common.core.deps import SessionDep
1113
from common.utils.utils import deepcopy_ignore_extra
1214
from ..crud.field import delete_field_by_ds_id, update_field
@@ -36,8 +38,8 @@ def create_ds(session: SessionDep, create_ds: CreateDatasource):
3638
ds = CoreDatasource()
3739
deepcopy_ignore_extra(create_ds, ds)
3840
ds.create_time = datetime.datetime.now()
39-
status = check_status(session, ds)
40-
ds.status = "Success" if status is True else "Fail"
41+
# status = check_status(session, ds)
42+
ds.status = "Success"
4143
record = CoreDatasource(**ds.model_dump())
4244
session.add(record)
4345
session.flush()
@@ -46,9 +48,7 @@ def create_ds(session: SessionDep, create_ds: CreateDatasource):
4648
session.commit()
4749

4850
# save tables and fields
49-
if status:
50-
sync_table(session, ds, create_ds.tables)
51-
51+
sync_table(session, ds, create_ds.tables)
5252
return ds
5353

5454

@@ -72,6 +72,16 @@ def update_ds(session: SessionDep, ds: CoreDatasource):
7272

7373
def delete_ds(session: SessionDep, id: int):
7474
term = session.exec(select(CoreDatasource).where(CoreDatasource.id == id)).first()
75+
if term.type == "excel":
76+
# drop all tables for current datasource
77+
engine = get_engine_conn()
78+
conf = DatasourceConf(**json.loads(aes_decrypt(term.configuration)))
79+
with engine.connect() as conn:
80+
for sheet in conf.sheets:
81+
conn.execute(text(f'DROP TABLE IF EXISTS "{sheet["tableName"]}"'))
82+
conn.commit()
83+
conn.close()
84+
7585
session.delete(term)
7686
session.commit()
7787
delete_table_by_ds_id(session, id)
@@ -98,6 +108,11 @@ def getFields(session: SessionDep, id: int, table_name: str):
98108
return fields
99109

100110

111+
def getFieldsByDs(session: SessionDep, ds: CoreDatasource, table_name: str):
    """Return the column metadata of *table_name*, read live from *ds*."""
    return get_fields(ds, table_name)
114+
115+
101116
def execSql(session: SessionDep, id: int, sql: str):
102117
ds = session.exec(select(CoreDatasource).where(CoreDatasource.id == id)).first()
103118
return exec_sql(ds, sql)
@@ -128,7 +143,7 @@ def sync_table(session: SessionDep, ds: CoreDatasource, tables: List[CoreTable])
128143
session.commit()
129144

130145
# sync field
131-
fields = getFields(session, ds.id, item.table_name)
146+
fields = getFieldsByDs(session, ds, item.table_name)
132147
sync_fields(session, ds, item, fields)
133148

134149
if len(id_list) > 0:
@@ -177,14 +192,14 @@ def update_table_and_fields(session: SessionDep, data: EditObj):
177192

178193
def preview(session: SessionDep, id: int, data: EditObj):
179194
ds = session.query(CoreDatasource).filter(CoreDatasource.id == id).first()
180-
conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration)))
195+
conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration))) if ds.type != "excel" else get_engine_config()
181196
sql: str = ""
182197
if ds.type == "mysql":
183198
sql = f"""SELECT `{"`, `".join([f.field_name for f in data.fields if f.checked])}` FROM `{data.table.table_name}` LIMIT 100"""
184199
elif ds.type == "sqlServer":
185200
sql = f"""SELECT [{"], [".join([f.field_name for f in data.fields if f.checked])}] FROM [{conf.dbSchema}].[{data.table.table_name}]
186201
ORDER BY [{data.fields[0].field_name}]
187202
OFFSET 0 ROWS FETCH NEXT 100 ROWS ONLY"""
188-
elif ds.type == "pg":
203+
elif ds.type == "pg" or ds.type == "excel":
189204
sql = f"""SELECT "{'", "'.join([f.field_name for f in data.fields if f.checked])}" FROM "{conf.dbSchema}"."{data.table.table_name}" LIMIT 100"""
190205
return exec_sql(ds, sql)

backend/apps/datasource/models/datasource.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
from sqlmodel import SQLModel, Field
2-
from sqlalchemy import Column, Text, BigInteger, DateTime, Integer, Identity
31
from datetime import datetime
4-
from pydantic import BaseModel
52
from typing import List, Optional
63

4+
from pydantic import BaseModel
5+
from sqlalchemy import Column, Text, BigInteger, DateTime, Integer, Identity
6+
from sqlmodel import SQLModel, Field
7+
78

89
class CoreDatasource(SQLModel, table=True):
910
__tablename__ = "core_datasource"
@@ -68,6 +69,22 @@ class DatasourceConf(BaseModel):
6869
driver: str = ''
6970
extraJdbc: str = ''
7071
dbSchema: str = ''
72+
filename: str = ''
73+
sheets: List = ''
74+
75+
def to_dict(self):
    """Return the connection settings as a plain, JSON-serialisable dict."""
    field_names = (
        "host", "port", "username", "password", "database",
        "driver", "extraJdbc", "dbSchema", "filename", "sheets",
    )
    return {name: getattr(self, name) for name in field_names}
7188

7289

7390
class TableSchema:

backend/apps/db/db.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@
88

99
from apps.datasource.models.datasource import DatasourceConf, CoreDatasource, TableSchema, ColumnSchema
1010
from apps.datasource.utils.utils import aes_decrypt
11+
from apps.db.engine import get_engine_config
1112

1213

1314
def get_uri(ds: CoreDatasource):
14-
conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration)))
15+
conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration))) if ds.type != "excel" else get_engine_config()
1516
db_url: str
1617
if ds.type == "mysql":
1718
if conf.extraJdbc is not None and conf.extraJdbc != '':
@@ -23,7 +24,7 @@ def get_uri(ds: CoreDatasource):
2324
db_url = f"mssql+pymssql://{urllib.parse.quote(conf.username)}:{urllib.parse.quote(conf.password)}@{conf.host}:{conf.port}/{urllib.parse.quote(conf.database)}?{urllib.parse.quote(conf.extraJdbc)}"
2425
else:
2526
db_url = f"mssql+pymssql://{urllib.parse.quote(conf.username)}:{urllib.parse.quote(conf.password)}@{conf.host}:{conf.port}/{urllib.parse.quote(conf.database)}"
26-
elif ds.type == "pg":
27+
elif ds.type == "pg" or ds.type == "excel":
2728
if conf.extraJdbc is not None and conf.extraJdbc != '':
2829
db_url = f"postgresql+psycopg2://{urllib.parse.quote(conf.username)}:{urllib.parse.quote(conf.password)}@{conf.host}:{conf.port}/{urllib.parse.quote(conf.database)}?{urllib.parse.quote(conf.extraJdbc)}"
2930
else:
@@ -34,7 +35,7 @@ def get_uri(ds: CoreDatasource):
3435

3536

3637
def get_engine(ds: CoreDatasource) -> Engine:
37-
conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration)))
38+
conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration))) if ds.type != "excel" else get_engine_config()
3839
if ds.type == "pg" and (conf.dbSchema is not None and conf.dbSchema != ""):
3940
engine = create_engine(get_uri(ds), connect_args={"options": f"-c search_path={conf.dbSchema}"},
4041
pool_timeout=30, pool_size=20, max_overflow=10)
@@ -51,7 +52,7 @@ def get_session(ds: CoreDatasource):
5152

5253

5354
def get_tables(ds: CoreDatasource):
54-
conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration)))
55+
conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration))) if ds.type != "excel" else get_engine_config()
5556
session = get_session(ds)
5657
result: Result[Any]
5758
sql: str = ''
@@ -82,7 +83,7 @@ def get_tables(ds: CoreDatasource):
8283
t.TABLE_TYPE IN ('BASE TABLE', 'VIEW')
8384
AND t.TABLE_SCHEMA = '{conf.dbSchema}';
8485
"""
85-
elif ds.type == "pg":
86+
elif ds.type == "pg" or ds.type == "excel":
8687
sql = """
8788
SELECT
8889
c.relname AS TABLE_NAME,
@@ -113,7 +114,7 @@ def get_tables(ds: CoreDatasource):
113114

114115

115116
def get_fields(ds: CoreDatasource, table_name: str = None):
116-
conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration)))
117+
conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration))) if ds.type != "excel" else get_engine_config()
117118
session = get_session(ds)
118119
result: Result[Any]
119120
sql: str = ''
@@ -149,7 +150,7 @@ def get_fields(ds: CoreDatasource, table_name: str = None):
149150
"""
150151
sql2 = f" AND C.TABLE_NAME = '{table_name}';" if table_name is not None and table_name != "" else ";"
151152
sql = sql1 + sql2
152-
elif ds.type == "pg":
153+
elif ds.type == "pg" or ds.type == "excel":
153154
sql1 = """
154155
SELECT
155156
a.attname AS COLUMN_NAME,

backend/apps/db/engine.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,68 @@
11
# Author: Junjun
22
# Date: 2025/5/19
3+
from typing import List
4+
5+
from sqlalchemy import create_engine, text, MetaData, Table
6+
from sqlalchemy.orm import sessionmaker
7+
8+
from apps.datasource.models.datasource import DatasourceConf
9+
10+
11+
def get_engine_config():
    """Build the connection settings for the built-in data engine (PostgreSQL).

    Each value can be overridden through a ``SQLBOT_DB_*`` environment
    variable; the literals below are development defaults only, so existing
    deployments keep the exact same configuration when no variable is set.

    :returns: a populated ``DatasourceConf``
    """
    import os  # local import: avoids touching the module's import block

    return DatasourceConf(
        username=os.getenv("SQLBOT_DB_USERNAME", "root"),
        password=os.getenv("SQLBOT_DB_PASSWORD", "123456"),
        host=os.getenv("SQLBOT_DB_HOST", "127.0.0.1"),
        port=int(os.getenv("SQLBOT_DB_PORT", "5432")),
        database=os.getenv("SQLBOT_DB_DATABASE", "sqlbot"),
        dbSchema=os.getenv("SQLBOT_DB_SCHEMA", "public"),
    )
14+
15+
16+
def get_engine_uri(conf: DatasourceConf):
    """Return the SQLAlchemy URI for *conf*.

    Username, password and database are percent-encoded so that special
    characters (``@``, ``:``, ``#`` …) cannot corrupt the URI — consistent
    with ``get_uri()`` in ``apps/db/db.py``, which already quotes these.

    :param conf: connection settings (host/port are used verbatim)
    :returns: a ``postgresql+psycopg2`` connection URL
    """
    from urllib.parse import quote  # local import: module import block untouched

    return (
        f"postgresql+psycopg2://{quote(str(conf.username))}:"
        f"{quote(str(conf.password))}@{conf.host}:{conf.port}/"
        f"{quote(str(conf.database))}"
    )
18+
19+
20+
def get_engine_conn():
    """Create a pooled SQLAlchemy engine for the built-in data store.

    The PostgreSQL ``search_path`` is pinned to the configured schema so
    unqualified table names resolve there.
    """
    settings = get_engine_config()
    search_path_args = {"options": f"-c search_path={settings.dbSchema}"}
    return create_engine(
        get_engine_uri(settings),
        connect_args=search_path_args,
        pool_timeout=300,
        pool_size=20,
        max_overflow=10,
    )
27+
28+
29+
def get_data_engine():
    """Open a new ORM session bound to the built-in data engine.

    The caller is responsible for closing the returned session.
    """
    return sessionmaker(bind=get_engine_conn())()
34+
35+
36+
def create_table(session, table_name: str, fields: List[any]):
37+
# field type relation
38+
list = []
39+
for f in fields:
40+
if "object" in f["type"]:
41+
f["relType"] = "text"
42+
elif "int" in f["type"]:
43+
f["relType"] = "bigint"
44+
elif "float" in f["type"]:
45+
f["relType"] = "numeric"
46+
elif "datetime" in f["type"]:
47+
f["relType"] = "timestamp"
48+
else:
49+
f["relType"] = "text"
50+
list.append(f'"{f["name"]}" {f["relType"]}')
51+
52+
sql = f"""
53+
CREATE TABLE "{table_name}" (
54+
{", ".join(list)}
55+
);
56+
"""
57+
session.execute(text(sql))
58+
session.commit()
59+
60+
61+
def insert_data(session, table_name: str, fields: List[any], data: List[any]):
    """Bulk-insert *data* (a list of row dicts) into *table_name*.

    :param session: SQLAlchemy session whose bind locates the target table
    :param table_name: table previously created by ``create_table``
    :param fields: kept for signature compatibility; not used here
    :param data: list of ``{column_name: value}`` dicts
    """
    if not data:
        # Nothing to insert; INSERT ... VALUES with an empty list would error.
        return
    # Reuse the engine the session is already bound to instead of building
    # (and leaking) a second, never-disposed connection pool per call.
    engine = session.get_bind()
    table = Table(table_name, MetaData(), autoload_with=engine)
    session.execute(table.insert().values(data))
    session.commit()
Lines changed: 3 additions & 0 deletions
Loading

0 commit comments

Comments
 (0)