@@ -147,6 +147,7 @@ async def sync_index(self, data: MCSyncData):
147147 status = CollIndexStatus (** data .parent .get ("status" , {}))
148148
149149 coll_id = spec .id
150+ oid = spec .oid
150151 redis_name = f"redis-coll-{ coll_id } "
151152 new_children = []
152153
@@ -159,7 +160,7 @@ async def sync_index(self, data: MCSyncData):
159160 if data .finalizing :
160161 is_done = False
161162 if status .state == "saved" and status .index .savedAt :
162- await self .set_state ("idle" , status , coll_id )
163+ await self .set_state ("idle" , status , coll_id , oid )
163164 is_done = True
164165 elif status .state == "idle" and status .index .notFound :
165166 is_done = True
@@ -168,33 +169,33 @@ async def sync_index(self, data: MCSyncData):
168169 is_done = True
169170 else :
170171 try :
171- await self .coll_ops .get_collection_raw (spec . id , spec . oid )
172+ await self .coll_ops .get_collection_raw (coll_id , oid )
172173 # pylint: disable=bare-except
173174 except :
174175 # collection not found, delete index
175176 is_done = True
176177
177178 if is_done :
178- print (f"CollIndex removed: { spec . id } " )
179+ print (f"CollIndex removed: { coll_id } " )
179180 return {"status" : status .dict (), "children" : [], "finalized" : True }
180181
181182 try :
182183 # determine if index was previously saved before initing redis
183184 if not redis_pod :
184185 if not status .indexLastSavedAt :
185- res = await self .coll_ops .get_dedupe_index_saved (spec . id )
186+ res = await self .coll_ops .get_dedupe_index_saved (coll_id )
186187 if res :
187188 status .indexLastSavedAt = date_to_str (res )
188189
189190 if self .is_expired (status ) or data .finalizing :
190191 # do actual deletion here
191192 if not data .finalizing :
192- self .run_task (self .do_delete (spec . id ))
193+ self .run_task (self .do_delete (coll_id ))
193194
194195 # Saving process
195196 # 1. run bgsave while redis is active
196197 if status .index .running :
197- await self .do_save_redis (spec . id , status )
198+ await self .do_save_redis (coll_id , oid , status )
198199
199200 elif status .index .finished and not status .index .savedAt :
200201 await self .k8s .send_signal_to_pod (redis_name , "SIGUSR1" , "save" )
@@ -204,7 +205,7 @@ async def sync_index(self, data: MCSyncData):
204205 await self .mark_index_saved (redis_name , spec , status )
205206
206207 else :
207- await self .update_state (data , spec . id , status )
208+ await self .update_state (data , coll_id , oid , status )
208209
209210 # pylint: disable=broad-exception-caught
210211 except Exception as e :
@@ -276,7 +277,9 @@ def sync_redis_pod_status(self, pod, status: CollIndexStatus):
276277 except :
277278 pass
278279
279- async def update_state (self , data , coll_id : UUID , status : CollIndexStatus ):
280+ async def update_state (
281+ self , data , coll_id : UUID , oid : UUID , status : CollIndexStatus
282+ ):
280283 """update state"""
281284 desired_state = status .state
282285 if not status .index .loaded :
@@ -304,7 +307,7 @@ async def update_state(self, data, coll_id: UUID, status: CollIndexStatus):
304307 desired_state = "initing"
305308
306309 if desired_state != status .state :
307- await self .set_state (desired_state , status , coll_id )
310+ await self .set_state (desired_state , status , coll_id , oid )
308311
309312 def is_expired (self , status : CollIndexStatus ):
310313 """return true if collindex is considered expired and should be deleted"""
@@ -330,22 +333,26 @@ def is_last_active_exceeds(
330333 return False
331334
332335 async def set_state (
333- self , state : TYPE_DEDUPE_INDEX_STATES , status : CollIndexStatus , coll_id : UUID
336+ self ,
337+ state : TYPE_DEDUPE_INDEX_STATES ,
338+ status : CollIndexStatus ,
339+ coll_id : UUID ,
340+ oid : UUID ,
334341 ):
335342 """set state after updating db"""
336343 print (f"Setting coll index state { status .state } -> { state } { coll_id } " )
337344 status .state = state
338345 status .lastStateChangeAt = date_to_str (dt_now ())
339346
340- # self.run_task(self.coll_ops.update_dedupe_index_info(coll_id, state))
341- await self .coll_ops .update_dedupe_index_info (coll_id , state )
347+ # self.run_task(self.coll_ops.update_dedupe_index_info(coll_id, oid, state))
348+ await self .coll_ops .update_dedupe_index_info (coll_id , oid , state )
342349
343350 async def do_delete (self , coll_id : UUID ):
344351 """delete the CollIndex object"""
345352 print (f"Deleting collindex { coll_id } " )
346353 await self .k8s .delete_custom_object (f"collindex-{ coll_id } " , "collindexes" )
347354
348- async def do_save_redis (self , coll_id : UUID , status : CollIndexStatus ):
355+ async def do_save_redis (self , coll_id : UUID , oid : UUID , status : CollIndexStatus ):
349356 """shutdown save redis"""
350357 try :
351358 redis = await self .k8s .get_redis_connected (f"coll-{ coll_id } " )
@@ -355,14 +362,14 @@ async def do_save_redis(self, coll_id: UUID, status: CollIndexStatus):
355362 if status .state not in ("saving" , "saved" ):
356363 await redis .bgsave (False )
357364
358- await self .set_state ("saving" , status , coll_id )
365+ await self .set_state ("saving" , status , coll_id , oid )
359366
360367 if await self .is_bgsave_done (redis ):
361368 await redis .shutdown ()
362369
363370 # pylint: disable=broad-exception-caught
364371 except Exception :
365- await self .set_state ("ready" , status , coll_id )
372+ await self .set_state ("ready" , status , coll_id , oid )
366373 traceback .print_exc ()
367374
368375 async def is_bgsave_done (self , redis : Redis ) -> bool :
@@ -563,16 +570,6 @@ async def update_saved_dedupe_index_state_in_db(
563570 storage = org .storage ,
564571 )
565572
566- prev_coll = await self .coll_ops .update_dedupe_index_info (
567- coll_id , "idle" , index_file , finished_at
568- )
569-
570- # Update org storage totals with size of index file, or difference
571- # from previous saved index file if one existed
572- size_diff = size
573- if prev_coll and prev_coll .indexFile :
574- size_diff = size - prev_coll .indexFile .size
575-
576- await self .coll_ops .orgs .inc_org_bytes_stored_field (
577- oid , "bytesStoredDedupeIndexes" , size_diff
573+ await self .coll_ops .update_dedupe_index_info (
574+ coll_id , oid , "idle" , index_file , finished_at
578575 )
0 commit comments