 from abc import abstractmethod
 from collections import defaultdict, deque, namedtuple
-from typing import Deque, Dict, Iterable, List, Optional, Set, Tuple, Union, cast
+from typing import Deque, Dict, Iterable, Optional, Sequence, Set, Tuple, Union, cast
 
 from pydantic.error_wrappers import ValidationError
 
     BigMapHandlerConfig,
     BigMapIndexConfig,
     ContractConfig,
+    HeadHandlerConfig,
     HeadIndexConfig,
     OperationHandlerConfig,
     OperationHandlerOriginationPatternConfig,
 SingleLevelRollback = namedtuple('SingleLevelRollback', ('level'))
 Operations = Tuple[OperationData, ...]
 OperationQueueItemT = Union[Operations, SingleLevelRollback]
+OperationHandlerArgumentT = Optional[Union[Transaction, Origination, OperationData]]
+MatchedOperationsT = Tuple[OperationSubgroup, OperationHandlerConfig, Deque[OperationHandlerArgumentT]]
+MatchedBigMapsT = Tuple[BigMapHandlerConfig, BigMapDiff]
 
 # NOTE: For initializing the index state on startup
 block_cache: Dict[int, BlockData] = {}
 
 
 class Index:
+    """Base class for index implementations
+
+    Provides common interface for managing index state and switching between sync and realtime modes.
+    """
+
     _queue: Deque
 
     def __init__(self, ctx: DipDupContext, config: ResolvedIndexConfigT, datasource: TzktDatasource) -> None:
@@ -256,10 +265,18 @@ async def _process_level_operations(self, operations: Tuple[OperationData, ...])
         elif level < self.state.level:
             raise RuntimeError(f'Level of operation batch must be higher than index state level: {level} < {self.state.level}')
 
+        self._logger.info('Processing %s operations of level %s', len(operations), level)
+        matched_subgroups = await self._match_operations(operations)
+
+        # NOTE: We still need to bump the index level, but don't care whether it happens inside an existing transaction
+        if not matched_subgroups:
+            await self.state.update_status(level=level)
+            return
+
         async with in_global_transaction():
-            self._logger.info('Processing %s operations of level %s', len(operations), level)
-            await self._process_operations(operations)
-            await self.state.update_status(self.state.status, level)
+            for operation_subgroup, handler_config, args in matched_subgroups:
+                await self._call_matched_handler(handler_config, operation_subgroup, args)
+            await self.state.update_status(level=level)
 
     async def _match_operation(self, pattern_config: OperationHandlerPatternConfigT, operation: OperationData) -> bool:
         """Match single operation with pattern"""
@@ -294,9 +311,10 @@ async def _match_operation(self, pattern_config: OperationHandlerPatternConfigT,
         else:
             raise NotImplementedError
 
-    async def _process_operations(self, operations: Iterable[OperationData]) -> None:
+    async def _match_operations(self, operations: Iterable[OperationData]) -> Deque[MatchedOperationsT]:
         """Try to match operations in cache with all patterns from indexes. Must be wrapped in transaction."""
-        self._head_hashes = set()
+        self._head_hashes.clear()
+        matched_subgroups: Deque[MatchedOperationsT] = deque()
         operation_subgroups: Dict[OperationSubgroup, Deque[OperationData]] = defaultdict(deque)
         for operation in operations:
             key = OperationSubgroup(operation.hash, operation.counter)
@@ -311,7 +329,7 @@ async def _process_operations(self, operations: Iterable[OperationData]) -> None
                 pattern_idx = 0
                 matched_operations: Deque[Optional[OperationData]] = deque()
 
-                # TODO: Ensure complex cases work, for ex. required argument after optional one
+                # TODO: Ensure complex cases work, e.g. when optional argument is followed by required one
                 # TODO: Add None to matched_operations where applicable (pattern is optional and operation not found)
                 while operation_idx < len(operations):
                     operation, pattern_config = operations[operation_idx], handler_config.pattern[pattern_idx]
@@ -335,26 +353,29 @@ async def _process_operations(self, operations: Iterable[OperationData]) -> None
                         operation_idx += 1
 
                     if pattern_idx == len(handler_config.pattern):
-                        await self._on_match(operation_subgroup, handler_config, matched_operations)
+                        self._logger.info('%s: `%s` handler matched!', operation_subgroup.hash, handler_config.callback)
+
+                        args = await self._prepare_handler_args(handler_config, matched_operations)
+                        matched_subgroups.append((operation_subgroup, handler_config, args))
 
                         matched_operations.clear()
                         pattern_idx = 0
 
                 if len(matched_operations) >= sum(map(lambda x: 0 if x.optional else 1, handler_config.pattern)):
-                    await self._on_match(operation_subgroup, handler_config, matched_operations)
+                    self._logger.info('%s: `%s` handler matched!', operation_subgroup.hash, handler_config.callback)
+
+                    args = await self._prepare_handler_args(handler_config, matched_operations)
+                    matched_subgroups.append((operation_subgroup, handler_config, args))
+
+        return matched_subgroups
 
-    async def _on_match(
+    async def _prepare_handler_args(
         self,
-        operation_subgroup: OperationSubgroup,
         handler_config: OperationHandlerConfig,
         matched_operations: Deque[Optional[OperationData]],
-    ):
-        """Prepare handler arguments, parse parameter and storage. Schedule callback in executor."""
-        self._logger.info('%s: `%s` handler matched!', operation_subgroup.hash, handler_config.callback)
-        if not handler_config.parent:
-            raise ConfigInitializationException
-
-        args: List[Optional[Union[Transaction, Origination, OperationData]]] = []
+    ) -> Deque[OperationHandlerArgumentT]:
+        """Prepare handler arguments, parse parameter and storage."""
+        args: Deque[OperationHandlerArgumentT] = deque()
         for pattern_config, operation in zip(handler_config.pattern, matched_operations):
             if operation is None:
                 args.append(None)
@@ -393,6 +414,14 @@ async def _on_match(
             else:
                 raise NotImplementedError
 
+        return args
+
+    async def _call_matched_handler(
+        self, handler_config: OperationHandlerConfig, operation_subgroup: OperationSubgroup, args: Sequence[OperationHandlerArgumentT]
+    ) -> None:
+        if not handler_config.parent:
+            raise ConfigInitializationException
+
         await self._ctx.fire_handler(
             handler_config.callback,
             handler_config.parent.name,
@@ -477,15 +506,25 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None:
         await self._exit_sync_state(last_level)
 
     async def _process_level_big_maps(self, big_maps: Tuple[BigMapData, ...]):
+        if not big_maps:
+            return
         level = self._extract_level(big_maps)
 
         # NOTE: le operator because single level rollbacks are not supported
         if level <= self.state.level:
             raise RuntimeError(f'Level of big map batch must be higher than index state level: {level} <= {self.state.level}')
 
+        self._logger.info('Processing %s big map diffs of level %s', len(big_maps), level)
+        matched_big_maps = await self._match_big_maps(big_maps)
+
+        # NOTE: We still need to bump the index level, but don't care whether it happens inside an existing transaction
+        if not matched_big_maps:
+            await self.state.update_status(level=level)
+            return
+
         async with in_global_transaction():
-            self._logger.info('Processing %s big map diffs of level %s', len(big_maps), level)
-            await self._process_big_maps(big_maps)
+            for handler_config, big_map_diff in matched_big_maps:
+                await self._call_matched_handler(handler_config, big_map_diff)
             await self.state.update_status(level=level)
 
     async def _match_big_map(self, handler_config: BigMapHandlerConfig, big_map: BigMapData) -> bool:
@@ -496,11 +535,11 @@ async def _match_big_map(self, handler_config: BigMapHandlerConfig, big_map: Big
             return False
         return True
 
-    async def _on_match(
+    async def _prepare_handler_args(
         self,
         handler_config: BigMapHandlerConfig,
         matched_big_map: BigMapData,
-    ) -> None:
+    ) -> BigMapDiff:
         """Prepare handler arguments, parse key and value. Schedule callback in executor."""
         self._logger.info('%s: `%s` handler matched!', matched_big_map.operation_id, handler_config.callback)
         if not handler_config.parent:
@@ -524,13 +563,30 @@ async def _on_match(
         else:
             value = None
 
-        big_map_diff = BigMapDiff(  # type: ignore
+        return BigMapDiff(
             data=matched_big_map,
             action=matched_big_map.action,
             key=key,
             value=value,
         )
 
+    async def _match_big_maps(self, big_maps: Iterable[BigMapData]) -> Deque[MatchedBigMapsT]:
+        """Try to match big map diffs in cache with all patterns from indexes."""
+        matched_big_maps: Deque[MatchedBigMapsT] = deque()
+
+        for big_map in big_maps:
+            for handler_config in self._config.handlers:
+                big_map_matched = await self._match_big_map(handler_config, big_map)
+                if big_map_matched:
+                    arg = await self._prepare_handler_args(handler_config, big_map)
+                    matched_big_maps.append((handler_config, arg))
+
+        return matched_big_maps
+
+    async def _call_matched_handler(self, handler_config: BigMapHandlerConfig, big_map_diff: BigMapDiff) -> None:
+        if not handler_config.parent:
+            raise ConfigInitializationException
+
         await self._ctx.fire_handler(
             handler_config.callback,
             handler_config.parent.name,
@@ -540,15 +596,6 @@ async def _on_match(
             big_map_diff,
         )
 
-    async def _process_big_maps(self, big_maps: Iterable[BigMapData]) -> None:
-        """Try to match big map diffs in cache with all patterns from indexes."""
-
-        for big_map in big_maps:
-            for handler_config in self._config.handlers:
-                big_map_matched = await self._match_big_map(handler_config, big_map)
-                if big_map_matched:
-                    await self._on_match(handler_config, big_map)
-
     async def _get_big_map_addresses(self) -> Set[str]:
         """Get addresses to fetch big map diffs from during initial synchronization"""
         addresses = set()
@@ -587,10 +634,20 @@ async def _process_queue(self) -> None:
         async with in_global_transaction():
             self._logger.info('Processing head info of level %s', level)
             for handler_config in self._config.handlers:
-                if not handler_config.parent:
-                    raise ConfigInitializationException
-                await self._ctx.fire_handler(handler_config.callback, handler_config.parent.name, self.datasource, head.hash, head)
+                await self._call_matched_handler(handler_config, head)
             await self.state.update_status(level=level)
 
+    async def _call_matched_handler(self, handler_config: HeadHandlerConfig, head: HeadBlockData) -> None:
+        if not handler_config.parent:
+            raise ConfigInitializationException
+
+        await self._ctx.fire_handler(
+            handler_config.callback,
+            handler_config.parent.name,
+            self.datasource,
+            head.hash,
+            (head,),
+        )
+
     def push_head(self, head: HeadBlockData) -> None:
         self._queue.append(head)
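
For context, the pattern this diff converges on across operation, big map, and head indexes is "match first, dispatch later": the _match_* methods only collect (handler_config, args) tuples, the index level is bumped even when nothing matched, and a database transaction is opened only when there are handlers to call. Below is a minimal, self-contained sketch of that control flow; every name in it (update_status, in_transaction, on_transfer, process_level) is a simplified stand-in for illustration, not the actual dipdup API.

# Illustrative sketch only; these are simplified stand-ins, not dipdup's real classes or signatures.
import asyncio
from collections import deque
from contextlib import asynccontextmanager
from typing import Callable, Deque, Tuple


@asynccontextmanager
async def in_transaction():
    yield  # stand-in for in_global_transaction()


async def update_status(level: int) -> None:
    print(f'index level -> {level}')


async def on_transfer(op: dict) -> None:
    print(f'handled operation {op["hash"]}')


def match_operations(operations: list) -> Deque[Tuple[Callable, dict]]:
    # Pure matching step: collect (handler, args) pairs, touch no database state
    matched: Deque[Tuple[Callable, dict]] = deque()
    for op in operations:
        if op.get('entrypoint') == 'transfer':  # hypothetical pattern check
            matched.append((on_transfer, op))
    return matched


async def process_level(operations: list, level: int) -> None:
    matched = match_operations(operations)
    if not matched:
        await update_status(level)  # still bump the level when nothing matched
        return
    async with in_transaction():  # open a transaction only if handlers will run
        for handler, op in matched:
            await handler(op)
        await update_status(level)


asyncio.run(process_level([{'hash': 'op1', 'entrypoint': 'transfer'}], level=42))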