Skip to content

Commit c21facf

Browse files
authored
Merge pull request #133 from #132
Feature/132 metadata 통합 강화 enhancement glossary쿼리예시 메타데이터를 sql 생성 컨텍스트에 반영
2 parents 7ea8648 + e5938a6 commit c21facf

File tree

11 files changed

+1119
-547
lines changed

11 files changed

+1119
-547
lines changed

cli/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ def cli(
122122
except Exception as e:
123123
click.secho(f"환경 변수 로드 중 오류 발생: {str(e)}", fg="red")
124124
ctx.exit(1)
125+
else:
126+
dotenv.load_dotenv(override=True)
125127

126128
# 프롬프트 디렉토리를 환경 변수로 설정
127129
if prompt_dir_path:
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
"""
2+
DataHub 유틸리티 패키지
3+
4+
DataHub와의 상호작용을 위한 모듈들을 제공합니다.
5+
6+
주요 구성요소:
7+
- DataHubBaseClient: 기본 연결 및 통신
8+
- MetadataService: 메타데이터, 리니지, URN 관련 기능
9+
- QueryService: 쿼리 관련 기능
10+
- GlossaryService: 용어집 관련 기능
11+
"""
12+
13+
from .base_client import DataHubBaseClient
14+
from .metadata_service import MetadataService
15+
from .query_service import QueryService
16+
from .glossary_service import GlossaryService
17+
18+
__all__ = ["DataHubBaseClient", "MetadataService", "QueryService", "GlossaryService"]
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
"""
2+
DataHub 기본 클라이언트 모듈
3+
4+
DataHub GMS 서버와의 기본 연결 및 통신 기능을 제공합니다.
5+
"""
6+
7+
import requests
8+
from datahub.emitter.rest_emitter import DatahubRestEmitter
9+
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
10+
11+
12+
class DataHubBaseClient:
13+
"""DataHub 기본 클라이언트 클래스"""
14+
15+
def __init__(self, gms_server="http://localhost:8080", extra_headers={}):
16+
"""
17+
DataHub 클라이언트 초기화
18+
19+
Args:
20+
gms_server (str): DataHub GMS 서버 URL
21+
extra_headers (dict): 추가 HTTP 헤더
22+
"""
23+
# gms_server 주소 유효성 검사
24+
if not self._is_valid_gms_server(gms_server):
25+
raise ValueError(f"유효하지 않은 GMS 서버 주소: {gms_server}")
26+
27+
self.gms_server = gms_server
28+
self.extra_headers = extra_headers
29+
30+
# DataHub 클라이언트 초기화
31+
self.emitter = DatahubRestEmitter(
32+
gms_server=gms_server, extra_headers=extra_headers
33+
)
34+
self.datahub_graph = self.emitter.to_graph()
35+
36+
def _is_valid_gms_server(self, gms_server):
37+
"""
38+
GMS 서버 주소의 유효성을 검사하는 함수
39+
40+
Args:
41+
gms_server (str): 검사할 GMS 서버 URL
42+
43+
Returns:
44+
bool: 서버가 유효한 경우 True
45+
"""
46+
query = {"query": "{ health { status } }"}
47+
headers = {"Content-Type": "application/json"}
48+
49+
try:
50+
response = requests.post(
51+
f"{gms_server}/api/graphql", json=query, headers=headers
52+
)
53+
return response.status_code == 200
54+
except requests.exceptions.RequestException:
55+
return False
56+
57+
def execute_graphql_query(self, query, variables=None):
58+
"""
59+
GraphQL 쿼리 실행
60+
61+
Args:
62+
query (str): GraphQL 쿼리 문자열
63+
variables (dict, optional): 쿼리 변수
64+
65+
Returns:
66+
dict: GraphQL 응답
67+
"""
68+
headers = {"Content-Type": "application/json"}
69+
payload = {"query": query}
70+
71+
if variables:
72+
payload["variables"] = variables
73+
74+
response = requests.post(
75+
f"{self.gms_server}/api/graphql",
76+
json=payload,
77+
headers=headers,
78+
)
79+
80+
if response.status_code == 200:
81+
return response.json()
82+
else:
83+
return {
84+
"error": True,
85+
"status_code": response.status_code,
86+
"message": response.text,
87+
}
88+
89+
def get_datahub_graph(self):
90+
"""DataHub Graph 클라이언트 반환"""
91+
return self.datahub_graph
92+
93+
def get_urns(self):
94+
"""필터를 적용하여 데이터셋의 URN 가져오기"""
95+
return self.datahub_graph.get_urns_by_filter()
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
"""
2+
DataHub 용어집 서비스 모듈
3+
4+
DataHub의 glossary 관련 기능을 제공합니다.
5+
"""
6+
7+
from data_utils.queries import (
8+
ROOT_GLOSSARY_NODES_QUERY,
9+
GLOSSARY_NODE_QUERY,
10+
GLOSSARY_TERMS_BY_URN_QUERY,
11+
)
12+
from data_utils.datahub_services.base_client import DataHubBaseClient
13+
14+
15+
class GlossaryService:
16+
"""용어집 관련 서비스 클래스"""
17+
18+
def __init__(self, client: DataHubBaseClient):
19+
"""
20+
용어집 서비스 초기화
21+
22+
Args:
23+
client (DataHubBaseClient): DataHub 기본 클라이언트
24+
"""
25+
self.client = client
26+
27+
def get_root_glossary_nodes(self):
28+
"""
29+
DataHub에서 루트 용어집 노드를 가져오는 함수
30+
31+
Returns:
32+
dict: 루트 용어집 노드 정보
33+
"""
34+
return self.client.execute_graphql_query(ROOT_GLOSSARY_NODES_QUERY)
35+
36+
def get_glossary_node_by_urn(self, urn):
37+
"""
38+
DataHub에서 특정 URN의 용어집 노드 및 그 자식 항목을 가져오는 함수
39+
40+
Args:
41+
urn (str): 용어집 노드의 URN
42+
43+
Returns:
44+
dict: 용어집 노드 정보와 자식 항목
45+
"""
46+
variables = {"urn": urn}
47+
return self.client.execute_graphql_query(GLOSSARY_NODE_QUERY, variables)
48+
49+
def get_node_basic_info(self, node, index):
50+
"""
51+
용어집 노드의 기본 정보를 딕셔너리로 반환하는 함수
52+
53+
Args:
54+
node (dict): 용어집 노드 정보
55+
index (int): 노드의 인덱스
56+
57+
Returns:
58+
dict: 노드의 기본 정보
59+
"""
60+
result = {"index": index, "name": node["properties"]["name"]}
61+
62+
if node["properties"] and node["properties"].get("description"):
63+
result["description"] = node["properties"]["description"]
64+
65+
# 자식 노드/용어 관계 정보 수 추가
66+
if "children" in node and node["children"]["total"] > 0:
67+
result["child_count"] = node["children"]["total"]
68+
69+
return result
70+
71+
def get_child_entity_info(self, entity, index):
72+
"""
73+
자식 엔티티(용어 또는 노드)의 정보를 딕셔너리로 반환하는 함수
74+
75+
Args:
76+
entity (dict): 자식 엔티티 정보
77+
index (int): 엔티티의 인덱스
78+
79+
Returns:
80+
dict: 엔티티 정보
81+
"""
82+
entity_type = entity["type"]
83+
result = {"index": index, "type": entity_type}
84+
85+
if entity_type == "GLOSSARY_TERM":
86+
if "properties" in entity and entity["properties"]:
87+
result["name"] = entity["properties"].get("name", "N/A")
88+
89+
if (
90+
"description" in entity["properties"]
91+
and entity["properties"]["description"]
92+
):
93+
result["description"] = entity["properties"]["description"]
94+
95+
elif entity_type == "GLOSSARY_NODE":
96+
if "properties" in entity and entity["properties"]:
97+
result["name"] = entity["properties"].get("name", "N/A")
98+
99+
return result
100+
101+
def process_node_details(self, node):
102+
"""
103+
노드의 상세 정보를 처리하고 딕셔너리로 반환하는 함수
104+
105+
Args:
106+
node (dict): 용어집 노드 정보
107+
108+
Returns:
109+
dict: 노드의 상세 정보
110+
"""
111+
node_urn = node["urn"]
112+
detailed_node = self.get_glossary_node_by_urn(node_urn)
113+
114+
result = {"name": node["properties"]["name"], "children": []}
115+
116+
if (
117+
detailed_node
118+
and "data" in detailed_node
119+
and "glossaryNode" in detailed_node["data"]
120+
):
121+
node_detail = detailed_node["data"]["glossaryNode"]
122+
123+
# 자식 항목 정보 추출
124+
if "children" in node_detail and node_detail["children"]["total"] > 0:
125+
relationships = node_detail["children"]["relationships"]
126+
127+
for j, rel in enumerate(relationships, 1):
128+
entity = rel["entity"]
129+
result["children"].append(self.get_child_entity_info(entity, j))
130+
131+
return result
132+
133+
def process_glossary_nodes(self, result):
134+
"""
135+
용어집 노드 결과를 처리하고 딕셔너리로 반환하는 함수
136+
137+
Args:
138+
result (dict): API 응답 결과
139+
140+
Returns:
141+
dict: 처리된 용어집 노드 데이터
142+
"""
143+
if "error" in result:
144+
return result
145+
146+
processed_result = {"total_nodes": 0, "nodes": []}
147+
148+
# 노드 목록 추출
149+
nodes = result["data"]["getRootGlossaryNodes"]["nodes"]
150+
processed_result["total_nodes"] = len(nodes)
151+
152+
for i, node in enumerate(nodes, 1):
153+
node_info = self.get_node_basic_info(node, i)
154+
155+
# 자식 노드가 있으면 상세 정보 처리
156+
if "children" in node and node["children"]["total"] > 0:
157+
node_details = self.process_node_details(node)
158+
node_info["details"] = node_details
159+
160+
processed_result["nodes"].append(node_info)
161+
162+
return processed_result
163+
164+
def get_glossary_data(self):
165+
"""
166+
DataHub에서 전체 용어집 데이터를 가져와 처리하는 함수
167+
168+
Returns:
169+
dict: 처리된 용어집 데이터
170+
"""
171+
# DataHub 서버에 연결하여 용어집 노드 가져오기
172+
result = self.get_root_glossary_nodes()
173+
174+
# 결과 처리
175+
if result:
176+
try:
177+
return self.process_glossary_nodes(result)
178+
except KeyError as e:
179+
return {"error": True, "message": f"결과 구조 파싱 중 오류 발생: {e}"}
180+
else:
181+
return {"error": True, "message": "용어집 노드를 가져오지 못했습니다."}
182+
183+
def get_glossary_terms_by_urn(self, dataset_urn):
184+
"""
185+
특정 데이터셋 URN의 glossary terms를 조회하는 함수
186+
187+
Args:
188+
dataset_urn (str): 데이터셋 URN
189+
190+
Returns:
191+
dict: glossary terms 정보
192+
"""
193+
variables = {"urn": dataset_urn}
194+
return self.client.execute_graphql_query(GLOSSARY_TERMS_BY_URN_QUERY, variables)

0 commit comments

Comments
 (0)