Skip to content

Commit e5f9faf

Browse files
committed
feat: add celery
1 parent 5a6da9d commit e5f9faf

File tree

10 files changed

+254
-5
lines changed

10 files changed

+254
-5
lines changed

.pre-commit-config.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,11 @@ repos:
5252
hooks:
5353
# Linter
5454
- id: ruff
55-
types_or: [python, pyi, jupyter, toml]
55+
types_or: [python, pyi, jupyter, pyproject]
5656
args: [--fix, --exit-non-zero-on-fix]
5757
# Formatter
5858
- id: ruff-format
59-
types_or: [python, pyi, jupyter, toml]
59+
types_or: [python, pyi, jupyter, pyproject]
6060

6161
- repo: https://github.com/RobertCraigie/pyright-python
6262
rev: v1.1.398
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import shlex
2+
import subprocess
3+
import typing
4+
from pathlib import Path
5+
6+
from django.core.management.base import BaseCommand
7+
from django.utils import autoreload
8+
9+
WORKER_STATE_DIR = Path("/var/run/celery")
10+
11+
CMD = "celery -A main worker -E --concurrency=2 -l info"
12+
13+
14+
def restart_celery(*args: typing.Any, **kwargs: typing.Any):
15+
kill_worker_cmd = "pkill -9 celery"
16+
subprocess.call(shlex.split(kill_worker_cmd)) # noqa: S603
17+
subprocess.call(shlex.split(CMD)) # noqa: S603
18+
19+
20+
class Command(BaseCommand):
21+
@typing.override
22+
def handle(self, *args: typing.Any, **options: typing.Any):
23+
self.stdout.write("Starting celery worker with autoreload...")
24+
if not Path.exists(WORKER_STATE_DIR):
25+
Path.mkdir(WORKER_STATE_DIR, parents=True)
26+
autoreload.run_with_reloader(restart_celery, args=None, kwargs=None)

apps/common/management/commands/wait_for_resources.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55

66
import requests
77
from django.conf import settings
8+
from django.core.cache import cache
89
from django.core.management.base import BaseCommand, CommandParser
910
from django.db import connections
1011
from django.db.utils import OperationalError
12+
from redis.exceptions import ConnectionError as RedisConnectionError
1113

1214

1315
class TimeoutException(Exception): ...
@@ -37,6 +39,25 @@ def wait_for_db(self):
3739

3840
self.stdout.write(self.style.SUCCESS(f"DB is available after {time.time() - start_time} seconds"))
3941

42+
def wait_for_redis(self):
43+
self.stdout.write("Waiting for Redis...")
44+
redis_conn = None
45+
start_time = time.time()
46+
while True:
47+
try:
48+
cache.set("wait-for-it-ping", "pong", timeout=1) # Set a key to check Redis availability
49+
redis_conn = cache.get("wait-for-it-ping") # Try to get the value back from Redis
50+
if redis_conn != "pong":
51+
raise TypeError
52+
break
53+
except (RedisConnectionError, TypeError):
54+
...
55+
# Try again
56+
self.stdout.write(self.style.WARNING("Redis not available, waiting..."))
57+
time.sleep(1)
58+
59+
self.stdout.write(self.style.SUCCESS(f"Redis is available after {time.time() - start_time} seconds"))
60+
4061
def wait_for_minio(self):
4162
self.stdout.write("Waiting for Minio...")
4263
AWS_S3_CONFIG_OPTIONS = getattr(settings, "AWS_S3_CONFIG_OPTIONS", None) or {}
@@ -69,6 +90,8 @@ def add_arguments(self, parser: CommandParser):
6990
help="The maximum time (in seconds) the command is allowed to run before timing out. Default is 10 min.",
7091
)
7192
parser.add_argument("--db", action="store_true", help="Wait for DB to be available")
93+
parser.add_argument("--celery-queue", action="store_true", help="Wait for Celery queue to be available")
94+
parser.add_argument("--redis", action="store_true", help="Wait for Redis to be available")
7295
parser.add_argument("--minio", action="store_true", help="Wait for MinIO (S3) storage to be available")
7396
parser.add_argument("--all", action="store_true", help="Wait for all to be available")
7497

@@ -86,6 +109,10 @@ def handle(self, **kwargs: typing.Any):
86109
self.wait_for_db()
87110
if _all or kwargs["minio"]:
88111
self.wait_for_minio()
112+
if _all or kwargs["redis"]:
113+
self.wait_for_redis()
114+
if _all or kwargs["celery_queue"]:
115+
self.wait_for_redis()
89116
except TimeoutException:
90117
...
91118
finally:

