-
Notifications
You must be signed in to change notification settings - Fork 32
✨ Add a Distributed Task Queue (using Celery) (🏗️ ⚠️DEVOPS) #7214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
giancarloromeo
merged 163 commits into
ITISFoundation:master
from
giancarloromeo:add-distributed-task-queue
Mar 11, 2025
Merged
Changes from 142 commits
Commits
Show all changes
163 commits
Select commit
Hold shift + click to select a range
119b9da
add distribute task queue
giancarloromeo 235c8ed
add redis database
giancarloromeo f3827c7
add settings
giancarloromeo 1a2cc8d
update reqs
giancarloromeo ce01b63
add celery task
giancarloromeo 866ca9b
add celery task queue class
giancarloromeo 9e90c73
rename
giancarloromeo b33cdae
make testable
giancarloromeo af41640
add storage worker
giancarloromeo 6890444
continue working
giancarloromeo d6b85be
continue
giancarloromeo 8a8cdd5
Merge branch 'master' into add-distributed-task-queue
giancarloromeo 00751db
use rabbit
giancarloromeo bd27fa5
continue
giancarloromeo 2794e5b
continue
giancarloromeo f2e43b4
add unit tests
giancarloromeo dbf0e9c
continue
giancarloromeo 2bc8ed4
base working tests
giancarloromeo 6f84e1d
add progress
giancarloromeo b8a010c
continue fixing
giancarloromeo fa3e919
continue fixing
giancarloromeo 6450c2e
fix docker
giancarloromeo bc2a773
Merge remote-tracking branch 'upstream/master' into add-distributed-t…
giancarloromeo 4220788
working
giancarloromeo b40a24c
continue fix
giancarloromeo 341f7fa
fix tests
giancarloromeo 80753e7
update boot.sh
giancarloromeo 578573c
fix files endpoint
giancarloromeo 499d290
rename
giancarloromeo ccd17b5
Merge remote-tracking branch 'upstream/master' into add-distributed-t…
giancarloromeo b305a53
add abortable task
giancarloromeo a5e1128
typechecks
giancarloromeo 920a912
Merge branch 'master' into add-distributed-task-queue
giancarloromeo 6ab33bd
add healthcheck
giancarloromeo 5b86b64
register abortable tasks by default
giancarloromeo c27f509
continue
giancarloromeo e386ff9
rename
giancarloromeo 2d4951a
add settings
giancarloromeo 6d1e32b
Merge branch 'master' into add-distributed-task-queue
giancarloromeo 04ffc07
remove debug
giancarloromeo 54b9a11
remuve unused pytest plugin
giancarloromeo 1caeca5
fix healthcheck
giancarloromeo 0d84c14
typecheck
giancarloromeo f297a3b
fix tests
giancarloromeo d760277
fix tests
giancarloromeo c768f0e
fix import
giancarloromeo 9f52c9d
Merge branch 'master' into add-distributed-task-queue
giancarloromeo 8a81d22
Merge branch 'add-distributed-task-queue' of github.com:giancarlorome…
giancarloromeo 3c8eb4c
add utils
giancarloromeo 780a947
update utils
giancarloromeo 830d562
fix tests
giancarloromeo b22abea
update example
giancarloromeo 5072b3b
add async interface
giancarloromeo e09a66b
add tests
giancarloromeo e94f736
improve typehints
giancarloromeo 079fd1b
improve typehint
giancarloromeo 975b4d3
remove unused
giancarloromeo 2efae6b
update
giancarloromeo 754c065
fix util
giancarloromeo 9c9adf0
refactor
giancarloromeo 3810eda
update docker
giancarloromeo e7f3fa2
typecheck
giancarloromeo dcd9da3
fix typecheck
giancarloromeo abec223
change list to set
giancarloromeo 0ce1aa4
rename
giancarloromeo 651b327
Merge branch 'master' into add-distributed-task-queue
giancarloromeo ce77046
add task context
giancarloromeo 2f96e96
Merge remote-tracking branch 'upstream/master' into add-distributed-t…
giancarloromeo f28cbc9
add rabbit
giancarloromeo ad2c55c
add fixture
giancarloromeo 3da4515
update interface
giancarloromeo 87a457f
typecheck
giancarloromeo d0a97fb
adapt code
giancarloromeo 6a0fc4a
rename
giancarloromeo 5bbf86c
Merge remote-tracking branch 'upstream/master' into add-distributed-t…
giancarloromeo 6f070ff
add stop
giancarloromeo 5b44bb9
fix import
giancarloromeo 8bb9055
move example
giancarloromeo ea709b3
change default
giancarloromeo f9ecc62
update description
giancarloromeo 2b5f143
update prefix
giancarloromeo 872baf7
add types
giancarloromeo 320095d
refactor
giancarloromeo 319570e
use suppress
giancarloromeo 432f940
use longnames
giancarloromeo 966baf6
change var names
giancarloromeo 4a2cada
fix settings
giancarloromeo ce4c2fa
log_context
giancarloromeo 7ce7d6f
remove example
giancarloromeo d1714fd
update docker compose
giancarloromeo 67ec039
remove typehint
giancarloromeo 3896794
add log
giancarloromeo 4aeb1c9
add log
giancarloromeo 558cd5d
remove comment
giancarloromeo 4f3e722
Merge branch 'master' into add-distributed-task-queue
giancarloromeo 1107435
add task params
giancarloromeo dd7f2fa
track started tasks
giancarloromeo 4b4d5a2
add enums
giancarloromeo 50550b4
add redis on startup
giancarloromeo aa08b2a
add enum
giancarloromeo 363b15d
fix worker startup
giancarloromeo 0fdd564
fix state
giancarloromeo 46ac5df
continue
giancarloromeo 5d7115b
add validation
giancarloromeo 52d9219
remove started and stopped
giancarloromeo 38b4be7
progress not nullable
giancarloromeo c3b2f4d
removed for now
giancarloromeo 319f269
typecheck
giancarloromeo 096c3b7
fix port
giancarloromeo 02d401a
fix tests
giancarloromeo 165b198
Merge branch 'master' into add-distributed-task-queue
giancarloromeo 0b624ec
remove register
giancarloromeo e77d2dd
add concurrency
giancarloromeo 26160a8
add fixture
giancarloromeo 3bb2c6b
fix tests
giancarloromeo e6253da
Merge branch 'master' into add-distributed-task-queue
giancarloromeo 964a22e
fix import
giancarloromeo 994ee8f
Merge branch 'add-distributed-task-queue' of github.com:giancarlorome…
giancarloromeo c620103
progress
giancarloromeo 95898e6
Merge remote-tracking branch 'upstream/master' into add-distributed-t…
giancarloromeo c75ccae
fix done property
giancarloromeo 317ee66
update keys
giancarloromeo 1eab85b
add retry
giancarloromeo 99cb6b7
Merge remote-tracking branch 'upstream/master' into add-distributed-t…
giancarloromeo 3a7f7e9
improve error handling
giancarloromeo 8e5b1f9
add task
giancarloromeo c2be8f0
update
giancarloromeo 87fb925
fix get
giancarloromeo 9a96dd8
fix get_uuids
giancarloromeo 8df0a5c
fix
giancarloromeo c56b8b7
fix import
giancarloromeo 0ec94b6
update
giancarloromeo 403aaa1
use taskstate
giancarloromeo 65a07e0
remove unused
giancarloromeo 327c8f1
type hinting
giancarloromeo d077472
remove taskresult
giancarloromeo 59b4a08
Merge branch 'master' into add-distributed-task-queue
giancarloromeo 6c3779f
Merge branch 'add-distributed-task-queue' of github.com:giancarlorome…
giancarloromeo 66e51d9
add celery env
giancarloromeo 3c86023
get hostname
giancarloromeo d8a848e
Merge branch 'master' into add-distributed-task-queue
giancarloromeo 8ea33e6
fix imports
giancarloromeo c76cbd3
fix settings
giancarloromeo ed8f92c
fix
giancarloromeo 7f09d30
add comment
giancarloromeo 5b1f97a
fix docker compose
giancarloromeo 9e6284b
use settings
giancarloromeo 9d71230
Merge branch 'master' into add-distributed-task-queue
giancarloromeo 99ba805
fix typecheck
giancarloromeo 1630c8a
Merge branch 'master' into add-distributed-task-queue
giancarloromeo 8f95fd2
fix envs
giancarloromeo 37e8704
Merge branch 'add-distributed-task-queue' of github.com:giancarlorome…
giancarloromeo ec4bb00
fix mypy
giancarloromeo 28a440f
Merge branch 'master' into add-distributed-task-queue
giancarloromeo 76e7fc3
fix env
giancarloromeo 3cf012e
revert mypy setting
giancarloromeo cf96a04
typecheck
giancarloromeo a61e5eb
Merge branch 'master' into add-distributed-task-queue
giancarloromeo 6d5640e
Merge branch 'master' into add-distributed-task-queue
giancarloromeo 3133758
exclude service
giancarloromeo 5050df1
Merge branch 'add-distributed-task-queue' of github.com:giancarlorome…
giancarloromeo a71cc9a
Merge branch 'master' into add-distributed-task-queue
giancarloromeo f6232ff
exclude sto-worker
giancarloromeo File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| import asyncio | ||
| import functools | ||
| from collections.abc import Awaitable, Callable | ||
| from concurrent.futures import Executor | ||
| from typing import ParamSpec, TypeVar | ||
|
|
||
| R = TypeVar("R") | ||
| P = ParamSpec("P") | ||
|
|
||
|
|
||
| def make_async( | ||
| executor: Executor | None = None, | ||
| ) -> Callable[[Callable[P, R]], Callable[P, Awaitable[R]]]: | ||
| def decorator(func: Callable[P, R]) -> Callable[P, Awaitable[R]]: | ||
| @functools.wraps(func) | ||
| async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: | ||
| loop = asyncio.get_running_loop() | ||
| return await loop.run_in_executor( | ||
| executor, functools.partial(func, *args, **kwargs) | ||
| ) | ||
|
|
||
| return wrapper | ||
|
|
||
giancarloromeo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return decorator | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| import asyncio | ||
| from concurrent.futures import ThreadPoolExecutor | ||
|
|
||
| import pytest | ||
| from common_library.async_tools import make_async | ||
|
|
||
|
|
||
| @make_async() | ||
| def sync_function(x: int, y: int) -> int: | ||
| return x + y | ||
|
|
||
|
|
||
| @make_async() | ||
| def sync_function_with_exception() -> None: | ||
| raise ValueError("This is an error!") | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_make_async_returns_coroutine(): | ||
| result = sync_function(2, 3) | ||
| assert asyncio.iscoroutine(result), "Function should return a coroutine" | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_make_async_execution(): | ||
| result = await sync_function(2, 3) | ||
| assert result == 5, "Function should return 5" | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_make_async_exception(): | ||
| with pytest.raises(ValueError, match="This is an error!"): | ||
| await sync_function_with_exception() | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_make_async_with_executor(): | ||
| executor = ThreadPoolExecutor() | ||
|
|
||
| @make_async(executor) | ||
| def heavy_computation(x: int) -> int: | ||
| return x * x | ||
|
|
||
| result = await heavy_computation(4) | ||
| assert result == 16, "Function should return 16" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| from datetime import timedelta | ||
| from typing import Annotated | ||
|
|
||
| from pydantic import Field | ||
| from pydantic_settings import SettingsConfigDict | ||
| from settings_library.rabbit import RabbitSettings | ||
| from settings_library.redis import RedisSettings | ||
|
|
||
| from .base import BaseCustomSettings | ||
|
|
||
|
|
||
| class CelerySettings(BaseCustomSettings): | ||
| CELERY_RABBIT_BROKER: Annotated[ | ||
| RabbitSettings, Field(json_schema_extra={"auto_default_from_env": True}) | ||
| ] | ||
| CELERY_REDIS_RESULT_BACKEND: Annotated[ | ||
| RedisSettings, Field(json_schema_extra={"auto_default_from_env": True}) | ||
giancarloromeo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| ] | ||
| CELERY_RESULT_EXPIRES: Annotated[ | ||
| timedelta, | ||
| Field( | ||
| description="Time after which task results will be deleted (default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)." | ||
| ), | ||
| ] = timedelta(days=7) | ||
| CELERY_RESULT_PERSISTENT: Annotated[ | ||
| bool, | ||
| Field( | ||
| description="If set to True, result messages will be persistent (after a broker restart)." | ||
giancarloromeo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| ), | ||
| ] = True | ||
|
|
||
| model_config = SettingsConfigDict( | ||
| json_schema_extra={ | ||
| "examples": [ | ||
| { | ||
| "CELERY_RABBIT_BROKER": { | ||
| "RABBIT_USER": "guest", | ||
| "RABBIT_SECURE": False, | ||
| "RABBIT_PASSWORD": "guest", | ||
| "RABBIT_HOST": "localhost", | ||
| "RABBIT_PORT": 5672, | ||
| }, | ||
| "CELERY_REDIS_RESULT_BACKEND": { | ||
| "REDIS_HOST": "localhost", | ||
| "REDIS_PORT": 6379, | ||
| }, | ||
| "CELERY_RESULT_EXPIRES": timedelta(days=1), # type: ignore[dict-item] | ||
| "CELERY_RESULT_PERSISTENT": True, | ||
| } | ||
| ], | ||
| } | ||
| ) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.