Skip to content

Commit 8b79461

Browse files
authored
feat(ingest/looker): browse path followups (#10217)
1 parent 223b72f commit 8b79461

32 files changed

+7331
-2038
lines changed

metadata-ingestion/src/datahub/ingestion/api/source.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,12 @@ def close(self) -> None:
277277

278278
def _get_browse_path_processor(self, dry_run: bool) -> MetadataWorkUnitProcessor:
279279
config = self.get_config()
280-
platform = getattr(self, "platform", None) or getattr(config, "platform", None)
280+
281+
platform = (
282+
getattr(config, "platform_name", None)
283+
or getattr(self, "platform", None)
284+
or getattr(config, "platform", None)
285+
)
281286
env = getattr(config, "env", None)
282287
browse_path_drop_dirs = [
283288
platform,

metadata-ingestion/src/datahub/ingestion/api/source_helpers.py

Lines changed: 50 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ def auto_browse_path_v2(
240240
platform: Optional[str] = None,
241241
platform_instance: Optional[str] = None,
242242
) -> Iterable[MetadataWorkUnit]:
243-
"""Generate BrowsePathsV2 from Container and BrowsePaths aspects.
243+
"""Generate BrowsePathsV2 from Container and BrowsePaths and BrowsePathsV2 aspects.
244244
245245
Generates browse paths v2 on demand, rather than waiting for end of ingestion,
246246
for better UI experience while ingestion is running.
@@ -251,37 +251,60 @@ def auto_browse_path_v2(
251251
252252
Calculates the correct BrowsePathsV2 at end of workunit stream,
253253
and emits "corrections", i.e. a final BrowsePathsV2 for any urns that have changed.
254+
255+
Source-generated original BrowsePathsV2 are assumed to be correct and are preferred
256+
over other aspects when generating BrowsePathsV2 of an entity or its children.
257+
This helper also prepends platform instance BrowsePathEntry to BrowsePathsV2 so the
258+
source need not include it in its browse paths v2.
254259
"""
255260

256261
# For telemetry, to see if our sources violate assumptions
257262
num_out_of_order = 0
258263
num_out_of_batch = 0
259264

260265
# Set for all containers and urns with a Container aspect
261-
# Used to construct container paths while iterating through stream
262-
# Assumes topological order of entities in stream
266+
# Used to construct browse path v2 while iterating through stream
267+
# Assumes topological order of entities in stream, i.e. parent's
268+
# browse path/container is seen before child's browse path/container.
263269
paths: Dict[str, List[BrowsePathEntryClass]] = {}
264270

265271
emitted_urns: Set[str] = set()
266272
containers_used_as_parent: Set[str] = set()
267273
for urn, batch in _batch_workunits_by_urn(stream):
268274
container_path: Optional[List[BrowsePathEntryClass]] = None
269275
legacy_path: Optional[List[BrowsePathEntryClass]] = None
270-
has_browse_path_v2 = False
276+
browse_path_v2: Optional[List[BrowsePathEntryClass]] = None
271277

272278
for wu in batch:
273-
yield wu
274279
if not wu.is_primary_source:
280+
yield wu
275281
continue
276282

283+
browse_path_v2_aspect = wu.get_aspect_of_type(BrowsePathsV2Class)
284+
if browse_path_v2_aspect is None:
285+
yield wu
286+
else:
287+
# This is browse path v2 aspect. We will process
288+
# and emit it later with platform instance, as required.
289+
browse_path_v2 = browse_path_v2_aspect.path
290+
if guess_entity_type(urn) == "container":
291+
paths[urn] = browse_path_v2
292+
277293
container_aspect = wu.get_aspect_of_type(ContainerClass)
278294
if container_aspect:
279295
parent_urn = container_aspect.container
280296
containers_used_as_parent.add(parent_urn)
281-
paths[urn] = [
282-
*paths.setdefault(parent_urn, []), # Guess parent has no parents
283-
BrowsePathEntryClass(id=parent_urn, urn=parent_urn),
284-
]
297+
# If a container has both parent container and browsePathsV2
298+
# emitted from source, prefer browsePathsV2, so using setdefault.
299+
paths.setdefault(
300+
urn,
301+
[
302+
*paths.setdefault(
303+
parent_urn, []
304+
), # Guess parent has no parents
305+
BrowsePathEntryClass(id=parent_urn, urn=parent_urn),
306+
],
307+
)
285308
container_path = paths[urn]
286309

287310
if urn in containers_used_as_parent:
@@ -297,16 +320,28 @@ def auto_browse_path_v2(
297320
if p.strip() and p.strip() not in drop_dirs
298321
]
299322

300-
if wu.get_aspect_of_type(BrowsePathsV2Class):
301-
has_browse_path_v2 = True
302-
303-
path = container_path or legacy_path
304-
if (path is not None or has_browse_path_v2) and urn in emitted_urns:
323+
# Order of preference: browse path v2, container path, legacy browse path
324+
path = browse_path_v2 or container_path or legacy_path
325+
if path is not None and urn in emitted_urns:
305326
# Batch invariant violated
306327
# TODO: Add sentry alert
307328
num_out_of_batch += 1
308-
elif has_browse_path_v2:
329+
elif browse_path_v2 is not None:
309330
emitted_urns.add(urn)
331+
if not dry_run:
332+
yield MetadataChangeProposalWrapper(
333+
entityUrn=urn,
334+
aspect=BrowsePathsV2Class(
335+
path=_prepend_platform_instance(
336+
browse_path_v2, platform, platform_instance
337+
)
338+
),
339+
).as_workunit()
340+
else:
341+
yield MetadataChangeProposalWrapper(
342+
entityUrn=urn,
343+
aspect=BrowsePathsV2Class(path=browse_path_v2),
344+
).as_workunit()
310345
elif path is not None:
311346
emitted_urns.add(urn)
312347
if not dry_run:

metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ class DatasetContainerSubTypes(str, Enum):
3838

3939
class BIContainerSubTypes(str, Enum):
4040
LOOKER_FOLDER = "Folder"
41+
LOOKML_PROJECT = "LookML Project"
42+
LOOKML_MODEL = "LookML Model"
4143
TABLEAU_WORKBOOK = "Workbook"
4244
POWERBI_WORKSPACE = "Workspace"
4345
POWERBI_DATASET = "PowerBI Dataset"

0 commit comments

Comments
 (0)