Skip to content

Commit 707a637

Browse files
committed
feat: Add DataHub Glossary integration for domain term understanding
1 parent dc1b65f commit 707a637

File tree

3 files changed

+1103
-0
lines changed

3 files changed

+1103
-0
lines changed

data_utils/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# data_utils 패키지 초기화 파일

data_utils/datahub_source.py

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from datahub.metadata.schema_classes import UpstreamLineageClass
77
from collections import defaultdict
88
import requests
9+
from data_utils.queries import ROOT_GLOSSARY_NODES_QUERY, GLOSSARY_NODE_QUERY
910

1011

1112
class DatahubMetadataFetcher:
@@ -292,3 +293,190 @@ def process_lineage(direction):
292293
)
293294

294295
return metadata
296+
297+
def get_root_glossary_nodes(self):
298+
"""
299+
DataHub에서 루트 용어집 노드를 가져오는 함수
300+
301+
Returns:
302+
dict: 루트 용어집 노드 정보
303+
"""
304+
# GraphQL 요청 보내기
305+
headers = {"Content-Type": "application/json"}
306+
response = requests.post(
307+
f"{self.gms_server}/api/graphql",
308+
json={"query": ROOT_GLOSSARY_NODES_QUERY},
309+
headers=headers,
310+
)
311+
312+
# 결과 반환
313+
if response.status_code == 200:
314+
return response.json()
315+
else:
316+
return {
317+
"error": True,
318+
"status_code": response.status_code,
319+
"message": response.text,
320+
}
321+
322+
def get_glossary_node_by_urn(self, urn):
323+
"""
324+
DataHub에서 특정 URN의 용어집 노드 및 그 자식 항목을 가져오는 함수
325+
326+
Args:
327+
urn (str): 용어집 노드의 URN
328+
329+
Returns:
330+
dict: 용어집 노드 정보와 자식 항목
331+
"""
332+
# GraphQL 요청 보내기
333+
headers = {"Content-Type": "application/json"}
334+
response = requests.post(
335+
f"{self.gms_server}/api/graphql",
336+
json={"query": GLOSSARY_NODE_QUERY, "variables": {"urn": urn}},
337+
headers=headers,
338+
)
339+
340+
# 결과 반환
341+
if response.status_code == 200:
342+
return response.json()
343+
else:
344+
return {
345+
"error": True,
346+
"status_code": response.status_code,
347+
"message": response.text,
348+
}
349+
350+
def get_node_basic_info(self, node, index):
351+
"""
352+
용어집 노드의 기본 정보를 딕셔너리로 반환하는 함수
353+
354+
Args:
355+
node (dict): 용어집 노드 정보
356+
index (int): 노드의 인덱스
357+
358+
Returns:
359+
dict: 노드의 기본 정보
360+
"""
361+
result = {"index": index, "name": node["properties"]["name"]}
362+
363+
if node["properties"] and node["properties"].get("description"):
364+
result["description"] = node["properties"]["description"]
365+
366+
# 자식 노드/용어 관계 정보 수 추가
367+
if "children" in node and node["children"]["total"] > 0:
368+
result["child_count"] = node["children"]["total"]
369+
370+
return result
371+
372+
def get_child_entity_info(self, entity, index):
373+
"""
374+
자식 엔티티(용어 또는 노드)의 정보를 딕셔너리로 반환하는 함수
375+
376+
Args:
377+
entity (dict): 자식 엔티티 정보
378+
index (int): 엔티티의 인덱스
379+
380+
Returns:
381+
dict: 엔티티 정보
382+
"""
383+
entity_type = entity["type"]
384+
result = {"index": index, "type": entity_type}
385+
386+
if entity_type == "GLOSSARY_TERM":
387+
if "properties" in entity and entity["properties"]:
388+
result["name"] = entity["properties"].get("name", "N/A")
389+
390+
if (
391+
"description" in entity["properties"]
392+
and entity["properties"]["description"]
393+
):
394+
result["description"] = entity["properties"]["description"]
395+
396+
elif entity_type == "GLOSSARY_NODE":
397+
if "properties" in entity and entity["properties"]:
398+
result["name"] = entity["properties"].get("name", "N/A")
399+
400+
return result
401+
402+
def process_node_details(self, node):
403+
"""
404+
노드의 상세 정보를 처리하고 딕셔너리로 반환하는 함수
405+
406+
Args:
407+
node (dict): 용어집 노드 정보
408+
409+
Returns:
410+
dict: 노드의 상세 정보
411+
"""
412+
node_urn = node["urn"]
413+
detailed_node = self.get_glossary_node_by_urn(node_urn)
414+
415+
result = {"name": node["properties"]["name"], "children": []}
416+
417+
if (
418+
detailed_node
419+
and "data" in detailed_node
420+
and "glossaryNode" in detailed_node["data"]
421+
):
422+
node_detail = detailed_node["data"]["glossaryNode"]
423+
424+
# 자식 항목 정보 추출
425+
if "children" in node_detail and node_detail["children"]["total"] > 0:
426+
relationships = node_detail["children"]["relationships"]
427+
428+
for j, rel in enumerate(relationships, 1):
429+
entity = rel["entity"]
430+
result["children"].append(self.get_child_entity_info(entity, j))
431+
432+
return result
433+
434+
def process_glossary_nodes(self, result):
435+
"""
436+
용어집 노드 결과를 처리하고 딕셔너리로 반환하는 함수
437+
438+
Args:
439+
result (dict): API 응답 결과
440+
441+
Returns:
442+
dict: 처리된 용어집 노드 데이터
443+
"""
444+
if "error" in result:
445+
return result
446+
447+
processed_result = {"total_nodes": 0, "nodes": []}
448+
449+
# 노드 목록 추출
450+
nodes = result["data"]["getRootGlossaryNodes"]["nodes"]
451+
processed_result["total_nodes"] = len(nodes)
452+
453+
for i, node in enumerate(nodes, 1):
454+
node_info = self.get_node_basic_info(node, i)
455+
456+
# 자식 노드가 있으면 상세 정보 처리
457+
if "children" in node and node["children"]["total"] > 0:
458+
node_details = self.process_node_details(node)
459+
node_info["details"] = node_details
460+
461+
processed_result["nodes"].append(node_info)
462+
463+
return processed_result
464+
465+
def get_glossary_data(self):
466+
"""
467+
DataHub에서 전체 용어집 데이터를 가져와 처리하는 함수
468+
469+
Returns:
470+
dict: 처리된 용어집 데이터
471+
"""
472+
# DataHub 서버에 연결하여 용어집 노드 가져오기
473+
result = self.get_root_glossary_nodes()
474+
475+
# 결과 처리
476+
if result:
477+
try:
478+
return self.process_glossary_nodes(result)
479+
except KeyError as e:
480+
return {"error": True, "message": f"결과 구조 파싱 중 오류 발생: {e}"}
481+
else:
482+
return {"error": True, "message": "용어집 노드를 가져오지 못했습니다."}

0 commit comments

Comments
 (0)