Skip to content

Commit 92794f9

Browse files
committed
When a dimension node loses columns, we should validate that no nodes linked to that dimension, and no cubes that use that dimension, are broken
1 parent e40902c commit 92794f9

File tree

4 files changed

+288
-20
lines changed

4 files changed

+288
-20
lines changed

datajunction-server/datajunction_server/internal/deployment/orchestrator.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -158,20 +158,21 @@ class _DryRunRollback(Exception):
158158

159159

160160
class _NodeIdProxy:
161-
"""Minimal stand-in for Node that carries the DB primary key and node type.
161+
"""Minimal stand-in for Node that carries the DB primary key, node type, and name.
162162
163163
Used in ``changed_nodes`` passed to ``compute_impact`` after a session
164164
commit or savepoint rollback, when the actual ORM Node objects may be
165165
expired or in an inconsistent state. The BFS needs ``.id`` to traverse
166-
``NodeRelationship`` and ``.type`` to check whether a changed node is a
167-
DIMENSION (Stage 3 of the BFS).
166+
``NodeRelationship``, ``.type`` to check whether a changed node is a
167+
DIMENSION (Stage 3 of the BFS), and ``.name`` for get_dimension_inbound_bfs.
168168
"""
169169

170-
__slots__ = ("id", "type")
170+
__slots__ = ("id", "type", "name")
171171

172-
def __init__(self, node_id: int, node_type: "NodeType") -> None:
172+
def __init__(self, node_id: int, node_type: "NodeType", name: str) -> None:
173173
self.id = node_id
174174
self.type = node_type
175+
self.name = name
175176

176177

177178
@dataclass
@@ -468,7 +469,7 @@ async def _compute_downstream_impact(
468469
node_type = pre_deploy_node_types.get(name, node_spec.node_type)
469470
changed_nodes[name] = (
470471
explicit_spec,
471-
cast(Node, _NodeIdProxy(node_id, node_type)),
472+
cast(Node, _NodeIdProxy(node_id, node_type, name)),
472473
set(),
473474
)
474475

@@ -479,7 +480,7 @@ async def _compute_downstream_impact(
479480
node_type = pre_deploy_node_types.get(name, node_spec.node_type)
480481
changed_nodes[name] = (
481482
None,
482-
cast(Node, _NodeIdProxy(node_id, node_type)),
483+
cast(Node, _NodeIdProxy(node_id, node_type, name)),
483484
set(),
484485
)
485486

datajunction-server/datajunction_server/internal/impact.py

Lines changed: 97 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from sqlalchemy.sql.expression import bindparam, text
2121

2222
from datajunction_server.database.column import Column
23+
from datajunction_server.database.dimensionlink import DimensionLink
2324
from datajunction_server.database.node import Node, NodeRevision
2425
from datajunction_server.internal.deployment.validation import bulk_validate_node_data
2526
from datajunction_server.models.deployment import (
@@ -41,6 +42,7 @@
4142
get_downstream_nodes,
4243
)
4344
from datajunction_server.sql.parsing.types import PRIMITIVE_TYPES
45+
from datajunction_server.utils import SEPARATOR
4446

4547
_logger = logging.getLogger(__name__)
4648

@@ -795,18 +797,27 @@ async def _seed_bfs(
795797
)
796798

797799
# ---------------------------------------------------------------------------
798-
# Stage 3 — dimension-loses-columns pass
800+
# Stage 3 — dimension-loses-columns (or becomes-invalid) pass
799801
# ---------------------------------------------------------------------------
800-
dim_nodes_losing_cols = [
801-
state.node_cache[name]
802-
for name, prop in state.propagated_change.items()
803-
if prop.columns_removed
804-
and state.node_cache.get(name) is not None
805-
and state.node_cache[name].type == NodeType.DIMENSION
806-
]
807-
if dim_nodes_losing_cols:
802+
# Trigger when a dimension node loses specific columns OR becomes entirely
803+
# invalid (e.g. new query fails validation). ``dim_is_invalid=True`` means
804+
# we couldn't determine which columns changed — treat all holders as broken.
805+
dim_nodes_stage3: list[tuple[Node, set[str], bool]] = []
806+
for name, prop in state.propagated_change.items():
807+
node = state.node_cache.get(name)
808+
if node is None or node.type != NodeType.DIMENSION:
809+
continue
810+
if prop.columns_removed:
811+
dim_nodes_stage3.append((node, prop.columns_removed, False))
812+
elif prop.is_invalid or (
813+
prop.proposed_columns is not None and not prop.proposed_columns
814+
):
815+
# Dimension became invalid — all dimension-link holders are affected
816+
dim_nodes_stage3.append((node, set(), True))
817+
818+
if dim_nodes_stage3:
808819
holder_names: set[str] = set()
809-
for dim_node in dim_nodes_losing_cols:
820+
for dim_node, _, _ in dim_nodes_stage3:
810821
holder_displays, _ = await get_dimension_inbound_bfs(
811822
session,
812823
dim_node,
@@ -819,7 +830,7 @@ async def _seed_bfs(
819830
list(holder_names),
820831
options=list(_node_output_options()),
821832
)
822-
dim_node_names = [n.name for n in dim_nodes_losing_cols]
833+
dim_node_names = [n.name for n, _, _ in dim_nodes_stage3]
823834
for holder in holders:
824835
if holder.name not in state.propagated_change: # pragma: no branch
825836
state.propagated_change[holder.name] = _PropagatedChange(
@@ -831,6 +842,40 @@ async def _seed_bfs(
831842
state.node_cache[holder.name] = holder
832843
state.frontier.add(holder.name)
833844
state.visited.add(holder.name)
845+
846+
# Determine whether the holder's dimension link is broken:
847+
# - dimension invalid entirely → all links to it are broken
848+
# - specific PK-side (join-target) columns removed → link is broken
849+
for dim_node, removed_cols, dim_is_invalid in dim_nodes_stage3:
850+
for link in holder.current.dimension_links:
851+
if link.dimension.name != dim_node.name:
852+
continue
853+
broken_pk_cols = (
854+
set()
855+
if dim_is_invalid
856+
else _dim_pk_col_names(link) & removed_cols
857+
)
858+
if dim_is_invalid or broken_pk_cols:
859+
reason = (
860+
f"Dimension link to {dim_node.name} is broken: "
861+
"dimension node is invalid"
862+
if dim_is_invalid
863+
else (
864+
f"Dimension link to {dim_node.name} is broken: "
865+
f"join-target columns removed: "
866+
f"{', '.join(sorted(broken_pk_cols))}"
867+
)
868+
)
869+
_record_impact(
870+
state.impacted,
871+
holder,
872+
impact_type="dimension_link",
873+
caused_by=[dim_node.name],
874+
reason=reason,
875+
)
876+
state.propagated_change[holder.name].is_invalid = True
877+
break
878+
834879
await _check_cube_dim_link_impacts(
835880
session=session,
836881
node_name=holder.name,
@@ -902,10 +947,14 @@ async def _propagate_column_change(
902947
caused_by=[parent_name],
903948
reason=_column_reason(prop),
904949
)
950+
# Case 3: if this child has dimension links whose FK columns are now
951+
# absent from its proposed output, those links are broken too.
952+
new_col_names = {c.name for c in new_cols} if new_cols else set()
953+
broken_links = _broken_fk_dim_links(child, new_col_names)
905954
_merge_propagated(
906955
state.propagated_change,
907956
child_name,
908-
dim_links_removed=set(),
957+
dim_links_removed=broken_links,
909958
is_deleted=False,
910959
is_invalid=len(new_cols) == 0,
911960
caused_by=[parent_name],
@@ -1112,3 +1161,39 @@ def _column_reason(change: _PropagatedChange) -> str:
11121161
def _dim_link_reason(change: _PropagatedChange) -> str:
11131162
dims = sorted(change.dim_links_removed)
11141163
return f"Dimension links removed: {', '.join(dims)}"
1164+
1165+
1166+
def _dim_pk_col_names(link: "DimensionLink") -> set[str]:
    """Return the dimension-side column names used as join targets in this link.

    Reads ``link.foreign_keys`` and takes its non-``None`` values, which are
    treated as identifiers of columns on the dimension node. When an
    identifier starts with ``<dimension name><SEPARATOR>`` that prefix is
    stripped; otherwise only the segment after the last ``SEPARATOR`` is kept.

    This is best-effort: if the dimension name or the foreign-key mapping
    cannot be resolved, the failure is logged and an empty set is returned.
    An empty result therefore means "no join-target columns could be
    determined", which is not necessarily the same as "the link has none".
    """
    try:
        dim_prefix = link.dimension.name + SEPARATOR
        return {
            pk_ident[len(dim_prefix) :]
            if pk_ident.startswith(dim_prefix)
            else pk_ident.rsplit(SEPARATOR, 1)[-1]
            for pk_ident in link.foreign_keys.values()
            if pk_ident is not None
        }
    except Exception:
        # Deliberately broad so impact analysis never crashes on one bad link,
        # but no longer silent: callers treat an empty set as "link not
        # broken", so a swallowed failure here could hide a real breakage.
        # NOTE(review): presumably ``foreign_keys`` is derived from the link's
        # join SQL and can raise parser errors — confirm and narrow if so.
        _logger.warning(
            "Could not determine join-target columns for a dimension link; "
            "treating it as having none",
            exc_info=True,
        )
        return set()
1183+
1184+
1185+
def _broken_fk_dim_links(child: "Node", new_col_names: set[str]) -> set[str]:
    """Return dimension node names whose FK link on ``child`` is now broken.

    A link is broken when one or more of the child's FK columns used in the
    join condition (``link.foreign_key_column_names``) are absent from
    ``new_col_names`` (the proposed output columns after an upstream change).

    Links are read from ``child.current.dimension_links``. A link whose
    foreign-key columns or dimension name cannot be read is logged and
    skipped, so one bad link does not prevent the others from being checked.
    """
    broken: set[str] = set()
    for link in child.current.dimension_links:
        try:
            missing_fk_cols = link.foreign_key_column_names - new_col_names
            if missing_fk_cols:
                broken.add(link.dimension.name)
        except Exception:
            # Best-effort by design, but record the failure: a skipped link is
            # reported as "not broken", which could otherwise go unnoticed.
            _logger.warning(
                "Could not check foreign-key columns of a dimension link; "
                "skipping it",
                exc_info=True,
            )
    return broken

datajunction-server/datajunction_server/internal/validation.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ def update_ast_column_types(node: ast.Node) -> None:
3333
raw ColumnType strings.
3434
"""
3535
if isinstance(node, ast.Column) and node._table and hasattr(node._table, "dj_node"):
36-
for db_col in node._table.dj_node.columns:
36+
dj_node = node._table.dj_node # type: ignore[attr-defined]
37+
for db_col in dj_node.columns:
3738
if db_col.name == node.name.name:
3839
node._type = db_col.type
3940
break

0 commit comments

Comments
 (0)