Commit 8c4e453

Merge pull request #1 from knifecake/database-config

Concurrency maintenance

2 parents 5bc627b + ee6bcfa

25 files changed: 557 additions & 94 deletions

Makefile

Lines changed: 2 additions & 1 deletion

```diff
@@ -14,4 +14,5 @@ lint:
 
 .PHONY: force-kill
 force-kill:
-	ps | grep steady_queue | cut -f 1 -d ' ' | xargs kill -9 && rm -f tmp/pids/steady_queue_supervisor.pid
+	ps | grep steady_queue | cut -f 1 -d ' ' | xargs kill -9
+	rm -f tmp/pids/steady_queue_supervisor.pid
```

README.md

Lines changed: 138 additions & 10 deletions
```diff
@@ -129,15 +129,15 @@ Steady Queue will try to find our configuration under the `STEADY_QUEUE` variabl
 from steady_queue.configuration import Configuration
 from datetime import timedelta
 
-STEADY_QUEUE = Configuration.ConfigurationOptions(
+STEADY_QUEUE = Configuration.Options(
     dispatchers=[
-        Configuration.DispatcherConfiguration(
+        Configuration.Dispatcher(
             polling_interval=timedelta(seconds=1),
             batch_size=500
         )
     ],
     workers=[
-        Configuration.WorkerConfiguration(
+        Configuration.Worker(
             queues=["*"],
             threads=3,
             polling_interval=timedelta(seconds=0.1)
```
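Read as a whole, the renamed settings example from this hunk would look like the sketch below (assembled from the diff above; the values are the documentation's defaults, adjust them to your deployment):

```python
# settings.py -- illustrative sketch using the renamed configuration classes.
from datetime import timedelta

from steady_queue.configuration import Configuration

STEADY_QUEUE = Configuration.Options(
    dispatchers=[
        Configuration.Dispatcher(
            polling_interval=timedelta(seconds=1),
            batch_size=500,
        )
    ],
    workers=[
        Configuration.Worker(
            queues=["*"],
            threads=3,
            polling_interval=timedelta(seconds=0.1),
        )
    ],
)
```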
```diff
@@ -155,19 +155,19 @@ Here's an overview of the different options:
 - `batch_size`: the dispatcher will dispatch tasks in batches of this size. The default is 500.
 
 - `concurrency_maintenance_interval`: the time interval in seconds that the
-  dispatcher will wait before checking for blocked jobs that can be unblocked.
+  dispatcher will wait before checking for blocked tasks that can be unblocked.
   Read more about [concurrency controls](#concurrency-controls) to learn more
   about this setting. It defaults to `600` seconds.
 
 - `queues`: the list of queues that workers will pick tasks from. You can use
   `*` to indicate all queues (which is also the default and the behavior you'll
   get if you omit this). Tasks will be polled from those queues in order, so for
-  example, with `['real_time', 'background']`, no jobs will be taken from
-  `background` unless there aren't any more jobs waiting in `real_time`.
+  example, with `['real_time', 'background']`, no tasks will be taken from
+  `background` unless there aren't any more tasks waiting in `real_time`.
 
   You can also provide a prefix with a wildcard to match queues starting with a
   prefix. For example, adding `staging*` to the queues list will create a worker
-  fetching jobs from all queues starting with `staging`. The wildcard `*` is
+  fetching tasks from all queues starting with `staging`. The wildcard `*` is
   only allowed on its own or at the end of a queue name; you can't specify
   queue names such as `*_some_queue`. These will be ignored.
```
@@ -388,9 +388,137 @@ The `## Concurrency controls` section, previously a `TODO`, is now filled in:

## Concurrency controls

Steady Queue extends Django Tasks with concurrency controls, which allow you to limit how many tasks of a certain type or with certain arguments can run at the same time. When limited in this way, tasks will be blocked from running, and they'll stay blocked until another task finishes and unblocks them, or until the set expiry time (the concurrency limit's _duration_) elapses. Tasks are never discarded or lost, just blocked.

```python
from django_tasks import task

from steady_queue.concurrency import limits_concurrency

@limits_concurrency(
    key=lambda arg1, arg2, **kwargs: ...,
    to=max_concurrent_executions,
    duration=max_timedelta_to_guarantee_concurrency_limit,
    group=concurrency_group
)
@task()
def my_task(arg1, arg2, **kwargs):
    pass
```
- `key` is the only required parameter. It can be a string or a callable that
  receives the same arguments as the task and returns a string. It is used to
  identify the tasks that need to be limited together.
- `to` is `1` by default.
- `duration` is set to `steady_queue.default_concurrency_control_period` by
  default, which itself defaults to 3 minutes.
- `group` is used to control the concurrency of different task types together.
  It defaults to the task's module path.

When a task includes these controls, we'll ensure that, at most, the number of tasks (indicated by `to`) that yield the same `key` will be performed concurrently, and this guarantee will last for `duration` for each task enqueued. Note that there is no guarantee about _the order of execution_, only about tasks being performed at the same time (overlapping).
The concurrency limits use the concept of semaphores when enqueueing, and work as follows: when a task is enqueued, we check whether it specifies concurrency controls. If it does, we check the semaphore for the computed concurrency key. If the semaphore is open, we claim it and set the task as _ready_. Ready means it can be picked up by workers for execution. When the task finishes execution (be it successfully or unsuccessfully, resulting in a failed execution), we signal the semaphore and try to unblock the next task with the same key, if any. Unblocking the next task doesn't mean running it right away, but moving it from _blocked_ to _ready_. Since something can happen that prevents the first task from releasing the semaphore and unblocking the next task (for example, someone pulling the plug on the machine where the worker is running), we have `duration` as a failsafe. Tasks that have been blocked for more than `duration` are candidates to be released, but only as many of them as the concurrency rules allow, as each one would need to go through the semaphore check. This means that `duration` is not really about the task that's enqueued or being run; it's about the tasks that are blocked waiting. It's important to note that after one or more candidate tasks are unblocked (either because a task finishes or because `duration` expires and a semaphore is released), the `duration` timer for the still-blocked tasks is reset. This happens indirectly via the expiration time of the semaphore, which is updated.
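The semaphore dance described above can be sketched as a toy in-memory model. Steady Queue's real semaphores live in the database; the class and method names here are illustrative only:

```python
from datetime import datetime, timedelta

class Semaphore:
    """Toy model of a per-concurrency-key semaphore: `value` counts open
    slots, and a refreshed `expires_at` plays the role of the `duration`
    failsafe described above."""

    def __init__(self, limit: int, duration: timedelta):
        self.limit = limit          # the decorator's `to`
        self.duration = duration    # the decorator's `duration`
        self.value = limit
        self.expires_at: datetime | None = None

    def claim(self, now: datetime) -> bool:
        # Checked at enqueue time: a successful claim makes the task _ready_;
        # a failed claim leaves it _blocked_.
        if self.value > 0:
            self.value -= 1
            self.expires_at = now + self.duration
            return True
        return False

    def signal(self, now: datetime) -> None:
        # Called when a task finishes (successfully or not): open one slot so
        # the next blocked task with the same key can move to _ready_, and
        # reset the expiry timer for the tasks still waiting.
        self.value = min(self.limit, self.value + 1)
        self.expires_at = now + self.duration

    def expired(self, now: datetime) -> bool:
        # What the dispatcher's concurrency maintenance checks: an expired
        # semaphore makes its blocked tasks candidates for release.
        return self.expires_at is not None and now >= self.expires_at
```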
For example:

```python
@limits_concurrency(
    to=2,
    key=lambda contact: contact.account_id,
    duration=timedelta(minutes=5)
)
@task()
def deliver_announcement(contact):
    pass
```
In this case, we'll ensure that at most two tasks of the kind `deliver_announcement` for the same account will run concurrently. If, for any reason, one of those tasks takes longer than 5 minutes, or doesn't release its concurrency lock (signal the semaphore) within 5 minutes of acquiring it, a new task with the same key might gain the lock.
Let's see another example using `group`:

```python
@limits_concurrency(
    key=lambda contact: contact.pk,
    duration=timedelta(minutes=15),
    group='contact_tasks'
)
@task()
def contact_action(contact):
    pass
```

```python
@limits_concurrency(
    key=lambda bundle: bundle.contact_id,
    duration=timedelta(minutes=15),
    group='contact_tasks'
)
@task()
def bundle_action(bundle):
    pass
```
In this case, if we have a `contact_action` task enqueued for a contact record with id `123` and another `bundle_action` task enqueued simultaneously for a bundle record that references contact `123`, only one of them will be allowed to proceed. The other one will stay blocked until the first one finishes (or 15 minutes pass, whichever happens first).
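Because both decorators share `group='contact_tasks'`, their `key` callables resolve to the same value for related records, so the two task types contend for one semaphore. A small sketch of that idea (the exact key format is internal to Steady Queue; this composite string is purely illustrative):

```python
def concurrency_key(group: str, key_value) -> str:
    # Illustrative composite: the group, not the task name, scopes the key,
    # which is what lets different task types share one concurrency limit.
    return f"{group}/{key_value}"

# contact_action's key for a contact with pk=123 ...
contact_key = concurrency_key("contact_tasks", 123)
# ... and bundle_action's key for a bundle with contact_id=123:
bundle_key = concurrency_key("contact_tasks", 123)

assert contact_key == bundle_key  # one semaphore governs both tasks
```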
Note that the `duration` setting depends indirectly on the value of `concurrency_maintenance_interval` that you set for your dispatcher(s), as that is the frequency with which blocked tasks are checked and unblocked (at which point, at most one task per concurrency key is unblocked). In general, you should set `duration` so that all your tasks finish well under it, and think of the concurrency maintenance task as a failsafe in case something goes wrong.
Tasks are unblocked in order of priority, but queue order is not taken into account when unblocking them. That means that if you have a group of tasks that share a concurrency group but sit in different queues, or tasks of the same class that you enqueue in different queues, the queue order you set for a worker is not taken into account when unblocking blocked ones. The reason is that a task that runs unblocks the next one, and the task itself doesn't know about a particular worker's queue order (you could even have different workers with different queue orders); it can only know about priority. Once blocked tasks are unblocked and available for polling, they'll be picked up by a worker following its queue order.
Finally, failed tasks that are automatically or manually retried work the same way as newly enqueued tasks: they join the queue for an open semaphore, and whenever they get one, they'll be run. It doesn't matter whether they had already gotten an open semaphore in the past.
```diff
-## Failed jobs and retries
+## Failed tasks and retries
 
 TODO
```
```diff
@@ -492,7 +620,7 @@
 It is possible to run multiple schedulers, for example, if you have multiple
 servers for redundancy and you run the `scheduler` in more than one of them. To
 avoid enqueueing duplicate tasks at the same time, an entry in the
 `steady_queue_recurringexecution` table is added in the same transaction as the
-job is enqueued. This table has a unique index on `task_key` and `run_at`,
+task is enqueued. This table has a unique index on `task_key` and `run_at`,
 ensuring only one entry per task per time will be created. This only works if
 you have `preserve_finished_tasks` set to `True` (the default), and the
 guarantee applies as long as you keep tasks around.
```

TODOs.md

Lines changed: 4 additions & 2 deletions
```diff
@@ -30,7 +30,10 @@
 - [x] Remove demo app in favor of test dummy
 - [x] Configure package builds
 - [x] Review logging noisiness
-- [ ] Concurrency controls
+- [x] Cleanup configuration
+- [x] Timer tasks that run immediately
+- [x] Concurrency controls
+- [ ] Support for multiple databases
 - [ ] Lifecycle hooks on processes
 - [ ] Tests
 - [ ] Class-based tasks
@@ -40,5 +43,4 @@
 - [ ] Signals on tasks: pre/post enqueue, pre/post perform
 - [ ] Contributing
 - [ ] Readme
-- [ ] Support for multiple databases
 - [ ] Publish to PyPI
```

steady_queue/__init__.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -1,7 +1,7 @@
 from datetime import timedelta
 from typing import Optional
 
-VERSION = (0, 1, "0b7")
+VERSION = (0, 1, "0b8")
 
 __version__ = ".".join(map(str, VERSION))
```

steady_queue/backend.py

Lines changed: 3 additions & 2 deletions
```diff
@@ -2,7 +2,6 @@
 from django_tasks.backends.base import BaseTaskBackend
 from django_tasks.task import P, T
 
-from steady_queue.models import Job
 from steady_queue.task import SteadyQueueTask
 
 
@@ -22,6 +21,8 @@ def validate_task(self, task: Task) -> None:
     def enqueue(
         self, task: Task[P, T], args: P.args, kwargs: P.kwargs
     ) -> TaskResult[T]:
+        from steady_queue.models import Job
+
         if not isinstance(task, SteadyQueueTask):
             raise ValueError("Steady Queue only supports SteadyQueueTasks")
 
@@ -35,7 +36,7 @@ def get_result(self, result_id: str) -> TaskResult:
             "This backend does not support retrieving or refreshing results."
         )
 
-    def _to_task_result(self, task: SteadyQueueTask, job: Job) -> TaskResult:
+    def _to_task_result(self, task: SteadyQueueTask, job) -> TaskResult:
         return TaskResult(
             task=task,
             id=str(job.id),
```

steady_queue/concurrency.py

Lines changed: 24 additions & 0 deletions
```diff
@@ -0,0 +1,24 @@
+from datetime import timedelta
+from typing import Optional
+
+import steady_queue
+from steady_queue.task import SteadyQueueTask
+
+
+def limits_concurrency(
+    key: str,
+    to: int = 1,
+    duration: Optional[timedelta] = None,
+    group: Optional[str] = None,
+):
+    def wrapper(task: SteadyQueueTask):
+        task.concurrency_key = key
+        task.concurrency_limit = to
+        task.concurrency_duration = (
+            duration or steady_queue.default_concurrency_control_period
+        )
+        task.concurrency_group = group or task.module_path
+
+        return task
+
+    return wrapper
```
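The decorator itself is small: it simply records the concurrency settings as attributes on the task for the enqueue path to read later. A standalone re-creation for illustration (the real default period lives in the `steady_queue` package, and the real fallback group is the task's `module_path`; the stand-ins here are noted in comments):

```python
from datetime import timedelta

# Stand-in for steady_queue.default_concurrency_control_period (3 minutes).
DEFAULT_CONCURRENCY_CONTROL_PERIOD = timedelta(minutes=3)

def limits_concurrency(key, to=1, duration=None, group=None):
    """Simplified sketch: annotate the decorated task with its settings."""
    def wrapper(task):
        task.concurrency_key = key
        task.concurrency_limit = to
        task.concurrency_duration = duration or DEFAULT_CONCURRENCY_CONTROL_PERIOD
        # The real decorator falls back to the task's module path; a plain
        # function's __module__ serves as a stand-in here.
        task.concurrency_group = group or task.__module__
        return task
    return wrapper

@limits_concurrency(key=lambda contact: str(contact.account_id), to=2)
def deliver_announcement(contact):
    pass
```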

steady_queue/configuration.py

Lines changed: 20 additions & 21 deletions
```diff
@@ -7,21 +7,21 @@
 
 class Configuration:
     @dataclass
-    class WorkerConfiguration:
+    class Worker:
         queues: list[str] = field(default_factory=lambda: ["*"])
         threads: int = 3
         processes: int = 1
         polling_interval: timedelta = timedelta(seconds=1)
 
     @dataclass
-    class DispatcherConfiguration:
+    class Dispatcher:
         polling_interval: timedelta = timedelta(seconds=0.1)
         batch_size: int = 500
         concurrency_maintenance: bool = True
         concurrency_maintenance_interval: timedelta = timedelta(minutes=5)
 
     @dataclass
-    class RecurringTaskConfiguration:
+    class RecurringTask:
         key: str
         class_name: Optional[str] = None
         command: Optional[str] = None
@@ -32,36 +32,35 @@ class RecurringTaskConfiguration:
         description: Optional[str] = None
 
         @classmethod
-        def discover(cls) -> list["Configuration.RecurringTaskConfiguration"]:
+        def discover(cls) -> list["Configuration.RecurringTask"]:
             from steady_queue.recurring_task import configurations
 
             return configurations
 
     @dataclass
-    class ConfigurationOptions:
-        workers: list["Configuration.WorkerConfiguration"]
-        dispatchers: list["Configuration.DispatcherConfiguration"]
-        recurring_tasks: list["Configuration.RecurringTaskConfiguration"]
+    class Options:
+        workers: list["Configuration.Worker"]
+        dispatchers: list["Configuration.Dispatcher"]
+        recurring_tasks: list["Configuration.RecurringTask"]
         only_work: bool = False
         skip_recurring: bool = False
 
         def __init__(
             self,
-            workers: list["Configuration.WorkerConfiguration"] | None = None,
-            dispatchers: list["Configuration.DispatcherConfiguration"] | None = None,
-            recurring_tasks: list["Configuration.RecurringTaskConfiguration"]
-            | None = None,
+            workers: list["Configuration.Worker"] | None = None,
+            dispatchers: list["Configuration.Dispatcher"] | None = None,
+            recurring_tasks: list["Configuration.RecurringTask"] | None = None,
             only_work: bool = False,
             skip_recurring: bool = False,
         ):
             if workers is None:
-                workers = [Configuration.WorkerConfiguration()]
+                workers = [Configuration.Worker()]
 
             if dispatchers is None:
-                dispatchers = [Configuration.DispatcherConfiguration()]
+                dispatchers = [Configuration.Dispatcher()]
 
             if recurring_tasks is None:
-                recurring_tasks = Configuration.RecurringTaskConfiguration.discover()
+                recurring_tasks = Configuration.RecurringTask.discover()
 
             self.workers = workers
             self.dispatchers = dispatchers
@@ -90,20 +89,20 @@ def instantiate(self) -> Base:
 
             raise ValueError(f"Invalid process kind: {self.kind}")
 
-    def __init__(self, options: Optional[ConfigurationOptions] = None):
+    def __init__(self, options: Optional[Options] = None):
         if options is None:
-            options = self.ConfigurationOptions()
+            options = self.Options()
         self.options = options
 
     @property
-    def configured_processes(self):
+    def configured_processes(self) -> list["Configuration.Process"]:
         if self.options.only_work:
             return self.workers
 
         return self.workers + self.dispatchers + self.schedulers
 
     @property
-    def workers(self):
+    def workers(self) -> list["Configuration.Process"]:
         workers = []
         for worker_config in self.options.workers:
             workers += [
@@ -113,14 +112,14 @@ def workers(self):
         return workers
 
     @property
-    def dispatchers(self):
+    def dispatchers(self) -> list["Configuration.Process"]:
         return [
             self.Process(kind="dispatcher", attributes=dispatcher_config)
             for dispatcher_config in self.options.dispatchers
         ]
 
     @property
-    def schedulers(self):
+    def schedulers(self) -> list["Configuration.Process"]:
         return [
             self.Process(
                 kind="scheduler",
```
