Skip to content

Commit 8c75384

Browse files
anshbansal and RyanHolstien
authored and committed
feat(ingest): add high level stage for ingestion (#14862)
1 parent 7f8acfc commit 8c75384

34 files changed

+2628
-2238
lines changed

metadata-ingestion/src/datahub/ingestion/api/source.py

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525

2626
from datahub.configuration.common import ConfigModel
2727
from datahub.configuration.source_common import PlatformInstanceConfigMixin
28-
from datahub.emitter.mcp import MetadataChangeProposalWrapper
2928
from datahub.ingestion.api.auto_work_units.auto_dataset_properties_aspect import (
3029
auto_patch_last_modified,
3130
)
@@ -46,8 +45,15 @@
4645
auto_workunit,
4746
auto_workunit_reporter,
4847
)
48+
from datahub.ingestion.api.source_protocols import (
49+
MetadataWorkUnitIterable,
50+
ProfilingCapable,
51+
)
4952
from datahub.ingestion.api.workunit import MetadataWorkUnit
50-
from datahub.sdk.entity import Entity
53+
from datahub.ingestion.source_report.ingestion_stage import (
54+
IngestionHighStage,
55+
IngestionStageReport,
56+
)
5157
from datahub.telemetry import stats
5258
from datahub.utilities.lossy_collections import LossyDict, LossyList
5359
from datahub.utilities.type_annotations import get_class_from_annotation
@@ -205,7 +211,7 @@ def infos(self) -> LossyList[StructuredLogEntry]:
205211

206212

