1- from snowddl .blueprint import TableBlueprint
1+ from snowddl .blueprint import TableBlueprint , SchemaObjectIdent
22from snowddl .resolver .abc_schema_object_resolver import AbstractResolver , ResolveResult , ObjectType
33
44
@@ -26,6 +26,7 @@ def get_existing_objects(self):
2626
2727 def get_databases_for_clone (self ):
2828 databases_for_clone = {}
29+ clone_source_env_prefix = self .engine .settings .clone_source_env_prefix
2930
3031 cur = self .engine .execute_meta ("SHOW DATABASES" )
3132
@@ -34,12 +35,24 @@ def get_databases_for_clone(self):
3435 if r ["origin" ]:
3536 continue
3637
37- # Skip databases without destination for cloning
38- if f"{ self .config .env_prefix } { r ['name' ]} " not in self .engine .schema_cache .databases :
38+ src_database = str (r ["name" ])
39+
40+ if clone_source_env_prefix :
41+ # Skip everything which does not start with source prefix
42+ if not src_database .startswith (clone_source_env_prefix ):
43+ continue
44+
45+ dst_database = f"{ self .config .env_prefix } { src_database .removeprefix (clone_source_env_prefix )} "
46+ else :
47+ dst_database = f"{ self .config .env_prefix } { src_database } "
48+
49+ # Skip every source database without destination database for cloning
50+ if dst_database not in self .engine .schema_cache .databases :
3951 continue
4052
41- databases_for_clone [r ["name" ]] = {
42- "database" : r ["name" ],
53+ databases_for_clone [dst_database ] = {
54+ "src_database" : src_database ,
55+ "dst_database" : dst_database ,
4356 }
4457
4558 return databases_for_clone
@@ -48,9 +61,9 @@ def get_schemas_for_clone(self, database):
4861 schemas_for_clone = {}
4962
5063 cur = self .engine .execute_meta (
51- "SHOW SCHEMAS IN DATABASE {database :i}" ,
64+ "SHOW SCHEMAS IN DATABASE {src_database :i}" ,
5265 {
53- "database " : database ["database " ],
66+ "src_database " : database ["src_database " ],
5467 },
5568 )
5669
@@ -59,12 +72,15 @@ def get_schemas_for_clone(self, database):
5972 if r ["name" ] == "INFORMATION_SCHEMA" :
6073 continue
6174
62- # Skip schemas without destination for cloning
63- if f"{ self .config .env_prefix } { r ['database_name' ]} .{ r ['name' ]} " not in self .engine .schema_cache .schemas :
75+ dst_schema = f"{ database ['dst_database' ]} .{ r ['name' ]} "
76+
77+ # Skip every source schema without destination schema for cloning
78+ if dst_schema not in self .engine .schema_cache .schemas :
6479 continue
6580
66- schemas_for_clone [f"{ r ['database_name' ]} .{ r ['name' ]} " ] = {
67- "database" : r ["database_name" ],
81+ schemas_for_clone [dst_schema ] = {
82+ "src_database" : database ["src_database" ],
83+ "dst_database" : database ["dst_database" ],
6884 "schema" : r ["name" ],
6985 }
7086
@@ -74,9 +90,9 @@ def get_tables_for_clone(self, schema):
7490 tables_for_clone = {}
7591
7692 cur = self .engine .execute_meta (
77- "SHOW TABLES IN SCHEMA {database :i}.{schema:i}" ,
93+ "SHOW TABLES IN SCHEMA {src_database :i}.{schema:i}" ,
7894 {
79- "database " : schema ["database " ],
95+ "src_database " : schema ["src_database " ],
8096 "schema" : schema ["schema" ],
8197 },
8298 )
@@ -92,8 +108,9 @@ def get_tables_for_clone(self, schema):
92108 ):
93109 continue
94110
95- tables_for_clone [f"{ self .config .env_prefix } { r ['database_name' ]} .{ r ['schema_name' ]} .{ r ['name' ]} " ] = {
96- "database" : r ["database_name" ],
111+ tables_for_clone [f"{ schema ['dst_database' ]} .{ r ['schema_name' ]} .{ r ['name' ]} " ] = {
112+ "src_database" : schema ["src_database" ],
113+ "dst_database" : schema ["dst_database" ],
97114 "schema" : r ["schema_name" ],
98115 "name" : r ["name" ],
99116 "is_transient" : r ["kind" ] == "TRANSIENT" ,
@@ -119,27 +136,30 @@ def drop_object(self, row: dict):
119136 query .append ("TRANSIENT" )
120137
121138 query .append (
122- "TABLE IF NOT EXISTS {database_with_prefix :i}.{schema:i}.{table_name :i}" ,
139+ "TABLE IF NOT EXISTS {dst_database :i}.{schema:i}.{name :i}" ,
123140 {
124- "database_with_prefix " : f" { self . config . env_prefix } { row ['database' ] } " ,
141+ "dst_database " : row ["dst_database" ] ,
125142 "schema" : row ["schema" ],
126- "table_name " : row ["name" ],
143+ "name " : row ["name" ],
127144 },
128145 )
129146
130147 query .append_nl (
131- "CLONE {database :i}.{schema:i}.{table_name :i}" ,
148+ "CLONE {src_database :i}.{schema:i}.{name :i}" ,
132149 {
133- "database " : row ["database " ],
150+ "src_database " : row ["src_database " ],
134151 "schema" : row ["schema" ],
135- "table_name " : row ["name" ],
152+ "name " : row ["name" ],
136153 },
137154 )
138155
139156 cur = self .engine .execute_clone (query )
140157 r = cur .fetchone ()
141158
142159 if str (r ["status" ]).endswith ("successfully created." ):
160+ self ._drop_existing_policy_refs (
161+ ObjectType .TABLE , SchemaObjectIdent ("" , row ["dst_database" ], row ["schema" ], row ["name" ])
162+ )
143163 return ResolveResult .CREATE
144164
145165 return ResolveResult .NOCHANGE
@@ -155,3 +175,59 @@ def _is_skipped(self):
155175 return True
156176
157177 return False
178+
179+ def _drop_existing_policy_refs (self , object_type : ObjectType , object_name : SchemaObjectIdent ):
180+ cur = self .engine .execute_meta (
181+ "SELECT * FROM TABLE(snowflake.information_schema.policy_references(ref_entity_domain => {object_type}, ref_entity_name => {object_name}))" ,
182+ {
183+ "object_type" : object_type .singular_for_ref ,
184+ "object_name" : object_name ,
185+ },
186+ )
187+
188+ for r in cur :
189+ if r ["POLICY_KIND" ] == "AGGREGATION_POLICY" :
190+ self .engine .execute_clone (
191+ "ALTER {object_type:r} {object_name:i} UNSET AGGREGATION POLICY" ,
192+ {
193+ "object_type" : object_type .singular_for_ref ,
194+ "object_name" : object_name ,
195+ },
196+ )
197+
198+ elif r ["POLICY_KIND" ] == "MASKING_POLICY" :
199+ self .engine .execute_clone (
200+ "ALTER {object_type:r} {object_name:i} MODIFY COLUMN {column:i} UNSET MASKING POLICY" ,
201+ {
202+ "object_type" : object_type .singular_for_ref ,
203+ "object_name" : object_name ,
204+ "column" : r ["REF_COLUMN_NAME" ],
205+ },
206+ )
207+
208+ elif r ["POLICY_KIND" ] == "PROJECTION_POLICY" :
209+ self .engine .execute_clone (
210+ "ALTER {object_type:r} {object_name:i} MODIFY COLUMN {column:i} UNSET PROJECTION POLICY" ,
211+ {
212+ "object_type" : object_type .singular_for_ref ,
213+ "object_name" : object_name ,
214+ "column" : r ["REF_COLUMN_NAME" ],
215+ },
216+ )
217+
218+ elif r ["POLICY_KIND" ] == "ROW_ACCESS_POLICY" :
219+ policy_name = SchemaObjectIdent ("" , r ["POLICY_DB" ], r ["POLICY_SCHEMA" ], r ["POLICY_NAME" ])
220+
221+ self .engine .execute_clone (
222+ "ALTER {object_type:r} {object_name:i} DROP ROW ACCESS POLICY {policy_name:i}" ,
223+ {
224+ "object_type" : object_type .singular_for_ref ,
225+ "object_name" : object_name ,
226+ "policy_name" : policy_name ,
227+ },
228+ )
229+
230+ else :
231+ self .engine .logger .warning (
232+ f"Detected unknown policy type [{ r ['POLICY_KIND' ]} attached to cloned object [{ object_name } ]"
233+ )
0 commit comments