|
44 | 44 | TestConnectionReport, |
45 | 45 | ) |
46 | 46 | from datahub.ingestion.api.workunit import MetadataWorkUnit |
| 47 | +from datahub.ingestion.source.common.subtypes import DatasetSubTypes |
47 | 48 | from datahub.ingestion.source.omni.omni_api import OmniClient |
48 | 49 | from datahub.ingestion.source.omni.omni_config import OmniSourceConfig |
49 | 50 | from datahub.ingestion.source.omni.omni_lineage_parser import ( |
@@ -203,10 +204,12 @@ def _emit_platform_metadata(self) -> Iterator[MetadataWorkUnit]: |
203 | 204 | displayName="Omni", |
204 | 205 | logoUrl=self.DEFAULT_LOGO_URL, |
205 | 206 | ) |
206 | | - yield MetadataChangeProposalWrapper( |
207 | | - entityUrn=make_data_platform_urn(self.PLATFORM), |
208 | | - aspect=platform_info, |
209 | | - ).as_workunit() |
| 207 | + yield self._as_wu( |
| 208 | + MetadataChangeProposalWrapper( |
| 209 | + entityUrn=make_data_platform_urn(self.PLATFORM), |
| 210 | + aspect=platform_info, |
| 211 | + ) |
| 212 | + ) |
210 | 213 |
|
211 | 214 | def _emit_upstream_lineage( |
212 | 215 | self, |
@@ -373,16 +376,10 @@ def _emit_dashboard( |
373 | 376 | dt = datetime.fromisoformat(updated_at.replace("Z", "+00:00")) |
374 | 377 | dashboard.set_last_modified(dt) |
375 | 378 | except Exception as exc: |
376 | | - logger.warning( |
377 | | - "Skipping dashboard last_modified: failed to parse %r: %s", |
378 | | - updated_at, |
379 | | - exc, |
380 | | - ) |
| 379 | + logger.warning("Skipping dashboard last_modified: failed to parse %r: %s", updated_at, exc) |
381 | 380 | if owner_id or owner_name: |
382 | 381 | owner_urn = make_user_urn(owner_id or owner_name) |
383 | | - dashboard.set_owners( |
384 | | - [(CorpUserUrn(owner_urn), OwnershipTypeClass.DATAOWNER)] |
385 | | - ) |
| 382 | + dashboard.set_owners([(CorpUserUrn(owner_urn), OwnershipTypeClass.DATAOWNER)]) |
386 | 383 | yield from dashboard.as_workunits() |
387 | 384 |
|
388 | 385 | def _emit_chart( |
@@ -414,11 +411,7 @@ def _emit_chart( |
414 | 411 | dt = datetime.fromisoformat(updated_at.replace("Z", "+00:00")) |
415 | 412 | chart.set_last_modified(dt) |
416 | 413 | except Exception as exc: |
417 | | - logger.warning( |
418 | | - "Skipping chart last_modified: failed to parse %r: %s", |
419 | | - updated_at, |
420 | | - exc, |
421 | | - ) |
| 414 | + logger.warning("Skipping chart last_modified: failed to parse %r: %s", updated_at, exc) |
422 | 415 | if owner_id or owner_name: |
423 | 416 | owner_urn = make_user_urn(owner_id or owner_name) |
424 | 417 | chart.set_owners([(CorpUserUrn(owner_urn), OwnershipTypeClass.DATAOWNER)]) |
@@ -640,7 +633,7 @@ def _ingest_topic_payload( |
640 | 633 | display_name=f"{readable_model}.topic.{topic_name}", |
641 | 634 | description="Omni topic entity.", |
642 | 635 | custom_properties=topic_props, |
643 | | - subtype="Topic", |
| 636 | + subtype=DatasetSubTypes.TOPIC, |
644 | 637 | ) |
645 | 638 | self.report.semantic_datasets_emitted += 1 |
646 | 639 |
|
@@ -1140,7 +1133,7 @@ def _collect_tile_data( |
1140 | 1133 | "topicName": topic_name, |
1141 | 1134 | "inferred": "true", |
1142 | 1135 | }, |
1143 | | - subtype="Topic", |
| 1136 | + subtype=DatasetSubTypes.TOPIC, |
1144 | 1137 | ) |
1145 | 1138 | self.report.semantic_datasets_emitted += 1 |
1146 | 1139 | chart_inputs[qp_id].add(topic_urn) |
|
0 commit comments