Skip to content

Commit 9a0516f

Browse files
committed
setup celery for email send task
1 parent 2c34093 commit 9a0516f

File tree

16 files changed

+313
-39
lines changed

16 files changed

+313
-39
lines changed

docker/Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,6 @@ ENV PATH /${IMAGE_NAME}/venv/bin:$PATH
7575
WORKDIR /${IMAGE_NAME}
7676
COPY . ./
7777

78-
RUN chmod +x /${IMAGE_NAME}/docker/entrypoint.sh
78+
RUN chmod +x /${IMAGE_NAME}/scripts/entrypoint.sh
7979

80-
ENTRYPOINT ["sh", "/docker/entrypoint.sh"]
80+
ENTRYPOINT ["sh", "scripts/entrypoint.sh"]

docker/docker-compose.yaml

Lines changed: 61 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,13 @@ services:
1313
- ./data/pgdata:/var/lib/postgresql/my_data
1414
env_file:
1515
- ../env/.env
16+
environment:
17+
- SQLALCHEMY_DATABASE_URI=postgresql+asyncpg://admin:admin@fastapi-2fa-db:5432/postgres
18+
- CELERY_BROKER_URL=amqp://admin:admin@fastapi-2fa-rabbitmq:5672/
19+
- result_backend=redis://fastapi-2fa-cache:6379/0
20+
1621
ports:
1722
- "5454:5432"
18-
networks:
19-
- servicenetwork
2023

2124
## REDIS
2225
redis:
@@ -25,11 +28,62 @@ services:
2528
restart: always
2629
env_file:
2730
- ../env/.env
31+
environment:
32+
- SQLALCHEMY_DATABASE_URI=postgresql+asyncpg://admin:admin@fastapi-2fa-db:5432/postgres
33+
- CELERY_BROKER_URL=amqp://admin:admin@fastapi-2fa-rabbitmq:5672/
34+
- result_backend=redis://fastapi-2fa-cache:6379/0
2835
ports:
2936
- "6389:6379"
30-
networks:
31-
- servicenetwork
3237

33-
networks:
34-
servicenetwork:
35-
driver: bridge
38+
rabbitmq:
39+
container_name: fastapi-2fa-rabbitmq
40+
image: rabbitmq:3-management
41+
env_file:
42+
- ../env/.env
43+
environment:
44+
- SQLALCHEMY_DATABASE_URI=postgresql+asyncpg://admin:admin@fastapi-2fa-db:5432/postgres
45+
- CELERY_BROKER_URL=amqp://admin:admin@fastapi-2fa-rabbitmq:5672/
46+
- result_backend=redis://fastapi-2fa-cache:6379/0
47+
ports:
48+
- "5672:5672"
49+
50+
celery_worker:
51+
container_name: fastapi-2fa-celery
52+
build:
53+
context: ..
54+
dockerfile: ./docker/Dockerfile
55+
image: fastapi_2fa_celery_worker
56+
entrypoint: /scripts/start_celery_worker.sh
57+
env_file:
58+
- ../env/.env
59+
environment:
60+
- SQLALCHEMY_DATABASE_URI=postgresql+asyncpg://admin:admin@fastapi-2fa-db:5432/postgres
61+
- CELERY_BROKER_URL=amqp://admin:admin@fastapi-2fa-rabbitmq:5672/
62+
- result_backend=redis://fastapi-2fa-cache:6379/0
63+
depends_on:
64+
- redis
65+
- db
66+
- rabbitmq
67+
68+
flower:
69+
container_name: fastapi-2fa-flower
70+
build:
71+
context: ..
72+
dockerfile: ./docker/Dockerfile
73+
image: fastapi_2fa_celery_flower
74+
entrypoint: /scripts/start_celery_flower.sh
75+
volumes:
76+
- flower_db:/app/flower_db
77+
env_file:
78+
- ../env/.env
79+
ports:
80+
- 5557:5557
81+
depends_on:
82+
- redis
83+
- db
84+
- rabbitmq
85+
- celery_worker
86+
87+
volumes:
88+
pgdata:
89+
flower_db:

