Skip to content

Commit b532936

Browse files
Job scheduler bugfixes and refactoring (#351)
* Job scheduler bugfixes and refactoring
* Cleanup
* Add missing typehints
1 parent bcdcbe3 commit b532936

File tree

5 files changed

+104
-87
lines changed

5 files changed

+104
-87
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@
66

77
* tzkt: Added `originated_contract_tzips` field to `OperationData`.
88

9+
### Fixed
10+
11+
* jobs: Fixed jobs with `daemon` schedule never starting.
12+
* jobs: Fixed failed jobs not throwing exceptions into the main loop.
13+
914
## 5.1.1 - 2022-05-13
1015

1116
### Fixed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ extended_default_ignore = []
9090

9191
[tool.flakehell.plugins]
9292
pyflakes = ["+*"]
93-
"flake8-*" = ["+*"]
93+
"flake8-*" = ["+*", "-C417"]
9494
flake8-docstrings = ["-*"]
9595
flake8-simplify = ["-SIM106"]
9696

src/dipdup/context.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,7 @@ def __init__(self, package: str) -> None:
382382
self._hooks: Dict[str, HookConfig] = {}
383383

384384
async def run(self) -> None:
385+
self._logger.debug('Starting CallbackManager loop')
385386
while True:
386387
async with slowdown(1):
387388
while pending_hooks:

src/dipdup/dipdup.py

Lines changed: 12 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from asyncio import gather
88
from collections import deque
99
from contextlib import AsyncExitStack
10-
from contextlib import asynccontextmanager
1110
from contextlib import suppress
1211
from typing import Awaitable
1312
from typing import Deque
@@ -16,7 +15,6 @@
1615
from typing import Set
1716
from typing import Tuple
1817

19-
from apscheduler.events import EVENT_JOB_ERROR # type: ignore
2018
from prometheus_client import start_http_server # type: ignore
2119
from tortoise.exceptions import OperationalError
2220
from tortoise.transactions import get_connection
@@ -59,8 +57,7 @@
5957
from dipdup.models import OperationData
6058
from dipdup.models import Schema
6159
from dipdup.prometheus import Metrics
62-
from dipdup.scheduler import add_job
63-
from dipdup.scheduler import create_scheduler
60+
from dipdup.scheduler import SchedulerManager
6461
from dipdup.utils import is_importable
6562
from dipdup.utils import slowdown
6663
from dipdup.utils.database import generate_schema
@@ -367,7 +364,6 @@ def __init__(self, config: DipDupConfig) -> None:
367364
datasources=self._datasources,
368365
callbacks=self._callbacks,
369366
)
370-
self._scheduler = create_scheduler(self._config.advanced.scheduler)
371367
self._codegen = DipDupCodeGenerator(self._config, self._datasources_by_config)
372368
self._schema: Optional[Schema] = None
373369

@@ -492,19 +488,19 @@ async def _set_up_database(self, stack: AsyncExitStack) -> None:
492488
async def _set_up_hooks(self, tasks: Optional[Set[Task]] = None) -> None:
493489
for hook_config in default_hooks.values():
494490
try:
495-
self._ctx.callbacks.register_hook(hook_config)
491+
self._callbacks.register_hook(hook_config)
496492
except ProjectImportError:
497493
if hook_config.callback in ('on_rollback', 'on_index_rollback'):
498-
self._logger.info(f'Hook {hook_config.callback} is not available')
494+
self._logger.info(f'Hook `{hook_config.callback}` is not available')
499495
else:
500496
raise
501497

502498
for hook_config in self._config.hooks.values():
503-
self._ctx.callbacks.register_hook(hook_config)
499+
self._callbacks.register_hook(hook_config)
504500

505501
# FIXME: Why does `is not None` check break oneshot mode?
506502
if tasks:
507-
tasks.add(create_task(self._ctx.callbacks.run()))
503+
tasks.add(create_task(self._callbacks.run()))
508504

509505
async def _set_up_prometheus(self) -> None:
510506
if self._config.prometheus:
@@ -596,41 +592,14 @@ async def _event_wrapper():
596592
return event # noqa: R504
597593

