Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
83 commits
Select commit Hold shift + click to select a range
3862105
refactor: move routes modules
giancarloromeo Jun 30, 2025
efd3900
Merge remote-tracking branch 'upstream/master' into add-notifications…
giancarloromeo Jul 1, 2025
f769929
Merge remote-tracking branch 'upstream/master' into add-notifications…
giancarloromeo Jul 2, 2025
c91bda1
Merge branch 'master' into add-notifications-service
giancarloromeo Jul 2, 2025
b9b64cd
feat: add skeletons and schemas
giancarloromeo Jul 2, 2025
cc93001
feat: add reqs
giancarloromeo Jul 2, 2025
f196f2e
fix: settings
giancarloromeo Jul 2, 2025
0e890d7
fix: add Celery worker setup
giancarloromeo Jul 2, 2025
217d6a5
feat: docker boot
giancarloromeo Jul 3, 2025
02be423
feat: refactor models
giancarloromeo Jul 3, 2025
0ba8ce4
fix: req deps
giancarloromeo Jul 3, 2025
31b06bd
fix: docker scripts
giancarloromeo Jul 3, 2025
19a467a
fix: add package
giancarloromeo Jul 3, 2025
e153e84
fix: settings
giancarloromeo Jul 3, 2025
6d5c30e
fix: set celery queue
giancarloromeo Jul 3, 2025
92b2714
fix: env vars
giancarloromeo Jul 3, 2025
ca423cf
fix: startup
giancarloromeo Jul 3, 2025
abbeaa4
fix: lifespan
giancarloromeo Jul 3, 2025
1c50f23
refactor: queue names
giancarloromeo Jul 3, 2025
610906c
fix: recipient
giancarloromeo Jul 3, 2025
cf92a77
fix: log
giancarloromeo Jul 3, 2025
44d76ff
refactor: change name
giancarloromeo Jul 3, 2025
a41e756
fix: task name
giancarloromeo Jul 3, 2025
bbc2859
tests: add pytest-celery
giancarloromeo Jul 3, 2025
7938e6f
tests: add mocks
giancarloromeo Jul 3, 2025
77b398a
tests: fix name
giancarloromeo Jul 3, 2025
79dcafa
tests: linting
giancarloromeo Jul 3, 2025
3d8c0ec
Merge branch 'master' into add-notifications-service
giancarloromeo Jul 3, 2025
d9a2234
Merge remote-tracking branch 'upstream/master' into add-notifications…
giancarloromeo Jul 3, 2025
f2534d5
Merge branch 'add-notifications-service' of github.com:giancarloromeo…
giancarloromeo Jul 3, 2025
95991af
refactor: move models
giancarloromeo Jul 4, 2025
50ea6f9
refactor: models
giancarloromeo Jul 4, 2025
60db2ca
tests: add rabbitmq client
giancarloromeo Jul 4, 2025
421d5f8
fix: imports
giancarloromeo Jul 4, 2025
2bda0d8
fix: update import
giancarloromeo Jul 4, 2025
e23cc2e
refactor: move task metadata
giancarloromeo Jul 4, 2025
380820e
fix: param name
giancarloromeo Jul 4, 2025
accab18
feat: add rabbitmq call
giancarloromeo Jul 7, 2025
a26d82c
tests: add test
giancarloromeo Jul 7, 2025
30ff8c9
tests: fix initialization
giancarloromeo Jul 7, 2025
116e355
fix: add notification library dependency
giancarloromeo Jul 7, 2025
a49127a
Merge branch 'master' into add-notifications-service
giancarloromeo Jul 7, 2025
ab3de30
fix: typecheck
giancarloromeo Jul 7, 2025
c6380a1
Merge branch 'add-notifications-service' of github.com:giancarloromeo…
giancarloromeo Jul 7, 2025
ebd8cba
fix: dependency
giancarloromeo Jul 7, 2025
dacbe7b
tests: add test client
giancarloromeo Jul 7, 2025
dcae6ec
tests: fix health
giancarloromeo Jul 7, 2025
f9fb772
tests: fix service selection
giancarloromeo Jul 7, 2025
09d6026
fix: rename
giancarloromeo Jul 8, 2025
6dd7fd7
tests: update models
giancarloromeo Jul 8, 2025
a657d3e
feat: add async_jobs
giancarloromeo Jul 8, 2025
628042c
refactor: decouple models
giancarloromeo Jul 9, 2025
817b2a7
refactor: packages
giancarloromeo Jul 9, 2025
6088a9a
fix: validate phone numbers
giancarloromeo Jul 9, 2025
ac0cdff
fix: add phonennumbers dep
giancarloromeo Jul 9, 2025
f064499
refactor: models
giancarloromeo Jul 9, 2025
e021ae3
fix: events
giancarloromeo Jul 9, 2025
f322d02
feat: revert postgres removal
giancarloromeo Jul 9, 2025
7cecec8
fix: add postgres deps
giancarloromeo Jul 9, 2025
1c87d24
fix: postgres deps
giancarloromeo Jul 9, 2025
b3074b4
tests: fix postgres
giancarloromeo Jul 9, 2025
8b06ed3
tests: fix postgres plugin
giancarloromeo Jul 9, 2025
4793012
fix: typecheck
giancarloromeo Jul 9, 2025
158f780
fix: test
giancarloromeo Jul 9, 2025
a63532a
Merge remote-tracking branch 'upstream/master' into add-notifications…
giancarloromeo Jul 10, 2025
40729d8
fix: task filter import
giancarloromeo Jul 10, 2025
c318e90
fix: imports
giancarloromeo Jul 11, 2025
24c1154
fix: models
giancarloromeo Jul 11, 2025
8d416f7
tests: disable tracing
giancarloromeo Jul 14, 2025
999f337
feat: improve models
giancarloromeo Jul 14, 2025
5d9fc5c
feat: add dumps util
giancarloromeo Jul 14, 2025
906c67f
tests: add new test
giancarloromeo Jul 14, 2025
eebbeb2
fix: add missing ipinfo
giancarloromeo Jul 14, 2025
1a3eca3
fix: address display name default
giancarloromeo Jul 14, 2025
5f03a50
fix: authors
giancarloromeo Jul 15, 2025
c12db38
Merge remote-tracking branch 'upstream/master' into add-notifications…
giancarloromeo Jul 15, 2025
4fbac04
Merge remote-tracking branch 'upstream/master' into add-notifications…
giancarloromeo Jul 15, 2025
ef872a3
fix: typecheck
giancarloromeo Jul 15, 2025
3840df1
Merge branch 'master' into add-notifications-service
giancarloromeo Aug 25, 2025
77197a8
fix: upgrade click
giancarloromeo Aug 25, 2025
fb1cd83
Merge branch 'master' into add-notifications-service
giancarloromeo Aug 25, 2025
d0b4dda
Merge branch 'master' into pr/giancarloromeo/8011
pcrespov Aug 26, 2025
919dfee
mionr
pcrespov Aug 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions packages/celery-library/src/celery_library/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@
from common_library.async_tools import make_async
from models_library.progress_bar import ProgressReport
from servicelib.celery.models import (
TASK_QUEUE_DEFAULT,
Task,
TaskFilter,
TaskID,
TaskInfoStore,
TaskMetadata,
TaskName,
TaskQueue,
TaskState,
TaskStatus,
TaskUUID,
Expand All @@ -38,34 +41,43 @@ class CeleryTaskManager:
_celery_settings: CelerySettings
_task_info_store: TaskInfoStore

async def submit_task(
async def send_task(
self,
task_metadata: TaskMetadata,
*,
task_name: TaskName,
task_filter: TaskFilter,
*,
task_ephemeral: bool = True,
task_queue: TaskQueue = TASK_QUEUE_DEFAULT,
**task_params,
) -> TaskUUID:
with log_context(
_logger,
logging.DEBUG,
msg=f"Submit {task_metadata.name=}: {task_filter=} {task_params=}",
msg=f"Send {task_name=}: {task_filter=} {task_params=}",
):
task_uuid = uuid4()
task_id = build_task_id(task_filter, task_uuid)
self._celery_app.send_task(
task_metadata.name,
task_name,
task_id=task_id,
kwargs={"task_id": task_id} | task_params,
queue=task_metadata.queue.value,
queue=task_queue,
)

expiry = (
self._celery_settings.CELERY_EPHEMERAL_RESULT_EXPIRES
if task_metadata.ephemeral
if task_ephemeral
else self._celery_settings.CELERY_RESULT_EXPIRES
)

await self._task_info_store.create_task(
task_id, task_metadata, expiry=expiry
task_id,
TaskMetadata(
name=task_name,
ephemeral=task_ephemeral,
queue=task_queue,
),
expiry=expiry,
)
return task_uuid

Expand Down Expand Up @@ -97,6 +109,7 @@ async def get_task_result(
msg=f"Get task result: {task_filter=} {task_uuid=}",
):
task_id = build_task_id(task_filter, task_uuid)

async_result = self._celery_app.AsyncResult(task_id)
result = async_result.result
if async_result.ready():
Expand All @@ -106,7 +119,7 @@ async def get_task_result(
await self._task_info_store.remove_task(task_id)
return result

async def _get_task_progress_report(
async def _get_progress_report(
self, task_filter: TaskFilter, task_uuid: TaskUUID, task_state: TaskState
) -> ProgressReport:
if task_state in (TaskState.STARTED, TaskState.RETRY, TaskState.ABORTED):
Expand Down Expand Up @@ -144,7 +157,7 @@ async def get_task_status(
return TaskStatus(
task_uuid=task_uuid,
task_state=task_state,
progress_report=await self._get_task_progress_report(
progress_report=await self._get_progress_report(
task_filter, task_uuid, task_state
),
)
Expand Down
2 changes: 1 addition & 1 deletion packages/celery-library/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def _on_worker_init_wrapper(sender: WorkController, **_kwargs):


@pytest.fixture
async def celery_task_manager(
async def task_manager(
celery_app: Celery,
celery_settings: CelerySettings,
with_celery_worker: TestWorkController,
Expand Down
16 changes: 8 additions & 8 deletions packages/celery-library/tests/unit/test_async_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from models_library.rabbitmq_basic_types import RPCNamespace
from models_library.users import UserID
from pydantic import TypeAdapter
from servicelib.celery.models import TaskFilter, TaskID, TaskMetadata
from servicelib.celery.models import TaskFilter, TaskID
from servicelib.celery.task_manager import TaskManager
from servicelib.rabbitmq import RabbitMQRPCClient, RPCRouter
from servicelib.rabbitmq.rpc_interfaces.async_jobs import async_jobs
Expand Down Expand Up @@ -83,8 +83,8 @@ async def rpc_sync_job(
) -> AsyncJobGet:
task_name = sync_job.__name__
task_filter = TaskFilter.model_validate(job_filter.model_dump())
task_uuid = await task_manager.submit_task(
TaskMetadata(name=task_name), task_filter=task_filter, **kwargs
task_uuid = await task_manager.send_task(
task_name=task_name, task_filter=task_filter, **kwargs
)

return AsyncJobGet(job_id=task_uuid, job_name=task_name)
Expand All @@ -96,8 +96,8 @@ async def rpc_async_job(
) -> AsyncJobGet:
task_name = async_job.__name__
task_filter = TaskFilter.model_validate(job_filter.model_dump())
task_uuid = await task_manager.submit_task(
TaskMetadata(name=task_name), task_filter=task_filter, **kwargs
task_uuid = await task_manager.send_task(
task_name=task_name, task_filter=task_filter, **kwargs
)

return AsyncJobGet(job_id=task_uuid, job_name=task_name)
Expand Down Expand Up @@ -141,13 +141,13 @@ async def async_job(task: Task, task_id: TaskID, action: Action, payload: Any) -

@pytest.fixture
async def register_rpc_routes(
async_jobs_rabbitmq_rpc_client: RabbitMQRPCClient, celery_task_manager: TaskManager
async_jobs_rabbitmq_rpc_client: RabbitMQRPCClient, task_manager: TaskManager
) -> None:
await async_jobs_rabbitmq_rpc_client.register_router(
_async_jobs.router, ASYNC_JOBS_RPC_NAMESPACE, task_manager=celery_task_manager
_async_jobs.router, ASYNC_JOBS_RPC_NAMESPACE, task_manager=task_manager
)
await async_jobs_rabbitmq_rpc_client.register_router(
router, ASYNC_JOBS_RPC_NAMESPACE, task_manager=celery_task_manager
router, ASYNC_JOBS_RPC_NAMESPACE, task_manager=task_manager
)


Expand Down
59 changes: 23 additions & 36 deletions packages/celery-library/tests/unit/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from servicelib.celery.models import (
TaskFilter,
TaskID,
TaskMetadata,
TaskState,
)
from servicelib.logging_utils import log_context
Expand Down Expand Up @@ -91,14 +90,12 @@ def _(celery_app: Celery) -> None:


async def test_submitting_task_calling_async_function_results_with_success_state(
celery_task_manager: CeleryTaskManager,
task_manager: CeleryTaskManager,
):
task_filter = TaskFilter(user_id=42)

task_uuid = await celery_task_manager.submit_task(
TaskMetadata(
name=fake_file_processor.__name__,
),
task_uuid = await task_manager.send_task(
task_name=fake_file_processor.__name__,
task_filter=task_filter,
files=[f"file{n}" for n in range(5)],
)
Expand All @@ -109,26 +106,22 @@ async def test_submitting_task_calling_async_function_results_with_success_state
stop=stop_after_delay(30),
):
with attempt:
status = await celery_task_manager.get_task_status(task_filter, task_uuid)
status = await task_manager.get_task_status(task_filter, task_uuid)
assert status.task_state == TaskState.SUCCESS

assert (
await celery_task_manager.get_task_status(task_filter, task_uuid)
await task_manager.get_task_status(task_filter, task_uuid)
).task_state == TaskState.SUCCESS
assert (
await celery_task_manager.get_task_result(task_filter, task_uuid)
) == "archive.zip"
assert (await task_manager.get_task_result(task_filter, task_uuid)) == "archive.zip"


async def test_submitting_task_with_failure_results_with_error(
celery_task_manager: CeleryTaskManager,
task_manager: CeleryTaskManager,
):
task_filter = TaskFilter(user_id=42)

task_uuid = await celery_task_manager.submit_task(
TaskMetadata(
name=failure_task.__name__,
),
task_uuid = await task_manager.send_task(
task_name=failure_task.__name__,
task_filter=task_filter,
)

Expand All @@ -139,56 +132,50 @@ async def test_submitting_task_with_failure_results_with_error(
):

with attempt:
raw_result = await celery_task_manager.get_task_result(
task_filter, task_uuid
)
raw_result = await task_manager.get_task_result(task_filter, task_uuid)
assert isinstance(raw_result, TransferrableCeleryError)

raw_result = await celery_task_manager.get_task_result(task_filter, task_uuid)
raw_result = await task_manager.get_task_result(task_filter, task_uuid)
assert f"{raw_result}" == "Something strange happened: BOOM!"


async def test_cancelling_a_running_task_aborts_and_deletes(
celery_task_manager: CeleryTaskManager,
task_manager: CeleryTaskManager,
):
task_filter = TaskFilter(user_id=42)

task_uuid = await celery_task_manager.submit_task(
TaskMetadata(
name=dreamer_task.__name__,
),
task_uuid = await task_manager.send_task(
task_name=dreamer_task.__name__,
task_filter=task_filter,
)

await asyncio.sleep(3.0)

await celery_task_manager.cancel_task(task_filter, task_uuid)
await task_manager.cancel_task(task_filter, task_uuid)

for attempt in Retrying(
retry=retry_if_exception_type(AssertionError),
wait=wait_fixed(1),
stop=stop_after_delay(30),
):
with attempt:
progress = await celery_task_manager.get_task_status(task_filter, task_uuid)
progress = await task_manager.get_task_status(task_filter, task_uuid)
assert progress.task_state == TaskState.ABORTED

assert (
await celery_task_manager.get_task_status(task_filter, task_uuid)
await task_manager.get_task_status(task_filter, task_uuid)
).task_state == TaskState.ABORTED

assert task_uuid not in await celery_task_manager.list_tasks(task_filter)
assert task_uuid not in await task_manager.list_tasks(task_filter)


async def test_listing_task_uuids_contains_submitted_task(
celery_task_manager: CeleryTaskManager,
task_manager: CeleryTaskManager,
):
task_filter = TaskFilter(user_id=42)

task_uuid = await celery_task_manager.submit_task(
TaskMetadata(
name=dreamer_task.__name__,
),
task_uuid = await task_manager.send_task(
task_name=dreamer_task.__name__,
task_filter=task_filter,
)

Expand All @@ -198,8 +185,8 @@ async def test_listing_task_uuids_contains_submitted_task(
stop=stop_after_delay(10),
):
with attempt:
tasks = await celery_task_manager.list_tasks(task_filter)
tasks = await task_manager.list_tasks(task_filter)
assert any(task.uuid == task_uuid for task in tasks)

tasks = await celery_task_manager.list_tasks(task_filter)
tasks = await task_manager.list_tasks(task_filter)
assert any(task.uuid == task_uuid for task in tasks)
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from re import Pattern
from typing import Annotated, Final, TypeAlias

from pydantic import Field
from pydantic import Field, StringConstraints
from pydantic_core import core_schema

# https://en.wikipedia.org/wiki/List_of_TCP_and_UDP_port_numbers#Registered_ports
Expand Down Expand Up @@ -77,3 +77,12 @@ class LongTruncatedStr(ConstrainedStr):
# Analogous to ShortTruncatedStr
strip_whitespace = True
curtail_length = 65536 # same as github descripton


NotEmptyStr: TypeAlias = Annotated[
str,
StringConstraints(
min_length=1,
strip_whitespace=True,
),
]
5 changes: 3 additions & 2 deletions packages/models-library/requirements/_base.in
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
arrow
jsonschema
orjson
pydantic-extra-types
pydantic-settings
phonenumbers
pydantic[email]
pydantic-settings
pydantic-extra-types
2 changes: 2 additions & 0 deletions packages/models-library/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ orjson==3.10.15
# -c requirements/../../../requirements/constraints.txt
# -r requirements/../../../packages/common-library/requirements/_base.in
# -r requirements/_base.in
phonenumbers==9.0.9
# via -r requirements/_base.in
pydantic==2.11.7
# via
# -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from ._notifications import Notification

__all__: tuple[str, ...] = ("Notification",)

# nopycln: file
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from typing import Annotated

from pydantic import BaseModel, Field

from .channels import Channel
from .events import Event


class Notification(BaseModel):
event: Annotated[Event, Field(discriminator="type")]
channel: Annotated[Channel, Field(discriminator="type")]
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import TypeAlias

from ._email_channel import EmailAddress, EmailChannel
from ._sms_channel import SMSChannel

Channel: TypeAlias = EmailChannel | SMSChannel


__all__: tuple[str, ...] = (
"Channel",
"EmailAddress",
"EmailChannel",
"SMSChannel",
)

# nopycln: file
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import Literal

from pydantic import BaseModel, EmailStr


class EmailAddress(BaseModel):
display_name: str = ""
addr_spec: EmailStr


class EmailChannel(BaseModel):
type: Literal["email"] = "email"

from_addr: EmailAddress
to_addr: EmailAddress
reply_to_addr: EmailAddress | None = None
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from typing import Literal

from pydantic import BaseModel
from pydantic_extra_types.phone_numbers import PhoneNumber


class SMSChannel(BaseModel):
type: Literal["sms"] = "sms"

phone_number: PhoneNumber
Loading
Loading