Skip to content

Commit c31166d

Browse files
committed
feat: Add build_table_metadata function to generate table metadata with lineage
- Generates table metadata including table name, description, and column info
- Adds downstream and upstream lineage with degree filtering and optional sorting
- Includes column-level upstream lineage for fine-grained analysis
1 parent 771abed commit c31166d

File tree

1 file changed

+52
-0
lines changed

1 file changed

+52
-0
lines changed

data_utils/datahub_source.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,3 +225,55 @@ def min_degree_lineage(self, lineage_result):
225225
)
226226

227227
return table_degrees
228+
229+
def build_table_metadata(self, urn, max_degree=2, sort_by_degree=True):
    """
    Assemble one table's metadata: name, description, columns and lineage.

    The result carries table-level lineage in both directions
    (downstream/upstream) and column-level lineage for the upstream side.

    Args:
        urn (str): Dataset URN
        max_degree (int): Lineage entries whose degree exceeds this value
            are dropped
        sort_by_degree (bool): When true, each table-level lineage list is
            ordered by ascending degree

    Returns:
        dict: Mapping with the keys "table_name", "description", "columns"
            and "lineage"; "lineage" holds "downstream", "upstream" and
            "upstream_columns".
    """

    def collect_tables(direction):
        # For the given direction ("DOWNSTREAM" or "UPSTREAM"), fetch the
        # table lineage and reduce it with min_degree_lineage to a
        # table -> degree mapping.
        degree_by_table = self.min_degree_lineage(
            self.get_table_lineage(urn, direction=direction)
        )

        # Keep only the tables within the requested degree limit.
        entries = []
        for table_name, degree in degree_by_table.items():
            if degree <= max_degree:
                entries.append({"table": table_name, "degree": degree})

        # Optional ordering by degree; the sort is stable, so entries with
        # equal degree keep the order of the mapping.
        if sort_by_degree:
            entries = sorted(entries, key=lambda entry: entry["degree"])

        return entries

    # Basic table information.
    table_name = self.get_table_name(urn)
    description = self.get_table_description(urn)
    columns = self.get_column_names_and_descriptions(urn)

    # Table-level lineage for both directions.
    downstream_tables = collect_tables("DOWNSTREAM")
    upstream_tables = collect_tables("UPSTREAM")

    # Column-level lineage; falls back to an empty list when the key is absent.
    upstream_columns = self.get_column_lineage(urn).get(
        "lineage_by_upstream_dataset", []
    )

    return {
        "table_name": table_name,
        "description": description,
        "columns": columns,
        "lineage": {
            "downstream": downstream_tables,
            "upstream": upstream_tables,
            "upstream_columns": upstream_columns,
        },
    }

0 commit comments

Comments
 (0)