Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions data_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# data_utils 패키지 초기화 파일
289 changes: 289 additions & 0 deletions data_utils/datahub_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
from datahub.metadata.schema_classes import UpstreamLineageClass
from collections import defaultdict
import requests
from data_utils.queries import (
ROOT_GLOSSARY_NODES_QUERY,
GLOSSARY_NODE_QUERY,
LIST_QUERIES_QUERY,
)


class DatahubMetadataFetcher:
Expand Down Expand Up @@ -292,3 +297,287 @@ def process_lineage(direction):
)

return metadata

def get_root_glossary_nodes(self):
"""
DataHub에서 루트 용어집 노드를 가져오는 함수

Returns:
dict: 루트 용어집 노드 정보
"""
# GraphQL 요청 보내기
headers = {"Content-Type": "application/json"}
response = requests.post(
f"{self.gms_server}/api/graphql",
json={"query": ROOT_GLOSSARY_NODES_QUERY},
headers=headers,
)

# 결과 반환
if response.status_code == 200:
return response.json()
else:
return {
"error": True,
"status_code": response.status_code,
"message": response.text,
}

def get_glossary_node_by_urn(self, urn):
"""
DataHub에서 특정 URN의 용어집 노드 및 그 자식 항목을 가져오는 함수

Args:
urn (str): 용어집 노드의 URN

Returns:
dict: 용어집 노드 정보와 자식 항목
"""
# GraphQL 요청 보내기
headers = {"Content-Type": "application/json"}
response = requests.post(
f"{self.gms_server}/api/graphql",
json={"query": GLOSSARY_NODE_QUERY, "variables": {"urn": urn}},
headers=headers,
)

# 결과 반환
if response.status_code == 200:
return response.json()
else:
return {
"error": True,
"status_code": response.status_code,
"message": response.text,
}

def get_node_basic_info(self, node, index):
"""
용어집 노드의 기본 정보를 딕셔너리로 반환하는 함수

Args:
node (dict): 용어집 노드 정보
index (int): 노드의 인덱스

Returns:
dict: 노드의 기본 정보
"""
result = {"index": index, "name": node["properties"]["name"]}

if node["properties"] and node["properties"].get("description"):
result["description"] = node["properties"]["description"]

# 자식 노드/용어 관계 정보 수 추가
if "children" in node and node["children"]["total"] > 0:
result["child_count"] = node["children"]["total"]

return result

def get_child_entity_info(self, entity, index):
"""
자식 엔티티(용어 또는 노드)의 정보를 딕셔너리로 반환하는 함수

Args:
entity (dict): 자식 엔티티 정보
index (int): 엔티티의 인덱스

Returns:
dict: 엔티티 정보
"""
entity_type = entity["type"]
result = {"index": index, "type": entity_type}

if entity_type == "GLOSSARY_TERM":
if "properties" in entity and entity["properties"]:
result["name"] = entity["properties"].get("name", "N/A")

if (
"description" in entity["properties"]
and entity["properties"]["description"]
):
result["description"] = entity["properties"]["description"]

elif entity_type == "GLOSSARY_NODE":
if "properties" in entity and entity["properties"]:
result["name"] = entity["properties"].get("name", "N/A")

return result

def process_node_details(self, node):
"""
노드의 상세 정보를 처리하고 딕셔너리로 반환하는 함수

Args:
node (dict): 용어집 노드 정보

Returns:
dict: 노드의 상세 정보
"""
node_urn = node["urn"]
detailed_node = self.get_glossary_node_by_urn(node_urn)

result = {"name": node["properties"]["name"], "children": []}

if (
detailed_node
and "data" in detailed_node
and "glossaryNode" in detailed_node["data"]
):
node_detail = detailed_node["data"]["glossaryNode"]

# 자식 항목 정보 추출
if "children" in node_detail and node_detail["children"]["total"] > 0:
relationships = node_detail["children"]["relationships"]

for j, rel in enumerate(relationships, 1):
entity = rel["entity"]
result["children"].append(self.get_child_entity_info(entity, j))

return result

