Commit fdbc4de

refactor(ingest): Call source_helpers via new WorkUnitProcessors in base Source (#8101)
1 parent 0e0d893 commit fdbc4de

42 files changed: +1898 -654 lines

metadata-ingestion/src/datahub/ingestion/api/source.py

Lines changed: 51 additions & 3 deletions
@@ -3,7 +3,21 @@
 from collections import defaultdict
 from dataclasses import dataclass, field
 from enum import Enum
-from typing import Dict, Generic, Iterable, Optional, Set, Type, TypeVar, Union, cast
+from functools import partial
+from typing import (
+    Callable,
+    Dict,
+    Generic,
+    Iterable,
+    List,
+    Optional,
+    Sequence,
+    Set,
+    Type,
+    TypeVar,
+    Union,
+    cast,
+)
 
 from pydantic import BaseModel
 
@@ -12,6 +26,11 @@
 from datahub.ingestion.api.closeable import Closeable
 from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
 from datahub.ingestion.api.report import Report
+from datahub.ingestion.api.source_helpers import (
+    auto_materialize_referenced_tags,
+    auto_status_aspect,
+    auto_workunit_reporter,
+)
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
 from datahub.utilities.lossy_collections import LossyDict, LossyList
@@ -118,6 +137,9 @@ class TestConnectionReport(Report):
 WorkUnitType = TypeVar("WorkUnitType", bound=WorkUnit)
 ExtractorConfig = TypeVar("ExtractorConfig", bound=ConfigModel)
 
+WorkUnitProcessor = Callable[[Iterable[WorkUnitType]], Iterable[WorkUnitType]]
+MetadataWorkUnitProcessor = WorkUnitProcessor[MetadataWorkUnit]
+
 
 class Extractor(Generic[WorkUnitType, ExtractorConfig], Closeable, metaclass=ABCMeta):
     ctx: PipelineContext
@@ -155,9 +177,35 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
         # can't make this method abstract.
         raise NotImplementedError('sources must implement "create"')
 
-    @abstractmethod
+    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
+        """A list of functions that transforms the workunits produced by this source.
+        Run in order, first in list is applied first. Be careful with order when overriding.
+        """
+        return [
+            auto_status_aspect,
+            auto_materialize_referenced_tags,
+            partial(auto_workunit_reporter, self.get_report()),
+        ]
+
+    @staticmethod
+    def _apply_workunit_processors(
+        workunit_processors: Sequence[Optional[MetadataWorkUnitProcessor]],
+        stream: Iterable[MetadataWorkUnit],
+    ) -> Iterable[MetadataWorkUnit]:
+        for processor in workunit_processors:
+            if processor is not None:
+                stream = processor(stream)
+        return stream
+
     def get_workunits(self) -> Iterable[MetadataWorkUnit]:
-        pass
+        return self._apply_workunit_processors(
+            self.get_workunit_processors(), self.get_workunits_internal()
+        )
+
+    def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
+        raise NotImplementedError(
+            "get_workunits_internal must be implemented if get_workunits is not overriden."
+        )
 
     @abstractmethod
     def get_report(self) -> SourceReport:

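The heart of the change is `_apply_workunit_processors`: each processor wraps the stream returned by the previous one, so the first entry in the list is applied first (innermost). A minimal, self-contained sketch of that fold, using plain strings rather than DataHub workunit classes:

from typing import Callable, Iterable, Optional, Sequence

Processor = Callable[[Iterable[str]], Iterable[str]]

def tag_a(stream: Iterable[str]) -> Iterable[str]:
    for wu in stream:
        yield wu + ":a"

def tag_b(stream: Iterable[str]) -> Iterable[str]:
    for wu in stream:
        yield wu + ":b"

def apply_processors(
    processors: Sequence[Optional[Processor]], stream: Iterable[str]
) -> Iterable[str]:
    # Same fold as Source._apply_workunit_processors: later entries wrap earlier ones.
    for processor in processors:
        if processor is not None:
            stream = processor(stream)
    return stream

print(list(apply_processors([tag_a, tag_b], ["wu1"])))
# ['wu1:a:b'] -- tag_a saw the raw stream first; tag_b wrapped tag_a's output

Because everything stays a lazy generator chain, workunits still flow through one at a time; no processor buffers the whole stream unless it has to.
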
metadata-ingestion/src/datahub/ingestion/api/source_helpers.py

Lines changed: 20 additions & 14 deletions
@@ -1,14 +1,19 @@
 import logging
 from collections import defaultdict
-from typing import Callable, Dict, Iterable, List, Optional, Set, TypeVar, Union
+from typing import (
+    TYPE_CHECKING,
+    Callable,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Set,
+    TypeVar,
+    Union,
+)
 
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
-from datahub.ingestion.api.common import WorkUnit
-from datahub.ingestion.api.source import SourceReport
 from datahub.ingestion.api.workunit import MetadataWorkUnit
-from datahub.ingestion.source.state.stale_entity_removal_handler import (
-    StaleEntityRemovalHandler,
-)
 from datahub.metadata.schema_classes import (
     BrowsePathEntryClass,
     BrowsePathsV2Class,
@@ -22,6 +27,12 @@
 from datahub.utilities.urns.urn import guess_entity_type
 from datahub.utilities.urns.urn_iter import list_urns
 
+if TYPE_CHECKING:
+    from datahub.ingestion.api.source import SourceReport
+    from datahub.ingestion.source.state.stale_entity_removal_handler import (
+        StaleEntityRemovalHandler,
+    )
+
 logger = logging.getLogger(__name__)
 
 
@@ -85,7 +96,7 @@ def _default_entity_type_fn(wu: MetadataWorkUnit) -> Optional[str]:
 
 
 def auto_stale_entity_removal(
-    stale_entity_removal_handler: StaleEntityRemovalHandler,
+    stale_entity_removal_handler: "StaleEntityRemovalHandler",
     stream: Iterable[MetadataWorkUnit],
     entity_type_fn: Callable[
         [MetadataWorkUnit], Optional[str]
@@ -111,10 +122,10 @@ def auto_stale_entity_removal(
     yield from stale_entity_removal_handler.gen_removed_entity_workunits()
 
 
-T = TypeVar("T", bound=WorkUnit)
+T = TypeVar("T", bound=MetadataWorkUnit)
 
 
-def auto_workunit_reporter(report: SourceReport, stream: Iterable[T]) -> Iterable[T]:
+def auto_workunit_reporter(report: "SourceReport", stream: Iterable[T]) -> Iterable[T]:
     """
     Calls report.report_workunit() on each workunit.
     """
@@ -126,14 +137,9 @@ def auto_workunit_reporter(report: "SourceReport", stream: Iterable[T]) -> Iterab
 
 def auto_materialize_referenced_tags(
     stream: Iterable[MetadataWorkUnit],
-    active: bool = True,
 ) -> Iterable[MetadataWorkUnit]:
     """For all references to tags, emit a tag key aspect to ensure that the tag exists in our backend."""
 
-    if not active:
-        yield from stream
-        return
-
     referenced_tags = set()
     tags_with_aspects = set()

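Moving the `SourceReport` and `StaleEntityRemovalHandler` imports under `TYPE_CHECKING` (with the annotations quoted as forward references) is what breaks the new import cycle: source.py now imports source_helpers at runtime, so source_helpers may only depend on source.py when the type checker runs. A minimal sketch of the pattern:

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated by type checkers only, never at runtime, so no circular import.
    from datahub.ingestion.api.source import SourceReport

def describe(report: "SourceReport") -> str:  # quoted = forward reference
    return type(report).__name__

At runtime the quoted annotation is just a string, so the module loads even though `SourceReport` was never actually imported.
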
metadata-ingestion/src/datahub/ingestion/source/aws/glue.py

Lines changed: 8 additions & 24 deletions
@@ -48,11 +48,7 @@
     platform_name,
     support_status,
 )
-from datahub.ingestion.api.source_helpers import (
-    auto_stale_entity_removal,
-    auto_status_aspect,
-    auto_workunit_reporter,
-)
+from datahub.ingestion.api.source import MetadataWorkUnitProcessor
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.aws import s3_util
 from datahub.ingestion.source.aws.aws_common import AwsSourceConfig
@@ -62,9 +58,6 @@
     DatasetSubTypes,
 )
 from datahub.ingestion.source.glue_profiling_config import GlueProfilingConfig
-from datahub.ingestion.source.state.sql_common_state import (
-    BaseSQLAlchemyCheckpointState,
-)
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StaleEntityRemovalHandler,
     StaleEntityRemovalSourceReport,
@@ -273,15 +266,6 @@ def __init__(self, config: GlueSourceConfig, ctx: PipelineContext):
         self.extract_transforms = config.extract_transforms
         self.env = config.env
 
-        # Create and register the stateful ingestion use-case handlers.
-        self.stale_entity_removal_handler = StaleEntityRemovalHandler(
-            source=self,
-            config=self.source_config,
-            state_type_class=BaseSQLAlchemyCheckpointState,
-            pipeline_name=self.ctx.pipeline_name,
-            run_id=self.ctx.run_id,
-        )
-
     def get_glue_arn(
         self, account_id: str, database: str, table: Optional[str] = None
     ) -> str:
@@ -919,13 +903,13 @@ def _get_domain_wu(
             domain_urn=domain_urn,
         )
 
-    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
-        return auto_stale_entity_removal(
-            self.stale_entity_removal_handler,
-            auto_workunit_reporter(
-                self.report, auto_status_aspect(self.get_workunits_internal())
-            ),
-        )
+    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
+        return [
+            *super().get_workunit_processors(),
+            StaleEntityRemovalHandler.create(
+                self, self.source_config, self.ctx
+            ).workunit_processor,
+        ]
 
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         database_seen = set()

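The Glue change is the migration recipe repeated across every stateful connector in this commit: drop the handler wiring from `__init__` and the hand-rolled `get_workunits`, then append the handler's processor to the base list. A hedged sketch for a hypothetical connector (`MySource` and its `config`/`ctx` attributes are illustrative; the `StatefulIngestionSourceBase` import path is assumed from the surrounding codebase, not shown in this diff):

from typing import Iterable, List, Optional

from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StaleEntityRemovalHandler,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionSourceBase,
)

class MySource(StatefulIngestionSourceBase):  # hypothetical connector
    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
        return [
            *super().get_workunit_processors(),  # status/tag/reporting defaults
            StaleEntityRemovalHandler.create(
                self, self.config, self.ctx
            ).workunit_processor,
        ]

    def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
        ...  # yield the connector's own metadata; no helper wiring needed
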
metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py

Lines changed: 9 additions & 29 deletions
@@ -30,16 +30,11 @@
 )
 from datahub.ingestion.api.source import (
     CapabilityReport,
+    MetadataWorkUnitProcessor,
     SourceCapability,
     TestableSource,
     TestConnectionReport,
 )
-from datahub.ingestion.api.source_helpers import (
-    auto_materialize_referenced_tags,
-    auto_stale_entity_removal,
-    auto_status_aspect,
-    auto_workunit_reporter,
-)
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
     BigqueryTableIdentifier,
@@ -80,9 +75,6 @@
 from datahub.ingestion.source.state.redundant_run_skip_handler import (
     RedundantRunSkipHandler,
 )
-from datahub.ingestion.source.state.sql_common_state import (
-    BaseSQLAlchemyCheckpointState,
-)
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StaleEntityRemovalHandler,
 )
@@ -228,15 +220,6 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
         self.lineage_extractor = BigqueryLineageExtractor(config, self.report)
         self.usage_extractor = BigQueryUsageExtractor(config, self.report)
 
-        # Create and register the stateful ingestion use-case handler.
-        self.stale_entity_removal_handler = StaleEntityRemovalHandler(
-            source=self,
-            config=self.config,
-            state_type_class=BaseSQLAlchemyCheckpointState,
-            pipeline_name=self.ctx.pipeline_name,
-            run_id=self.ctx.run_id,
-        )
-
         self.domain_registry: Optional[DomainRegistry] = None
         if self.config.domain:
             self.domain_registry = DomainRegistry(
@@ -491,6 +474,14 @@ def gen_dataset_containers(
                 tags=tags_joined,
             )
 
+    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
+        return [
+            *super().get_workunit_processors(),
+            StaleEntityRemovalHandler.create(
+                self, self.config, self.ctx
+            ).workunit_processor,
+        ]
+
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         conn: bigquery.Client = get_bigquery_client(self.config)
         self.add_config_to_report()
@@ -514,17 +505,6 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
             self.report.set_ingestion_stage(project.id, "Lineage Extraction")
             yield from self.generate_lineage(project.id)
 
-    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
-        return auto_materialize_referenced_tags(
-            auto_stale_entity_removal(
-                self.stale_entity_removal_handler,
-                auto_workunit_reporter(
-                    self.report,
-                    auto_status_aspect(self.get_workunits_internal()),
-                ),
-            )
-        )
-
     def _should_ingest_usage(self) -> bool:
         if not self.config.include_usage_statistics:
             return False

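Note the `List[Optional[MetadataWorkUnitProcessor]]` signature: because `_apply_workunit_processors` skips `None` entries, an override can switch a processor off without restructuring the list. A toy illustration (plain strings and a hypothetical flag, not DataHub types):

from typing import Callable, Iterable, List, Optional

Proc = Callable[[Iterable[str]], Iterable[str]]

def upper(stream: Iterable[str]) -> Iterable[str]:
    return (wu.upper() for wu in stream)

def exclaim(stream: Iterable[str]) -> Iterable[str]:
    return (wu + "!" for wu in stream)

def build(stateful: bool) -> List[Optional[Proc]]:
    # Mirrors returning a conditional processor from get_workunit_processors.
    return [upper, exclaim if stateful else None]

stream: Iterable[str] = ["wu"]
for proc in build(stateful=False):
    if proc is not None:  # None entries are skipped, as in the base Source
        stream = proc(stream)
print(list(stream))  # ['WU'] -- the disabled processor never ran
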
metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py

Lines changed: 8 additions & 22 deletions
@@ -31,12 +31,7 @@
     platform_name,
     support_status,
 )
-from datahub.ingestion.api.source_helpers import (
-    auto_materialize_referenced_tags,
-    auto_stale_entity_removal,
-    auto_status_aspect,
-    auto_workunit_reporter,
-)
+from datahub.ingestion.api.source import MetadataWorkUnitProcessor
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.common.subtypes import DatasetSubTypes
 from datahub.ingestion.source.sql.sql_types import (
@@ -50,7 +45,6 @@
     resolve_trino_modified_type,
     resolve_vertica_modified_type,
 )
-from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StaleEntityRemovalHandler,
     StaleEntityRemovalSourceReport,
@@ -688,12 +682,8 @@ def __init__(self, config: DBTCommonConfig, ctx: PipelineContext, platform: str):
             self.config.owner_extraction_pattern
         )
         # Create and register the stateful ingestion use-case handler.
-        self.stale_entity_removal_handler = StaleEntityRemovalHandler(
-            source=self,
-            config=self.config,
-            state_type_class=GenericCheckpointState,
-            pipeline_name=self.ctx.pipeline_name,
-            run_id=self.ctx.run_id,
+        self.stale_entity_removal_handler = StaleEntityRemovalHandler.create(
+            self, self.config, ctx
         )
 
     def create_test_entity_mcps(
@@ -878,15 +868,11 @@ def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]:
         # return dbt nodes + global custom properties
         raise NotImplementedError()
 
-    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
-        return auto_materialize_referenced_tags(
-            auto_stale_entity_removal(
-                self.stale_entity_removal_handler,
-                auto_workunit_reporter(
-                    self.report, auto_status_aspect(self.get_workunits_internal())
-                ),
-            )
-        )
+    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
+        return [
+            *super().get_workunit_processors(),
+            self.stale_entity_removal_handler.workunit_processor,
+        ]
 
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         if self.config.write_semantics == "PATCH" and not self.ctx.graph:

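One detail worth calling out from the base-class list: `auto_workunit_reporter` takes `(report, stream)`, but a `WorkUnitProcessor` must be callable with the stream alone, which is why the base `Source` binds the report with `functools.partial`. A toy stand-in for the reporter showing that adaptation:

from functools import partial
from typing import Iterable, List

def reporter_like(report: List[str], stream: Iterable[str]) -> Iterable[str]:
    # Stand-in for auto_workunit_reporter: observe each unit, pass it through.
    for wu in stream:
        report.append(wu)
        yield wu

report: List[str] = []
processor = partial(reporter_like, report)  # now a (stream) -> stream callable

assert list(processor(["a", "b"])) == ["a", "b"]
assert report == ["a", "b"]  # every unit was reported exactly once
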
metadata-ingestion/src/datahub/ingestion/source/file.py

Lines changed: 6 additions & 3 deletions
@@ -6,8 +6,9 @@
 from collections import defaultdict
 from dataclasses import dataclass, field
 from enum import auto
+from functools import partial
 from io import BufferedReader
-from typing import Any, Dict, Iterable, Iterator, Optional, Tuple, Union
+from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union
 from urllib import parse
 
 import ijson
@@ -28,6 +29,7 @@
 )
 from datahub.ingestion.api.source import (
     CapabilityReport,
+    MetadataWorkUnitProcessor,
     SourceReport,
     TestableSource,
     TestConnectionReport,
@@ -205,8 +207,9 @@ def get_filenames(self) -> Iterable[str]:
             self.report.total_num_files = 1
             return [str(self.config.path)]
 
-    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
-        return auto_workunit_reporter(self.report, self.get_workunits_internal())
+    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
+        # No super() call, as we don't want helpers that create / remove workunits
+        return [partial(auto_workunit_reporter, self.report)]
 
     def get_workunits_internal(
         self,

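The file source is the one deliberate outlier: it skips `super().get_workunit_processors()` because defaults such as `auto_status_aspect` and `auto_materialize_referenced_tags` inject extra workunits, which would distort a faithful replay of a recorded metadata file. A toy sketch of why an injecting processor is unwelcome here (plain strings and a hypothetical stand-in, not the real helper):

from typing import Iterable

def status_injector_like(stream: Iterable[str]) -> Iterable[str]:
    # Loose stand-in for auto_status_aspect: passes units through but also
    # fabricates extra ones -- fine for live sources, wrong for file replay.
    for wu in stream:
        yield wu
        yield f"status({wu})"

print(list(status_injector_like(["wu1"])))
# ['wu1', 'status(wu1)'] -- output no longer matches the input file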