env/.env

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
FASTAPI_CONFIG=development
2-
32
FAKE_EMAIL_SENDER=[email protected]
4-
53
API_V1_STR=/api/v1
64
PROJECT_NAME=fastapi-2fa
75

6+
# # JWT
87
JWT_SECRET_KEY=9d58ca3446806034a68b02cbcdf69d8f
98
JWT_SECRET_KEY_REFRESH=mXuqbFTs2kTmz+6rBpJh1B4T+zHQh8Nq1nc7BOrWTb4=
109
PRE_TFA_SECRET_KEY=11rWUgZLTckPHI7KO1SPbFgG1OILrDQjI7v9Q7KgYZw=
1110
ALGORITHM=HS256
11+
ACCESS_TOKEN_EXPIRE_MINUTES=30
12+
REFRESH_TOKEN_EXPIRE_MINUTES=1440 # 24 h
1213

14+
# # 2 FACTOR AUTHENTICATION
1315
FERNET_KEY_TFA_TOKEN=J_TYpprFmoLlVM0MNZElt8IwEkvEEhAwCmb8P_f7Fro=
1416
TFA_BACKUP_TOKENS_NR=5
1517
TFA_TOKEN_LENGTH=6
@@ -18,16 +20,27 @@ TFA_TOKEN_LENGTH=6
1820
# -->MAX = 10 => 5 minutes
1921
TOTP_TOKEN_TOLERANCE=2
2022
TOTP_ISSUER_NAME=fastapi_2fa
21-
22-
ACCESS_TOKEN_EXPIRE_MINUTES=30
23-
REFRESH_TOKEN_EXPIRE_MINUTES=1440 # 24 h
2423
PRE_TFA_TOKEN_EXPIRE_MINUTES=5
2524

25+
# # CORS
2626
BACKEND_CORS_ORIGINS=http://localhost:5555
2727

28+
# # DB
2829
POSTGRES_USER=admin
2930
POSTGRES_PASSWORD=admin
3031
POSTGRES_HOST=localhost
31-
POSTGRES_PORT=5430
32+
POSTGRES_PORT=5432
3233
POSTGRES_DB=postgres
33-
SQLALCHEMY_DATABASE_URI=postgresql+asyncpg://admin:admin@localhost:5454/postgres
34+
SQLALCHEMY_DATABASE_URI=postgresql+asyncpg://admin:admin@localhost:5454/postgres
35+
36+
# # RABBIT MQ
37+
RABBITMQ_DEFAULT_USER=admin
38+
RABBITMQ_DEFAULT_PASS=admin
39+
40+
# # CELERY
41+
CELERY_BROKER_URL=amqp://admin:admin@localhost:5672/
42+
result_backend=redis://localhost:6389/0
43+
44+
# # FLOWER
45+
CELERY_FLOWER_USER=admin
46+
CELERY_FLOWER_PASSWORD=admin
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from celery.result import AsyncResult
2+
from fastapi import APIRouter
3+
4+
from fastapi_2fa.core.config import settings
5+
from fastapi_2fa.core.utils import Email
6+
from fastapi_2fa.tasks.tasks import send_email_task
7+
8+
tasks_router = APIRouter()
9+
10+
11+
@tasks_router.get(
12+
"/test-celery",
13+
description="endpoint to test celery send mail function"
14+
)
15+
async def testcelery():
16+
"""
17+
Test celery
18+
"""
19+
email_ob = Email(
20+
to_=[settings.FAKE_EMAIL_SENDER],
21+
from_=settings.FAKE_EMAIL_SENDER,
22+
text_="Prova messaggio con Celery"
23+
)
24+
task = send_email_task.apply_async(kwargs={'email': email_ob.to_json()})
25+
task_id = task.task_id
26+
return {'task_id': task_id}
27+
28+
29+
@tasks_router.get(
30+
"/taskstatus",
31+
description="endpoint to retrieve task status"
32+
)
33+
async def taskstatus(task_id):
34+
task = AsyncResult(task_id)
35+
if isinstance(task.result, Exception):
36+
task_result = str(task.result)
37+
else:
38+
task_result = task.result
39+
40+
response = {
41+
'state': task.state,
42+
'result': task_result,
43+
}
44+
45+
return response