def process_glossary_nodes(self, result):
"""
용어집 노드 결과를 처리하고 딕셔너리로 반환하는 함수

Args:
result (dict): API 응답 결과

Returns:
dict: 처리된 용어집 노드 데이터
"""
if "error" in result:
return result

processed_result = {"total_nodes": 0, "nodes": []}

# 노드 목록 추출
nodes = result["data"]["getRootGlossaryNodes"]["nodes"]
processed_result["total_nodes"] = len(nodes)

for i, node in enumerate(nodes, 1):
node_info = self.get_node_basic_info(node, i)

# 자식 노드가 있으면 상세 정보 처리
if "children" in node and node["children"]["total"] > 0:
node_details = self.process_node_details(node)
node_info["details"] = node_details

processed_result["nodes"].append(node_info)

return processed_result

def get_glossary_data(self):
"""
DataHub에서 전체 용어집 데이터를 가져와 처리하는 함수

Returns:
dict: 처리된 용어집 데이터
"""
# DataHub 서버에 연결하여 용어집 노드 가져오기
result = self.get_root_glossary_nodes()

# 결과 처리
if result:
try:
return self.process_glossary_nodes(result)
except KeyError as e:
return {"error": True, "message": f"결과 구조 파싱 중 오류 발생: {e}"}
else:
return {"error": True, "message": "용어집 노드를 가져오지 못했습니다."}

def get_queries(self, start=0, count=10, query="*", filters=None):
"""
DataHub에서 쿼리 목록을 가져오는 함수

Args:
start (int): 시작 인덱스 (기본값=0)
count (int): 반환할 쿼리 수 (기본값=10)
query (str): 필터링에 사용할 쿼리 문자열 (기본값="*")
filters (list): 추가 필터 (기본값=None)

Returns:
dict: 쿼리 목록 정보
"""
# GraphQL 요청용 입력 변수 준비
input_params = {"start": start, "count": count, "query": query}

if filters:
input_params["filters"] = filters

variables = {"input": input_params}

# GraphQL 요청 보내기
headers = {"Content-Type": "application/json"}
response = requests.post(
f"{self.gms_server}/api/graphql",
json={"query": LIST_QUERIES_QUERY, "variables": variables},
headers=headers,
)

# 결과 반환
if response.status_code == 200:
return response.json()
else:
return {
"error": True,
"status_code": response.status_code,
"message": response.text,
}

def process_queries(self, result):
"""
쿼리 목록 결과를 처리하고 간소화된 형태로 반환하는 함수

Args:
result (dict): API 응답 결과

Returns:
dict: 처리된 쿼리 목록 데이터 (urn, name, description, statement만 포함)
"""
if "error" in result:
return result

processed_result = {"total_queries": 0, "count": 0, "start": 0, "queries": []}

if "data" in result and "listQueries" in result["data"]:
list_queries = result["data"]["listQueries"]
processed_result["total_queries"] = list_queries.get("total", 0)
processed_result["count"] = list_queries.get("count", 0)
processed_result["start"] = list_queries.get("start", 0)

for query in list_queries.get("queries", []):
query_info = {"urn": query.get("urn")}

props = query.get("properties", {})
query_info["name"] = props.get("name")
query_info["description"] = props.get("description")
query_info["statement"] = props.get("statement", {}).get("value")

processed_result["queries"].append(query_info)

return processed_result

def get_query_data(self, start=0, count=10, query="*", filters=None):
"""
DataHub에서 쿼리 목록을 가져와 처리하는 함수

Args:
start (int): 시작 인덱스 (기본값=0)
count (int): 반환할 쿼리 수 (기본값=10)
query (str): 필터링에 사용할 쿼리 문자열 (기본값="*")
filters (list): 추가 필터 (기본값=None)

Returns:
dict: 처리된 쿼리 목록 데이터
"""
# DataHub 서버에 연결하여 쿼리 목록 가져오기
result = self.get_queries(start, count, query, filters)

# 결과 처리
if result:
try:
return self.process_queries(result)
except KeyError as e:
return {"error": True, "message": f"결과 구조 파싱 중 오류 발생: {e}"}
else:
return {"error": True, "message": "쿼리 목록을 가져오지 못했습니다."}
Loading