docker-compose.yaml

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,15 @@ x-server: &base_server_setup
4242
- ipython_data_local:/root/.ipython/profile_default # persist ipython data, including ipython history
4343
depends_on:
4444
- db
45+
- redis
46+
- mailpit
47+
48+
x-worker: &base_worker_setup
49+
<<: *base_server_setup
50+
restart: unless-stopped
51+
environment:
52+
<<: *base_server_environments
53+
APP_TYPE: "WORKER"
4554

4655
services:
4756
db:
@@ -53,12 +62,17 @@ services:
5362
volumes:
5463
- postgres_data17:/var/lib/postgresql/data
5564

65+
redis:
66+
image: redis:8
67+
volumes:
68+
- redis_data:/data
69+
5670
mailpit:
5771
image: axllent/mailpit
5872
container_name: mailpit
5973
restart: unless-stopped
6074
volumes:
61-
- mailpit-data:/data
75+
- mailpit_data:/data
6276
ports:
6377
- 8025:8025
6478
environment:
@@ -77,8 +91,20 @@ services:
7791
- 127.0.0.1:8000:8000
7892
depends_on:
7993
- db
94+
- redis
95+
96+
worker:
97+
<<: *base_worker_setup
98+
command: bash -c "/code/misc/dev/run_worker.sh"
99+
healthcheck:
100+
test: ["CMD-SHELL", "celery -A main inspect ping -d celery@$$HOSTNAME || exit 1"]
101+
interval: 30s
102+
timeout: 5s
103+
retries: 3
104+
start_period: 30s
80105

81106
volumes:
82107
postgres_data17:
83108
ipython_data_local:
84-
mailpit-data:
109+
mailpit_data:
110+
redis_data:

main/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
11
import django_stubs_ext
22

3+
from .celery import app as celery_app
4+
5+
__all__ = ("celery_app",)
6+
37
django_stubs_ext.monkeypatch()

main/celery.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import logging
2+
import os
3+
4+
from celery import Celery
5+
6+
logger = logging.getLogger(__name__)
7+
8+
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "main.settings")
9+
10+
app = Celery("main")
11+
12+
app.config_from_object("django.conf:settings", namespace="CELERY")
13+
14+
15+
app.autodiscover_tasks()
16+
17+
18+
@app.task(bind=True) # type: ignore[reportIncompatibleVariableOverride]
19+
def debug_task(self):
20+
logger.info("Request: %s", self.request)

main/settings.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@
5959
DEFAULT_FROM_EMAIL=(str, None),
6060
EMAIL_TO=(str, None),
6161
EMAIL_USE_TLS=(bool, False),
62+
# celery
63+
CELERY_REDIS_URL=str, # redis://redis:6379/0
64+
# Cache
65+
CACHE_REDIS_URL=str, # redis://redis:6379/1
66+
TEST_CACHE_REDIS_URL=(str, None), # redis://redis:6379/11
6267
)
6368

6469

@@ -405,3 +410,9 @@
405410
EMAIL_HOST_PASSWORD = env("EMAIL_HOST_PASSWORD")
406411
DEFAULT_FROM_EMAIL = env("DEFAULT_FROM_EMAIL")
407412
EMAIL_TO = env("EMAIL_TO")
413+
414+
# Celery
415+
CELERY_REDIS_URL = env("CELERY_REDIS_URL")
416+
CACHE_REDIS_URL = env("CACHE_REDIS_URL")
417+
CELERY_BROKER_URL = CELERY_REDIS_URL
418+
CELERY_RESULT_BACKEND = CELERY_REDIS_URL

misc/dev/run_worker.sh

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
#!/bin/bash -e
2+
3+
REDIS_HOST_PORT=$(echo $CELERY_REDIS_URL | sed 's|redis://\([^/]*\)/.*|\1|')
4+
5+
./manage.py wait_for_resources --db --celery-queue
6+
7+
./manage.py run_celery_dev

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ dependencies = [
3232
"django-mdeditor>=0.1.20",
3333
"django-simple-captcha>=0.6.3",
3434
"django-stubs>=5.1.3",
35+
"celery>=5.6.3",
3536
]
3637

3738
[dependency-groups]
@@ -62,7 +63,6 @@ extend-exclude = [
6263
"venv",
6364
"**/migrations/*",
6465
"__pycache__",
65-
"uv.lock"
6666
]
6767

6868
[tool.ruff.lint]

0 commit comments

Comments
 (0)