Skip to content

Commit 7efd6ee

Browse files
authored
Merge pull request #67 from febus982/cleanup-tasks-code
Improve and document tasks application architecture
2 parents 8f7d5a7 + c5e66e8 commit 7efd6ee

File tree

7 files changed

+54
-42
lines changed

7 files changed

+54
-42
lines changed

architecture.png

597 Bytes
Loading

architecture.puml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ routes --> Book
4545
servicers --> BookService
4646
servicers --> Book
4747
celery_worker ---> tasks
48-
tasks <-u-> BookService
48+
tasks <-u- BookService
4949
tasks -u-> Book
5050

5151
'links internal to books domain

common/config.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,15 @@ class CeleryConfig(BaseModel):
3535
worker_send_task_events: bool = True
3636
task_send_sent_event: bool = True
3737

38-
beat_schedule: dict = {
39-
"recurrent_example": {
40-
"task": "domains.books.tasks.book_created",
41-
"schedule": 5.0,
42-
"args": ("a-random-book-id",),
43-
},
44-
}
38+
# Recurring tasks triggered directly by Celery
39+
beat_schedule: dict = {}
40+
# beat_schedule: dict = {
41+
# "recurrent_example": {
42+
# "task": "domains.books.tasks.book_cpu_intensive_task",
43+
# "schedule": 5.0,
44+
# "args": ("a-random-book-id",),
45+
# },
46+
# }
4547

4648

4749
class AppConfig(BaseSettings):

domains/books/service.py

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from collections.abc import Iterable
22

33
from anyio import to_thread
4+
from celery.result import AsyncResult
45
from dependency_injector.wiring import Provide, inject
56
from structlog import get_logger
67

@@ -9,6 +10,7 @@
910

1011
from ._data_access_interfaces import BookEventGatewayInterface, BookRepositoryInterface
1112
from .dto import Book, BookData
13+
from .tasks import book_cpu_intensive_task
1214

1315

1416
class BookService:
@@ -30,13 +32,13 @@ def __init__(
3032
self.event_gateway = event_gateway
3133

3234
async def create_book(self, book: BookData) -> Book:
33-
# Example of CPU intensive task, run in a different thread
34-
# Using processes could be better, but it would bring technical complexity
35-
# https://anyio.readthedocs.io/en/3.x/subprocesses.html#running-functions-in-worker-processes
36-
book_data_altered = await to_thread.run_sync(
37-
some_cpu_intensive_blocking_task, book.model_dump()
38-
)
39-
book_model = BookModel(**book_data_altered)
35+
# Example of CPU intensive task, run in a celery task
36+
book_task: AsyncResult = book_cpu_intensive_task.delay(book)
37+
# task.get() would block the application, we run it in a thread to remain async
38+
# we can also build a wrapper coroutine to do this using `asyncio.sleep`
39+
# and poll the AsyncResult class in case we do not want to use threads
40+
book_data_altered: BookData = await to_thread.run_sync(book_task.get)
41+
book_model = BookModel(**book_data_altered.model_dump())
4042
book = Book.model_validate(
4143
await self.book_repository.save(book_model), from_attributes=True
4244
)
@@ -56,13 +58,6 @@ async def list_books(self) -> Iterable[Book]:
5658
return [Book.model_validate(x, from_attributes=True) for x in books]
5759

5860
async def book_created_event_handler(self, book_id) -> None: # pragma: no cover
59-
# This is just an example placeholder,
60-
# there's nothing to test.
61+
# This is just an example placeholder, there's nothing to test.
6162
logger = get_logger()
6263
await logger.ainfo(f"Processed book crated event for id `{book_id}`")
63-
64-
65-
def some_cpu_intensive_blocking_task(book: dict) -> dict:
66-
# This is just an example placeholder,
67-
# there's nothing to test.
68-
return book # pragma: no cover

domains/books/tasks.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,27 @@
1-
from asgiref.sync import async_to_sync
1+
"""
2+
Tasks defined in this module are considered intensive operations
3+
that happen as part of one of the BookService methods,
4+
therefore we shouldn't invoke again the book service directly
5+
from here.
6+
7+
Tasks that invoke the BookService could exist (e.g. an event
8+
worker), there are 2 options to implement them:
9+
- Create a different module, that would behave similar to HTTP
10+
routes, and invoke the service from there.
11+
- Invoke the service using inversion of control.
12+
13+
IMPORTANT: It's dangerous to have nested task when they depend
14+
on each other's result. If you find yourself in this scenario
15+
it is probably better to redesign your application. If this is
16+
not possible, then celery provides task synchronisation primitives.
17+
18+
https://docs.celeryq.dev/en/stable/userguide/tasks.html#avoid-launching-synchronous-subtasks
19+
"""
220
from celery import shared_task
321

4-
from domains.books.service import BookService
22+
from domains.books.dto import BookData
523

624

725
@shared_task()
8-
def book_created(book_id):
9-
book_service = BookService()
10-
11-
"""
12-
This might not work if we call directly hello() from an app with
13-
an already running async loop. It would be great having either
14-
native async support or a better solution to identify running loops.
15-
For now, we'll assume this is always executed in the worker.
16-
In order to call this directly we might have to use the opposite
17-
`sync_to_async` wrapper from asgiref.sync
18-
"""
19-
return async_to_sync(book_service.book_created_event_handler)(book_id)
26+
def book_cpu_intensive_task(book: BookData) -> BookData:
27+
return book

tests/domains/books/test_book_service.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
from unittest.mock import AsyncMock, MagicMock
1+
from unittest.mock import AsyncMock, MagicMock, patch
22

33
from domains.books import dto, service
44
from domains.books._data_access_interfaces import BookEventGatewayInterface
55
from domains.books.models import BookModel
6+
from domains.books.tasks import book_cpu_intensive_task
67

78

89
async def test_create_book(book_repository):
@@ -15,7 +16,12 @@ async def test_create_book(book_repository):
1516
title="test",
1617
author_name="other",
1718
)
18-
returned_book = await book_service.create_book(book)
19+
mocked_task_return = MagicMock
20+
mocked_task_return.get = MagicMock(return_value=book_cpu_intensive_task(book))
21+
with patch.object(
22+
book_cpu_intensive_task, "delay", return_value=mocked_task_return
23+
):
24+
returned_book = await book_service.create_book(book)
1925
assert book.title == returned_book.title
2026
assert book.author_name == returned_book.author_name
2127
assert returned_book.book_id is not None
Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
from unittest.mock import MagicMock, patch
22

3+
from domains.books.dto import BookData
34
from domains.books.service import BookService
4-
from domains.books.tasks import book_created
5+
from domains.books.tasks import book_cpu_intensive_task
56

67

78
@patch.object(BookService, "book_created_event_handler", return_value=None)
89
def test_book_created_task(mocked_task_handler: MagicMock):
9-
book_created(123)
10-
mocked_task_handler.assert_called_once_with(123)
10+
b = BookData(title="AA", author_name="BB")
11+
assert book_cpu_intensive_task(b) == b

0 commit comments

Comments
 (0)