Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions elementary/monitor/api/groups/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
NormalizedExposureSchema,
NormalizedModelSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedSourceSchema,
)
from elementary.monitor.fetchers.tests.schema import NormalizedTestSchema
Expand All @@ -31,6 +32,7 @@
NormalizedExposureSchema,
NormalizedTestSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
]


Expand Down
30 changes: 28 additions & 2 deletions elementary/monitor/api/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
NormalizedExposureSchema,
NormalizedModelSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedSourceSchema,
TotalsModelRunsSchema,
)
Expand All @@ -26,6 +27,7 @@
from elementary.monitor.fetchers.models.schema import (
ModelSchema,
SeedSchema,
SnapshotSchema,
SourceSchema,
)
from elementary.utils.log import get_logger
Expand All @@ -39,6 +41,7 @@ class ModelsAPI(APIClient):
SourceSchema: "sources",
ModelSchema: "models",
ExposureSchema: "exposures",
SnapshotSchema: "snapshots",
}

def __init__(self, dbt_runner: BaseDbtRunner):
Expand Down Expand Up @@ -162,6 +165,16 @@ def get_sources(self) -> Dict[str, NormalizedSourceSchema]:
sources[source_unique_id] = normalized_source
return sources

def get_snapshots(self) -> Dict[str, NormalizedSnapshotSchema]:
snapshots_results = self.models_fetcher.get_snapshots()
snapshots = dict()
if snapshots_results:
for snapshot_result in snapshots_results:
normalized_snapshot = self._normalize_dbt_artifact_dict(snapshot_result)
snapshot_unique_id = cast(str, normalized_snapshot.unique_id)
snapshots[snapshot_unique_id] = normalized_snapshot
return snapshots

def get_exposures(
self,
upstream_node_ids: Optional[List[str]] = None,
Expand Down Expand Up @@ -257,19 +270,30 @@ def _normalize_dbt_artifact_dict(
) -> NormalizedSourceSchema:
...

@overload
def _normalize_dbt_artifact_dict(
self, artifact: SnapshotSchema
) -> NormalizedSnapshotSchema:
...

def _normalize_dbt_artifact_dict(
self, artifact: Union[SeedSchema, ModelSchema, ExposureSchema, SourceSchema]
self,
artifact: Union[
SeedSchema, ModelSchema, ExposureSchema, SourceSchema, SnapshotSchema
],
) -> Union[
NormalizedSeedSchema,
NormalizedModelSchema,
NormalizedExposureSchema,
NormalizedSourceSchema,
NormalizedSnapshotSchema,
]:
schema_to_normalized_schema_map = {
SeedSchema: NormalizedSeedSchema,
ExposureSchema: NormalizedExposureSchema,
ModelSchema: NormalizedModelSchema,
SourceSchema: NormalizedSourceSchema,
SnapshotSchema: NormalizedSnapshotSchema,
}
artifact_name = artifact.name
normalized_artifact = json.loads(artifact.json())
Expand Down Expand Up @@ -308,7 +332,9 @@ def _normalize_artifact_path(cls, artifact: ArtifactSchemaType, fqn: str) -> str
@classmethod
def _fqn(
cls,
artifact: Union[ModelSchema, ExposureSchema, SourceSchema, SeedSchema],
artifact: Union[
ModelSchema, ExposureSchema, SourceSchema, SeedSchema, SnapshotSchema
],
) -> str:
if isinstance(artifact, ExposureSchema):
path = (artifact.meta or {}).get("path")
Expand Down
6 changes: 6 additions & 0 deletions elementary/monitor/api/models/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
ExposureSchema,
ModelSchema,
SeedSchema,
SnapshotSchema,
SourceSchema,
)
from elementary.utils.pydantic_shim import BaseModel, Field, validator
Expand Down Expand Up @@ -51,6 +52,11 @@ class NormalizedSourceSchema(NormalizedArtifactSchema, SourceSchema):
artifact_type: str = Field("source", const=True) # type: ignore # noqa


# NormalizedArtifactSchema must be first in the inheritance order
class NormalizedSnapshotSchema(NormalizedArtifactSchema, SnapshotSchema):
artifact_type: str = Field("snapshot", const=True) # type: ignore # noqa


# NormalizedArtifactSchema must be first in the inheritance order
class NormalizedExposureSchema(NormalizedArtifactSchema, ExposureSchema):
artifact_type: str = Field("exposure", const=True) # type: ignore # noqa
Expand Down
19 changes: 16 additions & 3 deletions elementary/monitor/api/report/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
NormalizedExposureSchema,
NormalizedModelSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedSourceSchema,
)
from elementary.monitor.api.report.schema import ReportDataEnvSchema, ReportDataSchema
Expand Down Expand Up @@ -47,11 +48,19 @@ def _get_groups(
sources: Iterable[NormalizedSourceSchema],
exposures: Iterable[NormalizedExposureSchema],
seeds: Iterable[NormalizedSeedSchema],
snapshots: Iterable[NormalizedSnapshotSchema],
singular_tests: Iterable[NormalizedTestSchema],
) -> GroupsSchema:
groups_api = GroupsAPI(self.dbt_runner)
return groups_api.get_groups(
artifacts=[*models, *sources, *exposures, *seeds, *singular_tests]
artifacts=[
*models,
*sources,
*exposures,
*seeds,
*snapshots,
*singular_tests,
]
)

