Skip to content

Commit ddb6bcd

Browse files
Defer starting scheduler until all indexes are in sync (#135)
1 parent 5e0dc09 commit ddb6bcd

File tree

1 file changed

+43
-30
lines changed

1 file changed

+43
-30
lines changed

src/dipdup/dipdup.py

Lines changed: 43 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,11 @@ def __init__(self, ctx: DipDupContext) -> None:
4646
self._contracts: Set[ContractConfig] = set()
4747
self._stopped: bool = False
4848

49-
async def run(self, spawn_datasources_event: Optional[Event]) -> None:
49+
async def run(
50+
self,
51+
spawn_datasources_event: Optional[Event],
52+
start_scheduler_event: Optional[Event],
53+
) -> None:
5054
self._logger.info('Starting index dispatcher')
5155
await self._subscribe_to_datasource_events()
5256
await self._set_datasource_heads()
@@ -57,34 +61,28 @@ async def run(self, spawn_datasources_event: Optional[Event]) -> None:
5761
async with slowdown(1.0):
5862
await gather(*tasks)
5963

60-
await self._check_states()
61-
6264
indexes_spawned = False
6365
with suppress(IndexError):
6466
while index := pending_indexes.popleft():
6567
self._indexes[index._config.name] = index
6668
indexes_spawned = True
6769
if not indexes_spawned:
68-
await self._check_states()
70+
if self._every_index_is(IndexStatus.ONESHOT):
71+
self.stop()
6972

70-
if spawn_datasources_event:
73+
if spawn_datasources_event and not spawn_datasources_event.is_set():
7174
spawn_datasources_event.set()
7275

76+
if start_scheduler_event and not start_scheduler_event.is_set():
77+
if self._every_index_is(IndexStatus.REALTIME):
78+
start_scheduler_event.set()
79+
7380
def stop(self) -> None:
7481
self._stopped = True
7582

76-
async def _check_states(self) -> None:
83+
def _every_index_is(self, status: IndexStatus) -> bool:
7784
statuses = [i.state.status for i in self._indexes.values()]
78-
79-
def _every_index_is(status: IndexStatus) -> bool:
80-
nonlocal statuses
81-
return bool(statuses) and not bool(tuple(filter(partial(ne, status), statuses)))
82-
83-
# TODO: `on_synchronized` hook? Not sure if we need it.
84-
# if _every_index_is(IndexStatus.REALTIME): ...
85-
86-
if _every_index_is(IndexStatus.ONESHOT):
87-
self.stop()
85+
return bool(statuses) and not bool(tuple(filter(partial(ne, status), statuses)))
8886

8987
async def _fetch_contracts(self) -> None:
9088
"""Add contracts spawned from context to config"""
@@ -218,13 +216,13 @@ async def run(self, reindex: bool, oneshot: bool) -> None:
218216

219217
spawn_datasources_event: Optional[Event] = None
220218
if not oneshot:
221-
await self._set_up_scheduler(stack, tasks)
219+
start_scheduler_event = await self._set_up_scheduler(stack, tasks)
222220
spawn_datasources_event = await self._spawn_datasources(tasks)
223221

224222
for name in self._config.indexes:
225223
await self._ctx._spawn_index(name)
226224

227-
await self._set_up_index_dispatcher(tasks, spawn_datasources_event)
225+
await self._set_up_index_dispatcher(tasks, spawn_datasources_event, start_scheduler_event)
228226

229227
await gather(*tasks)
230228

@@ -327,30 +325,38 @@ async def _set_up_datasources(self, stack: AsyncExitStack) -> None:
327325
for datasource in self._datasources.values():
328326
await stack.enter_async_context(datasource)
329327

330-
async def _set_up_index_dispatcher(self, tasks: Set[Task], spawn_datasources_event: Optional[Event]) -> None:
328+
async def _set_up_index_dispatcher(
329+
self,
330+
tasks: Set[Task],
331+
spawn_datasources_event: Optional[Event],
332+
start_scheduler_event: Optional[Event],
333+
) -> None:
331334
index_dispatcher = IndexDispatcher(self._ctx)
332-
tasks.add(create_task(index_dispatcher.run(spawn_datasources_event)))
335+
tasks.add(create_task(index_dispatcher.run(spawn_datasources_event, start_scheduler_event)))
333336

334337
async def _spawn_datasources(self, tasks: Set[Task]) -> Event:
335338
event = Event()
336339

337-
async def _wrapper():
338-
self._logger.info('Waiting for IndexDispatcher to spawn datasources')
340+
async def _event_wrapper():
341+
self._logger.info('Waiting for an event to spawn datasources')
339342
await event.wait()
340343
self._logger.info('Spawning datasources')
344+
341345
_tasks = [create_task(d.run()) for d in self._datasources.values()]
342346
await gather(*_tasks)
343347

344-
tasks.add(create_task(_wrapper()))
348+
tasks.add(create_task(_event_wrapper()))
345349
return event
346350

347-
async def _set_up_scheduler(self, stack: AsyncExitStack, tasks: Set[Task]) -> None:
351+
async def _set_up_scheduler(self, stack: AsyncExitStack, tasks: Set[Task]) -> Event:
348352
job_failed = Event()
353+
event = Event()
349354
exception: Optional[Exception] = None
350355

351356
@asynccontextmanager
352357
async def _context():
353358
try:
359+
self._scheduler.start()
354360
yield
355361
finally:
356362
self._scheduler.shutdown()
@@ -365,11 +371,18 @@ async def _watchdog() -> None:
365371
await job_failed.wait()
366372
raise exception # type: ignore
367373

368-
await stack.enter_async_context(_context())
369-
tasks.add(create_task(_watchdog()))
374+
async def _event_wrapper():
375+
self._logger.info('Waiting for an event to start scheduler')
376+
await event.wait()
377+
self._logger.info('Starting scheduler')
378+
379+
tasks.add(create_task(_watchdog()))
370380

371-
for job_config in self._config.jobs.values():
372-
add_job(self._ctx, self._scheduler, job_config)
381+
for job_config in self._config.jobs.values():
382+
add_job(self._ctx, self._scheduler, job_config)
373383

374-
self._scheduler.add_listener(_hook, EVENT_JOB_ERROR)
375-
self._scheduler.start()
384+
self._scheduler.add_listener(_hook, EVENT_JOB_ERROR)
385+
await stack.enter_async_context(_context())
386+
387+
tasks.add(create_task(_event_wrapper()))
388+
return event

0 commit comments

Comments (0)