207213
@dataclass
208-
class SourceReport(ExamplesReport):
214+
class SourceReport(ExamplesReport, IngestionStageReport):
209215
event_not_produced_warn: bool = True
210216
events_produced: int = 0
211217
events_produced_per_sec: int = 0
@@ -553,13 +559,31 @@ def _apply_workunit_processors(
553559
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
554560
workunit_processors = self.get_workunit_processors()
555561
workunit_processors.append(AutoSystemMetadata(self.ctx).stamp)
556-
return self._apply_workunit_processors(
562+
# Process main workunits
563+
yield from self._apply_workunit_processors(
557564
workunit_processors, auto_workunit(self.get_workunits_internal())
558565
)
566+
# Process profiling workunits
567+
yield from self._process_profiling_stage(workunit_processors)
568+
569+
def _process_profiling_stage(
570+
self, processors: List[Optional[MetadataWorkUnitProcessor]]
571+
) -> Iterable[MetadataWorkUnit]:
572+
"""Process profiling stage if source supports it."""
573+
if (
574+
not isinstance(self, ProfilingCapable)
575+
or not self.is_profiling_enabled_internal()
576+
):
577+
return
578+
with self.get_report().new_high_stage(IngestionHighStage.PROFILING):
579+
profiling_stream = self._apply_workunit_processors(
580+
processors, auto_workunit(self.get_profiling_internal())
581+
)
582+
yield from profiling_stream
559583

560584
def get_workunits_internal(
561585
self,
562-
) -> Iterable[Union[MetadataWorkUnit, MetadataChangeProposalWrapper, Entity]]:
586+
) -> MetadataWorkUnitIterable:
563587
raise NotImplementedError(
564588
"get_workunits_internal must be implemented if get_workunits is not overriden."
565589
)
metadata-ingestion/src/datahub/ingestion/api/source_protocols.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from typing import Iterable, Protocol, Union, runtime_checkable
2+
3+
from datahub.emitter.mcp import MetadataChangeProposalWrapper
4+
from datahub.ingestion.api.workunit import MetadataWorkUnit
5+
from datahub.sdk.entity import Entity
6+
7+
# Type alias for metadata work units - Python 3.9 compatible
8+
MetadataWorkUnitIterable = Iterable[
9+
Union[MetadataWorkUnit, MetadataChangeProposalWrapper, Entity]
10+
]
11+
12+
13+
@runtime_checkable
14+
class ProfilingCapable(Protocol):
15+
"""Protocol for sources that support profiling functionality."""
16+
17+
def is_profiling_enabled_internal(self) -> bool:
18+
"""Check if profiling is enabled for this source."""
19+
...
20+
21+
def get_profiling_internal(self) -> MetadataWorkUnitIterable:
22+
"""Generate profiling work units."""
23+
...

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
from datahub.ingestion.api.report import Report
1010
from datahub.ingestion.glossary.classification_mixin import ClassificationReportMixin
1111
from datahub.ingestion.source.sql.sql_report import SQLSourceReport
12-
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
1312
from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
1413
from datahub.sql_parsing.sql_parsing_aggregator import SqlAggregatorReport
1514
from datahub.utilities.lossy_collections import LossyDict, LossyList, LossySet
@@ -78,7 +77,6 @@ class BigQueryQueriesExtractorReport(Report):
7877
@dataclass
7978
class BigQueryV2Report(
8079
SQLSourceReport,
81-
IngestionStageReport,
8280
BaseTimeWindowReport,
8381
ClassificationReportMixin,
8482
):

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
)
6767
from datahub.ingestion.source_report.ingestion_stage import (
6868
METADATA_EXTRACTION,
69-
PROFILING,
69+
IngestionHighStage,
7070
)
7171
from datahub.metadata.com.linkedin.pegasus2avro.common import (
7272
Status,
@@ -416,7 +416,7 @@ def _process_project(
416416

417417
if self.config.is_profiling_enabled():
418418
logger.info(f"Starting profiling project {project_id}")
419-
with self.report.new_stage(f"{project_id}: {PROFILING}"):
419+
with self.report.new_high_stage(IngestionHighStage.PROFILING):
420420
yield from self.profiler.get_workunits(
421421
project_id=project_id,
422422
tables=db_tables,

metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_profiling.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
)
1919
from datahub.ingestion.source.cassandra.cassandra_config import CassandraSourceConfig
2020
from datahub.ingestion.source.cassandra.cassandra_utils import CassandraSourceReport
21-
from datahub.ingestion.source_report.ingestion_stage import PROFILING
21+
from datahub.ingestion.source_report.ingestion_stage import IngestionHighStage
2222
from datahub.metadata.schema_classes import (
2323
DatasetFieldProfileClass,
2424
DatasetProfileClass,
@@ -71,7 +71,7 @@ def get_workunits(
7171
for keyspace_name in cassandra_data.keyspaces:
7272
tables = cassandra_data.tables.get(keyspace_name, [])
7373
with (
74-
self.report.new_stage(f"{keyspace_name}: {PROFILING}"),
74+
self.report.new_high_stage(IngestionHighStage.PROFILING),
7575
ThreadPoolExecutor(
7676
max_workers=self.config.profiling.max_workers
7777
) as executor,

metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from datahub.ingestion.source.state.stale_entity_removal_handler import (
77
StaleEntityRemovalSourceReport,
88
)
9-
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
109
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
1110
SchemaField,
1211
SchemaFieldDataType,
@@ -35,7 +34,7 @@
3534

3635

3736
@dataclass
38-
class CassandraSourceReport(StaleEntityRemovalSourceReport, IngestionStageReport):
37+
class CassandraSourceReport(StaleEntityRemovalSourceReport):
3938
num_tables_failed: int = 0
4039
num_views_failed: int = 0
4140
tables_scanned: int = 0

metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_reporting.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from datahub.ingestion.source.state.stale_entity_removal_handler import (
77
StaleEntityRemovalSourceReport,
88
)
9-
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
109
from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
1110
from datahub.sql_parsing.sql_parsing_aggregator import SqlAggregatorReport
1211
from datahub.utilities.stats_collections import (
@@ -20,7 +19,6 @@
2019
class DremioSourceReport(
2120
SQLSourceReport,
2221
StaleEntityRemovalSourceReport,
23-
IngestionStageReport,
2422
BaseTimeWindowReport,
2523
):
2624
num_containers_failed: int = 0

metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
from datahub.ingestion.source_report.ingestion_stage import (
5656
LINEAGE_EXTRACTION,
5757
METADATA_EXTRACTION,
58-
PROFILING,
58+
IngestionHighStage,
5959
)
6060
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
6161
DatasetLineageTypeClass,
@@ -283,7 +283,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
283283
# Profiling
284284
if self.config.is_profiling_enabled():
285285
with (
286-
self.report.new_stage(PROFILING),
286+
self.report.new_high_stage(IngestionHighStage.PROFILING),
287287
ThreadPoolExecutor(
288288
max_workers=self.config.profiling.max_workers
289289
) as executor,

metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
SoftDeletedEntitiesCleanupConfig,
3535
SoftDeletedEntitiesReport,
3636
)
37-
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
3837

3938
logger = logging.getLogger(__name__)
4039

@@ -87,7 +86,6 @@ class DataHubGcSourceReport(
8786
DataProcessCleanupReport,
8887
SoftDeletedEntitiesReport,
8988
DatahubExecutionRequestCleanupReport,
90-
IngestionStageReport,
9189
):
9290
expired_tokens_revoked: int = 0
9391

metadata-ingestion/src/datahub/ingestion/source/grafana/report.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,10 @@
33
from datahub.ingestion.source.state.stale_entity_removal_handler import (
44
StaleEntityRemovalSourceReport,
55
)
6-
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
76

87

98
@dataclass
10-
class GrafanaSourceReport(StaleEntityRemovalSourceReport, IngestionStageReport):
9+
class GrafanaSourceReport(StaleEntityRemovalSourceReport):
1110
# Entity counters
1211
dashboards_scanned: int = 0
1312
charts_scanned: int = 0

0 commit comments

Comments (0)