Skip to content

Commit 3e155a3

Browse files
committed
feat : Add lineage fetching function
1 parent 10a3db5 commit 3e155a3

File tree

1 file changed

+68
-0
lines changed

1 file changed

+68
-0
lines changed

data_utils/datahub_source.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,71 @@ def get_column_names_and_descriptions(self, urn):
6767
}
6868
)
6969
return columns
70+
71+
def get_linage(
72+
self,
73+
gms_server_url,
74+
urn,
75+
counts=100,
76+
direction="DOWNSTREAM",
77+
degree_values=None,
78+
):
79+
# URN에 대한 DOWNSTREAM/UPSTREAM lineage entity counts 만큼 가져오는 함수
80+
# degree_values에 따라 lineage depth가 결정
81+
"""
82+
Fetches downstream/upstream lineage entities for a given dataset URN using DataHub's GraphQL API.
83+
84+
Args:
85+
gms_server_url (str): DataHub GMS endpoint.
86+
urn (str): Dataset URN to fetch lineage for.
87+
count (int): Maximum number of entities to fetch (default=100).
88+
direction (str): DOWNSTREAM or UPSTREAM.
89+
degree_values (List[str]): Degree filter values like ["1", "2", "3+"]. Defaults to ["1", "2"].
90+
91+
Returns:
92+
List[Tuple[str, dict]]: A list containing the dataset URN and its lineage result.
93+
"""
94+
95+
if degree_values is None:
96+
degree_values = ["1", "2"]
97+
98+
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
99+
100+
datahub_graph = DataHubGraph(DatahubClientConfig(server=gms_server_url))
101+
102+
query = """
103+
query scrollAcrossLineage($input: ScrollAcrossLineageInput!) {
104+
scrollAcrossLineage(input: $input) {
105+
searchResults {
106+
degree
107+
entity {
108+
urn
109+
type
110+
}
111+
}
112+
}
113+
}
114+
"""
115+
variables = {
116+
"input": {
117+
"query": "*",
118+
"urn": urn,
119+
"count": counts,
120+
"direction": direction,
121+
"orFilters": [
122+
{
123+
"and": [
124+
{
125+
"condition": "EQUAL",
126+
"negated": "false",
127+
"field": "degree",
128+
"values": degree_values,
129+
}
130+
]
131+
}
132+
],
133+
}
134+
}
135+
136+
result = datahub_graph.execute_graphql(query=query, variables=variables)
137+
return [(urn, result)]

0 commit comments

Comments
 (0)