598594
async def _set_up_scheduler(self, stack: AsyncExitStack, tasks: Set[Task]) -> Event:
599-
job_failed = Event()
595+
# NOTE: Prepare SchedulerManager
600596
event = Event()
601-
exception: Optional[Exception] = None
597+
scheduler = SchedulerManager(self._config.advanced.scheduler)
598+
run_task = create_task(scheduler.run(event))
599+
tasks.add(run_task)
602600

603-
@asynccontextmanager
604-
async def _context():
605-
try:
606-
self._scheduler.start()
607-
yield
608-
finally:
609-
self._scheduler.shutdown()
610-
611-
def _error_hook(event) -> None:
612-
nonlocal job_failed, exception
613-
exception = event.exception
614-
job_failed.set()
615-
616-
async def _watchdog() -> None:
617-
nonlocal job_failed
618-
await job_failed.wait()
619-
if not isinstance(exception, Exception):
620-
raise RuntimeError
621-
raise exception
622-
623-
async def _event_wrapper():
624-
self._logger.info('Waiting for an event to start scheduler')
625-
await event.wait()
601+
# NOTE: Register jobs
602+
for job_config in self._config.jobs.values():
603+
scheduler.add_job(self._ctx, job_config)
626604

627-
self._logger.info('Starting scheduler')
628-
self._scheduler.add_listener(_error_hook, EVENT_JOB_ERROR)
629-
await stack.enter_async_context(_context())
630-
tasks.add(create_task(_watchdog()))
631-
632-
for job_config in self._config.jobs.values():
633-
add_job(self._ctx, self._scheduler, job_config)
634-
635-
tasks.add(create_task(_event_wrapper()))
636605
return event # noqa: R504

src/dipdup/scheduler.py

Lines changed: 85 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
import asyncio
22
import json
3-
from datetime import datetime
3+
import logging
44
from functools import partial
55
from typing import Any
66
from typing import Dict
77
from typing import Optional
8+
from typing import Set
89

10+
from apscheduler.events import EVENT_JOB_ERROR # type: ignore
11+
from apscheduler.events import EVENT_JOB_EXECUTED
12+
from apscheduler.events import JobEvent # type: ignore
13+
from apscheduler.job import Job # type: ignore
914
from apscheduler.schedulers.asyncio import AsyncIOScheduler # type: ignore
1015
from apscheduler.triggers.cron import CronTrigger # type: ignore
1116
from apscheduler.triggers.interval import IntervalTrigger # type: ignore
12-
from apscheduler.util import undefined # type: ignore
1317

1418
from dipdup.config import JobConfig
1519
from dipdup.context import DipDupContext
@@ -24,57 +28,95 @@
2428
}
2529

2630

27-
def create_scheduler(config: Optional[Dict[str, Any]] = None) -> AsyncIOScheduler:
28-
if not config:
29-
return AsyncIOScheduler(DEFAULT_CONFIG)
30-
31+
def _verify_config(config: Dict[str, Any]) -> None:
32+
"""Ensure that dict is a valid `apscheduler` config"""
3133
json_config = json.dumps(config)
3234
if 'apscheduler.executors.pool' in json_config:
3335
raise ConfigurationError('`apscheduler.executors.pool` is not supported. If needed, create a pool inside a regular hook.')
3436
for key in config:
3537
if not key.startswith('apscheduler.'):
3638
raise ConfigurationError('`advanced.scheduler` config keys must start with `apscheduler.`, see apscheduler library docs')
37-
return AsyncIOScheduler(config)
3839

3940

40-
def add_job(ctx: DipDupContext, scheduler: AsyncIOScheduler, job_config: JobConfig) -> None:
41-
hook_config = job_config.hook_config
41+
class SchedulerManager:
42+
def __init__(self, config: Optional[Dict[str, Any]] = None) -> None:
43+
if config:
44+
_verify_config(config)
45+
self._logger = logging.getLogger('dipdup.jobs')
46+
self._scheduler = AsyncIOScheduler(config or DEFAULT_CONFIG)
47+
self._scheduler.add_listener(self._on_error, EVENT_JOB_ERROR)
48+
self._scheduler.add_listener(self._on_executed, EVENT_JOB_EXECUTED)
49+
self._exception: Optional[Exception] = None
50+
self._exception_event: asyncio.Event = asyncio.Event()
51+
self._daemons: Set[str] = set()
4252