def get_report_data(
Expand Down Expand Up @@ -90,6 +99,8 @@ def get_report_data(
lineage_node_ids.extend(models.keys())
sources = models_api.get_sources()
lineage_node_ids.extend(sources.keys())
snapshots = models_api.get_snapshots()
lineage_node_ids.extend(snapshots.keys())
exposures = models_api.get_exposures(upstream_node_ids=lineage_node_ids)
lineage_node_ids.extend(exposures.keys())
singular_tests = tests_api.get_singular_tests()
Expand All @@ -99,6 +110,7 @@ def get_report_data(
sources.values(),
exposures.values(),
seeds.values(),
snapshots.values(),
singular_tests,
)

Expand Down Expand Up @@ -147,7 +159,7 @@ def get_report_data(

serializable_groups = groups.dict()
serializable_models = self._serialize_models(
models, sources, exposures, seeds
models, sources, snapshots, exposures, seeds
)
serializable_model_runs = self._serialize_models_runs(models_runs.runs)
serializable_model_runs_totals = models_runs.dict(include={"totals"})[
Expand Down Expand Up @@ -207,10 +219,11 @@ def _serialize_models(
self,
models: Dict[str, NormalizedModelSchema],
sources: Dict[str, NormalizedSourceSchema],
snapshots: Dict[str, NormalizedSnapshotSchema],
exposures: Dict[str, NormalizedExposureSchema],
seeds: Dict[str, NormalizedSeedSchema],
) -> Dict[str, dict]:
nodes = dict(**models, **sources, **exposures, **seeds)
nodes = dict(**models, **sources, **snapshots, **exposures, **seeds)
serializable_nodes = dict()
for key in nodes.keys():
serializable_nodes[key] = dict(nodes[key])
Expand Down
33 changes: 33 additions & 0 deletions elementary/monitor/dbt_project/macros/get_snapshots.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{% macro get_snapshots() %}
{% set dbt_snapshots_relation = ref('elementary', 'dbt_snapshots') %}
{%- if elementary.relation_exists(dbt_snapshots_relation) -%}
{% set get_snapshots_query %}
with dbt_artifacts_snapshots as (
select
name,
unique_id,
owner,
tags,
package_name,
description,
meta,
materialization,
database_name,
schema_name,
depends_on_macros,
depends_on_nodes,
original_path as full_path,
path,
patch_path,
generated_at,
unique_key,
incremental_strategy
from {{ dbt_snapshots_relation }}
)
select * from dbt_artifacts_snapshots
{% endset %}

{% set snapshots_agate = run_query(get_snapshots_query) %}
{% do return(elementary.agate_to_dicts(snapshots_agate)) %}
{%- endif -%}
{% endmacro %}
11 changes: 11 additions & 0 deletions elementary/monitor/fetchers/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
ModelSchema,
ModelTestCoverage,
SeedSchema,
SnapshotSchema,
SourceSchema,
)
from elementary.utils.log import get_logger
Expand Down Expand Up @@ -62,6 +63,16 @@ def get_sources(self) -> List[SourceSchema]:
sources = [SourceSchema(**source) for source in sources]
return sources

def get_snapshots(self) -> List[SnapshotSchema]:
run_operation_response = self.dbt_runner.run_operation(
macro_name="elementary_cli.get_snapshots"
)
snapshots = (
json.loads(run_operation_response[0]) if run_operation_response else []
)
snapshots = [SnapshotSchema(**snapshot) for snapshot in snapshots]
return snapshots

def get_exposures(self) -> List[ExposureSchema]:
run_operation_response = self.dbt_runner.run_operation(
macro_name="elementary_cli.get_exposures"
Expand Down
18 changes: 18 additions & 0 deletions elementary/monitor/fetchers/models/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,24 @@ def ref(self):
return f"source('{self.source_name}', '{self.table_name}')"


class SnapshotSchema(ArtifactSchema):
database_name: str
schema_name: str
depends_on_macros: str
depends_on_nodes: str
path: str
patch_path: Optional[str]
generated_at: str
unique_key: str
incremental_strategy: Optional[str]

table_name: Optional[str] = None

@validator("table_name", always=True)
def set_table_name(cls, table_name, values):
return values.get("name")


class OwnerSchema(ExtendedBaseModel):
name: Optional[str] = None
email: Optional[str] = None
Expand Down
Loading