Skip to content

Commit 8357fc8

Browse files
authored
feat(ingest): Browse Path v2 helper (#8012)
1 parent 287a292 commit 8357fc8

File tree

5 files changed

+311
-17
lines changed

5 files changed

+311
-17
lines changed

metadata-ingestion/src/datahub/emitter/mcp.py

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -178,26 +178,38 @@ def from_obj(
178178
"value": json.dumps(obj["aspect"]["json"]),
179179
}
180180

181-
mcp = MetadataChangeProposalClass.from_obj(obj, tuples=tuples)
181+
mcpc = MetadataChangeProposalClass.from_obj(obj, tuples=tuples)
182182

183183
# We don't know how to deserialize the entity key aspects yet.
184-
if mcp.entityKeyAspect is not None:
185-
return mcp
184+
if mcpc.entityKeyAspect is not None:
185+
return mcpc
186186

187187
# Try to deserialize the aspect.
188-
converted, aspect = _try_from_generic_aspect(mcp.aspectName, mcp.aspect)
188+
return cls.try_from_mcpc(mcpc) or mcpc
189+
190+
@classmethod
def try_from_mcpc(
    cls, mcpc: MetadataChangeProposalClass
) -> Optional["MetadataChangeProposalWrapper"]:
    """Build a wrapper from a raw MetadataChangeProposalClass, if its aspect converts.

    The proposal's aspect name and generic aspect are passed to
    ``_try_from_generic_aspect``. When that helper reports the aspect as not
    converted, ``None`` is returned so the caller can fall back to the raw
    proposal; otherwise a wrapper carrying the converted aspect is returned.

    Raises:
        Exception if the generic aspect is invalid, e.g. contains invalid json.
    """
    was_converted, typed_aspect = _try_from_generic_aspect(
        mcpc.aspectName, mcpc.aspect
    )
    if not was_converted:
        return None

    # Copy every envelope field across; only the aspect is swapped for the
    # converted one.
    return cls(
        entityType=mcpc.entityType,
        entityUrn=mcpc.entityUrn,
        changeType=mcpc.changeType,
        auditHeader=mcpc.auditHeader,
        aspectName=mcpc.aspectName,
        aspect=typed_aspect,
        systemMetadata=mcpc.systemMetadata,
    )
201213

202214
@classmethod
203215
def from_obj_require_wrapper(

metadata-ingestion/src/datahub/ingestion/api/source_helpers.py

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
from typing import Callable, Iterable, Optional, Set, TypeVar, Union
1+
import logging
2+
from collections import defaultdict
3+
from typing import Callable, Dict, Iterable, List, Optional, Set, TypeVar, Union
24

35
from datahub.emitter.mcp import MetadataChangeProposalWrapper
46
from datahub.ingestion.api.common import WorkUnit
@@ -8,6 +10,9 @@
810
StaleEntityRemovalHandler,
911
)
1012
from datahub.metadata.schema_classes import (
13+
BrowsePathEntryClass,
14+
BrowsePathsV2Class,
15+
ContainerClass,
1116
MetadataChangeEventClass,
1217
MetadataChangeProposalClass,
1318
StatusClass,
@@ -17,6 +22,8 @@
1722
from datahub.utilities.urns.urn import guess_entity_type
1823
from datahub.utilities.urns.urn_iter import list_urns
1924

25+
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
26+
2027

2128
def auto_workunit(
2229
stream: Iterable[Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]]
@@ -148,3 +155,50 @@ def auto_materialize_referenced_tags(
148155
entityUrn=urn,
149156
aspect=TagKeyClass(name=tag_urn.get_entity_id()[0]),
150157
).as_workunit()
158+
159+
160+
def auto_browse_path_v2(
    stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
    """Generate BrowsePathsV2 from Container aspects.

    Every incoming work unit is re-yielded unchanged. While streaming, the
    function records which urns are containers, the parent named by each
    Container aspect, and which urns already carry a BrowsePathsV2 aspect.
    Once the stream is exhausted, it walks the hierarchy downward from the
    root containers (containers with no recorded parent) and yields one
    BrowsePathsV2 work unit for each reachable urn that did not already
    emit one itself.

    If an urn emits several Container aspects, the last one seen decides its
    parent. Urns whose ancestor chain never reaches a root container seen in
    the stream get no generated browse path.

    Args:
        stream: Work units to pass through and inspect.

    Yields:
        The original work units, followed by the generated BrowsePathsV2
        work units.
    """
    # TODO: Generate BrowsePathsV2 from BrowsePaths as well

    ignore_urns: Set[str] = set()
    container_urns: Set[str] = set()
    parent_container_map: Dict[str, str] = {}
    for wu in stream:
        yield wu

        urn = wu.get_urn()
        if guess_entity_type(urn) == "container":
            container_urns.add(urn)

        for aspect in wu.get_aspects_of_type(ContainerClass):
            # Last Container aspect seen for an urn wins.
            parent_container_map[urn] = aspect.container

        if wu.get_aspects_of_type(BrowsePathsV2Class):
            ignore_urns.add(urn)

    # Build child links from the *final* parent map. Appending while streaming
    # would leave an urn registered under a stale parent whenever it emits more
    # than one Container aspect, which could visit it before its real parent
    # has a path (KeyError) or visit it twice (duplicate output).
    children: Dict[str, List[str]] = defaultdict(list)
    for child, parent in parent_container_map.items():
        children[parent].append(child)

    paths: Dict[str, List[str]] = {}  # Maps urn -> list of urns in path
    # Yield browse paths v2 in topological order, starting with root containers
    nodes = container_urns - parent_container_map.keys()
    while nodes:
        node = nodes.pop()
        # .get avoids inserting empty entries for leaf nodes into the defaultdict.
        nodes.update(children.get(node, ()))

        if node not in parent_container_map:  # root
            paths[node] = []
        else:
            # Each node has exactly one parent and is only queued once that
            # parent has been processed, so the parent's path is already known.
            parent = parent_container_map[node]
            paths[node] = [*paths[parent], parent]

        if node not in ignore_urns:
            yield MetadataChangeProposalWrapper(
                entityUrn=node,
                aspect=BrowsePathsV2Class(
                    path=[
                        BrowsePathEntryClass(id=ancestor, urn=ancestor)
                        for ancestor in paths[node]
                    ]
                ),
            ).as_workunit()

metadata-ingestion/src/datahub/ingestion/api/workunit.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from dataclasses import dataclass
2-
from typing import Iterable, Optional, Union, overload
2+
from typing import Iterable, List, Optional, Type, TypeVar, Union, overload
33

44
from deprecated import deprecated
55

@@ -9,7 +9,9 @@
99
MetadataChangeEvent,
1010
MetadataChangeProposal,
1111
)
12-
from datahub.metadata.schema_classes import UsageAggregationClass
12+
from datahub.metadata.schema_classes import UsageAggregationClass, _Aspect
13+
14+
# Type variable bound to _Aspect; ties the class passed to
# get_aspects_of_type to the element type of the list it returns.
T_Aspect = TypeVar("T_Aspect", bound=_Aspect)
1315

1416

1517
@dataclass
@@ -88,6 +90,27 @@ def get_urn(self) -> str:
8890
assert self.metadata.entityUrn
8991
return self.metadata.entityUrn
9092

93+
def get_aspects_of_type(self, aspect_cls: Type[T_Aspect]) -> List[T_Aspect]:
    """Return the aspects held by this work unit that are instances of ``aspect_cls``.

    - MetadataChangeEvent: the proposed snapshot's aspects are searched.
    - MetadataChangeProposalWrapper: its single aspect is checked.
    - MetadataChangeProposal (raw): deserialization is attempted only when the
      aspect name matches ``aspect_cls.ASPECT_NAME``; any failure yields no
      aspects rather than an error.

    Raises:
        ValueError: if the work unit's metadata is none of the types above.
    """
    payload = self.metadata
    candidates: list = []

    if isinstance(payload, MetadataChangeEvent):
        candidates = payload.proposedSnapshot.aspects
    elif isinstance(payload, MetadataChangeProposalWrapper):
        candidates = [payload.aspect]
    elif isinstance(payload, MetadataChangeProposal):
        # Best effort attempt to deserialize MetadataChangeProposalClass
        if payload.aspectName == aspect_cls.ASPECT_NAME:
            try:
                wrapper = MetadataChangeProposalWrapper.try_from_mcpc(payload)
                if wrapper:
                    candidates = [wrapper.aspect]
            except Exception:
                # Undecodable content is treated as "no matching aspect".
                pass
    else:
        raise ValueError(f"Unexpected type {type(self.metadata)}")

    return [candidate for candidate in candidates if isinstance(candidate, aspect_cls)]
113+
91114
def decompose_mce_into_mcps(self) -> Iterable["MetadataWorkUnit"]:
92115
from datahub.emitter.mcp_builder import mcps_from_mce
93116

metadata-ingestion/tests/unit/test_source_helpers.py

Lines changed: 98 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1-
from typing import List, Union
1+
from typing import Any, Dict, Iterable, List, Union
22

33
import datahub.metadata.schema_classes as models
4+
from datahub.emitter.mce_builder import make_container_urn
45
from datahub.emitter.mcp import MetadataChangeProposalWrapper
5-
from datahub.ingestion.api.source_helpers import auto_status_aspect, auto_workunit
6+
from datahub.ingestion.api.source_helpers import (
7+
auto_browse_path_v2,
8+
auto_status_aspect,
9+
auto_workunit,
10+
)
611
from datahub.ingestion.api.workunit import MetadataWorkUnit
712

813
_base_metadata: List[
@@ -75,3 +80,94 @@ def test_auto_status_aspect():
7580
),
7681
]
7782
assert list(auto_status_aspect(initial_wu)) == expected
83+
84+
85+
def _create_container_aspects(d: Dict[str, Any]) -> Iterable[MetadataWorkUnit]:
    """Yield work units describing the container hierarchy encoded in ``d``.

    Each key yields a Status work unit for its container urn. Each element of
    the key's value (dict keys or list items) yields a Container work unit
    pointing at that key. Dict values are then expanded recursively.
    """
    for parent_name, subtree in d.items():
        yield MetadataChangeProposalWrapper(
            entityUrn=make_container_urn(parent_name),
            aspect=models.StatusClass(removed=False),
        ).as_workunit()

        # Iterating a dict gives its keys, a list gives its items: both are
        # the names of the direct children.
        for child_name in subtree:
            link = models.ContainerClass(container=make_container_urn(parent_name))
            yield MetadataChangeProposalWrapper(
                entityUrn=make_container_urn(child_name),
                aspect=link,
            ).as_workunit()

        if isinstance(subtree, dict):
            yield from _create_container_aspects(subtree)
101+
102+
103+
def _make_browse_path_entries(path: List[str]) -> List[models.BrowsePathEntryClass]:
    """Turn container names into browse path entries whose id and urn are the container urn."""
    entries = []
    for segment in path:
        entries.append(
            models.BrowsePathEntryClass(
                id=make_container_urn(segment),
                urn=make_container_urn(segment),
            )
        )
    return entries
108+
109+
110+
def _get_browse_paths_from_wu(
    stream: Iterable[MetadataWorkUnit],
) -> Dict[str, List[models.BrowsePathEntryClass]]:
    """Map the last ``:``-separated urn segment to the path of its first BrowsePathsV2 aspect.

    Work units without a BrowsePathsV2 aspect are ignored.
    """
    paths_by_name = {}
    for wu in stream:
        found = wu.get_aspects_of_type(models.BrowsePathsV2Class)
        if not found:
            continue
        short_name = wu.get_urn().rsplit(":", 1)[-1]
        paths_by_name[short_name] = found[0].path
    return paths_by_name
120+
121+
122+
def test_auto_browse_path_v2():
    """Every entity in a nested container structure receives the expected browse path."""
    structure = {
        "one": {
            "a": {"i": ["1", "2", "3"], "ii": ["4"]},
            "b": {"iii": ["5", "6"]},
        },
        "two": {
            "c": {"iv": [], "v": ["7", "8"]},
        },
        "three": {"d": {}},
        "four": {},
    }

    def _count_aspects(work_units, aspect_cls):
        # Total number of aspects of the given class across all work units.
        return sum(len(wu.get_aspects_of_type(aspect_cls)) for wu in work_units)

    wus = list(auto_status_aspect(_create_container_aspects(structure)))
    # Sanity check
    assert _count_aspects(wus, models.StatusClass) == 21

    new_wus = list(auto_browse_path_v2(wus))
    assert _count_aspects(new_wus, models.BrowsePathsV2Class) == 21

    paths = _get_browse_paths_from_wu(new_wus)
    deep_path = _make_browse_path_entries(["two", "c", "v"])
    assert paths["one"] == []
    assert paths["7"] == paths["8"]
    assert paths["8"] == deep_path
    assert paths["d"] == _make_browse_path_entries(["three"])
    assert paths["i"] == _make_browse_path_entries(["one", "a"])
151+
152+
153+
def test_auto_browse_path_v2_ignores_urns_already_with():
    """An urn that already carries a BrowsePathsV2 aspect keeps it; no second one is generated."""
    structure = {"a": {"b": {"c": {"d": ["e"]}}}}

    custom_entries = _make_browse_path_entries(["custom", "path"])
    preset = MetadataChangeProposalWrapper(
        entityUrn=make_container_urn("c"),
        aspect=models.BrowsePathsV2Class(path=custom_entries),
    )
    wus = list(auto_status_aspect(_create_container_aspects(structure)))
    wus.append(preset.as_workunit())

    new_wus = list(auto_browse_path_v2(wus))
    browse_path_total = sum(
        len(wu.get_aspects_of_type(models.BrowsePathsV2Class)) for wu in new_wus
    )
    assert browse_path_total == 5

    paths = _get_browse_paths_from_wu(new_wus)
    assert paths["a"] == []
    assert paths["c"] == _make_browse_path_entries(["custom", "path"])
    assert paths["e"] == _make_browse_path_entries(["a", "b", "c", "d"])
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
import json
2+
3+
from datahub.emitter.mcp import MetadataChangeProposalWrapper
4+
from datahub.ingestion.api.workunit import MetadataWorkUnit
5+
from datahub.metadata.schema_classes import (
6+
ChangeTypeClass,
7+
ContainerClass,
8+
DatasetSnapshotClass,
9+
GenericAspectClass,
10+
MetadataChangeEventClass,
11+
MetadataChangeProposalClass,
12+
StatusClass,
13+
UpstreamLineageClass,
14+
)
15+
16+
17+
def test_get_aspects_of_type_mcp():
    """A wrapper-based work unit exposes its single aspect only under the matching class."""
    status = StatusClass(False)
    wrapper = MetadataChangeProposalWrapper(
        entityUrn="urn:li:container:asdf",
        aspect=status,
    )
    wu = wrapper.as_workunit()

    assert wu.get_aspects_of_type(StatusClass) == [status]
    assert wu.get_aspects_of_type(ContainerClass) == []
24+
25+
26+
def test_get_aspects_of_type_mce():
    """An MCE-based work unit returns every snapshot aspect of the requested class, in order."""
    first_status = StatusClass(False)
    second_status = StatusClass(True)
    lineage = UpstreamLineageClass(upstreams=[])

    snapshot = DatasetSnapshotClass(
        urn="urn:li:dataset:asdf",
        aspects=[first_status, lineage, second_status],
    )
    wu = MetadataWorkUnit(
        id="id",
        mce=MetadataChangeEventClass(proposedSnapshot=snapshot),
    )

    assert wu.get_aspects_of_type(StatusClass) == [first_status, second_status]
    assert wu.get_aspects_of_type(UpstreamLineageClass) == [lineage]
    assert wu.get_aspects_of_type(ContainerClass) == []
40+
41+
42+
def test_get_aspects_of_type_mcpc():
    """A raw-proposal work unit yields its aspect only when it deserializes cleanly.

    Covers the success case and these failure cases, each of which must return
    an empty list: mismatched aspect name, patch content type, trailing garbage
    after the JSON, and bytes encoded as latin-1.
    """
    aspect = StatusClass(False)

    def _raw_workunit(
        aspect_name,
        value,
        content_type="application/json",
        change_type=ChangeTypeClass.UPSERT,
    ):
        # Wrap a raw MetadataChangeProposalClass for the same container urn.
        return MetadataWorkUnit(
            id="id",
            mcp_raw=MetadataChangeProposalClass(
                entityUrn="urn:li:container:asdf",
                entityType="container",
                changeType=change_type,
                aspectName=aspect_name,
                aspect=GenericAspectClass(
                    value=value,
                    contentType=content_type,
                ),
            ),
        )

    wu = _raw_workunit(
        StatusClass.ASPECT_NAME,
        json.dumps(aspect.to_obj()).encode(),
    )
    assert wu.get_aspects_of_type(StatusClass) == [aspect]
    assert wu.get_aspects_of_type(ContainerClass) == []

    # Failure scenarios
    wu = _raw_workunit(
        "not status",
        json.dumps(aspect.to_obj()).encode(),
    )
    assert wu.get_aspects_of_type(StatusClass) == []

    wu = _raw_workunit(
        StatusClass.ASPECT_NAME,
        json.dumps({"not_status": True}).encode(),
        content_type="application/json-patch+json",
        change_type=ChangeTypeClass.PATCH,
    )
    assert wu.get_aspects_of_type(StatusClass) == []

    wu = _raw_workunit(
        StatusClass.ASPECT_NAME,
        (json.dumps(aspect.to_obj()) + "aaa").encode(),
    )
    assert wu.get_aspects_of_type(StatusClass) == []

    wu = _raw_workunit(
        StatusClass.ASPECT_NAME,
        '{"ß": 2}'.encode("latin_1"),
    )
    assert wu.get_aspects_of_type(StatusClass) == []

0 commit comments

Comments
 (0)