Skip to content

Commit 9ece818

Browse files
authored
Producer for running tasks on start (#90)
* Basics of an on-start producer * Add test for on_start producer * Update schema * Update docs with new producer
1 parent 570ff03 commit 9ece818

File tree

7 files changed

+84
-5
lines changed

7 files changed

+84
-5
lines changed

dispatcher.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,8 @@ producers:
2121
schedule: 3
2222
'lambda: __import__("time").sleep(2)':
2323
schedule: 3
24+
OnStartProducer:
25+
task_list:
26+
'lambda: print("This task runs on startup")': {}
2427
publish:
2528
default_broker: pg_notify

dispatcher/producers/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from .base import BaseProducer
22
from .brokered import BrokeredProducer
3+
from .on_start import OnStartProducer
34
from .scheduled import ScheduledProducer
45

5-
__all__ = ['BaseProducer', 'BrokeredProducer', 'ScheduledProducer']
6+
__all__ = ['BaseProducer', 'BrokeredProducer', 'ScheduledProducer', 'OnStartProducer']

dispatcher/producers/on_start.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import asyncio
2+
import logging
3+
from typing import Union
4+
5+
from .base import BaseProducer
6+
7+
logger = logging.getLogger(__name__)
8+
9+
10+
class OnStartProducer(BaseProducer):
    """Producer that submits each configured task exactly once at service startup.

    Unlike ScheduledProducer, this creates no background asyncio tasks; every
    message is dispatched inline from start_producing and then the producer
    is idle for the rest of the service lifetime.
    """

    def __init__(self, task_list: dict[str, dict[str, Union[int, str]]]):
        """
        :param task_list: mapping of task name (e.g. an importable path or a
            lambda string) to that task's submission options; the options dict
            becomes the base of the produced message.
        """
        self.task_list = task_list
        super().__init__()

    async def start_producing(self, dispatcher) -> None:
        """Mark this producer ready, then submit every task in task_list once.

        :param dispatcher: object whose async process_message(message) is
            awaited for each produced task.
        """
        # Signal readiness first so anything waiting on ready_event is not
        # blocked behind message processing.
        self.events.ready_event.set()

        for task_name, options in self.task_list.items():
            # Copy so the configured per-task options are never mutated.
            message = options.copy()
            message['task'] = task_name
            message['uuid'] = f'on-start-{self.produced_count}'

            # Lazy %-style args: the string is only formatted if DEBUG is enabled.
            logger.debug('Produced on-start task: %s', task_name)
            self.produced_count += 1
            await dispatcher.process_message(message)

    def all_tasks(self) -> list[asyncio.Task]:
        """Return the background tasks owned by this producer — always none."""
        return []

    async def shutdown(self) -> None:
        """No resources or background tasks to clean up."""
        pass

dispatcher/producers/scheduled.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
import asyncio
22
import logging
3+
from typing import Union
34

45
from .base import BaseProducer
56

67
logger = logging.getLogger(__name__)
78

89

910
class ScheduledProducer(BaseProducer):
10-
def __init__(self, task_schedule: dict[str, dict[str, int]]):
11+
def __init__(self, task_schedule: dict[str, dict[str, Union[int, str]]]):
1112
self.task_schedule = task_schedule
1213
self.scheduled_tasks: list[asyncio.Task] = []
1314
super().__init__()

docs/config.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,9 @@ For every listed broker, a `BrokeredProducer` is automatically
116116
created. That means that tasks may be produced from the messaging
117117
system that the dispatcher service is listening to.
118118

119-
The other current use case is `ScheduledProducer`,
120-
which submits tasks every certain number of seconds.
119+
The others are:
120+
- `ScheduledProducer` - submits tasks every certain number of seconds
121+
- `OnStartProducer` - runs tasks once after starting
121122

122123
#### Publish
123124

schema.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@
1111
},
1212
"producers": {
1313
"ScheduledProducer": {
14-
"task_schedule": "dict[str, dict[str, int]]"
14+
"task_schedule": "dict[str, dict[str, typing.Union[int, str]]]"
15+
},
16+
"OnStartProducer": {
17+
"task_list": "dict[str, dict[str, typing.Union[int, str]]]"
1518
}
1619
},
1720
"service": {
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import logging
2+
import asyncio
3+
4+
import pytest
5+
6+
from dispatcher.factories import from_settings
7+
from dispatcher.config import DispatcherSettings
8+
9+
10+
@pytest.mark.asyncio
async def test_on_start_tasks(caplog):
    """A task configured under OnStartProducer runs exactly once at startup."""
    # Sentinel so the finally block is safe even if settings construction or
    # from_settings() raises before `dispatcher` is bound (otherwise the
    # cleanup would raise NameError and mask the original failure).
    dispatcher = None
    try:
        settings = DispatcherSettings({
            'version': 2,
            'service': {
                'pool_kwargs': {'max_workers': 2}
            },
            'brokers': {},  # do not need them for this test
            'producers': {
                'OnStartProducer': {
                    # NOTE(review): 'lambda: return ...' is not valid lambda
                    # syntax if the worker evals it — confirm the runtime
                    # accepts this, or whether
                    # 'lambda: "confirmation_of_run"' was intended.
                    'task_list': {'lambda: return "confirmation_of_run"': {}}
                }
            }
        })
        dispatcher = from_settings(settings=settings)
        assert dispatcher.pool.finished_count == 0

        await dispatcher.connect_signals()
        with caplog.at_level(logging.DEBUG):
            await dispatcher.start_working()
            await dispatcher.wait_for_producers_ready()
            await asyncio.wait_for(dispatcher.pool.events.work_cleared.wait(), timeout=2)
            await asyncio.sleep(0.02)  # still may be some time between clearing event and desired log

        assert dispatcher.pool.finished_count == 1
        # NOTE(review): asserting the result is ABSENT from the logs looks
        # inverted for a "confirmation of run" — verify 'not in' is intentional.
        assert 'result: confirmation_of_run' not in caplog.text
    finally:
        if dispatcher is not None:
            await dispatcher.shutdown()
            await dispatcher.cancel_tasks()

0 commit comments

Comments
 (0)