@@ -266,7 +266,7 @@ def __init__(self, ctx: DipDupContext, config: OperationIndexConfig, datasource:
266266 self ._queue : Deque [OperationQueueItemT ] = deque ()
267267 self ._contract_hashes : Dict [str , Tuple [int , int ]] = {}
268268 self ._rollback_level : Optional [int ] = None
269- self ._head_hashes : Set [str ] = set ()
269+ self ._head_hashes : Dict [str , bool ] = {}
270270 self ._migration_originations : Optional [Dict [str , OperationData ]] = None
271271
272272 def push_operations (self , operation_subgroups : Tuple [OperationSubgroup , ...]) -> None :
@@ -375,12 +375,15 @@ async def _process_level_operations(self, operation_subgroups: Tuple[OperationSu
375375 levels_repr = ', ' .join (f'{ k } ={ v } ' for k , v in levels .items ())
376376 raise RuntimeError (f'Index is in a rollback state, but received operation batch with different levels: { levels_repr } ' )
377377
378- self ._logger .info ('Rolling back to previous level, verifying processed operations' )
379- received_hashes = {s .hash for s in operation_subgroups }
380- new_hashes = received_hashes - self ._head_hashes
381- missing_hashes = self ._head_hashes - received_hashes
378+ self ._logger .info ('Rolling back to the previous level, verifying processed operations' )
379+ old_head_hashes = set (self ._head_hashes )
380+ old_head_matched_hashes = {k for k , v in self ._head_hashes .items () if v }
381+ new_head_hashes = {s .hash for s in operation_subgroups }
382+ unprocessed_hashes = new_head_hashes - old_head_hashes
383+ # NOTE: We can ignore subgroups that don't match any handlers
384+ missing_hashes = old_head_matched_hashes - new_head_hashes
382385
383- self ._logger .info ('Comparing hashes: %s new, %s missing' , len (new_hashes ), len (missing_hashes ))
386+ self ._logger .info ('Comparing hashes: %s new, %s missing' , len (unprocessed_hashes ), len (missing_hashes ))
384387 if missing_hashes :
385388 self ._logger .info ('Some operations were backtracked, requesting reindexing' )
386389 await self ._ctx .reindex (
@@ -394,16 +397,17 @@ async def _process_level_operations(self, operation_subgroups: Tuple[OperationSu
394397 self ._rollback_level = None
395398 self ._head_hashes .clear ()
396399
397- operation_subgroups = tuple (filter (lambda subgroup : subgroup .hash in new_hashes , operation_subgroups ))
400+ operation_subgroups = tuple (filter (lambda subgroup : subgroup .hash in unprocessed_hashes , operation_subgroups ))
398401
399402 elif level < self .state .level :
400403 raise RuntimeError (f'Level of operation batch must be higher than index state level: { level } < { self .state .level } ' )
401404
402405 self ._logger .debug ('Processing %s operation subgroups of level %s' , len (operation_subgroups ), level )
403406 matched_handlers : Deque [MatchedOperationsT ] = deque ()
404407 for operation_subgroup in operation_subgroups :
405- self ._head_hashes .add (operation_subgroup .hash )
406- matched_handlers += await self ._match_operation_subgroup (operation_subgroup )
408+ subgroup_matched_handlers = await self ._match_operation_subgroup (operation_subgroup )
409+ matched_handlers += subgroup_matched_handlers
410+ self ._head_hashes [operation_subgroup .hash ] = bool (subgroup_matched_handlers )
407411
408412 if Metrics .enabled :
409413 Metrics .set_index_handlers_matched (len (matched_handlers ))
0 commit comments