@@ -76,7 +76,7 @@ def _get_table_info(max_workers: int = 8) -> Dict[str, str]:
7676
7777
7878def _get_column_info (
79- table_name : str , urn_table_mapping : Dict [str , str ], max_workers : int = 8
79+ table_name : str , urn_table_mapping : Dict [str , str ]
8080) -> List [Dict [str , str ]]:
8181 target_urn = urn_table_mapping .get (table_name )
8282 if not target_urn :
@@ -103,7 +103,21 @@ def _extract_dataset_name_from_urn(urn: str) -> Optional[str]:
103103 return None
104104
105105
106- def get_info_from_db (max_workers : int = 8 ) -> List [Document ]:
# Build and return one metadata entry per URN reported by the fetcher.
# Takes no arguments; the fetcher comes from _get_fetcher().
# Returns the list of values produced by fetcher.build_table_metadata(urn),
# in the same order the URNs were listed.
106+ def get_metadata_from_db () -> List [Dict ]:
107+ fetcher = _get_fetcher ()
# Materialize the URNs into a list so the total count is known up front
# for the progress message below.
108+ urns = list (fetcher .get_urns ())
109+
110+ metadata = []
111+ total = len (urns )
# URNs are handled one at a time; idx is 1-based for display purposes.
112+ for idx , urn in enumerate (urns , 1 ):
# Progress is written to stdout via print, as "[current/total] ...".
113+ print (f"[{ idx } /{ total } ] Processing URN: { urn } " )
114+ table_metadata = fetcher .build_table_metadata (urn )
115+ metadata .append (table_metadata )
116+
117+ return metadata
118+
119+
120+ def _prepare_datahub_metadata_mappings (max_workers : int = 8 ):
107121 table_info = _get_table_info (max_workers = max_workers )
108122
109123 fetcher = _get_fetcher ()
@@ -118,20 +132,31 @@ def get_info_from_db(max_workers: int = 8) -> List[Document]:
118132 if parsed_name :
119133 display_name_by_table [original_name ] = parsed_name
120134
121- def process_table_info (item : tuple [str , str , str ]) -> str :
122- original_table_name , table_description , display_table_name = item
123- # 컬럼 조회는 기존 테이블 이름으로 수행 (urn_table_mapping과 일치)
124- column_info = _get_column_info (
125- original_table_name , urn_table_mapping , max_workers = max_workers
126- )
127- column_info_str = "\n " .join (
128- [
129- f"{ col ['column_name' ]} : { col ['column_description' ]} "
130- for col in column_info
131- ]
132- )
133- used_name = display_table_name or original_table_name
134- return f"{ used_name } : { table_description } \n Columns:\n { column_info_str } "
135+ return table_info , urn_table_mapping , display_name_by_table
136+
137+
def _format_datahub_table_info(
    item: tuple[str, str, str], urn_table_mapping: Dict[str, str]
) -> Dict:
    """Build the schema entry for a single table.

    Args:
        item: A 3-tuple of (original table name, table description,
            display table name).
        urn_table_mapping: Mapping handed to ``_get_column_info`` together
            with the original table name to look up the table's columns.

    Returns:
        A single-key dict. The key is the display name when it is truthy,
        otherwise the original table name. The value is a dict with
        ``"table_description"`` and ``"columns"`` (column name -> column
        description).
    """
    source_name, description, shown_name = item

    # Columns are looked up by the original table name, because that is the
    # name urn_table_mapping is keyed on.
    column_descriptions = {}
    for column in _get_column_info(source_name, urn_table_mapping):
        column_descriptions[column["column_name"]] = column["column_description"]

    # Prefer the display name; fall back to the original when it is empty.
    table_key = shown_name if shown_name else source_name

    table_entry = {
        "table_description": description,
        "columns": column_descriptions,
    }
    return {table_key: table_entry}
154+
155+
156+ def get_table_schema (max_workers : int = 8 ) -> List [Dict ]:
157+ table_info , urn_table_mapping , display_name_by_table = (
158+ _prepare_datahub_metadata_mappings (max_workers )
159+ )
135160
136161 # 표시용 이름을 세 번째 파라미터로 함께 전달
137162 items_with_display = [
@@ -143,25 +168,15 @@ def process_table_info(item: tuple[str, str, str]) -> str:
143168 for name , desc in table_info .items ()
144169 ]
145170
146- table_info_str_list = parallel_process (
171+ # parallel_process에 전달할 함수 래핑
172+ def process_fn (item ):
173+ return _format_datahub_table_info (item , urn_table_mapping )
174+
175+ table_info_list = parallel_process (
147176 items_with_display ,
148- process_table_info ,
177+ process_fn ,
149178 max_workers = max_workers ,
150179 desc = "컬럼 정보 수집 중" ,
151180 )
152181
153- return [Document (page_content = info ) for info in table_info_str_list ]
154-
155-
156- def get_metadata_from_db () -> List [Dict ]:
157- fetcher = _get_fetcher ()
158- urns = list (fetcher .get_urns ())
159-
160- metadata = []
161- total = len (urns )
162- for idx , urn in enumerate (urns , 1 ):
163- print (f"[{ idx } /{ total } ] Processing URN: { urn } " )
164- table_metadata = fetcher .build_table_metadata (urn )
165- metadata .append (table_metadata )
166-
167- return metadata
182+ return table_info_list
0 commit comments