Skip to content

Commit 0e0d893

Browse files
amanda-herKhurzak
andauthored
feat(ingest): Add GenericAspectTransformer (#7994)
Co-authored-by: Adrián Pertíñez <[email protected]>
1 parent c075c5e commit 0e0d893

File tree

3 files changed

+417
-4
lines changed

3 files changed

+417
-4
lines changed

metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ def __init__(self):
6464
def _should_process(
6565
self,
6666
record: Union[
67-
MetadataChangeEventClass, MetadataChangeProposalWrapper, ControlRecord
67+
MetadataChangeEventClass,
68+
MetadataChangeProposalWrapper,
69+
MetadataChangeProposalClass,
70+
ControlRecord,
6871
],
6972
) -> bool:
7073
if isinstance(record, ControlRecord):
@@ -92,7 +95,9 @@ def _record_mce(self, mce: MetadataChangeEventClass) -> None:
9295
record_entry["seen"]["mce"] = mce.systemMetadata
9396
self.entity_map[mce.proposedSnapshot.urn] = record_entry
9497

95-
def _record_mcp(self, mcp: MetadataChangeProposalWrapper) -> None:
98+
def _record_mcp(
99+
self, mcp: Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]
100+
) -> None:
96101
assert mcp.entityUrn
97102
record_entry = self.entity_map.get(mcp.entityUrn, {"seen": {}})
98103
if "seen" in record_entry and "mcp" not in record_entry["seen"]:
@@ -150,7 +155,7 @@ def _transform_or_record_mce(
150155

151156
return envelope
152157

153-
def _transform_or_record_mcp(
158+
def _transform_or_record_mcpw(
154159
self,
155160
envelope: RecordEnvelope[MetadataChangeProposalWrapper],
156161
) -> Optional[RecordEnvelope[MetadataChangeProposalWrapper]]:
@@ -187,7 +192,7 @@ def transform(
187192
elif isinstance(
188193
envelope.record, MetadataChangeProposalWrapper
189194
) and isinstance(self, SingleAspectTransformer):
190-
return_envelope = self._transform_or_record_mcp(envelope)
195+
return_envelope = self._transform_or_record_mcpw(envelope)
191196
if return_envelope is None:
192197
continue
193198
else:
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import logging
2+
from abc import ABCMeta, abstractmethod
3+
from typing import Iterable, Optional
4+
5+
from datahub.emitter.mce_builder import Aspect
6+
from datahub.emitter.mcp import MetadataChangeProposalWrapper
7+
from datahub.ingestion.api.common import EndOfStream, RecordEnvelope
8+
from datahub.ingestion.transformer.base_transformer import (
9+
BaseTransformer,
10+
SingleAspectTransformer,
11+
)
12+
from datahub.metadata.schema_classes import (
13+
GenericAspectClass,
14+
MetadataChangeEventClass,
15+
MetadataChangeProposalClass,
16+
)
17+
from datahub.utilities.urns.urn import Urn
18+
19+
log = logging.getLogger(__name__)
20+
21+
22+
class GenericAspectTransformer(
23+
BaseTransformer, SingleAspectTransformer, metaclass=ABCMeta
24+
):
25+
"""Transformer that does transform custom aspects using GenericAspectClass."""
26+
27+
def __init__(self):
28+
super().__init__()
29+
30+
def transform_aspect(
31+
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
32+
) -> Optional[Aspect]:
33+
"""Do not implement."""
34+
pass
35+
36+
@abstractmethod
37+
def transform_generic_aspect(
38+
self, entity_urn: str, aspect_name: str, aspect: Optional[GenericAspectClass]
39+
) -> Optional[GenericAspectClass]:
40+
"""Implement this method to transform the single custom aspect for an entity.
41+
The purpose of this abstract method is to reinforce the use of GenericAspectClass."""
42+
pass
43+
44+
def _transform_or_record_mcpc(
45+
self,
46+
envelope: RecordEnvelope[MetadataChangeProposalClass],
47+
) -> Optional[RecordEnvelope[MetadataChangeProposalClass]]:
48+
assert envelope.record.entityUrn
49+
assert isinstance(self, SingleAspectTransformer)
50+
if envelope.record.aspectName == self.aspect_name() and envelope.record.aspect:
51+
transformed_aspect = self.transform_generic_aspect(
52+
entity_urn=envelope.record.entityUrn,
53+
aspect_name=envelope.record.aspectName,
54+
aspect=envelope.record.aspect,
55+
)
56+
self._mark_processed(envelope.record.entityUrn)
57+
if transformed_aspect is None:
58+
log.debug(
59+
f"Dropping record {envelope} as transformation result is None"
60+
)
61+
envelope.record.aspect = transformed_aspect
62+
else:
63+
self._record_mcp(envelope.record)
64+
return envelope if envelope.record.aspect is not None else None
65+
66+
def transform(
67+
self, record_envelopes: Iterable[RecordEnvelope]
68+
) -> Iterable[RecordEnvelope]:
69+
"""
70+
This method overrides the original one from BaseTransformer in order to support
71+
custom aspects. They need to be upserted with MetadataChangeProposalClass instead of
72+
MetadataChangeProposalWrapper used at the original method.
73+
"""
74+
for envelope in record_envelopes:
75+
if not self._should_process(envelope.record):
76+
pass
77+
elif isinstance(envelope.record, MetadataChangeEventClass):
78+
self._record_mce(envelope.record)
79+
elif isinstance(envelope.record, MetadataChangeProposalWrapper):
80+
self._record_mcp(envelope.record)
81+
elif isinstance(envelope.record, MetadataChangeProposalClass):
82+
return_envelope = self._transform_or_record_mcpc(envelope)
83+
if return_envelope is None:
84+
continue
85+
else:
86+
envelope = return_envelope
87+
elif isinstance(envelope.record, EndOfStream) and isinstance(
88+
self, SingleAspectTransformer
89+
):
90+
for urn, state in self.entity_map.items():
91+
if "seen" in state:
92+
last_seen_mcp = state["seen"].get("mcp")
93+
last_seen_mce_system_metadata = state["seen"].get("mce")
94+
95+
transformed_aspect = self.transform_generic_aspect(
96+
entity_urn=urn,
97+
aspect_name=self.aspect_name(),
98+
aspect=None,
99+
)
100+
if transformed_aspect:
101+
# for end of stream records, we modify the workunit-id
102+
structured_urn = Urn.create_from_string(urn)
103+
simple_name = "-".join(structured_urn.get_entity_id())
104+
record_metadata = envelope.metadata.copy()
105+
record_metadata.update(
106+
{
107+
"workunit_id": f"txform-{simple_name}-{self.aspect_name()}"
108+
}
109+
)
110+
yield RecordEnvelope(
111+
record=MetadataChangeProposalClass(
112+
entityType=structured_urn.get_type(),
113+
entityUrn=urn,
114+
changeType="UPSERT",
115+
aspectName=self.aspect_name(),
116+
aspect=transformed_aspect,
117+
systemMetadata=last_seen_mcp.systemMetadata
118+
if last_seen_mcp
119+
else last_seen_mce_system_metadata,
120+
),
121+
metadata=record_metadata,
122+
)
123+
self._mark_processed(urn)
124+
yield envelope

0 commit comments

Comments
 (0)