Skip to content

Commit 480c6e5

Browse files
committed
feat: Implement query fetching in DatahubMetadataFetcher
1 parent 707a637 commit 480c6e5

File tree

2 files changed

+2808
-1
lines changed

2 files changed

+2808
-1
lines changed

data_utils/datahub_source.py

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@
66
from datahub.metadata.schema_classes import UpstreamLineageClass
77
from collections import defaultdict
88
import requests
9-
from data_utils.queries import ROOT_GLOSSARY_NODES_QUERY, GLOSSARY_NODE_QUERY
9+
from data_utils.queries import (
10+
ROOT_GLOSSARY_NODES_QUERY,
11+
GLOSSARY_NODE_QUERY,
12+
LIST_QUERIES_QUERY,
13+
)
1014

1115

1216
class DatahubMetadataFetcher:
@@ -480,3 +484,101 @@ def get_glossary_data(self):
480484
return {"error": True, "message": f"결과 구조 파싱 중 오류 발생: {e}"}
481485
else:
482486
return {"error": True, "message": "용어집 노드를 가져오지 못했습니다."}
487+
488+
def get_queries(self, start=0, count=10, query="*", filters=None):
489+
"""
490+
DataHub에서 쿼리 목록을 가져오는 함수
491+
492+
Args:
493+
start (int): 시작 인덱스 (기본값=0)
494+
count (int): 반환할 쿼리 수 (기본값=10)
495+
query (str): 필터링에 사용할 쿼리 문자열 (기본값="*")
496+
filters (list): 추가 필터 (기본값=None)
497+
498+
Returns:
499+
dict: 쿼리 목록 정보
500+
"""
501+
# GraphQL 요청용 입력 변수 준비
502+
input_params = {"start": start, "count": count, "query": query}
503+
504+
if filters:
505+
input_params["filters"] = filters
506+
507+
variables = {"input": input_params}
508+
509+
# GraphQL 요청 보내기
510+
headers = {"Content-Type": "application/json"}
511+
response = requests.post(
512+
f"{self.gms_server}/api/graphql",
513+
json={"query": LIST_QUERIES_QUERY, "variables": variables},
514+
headers=headers,
515+
)
516+
print(response.json())
517+
518+
# 결과 반환
519+
if response.status_code == 200:
520+
return response.json()
521+
else:
522+
return {
523+
"error": True,
524+
"status_code": response.status_code,
525+
"message": response.text,
526+
}
527+
528+
def process_queries(self, result):
529+
"""
530+
쿼리 목록 결과를 처리하고 간소화된 형태로 반환하는 함수
531+
532+
Args:
533+
result (dict): API 응답 결과
534+
535+
Returns:
536+
dict: 처리된 쿼리 목록 데이터 (urn, name, description, statement만 포함)
537+
"""
538+
if "error" in result:
539+
return result
540+
541+
processed_result = {"total_queries": 0, "count": 0, "start": 0, "queries": []}
542+
543+
if "data" in result and "listQueries" in result["data"]:
544+
list_queries = result["data"]["listQueries"]
545+
processed_result["total_queries"] = list_queries.get("total", 0)
546+
processed_result["count"] = list_queries.get("count", 0)
547+
processed_result["start"] = list_queries.get("start", 0)
548+
549+
for query in list_queries.get("queries", []):
550+
query_info = {"urn": query.get("urn")}
551+
552+
props = query.get("properties", {})
553+
query_info["name"] = props.get("name")
554+
query_info["description"] = props.get("description")
555+
query_info["statement"] = props.get("statement", {}).get("value")
556+
557+
processed_result["queries"].append(query_info)
558+
559+
return processed_result
560+
561+
def get_query_data(self, start=0, count=10, query="*", filters=None):
562+
"""
563+
DataHub에서 쿼리 목록을 가져와 처리하는 함수
564+
565+
Args:
566+
start (int): 시작 인덱스 (기본값=0)
567+
count (int): 반환할 쿼리 수 (기본값=10)
568+
query (str): 필터링에 사용할 쿼리 문자열 (기본값="*")
569+
filters (list): 추가 필터 (기본값=None)
570+
571+
Returns:
572+
dict: 처리된 쿼리 목록 데이터
573+
"""
574+
# DataHub 서버에 연결하여 쿼리 목록 가져오기
575+
result = self.get_queries(start, count, query, filters)
576+
577+
# 결과 처리
578+
if result:
579+
try:
580+
return self.process_queries(result)
581+
except KeyError as e:
582+
return {"error": True, "message": f"결과 구조 파싱 중 오류 발생: {e}"}
583+
else:
584+
return {"error": True, "message": "쿼리 목록을 가져오지 못했습니다."}

0 commit comments

Comments
 (0)