@@ -589,9 +589,17 @@ async def handle( # type: ignore
589589 )
590590
591591 async with restore_internal_tables_context (
592- apps , namespace , name , conn_factory , repository , snapshot , logger
592+ apps ,
593+ namespace ,
594+ name ,
595+ conn_factory ,
596+ repository ,
597+ snapshot ,
598+ logger ,
599+ restore_type ,
600+ tables ,
593601 ) as internal_tables :
594- await internal_tables .remove_duplicated_tables ( restore_type , tables )
602+ await internal_tables .rename_duplicated_tables ( )
595603
596604 try :
597605 await self ._start_restore_snapshot (
@@ -1121,16 +1129,27 @@ async def handle( # type: ignore
11211129
11221130@asynccontextmanager
11231131async def restore_internal_tables_context (
1124- apps , namespace , name , conn_factory , repository , snapshot , logger
1132+ apps ,
1133+ namespace ,
1134+ name ,
1135+ conn_factory ,
1136+ repository ,
1137+ snapshot ,
1138+ logger ,
1139+ restore_type ,
1140+ tables ,
11251141):
1126- logger .info ("Suspending grand-central operations before restoring internal tables" )
1127- await suspend_or_start_grand_central (apps , namespace , name , suspend = True )
11281142 internal_tables = RestoreInternalTables (conn_factory , repository , snapshot , logger )
1143+ await internal_tables .set_gc_tables (restore_type , tables )
1144+ if internal_tables .is_enabled ():
1145+ logger .info ("Suspending GC operations before restoring internal tables" )
1146+ await suspend_or_start_grand_central (apps , namespace , name , suspend = True )
11291147 try :
11301148 yield internal_tables
11311149 finally :
1132- logger .info ("Resuming grand-central operations after restoring internal tables" )
1133- await suspend_or_start_grand_central (apps , namespace , name , suspend = False )
1150+ if internal_tables .is_enabled ():
1151+ logger .info ("Resuming GC operations after restoring internal tables" )
1152+ await suspend_or_start_grand_central (apps , namespace , name , suspend = False )
11341153
11351154
11361155class RestoreInternalTables :
@@ -1147,33 +1166,34 @@ def __init__(
11471166 self .snapshot : str = snapshot
11481167 self .logger : logging .Logger = logger
11491168
1150- self .gc_tables_renamed : bool = False
11511169 self .gc_tables : list [str ] = []
11521170
1153- async def remove_duplicated_tables (
1154- self , restore_type , tables : Optional [List [str ]] = None
1155- ):
1156- if restore_type == SnapshotRestoreType .ALL .value :
1157- await self ._remove_duplicated_tables ()
1158- elif restore_type == SnapshotRestoreType .TABLES .value :
1159- await self ._remove_duplicated_tables (tables )
1171+ def is_enabled (self ) -> bool :
1172+ return True if self .gc_tables else False
11601173
1161- async def _remove_duplicated_tables (self , tables : Optional [List [str ]] = None ):
1174+ async def set_gc_tables (
1175+ self , restore_type : str , tables : Optional [list [str ]] = None
1176+ ):
11621177 """
1163- If the snapshot contains grand-central tables, rename them if they exist
1164- in the cluster in order to recreate the new ones from the snapshot.
1178+ Retrieve the grand central tables from the snapshot to be restored.
11651179 """
1166- self .gc_tables_renamed = True
1180+
1181+ if restore_type not in [
1182+ SnapshotRestoreType .ALL .value ,
1183+ SnapshotRestoreType .TABLES .value ,
1184+ ]:
1185+ return
1186+
1187+ if restore_type == SnapshotRestoreType .TABLES .value and tables is not None :
1188+ gc_tables = [table for table in tables if table .startswith ("gc." )]
1189+ tables_str = "," .join (f"'{ table } '" for table in gc_tables )
1190+ where_stmt = f"t IN ({ tables_str } )"
1191+ else :
1192+ where_stmt = "t LIKE 'gc.%%'"
1193+
11671194 try :
11681195 async with self .conn_factory () as conn :
11691196 async with conn .cursor (timeout = 120 ) as cursor :
1170- if tables is not None :
1171- gc_tables = [t for t in tables if t .startswith ("gc." )]
1172- tables_str = "," .join (f"'{ table } '" for table in gc_tables )
1173- where_stmt = f"t IN ({ tables_str } )"
1174- else :
1175- where_stmt = "t LIKE 'gc.%%'"
1176-
11771197 await cursor .execute (
11781198 "WITH tables AS ("
11791199 " SELECT unnest(tables) AS t "
@@ -1183,25 +1203,53 @@ async def _remove_duplicated_tables(self, tables: Optional[List[str]] = None):
11831203 f"SELECT * FROM tables WHERE { where_stmt } ;" ,
11841204 (self .repository , self .snapshot ),
11851205 )
1186- tables = await cursor .fetchall ()
1187- self .gc_tables = [table [0 ] for table in tables ] if tables else []
1206+ snapshot_gc_tables = await cursor .fetchall ()
11881207
1189- for table in self .gc_tables :
1190- self .logger .info (f"Renaming GC table: { table } to { table } _temp" )
1208+ if snapshot_gc_tables :
1209+ await cursor .execute ('SHOW TABLES FROM "gc";' )
1210+ existing_gc_tables = await cursor .fetchall ()
1211+
1212+ if existing_gc_tables :
1213+ existing_gc_tables = [
1214+ table [0 ] for table in existing_gc_tables
1215+ ]
1216+ for (table ,) in snapshot_gc_tables :
1217+ if table in existing_gc_tables :
1218+ self .gc_tables .append (table )
1219+
1220+ except DatabaseError as e :
1221+ self .logger .warning (
1222+ "DatabaseError in RestoreInternalTables.set_gc_tables" ,
1223+ exc_info = e ,
1224+ )
1225+ raise kopf .PermanentError ("internal tables couldn't be retrieved." )
1226+
1227+ async def _rename_table (self , cursor , old_name : str , new_name : str ):
1228+ self .logger .info (f"Renaming GC table: { old_name } to { new_name } " )
1229+ try :
1230+ await cursor .execute (f"ALTER TABLE { old_name } RENAME TO { new_name } ;" )
1231+ except UndefinedTable :
1232+ self .logger .warning (f"Table { old_name } does not exist. Skipping." )
1233+ pass
1234+
1235+ async def rename_duplicated_tables (self ):
1236+ """
1237+ If the snapshot contains grand central tables, rename them if they exist
1238+ in the cluster in order to recreate the new ones from the snapshot.
1239+ """
1240+ if not self .gc_tables :
1241+ return
1242+
1243+ try :
1244+ async with self .conn_factory () as conn :
1245+ async with conn .cursor (timeout = 120 ) as cursor :
1246+ for table in self .gc_tables .copy ():
11911247 table_name = quote_table (table , cursor )
11921248 temp_table_name = table_without_schema (f"{ table } _temp" , cursor )
1193- try :
1194- await cursor .execute (
1195- f"ALTER TABLE { table_name } RENAME TO { temp_table_name } ;"
1196- )
1197- except UndefinedTable :
1198- self .logger .warning (
1199- f"Table { table } does not exist. Skipping."
1200- )
1201- pass
1249+ await self ._rename_table (cursor , table_name , temp_table_name )
12021250 except DatabaseError as e :
12031251 self .logger .warning (
1204- "DatabaseError in RestoreInternalTables._remove_duplicated_tables " ,
1252+ "DatabaseError in RestoreInternalTables.rename_duplicated_tables " ,
12051253 exc_info = e ,
12061254 )
12071255 raise kopf .PermanentError ("internal tables couldn't be renamed." )
@@ -1211,25 +1259,17 @@ async def restore_tables(self):
12111259 If the restore operation failed, rename back the gc tables
12121260 to their original names.
12131261 """
1214- if self .gc_tables_renamed is False :
1262+ if not self .gc_tables :
12151263 return
12161264
12171265 try :
12181266 async with self .conn_factory () as conn :
12191267 async with conn .cursor (timeout = 120 ) as cursor :
12201268 for table in self .gc_tables :
1221- self .logger .info (f"Renaming GC table: { table } _temp to { table } " )
12221269 table_name = table_without_schema (table , cursor )
12231270 temp_table_name = quote_table (f"{ table } _temp" , cursor )
1224- try :
1225- await cursor .execute (
1226- f"ALTER TABLE { temp_table_name } RENAME TO { table_name } ;"
1227- )
1228- except UndefinedTable :
1229- self .logger .warning (
1230- f"Table { temp_table_name } does not exist. Skipping."
1231- )
1232- pass
1271+ await self ._rename_table (cursor , temp_table_name , table_name )
1272+
12331273 except DatabaseError as e :
12341274 self .logger .warning (
12351275 "DatabaseError in RestoreInternalTables.restore_tables" , exc_info = e
@@ -1240,15 +1280,15 @@ async def cleanup_tables(self):
12401280 """
12411281 After a successful restore, the temporary renamed gc tables can be dropped.
12421282 """
1243- if self .gc_tables_renamed is False :
1283+ if not self .gc_tables :
12441284 return
12451285
12461286 try :
12471287 async with self .conn_factory () as conn :
12481288 async with conn .cursor (timeout = 120 ) as cursor :
12491289 for table in self .gc_tables :
1250- self .logger .info (f"Dropping old GC table: { table } _temp" )
12511290 temp_table_name = quote_table (f"{ table } _temp" , cursor )
1291+ self .logger .info (f"Dropping old GC table: { temp_table_name } " )
12521292 try :
12531293 await cursor .execute (f"DROP TABLE { temp_table_name } ;" )
12541294 except UndefinedTable :
0 commit comments