fastapi_2fa/api/endpoints/api_v1/router.py renamed to fastapi_2fa/api/endpoints/router.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from fastapi import APIRouter
22

3-
from fastapi_2fa.api.endpoints.api_v1 import auth, two_factor_auth, users
3+
from fastapi_2fa.api.endpoints.api_v1 import (auth, tasks, two_factor_auth,
4+
users)
45

56
router = APIRouter()
67

@@ -12,3 +13,6 @@
1213

1314
# tfa
1415
router.include_router(two_factor_auth.tfa_router, prefix="/tfa", tags=["tfa"])
16+
17+
# tasks
18+
router.include_router(tasks.tasks_router, prefix="/tasks", tags=["tasks"])

fastapi_2fa/core/config.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,21 @@ class BaseConfig(BaseSettings):
1515

1616
FAKE_EMAIL_SENDER: EmailStr = os.environ.get("FAKE_EMAIL_SENDER")
1717

18+
# # JWT
1819
JWT_SECRET_KEY: str = os.environ.get("JWT_SECRET_KEY")
1920
JWT_SECRET_KEY_REFRESH: str = os.environ.get("JWT_SECRET_KEY_REFRESH")
2021
PRE_TFA_SECRET_KEY: str = os.environ.get("PRE_TFA_SECRET_KEY")
2122
ALGORITHM: str = os.environ.get("ALGORITHM")
23+
ACCESS_TOKEN_EXPIRE_MINUTES: int = os.environ.get("ACCESS_TOKEN_EXPIRE_MINUTES", 15)
24+
REFRESH_TOKEN_EXPIRE_MINUTES: int = os.environ.get(
25+
"REFRESH_TOKEN_EXPIRE_MINUTES", 60 * 24
26+
) # 24 h
2227

28+
# # 2 FACTOR AUTHENTICATION
2329
FERNET_KEY_TFA_TOKEN: str = os.environ.get("FERNET_KEY_TFA_TOKEN")
24-
30+
PRE_TFA_TOKEN_EXPIRE_MINUTES: int = os.environ.get(
31+
"PRE_TFA_TOKEN_EXPIRE_MINUTES", 2
32+
)
2533
TFA_BACKUP_TOKENS_NR: int = os.environ.get("TFA_BACKUP_TOKENS_NR")
2634
TFA_TOKEN_LENGTH: int = os.environ.get("TFA_TOKEN_LENGTH")
2735
TOTP_ISSUER_NAME: str = os.environ.get("TOTP_ISSUER_NAME")
@@ -34,18 +42,12 @@ class BaseConfig(BaseSettings):
3442
le=10
3543
)
3644

37-
ACCESS_TOKEN_EXPIRE_MINUTES: int = os.environ.get("ACCESS_TOKEN_EXPIRE_MINUTES", 15)
38-
REFRESH_TOKEN_EXPIRE_MINUTES: int = os.environ.get(
39-
"REFRESH_TOKEN_EXPIRE_MINUTES", 60 * 24
40-
) # 24 h
41-
PRE_TFA_TOKEN_EXPIRE_MINUTES: int = os.environ.get(
42-
"PRE_TFA_TOKEN_EXPIRE_MINUTES", 2
43-
)
44-
45+
# # CORS
4546
BACKEND_CORS_ORIGINS: AnyHttpUrl | list[AnyHttpUrl] = os.environ.get(
4647
"BACKEND_CORS_ORIGINS", "http://localhost:5555"
4748
)
4849

