Skip to content

Commit a1ef1b0

Browse files
authored
Added events, state and event handlers. (#51)
* Added events, state and event handlers. Signed-off-by: Pavel Kirilin <[email protected]>
1 parent bcc9d9e commit a1ef1b0

23 files changed

+344
-26
lines changed

docs/examples/introduction/aio_pika_broker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ async def main() -> None:
2222
print(f"Returned value: {result.return_value}")
2323
else:
2424
print("Error found while executing task.")
25+
await broker.shutdown()
2526

2627

2728
if __name__ == "__main__":

docs/examples/introduction/full_example.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ async def main() -> None:
2626
print(f"Returned value: {result.return_value}")
2727
else:
2828
print("Error found while executing task.")
29+
await broker.shutdown()
2930

3031

3132
if __name__ == "__main__":

docs/examples/introduction/inmemory_run.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ async def add_one(value: int) -> int:
1212

1313

1414
async def main() -> None:
15+
await broker.startup()
1516
# Send the task to the broker.
1617
task = await add_one.kiq(1)
1718
# Wait for the result.
@@ -21,6 +22,7 @@ async def main() -> None:
2122
print(f"Returned value: {result.return_value}")
2223
else:
2324
print("Error found while executing task.")
25+
await broker.shutdown()
2426

2527

2628
if __name__ == "__main__":

docs/examples/state/events_example.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import asyncio
2+
from typing import Optional
3+
4+
from redis.asyncio import ConnectionPool, Redis # type: ignore
5+
from taskiq_aio_pika import AioPikaBroker
6+
from taskiq_redis import RedisAsyncResultBackend
7+
8+
from taskiq import Context, TaskiqEvents, TaskiqState
9+
from taskiq.context import default_context
10+
11+
# To run this example, please install:
12+
# * taskiq
13+
# * taskiq-redis
14+
# * taskiq-aio-pika
15+
16+
broker = AioPikaBroker(
17+
"amqp://localhost",
18+
result_backend=RedisAsyncResultBackend(
19+
"redis://localhost/0",
20+
),
21+
)
22+
23+
24+
@broker.on_event(TaskiqEvents.WORKER_STARTUP)
25+
async def startup(state: TaskiqState) -> None:
26+
# Here we store connection pool on startup for later use.
27+
state.redis = ConnectionPool.from_url("redis://localhost/1")
28+
29+
30+
@broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)
31+
async def shutdown(state: TaskiqState) -> None:
32+
# Here we close our pool on shutdown event.
33+
await state.redis.disconnect()
34+
35+
36+
@broker.task
37+
async def get_val(key: str, context: Context = default_context) -> Optional[str]:
38+
# Now we can use our pool.
39+
redis = Redis(connection_pool=context.state.redis, decode_responses=True)
40+
return await redis.get(key)
41+
42+
43+
@broker.task
44+
async def set_val(key: str, value: str, context: Context = default_context) -> None:
45+
# Now we can use our pool to set value.
46+
await Redis(connection_pool=context.state.redis).set(key, value)
47+
48+
49+
async def main() -> None:
50+
await broker.startup()
51+
52+
set_task = await set_val.kiq("key", "value")
53+
set_result = await set_task.wait_result(with_logs=True)
54+
if set_result.is_err:
55+
print(set_result.log)
56+
raise ValueError("Cannot set value in redis. See logs.")
57+
58+
get_task = await get_val.kiq("key")
59+
get_res = await get_task.wait_result()
60+
print(f"Got redis value: {get_res.return_value}")
61+
62+
await broker.shutdown()
63+
64+
65+
if __name__ == "__main__":
66+
asyncio.run(main())

docs/guide/getting-started.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,16 @@ from taskiq import InMemoryBroker
5454
broker = InMemoryBroker()
5555
```
5656

57-
And that's it. Now let's add some tasks and the main function. You can add tasks in separate modules. You can find more information about that further.
57+
And that's it. Now let's add some tasks and the main function. You can add tasks in separate modules. You can find more information about that further. Also, we call the `startup` method at the beginning of the `main` function.
5858

5959
@[code python](../examples/introduction/inmemory_run.py)
6060

61+
::: tip Cool tip!
62+
63+
Calling the `startup` method is not required, but we strongly recommend you do so.
64+
65+
:::
66+
6167
If you run this code, you will get this in your terminal:
6268

6369
```bash:no-line-numbers

docs/guide/scheduling-tasks.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
order: 7
2+
order: 8
33
---
44

55
# Scheduling tasks

