44import typing as t
55import logging
66from dataclasses import dataclass
7+ from collections import defaultdict
78
89from rich .console import Console as RichConsole
910from rich .tree import Tree
2122 PlanEvaluator ,
2223)
2324from sqlmesh .core .state_sync import StateReader
24- from sqlmesh .core .snapshot .definition import SnapshotInfoMixin , SnapshotIdAndVersion
25+ from sqlmesh .core .snapshot .definition import (
26+ SnapshotInfoMixin ,
27+ SnapshotIdAndVersion ,
28+ model_display_name ,
29+ )
2530from sqlmesh .utils import Verbosity , rich as srich , to_snake_case
2631from sqlmesh .utils .date import to_ts
2732from sqlmesh .utils .errors import SQLMeshError
@@ -75,8 +80,8 @@ class ExplainableRestatementStage(stages.RestatementStage):
7580 of what might happen when they ask for the plan to be explained
7681 """
7782
78- snapshot_intervals_to_clear : t .Dict [str , SnapshotIntervalClearRequest ]
79- """Which snapshots from other environments would have intervals cleared as part of restatement, keyed by name"""
83+ snapshot_intervals_to_clear : t .Dict [str , t . List [ SnapshotIntervalClearRequest ] ]
84+ """Which snapshots from other environments would have intervals cleared as part of restatement, grouped by name. """
8085
8186 @classmethod
8287 def from_restatement_stage (
@@ -92,10 +97,13 @@ def from_restatement_stage(
9297 loaded_snapshots = {s .snapshot_id : s for s in stage .all_snapshots .values ()},
9398 )
9499
100+ # Group the interval clear requests by snapshot name to make them easier to write to the console
101+ snapshot_intervals_to_clear = defaultdict (list )
102+ for clear_request in all_restatement_intervals .values ():
103+ snapshot_intervals_to_clear [clear_request .snapshot .name ].append (clear_request )
104+
95105 return cls (
96- snapshot_intervals_to_clear = {
97- s .snapshot .name : s for s in all_restatement_intervals .values ()
98- },
106+ snapshot_intervals_to_clear = snapshot_intervals_to_clear ,
99107 all_snapshots = stage .all_snapshots ,
100108 )
101109
@@ -198,15 +206,30 @@ def visit_explainable_restatement_stage(self, stage: ExplainableRestatementStage
198206 def visit_restatement_stage (
199207 self , stage : t .Union [ExplainableRestatementStage , stages .RestatementStage ]
200208 ) -> Tree :
201- tree = Tree ("[bold]Invalidate data intervals as part of restatement[/bold]" )
209+ tree = Tree (
210+ "[bold]Invalidate data intervals in state for development environments to prevent old data from being promoted[/bold]\n "
211+ "This only affects state and will not clear physical data from the tables until the next plan for each environment"
212+ )
202213
203214 if isinstance (stage , ExplainableRestatementStage ) and (
204215 snapshot_intervals := stage .snapshot_intervals_to_clear
205216 ):
206- for clear_request in snapshot_intervals .values ():
207- display_name = self ._display_name (clear_request .snapshot )
208- interval = clear_request .interval
209- tree .add (f"{ display_name } [{ to_ts (interval [0 ])} - { to_ts (interval [1 ])} ]" )
217+ for name , clear_requests in snapshot_intervals .items ():
218+ display_name = model_display_name (
219+ name , self .environment_naming_info , self .default_catalog , self .dialect
220+ )
221+ interval_start = min (cr .interval [0 ] for cr in clear_requests )
222+ interval_end = max (cr .interval [1 ] for cr in clear_requests )
223+
224+ if not interval_start or not interval_end :
225+ continue
226+
227+ node = tree .add (f"{ display_name } [{ to_ts (interval_start )} - { to_ts (interval_end )} ]" )
228+
229+ all_environment_names = sorted (
230+ set (env_name for cr in clear_requests for env_name in cr .environment_names )
231+ )
232+ node .add ("in environments: " + ", " .join (all_environment_names ))
210233
211234 return tree
212235
0 commit comments