@@ -147,6 +147,7 @@ async def sync_index(self, data: MCSyncData):
147147 status = CollIndexStatus (** data .parent .get ("status" , {}))
148148
149149 coll_id = spec .id
150+ oid = spec .oid
150151 redis_name = f"redis-coll-{ coll_id } "
151152 new_children = []
152153
@@ -159,7 +160,7 @@ async def sync_index(self, data: MCSyncData):
159160 if data .finalizing :
160161 is_done = False
161162 if status .state == "saved" and status .index .savedAt :
162- await self .set_state ("idle" , status , coll_id )
163+ await self .set_state ("idle" , status , coll_id , oid )
163164 is_done = True
164165 elif status .state == "idle" and status .index .notFound :
165166 is_done = True
@@ -168,7 +169,7 @@ async def sync_index(self, data: MCSyncData):
168169 is_done = True
169170 else :
170171 try :
171- coll = await self .coll_ops .get_collection_raw (spec . id , spec . oid )
172+ coll = await self .coll_ops .get_collection_raw (coll_id , oid )
172173 # if index state is not set, index has been deleted
173174 # also delete immediately
174175 if not coll .get ("indexState" ):
@@ -184,7 +185,7 @@ async def sync_index(self, data: MCSyncData):
184185 is_done = True
185186
186187 if is_done :
187- print (f"CollIndex removed: { spec .id } " )
188+ print (f"CollIndex removed: { coll_id .id } " )
188189 return {
189190 "status" : status .dict (),
190191 "children" : [],
@@ -195,19 +196,19 @@ async def sync_index(self, data: MCSyncData):
195196 # determine if index was previously saved before initing redis
196197 if not redis_pod :
197198 if not status .indexLastSavedAt :
198- res = await self .coll_ops .get_dedupe_index_saved (spec . id )
199+ res = await self .coll_ops .get_dedupe_index_saved (coll_id )
199200 if res :
200201 status .indexLastSavedAt = date_to_str (res )
201202
202203 if self .is_expired (status ) or data .finalizing :
203204 # do actual deletion here
204205 if not data .finalizing :
205- self .run_task (self .do_delete (spec . id ))
206+ self .run_task (self .do_delete (coll_id ))
206207
207208 # Saving process
208209 # 1. run bgsave while redis is active
209210 if status .index .running :
210- await self .do_save_redis (spec . id , status )
211+ await self .do_save_redis (coll_id , oid , status )
211212
212213 elif status .index .finished and not status .index .savedAt :
213214 await self .k8s .send_signal_to_pod (redis_name , "SIGUSR1" , "save" )
@@ -217,7 +218,7 @@ async def sync_index(self, data: MCSyncData):
217218 await self .mark_index_saved (redis_name , spec , status )
218219
219220 else :
220- await self .update_state (data , spec . id , status )
221+ await self .update_state (data , coll_id , oid , status )
221222
222223 # pylint: disable=broad-exception-caught
223224 except Exception as e :
@@ -289,7 +290,9 @@ def sync_redis_pod_status(self, pod, status: CollIndexStatus):
289290 except :
290291 pass
291292
292- async def update_state (self , data , coll_id : UUID , status : CollIndexStatus ):
293+ async def update_state (
294+ self , data , coll_id : UUID , oid : UUID , status : CollIndexStatus
295+ ):
293296 """update state"""
294297 desired_state = status .state
295298 if not status .index .loaded :
@@ -317,7 +320,7 @@ async def update_state(self, data, coll_id: UUID, status: CollIndexStatus):
317320 desired_state = "initing"
318321
319322 if desired_state != status .state :
320- await self .set_state (desired_state , status , coll_id )
323+ await self .set_state (desired_state , status , coll_id , oid )
321324
322325 def is_expired (self , status : CollIndexStatus ):
323326 """return true if collindex is considered expired and should be deleted"""
@@ -343,21 +346,30 @@ def is_last_active_exceeds(
343346 return False
344347
345348 async def set_state (
346- self , state : TYPE_DEDUPE_INDEX_STATES , status : CollIndexStatus , coll_id : UUID
349+ self ,
350+ state : TYPE_DEDUPE_INDEX_STATES ,
351+ status : CollIndexStatus ,
352+ coll_id : UUID ,
353+ oid : UUID ,
347354 ):
348355 """set state after updating db"""
349356 print (f"Setting coll index state { status .state } -> { state } { coll_id } " )
350357 status .state = state
351358 status .lastStateChangeAt = date_to_str (dt_now ())
352359
353- await self .coll_ops .update_dedupe_index_info (coll_id , state , if_exists = True )
360+ # self.run_task(self.coll_ops.update_dedupe_index_info(
361+ # coll_id, oid, state, if_exists=True
362+ # ))
363+ await self .coll_ops .update_dedupe_index_info (
364+ coll_id , oid , state , if_exists = True
365+ )
354366
355367 async def do_delete (self , coll_id : UUID ):
356368 """delete the CollIndex object"""
357369 print (f"Deleting collindex { coll_id } " )
358370 await self .k8s .delete_custom_object (f"collindex-{ coll_id } " , "collindexes" )
359371
360- async def do_save_redis (self , coll_id : UUID , status : CollIndexStatus ):
372+ async def do_save_redis (self , coll_id : UUID , oid : UUID , status : CollIndexStatus ):
361373 """shutdown save redis"""
362374 try :
363375 redis = await self .k8s .get_redis_connected (f"coll-{ coll_id } " )
@@ -367,14 +379,14 @@ async def do_save_redis(self, coll_id: UUID, status: CollIndexStatus):
367379 if status .state not in ("saving" , "saved" ):
368380 await redis .bgsave (False )
369381
370- await self .set_state ("saving" , status , coll_id )
382+ await self .set_state ("saving" , status , coll_id , oid )
371383
372384 if await self .is_bgsave_done (redis ):
373385 await redis .shutdown ()
374386
375387 # pylint: disable=broad-exception-caught
376388 except Exception :
377- await self .set_state ("ready" , status , coll_id )
389+ await self .set_state ("ready" , status , coll_id , oid )
378390 traceback .print_exc ()
379391
380392 async def is_bgsave_done (self , redis : Redis ) -> bool :
@@ -575,16 +587,6 @@ async def update_saved_dedupe_index_state_in_db(
575587 storage = org .storage ,
576588 )
577589
578- prev_coll = await self .coll_ops .update_dedupe_index_info (
579- coll_id , "idle" , index_file , finished_at
580- )
581-
582- # Update org storage totals with size of index file, or difference
583- # from previous saved index file if one existed
584- size_diff = size
585- if prev_coll and prev_coll .indexFile :
586- size_diff = size - prev_coll .indexFile .size
587-
588- await self .coll_ops .orgs .inc_org_bytes_stored_field (
589- oid , "bytesStoredDedupeIndexes" , size_diff
590+ await self .coll_ops .update_dedupe_index_info (
591+ coll_id , oid , "idle" , index_file , finished_at
590592 )
0 commit comments