@@ -73,6 +73,24 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None:
7373 async def _process_queue (self ) -> None :
7474 ...
7575
76+ async def _enter_sync_state (self , last_level : int ) -> Optional [int ]:
77+ state = await self .get_state ()
78+ first_level = state .level
79+ if first_level == last_level :
80+ return None
81+ if first_level > last_level :
82+ raise RuntimeError (f'Attempt to synchronize index from level { first_level } to level { last_level } ' )
83+ self ._logger .info ('Synchronizing index to level %s' , last_level )
84+ state .hash = None # type: ignore
85+ await state .save ()
86+ return first_level
87+
88+ async def _exit_sync_state (self , last_level : int ) -> None :
89+ self ._logger .info ('Index is synchronized to level %s' , last_level )
90+ state = await self .get_state ()
91+ state .level = last_level # type: ignore
92+ await state .save ()
93+
7694 async def _initialize_index_state (self ) -> None :
7795 self ._logger .info ('Getting index state' )
7896 index_config_hash = self ._config .hash ()
@@ -94,15 +112,12 @@ async def _initialize_index_state(self) -> None:
94112 await self ._ctx .reindex ()
95113
96114 self ._logger .info ('%s' , f'{ state .level = } { state .hash = } ' .replace ('state.' , '' ))
97- # NOTE: No need to check genesis block
98- if state .level :
115+ # NOTE: No need to check indexes which are not synchronized.
116+ if state .level and state . hash :
99117 block = await self ._datasource .get_block (state .level )
100- if state .hash :
101- if state .hash != block .hash :
102- self ._logger .warning ('Block hash mismatch (missed rollback while dipdup was stopped), reindexing' )
103- await self ._ctx .reindex ()
104- else :
105- state .hash = block .hash # type: ignore
118+ if state .hash != block .hash :
119+ self ._logger .warning ('Block hash mismatch (missed rollback while dipdup was stopped), reindexing' )
120+ await self ._ctx .reindex ()
106121
107122 await state .save ()
108123 self ._state = state
@@ -137,15 +152,11 @@ async def _process_queue(self) -> None:
137152
138153 async def _synchronize (self , last_level : int , cache : bool = False ) -> None :
139154 """Fetch operations via Fetcher and pass to message callback"""
140- state = await self .get_state ()
141- first_level = state .level
142- if first_level == last_level :
155+ first_level = await self ._enter_sync_state (last_level )
156+ if first_level is None :
143157 return
144- if first_level > last_level :
145- raise RuntimeError (first_level , last_level )
146158
147159 self ._logger .info ('Fetching operations from level %s to %s' , first_level , last_level )
148-
149160 transaction_addresses = await self ._get_transaction_addresses ()
150161 origination_addresses = await self ._get_origination_addresses ()
151162
@@ -169,11 +180,7 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None:
169180 async for level , operations in fetcher .fetch_operations_by_level ():
170181 await self ._process_level_operations (level , operations )
171182
172- state .level = last_level # type: ignore
173- # FIXME: Block hashes are not available during synchronization
174- state .hash = (await self ._datasource .get_block (last_level )).hash # type: ignore
175- await state .save ()
176- self ._logger .info ('Index is synchronized to level %s' , last_level )
183+ await self ._exit_sync_state (last_level )
177184
178185 async def _process_level_operations (self , level : int , operations : List [OperationData ], block : Optional [HeadBlockData ] = None ) -> None :
179186 state = await self .get_state ()
@@ -416,14 +423,11 @@ async def _process_queue(self):
416423
417424 async def _synchronize (self , last_level : int , cache : bool = False ) -> None :
418425 """Fetch operations via Fetcher and pass to message callback"""
419- state = await self .get_state ()
420- first_level = state .level
421- if first_level == last_level :
426+ first_level = await self ._enter_sync_state (last_level )
427+ if first_level is None :
422428 return
423- if first_level > last_level :
424- raise RuntimeError (first_level , last_level )
425429
426- self ._logger .info ('Fetching operations from level %s to %s' , first_level , last_level )
430+ self ._logger .info ('Fetching big map diffs from level %s to %s' , first_level , last_level )
427431
428432 big_map_addresses = await self ._get_big_map_addresses ()
429433 big_map_paths = await self ._get_big_map_paths ()
@@ -440,11 +444,7 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None:
440444 async for level , big_maps in fetcher .fetch_big_maps_by_level ():
441445 await self ._process_level_big_maps (level , big_maps )
442446
443- state .level = last_level # type: ignore
444- # FIXME: Block hashes are not available during synchronization
445- state .hash = (await self ._datasource .get_block (last_level )).hash # type: ignore
446- await state .save ()
447- self ._logger .info ('Index is synchronized to level %s' , last_level )
447+ await self ._exit_sync_state (last_level )
448448
449449 async def _process_level_big_maps (self , level : int , big_maps : List [BigMapData ]):
450450 state = await self .get_state ()
0 commit comments