@@ -240,7 +240,7 @@ def auto_browse_path_v2(
240240 platform : Optional [str ] = None ,
241241 platform_instance : Optional [str ] = None ,
242242) -> Iterable [MetadataWorkUnit ]:
243- """Generate BrowsePathsV2 from Container and BrowsePaths aspects.
243+ """Generate BrowsePathsV2 from Container and BrowsePaths and BrowsePathsV2 aspects.
244244
245245 Generates browse paths v2 on demand, rather than waiting for end of ingestion,
246246 for better UI experience while ingestion is running.
@@ -251,37 +251,60 @@ def auto_browse_path_v2(
251251
252252 Calculates the correct BrowsePathsV2 at end of workunit stream,
253253 and emits "corrections", i.e. a final BrowsePathsV2 for any urns that have changed.
254+
255+ Source-generated original BrowsePathsV2 are assumed to be correct and are preferred
256+ over other aspects when generating BrowsePathsV2 of an entity or its children.
257+ This helper also prepends platform instance BrowsePathEntry to BrowsePathsV2 so the
258+ source need not include it in its browse paths v2.
254259 """
255260
256261 # For telemetry, to see if our sources violate assumptions
257262 num_out_of_order = 0
258263 num_out_of_batch = 0
259264
260265 # Set for all containers and urns with a Container aspect
261- # Used to construct container paths while iterating through stream
262- # Assumes topological order of entities in stream
266+ # Used to construct browse path v2 while iterating through stream
267+ # Assumes topological order of entities in stream, i.e. parent's
268+ # browse path/container is seen before child's browse path/container.
263269 paths : Dict [str , List [BrowsePathEntryClass ]] = {}
264270
265271 emitted_urns : Set [str ] = set ()
266272 containers_used_as_parent : Set [str ] = set ()
267273 for urn , batch in _batch_workunits_by_urn (stream ):
268274 container_path : Optional [List [BrowsePathEntryClass ]] = None
269275 legacy_path : Optional [List [BrowsePathEntryClass ]] = None
270- has_browse_path_v2 = False
276+ browse_path_v2 : Optional [ List [ BrowsePathEntryClass ]] = None
271277
272278 for wu in batch :
273- yield wu
274279 if not wu .is_primary_source :
280+ yield wu
275281 continue
276282
283+ browse_path_v2_aspect = wu .get_aspect_of_type (BrowsePathsV2Class )
284+ if browse_path_v2_aspect is None :
285+ yield wu
286+ else :
287+ # This is browse path v2 aspect. We will process
288+ # and emit it later with platform instance, as required.
289+ browse_path_v2 = browse_path_v2_aspect .path
290+ if guess_entity_type (urn ) == "container" :
291+ paths [urn ] = browse_path_v2
292+
277293 container_aspect = wu .get_aspect_of_type (ContainerClass )
278294 if container_aspect :
279295 parent_urn = container_aspect .container
280296 containers_used_as_parent .add (parent_urn )
281- paths [urn ] = [
282- * paths .setdefault (parent_urn , []), # Guess parent has no parents
283- BrowsePathEntryClass (id = parent_urn , urn = parent_urn ),
284- ]
297+ # If a container has both parent container and browsePathsV2
298+ # emitted from source, prefer browsePathsV2, so using setdefault.
299+ paths .setdefault (
300+ urn ,
301+ [
302+ * paths .setdefault (
303+ parent_urn , []
304+ ), # Guess parent has no parents
305+ BrowsePathEntryClass (id = parent_urn , urn = parent_urn ),
306+ ],
307+ )
285308 container_path = paths [urn ]
286309
287310 if urn in containers_used_as_parent :
@@ -297,16 +320,28 @@ def auto_browse_path_v2(
297320 if p .strip () and p .strip () not in drop_dirs
298321 ]
299322
300- if wu .get_aspect_of_type (BrowsePathsV2Class ):
301- has_browse_path_v2 = True
302-
303- path = container_path or legacy_path
304- if (path is not None or has_browse_path_v2 ) and urn in emitted_urns :
323+ # Order of preference: browse path v2, container path, legacy browse path
324+ path = browse_path_v2 or container_path or legacy_path
325+ if path is not None and urn in emitted_urns :
305326 # Batch invariant violated
306327 # TODO: Add sentry alert
307328 num_out_of_batch += 1
308- elif has_browse_path_v2 :
329+ elif browse_path_v2 is not None :
309330 emitted_urns .add (urn )
331+ if not dry_run :
332+ yield MetadataChangeProposalWrapper (
333+ entityUrn = urn ,
334+ aspect = BrowsePathsV2Class (
335+ path = _prepend_platform_instance (
336+ browse_path_v2 , platform , platform_instance
337+ )
338+ ),
339+ ).as_workunit ()
340+ else :
341+ yield MetadataChangeProposalWrapper (
342+ entityUrn = urn ,
343+ aspect = BrowsePathsV2Class (path = browse_path_v2 ),
344+ ).as_workunit ()
310345 elif path is not None :
311346 emitted_urns .add (urn )
312347 if not dry_run :
0 commit comments