docs/guide/state-and-events.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
---
2+
order: 7
3+
---
4+
5+
# State and events
6+
7+
The `TaskiqState` is a global variable where you can keep the variables you want to use later.
8+
For example, you want to open a database connection pool at a broker's startup.
9+
10+
This can be acieved by adding event handlers.
11+
12+
You can use one of these events:
13+
* `WORKER_STARTUP`
14+
* `CLIENT_STARTUP`
15+
* `WORKER_SHUTDOWN`
16+
* `CLIENT_SHUTDOWN`
17+
18+
Worker events are called when you start listening to the broker messages using taskiq.
19+
Client events are called when you call the `startup` method of your broker from your code.
20+
21+
This is an example of code using event handlers:
22+
23+
@[code python](../examples/state/events_example.py)
24+
25+
::: tip Cool tip!
26+
27+
If you want to add handlers programmatically, you can use the `broker.add_event_handler` function.
28+
29+
:::
30+
31+
As you can see in this example, this worker will initialize the Redis pool at the startup.
32+
You can access the state from the context.

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ authors = ["Pavel Kirilin <[email protected]>"]
66
maintainers = ["Pavel Kirilin <[email protected]>"]
77
readme = "README.md"
88
repository = "https://github.com/taskiq-python/taskiq"
9+
homepage = "https://taskiq-python.github.io/"
10+
documentation = "https://taskiq-python.github.io/"
911
license = "LICENSE"
1012
classifiers = [
1113
"Typing :: Typed",
@@ -21,7 +23,6 @@ classifiers = [
2123
"Topic :: System :: Networking",
2224
"Development Status :: 3 - Alpha",
2325
]
24-
homepage = "https://github.com/taskiq-python/taskiq"
2526
keywords = ["taskiq", "tasks", "distributed", "async"]
2627

2728
[tool.poetry.dependencies]

taskiq/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,24 @@
88
from taskiq.brokers.shared_broker import async_shared_broker
99
from taskiq.brokers.zmq_broker import ZeroMQBroker
1010
from taskiq.context import Context
11+
from taskiq.events import TaskiqEvents
1112
from taskiq.exceptions import TaskiqError
1213
from taskiq.funcs import gather
1314
from taskiq.message import BrokerMessage, TaskiqMessage
1415
from taskiq.result import TaskiqResult
1516
from taskiq.scheduler import ScheduledTask, TaskiqScheduler
17+
from taskiq.state import TaskiqState
1618
from taskiq.task import AsyncTaskiqTask
1719

1820
__all__ = [
1921
"gather",
2022
"Context",
2123
"AsyncBroker",
2224
"TaskiqError",
25+
"TaskiqState",
2326
"TaskiqResult",
2427
"ZeroMQBroker",
28+
"TaskiqEvents",
2529
"TaskiqMessage",
2630
"BrokerMessage",
2731
"InMemoryBroker",

taskiq/abc/broker.py

Lines changed: 81 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
1-
import inspect
21
import os
32
import sys
43
from abc import ABC, abstractmethod
4+
from collections import defaultdict
55
from functools import wraps
66
from logging import getLogger
77
from typing import ( # noqa: WPS235
88
TYPE_CHECKING,
99
Any,
10+
Awaitable,
1011
Callable,
1112
Coroutine,
13+
DefaultDict,
1214
Dict,
1315
List,
1416
Optional,
@@ -18,22 +20,27 @@
1820
)
1921
from uuid import uuid4
2022

21-
from typing_extensions import ParamSpec
23+
from typing_extensions import ParamSpec, TypeAlias
2224

25+
from taskiq.abc.middleware import TaskiqMiddleware
2326
from taskiq.decor import AsyncTaskiqDecoratedTask
27+
from taskiq.events import TaskiqEvents
2428
from taskiq.formatters.json_formatter import JSONFormatter
2529
from taskiq.message import BrokerMessage
2630
from taskiq.result_backends.dummy import DummyResultBackend
31+
from taskiq.state import TaskiqState
32+
from taskiq.utils import maybe_awaitable
2733

28-
if TYPE_CHECKING:
34+
if TYPE_CHECKING: # pragma: no cover
2935
from taskiq.abc.formatter import TaskiqFormatter
30-
from taskiq.abc.middleware import TaskiqMiddleware
3136
from taskiq.abc.result_backend import AsyncResultBackend
3237

3338
_T = TypeVar("_T") # noqa: WPS111
3439
_FuncParams = ParamSpec("_FuncParams")
3540
_ReturnType = TypeVar("_ReturnType")
3641

42+
EventHandler: TypeAlias = Callable[[TaskiqState], Optional[Awaitable[None]]]
43+
3744
logger = getLogger("taskiq")
3845

3946

@@ -49,7 +56,7 @@ def default_id_generator() -> str:
4956
return uuid4().hex
5057

5158

52-
class AsyncBroker(ABC):
59+
class AsyncBroker(ABC): # noqa: WPS230
5360
"""
5461
Async broker.
5562
@@ -75,8 +82,16 @@ def __init__(
7582
self.decorator_class = AsyncTaskiqDecoratedTask
7683
self.formatter: "TaskiqFormatter" = JSONFormatter()
7784
self.id_generator = task_id_generator
78-
79-
def add_middlewares(self, middlewares: "List[TaskiqMiddleware]") -> None:
85+
# Every event has a list of handlers.
86+
# Every handler is a function which takes state as a first argument.
87+
# And handler can be either sync or async.
88+
self.event_handlers: DefaultDict[ # noqa: WPS234
89+
TaskiqEvents,
90+
List[Callable[[TaskiqState], Optional[Awaitable[None]]]],
91+
] = defaultdict(list)
92+
self.state = TaskiqState()
93+
94+
def add_middlewares(self, *middlewares: "TaskiqMiddleware") -> None:
8095
"""
8196
Add a list of middlewares.
8297
@@ -86,11 +101,23 @@ def add_middlewares(self, middlewares: "List[TaskiqMiddleware]") -> None:
86101
:param middlewares: list of middlewares.
87102
"""
88103
for middleware in middlewares:
104+
if not isinstance(middleware, TaskiqMiddleware):
105+
logger.warning(
106+
f"Middleware {middleware} is not an instance of TaskiqMiddleware. "
107+
"Skipping...",
108+
)
109+
continue
89110
middleware.set_broker(self)
90111
self.middlewares.append(middleware)
91112

92113
async def startup(self) -> None:
93114
"""Do something when starting broker."""
115+
event = TaskiqEvents.CLIENT_STARTUP
116+
if self.is_worker_process:
117+
event = TaskiqEvents.WORKER_STARTUP
118+
119+
for handler in self.event_handlers[event]:
120+
await maybe_awaitable(handler(self.state))
94121

95122
async def shutdown(self) -> None:
96123
"""
@@ -99,11 +126,13 @@ async def shutdown(self) -> None:
99126
This method is called,
100127
when broker is closig.
101128
"""
102-
for middleware in self.middlewares:
103-
middleware_shutdown = middleware.shutdown()
104-
if inspect.isawaitable(middleware_shutdown):
105-
await middleware_shutdown
106-
await self.result_backend.shutdown()
129+
event = TaskiqEvents.CLIENT_SHUTDOWN
130+
if self.is_worker_process:
131+
event = TaskiqEvents.WORKER_SHUTDOWN
132+
133+
# Call all shutdown events.
134+
for handler in self.event_handlers[event]:
135+
await maybe_awaitable(handler(self.state))
107136

108137
@abstractmethod
109138
async def kick(
@@ -232,3 +261,43 @@ def inner(
232261
inner_task_name=task_name,
233262
inner_labels=labels or {},
234263
)
264+
265+
def on_event(self, *events: TaskiqEvents) -> Callable[[EventHandler], EventHandler]:
266+
"""
267+
Adds event handler.
268+
269+
This function adds function to call when event occurs.
270+
271+
:param events: events to react to.
272+
:return: a decorator function.
273+
"""
274+
275+
def handler(function: EventHandler) -> EventHandler:
276+
for event in events:
277+
self.event_handlers[event].append(function)
278+
return function
279+
280+
return handler
281+
282+
def add_event_handler(
283+
self,
284+
event: TaskiqEvents,
285+
handler: EventHandler,
286+
) -> None:
287+
"""
288+
Adds event handler.
289+
290+
this function is the same as on_event.
291+
292+
>>> broker.add_event_handler(TaskiqEvents.WORKER_STARTUP, my_startup)
293+
294+
if similar to:
295+
296+
>>> @broker.on_event(TaskiqEvents.WORKER_STARTUP)
297+
>>> async def my_startup(context: Context) -> None:
298+
>>> ...
299+
300+
:param event: Event to react to.
301+
:param handler: handler to call when event is started.
302+
"""
303+
self.event_handlers[event].append(handler)

0 commit comments

Comments
 (0)