Skip to content

Commit 92e3c9d

Browse files
authored
Merge pull request #10 from CaptainAril/feat/user-registration
queue implementation for user registration and email verification
2 parents 28b5142 + ffe8ed4 commit 92e3c9d

File tree

16 files changed

+392
-12
lines changed

16 files changed

+392
-12
lines changed

.env.example

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,14 @@ JWT_ISSUER=xxx
2828
# time in minutes
2929
OTP_LIFETIME=10
3030

31+
GATEWAY_PUBLIC_KEY='XXXXXXXXXXXXXXXXXXX'
32+
GATEWAY_KEY_TTL=2
33+
34+
GATEWAY_PUBLIC_KEY='XXXXXXXXXXXXXXXXXXX'
35+
GATEWAY_KEY_TTL=2
36+
3137
# amqp://username:password@host[:port]/
32-
RABBITMQ_URL=amqp://guest:guest@localhost:5672/
38+
RABBITMQ_URL=amqp://guest:guest@localhost:5672/
39+
40+
QUEUE_SECRECT_KEY='XXXXXXXXXXXXXXXXX'
41+
QUEUE_SECRECT_KEY_TTL=0.5

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ dependencies = [
2121
"pyjwt>=2.10.1",
2222
"faststream[rabbit]>=0.5.39",
2323
"starlette>=0.46.2",
24-
"uvicorn>=0.34.1",
24+
"django-extensions>=4.1",
25+
"uvicorn>=0.34.2",
2526
]
2627

2728
[dependency-groups]

