Skip to content

Commit 89993f9

Browse files
authored
Support for bind=True kwarg (#53)
* Support for bind=True kwarg * Add integration tests for bind=True * Add docs
1 parent 0d7535a commit 89993f9

File tree

9 files changed

+208
-10
lines changed

9 files changed

+208
-10
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ You will use dispatcher to trigger a background task over pg_notify.
2020
Both your *background dispatcher service* and your *task publisher* process must have
2121
python configured so that your task is importable.
2222

23+
For more options, see `docs/usage.md`.
24+
2325
#### Library
2426

2527
The dispatcher `@task()` decorator is used to register tasks.

dispatcher/publish.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,24 @@
1010

1111
class DispatcherDecorator:
1212
def __init__(
13-
self, registry: DispatcherMethodRegistry, *, queue: Optional[str] = None, on_duplicate: Optional[str] = None, timeout: Optional[float] = None
13+
self,
14+
registry: DispatcherMethodRegistry,
15+
*,
16+
bind: bool = False,
17+
queue: Optional[str] = None,
18+
on_duplicate: Optional[str] = None,
19+
timeout: Optional[float] = None,
1420
) -> None:
1521
self.registry = registry
22+
self.bind = bind
1623
self.queue = queue
1724
self.on_duplicate = on_duplicate
1825
self.timeout = timeout
1926

2027
def __call__(self, fn: DispatcherCallable, /) -> DispatcherCallable:
2128
"Concrete task decorator, registers method and glues on some methods from the registry"
2229

23-
dmethod = self.registry.register(fn, queue=self.queue, on_duplicate=self.on_duplicate, timeout=self.timeout)
30+
dmethod = self.registry.register(fn, bind=self.bind, queue=self.queue, on_duplicate=self.on_duplicate, timeout=self.timeout)
2431

2532
setattr(fn, 'apply_async', dmethod.apply_async)
2633
setattr(fn, 'delay', dmethod.delay)
@@ -30,6 +37,7 @@ def __call__(self, fn: DispatcherCallable, /) -> DispatcherCallable:
3037

3138
def task(
3239
*,
40+
bind: bool = False,
3341
queue: Optional[str] = None,
3442
on_duplicate: Optional[str] = None,
3543
timeout: Optional[float] = None,
@@ -72,4 +80,4 @@ def announce():
7280
# The on_duplicate kwarg controls behavior when multiple instances of the task running
7381
# options are documented in dispatcher.utils.DuplicateBehavior
7482
"""
75-
return DispatcherDecorator(registry, queue=queue, on_duplicate=on_duplicate, timeout=timeout)
83+
return DispatcherDecorator(registry, bind=bind, queue=queue, on_duplicate=on_duplicate, timeout=timeout)

dispatcher/registry.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ def publication_defaults(self) -> dict:
5454
def delay(self, *args, **kwargs) -> Tuple[dict, str]:
5555
return self.apply_async(args, kwargs)
5656

57-
def get_async_body(self, args=None, kwargs=None, uuid=None, on_duplicate: Optional[str] = None, timeout: Optional[float] = 0.0, delay: float = 0.0) -> dict:
57+
def get_async_body(
58+
self, args=None, kwargs=None, uuid=None, bind: bool = False, on_duplicate: Optional[str] = None, timeout: Optional[float] = 0.0, delay: float = 0.0
59+
) -> dict:
5860
"""
5961
Get the python dict to become JSON data in the pg_notify message
6062
This same message gets passed over the dispatcher IPC queue to workers
@@ -66,6 +68,8 @@ def get_async_body(self, args=None, kwargs=None, uuid=None, on_duplicate: Option
6668

6769
# TODO: callback to add other things, guid in case of AWX
6870

71+
if bind:
72+
body['bind'] = bind
6973
if on_duplicate:
7074
body['on_duplicate'] = on_duplicate
7175
if delay:

dispatcher/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def resolve_callable(task: str) -> Optional[Callable]:
2525
That is out of scope of this method now.
2626
This is mainly used by the worker.
2727
"""
28-
if task.startswith('lambda:'):
28+
if task.startswith('lambda'):
2929
return eval(task)
3030

3131
if MODULE_METHOD_DELIMITER not in task:

dispatcher/worker/task.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,18 @@ def exit_gracefully(self, *args, **kwargs):
3535
self.kill_now = True
3636

3737

38+
class DispatcherBoundMethods:
39+
"""
40+
If you use the task decorator with the bind=True argument,
41+
an object of this type will be passed in.
42+
This contains public methods for users of the dispatcher to call.
43+
"""
44+
45+
def __init__(self, worker_id, message):
46+
self.worker_id = worker_id
47+
self.uuid = message.get('uuid', '<unknown>')
48+
49+
3850
class TaskWorker:
3951
"""
4052
A worker implementation that deserializes task messages and runs native
@@ -50,8 +62,8 @@ class TaskWorker:
5062
Previously this initialized pre-fork, making init logic unusable.
5163
"""
5264

53-
def __init__(self, worker_id):
54-
self.worker_id = worker_id
65+
def __init__(self, worker_id: int):
66+
self.worker_id: int = worker_id
5567
self.ppid = os.getppid()
5668
self.pid = os.getpid()
5769
self.signal_handler = WorkerSignalHandler(worker_id)
@@ -68,18 +80,27 @@ def should_exit(self) -> bool:
6880
def get_uuid(self, message):
6981
return message.get('uuid', '<unknown>')
7082

83+
def produce_binder(self, message: dict) -> DispatcherBoundMethods:
84+
"""
85+
Return the object with public callbacks to pass to the task
86+
"""
87+
return DispatcherBoundMethods(self.worker_id, message)
88+
7189
def run_callable(self, message):
7290
"""
73-
Given some AMQP message, import the correct Python code and run it.
91+
Import the Python code and run it.
7492
"""
7593
task = message['task']
76-
args = message.get('args', [])
94+
args = message.get('args', []).copy()
7795
kwargs = message.get('kwargs', {})
7896
_call = registry.get_method(task).get_callable()
7997

8098
# don't print kwargs, they often contain launch-time secrets
8199
logger.debug(f'task (uuid={self.get_uuid(message)}) starting {task}(*{args}) on worker {self.worker_id}')
82100

101+
if message.get('bind') is True:
102+
args = [self.produce_binder(message)] + args
103+
83104
try:
84105
return _call(*args, **kwargs)
85106
except DispatcherCancel:

docs/task_options.md

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
## Task Options
2+
3+
You can specify additional details about the behavior of a task
4+
by applying additional options.
5+
6+
There are normally 2 ways to do this:
7+
- pass to the `@task` decorator
8+
- pass to the `.apply_async` method for a one-off use
9+
10+
The structure of use of those options is illustrated below,
11+
with `options` being keyword-based additional options.
12+
13+
```python
14+
from dispatcher.publish import task
15+
16+
@task(queue='test_channel', **options)
17+
def print_hello():
18+
print('hello world!!')
19+
```
20+
21+
For example, to set a timeout of 1 second on the task:
22+
23+
```python
24+
from dispatcher.publish import task
25+
26+
@task(queue='test_channel', timeout=1)
27+
def print_hello():
28+
print('hello world!!')
29+
```
30+
31+
For the one-off use, `.apply_async` can take options,
32+
but `.delay` cannot, because of the argument structure.
33+
Using `.delay` inherently runs the task with the task default options.
34+
35+
```python
36+
from test_methods import print_hello
37+
38+
print_hello.apply_async(args=[], kwargs={}, **options)
39+
```
40+
41+
For the timeout seconds example:
42+
43+
```python
44+
from test_methods import print_hello
45+
46+
print_hello.apply_async(args=[], kwargs={}, timeout=2)
47+
```
48+
49+
The `apply_async` options will take precedence over the
50+
task default options (those passed into the decorator).
51+
52+
### Task Options Manifest
53+
54+
This section documents specific options.
55+
These follow a "standard" pattern, meaning that they
56+
can be used in both of the ways described above.
57+
58+
#### Bind
59+
60+
If `bind=True` is passed (default is `False`), then
61+
an additional argument is inserted at the start of the
62+
argument list to the method. Like:
63+
64+
```python
65+
@task(bind=True)
66+
def hello_world(dispatcher, *args, **kwargs):
67+
print(f'I see the dispatcher object {dispatcher}')
68+
```
69+
70+
The `dispatcher` object contains public methods
71+
which allow interaction with the parent process.
72+
Available methods will expand in the future,
73+
right now it offers these attributes:
74+
75+
- `uuid` - the internal id of this task call in dispatcher
76+
- `worker_id` - the id of the worker running this task
77+
78+
#### Queue
79+
80+
The queue or channel this task is submitted to.
81+
For instance, the pg_notify channel.
82+
This can be a callable to get this dynamically.
83+
84+
#### on_duplicate
85+
86+
This option helps to manage capacity, controlling
87+
- task "shedding"
88+
- task queuing
89+
90+
Depending on the value, a task submission will be ignored
91+
if certain conditions are met, "shedding", or queued if all
92+
workers are busy.
93+
94+
- parallel - multiple tasks (running the given `@task` method) are allowed at the same time. Tasks queue if no free workers are available.
95+
- discard - if a task is already running or is queued, any new submissions of this task are ignored.
96+
- serial - only 1 task (running the given method) will be run at a single time in the local dispatcher service. Additional submissions are queued, so all submissions will be run eventually.
97+
- queue_one - for idempotent tasks, only 1 task (running the given method) will be run at a single time, and an additional submission is queued. However, only 1 task will be held in the queue, and additional submissions are discarded. This assures _timely_ running of an idempotent task.
98+
99+
### Unusual Options
100+
101+
These do not follow the standard pattern for some reason.
102+
Usually for testing.
103+
104+
#### registry
105+
106+
The dispatcher uses a global task registry.
107+
To enable isolated testing `@task` can take a custom
108+
(meaning non-global) registry.
109+
110+
There is no real multi-registry feature,
111+
and additional custom code hooks would be needed to make this work.

tests/integration/test_bind.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import asyncio
2+
import json
3+
import logging
4+
5+
import pytest
6+
7+
from tests.data import methods as test_methods
8+
9+
ASSERT_UUID = 'lambda dispatcher: dispatcher.uuid'
10+
11+
12+
@pytest.mark.asyncio
13+
async def test_bind_uuid_matches(apg_dispatcher, pg_message, caplog):
14+
assert apg_dispatcher.pool.finished_count == 0
15+
16+
clearing_task = asyncio.create_task(apg_dispatcher.pool.events.work_cleared.wait())
17+
with caplog.at_level(logging.DEBUG):
18+
await pg_message(json.dumps({
19+
'task': ASSERT_UUID,
20+
'uuid': 'hello-world-12345543221',
21+
'bind': True
22+
}))
23+
await asyncio.wait_for(clearing_task, timeout=3)
24+
25+
assert 'result: hello-world-12345543221' in caplog.text
26+
27+
assert apg_dispatcher.pool.finished_count == 1
28+
29+
@pytest.mark.asyncio
30+
async def test_bind_not_set(apg_dispatcher, pg_message, caplog):
31+
assert apg_dispatcher.pool.finished_count == 0
32+
33+
clearing_task = asyncio.create_task(apg_dispatcher.pool.events.work_cleared.wait())
34+
with caplog.at_level(logging.DEBUG):
35+
await pg_message(json.dumps({
36+
'task': ASSERT_UUID,
37+
'uuid': 'hello-world-12345543221'
38+
}))
39+
await asyncio.wait_for(clearing_task, timeout=3)
40+
41+
assert 'result: hello-world-12345543221' not in caplog.text
42+
43+
assert apg_dispatcher.pool.finished_count == 1

tools/test_methods.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ def sleep_serial(seconds=1):
1818
time.sleep(seconds)
1919

2020

21+
@task(queue='test_channel', bind=True)
22+
def hello_world_binder(binder):
23+
print(f'Values in binder {vars(binder)}')
24+
print(f'Hello world, from worker {binder.worker_id} running task {binder.uuid}')
25+
26+
2127
@task(queue='test_channel')
2228
def print_hello():
2329
print('hello world!!')

tools/write_messages.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
sys.path.append(tools_dir)
1717

18-
from test_methods import sleep_function, sleep_discard, task_has_timeout
18+
from test_methods import sleep_function, sleep_discard, task_has_timeout, hello_world_binder
1919

2020
# Database connection details
2121
CONNECTION_STRING = "dbname=dispatch_db user=dispatch password=dispatching host=localhost port=55777"
@@ -111,6 +111,9 @@ def main():
111111
print('demo of task_has_timeout that times out due to decorator use')
112112
task_has_timeout.apply_async(config={'conninfo': CONNECTION_STRING})
113113

114+
print('demo of using bind=True, with hello_world_binder')
115+
hello_world_binder.apply_async(config={'conninfo': CONNECTION_STRING})
116+
114117
if __name__ == "__main__":
115118
logging.basicConfig(level='ERROR', stream=sys.stdout)
116119
main()

0 commit comments

Comments
 (0)