55from xml .dom .minidom import parseString
66
77import dicttoxml
8+ from sqlalchemy import BigInteger
89from sqlalchemy import and_ , or_ , select , func , delete , update , union
910from sqlalchemy import text
1011from sqlalchemy .orm import aliased
1112from sqlalchemy .orm .session import Session
1213
1314from apps .ai_model .embedding import EmbeddingModelCache
15+ from apps .datasource .models .datasource import CoreDatasource
1416from apps .template .generate_chart .generator import get_base_terminology_template
1517from apps .terminology .models .terminology_model import Terminology , TerminologyInfo
1618from common .core .config import settings
@@ -80,20 +82,51 @@ def page_terminology(session: SessionDep, current_page: int = 1, page_size: int
8082 .subquery ()
8183 )
8284
85+ # 创建子查询来获取数据源名称,添加类型转换
86+ datasource_names_subquery = (
87+ select (
88+ func .jsonb_array_elements (Terminology .datasource_ids ).cast (BigInteger ).label ('ds_id' ),
89+ Terminology .id .label ('term_id' )
90+ )
91+ .where (Terminology .id .in_ (paginated_parent_ids ))
92+ .subquery ()
93+ )
94+
8395 # 主查询
8496 stmt = (
8597 select (
8698 Terminology .id ,
8799 Terminology .word ,
88100 Terminology .create_time ,
89101 Terminology .description ,
90- children_subquery .c .other_words
102+ Terminology .specific_ds ,
103+ Terminology .datasource_ids ,
104+ children_subquery .c .other_words ,
105+ func .jsonb_agg (CoreDatasource .name ).filter (CoreDatasource .id .isnot (None )).label ('datasource_names' )
91106 )
92107 .outerjoin (
93108 children_subquery ,
94109 Terminology .id == children_subquery .c .pid
95110 )
111+ # 关联数据源名称子查询和 CoreDatasource 表
112+ .outerjoin (
113+ datasource_names_subquery ,
114+ datasource_names_subquery .c .term_id == Terminology .id
115+ )
116+ .outerjoin (
117+ CoreDatasource ,
118+ CoreDatasource .id == datasource_names_subquery .c .ds_id
119+ )
96120 .where (and_ (Terminology .id .in_ (paginated_parent_ids ), Terminology .oid == oid ))
121+ .group_by (
122+ Terminology .id ,
123+ Terminology .word ,
124+ Terminology .create_time ,
125+ Terminology .description ,
126+ Terminology .specific_ds ,
127+ Terminology .datasource_ids ,
128+ children_subquery .c .other_words
129+ )
97130 .order_by (Terminology .create_time .desc ())
98131 )
99132 else :
@@ -116,15 +149,37 @@ def page_terminology(session: SessionDep, current_page: int = 1, page_size: int
116149 .subquery ()
117150 )
118151
152+ # 创建子查询来获取数据源名称
153+ datasource_names_subquery = (
154+ select (
155+ func .jsonb_array_elements (Terminology .datasource_ids ).cast (BigInteger ).label ('ds_id' ),
156+ Terminology .id .label ('term_id' )
157+ )
158+ .where (Terminology .id .in_ (paginated_parent_ids ))
159+ .subquery ()
160+ )
161+
119162 stmt = (
120163 select (
121164 Terminology .id ,
122165 Terminology .word ,
123166 Terminology .create_time ,
124167 Terminology .description ,
125- func .jsonb_agg (child .word ).filter (child .word .isnot (None )).label ('other_words' )
168+ Terminology .specific_ds ,
169+ Terminology .datasource_ids ,
170+ func .jsonb_agg (child .word ).filter (child .word .isnot (None )).label ('other_words' ),
171+ func .jsonb_agg (CoreDatasource .name ).filter (CoreDatasource .id .isnot (None )).label ('datasource_names' )
126172 )
127173 .outerjoin (child , and_ (Terminology .id == child .pid ))
174+ # 关联数据源名称子查询和 CoreDatasource 表
175+ .outerjoin (
176+ datasource_names_subquery ,
177+ datasource_names_subquery .c .term_id == Terminology .id
178+ )
179+ .outerjoin (
180+ CoreDatasource ,
181+ CoreDatasource .id == datasource_names_subquery .c .ds_id
182+ )
128183 .where (and_ (Terminology .id .in_ (paginated_parent_ids ), Terminology .oid == oid ))
129184 .group_by (Terminology .id , Terminology .word )
130185 .order_by (Terminology .create_time .desc ())
@@ -139,14 +194,23 @@ def page_terminology(session: SessionDep, current_page: int = 1, page_size: int
139194 create_time = row .create_time ,
140195 description = row .description ,
141196 other_words = row .other_words if row .other_words else [],
197+ specific_ds = row .specific_ds if row .specific_ds is not None else False ,
198+ datasource_ids = row .datasource_ids if row .datasource_ids is not None else [],
199+ datasource_names = row .datasource_names if row .datasource_names is not None else [],
142200 ))
143201
144202 return current_page , page_size , total_count , total_pages , _list
145203
146204
147205def create_terminology (session : SessionDep , info : TerminologyInfo , oid : int , trans : Trans ):
148206 create_time = datetime .datetime .now ()
149- parent = Terminology (word = info .word , create_time = create_time , description = info .description , oid = oid )
207+
208+ specific_ds = info .specific_ds if info .specific_ds is not None else False
209+ datasource_ids = info .datasource_ids if info .datasource_ids is not None else []
210+
211+ parent = Terminology (word = info .word , create_time = create_time , description = info .description , oid = oid ,
212+ specific_ds = specific_ds ,
213+ datasource_ids = datasource_ids )
150214
151215 words = [info .word ]
152216 for child in info .other_words :
@@ -175,7 +239,8 @@ def create_terminology(session: SessionDep, info: TerminologyInfo, oid: int, tra
175239 if other_word .strip () == "" :
176240 continue
177241 _list .append (
178- Terminology (pid = result .id , word = other_word , create_time = create_time , oid = oid ))
242+ Terminology (pid = result .id , word = other_word , create_time = create_time , oid = oid ,
243+ specific_ds = specific_ds , datasource_ids = datasource_ids ))
179244 session .bulk_save_objects (_list )
180245 session .flush ()
181246 session .commit ()
@@ -214,9 +279,14 @@ def update_terminology(session: SessionDep, info: TerminologyInfo, oid: int, tra
214279 if exists :
215280 raise Exception (trans ("i18n_terminology.exists_in_db" ))
216281
282+ specific_ds = info .specific_ds if info .specific_ds is not None else False
283+ datasource_ids = info .datasource_ids if info .datasource_ids is not None else []
284+
217285 stmt = update (Terminology ).where (and_ (Terminology .id == info .id )).values (
218286 word = info .word ,
219287 description = info .description ,
288+ specific_ds = specific_ds ,
289+ datasource_ids = datasource_ids
220290 )
221291 session .execute (stmt )
222292 session .commit ()
@@ -232,7 +302,8 @@ def update_terminology(session: SessionDep, info: TerminologyInfo, oid: int, tra
232302 if other_word .strip () == "" :
233303 continue
234304 _list .append (
235- Terminology (pid = info .id , word = other_word , create_time = create_time , oid = oid ))
305+ Terminology (pid = info .id , word = other_word , create_time = create_time , oid = oid ,
306+ specific_ds = specific_ds , datasource_ids = datasource_ids ))
236307 session .bulk_save_objects (_list )
237308 session .flush ()
238309 session .commit ()
0 commit comments