src/api/constants/queues.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from typing import TypedDict
2+
3+
4+
class QueueNames(TypedDict):
5+
USER_REGISTRATION: str
6+
EMAIL_VALIDATION: str
7+
8+
9+
QUEUE_NAMES: QueueNames = {
10+
"USER_REGISTRATION": "create_user",
11+
"EMAIL_VALIDATION": "validate_email",
12+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from typing import TypedDict
2+
3+
4+
class SignatureSources(TypedDict):
5+
gateway: str
6+
queue: str
7+
8+
9+
SIGNATURE_SOURCES: SignatureSources = {
10+
"gateway": "Gateway",
11+
"queue": "Broker Queue",
12+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
from types import CoroutineType
2+
from typing import Any
3+
from collections.abc import Callable, Awaitable
4+
5+
from faststream import BaseMiddleware
6+
from faststream.broker.message import StreamMessage
7+
from faststream.rabbit.message import RabbitMessage
8+
9+
from src.env import queue
10+
from src.utils.logger import Logger
11+
from src.api.services.UtilityService import UtilityService
12+
from src.api.constants.signature_sources import SIGNATURE_SOURCES
13+
14+
15+
class PublishMiddleware(BaseMiddleware):
16+
"""
17+
Middleware to handle subscription messages.
18+
"""
19+
20+
async def publish_scope(
21+
self,
22+
call_next: Callable[..., Awaitable[Any]],
23+
msg: RabbitMessage,
24+
*args: tuple[Any, ...],
25+
**kwargs: dict[str, Any],
26+
) -> CoroutineType:
27+
timestamp = UtilityService.get_timestamp()
28+
signature = UtilityService.generate_signature(queue["key"], timestamp)
29+
30+
headers = {}
31+
32+
headers["X-BROKER-SIGNATURE"] = signature
33+
headers["X-BROKER-TIMESTAMP"] = timestamp
34+
headers["X-BROKER-KEY"] = queue["key"]
35+
36+
kwargs["headers"] = headers
37+
return await super().publish_scope(call_next, msg, *args, **kwargs)
38+
39+
40+
class SubscribeMiddleware(BaseMiddleware):
41+
async def consume_scope(
42+
self, call_next: Callable[[Any], Awaitable[Any]], msg: StreamMessage
43+
) -> CoroutineType | None:
44+
logger = Logger(__name__)
45+
try:
46+
UtilityService.verify_signature(
47+
logger=logger,
48+
signature_data={
49+
"signature": msg.headers["X-BROKER-SIGNATURE"],
50+
"timestamp": msg.headers["X-BROKER-TIMESTAMP"],
51+
"key": queue["key"],
52+
"ttl": queue["ttl"],
53+
"title": SIGNATURE_SOURCES["gateway"],
54+
},
55+
)
56+
57+
return await super().consume_scope(call_next, msg)
58+
except KeyError as e:
59+
message = f"Missing required header: {e}"
60+
logger.error(
61+
{
62+
"activity_type": "Authenticate GatewaBroker Queue Request",
63+
"message": message,
64+
"metadata": {"headers": msg.headers},
65+
}
66+
)
67+
except Exception as e:
68+
queue_operation = msg.raw_message.routing_key
69+
message = f"`{queue_operation}` operation failed: {e}"
70+
logger.error(
71+
{
72+
"activity_type": "Authenticate GatewaBroker Queue Request",
73+
"message": message,
74+
"metadata": {"headers": msg.headers, "message": msg._decoded_body},
75+
}
76+
)
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
from django.http import HttpRequest
2+
from ninja.errors import AuthenticationError
3+
from ninja.security import APIKeyHeader
4+
from ninja.openapi.schema import OpenAPISchema
5+
6+
from src.env import api_gateway
7+
from src.utils.logger import Logger
8+
from src.api.services.UtilityService import SignatureData, UtilityService
9+
from src.api.constants.signature_sources import SIGNATURE_SOURCES
10+
11+
12+
class GateWayAuth(APIKeyHeader):
13+
def __init__(self, logger: Logger) -> None:
14+
self.logger = logger
15+
super().__init__()
16+
17+
def authenticate(self, request: HttpRequest, key: str | None) -> str | None:
18+
try:
19+
api_key = request.headers["X-API-GATEWAY-KEY"]
20+
api_timestamp = request.headers["X-API-GATEWAY-TIMESTAMP"]
21+
api_signature = request.headers["X-API-GATEWAY-SIGNATURE"]
22+
except KeyError as e:
23+
message = f"Missing required header: {e}"
24+
self.logger.error(
25+
{
26+
"activity_type": "Authenticate Gateway Request",
27+
"message": message,
28+
"metadata": {"headers": request.headers},
29+
}
30+
)
31+
raise AuthenticationError(message=message)
32+
33+
valid_api_key = api_gateway["key"]
34+
if api_key != valid_api_key:
35+
message = "Invalid API key!"
36+
self.logger.error(
37+
{
38+
"activity_type": "Authenticate Gateway Request",
39+
"message": message,
40+
"metadata": {"headers": request.headers},
41+
}
42+
)
43+
raise AuthenticationError(message=message)
44+
45+
signature_data: SignatureData = {
46+
"signature": api_signature,
47+
"timestamp": api_timestamp,
48+
"key": valid_api_key,
49+
"ttl": 5,
50+
"title": SIGNATURE_SOURCES["gateway"],
51+
}
52+
53+
UtilityService.verify_signature(
54+
logger=self.logger, signature_data=signature_data
55+
)
56+
57+
self.logger.debug(
58+
{
59+
"activity_type": "Authenticate Gateway Request",
60+
"message": "Successfully authenticated gateway request",
61+
"metadata": {
62+
"headers": request.headers,
63+
},
64+
}
65+
)
66+
67+
return api_signature
68+
69+
70+
def get_authentication() -> GateWayAuth:
71+
gateway_auth = GateWayAuth(Logger("Authentication"))
72+
return gateway_auth
73+
74+
75+
def add_global_headers(schema: OpenAPISchema) -> OpenAPISchema:
76+
for path in schema["paths"]:
77+
for method in schema["paths"][path]:
78+
operation = schema["paths"][path][method]
79+
if operation.get("security"):
80+
operation["security"] = schema["security"]
81+
return schema
82+
83+
84+
authentication = get_authentication()

src/api/routes/__init__.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,56 @@
11
from ninja import NinjaAPI
22
from django.http import HttpRequest
3+
from ninja.openapi.schema import OpenAPISchema
34

45
from src.env import app
6+
from src.api.middlewares.GateWayMiddleware import authentication, add_global_headers
57

68
api: NinjaAPI = NinjaAPI(
79
version=app["version"],
810
title=app["display_name"],
911
description=app["description"],
12+
auth=authentication,
1013
)
1114

15+
16+
original_get_openapi_schema = api.get_openapi_schema
17+
18+
19+
def custom_openapi_schema(path_params: dict | None = None) -> OpenAPISchema:
20+
schema = original_get_openapi_schema()
21+
22+
schema["components"]["securitySchemes"] = {
23+
"Gateway Key": {
24+
"type": "apiKey",
25+
"in": "header",
26+
"name": "X-API-GATEWAY-KEY",
27+
},
28+
"API Timestamp": {
29+
"type": "apiKey",
30+
"in": "header",
31+
"name": "X-API-GATEWAY-TIMESTAMP",
32+
},
33+
"API Signature": {
34+
"type": "apiKey",
35+
"in": "header",
36+
"name": "X-API-GATEWAY-SIGNATURE",
37+
},
38+
}
39+
40+
schema["security"] = [
41+
{
42+
"Gateway Key": [],
43+
"API Timestamp": [],
44+
"API Signature": [],
45+
}
46+
]
47+
48+
schema = add_global_headers(schema)
49+
return schema
50+
51+
52+
setattr(api, "get_openapi_schema", custom_openapi_schema)
53+
1254
from src.api.utils import error_handlers # noqa: E402, F401
1355

1456

src/api/services/AuthService.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
from typing import Annotated
22

33
from src.utils.svcs import Service
4+
from src.config.asgi import broker
45
from src.utils.logger import Logger
56
from src.api.typing.JWT import JWTSuccess
7+
from src.api.constants.queues import QUEUE_NAMES
68
from src.api.typing.UserExists import UserExists
79
from src.api.constants.messages import MESSAGES, DYNAMIC_MESSAGES
810
from src.api.typing.UserSuccess import UserSuccess
@@ -60,6 +62,11 @@ async def register(self, req: CreateUserRequest) -> UserExists:
6062

6163
created_user = await UserRepository.add(req)
6264

65+
user_data = {"id": created_user.id, "email": created_user.email}
66+
queue = QUEUE_NAMES["USER_REGISTRATION"]
67+
68+
await broker.publish(message=user_data, queue=queue, persist=True)
69+
6370
await self.otp_service.send_otp(created_user.id)
6471

6572
user = self.utility_service.sanitize_user_object(created_user)
@@ -120,6 +127,11 @@ async def validate_email(self, req: AuthenticateUserOtp) -> bool:
120127
await UserRepository.update_by_user(
121128
user, {"is_active": True, "is_enabled": True, "is_validated": True}
122129
)
130+
131+
user_data = {"id": user.id, "email": user.email}
132+
queue = QUEUE_NAMES["EMAIL_VALIDATION"]
133+
await broker.publish(message=user_data, queue=queue, persist=True)
134+
123135
return True
124136

125137
async def login(self, req: AuthenticateUserRequest) -> UserSuccess:

src/api/services/UtilityService.py

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
1+
import hmac
2+
import hashlib
13
from uuid import uuid4
2-
from datetime import timedelta
4+
from typing import TypedDict
5+
from datetime import datetime, timedelta
36

47
import jwt
58
import bcrypt
69
from faker import Faker
710
from django.utils import timezone
11+
from ninja.errors import AuthenticationError
812

913
from src.env import jwt_config
1014
from src.utils.svcs import Service
15+
from src.utils.logger import Logger
1116
from src.api.typing.JWT import JWTData
1217
from src.api.models.postgres import User
1318
from src.api.typing.ExpireUUID import ExpireUUID
@@ -17,6 +22,14 @@
1722
fake = Faker()
1823

1924

25+
class SignatureData(TypedDict):
26+
title: str
27+
signature: str
28+
timestamp: str
29+
key: str
30+
ttl: int | float
31+
32+
2033
@Service()
2134
class UtilityService:
2235
@staticmethod
@@ -95,3 +108,52 @@ def generate_uuid() -> ExpireUUID:
95108
lifespan = timedelta(hours=24)
96109
expires_at = current_time + lifespan
97110
return {"uuid": uuid4(), "expires_at": expires_at}
111+
112+
@staticmethod
113+
def generate_signature(key: str, timestamp: str) -> str:
114+
signature = hmac.new(
115+
key=key.encode(), msg=timestamp.encode(), digestmod=hashlib.sha256
116+
).hexdigest()
117+
return signature
118+
119+
@staticmethod
120+
def verify_signature(signature_data: SignatureData, logger: Logger) -> bool:
121+
signature = signature_data["signature"]
122+
timestamp = signature_data["timestamp"]
123+
key = signature_data["key"]
124+
ttl = signature_data["ttl"]
125+
title = signature_data["title"]
126+
127+
valid_signature = UtilityService.generate_signature(key, timestamp)
128+
is_valid = hmac.compare_digest(valid_signature, signature)
129+
130+
if not is_valid:
131+
message = "Invalid signature!"
132+
logger.error(
133+
{
134+
"activity_type": f"Authenticate {title} Request",
135+
"message": message,
136+
"metadata": {"signature": signature},
137+
}
138+
)
139+
raise AuthenticationError(message=message)
140+
141+
initial_time = datetime.fromtimestamp(float(timestamp) / 1000)
142+
valid_window = initial_time + timedelta(minutes=ttl)
143+
if valid_window < datetime.now():
144+
message = "Signature expired!"
145+
logger.error(
146+
{
147+
"activity_type": f"Authenticate {title} Request",
148+
"message": message,
149+
"metadata": {"timestamp": timestamp},
150+
}
151+
)
152+
raise AuthenticationError(message=message)
153+
154+
return True
155+
156+
@staticmethod
157+
def get_timestamp() -> str:
158+
current_time = datetime.now().timestamp() * 1000
159+
return str(current_time)

0 commit comments

Comments
 (0)