Skip to content

Commit 0f003bb

Browse files
authored
Merge pull request #65 from febus982/celery
Celery
2 parents d26f5d0 + 8ca31a9 commit 0f003bb

36 files changed

+702
-207
lines changed

.coveragerc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[run]
22
branch = True
3-
source = http_app, grpc_app, domains, gateways
4-
omit = grpc_app/generated/*
3+
source = http_app, grpc_app, domains, gateways, celery_worker, common
4+
omit = grpc_app/generated/*, common/config.py
55
concurrency = multiprocessing
66
parallel = true
77

Dockerfile

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ COPY --chown=nonroot:nonroot . .
4949
# when signals are propagated, we disable it in dev image default CMD
5050
CMD ["uvicorn", "http_app:create_app", "--host", "0.0.0.0", "--port", "8000", "--factory", "--reload"]
5151

52+
# Installs requirements to run production celery application
53+
FROM base_builder as celery_builder
54+
RUN poetry install --no-root
55+
5256
# Installs requirements to run production http application
5357
FROM base_builder as http_builder
5458
RUN poetry install --no-root --with http
@@ -66,8 +70,7 @@ COPY --chown=nonroot:nonroot poetry.lock .
6670
COPY --chown=nonroot:nonroot alembic ./alembic
6771
COPY --chown=nonroot:nonroot domains ./domains
6872
COPY --chown=nonroot:nonroot gateways ./gateways
69-
COPY --chown=nonroot:nonroot config.py .
70-
COPY --chown=nonroot:nonroot di_container.py .
73+
COPY --chown=nonroot:nonroot common ./common
7174
COPY --chown=nonroot:nonroot alembic.ini .
7275
COPY --chown=nonroot:nonroot Makefile .
7376

@@ -84,3 +87,10 @@ COPY --from=grpc_builder /poetryvenvs /poetryvenvs
8487
COPY --chown=nonroot:nonroot grpc_app ./grpc_app
8588
# Run CMD using array syntax, so it's uses `exec` and runs as PID1
8689
CMD ["opentelemetry-instrument", "python3", "-m", "grpc_app"]
90+
91+
# Copy the celery python package and requirements from relevant builder
92+
FROM base_app as celery_app
93+
COPY --from=celery_builder /poetryvenvs /poetryvenvs
94+
COPY --chown=nonroot:nonroot celery_worker ./celery_worker
95+
# Run CMD using array syntax, so it's uses `exec` and runs as PID1
96+
CMD ["opentelemetry-instrument", "celery", "-A", "celery_worker:app", "worker", "-l", "INFO"]

alembic/env.py

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,12 @@
44
from sqlalchemy.ext.asyncio import AsyncEngine
55

66
from alembic import context
7-
from config import AppConfig, init_logger
8-
from gateways.storage.SQLAlchemy import init_tables
7+
from common.bootstrap import application_init
8+
from common.config import AppConfig
99

1010
USE_TWOPHASE = False
1111

1212

13-
def init_container(app_config: AppConfig):
14-
# Workaround to avoid circular import
15-
from di_container import Container
16-
17-
return Container(config=app_config)
18-
19-
2013
# this is the Alembic Config object, which provides
2114
# access to the values within the .ini file in use.
2215
config = context.config
@@ -31,13 +24,9 @@ def init_container(app_config: AppConfig):
3124
# in the sample .ini file.
3225
# db_names = config.get_main_option("databases")
3326

34-
# TODO: Something better organised than this
35-
app_config = AppConfig()
36-
init_logger(app_config)
27+
di_container = application_init(AppConfig()).di_container
3728
logger = logging.getLogger("alembic.env")
38-
di_container = init_container(app_config)
3929
sa_manager = di_container.SQLAlchemyBindManager()
40-
init_tables()
4130

4231
target_metadata = sa_manager.get_bind_mappers_metadata()
4332
db_names = target_metadata.keys()

architecture.png

11.8 KB
Loading

architecture.puml

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22
allowmixing
33
skinparam componentStyle uml1
44

5-
package http_app {
5+
package celery_worker {}
6+
7+
package http_app #fff {
68
package routes #DDDDDD
79
}
810

9-
package grpc_app {
11+
package grpc_app #fff {
1012
package servicers #DDDDDD
1113
}
1214

@@ -17,6 +19,11 @@ package domains #DDDDDD {
1719

1820
package entities {
1921
class BookModel
22+
class BookEvent
23+
}
24+
25+
package tasks {
26+
class BookTask
2027
}
2128

2229
package data_access_interface {
@@ -37,6 +44,9 @@ routes --> BookService
3744
routes --> Book
3845
servicers --> BookService
3946
servicers --> Book
47+
celery_worker ---> tasks
48+
tasks <-u-> BookService
49+
tasks -u-> Book
4050

4151
'links internal to books domain
4252
BookService -l-> Book

celery_worker/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
"""
2+
This is a tiny layer that takes care of initialising the shared
3+
application layers (storage, logs) when running standalone workers
4+
without having to initialise the HTTP framework (or other ones)
5+
"""
6+
from common.bootstrap import application_init
7+
from common.config import AppConfig
8+
9+
app = application_init(AppConfig()).celery_app
File renamed without changes.

common/bootstrap.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from celery import Celery
2+
from dependency_injector.containers import DynamicContainer
3+
from dependency_injector.providers import Object
4+
from pydantic import BaseModel, ConfigDict
5+
6+
from domains import init_celery, init_domains
7+
from gateways.storage import init_storage
8+
9+
from .config import AppConfig, init_logger
10+
from .di_container import Container
11+
12+
13+
class InitReference(BaseModel):
14+
celery_app: Celery
15+
di_container: DynamicContainer
16+
17+
model_config = ConfigDict(arbitrary_types_allowed=True)
18+
19+
20+
def application_init(app_config: AppConfig) -> InitReference:
21+
container = Container(
22+
config=Object(app_config),
23+
)
24+
init_logger(app_config)
25+
init_domains(app_config)
26+
init_storage()
27+
celery = init_celery(app_config)
28+
29+
return InitReference(
30+
celery_app=celery,
31+
di_container=container,
32+
)

config.py renamed to common/config.py

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,53 @@
44

55
import structlog
66
from opentelemetry import trace
7-
from pydantic_settings import BaseSettings
7+
from pydantic import BaseModel
8+
from pydantic_settings import BaseSettings, SettingsConfigDict
89
from sqlalchemy_bind_manager import SQLAlchemyAsyncConfig
910
from structlog.typing import Processor
1011

1112
TYPE_ENVIRONMENT = Literal["local", "test", "staging", "production"]
1213

1314

15+
class CeleryConfig(BaseModel):
16+
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#configuration
17+
18+
timezone: str = "UTC"
19+
20+
# Broker config
21+
broker_url: str = "redis://redis:6379/0"
22+
broker_connection_retry_on_startup: bool = True
23+
24+
# Results backend config
25+
result_backend: str = "redis://redis:6379/1"
26+
redis_socket_keepalive: bool = True
27+
28+
# Enable to ignore the results by default and not produce tombstones
29+
task_ignore_result: bool = False
30+
31+
# We want to use the default python logger configured using structlog
32+
worker_hijack_root_logger: bool = False
33+
34+
# Events enabled for monitoring
35+
worker_send_task_events: bool = True
36+
task_send_sent_event: bool = True
37+
38+
beat_schedule: dict = {
39+
"recurrent_example": {
40+
"task": "domains.books.tasks.book_created",
41+
"schedule": 5.0,
42+
"args": ("a-random-book-id",),
43+
},
44+
}
45+
46+
1447
class AppConfig(BaseSettings):
48+
model_config = SettingsConfigDict(env_nested_delimiter="__")
49+
50+
APP_NAME: str = "bootstrap"
51+
CELERY: CeleryConfig = CeleryConfig()
52+
DEBUG: bool = False
53+
ENVIRONMENT: TYPE_ENVIRONMENT = "local"
1554
SQLALCHEMY_CONFIG: Dict[str, SQLAlchemyAsyncConfig] = dict(
1655
default=SQLAlchemyAsyncConfig(
1756
engine_url=f"sqlite+aiosqlite:///{os.path.dirname(os.path.abspath(__file__))}/sqlite.db",
@@ -24,8 +63,6 @@ class AppConfig(BaseSettings):
2463
),
2564
),
2665
)
27-
ENVIRONMENT: TYPE_ENVIRONMENT = "local"
28-
DEBUG: bool = False
2966

3067

3168
def init_logger(config: AppConfig) -> None:

di_container.py renamed to common/di_container.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from sqlalchemy_bind_manager import SQLAlchemyBindManager
44
from sqlalchemy_bind_manager._repository import SQLAlchemyAsyncRepository
55

6-
from config import AppConfig
6+
from common.config import AppConfig
77
from domains.books._data_access_interfaces import (
88
BookEventGatewayInterface,
99
BookRepositoryInterface,

0 commit comments

Comments
 (0)