2323from .baseoperator import BaseOperator
2424
2525
26+ # ============================================================================
27+ class IndexPodState (BaseModel ):
28+ """redis pod status"""
29+
30+ notFound : bool = False
31+ running : bool = False
32+ finished : bool = False
33+ loaded : bool = False
34+ savedAt : str = ""
35+
36+
2637# ============================================================================
2738class CollIndexStatus (BaseModel ):
2839 """CollIndex Status"""
@@ -37,6 +48,9 @@ class CollIndexStatus(BaseModel):
3748
3849 finishedAt : str = ""
3950
51+ # redis pod states
52+ index : IndexPodState = IndexPodState ()
53+
4054
4155# ============================================================================
4256class CollIndexSpec (BaseModel ):
@@ -107,25 +121,25 @@ async def sync_index(self, data: MCSyncData):
107121 spec = CollIndexSpec (** data .parent .get ("spec" , {}))
108122 status = CollIndexStatus (** data .parent .get ("status" , {}))
109123
110- coll_id = str ( spec .id )
111- redis_name = "redis-coll-" + coll_id
124+ coll_id = spec .id
125+ redis_name = f "redis-coll-{ coll_id } "
112126 new_children = []
113127
114128 redis_pod = data .children [POD ].get (redis_name )
115129
116130 # check if redis should be skipped, eg. no pod active or complete
117- skip_redis = self .skip_redis (redis_pod )
131+ self .sync_redis_pod_status (redis_pod , status )
118132
119133 # allow deletion only if idle
120134 if data .finalizing :
121135 is_done = False
122- if status .state in ( "idle" , "saving" ) and not redis_pod :
123- # likely reentrant call still set to saving, just switch to idle
124- if status . state == "saving" :
125- status .state = "idle"
136+ if status .state == "saved" and status . index . savedAt :
137+ await self . set_state ( "idle" , status , coll_id )
138+ is_done = True
139+ elif status .state == "idle" and status . index . notFound :
126140 is_done = True
127141 # never inited, just remove
128- elif status .state == "initing" and skip_redis :
142+ elif status .state == "initing" and status . index . notFound :
129143 is_done = True
130144 else :
131145 try :
@@ -154,15 +168,18 @@ async def sync_index(self, data: MCSyncData):
154168
155169 # Saving process
156170 # 1. run bgsave while redis is active
157- if not skip_redis :
158- await self .do_redis_save (spec .id , status )
171+ if status .index .running :
172+ await self .do_save_redis (spec .id , status )
173+
174+ elif status .index .finished and not status .index .savedAt :
175+ await self .k8s .send_signal_to_pod (redis_name , "SIGUSR1" , "save" )
159176
160177 # 2. once redis has shutdown, check if fully finished
161- else :
162- await self .check_redis_saved (redis_name , redis_pod , spec , status )
178+ elif status . index . savedAt and status . index . savedAt != status . finishedAt :
179+ await self .mark_index_saved (redis_name , spec , status )
163180
164181 else :
165- await self .update_state (skip_redis , data , spec .id , status )
182+ await self .update_state (data , spec .id , status )
166183
167184 # pylint: disable=broad-exception-caught
168185 except Exception as e :
@@ -187,32 +204,57 @@ async def sync_index(self, data: MCSyncData):
187204 "resyncAfterSeconds" : resync_after ,
188205 }
189206
190- def skip_redis (self , pod ):
207+ def sync_redis_pod_status (self , pod , status : CollIndexStatus ):
191208 """skip redis if no pod or redis container exited"""
192209 if not pod :
193- return True
210+ status .index = IndexPodState (notFound = True )
211+ return
194212
195- if pod [ "status" ]. get ( "phase" ) != "Running" :
196- return True
213+ index = status . index
214+ index . running = pod [ "status" ]. get ( "phase" ) == "Running"
197215
216+ terminated = None
198217 try :
199- if (
200- pod ["status" ]["containerStatuses" ][0 ]["state" ]["terminated" ]["reason" ]
201- == "Completed"
202- ):
203- return True
218+ terminated = pod ["status" ]["containerStatuses" ][0 ]["state" ].get (
219+ "terminated"
220+ )
221+ if terminated :
222+ index .running = False
223+ if terminated .get ("reason" ) == "Completed" :
224+ index .finished = True
204225 # pylint: disable=bare-except
205226 except :
206227 pass
207228
208- return False
229+ # redis pod likely running
230+ if "initContainerStatuses" not in pod ["status" ]:
231+ index .loaded = True
209232
210- async def update_state (
211- self , skip_redis : bool , data , coll_id : UUID , status : CollIndexStatus
212- ):
233+ else :
234+ try :
235+ index .loaded = (
236+ pod ["status" ]["initContainerStatuses" ][0 ]["state" ]["terminated" ][
237+ "reason"
238+ ]
239+ == "Completed"
240+ )
241+ # pylint: disable=bare-except
242+ except :
243+ pass
244+
245+ if pod ["status" ].get ("phase" ) == "Succeeded" :
246+ try :
247+ index .savedAt = pod ["status" ]["containerStatuses" ][1 ]["state" ][
248+ "terminated"
249+ ]["finishedAt" ]
250+ # pylint: disable=bare-except
251+ except :
252+ pass
253+
254+ async def update_state (self , data , coll_id : UUID , status : CollIndexStatus ):
213255 """update state"""
214256 desired_state = status .state
215- if skip_redis :
257+ if not status . index . loaded :
216258 desired_state = "initing"
217259
218260 # first, handle any import or purge jobs
@@ -231,7 +273,7 @@ async def update_state(
231273 desired_state = "ready"
232274
233275 # update stats if redis is available
234- if not skip_redis :
276+ if status . index . running :
235277 await self .update_stats_from_redis (status , coll_id )
236278
237279 if desired_state != status .state :
@@ -245,7 +287,7 @@ def is_expired(self, status: CollIndexStatus):
245287 if self .is_last_active_exceeds (status , self .idle_expire_time ):
246288 return True
247289
248- if status .state == "saving" :
290+ if status .state in ( "saving" , "saved" ) :
249291 return True
250292
251293 return False
@@ -271,19 +313,19 @@ async def set_state(
271313 # self.run_task(self.coll_ops.update_dedupe_index_info(coll_id, state))
272314 await self .coll_ops .update_dedupe_index_info (coll_id , state )
273315
274- async def do_delete (self , index_id : UUID ):
316+ async def do_delete (self , coll_id : UUID ):
275317 """delete the CollIndex object"""
276- print (f"Deleting collindex { index_id } " )
277- await self .k8s .delete_custom_object (f"collindex-{ index_id } " , "collindexes" )
318+ print (f"Deleting collindex { coll_id } " )
319+ await self .k8s .delete_custom_object (f"collindex-{ coll_id } " , "collindexes" )
278320
279- async def do_redis_save (self , coll_id : UUID , status : CollIndexStatus ):
321+ async def do_save_redis (self , coll_id : UUID , status : CollIndexStatus ):
280322 """shutdown save redis"""
281323 try :
282- redis = await self .k8s .get_redis_connected ("coll-" + str ( coll_id ) )
324+ redis = await self .k8s .get_redis_connected (f "coll-{ coll_id } " )
283325 if not redis :
284326 return
285327
286- if status .state != "saving" :
328+ if status .state not in ( "saving" , "saved" ) :
287329 await redis .bgsave (False )
288330
289331 await self .set_state ("saving" , status , coll_id )
@@ -360,7 +402,7 @@ def get_related(self, data: MCBaseRequest):
360402
361403 async def load_redis (
362404 self ,
363- index_id : str ,
405+ coll_id : UUID ,
364406 name : str ,
365407 spec : CollIndexSpec ,
366408 status : CollIndexStatus ,
@@ -369,7 +411,7 @@ async def load_redis(
369411 params = {}
370412 params .update (self .shared_params )
371413 params ["name" ] = name
372- params ["id" ] = index_id
414+ params ["id" ] = str ( coll_id )
373415 params ["init_redis" ] = True
374416
375417 params ["load_dump" ] = bool (status .indexLastSavedAt )
@@ -399,40 +441,25 @@ def get_index_storage_filename(self, coll_id: UUID, org: Organization):
399441 storage_path = org .storage .get_storage_extra_path (str (org .id ))
400442 return storage_path + f"dedupe-index/{ coll_id } "
401443
402- async def check_redis_saved (
444+ async def mark_index_saved (
403445 self ,
404446 redis_name : str ,
405- redis_pod ,
406447 spec : CollIndexSpec ,
407448 status : CollIndexStatus ,
408449 ):
409450 """create sync job to save redis index data to s3 storage"""
410451
411- if redis_pod and redis_pod ["status" ].get ("phase" ) == "Succeeded" :
412- finished_at = None
413- finished_at_str = ""
414- try :
415- finished_at_str = redis_pod ["status" ]["initContainerStatuses" ][0 ][
416- "state"
417- ]["terminated" ]["finishedAt" ]
418- # pylint: disable=bare-except
419- except :
420- pass
452+ # update state immediately to speed up cleanup
453+ print (f"Setting coll index state { status .state } -> saved { spec .id } " )
454+ status .state = "saved"
421455
422- # update state immediately to speed up cleanup
423- print (f"Setting coll index state { status .state } -> idle { spec .id } " )
424- status .state = "idle"
456+ finished_at = str_to_date (status .index .savedAt )
425457
426- if finished_at_str :
427- if status .finishedAt == finished_at_str :
428- return
429- finished_at = str_to_date (finished_at_str )
430-
431- await self .update_saved_dedupe_index_state_in_db (
432- spec .id , spec .oid , redis_name , finished_at or dt_now ()
433- )
458+ await self .update_saved_dedupe_index_state_in_db (
459+ spec .id , spec .oid , redis_name , finished_at or dt_now ()
460+ )
434461
435- status .finishedAt = finished_at_str
462+ status .finishedAt = status . index . savedAt
436463
437464 async def update_saved_dedupe_index_state_in_db (
438465 self , coll_id : UUID , oid : UUID , pod_name : str , finished_at : datetime .datetime
0 commit comments