1919from typing import cast
2020
2121from apscheduler .events import EVENT_JOB_ERROR # type: ignore
22+ from prometheus_client import start_http_server # type: ignore
2223from tortoise .exceptions import OperationalError
2324from tortoise .transactions import get_connection
2425
4647from dipdup .index import HeadIndex
4748from dipdup .index import Index
4849from dipdup .index import OperationIndex
49- from dipdup .index import block_cache
5050from dipdup .index import extract_operation_subgroups
51- from dipdup .index import head_cache
5251from dipdup .models import BigMapData
5352from dipdup .models import Contract
5453from dipdup .models import Head
5756from dipdup .models import IndexStatus
5857from dipdup .models import OperationData
5958from dipdup .models import Schema
59+ from dipdup .prometheus import Metrics
6060from dipdup .scheduler import add_job
6161from dipdup .scheduler import create_scheduler
6262from dipdup .utils import slowdown
@@ -74,18 +74,18 @@ def __init__(self, ctx: DipDupContext) -> None:
7474 self ._logger = logging .getLogger ('dipdup' )
7575 self ._indexes : Dict [str , Index ] = {}
7676 self ._contracts : Set [ContractConfig ] = set ()
77- self ._stopped : bool = False
7877 self ._tasks : Deque [asyncio .Task ] = deque ()
7978
8079 self ._entrypoint_filter : Set [Optional [str ]] = set ()
8180 self ._address_filter : Set [str ] = set ()
8281
83- async def run (
84- self ,
85- spawn_datasources_event : Event ,
86- start_scheduler_event : Event ,
87- early_realtime : bool = False ,
88- ) -> None :
82+ async def run (self , spawn_datasources_event : Event , start_scheduler_event : Event , early_realtime : bool = False ) -> None :
83+ tasks = [self ._run (spawn_datasources_event , start_scheduler_event , early_realtime )]
84+ if self ._ctx .config .prometheus :
85+ tasks .append (self ._update_metrics (self ._ctx .config .prometheus .update_interval ))
86+ await gather (* tasks )
87+
88+ async def _run (self , spawn_datasources_event : Event , start_scheduler_event : Event , early_realtime : bool = False ) -> None :
8989 self ._logger .info ('Starting index dispatcher' )
9090 await self ._subscribe_to_datasource_events ()
9191 await self ._load_index_states ()
@@ -96,7 +96,7 @@ async def run(
9696 if isinstance (index , OperationIndex ):
9797 self ._apply_filters (index ._config )
9898
99- while not self . _stopped :
99+ while True :
100100 if not spawn_datasources_event .is_set ():
101101 if self ._every_index_is (IndexStatus .REALTIME ) or early_realtime :
102102 spawn_datasources_event .set ()
@@ -123,7 +123,7 @@ async def run(
123123 self ._apply_filters (index ._config )
124124
125125 if not indexes_spawned and self ._every_index_is (IndexStatus .ONESHOT ):
126- self . stop ()
126+ break
127127
128128 if self ._every_index_is (IndexStatus .REALTIME ) and not indexes_spawned :
129129 if not on_synchronized_fired :
@@ -136,8 +136,19 @@ async def run(
136136 else :
137137 on_synchronized_fired = False
138138
139- def stop (self ) -> None :
140- self ._stopped = True
139+ async def _update_metrics (self , update_interval : float ) -> None :
140+ while True :
141+ await asyncio .sleep (update_interval )
142+
143+ active , synced , realtime = 0 , 0 , 0
144+ for index in tuple (self ._indexes .values ()) + tuple (pending_indexes ):
145+ active += 1
146+ if index .synchronized :
147+ synced += 1
148+ if index .realtime :
149+ realtime += 1
150+
151+ Metrics .set_indexes_count (active , synced , realtime )
141152
142153 def _apply_filters (self , index_config : OperationIndexConfig ) -> None :
143154 self ._address_filter .update (index_config .address_filter )
@@ -215,10 +226,6 @@ async def _process(index_state: IndexState) -> None:
215226 tasks = (create_task (_process (index_state )) for index_state in await IndexState .all ())
216227 await gather (* tasks )
217228
218- # NOTE: Cached blocks used only on index state init
219- block_cache .clear ()
220- head_cache .clear ()
221-
222229 async def _on_head (self , datasource : TzktDatasource , head : HeadBlockData ) -> None :
223230 # NOTE: Do not await query results - blocked database connection may cause Websocket timeout.
224231 self ._tasks .append (
@@ -233,6 +240,8 @@ async def _on_head(self, datasource: TzktDatasource, head: HeadBlockData) -> Non
233240 ),
234241 )
235242 )
243+ if Metrics .enabled :
244+ Metrics .set_datasource_head_updated (datasource .name )
236245 for index in self ._indexes .values ():
237246 if isinstance (index , HeadIndex ) and index .datasource == datasource :
238247 index .push_head (head )
@@ -261,6 +270,8 @@ async def _on_big_maps(self, datasource: TzktDatasource, big_maps: Tuple[BigMapD
261270 async def _on_rollback (self , datasource : TzktDatasource , from_level : int , to_level : int ) -> None :
262271 """Perform a single level rollback when possible, otherwise call `on_rollback` hook"""
263272 self ._logger .warning ('Datasource `%s` rolled back: %s -> %s' , datasource .name , from_level , to_level )
273+ if Metrics .enabled :
274+ Metrics .set_datasource_rollback (datasource .name )
264275
265276 # NOTE: Zero difference between levels means we received no operations/big_maps on this level and thus channel level hasn't changed
266277 zero_level_rollback = from_level - to_level == 0
@@ -341,6 +352,7 @@ async def run(self) -> None:
341352 await self ._set_up_database (stack )
342353 await self ._set_up_datasources (stack )
343354 await self ._set_up_hooks (tasks )
355+ await self ._set_up_prometheus ()
344356
345357 await self ._initialize_schema ()
346358 await self ._initialize_datasources ()
@@ -446,6 +458,11 @@ async def _set_up_hooks(self, tasks: Optional[Set[Task]] = None) -> None:
446458 if tasks :
447459 tasks .add (create_task (self ._ctx .callbacks .run ()))
448460
461+ async def _set_up_prometheus (self ) -> None :
462+ if self ._config .prometheus :
463+ Metrics .enabled = True
464+ start_http_server (self ._config .prometheus .port , self ._config .prometheus .host )
465+
449466 async def _set_up_hasura (self , stack : AsyncExitStack ) -> None :
450467 if not self ._config .hasura :
451468 return
0 commit comments