33from collections import defaultdict
44from collections .abc import Awaitable
55from collections .abc import Callable
6+ from dataclasses import dataclass
7+ from dataclasses import field
68from typing import Any
79from uuid import uuid4
810
4345RollbackCallback = Callable [['IndexDatasource' , MessageType , int , int ], Awaitable [None ]]
4446
4547
48+ @dataclass
49+ class NodeHead :
50+ event : asyncio .Event = field (default_factory = asyncio .Event )
51+ hash : str | None = None
52+ timestamp : int | None = None
53+
54+
4655class EvmNodeDatasource (IndexDatasource [EvmNodeDatasourceConfig ]):
4756 # TODO: Make dynamic
4857 _default_http_config = HttpConfig (ratelimit_sleep = 30 )
@@ -54,8 +63,7 @@ def __init__(self, config: EvmNodeDatasourceConfig, merge_subscriptions: bool =
5463 self ._realtime : asyncio .Event = asyncio .Event ()
5564 self ._requests : dict [str , tuple [asyncio .Event , Any ]] = {}
5665 self ._subscription_ids : dict [str , EvmNodeSubscription ] = {}
57- self ._head_hashes : dict [int , str ] = {}
58- self ._head_events : defaultdict [int , asyncio .Event ] = defaultdict (asyncio .Event )
66+ self ._heads : defaultdict [int , NodeHead ] = defaultdict (NodeHead )
5967
6068 self ._on_connected_callbacks : set [EmptyCallback ] = set ()
6169 self ._on_disconnected_callbacks : set [EmptyCallback ] = set ()
@@ -267,28 +275,30 @@ async def _on_message(self, message: Message) -> None:
267275 raise DatasourceError (f'Unknown message: { data } ' , self .name )
268276
269277 async def _handle_subscription (self , subscription : EvmNodeSubscription , data : Any ) -> None :
270- msg_type = MessageType ()
271-
272278 if isinstance (subscription , EvmNodeNewHeadsSubscription ):
273279 head = EvmNodeHeadData .from_json (data )
274280 level = int (head .number , 16 )
275281
276- known_hash = self ._head_hashes .get (level , None )
277- current_level = max (self ._head_hashes .keys () or (level ,))
278- self ._head_hashes [level ] = head .hash
279- if known_hash != head .hash :
280- await self .emit_rollback (msg_type , from_level = current_level , to_level = level - 1 )
282+ known_hash = self ._heads [level ].hash
283+ if known_hash not in (head .hash , None ):
284+ await self .emit_rollback (
285+ MessageType (),
286+ from_level = max (self ._heads .keys () or (level ,)),
287+ to_level = level - 1 ,
288+ )
281289
290+ self ._heads [level ].hash = head .hash
291+ self ._heads [level ].timestamp = int (head .timestamp , 16 )
282292 await self .emit_head (head )
283- self ._head_events [level ].set ()
293+ self ._heads [level ]. event .set ()
284294
285295 elif isinstance (subscription , EvmNodeLogsSubscription ):
286- # FIXME: Take from the head instead.
287- timestamp = int (time .time ())
296+ level = int (data ['blockNumber' ], 16 )
297+ await self ._heads [level ].event .wait ()
298+ timestamp = self ._heads [level ].timestamp
299+ if timestamp is None :
300+ raise FrameworkException ('Head cached but timestamp is None' )
288301 logs = EvmNodeLogData .from_json (data , timestamp )
289- level = int (logs .block_number , 16 )
290-
291- await self ._head_events [level ].wait ()
292302 await self .emit_logs (logs )
293303
294304 elif isinstance (subscription , EvmNodeSyncingData ):
0 commit comments