1- from re import compile
1+ from json import loads as json_loads
22
33from snowddl .blueprint import DynamicTableBlueprint
44from snowddl .resolver .abc_schema_object_resolver import AbstractSchemaObjectResolver , ResolveResult , ObjectType
55
6- cluster_by_syntax_re = compile (r"^(\w+)?\((.*)\)$" )
7-
86
97class DynamicTableResolver (AbstractSchemaObjectResolver ):
10- # Dynamic tables are available for all accounts during preview, including STANDARD edition
11- # skip_min_edition = Edition.ENTERPRISE
128 skip_on_empty_blueprints = True
139
10+ unit_to_seconds_multiplier = {
11+ "second" : 1 ,
12+ "seconds" : 1 ,
13+ "minute" : 60 ,
14+ "minutes" : 60 ,
15+ "hour" : 3600 ,
16+ "hours" : 3600 ,
17+ "day" : 86400 ,
18+ "days" : 86400 ,
19+ }
20+
1421 def get_object_type (self ) -> ObjectType :
1522 return ObjectType .DYNAMIC_TABLE
1623
1724 def get_existing_objects_in_schema (self , schema : dict ):
1825 existing_objects = {}
1926
2027 cur = self .engine .execute_meta (
21- "SHOW DYNAMIC TABLES IN SCHEMA {database:i}.{schema:i}" ,
28+ "SHOW AS RESOURCE DYNAMIC TABLES IN SCHEMA {database:i}.{schema:i}" ,
2229 {
2330 "database" : schema ["database" ],
2431 "schema" : schema ["schema" ],
2532 },
2633 )
2734
2835 for r in cur :
36+ r = json_loads (r ["As Resource" ])
37+
2938 existing_objects [f"{ r ['database_name' ]} .{ r ['schema_name' ]} .{ r ['name' ]} " ] = {
3039 "database" : r ["database_name" ],
3140 "schema" : r ["schema_name" ],
3241 "name" : r ["name" ],
3342 "owner" : r ["owner" ],
34- # Extract SQL query text only, skip the initial "CREATE DYNAMIC TABLE ..." part
35- # Snowflake modifies original SQL text in this column, it cannot be compared directly
36- "text" : r ["text" ].partition ("\n AS\n " )[2 ].rstrip (";" ),
37- "cluster_by" : r ["cluster_by" ] if r ["cluster_by" ] else None ,
43+ "is_transient" : r ["kind" ] == "TRANSIENT" ,
44+ "retention_time" : r ["data_retention_time_in_days" ],
45+ "columns" : r ["columns" ],
46+ "text" : r ["query" ].rstrip (";" ),
47+ "cluster_by" : r ["cluster_by" ],
3848 "target_lag" : r ["target_lag" ],
3949 "refresh_mode" : r ["refresh_mode" ],
4050 "warehouse" : r ["warehouse" ],
41- "comment" : r ["comment" ] if r [ "comment" ] else None ,
51+ "comment" : r ["comment" ],
4252 }
4353
4454 return existing_objects
@@ -66,16 +76,25 @@ def create_object(self, bp: DynamicTableBlueprint):
6676
6777 return ResolveResult .CREATE
6878
69- def _compare_cluster_by (self , bp : DynamicTableBlueprint , row : dict ):
70- bp_cluster_by = ", " .join (bp .cluster_by ) if bp .cluster_by else None
71- snow_cluster_by = cluster_by_syntax_re .sub (r"\2" , row ["cluster_by" ]) if row ["cluster_by" ] else None
72-
73- return bp_cluster_by == snow_cluster_by
74-
7579 def compare_object (self , bp : DynamicTableBlueprint , row : dict ):
7680 result = ResolveResult .NOCHANGE
81+ replace_reasons = []
82+
83+ if bp .columns and [str (c .name ) for c in bp .columns ] != [str (c ["name" ]) for c in row ["columns" ]]:
84+ replace_reasons .append ("Column definition was changed" )
7785
7886 if bp .text != row ["text" ]:
87+ replace_reasons .append ("SQL text was changed" )
88+
89+ if bp .is_transient is True and row ["is_transient" ] is False :
90+ replace_reasons .append ("Dynamic table type was changed to TRANSIENT" )
91+ elif bp .is_transient is False and row ["is_transient" ] is True :
92+ replace_reasons .append ("Dynamic table type was changed to PERMANENT" )
93+
94+ if bp .refresh_mode and bp .refresh_mode != "AUTO" and bp .refresh_mode != row ["refresh_mode" ]:
95+ replace_reasons .append (f"Refresh mode was changed to { bp .refresh_mode } " )
96+
97+ if replace_reasons :
7998 query = self .engine .query_builder ()
8099 query .append ("CREATE OR REPLACE" )
81100
@@ -90,11 +109,11 @@ def compare_object(self, bp: DynamicTableBlueprint, row: dict):
90109 )
91110
92111 query .append (self ._build_common_dynamic_table_sql (bp ))
93- self .engine .execute_unsafe_ddl (query )
112+ self .engine .execute_unsafe_ddl (" \n " . join ( f"-- { r } " for r in replace_reasons ) + " \n " + str ( query ) )
94113
95114 return ResolveResult .REPLACE
96115
97- if bp . target_lag != row [ "target_lag" ] :
116+ if not self . _compare_target_lag ( bp , row ) :
98117 self .engine .execute_safe_ddl (
99118 "ALTER DYNAMIC TABLE {full_name:i} SET TARGET_LAG = {target_lag}" ,
100119 {
@@ -133,6 +152,17 @@ def compare_object(self, bp: DynamicTableBlueprint, row: dict):
133152
134153 result = ResolveResult .ALTER
135154
155+ if bp .retention_time is not None and bp .retention_time != row ["retention_time" ]:
156+ self .engine .execute_unsafe_ddl (
157+ "ALTER DYNAMIC TABLE {full_name:i} SET DATA_RETENTION_TIME_IN_DAYS = {retention_time:d}" ,
158+ {
159+ "full_name" : bp .full_name ,
160+ "retention_time" : bp .retention_time ,
161+ },
162+ )
163+
164+ result = ResolveResult .ALTER
165+
136166 if bp .comment != row ["comment" ]:
137167 self .engine .execute_safe_ddl (
138168 "ALTER DYNAMIC TABLE {full_name:i} SET COMMENT = {comment}" ,
@@ -144,6 +174,30 @@ def compare_object(self, bp: DynamicTableBlueprint, row: dict):
144174
145175 result = ResolveResult .ALTER
146176
177+ for idx , c in enumerate (row ["columns" ]):
178+ bp_col_comment = bp .columns [idx ].comment if bp .columns else None
179+
180+ if bp_col_comment != c ["comment" ]:
181+ if bp_col_comment :
182+ self .engine .execute_safe_ddl (
183+ "ALTER DYNAMIC TABLE {full_name:i} MODIFY COLUMN {column_name:i} COMMENT {comment}" ,
184+ {
185+ "full_name" : bp .full_name ,
186+ "column_name" : c ["name" ],
187+ "comment" : bp_col_comment ,
188+ },
189+ )
190+ else :
191+ self .engine .execute_safe_ddl (
192+ "ALTER DYNAMIC TABLE {full_name:i} MODIFY COLUMN {column_name:i} UNSET COMMENT" ,
193+ {
194+ "full_name" : bp .full_name ,
195+ "column_name" : c ["name" ],
196+ },
197+ )
198+
199+ result = ResolveResult .ALTER
200+
147201 return result
148202
149203 def drop_object (self , row : dict ):
@@ -241,3 +295,25 @@ def _build_common_dynamic_table_sql(self, bp: DynamicTableBlueprint):
241295 query .append_nl (bp .text )
242296
243297 return query
298+
299+ def _compare_cluster_by (self , bp : DynamicTableBlueprint , row : dict ):
300+ bp_cluster_by = ", " .join (bp .cluster_by ).upper () if bp .cluster_by else None
301+ snow_cluster_by = ", " .join (row ["cluster_by" ]).upper () if row ["cluster_by" ] else None
302+
303+ return bp_cluster_by == snow_cluster_by
304+
305+ def _compare_target_lag (self , bp : DynamicTableBlueprint , row : dict ):
306+ if bp .target_lag == "DOWNSTREAM" :
307+ return row ["target_lag" ]["type" ] == "DOWNSTREAM"
308+
309+ num , _ , unit = bp .target_lag .partition (" " )
310+
311+ num = int (num )
312+ unit = unit .lower ()
313+
314+ num_in_seconds = num * self .unit_to_seconds_multiplier [unit ]
315+
316+ if row ["target_lag" ]["type" ] == "USER_DEFINED" and row ["target_lag" ]["seconds" ] == num_in_seconds :
317+ return True
318+
319+ return False
0 commit comments