Skip to content

Commit ae00e3c

Browse files
authored
Merge pull request #94 from theseriff/feature/add-sentinel-for-retry-control
Feature/add sentinel for retry control
2 parents 6660c90 + e517919 commit ae00e3c

File tree

15 files changed

+264
-44
lines changed

15 files changed

+264
-44
lines changed

.github/dependabot.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ updates:
88
github-actions:
99
patterns:
1010
- "*"
11-
target-branch: develop
11+
target-branch: main
1212
cooldown:
1313
default-days: 7
1414

README.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,15 @@ Jobify uses the native timer mechanisms of asyncio for efficient and precise tas
2424

2525
## Key Features
2626

27-
- [x] [**Async/Await**](https://theseriff.github.io/jobify/#why-jobify): Built from the ground up with `asyncio` in mind.
27+
- [x] [**Precision**](https://theseriff.github.io/jobify/#why-jobify): No polling! Uses native `asyncio` timers for sub-millisecond accuracy and zero idle CPU usage.
2828
- [x] [**Scheduling**](https://theseriff.github.io/jobify/schedule/): Run jobs immediately, with a delay, at a specified time, or using Cron expressions (second-level precision supported).
2929
- [x] [**Storage**](https://theseriff.github.io/jobify/app_settings/#storage): Built-in SQLite ensures scheduled jobs persist through application restarts.
3030
- [x] [**Routing**](https://theseriff.github.io/jobify/router/): Organize tasks with `JobRouter`, similar to FastAPI or Aiogram.
31-
- [x] [**Error Handling**](https://theseriff.github.io/jobify/advanced_usage/exception_handlers/): Comprehensive middleware for automatic retries, timeouts, and custom error handling.
31+
- [x] [**Inject Context**](https://theseriff.github.io/jobify/context/): Inject application state or custom dependencies directly into your tasks.
32+
- [x] [**Middlewares**](https://theseriff.github.io/jobify/app_settings/#middleware): Powerful interceptors for both job execution and the scheduling process.
33+
- [x] [**Exception Handlers**](https://theseriff.github.io/jobify/advanced_usage/exception_handlers/): Hierarchical error management at the task, router, or global level.
34+
- [x] [**Lifespan Support**](https://theseriff.github.io/jobify/app_settings/#lifespan): Manage startup and shutdown events, just like in FastAPI.
35+
- [x] [**Job Control**](https://theseriff.github.io/jobify/job/): Full control over jobs — wait for completion, cancel tasks, or check results with ease.
3236
- [x] [**Concurrency**:](https://theseriff.github.io/jobify/task_settings/#run_mode) Supports `asyncio`, `ThreadPoolExecutor`, and `ProcessPoolExecutor` for efficient task handling.
3337
- [ ] Distributed task queue. Soon.
3438
- [ ] Many different adapters to the database. Soon.

docs/advanced_usage/exception_handlers.md

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,43 @@ async def my_recovery_handler(exc: Exception, context: JobContext) -> str:
107107
@app.task(exception_handlers={ValueError: my_recovery_handler})
108108
async def my_task() -> None:
109109
raise ValueError("Oops!")
110+
```
111+
112+
### 3. Aborting Retries with NoResultError
113+
114+
Sometimes, an error can be fatal and retrying a task (even if the `retry` configuration is set) would be a waste of resources.
115+
In these cases, it is recommended to raise the `jobify.exceptions.NoResultError` exception.
116+
117+
- When this exception is raised, the job's status will be set to FAILED.
118+
- The `RetryMiddleware` component will catch this exception and stop all further retries.
119+
- No more retries will be attempted.
120+
121+
```python
122+
import asyncio
123+
124+
from jobify import JobContext, Jobify
125+
from jobify.exceptions import NoResultError
126+
127+
app = Jobify()
128+
129+
async def fatal_error_handler(exc: Exception, context: JobContext) -> None:
130+
print(f"Fatal error in job {context.job.id}: {exc}")
131+
# Signal that we should stop retries and fail the job immediately
132+
raise NoResultError
133+
134+
@app.task(retry=3, exception_handlers={ValueError: fatal_error_handler})
135+
async def my_task() -> None:
136+
raise ValueError("Corrupted data!")
137+
138+
async def main() -> None:
139+
async with app:
140+
job = await my_task.push()
141+
await job.wait()
142+
143+
print(job.status) # FAILED
144+
print(job.exception) # NoResultError
110145

111-
# ... after execution ...
112-
await job.wait()
113-
print(job.status) # SUCCESS
114-
print(job.result()) # "default_value"
146+
asyncio.run(main())
115147
```
116148

117149
## Example: Hierarchical Handling

docs/advanced_usage/system_time.md

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
# System Time and Scheduling Trade-offs
2+
3+
One of the key architectural decisions in Jobify is to completely avoid
4+
**polling**. While this approach provides significant advantages in
5+
terms of performance and precision, it also introduces a specific
6+
behavior when the operating system's clock is adjusted.
7+
8+
## Polling vs. Native Timers
9+
10+
Most Python scheduling frameworks, such as APScheduler v3 and Celery,
11+
rely on a **polling loop**. Typically, this involves a continuous
12+
process like:
13+
14+
```python
15+
while True:
16+
sleep(1)
17+
check_scheduled_tasks()
18+
```
19+
20+
In this model, the scheduler repeatedly checks the current system time
21+
against a list of scheduled jobs.
22+
23+
**Jobify takes a different approach.** Instead of polling, it uses the
24+
`asyncio.loop.call_at` API. When a task is scheduled, Jobify calculates
25+
the exact moment the task should run and registers a timer directly in
26+
the event loop.
27+
28+
This allows the operating system's event notification mechanisms (such
29+
as epoll or kqueue) to wake the scheduler precisely when the task is
30+
due.
31+
32+
## Benefits of Jobify's Approach
33+
34+
This design provides several important advantages:
35+
36+
- **Zero Idle CPU Usage**:
37+
38+
> The scheduler does not run periodic checks.</br>
39+
> CPU resources are used only when a scheduled task actually needs to execute.
40+
41+
- **High Precision**:
42+
43+
> Tasks are triggered directly by the event loop's timer system,</br>
44+
> eliminating the timing jitter introduced by polling intervals.
45+
46+
- **High Scalability**:
47+
48+
> The asyncio event loop efficiently manages large numbers of timers,</br>
49+
> allowing Jobify to handle thousands of scheduled tasks with minimal overhead.
50+
51+
## The Trade-off: System Time Adjustments
52+
53+
The trade-off for this efficiency is how Jobify behaves when the
54+
**system clock (wall-clock time)** changes after a task has already been
55+
scheduled.
56+
57+
### Wall Clock vs Monotonic Time
58+
59+
Two different time concepts are involved:
60+
61+
#### Wall-clock time (`datetime.now()`)
62+
63+
Represents the system's current calendar time. This value may change due
64+
to:
65+
66+
- manual system time adjustments
67+
- NTP synchronization
68+
- daylight saving time transitions
69+
70+
#### Monotonic time (`time.monotonic()`)
71+
72+
A steadily increasing clock that is **not affected by system time
73+
changes**.\
74+
`asyncio` relies on a monotonic clock for its internal timing.
75+
76+
### What Happens Internally
77+
78+
Suppose a task is scheduled to run at **15:00**, and the current time is
79+
**14:50**.
80+
81+
Jobify performs the following steps:
82+
83+
1. Reads the current **wall-clock time** (14:50).
84+
85+
2. Computes the delay until the scheduled time (10 minutes / 600
86+
seconds).
87+
88+
3. Registers a timer with the event loop:
89+
90+
> Execute this task 600 seconds from now.
91+
92+
From this point forward, the timer is based entirely on **monotonic
93+
time**.
94+
95+
If the system clock is manually changed to **15:00** immediately after
96+
scheduling, the monotonic clock does **not** change. The event loop will
97+
still wait for the full 600 seconds before executing the task.
98+
99+
As a result, the task would run when the system clock shows **15:10**.
100+
101+
## Why This Design Was Chosen
102+
103+
This behavior is an **intentional design trade-off**.
104+
105+
By relying on native event loop timers instead of polling, Jobify gains:
106+
107+
- lower CPU overhead
108+
- higher scheduling precision
109+
- better scalability for large numbers of tasks
110+
111+
The downside is that timers already registered in the event loop do not
112+
automatically adjust when the system clock changes.
113+
114+
## Handling Time Changes
115+
116+
If the system time changes significantly while Jobify is running, simply
117+
**restart the application**.
118+
119+
Upon startup, Jobify will:
120+
121+
1. Read the current wall-clock time
122+
2. Recalculate delays for all scheduled tasks
123+
3. Register fresh timers with the event loop
124+
125+
This ensures that all tasks are scheduled correctly according to the
126+
updated system time.

docs/index.md

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,32 +6,41 @@ similar to modern web frameworks like FastAPI.
66

77
## Key Features
88

9-
- **Async First**: Built on top of `asyncio`.
10-
- [**Flexible Scheduling**](schedule.md#dynamic-scheduling){ data-preview }: Run jobs immediately, after a delay, at a specific time, or via Cron expressions.
11-
- **Persistence**: Built-in SQLite storage ensures scheduled jobs survive application restarts.
12-
- [**Modular**](router.md){ data-preview }: Organize tasks using `JobRouter`s (similar to FastAPI routers).
13-
- **Resilient**: Middleware support for automatic retries, timeouts, and error handling.
14-
- [**Concurrency**](task_settings/#run_mode){ data-preview }: Support for `asyncio`, `ThreadPoolExecutor`, and `ProcessPoolExecutor`.
9+
- [**Precision**](#why-jobify){ data-preview }: No polling! Uses native `asyncio` timers for sub-millisecond accuracy and zero idle CPU usage.
10+
- [**Scheduling**](schedule.md){ data-preview }: Run jobs immediately, with a delay, at a specified time, or using Cron expressions (second-level precision supported).
11+
- [**Storage**](app_settings.md#storage){ data-preview }: Built-in SQLite ensures scheduled jobs persist through application restarts.
12+
- [**Routing**](router.md){ data-preview }: Organize tasks with `JobRouter`, similar to FastAPI or Aiogram.
13+
- [**Inject Context**](context.md){ data-preview }: Inject application state or custom dependencies directly into your tasks.
14+
- [**Middlewares**](app_settings.md#middleware){ data-preview }: Powerful interceptors for both job execution and the scheduling process.
15+
- [**Exception Handlers**](advanced_usage/exception_handlers.md){ data-preview }: Hierarchical error management at the task, router, or global level.
16+
- [**Lifespan Support**](app_settings.md#lifespan){ data-preview }: Manage startup and shutdown events, just like in FastAPI.
17+
- [**Job Control**](job.md){ data-preview }: Full control over jobs — wait for completion, cancel tasks, or check results with ease.
18+
- [**Concurrency**](task_settings.md#run_mode){ data-preview }: Supports `asyncio`, `ThreadPoolExecutor`, and `ProcessPoolExecutor` for efficient task handling.
19+
- **Distributed task queue**: Soon.
20+
- **Many different adapters to the database**: Soon.
21+
- **Many different serializers**: Soon.
1522

1623
## Comparison
1724

1825
You might have seen other libraries like `APScheduler`, `Celery`, or `Taskiq`.
1926
Below is a comparison of features to help you decide if Jobify fits your needs.
2027

21-
| Feature name | Jobify | Taskiq | APScheduler (v3) | Celery |
22-
| :------------------------------------------------------------------------ | :------------------: | :---------------: | :--------------: | :---------------: |
23-
| **Event-driven Scheduling** | ✅ (Low-level timer) | ❌ (Polling/Loop) | ❌ (Interval) | ❌ (Polling/Loop) |
24-
| **Async Native (asyncio)** ||| ❌ (Sync mostly) ||
25-
| [**Context Injection**](context.md){ data-preview } |||||
26-
| [**FastAPI-style Routing**](router.md){ data-preview } |||||
27-
| [**Middleware Support**](app_settings/#middleware){ data-preview } ||| ❌ (Events only) | ❌ (Signals) |
28-
| [**Job Cancellation**](job/#await-jobcancel){ data-preview } |||||
29-
| [**Cron Scheduling**](schedule/#cron-expressions){ data-preview } |||||
30-
| [**Misfire Policy**](schedule/#the-cron-object){ data-preview } |||||
31-
| [**Run Modes (Thread/Process)**](task_settings/#run_mode){ data-preview } |||||
32-
| **Rich Typing Support** |||||
33-
| **Zero-config Persistence** | ✅ (SQLite default) | ❌ (Needs Broker) || ❌ (Needs Broker) |
34-
| **Broker-backend execution** | ❌ (soon) ||||
28+
| Feature name | Jobify | Taskiq | APScheduler (v3) | Celery |
29+
| :----------------------------------------------------------------------------- | :------------------: | :---------------: | :--------------: | :---------------: |
30+
| **Event-driven Scheduling** | ✅ (Low-level timer) | ❌ (Polling/Loop) | ❌ (Interval) | ❌ (Polling/Loop) |
31+
| **Async Native (asyncio)** ||| ❌ (Sync mostly) ||
32+
| [**Context Injection**](context.md){ data-preview } |||||
33+
| [**FastAPI-style Routing**](router.md){ data-preview } |||||
34+
| [**Middleware Support**](app_settings.md#middleware){ data-preview } ||| ❌ (Events only) | ❌ (Signals) |
35+
| [**Lifespan Support**](app_settings.md#lifespan){ data-preview } |||||
36+
| [**Exception Handlers**](advanced_usage/exception_handlers.md){ data-preview } | ✅ (Hierarchical) ||||
37+
| [**Job Cancellation**](job.md#await-jobcancel){ data-preview } |||||
38+
| [**Cron Scheduling**](schedule.md#cron-expressions){ data-preview } | ✅ (Seconds level) | ✅ (Minutes) |||
39+
| [**Misfire Policy**](schedule.md#the-cron-object){ data-preview } |||||
40+
| [**Run Modes (Thread/Process)**](task_settings.md#run_mode){ data-preview } |||||
41+
| **Rich Typing Support** |||||
42+
| **Zero-config Persistence** | ✅ (SQLite default) | ❌ (Needs Broker) || ❌ (Needs Broker) |
43+
| **Broker-backend execution** | ❌ (soon) ||||
3544

3645
### Why Jobify?
3746

@@ -42,6 +51,9 @@ Jobify uses the low-level asyncio.loop.call_at API.
4251
2. **Precision**: Tasks are triggered precisely by the internal timer of the event loop, ensuring sub-millisecond accuracy and avoiding the "jitter" that can be associated with sleep intervals.
4352
3. **Native**: It works in harmony with OS-level event notification systems (epoll/kqueue).
4453

54+
!!! note "The Precision vs. Polling Trade-off"
55+
Jobify consciously avoids polling in order to achieve maximum efficiency and sub-millisecond precision. This architectural decision means that the scheduler is sensitive to significant changes in the operating system's clock. For more information on this trade-off and why it is important, please see [System Time and Scheduling](advanced_usage/system_time.md){ data-preview }.
56+
4557
## Quick Start
4658

4759
### Installation
@@ -108,4 +120,5 @@ async def main() -> None:
108120

109121
if __name__ == "__main__":
110122
asyncio.run(main())
123+
111124
```

docs/integrations.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ Jobify is designed to be extensible. Below are the community-supported and offic
1111
FastAPI and other ASGI frameworks that support lifespan don't need a separate integration. Just add the code below to your lifespan or startup/shutdown handlers:
1212

1313
```python
14-
from collections.abc import AsyncGenerator
14+
from collections.abc import AsyncIterator
1515
from contextlib import asynccontextmanager
1616

1717
from fastapi import FastAPI
@@ -22,7 +22,7 @@ jobify_app = Jobify()
2222

2323

2424
@asynccontextmanager
25-
async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
25+
async def lifespan(_app: FastAPI) -> AsyncIterator[None]:
2626
async with jobify_app:
2727
yield
2828

@@ -33,7 +33,7 @@ Or, using explicit startup/shutdown:
3333

3434
```python
3535
@asynccontextmanager
36-
async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
36+
async def lifespan(_app: FastAPI) -> AsyncIterator[None]:
3737
await jobify_app.startup()
3838
yield
3939
await jobify_app.shutdown()

docs/schedule.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
You can schedule tasks in two main ways: using recurring cron expressions or dynamically at runtime.
44

5+
!!! info "Precision & System Time"
6+
Jobify uses high-precision timers instead of polling, which makes the scheduler very efficient. However, this also means that it is sensitive to changes in the system time. For details, see [System Time and Scheduling Trade-offs](advanced_usage/system_time.md).
7+
58
## Cron Expressions
69

710
`Jobify` uses the [crontab](https://pypi.org/project/crontab/) library to parse and schedule jobs from cron expressions. This provides a flexible and powerful way to define recurring tasks.

src/jobify/_internal/common/datastructures.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,31 @@
22
from __future__ import annotations
33

44
from collections import UserDict
5-
from typing import Any
5+
from typing import Any, Literal
66

7-
from typing_extensions import override
7+
from typing_extensions import Self, override
88

99

10-
class EmptyPlaceholder:
10+
class EmptyPlaceholder(str):
11+
__slots__: tuple[()] = ()
12+
13+
def __new__(cls) -> Self:
14+
return super().__new__(cls, "__EMPTY__")
15+
16+
def __bool__(self) -> Literal[False]:
17+
return False
18+
1119
@override
12-
def __repr__(self) -> str:
13-
return "EMPTY"
20+
def __str__(self) -> str:
21+
return super().__str__()
1422

1523
@override
1624
def __hash__(self) -> int:
17-
return hash("EMPTY")
25+
return hash(super().__str__())
1826

1927
@override
2028
def __eq__(self, other: object) -> bool:
21-
return isinstance(other, self.__class__)
22-
23-
def __bool__(self) -> bool:
24-
return False
29+
return other == super().__str__()
2530

2631

2732
class State(UserDict[str, Any]):

src/jobify/_internal/exceptions.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,12 @@ def raise_app_already_started_error(operation: str) -> NoReturn:
102102
"Move this call outside/before the 'async with jobify:' block."
103103
),
104104
)
105+
106+
107+
class NoResultError(BaseJobifyError):
108+
"""Raised when a task should fail immediately without retrying."""
109+
110+
def __init__(
111+
self, msg: str = "Job aborted: no result expected and retries stopped."
112+
) -> None:
113+
super().__init__(msg)

src/jobify/_internal/middleware/retry.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from typing_extensions import override
88

9+
from jobify._internal.exceptions import NoResultError
910
from jobify._internal.middleware.base import BaseMiddleware, CallNext
1011

1112
if TYPE_CHECKING:
@@ -25,7 +26,9 @@ async def __call__(self, call_next: CallNext, context: JobContext) -> Any:
2526
while True:
2627
try:
2728
return await call_next(context)
28-
except Exception as exc: # noqa: PERF203
29+
except NoResultError: # noqa: PERF203
30+
raise
31+
except Exception as exc:
2932
failures += 1
3033
if failures > max_retries:
3134
msg = (

0 commit comments

Comments
 (0)