43-
async def _job_wrapper(ctx: DipDupContext, *args, **kwargs) -> None:
44-
nonlocal job_config, hook_config
45-
job_ctx = HookContext(
46-
config=ctx.config,
47-
datasources=ctx.datasources,
48-
callbacks=ctx.callbacks,
49-
logger=logger,
50-
hook_config=hook_config,
51-
)
53+
async def run(self, event: asyncio.Event) -> None:
54+
self._logger.info('Waiting for an event to start scheduler')
55+
await event.wait()
56+
57+
self._logger.info('Starting scheduler')
5258
try:
53-
await job_ctx.fire_hook(hook_config.callback, *args, **kwargs)
59+
self._scheduler.start()
60+
await self._exception_event.wait()
61+
if self._exception is None:
62+
raise RuntimeError('Job has failed but exception is not set')
63+
raise self._exception
5464
except asyncio.CancelledError:
5565
pass
66+
finally:
67+
self._scheduler.shutdown()
68+
69+
def add_job(self, ctx: DipDupContext, job_config: JobConfig) -> Job:
70+
if job_config.daemon:
71+
self._daemons.add(job_config.name)
72+
73+
hook_config = job_config.hook_config
74+
75+
logger = FormattedLogger(
76+
name=f'dipdup.hooks.{hook_config.callback}',
77+
fmt=job_config.name + ': {}',
78+
)
79+
80+
async def _job_wrapper(ctx: DipDupContext, *args, **kwargs) -> None:
81+
nonlocal job_config, hook_config
82+
job_ctx = HookContext(
83+
config=ctx.config,
84+
datasources=ctx.datasources,
85+
callbacks=ctx.callbacks,
86+
logger=logger,
87+
hook_config=hook_config,
88+
)
89+
try:
90+
await job_ctx.fire_hook(
91+
hook_config.callback,
92+
*args,
93+
**kwargs,
94+
)
95+
except asyncio.CancelledError:
96+
pass
97+
98+
if job_config.crontab:
99+
trigger = CronTrigger.from_crontab(job_config.crontab)
100+
elif job_config.interval:
101+
trigger = IntervalTrigger(seconds=job_config.interval)
102+
elif job_config.daemon:
103+
trigger = None
56104
else:
57-
if job_config.daemon:
58-
raise ConfigurationError('Daemon jobs are intended to run forever')
59-
60-
logger = FormattedLogger(
61-
name=f'dipdup.hooks.{hook_config.callback}',
62-
fmt=job_config.name + ': {}',
63-
)
64-
if job_config.crontab:
65-
trigger, next_run_time = CronTrigger.from_crontab(job_config.crontab), undefined
66-
elif job_config.interval:
67-
trigger, next_run_time = IntervalTrigger(seconds=job_config.interval), undefined
68-
elif job_config.daemon:
69-
trigger, next_run_time = None, datetime.now()
70-
else:
71-
raise RuntimeError
72-
73-
scheduler.add_job(
74-
func=partial(_job_wrapper, ctx=ctx),
75-
id=job_config.name,
76-
name=job_config.name,
77-
trigger=trigger,
78-
next_run_time=next_run_time,
79-
kwargs=job_config.args,
80-
)
105+
raise RuntimeError
106+
107+
return self._scheduler.add_job(
108+
func=partial(_job_wrapper, ctx=ctx),
109+
id=job_config.name,
110+
name=job_config.name,
111+
trigger=trigger,
112+
kwargs=job_config.args,
113+
)
114+
115+
def _on_error(self, event: JobEvent) -> None:
116+
self._exception = event.exception
117+
self._exception_event.set()
118+
119+
def _on_executed(self, event: JobEvent) -> None:
120+
if event.job_id in self._daemons:
121+
event.exception = ConfigurationError('Daemon jobs are intended to run forever')
122+
self._on_error(event)

0 commit comments

Comments (0)