11from sqlmodel import select
2- from ..models .datasource import CoreDatasource , DatasourceConf
2+ from ..models .datasource import CoreDatasource , CreateDatasource , CoreTable , CoreField , ColumnSchema
33import datetime
44from common .core .deps import SessionDep
55from apps .db .db import get_session , get_tables , get_fields , exec_sql
6- from sqlalchemy import text
6+ from sqlalchemy import text , and_
7+ from common .utils .utils import deepcopy_ignore_extra
8+ from typing import List
9+ from ..crud .table import delete_table_by_ds_id
10+ from ..crud .field import delete_field_by_ds_id
711
812
913def get_datasource_list (session : SessionDep ) -> CoreDatasource :
@@ -26,21 +30,30 @@ def check_status(session: SessionDep, ds: CoreDatasource):
2630 return False
2731
2832
29- def create_ds (session : SessionDep , ds : CoreDatasource ):
33+ def create_ds (session : SessionDep , create_ds : CreateDatasource ):
34+ ds = CoreDatasource ()
35+ deepcopy_ignore_extra (create_ds , ds )
3036 ds .create_time = datetime .datetime .now ()
3137 status = check_status (session , ds )
3238 ds .status = "Success" if status is True else "Fail"
3339 record = CoreDatasource (** ds .model_dump ())
34- # get tables and fields
35- if status :
36- pass
3740 session .add (record )
41+ session .flush ()
42+ session .refresh (record )
43+ ds .id = record .id
3844 session .commit ()
45+
46+ # save tables and fields
47+ if status :
48+ sync_table (session , ds , create_ds .tables )
49+
3950 return ds
4051
4152
4253def update_ds (session : SessionDep , ds : CoreDatasource ):
4354 ds .id = int (ds .id )
55+ status = check_status (session , ds )
56+ ds .status = "Success" if status is True else "Fail"
4457 record = session .exec (select (CoreDatasource ).where (CoreDatasource .id == ds .id )).first ()
4558 update_data = ds .model_dump (exclude_unset = True )
4659 for field , value in update_data .items ():
@@ -54,6 +67,8 @@ def delete_ds(session: SessionDep, id: int):
5467 term = session .exec (select (CoreDatasource ).where (CoreDatasource .id == id )).first ()
5568 session .delete (term )
5669 session .commit ()
70+ delete_table_by_ds_id (session , id )
71+ delete_field_by_ds_id (session , id )
5772 return {
5873 "message" : f"Datasource with ID { id } deleted successfully."
5974 }
@@ -74,3 +89,65 @@ def getFields(session: SessionDep, id: int, table_name: str):
7489def execSql (session : SessionDep , id : int , sql : str ):
7590 ds = session .exec (select (CoreDatasource ).where (CoreDatasource .id == id )).first ()
7691 return exec_sql (ds , sql )
92+
93+
94+ def sync_table (session : SessionDep , ds : CoreDatasource , tables : List [CoreTable ]):
95+ id_list = []
96+ for item in tables :
97+ statement = select (CoreTable ).where (and_ (CoreTable .ds_id == ds .id , CoreTable .table_name == item .table_name ))
98+ record = session .exec (statement ).first ()
99+ # update exist table, only update table_comment
100+ if record is not None :
101+ item .id = record .id
102+ id_list .append (record .id )
103+
104+ record .table_comment = item .table_comment
105+ session .add (record )
106+ session .commit ()
107+ else :
108+ # save new table
109+ table = CoreTable (ds_id = ds .id , checked = True , table_name = item .table_name , table_comment = item .table_comment ,
110+ custom_comment = item .table_comment )
111+ session .add (table )
112+ session .flush ()
113+ session .refresh (table )
114+ item .id = table .id
115+ id_list .append (table .id )
116+ session .commit ()
117+
118+ # sync field
119+ fields = getFields (session , ds .id , item .table_name )
120+ sync_fields (session , ds , item , fields )
121+
122+ if len (id_list ) > 0 :
123+ session .query (CoreTable ).filter (and_ (CoreTable .ds_id == ds .id , CoreTable .id .not_in (id_list ))).delete (
124+ synchronize_session = False )
125+
126+
127+ def sync_fields (session : SessionDep , ds : CoreDatasource , table : CoreTable , fields : List [ColumnSchema ]):
128+ id_list = []
129+ for item in fields :
130+ statement = select (CoreField ).where (
131+ and_ (CoreField .table_id == table .id , CoreField .field_name == item .fieldName ))
132+ record = session .exec (statement ).first ()
133+ if record is not None :
134+ item .id = record .id
135+ id_list .append (record .id )
136+
137+ record .field_comment = item .fieldComment
138+ session .add (record )
139+ session .commit ()
140+ else :
141+ field = CoreField (ds_id = ds .id , table_id = table .id , checked = True , field_name = item .fieldName ,
142+ field_type = item .fieldType , field_comment = item .fieldComment ,
143+ custom_comment = item .fieldComment )
144+ session .add (field )
145+ session .flush ()
146+ session .refresh (field )
147+ item .id = field .id
148+ id_list .append (field .id )
149+ session .commit ()
150+
151+ if len (id_list ) > 0 :
152+ session .query (CoreField ).filter (and_ (CoreField .table_id == table .id , CoreField .id .not_in (id_list ))).delete (
153+ synchronize_session = False )
0 commit comments