diff --git a/elementary/monitor/api/groups/groups.py b/elementary/monitor/api/groups/groups.py index 5451b30b5..5f2b0a52c 100644 --- a/elementary/monitor/api/groups/groups.py +++ b/elementary/monitor/api/groups/groups.py @@ -16,6 +16,7 @@ NormalizedExposureSchema, NormalizedModelSchema, NormalizedSeedSchema, + NormalizedSnapshotSchema, NormalizedSourceSchema, ) from elementary.monitor.fetchers.tests.schema import NormalizedTestSchema @@ -31,6 +32,7 @@ NormalizedExposureSchema, NormalizedTestSchema, NormalizedSeedSchema, + NormalizedSnapshotSchema, ] diff --git a/elementary/monitor/api/models/models.py b/elementary/monitor/api/models/models.py index 5d9c54d15..83c71318c 100644 --- a/elementary/monitor/api/models/models.py +++ b/elementary/monitor/api/models/models.py @@ -14,6 +14,7 @@ NormalizedExposureSchema, NormalizedModelSchema, NormalizedSeedSchema, + NormalizedSnapshotSchema, NormalizedSourceSchema, TotalsModelRunsSchema, ) @@ -26,6 +27,7 @@ from elementary.monitor.fetchers.models.schema import ( ModelSchema, SeedSchema, + SnapshotSchema, SourceSchema, ) from elementary.utils.log import get_logger @@ -39,6 +41,7 @@ class ModelsAPI(APIClient): SourceSchema: "sources", ModelSchema: "models", ExposureSchema: "exposures", + SnapshotSchema: "snapshots", } def __init__(self, dbt_runner: BaseDbtRunner): @@ -162,6 +165,16 @@ def get_sources(self) -> Dict[str, NormalizedSourceSchema]: sources[source_unique_id] = normalized_source return sources + def get_snapshots(self) -> Dict[str, NormalizedSnapshotSchema]: + snapshots_results = self.models_fetcher.get_snapshots() + snapshots = dict() + if snapshots_results: + for snapshot_result in snapshots_results: + normalized_snapshot = self._normalize_dbt_artifact_dict(snapshot_result) + snapshot_unique_id = cast(str, normalized_snapshot.unique_id) + snapshots[snapshot_unique_id] = normalized_snapshot + return snapshots + def get_exposures( self, upstream_node_ids: Optional[List[str]] = None, @@ -257,19 +270,30 @@ def _normalize_dbt_artifact_dict( ) -> NormalizedSourceSchema: ... + @overload + def _normalize_dbt_artifact_dict( + self, artifact: SnapshotSchema + ) -> NormalizedSnapshotSchema: + ... + def _normalize_dbt_artifact_dict( - self, artifact: Union[SeedSchema, ModelSchema, ExposureSchema, SourceSchema] + self, + artifact: Union[ + SeedSchema, ModelSchema, ExposureSchema, SourceSchema, SnapshotSchema + ], ) -> Union[ NormalizedSeedSchema, NormalizedModelSchema, NormalizedExposureSchema, NormalizedSourceSchema, + NormalizedSnapshotSchema, ]: schema_to_normalized_schema_map = { SeedSchema: NormalizedSeedSchema, ExposureSchema: NormalizedExposureSchema, ModelSchema: NormalizedModelSchema, SourceSchema: NormalizedSourceSchema, + SnapshotSchema: NormalizedSnapshotSchema, } artifact_name = artifact.name normalized_artifact = json.loads(artifact.json()) @@ -308,7 +332,9 @@ def _normalize_artifact_path(cls, artifact: ArtifactSchemaType, fqn: str) -> str @classmethod def _fqn( cls, - artifact: Union[ModelSchema, ExposureSchema, SourceSchema, SeedSchema], + artifact: Union[ + ModelSchema, ExposureSchema, SourceSchema, SeedSchema, SnapshotSchema + ], ) -> str: if isinstance(artifact, ExposureSchema): path = (artifact.meta or {}).get("path") diff --git a/elementary/monitor/api/models/schema.py b/elementary/monitor/api/models/schema.py index f9bbf9f19..211bab499 100644 --- a/elementary/monitor/api/models/schema.py +++ b/elementary/monitor/api/models/schema.py @@ -7,6 +7,7 @@ ExposureSchema, ModelSchema, SeedSchema, + SnapshotSchema, SourceSchema, ) from elementary.utils.pydantic_shim import BaseModel, Field, validator @@ -51,6 +52,11 @@ class NormalizedSourceSchema(NormalizedArtifactSchema, SourceSchema): artifact_type: str = Field("source", const=True) # type: ignore # noqa +# NormalizedArtifactSchema must be first in the inheritance order +class NormalizedSnapshotSchema(NormalizedArtifactSchema, SnapshotSchema): + artifact_type: str = Field("snapshot", const=True) # type: ignore # noqa + + # NormalizedArtifactSchema must be first in the inheritance order class NormalizedExposureSchema(NormalizedArtifactSchema, ExposureSchema): artifact_type: str = Field("exposure", const=True) # type: ignore # noqa diff --git a/elementary/monitor/api/report/report.py b/elementary/monitor/api/report/report.py index 5eae964ff..896473d29 100644 --- a/elementary/monitor/api/report/report.py +++ b/elementary/monitor/api/report/report.py @@ -14,6 +14,7 @@ NormalizedExposureSchema, NormalizedModelSchema, NormalizedSeedSchema, + NormalizedSnapshotSchema, NormalizedSourceSchema, ) from elementary.monitor.api.report.schema import ReportDataEnvSchema, ReportDataSchema @@ -47,11 +48,19 @@ def _get_groups( sources: Iterable[NormalizedSourceSchema], exposures: Iterable[NormalizedExposureSchema], seeds: Iterable[NormalizedSeedSchema], + snapshots: Iterable[NormalizedSnapshotSchema], singular_tests: Iterable[NormalizedTestSchema], ) -> GroupsSchema: groups_api = GroupsAPI(self.dbt_runner) return groups_api.get_groups( - artifacts=[*models, *sources, *exposures, *seeds, *singular_tests] + artifacts=[ + *models, + *sources, + *exposures, + *seeds, + *snapshots, + *singular_tests, + ] ) def get_report_data( @@ -90,6 +99,8 @@ def get_report_data( lineage_node_ids.extend(models.keys()) sources = models_api.get_sources() lineage_node_ids.extend(sources.keys()) + snapshots = models_api.get_snapshots() + lineage_node_ids.extend(snapshots.keys()) exposures = models_api.get_exposures(upstream_node_ids=lineage_node_ids) lineage_node_ids.extend(exposures.keys()) singular_tests = tests_api.get_singular_tests() @@ -99,6 +110,7 @@ def get_report_data( sources.values(), exposures.values(), seeds.values(), + snapshots.values(), singular_tests, ) @@ -147,7 +159,7 @@ def get_report_data( serializable_groups = groups.dict() serializable_models = self._serialize_models( - models, sources, exposures, seeds + models, sources, snapshots, exposures, seeds ) serializable_model_runs = self._serialize_models_runs(models_runs.runs) serializable_model_runs_totals = models_runs.dict(include={"totals"})[ @@ -207,10 +219,11 @@ def _serialize_models( self, models: Dict[str, NormalizedModelSchema], sources: Dict[str, NormalizedSourceSchema], + snapshots: Dict[str, NormalizedSnapshotSchema], exposures: Dict[str, NormalizedExposureSchema], seeds: Dict[str, NormalizedSeedSchema], ) -> Dict[str, dict]: - nodes = dict(**models, **sources, **exposures, **seeds) + nodes = dict(**models, **sources, **snapshots, **exposures, **seeds) serializable_nodes = dict() for key in nodes.keys(): serializable_nodes[key] = dict(nodes[key]) diff --git a/elementary/monitor/dbt_project/macros/get_snapshots.sql b/elementary/monitor/dbt_project/macros/get_snapshots.sql new file mode 100644 index 000000000..c62c1bfee --- /dev/null +++ b/elementary/monitor/dbt_project/macros/get_snapshots.sql @@ -0,0 +1,33 @@ +{% macro get_snapshots() %} + {% set dbt_snapshots_relation = ref('elementary', 'dbt_snapshots') %} + {%- if elementary.relation_exists(dbt_snapshots_relation) -%} + {% set get_snapshots_query %} + with dbt_artifacts_snapshots as ( + select + name, + unique_id, + owner, + tags, + package_name, + description, + meta, + materialization, + database_name, + schema_name, + depends_on_macros, + depends_on_nodes, + original_path as full_path, + path, + patch_path, + generated_at, + unique_key, + incremental_strategy + from {{ dbt_snapshots_relation }} + ) + select * from dbt_artifacts_snapshots + {% endset %} + + {% set snapshots_agate = run_query(get_snapshots_query) %} + {% do return(elementary.agate_to_dicts(snapshots_agate)) %} + {%- endif -%} +{% endmacro %} diff --git a/elementary/monitor/fetchers/models/models.py b/elementary/monitor/fetchers/models/models.py index 8061573f3..310966f05 100644 --- a/elementary/monitor/fetchers/models/models.py +++ b/elementary/monitor/fetchers/models/models.py @@ -8,6 +8,7 @@ ModelSchema, ModelTestCoverage, SeedSchema, + SnapshotSchema, SourceSchema, ) from elementary.utils.log import get_logger @@ -62,6 +63,16 @@ def get_sources(self) -> List[SourceSchema]: sources = [SourceSchema(**source) for source in sources] return sources + def get_snapshots(self) -> List[SnapshotSchema]: + run_operation_response = self.dbt_runner.run_operation( + macro_name="elementary_cli.get_snapshots" + ) + snapshots = ( + json.loads(run_operation_response[0]) if run_operation_response else [] + ) + snapshots = [SnapshotSchema(**snapshot) for snapshot in snapshots] + return snapshots + def get_exposures(self) -> List[ExposureSchema]: run_operation_response = self.dbt_runner.run_operation( macro_name="elementary_cli.get_exposures" diff --git a/elementary/monitor/fetchers/models/schema.py b/elementary/monitor/fetchers/models/schema.py index 2c895a1c6..4449a3bff 100644 --- a/elementary/monitor/fetchers/models/schema.py +++ b/elementary/monitor/fetchers/models/schema.py @@ -81,6 +81,24 @@ def ref(self): return f"source('{self.source_name}', '{self.table_name}')" +class SnapshotSchema(ArtifactSchema): + database_name: str + schema_name: str + depends_on_macros: str + depends_on_nodes: str + path: str + patch_path: Optional[str] + generated_at: str + unique_key: str + incremental_strategy: Optional[str] + + table_name: Optional[str] = None + + @validator("table_name", always=True) + def set_table_name(cls, table_name, values): + return values.get("name") + + class OwnerSchema(ExtendedBaseModel): name: Optional[str] = None email: Optional[str] = None