@@ -416,6 +416,7 @@ def run_merged_intervals(
         start: t.Optional[TimeLike] = None,
         end: t.Optional[TimeLike] = None,
         allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
+        selected_models: t.Optional[t.Set[str]] = None,
         allow_additive_snapshots: t.Optional[t.Set[str]] = None,
         selected_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
         run_environment_statements: bool = False,
@@ -445,7 +446,13 @@ def run_merged_intervals(
         if not selected_snapshots:
             selected_snapshots = list(merged_intervals)

-        snapshot_dag = snapshots_to_dag(selected_snapshots)
+        # Build the full DAG from all snapshots to preserve transitive dependencies
+        full_dag = snapshots_to_dag(self.snapshots.values())
+
+        # Create a subdag that includes the selected snapshots and all their upstream dependencies
+        # This ensures that transitive dependencies are preserved even when intermediate nodes are not selected
+        selected_snapshot_ids_set = {s.snapshot_id for s in selected_snapshots}
+        snapshot_dag = full_dag.subdag(*selected_snapshot_ids_set)

         batched_intervals = self.batch_intervals(
             merged_intervals, deployability_index, environment_naming_info, dag=snapshot_dag
@@ -472,6 +479,7 @@ def run_merged_intervals(
             start=start,
             end=end,
             execution_time=execution_time,
+            selected_models=selected_models,
         )

         # We only need to create physical tables if the snapshot is not representative or if it
@@ -533,6 +541,7 @@ def run_node(node: SchedulingUnit) -> None:
                 allow_destructive_snapshots=allow_destructive_snapshots,
                 allow_additive_snapshots=allow_additive_snapshots,
                 target_table_exists=snapshot.snapshot_id not in snapshots_to_create,
+                selected_models=selected_models,
             )

             evaluation_duration_ms = now_timestamp() - execution_start_ts
@@ -602,6 +611,7 @@ def run_node(node: SchedulingUnit) -> None:
                 start=start,
                 end=end,
                 execution_time=execution_time,
+                selected_models=selected_models,
             )

         self.state_sync.recycle()
@@ -642,20 +652,11 @@ def _dag(
             upstream_dependencies: t.List[SchedulingUnit] = []

             for p_sid in snapshot.parents:
-                if p_sid in self.snapshots:
-                    p_intervals = intervals_per_snapshot.get(p_sid.name, [])
-
-                    if not p_intervals and p_sid in original_snapshots_to_create:
-                        upstream_dependencies.append(CreateNode(snapshot_name=p_sid.name))
-                    elif len(p_intervals) > 1:
-                        upstream_dependencies.append(DummyNode(snapshot_name=p_sid.name))
-                    else:
-                        for i, interval in enumerate(p_intervals):
-                            upstream_dependencies.append(
-                                EvaluateNode(
-                                    snapshot_name=p_sid.name, interval=interval, batch_index=i
-                                )
-                            )
+                upstream_dependencies.extend(
+                    self._find_upstream_dependencies(
+                        p_sid, intervals_per_snapshot, original_snapshots_to_create
+                    )
+                )

             batch_concurrency = snapshot.node.batch_concurrency
             batch_size = snapshot.node.batch_size
@@ -699,6 +700,36 @@ def _dag(
             )
         return dag

+    def _find_upstream_dependencies(
+        self,
+        parent_sid: SnapshotId,
+        intervals_per_snapshot: t.Dict[str, Intervals],
+        snapshots_to_create: t.Set[SnapshotId],
+    ) -> t.List[SchedulingUnit]:
+        if parent_sid not in self.snapshots:
+            return []
+
+        p_intervals = intervals_per_snapshot.get(parent_sid.name, [])
+
+        if p_intervals:
+            if len(p_intervals) > 1:
+                return [DummyNode(snapshot_name=parent_sid.name)]
+            interval = p_intervals[0]
+            return [EvaluateNode(snapshot_name=parent_sid.name, interval=interval, batch_index=0)]
+        if parent_sid in snapshots_to_create:
+            return [CreateNode(snapshot_name=parent_sid.name)]
+        # This snapshot has no intervals and doesn't need creation which means
+        # that it can be a transitive dependency
+        transitive_deps: t.List[SchedulingUnit] = []
+        parent_snapshot = self.snapshots[parent_sid]
+        for grandparent_sid in parent_snapshot.parents:
+            transitive_deps.extend(
+                self._find_upstream_dependencies(
+                    grandparent_sid, intervals_per_snapshot, snapshots_to_create
+                )
+            )
+        return transitive_deps
+
     def _run_or_audit(
         self,
         environment: str | EnvironmentNamingInfo,
@@ -808,6 +839,7 @@ def _run_or_audit(
             run_environment_statements=run_environment_statements,
             audit_only=audit_only,
             auto_restatement_triggers=auto_restatement_triggers,
+            selected_models={s.node.dbt_name for s in merged_intervals if s.node.dbt_name},
         )

         return CompletionStatus.FAILURE if errors else CompletionStatus.SUCCESS
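
For context on the _dag change above: when a parent snapshot has neither intervals to evaluate nor a table to create, the new _find_upstream_dependencies helper looks through it to its own parents, so an unselected intermediate model no longer severs the chain between its upstream and downstream nodes. Below is a minimal, self-contained sketch of that recursion; Snapshot, EvaluateNode, CreateNode, and find_upstream_dependencies are simplified stand-ins for illustration only, not the actual sqlmesh classes or signatures.

import typing as t
from dataclasses import dataclass, field


@dataclass(frozen=True)
class EvaluateNode:
    snapshot_name: str


@dataclass(frozen=True)
class CreateNode:
    snapshot_name: str


@dataclass
class Snapshot:
    name: str
    parents: t.List[str] = field(default_factory=list)


def find_upstream_dependencies(
    parent: str,
    snapshots: t.Dict[str, Snapshot],
    intervals: t.Dict[str, t.List[t.Tuple[int, int]]],
    to_create: t.Set[str],
) -> t.List[object]:
    # Mirrors the recursive fallthrough: a parent with nothing to evaluate
    # or create is skipped, and its own parents become the dependencies.
    if parent not in snapshots:
        return []
    if intervals.get(parent):
        return [EvaluateNode(snapshot_name=parent)]
    if parent in to_create:
        return [CreateNode(snapshot_name=parent)]
    deps: t.List[object] = []
    for grandparent in snapshots[parent].parents:
        deps.extend(find_upstream_dependencies(grandparent, snapshots, intervals, to_create))
    return deps


# a -> b -> c, where "b" has no intervals to run and no table to create
snapshots = {
    "a": Snapshot("a"),
    "b": Snapshot("b", parents=["a"]),
    "c": Snapshot("c", parents=["b"]),
}
intervals = {"a": [(0, 1)], "c": [(0, 1)]}

print(find_upstream_dependencies("b", snapshots, intervals, set()))

Running the sketch prints [EvaluateNode(snapshot_name='a')]: even though "b" contributes no work of its own, the node downstream of it still waits on "a", which mirrors the transitive-dependency behavior the subdag construction and the recursive fallthrough are meant to preserve.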