Skip to content

Commit 8edb6ac

Browse files
committed
feat: URN 정보 조회 및 관련 쿼리 기능 추가
- DatahubMetadataFetcher 클래스에 특정 URN에 대한 메타데이터 및 쿼리 정보를 조회하는 기능 추가 - get_urn_info, get_queries_by_urn, get_glossary_terms_by_urn 메서드 구현
1 parent 5079c7f commit 8edb6ac

File tree

2 files changed

+248
-0
lines changed

2 files changed

+248
-0
lines changed

data_utils/datahub_source.py

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
ROOT_GLOSSARY_NODES_QUERY,
99
GLOSSARY_NODE_QUERY,
1010
LIST_QUERIES_QUERY,
11+
QUERIES_BY_URN_QUERY,
12+
GLOSSARY_TERMS_BY_URN_QUERY,
1113
)
1214

1315

@@ -587,3 +589,174 @@ def get_query_data(self, start=0, count=10, query="*", filters=None):
587589
return {"error": True, "message": f"결과 구조 파싱 중 오류 발생: {e}"}
588590
else:
589591
return {"error": True, "message": "쿼리 목록을 가져오지 못했습니다."}
592+
593+
def get_urn_info(self, urn):
594+
"""
595+
특정 URN에 대한 모든 관련 정보를 가져오는 함수
596+
597+
Args:
598+
urn (str): 조회할 데이터셋 URN
599+
600+
Returns:
601+
dict: URN에 대한 전체 메타데이터 정보
602+
"""
603+
print(f"\n=== URN 정보 조회: {urn} ===\n")
604+
605+
try:
606+
# 기본 테이블 메타데이터 가져오기
607+
metadata = self.build_table_metadata(urn)
608+
609+
# 결과 출력
610+
self._print_urn_details(metadata)
611+
612+
return metadata
613+
614+
except Exception as e:
615+
error_msg = f"URN 정보 조회 중 오류 발생: {str(e)}"
616+
print(error_msg)
617+
return {"error": True, "message": error_msg}
618+
619+
def _print_urn_details(self, metadata):
620+
"""URN 메타데이터를 보기 좋게 출력하는 내부 함수"""
621+
622+
# 테이블 기본 정보
623+
print("📋 테이블 정보:")
624+
print(f" 이름: {metadata.get('table_name', 'N/A')}")
625+
print(f" 설명: {metadata.get('description', 'N/A')}\n")
626+
627+
# 컬럼 정보
628+
columns = metadata.get("columns", [])
629+
if columns:
630+
print(f"📊 컬럼 정보 ({len(columns)}개):")
631+
for i, col in enumerate(columns, 1):
632+
print(f" {i}. {col['column_name']} ({col.get('column_type', 'N/A')})")
633+
if col.get("column_description"):
634+
print(f" → {col['column_description']}")
635+
print()
636+
637+
# 리니지 정보
638+
lineage = metadata.get("lineage", {})
639+
640+
# Downstream 테이블
641+
downstream = lineage.get("downstream", [])
642+
if downstream:
643+
print(f"⬇️ Downstream 테이블 ({len(downstream)}개):")
644+
for table in downstream:
645+
print(f" - {table['table']} (degree: {table['degree']})")
646+
print()
647+
648+
# Upstream 테이블
649+
upstream = lineage.get("upstream", [])
650+
if upstream:
651+
print(f"⬆️ Upstream 테이블 ({len(upstream)}개):")
652+
for table in upstream:
653+
print(f" - {table['table']} (degree: {table['degree']})")
654+
print()
655+
656+
# 컬럼 레벨 리니지
657+
upstream_columns = lineage.get("upstream_columns", [])
658+
if upstream_columns:
659+
print("🔗 컬럼 레벨 리니지:")
660+
for upstream_dataset in upstream_columns:
661+
dataset_name = upstream_dataset["upstream_dataset"]
662+
columns = upstream_dataset["columns"]
663+
print(f" 📋 {dataset_name}:")
664+
for col in columns:
665+
confidence = col.get("confidence", 1.0)
666+
print(
667+
f" {col['upstream_column']}{col['downstream_column']} (신뢰도: {confidence})"
668+
)
669+
print()
670+
671+
def get_queries_by_urn(self, dataset_urn):
672+
"""
673+
특정 데이터셋 URN과 연관된 쿼리들을 조회하는 함수
674+
675+
전체 쿼리를 가져온 후 클라이언트 사이드에서 필터링하는 방식 사용
676+
677+
Args:
678+
dataset_urn (str): 데이터셋 URN
679+
680+
Returns:
681+
dict: 연관된 쿼리 목록
682+
"""
683+
# 먼저 전체 쿼리 목록을 가져옴
684+
input_params = {"start": 0, "count": 1000, "query": "*"} # 충분히 큰 수로 설정
685+
686+
variables = {"input": input_params}
687+
688+
headers = {"Content-Type": "application/json"}
689+
response = requests.post(
690+
f"{self.gms_server}/api/graphql",
691+
json={"query": QUERIES_BY_URN_QUERY, "variables": variables},
692+
headers=headers,
693+
)
694+
695+
if response.status_code == 200:
696+
result = response.json()
697+
if "data" in result and "listQueries" in result["data"]:
698+
# 클라이언트 사이드에서 특정 URN과 연관된 쿼리만 필터링
699+
all_queries = result["data"]["listQueries"]["queries"]
700+
filtered_queries = []
701+
702+
for query in all_queries:
703+
subjects = query.get("subjects", [])
704+
for subject in subjects:
705+
if subject.get("dataset", {}).get("urn") == dataset_urn:
706+
filtered_queries.append(query)
707+
break
708+
709+
# 필터링된 결과로 응답 구조 재구성
710+
result["data"]["listQueries"]["queries"] = filtered_queries
711+
result["data"]["listQueries"]["count"] = len(filtered_queries)
712+
713+
return result
714+
else:
715+
return {
716+
"error": True,
717+
"status_code": response.status_code,
718+
"message": response.text,
719+
}
720+
721+
def get_glossary_terms_by_urn(self, dataset_urn):
722+
"""
723+
특정 데이터셋 URN의 glossary terms를 조회하는 함수
724+
725+
Args:
726+
dataset_urn (str): 데이터셋 URN
727+
728+
Returns:
729+
dict: glossary terms 정보
730+
"""
731+
variables = {"urn": dataset_urn}
732+
733+
headers = {"Content-Type": "application/json"}
734+
response = requests.post(
735+
f"{self.gms_server}/api/graphql",
736+
json={"query": GLOSSARY_TERMS_BY_URN_QUERY, "variables": variables},
737+
headers=headers,
738+
)
739+
740+
if response.status_code == 200:
741+
return response.json()
742+
else:
743+
return {
744+
"error": True,
745+
"status_code": response.status_code,
746+
"message": response.text,
747+
}
748+
749+
750+
if __name__ == "__main__":
751+
fetcher = DatahubMetadataFetcher()
752+
753+
print(
754+
fetcher.get_queries_by_urn(
755+
"urn:li:dataset:(urn:li:dataPlatform:dbt,small_bank_1.small_bank_1.ACCOUNTS,PROD)"
756+
)
757+
)
758+
print(
759+
fetcher.get_glossary_terms_by_urn(
760+
"urn:li:dataset:(urn:li:dataPlatform:dbt,small_bank_1.small_bank_1.ACCOUNTS,PROD)"
761+
)
762+
)