50+
# # DB
4951
SQLALCHEMY_DATABASE_URI: str = os.environ.get("SQLALCHEMY_DATABASE_URI")
5052

5153
@validator("SQLALCHEMY_DATABASE_URI", pre=True)
@@ -62,6 +64,16 @@ def assemble_db_connection(cls, v: Optional[str], values: Dict[str, Any]) -> Any
6264
path=f"/{values.get('POSTGRES_DB') or ''}",
6365
)
6466

67+
# # CELERY
68+
CELERY_BROKER_URL: str = os.environ.get("CELERY_BROKER_URL", "redis://redis:6379/0")
69+
result_backend: str = os.environ.get("result_backend", "redis://redis:6379/0")
70+
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
71+
# Celery ACK the queue only when task is completed
72+
CELERY_TASK_ACKS_LATE = True
73+
imports = ("fastapi_2fa.tasks.tasks",)
74+
task_serializer = "pickle"
75+
accept_content = ('pickle', 'json',)
76+
6577
class Config:
6678
case_sensitive = True
6779

fastapi_2fa/core/utils.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1-
from dataclasses import dataclass
1+
import json
2+
from dataclasses import asdict, dataclass
23

4+
from celery.result import AsyncResult
35
from pydantic import EmailStr
46

57
from fastapi_2fa.core.config import settings
68
from fastapi_2fa.models.device import Device
79
from fastapi_2fa.models.users import User
8-
from fastapi_2fa.tasks.tasks import send_email
10+
from fastapi_2fa.tasks.tasks import send_email_task
911

1012

1113
@dataclass(slots=True, frozen=True)
@@ -14,6 +16,12 @@ class Email:
1416
from_: EmailStr
1517
text_: str
1618

19+
def to_json(self):
20+
"""
21+
get the json formated string
22+
"""
23+
return json.dumps(asdict(self))
24+
1725
def __repr__(self) -> str:
1826
return (
1927
f"Email (to: {self.to_}), "
@@ -22,19 +30,21 @@ def __repr__(self) -> str:
2230
)
2331

2432

25-
def send_mail_backup_tokens(user: User, device: Device):
33+
def send_mail_backup_tokens(user: User, device: Device) -> AsyncResult:
2634
email_ob = Email(
2735
to_=[user.email],
2836
from_=settings.FAKE_EMAIL_SENDER,
2937
text_=f"Backup tokens : {device.backup_tokens}"
3038
)
31-
send_email(email=email_ob)
39+
task = send_email_task.apply_async(kwargs={'email': email_ob.to_json()})
40+
return task
3241

3342

34-
def send_mail_totp_token(user: User, token: str):
43+
def send_mail_totp_token(user: User, token: str) -> AsyncResult:
3544
email_ob = Email(
3645
to_=[user.email],
3746
from_=settings.FAKE_EMAIL_SENDER,
3847
text_=f"Access TOTP token : {token}"
3948
)
40-
send_email(email=email_ob)
49+
task = send_email_task.apply_async(kwargs={'email': email_ob.to_json()})
50+
return task

fastapi_2fa/crud/users.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,5 +54,4 @@ async def get_by_email(db: Session, email: EmailStr) -> User | None:
5454
return model_instance
5555

5656

57-
5857
user_crud = UserCrud(model=User)

fastapi_2fa/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from fastapi import FastAPI
33
from starlette.middleware.cors import CORSMiddleware
44

5-
from fastapi_2fa.api.endpoints.api_v1.router import router
5+
from fastapi_2fa.api.endpoints.router import router
66
from fastapi_2fa.core.config import settings
77

88
app = FastAPI(

fastapi_2fa/tasks/celery_conf.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from celery import Celery
2+
3+
from fastapi_2fa.core.config import settings
4+
5+
celery = Celery('my_celery', config_source=settings, namespace='CELERY')

0 commit comments

Comments
 (0)