data_utils/queries.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3617,3 +3617,78 @@
36173617
__typename
36183618
}
36193619
"""
3620+
3621+
# 특정 URN과 연관된 쿼리를 찾는 GraphQL 쿼리 (수정된 버전)
3622+
QUERIES_BY_URN_QUERY = """
3623+
query listQueries($input: ListQueriesInput!) {
3624+
listQueries(input: $input) {
3625+
start
3626+
total
3627+
count
3628+
queries {
3629+
urn
3630+
properties {
3631+
name
3632+
description
3633+
statement {
3634+
value
3635+
language
3636+
__typename
3637+
}
3638+
__typename
3639+
}
3640+
subjects {
3641+
dataset {
3642+
urn
3643+
name
3644+
__typename
3645+
}
3646+
__typename
3647+
}
3648+
__typename
3649+
}
3650+
__typename
3651+
}
3652+
}
3653+
"""
3654+
3655+
# 특정 URN의 glossary terms를 조회하는 GraphQL 쿼리
3656+
GLOSSARY_TERMS_BY_URN_QUERY = """
3657+
query getDataset($urn: String!) {
3658+
dataset(urn: $urn) {
3659+
urn
3660+
name
3661+
glossaryTerms {
3662+
terms {
3663+
term {
3664+
urn
3665+
name
3666+
type
3667+
hierarchicalName
3668+
properties {
3669+
name
3670+
description
3671+
definition
3672+
__typename
3673+
}
3674+
parentNodes {
3675+
nodes {
3676+
urn
3677+
properties {
3678+
name
3679+
__typename
3680+
}
3681+
__typename
3682+
}
3683+
__typename
3684+
}
3685+
__typename
3686+
}
3687+
__typename
3688+
}
3689+
__typename
3690+
}
3691+
__typename
3692+
}
3693+
}
3694+
"""

0 commit comments

Comments
 (0)