Skip to content

Commit 87db17e

Browse files
committed
add rabbitmq broker for user registration events
1 parent 7e64885 commit 87db17e

File tree

9 files changed

+108
-18
lines changed

9 files changed

+108
-18
lines changed

api/firebase_api.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44
from firebase_admin import auth as firebase_auth
55
from google.oauth2 import id_token
66

7-
from api.validators.user_validation import (ChangePasswordPayload,
8-
ChangePasswordResponse,
9-
FirebaseLoginPayload,
10-
FirebaseLoginResponse,
11-
FirebaseRegistrationPayload,
12-
FirebaseRegistrationResponse)
7+
from api.validators.user_validation import (
8+
ChangePasswordPayload,
9+
ChangePasswordResponse,
10+
FirebaseLoginPayload,
11+
FirebaseLoginResponse,
12+
FirebaseRegistrationPayload,
13+
FirebaseRegistrationResponse,
14+
)
1315
from models.service.user_service import User
1416

1517
logger = logging.getLogger("groceror")

api/inventory_api.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
from fastapi import APIRouter, Depends, HTTPException, status
55

66
from api.helpers.inventory_helper import InventoryHelper
7-
from api.validators.inventory_validation import (AddInventoryPayload,
8-
AddInventoryResponse,
9-
StoreInventoryResponse)
7+
from api.validators.inventory_validation import (
8+
AddInventoryPayload,
9+
AddInventoryResponse,
10+
StoreInventoryResponse,
11+
)
1012
from helpers.jwt import auth_required
1113
from models.entity.user_entity import User
1214

api/user_api.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,19 @@
44
from loguru import logger
55

66
from api.helpers import auth_helper
7-
from api.validators.user_validation import (ChangePasswordPayload,
8-
ChangePasswordResponse,
9-
LoginPayload, LoginResponse,
10-
RegistrationPayload,
11-
RegistrationResponse)
7+
from api.validators.user_validation import (
8+
ChangePasswordPayload,
9+
ChangePasswordResponse,
10+
LoginPayload,
11+
LoginResponse,
12+
RegistrationPayload,
13+
RegistrationResponse,
14+
)
1215
from config import JWTConfig
16+
from engine import publisher
1317
from helpers.jwt import JWT
18+
import pika
19+
import json
1420

1521
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
1622

@@ -54,6 +60,9 @@ async def register(registration_payload: RegistrationPayload):
5460
detail="Issue with registering user",
5561
)
5662
else:
63+
publisher.publish_message(
64+
event="user_registered", routing_key="email_queue", new_user=new_user
65+
)
5766
return {"id": new_user.id}
5867

5968

engine/publisher.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import json
2+
import logging
3+
import pika
4+
5+
from models.entity.user_entity import User
6+
7+
logger = logging.getLogger(__name__)
8+
9+
def publish_message(event: str, routing_key: str, new_user: User):
10+
# Publish registration event to email service
11+
try:
12+
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
13+
channel = connection.channel()
14+
15+
# Ensure exchange exists
16+
channel.queue_declare(queue='email_queue', durable=True)
17+
18+
# Create message payload
19+
message = {
20+
"event": event,
21+
"user_id": str(new_user.id),
22+
"email": new_user.email,
23+
"name": new_user.name
24+
}
25+
26+
# Publish message
27+
channel.basic_publish(
28+
exchange='',
29+
routing_key=routing_key,
30+
body=json.dumps(message),
31+
properties=pika.BasicProperties(
32+
delivery_mode=2, # make message persistent
33+
content_type='application/json'
34+
)
35+
)
36+
37+
connection.close()
38+
39+
except Exception as e:
40+
logger.error(f"Failed to publish registration event: {str(e)}")
41+
# Don't raise exception - registration succeeded even if notification fails

models/entity/store_entity.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from datetime import datetime
2-
from typing import Optional
2+
from typing import List, Optional
33
from uuid import UUID, uuid4
44

55
from sqlmodel import Field, Relationship, SQLModel
@@ -23,6 +23,8 @@ class Store(SQLModel, table=True):
2323
is_active: bool = Field(default=True)
2424
latitude: float = Field(default=None)
2525
longitude: float = Field(default=None)
26+
# user: User = Relationship(back_populates="store")
27+
# inventory: List["Inventory"] = Relationship(back_populates="store")
2628

2729
def __repr__(self):
2830
return f"<Store(id={self.id}, name={self.name})>"

models/entity/user_entity.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import uuid
22
from datetime import datetime
33
from enum import Enum
4-
from typing import Optional
4+
from typing import List, Optional
55

6-
from sqlmodel import Field, SQLModel
6+
from sqlmodel import Field, Relationship, SQLModel
77

88

99
class UserType(str, Enum):
@@ -22,3 +22,4 @@ class User(SQLModel, table=True):
2222
updated_at: datetime = Field(default_factory=datetime.utcnow, nullable=False)
2323
location: Optional[str] = None
2424
# inventory: List["Inventory"] = Relationship(back_populates="user")
25+
# store: Optional["Store"] = Relationship(back_populates="user")

models/service/inventory_service.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class InventoryService:
88
def __init__(self, store_id: str):
99
self.store_id = store_id
1010

11-
def get_inventory(self) -> List[Inventory]:
11+
def get_inventory_for_store(self) -> List[Inventory]:
1212
return (
1313
db_session.query(Inventory)
1414
.filter(Inventory.store_id == self.store_id)
@@ -24,5 +24,23 @@ def add_inventory(
2424
db_session.add(inventory)
2525
db_session.commit()
2626

27+
def delete_inventory(self, name: str):
28+
store_inventory = (
29+
db_session.query(Inventory)
30+
.filter(Inventory.name == name, Inventory.store_id == self.store_id)
31+
.first()
32+
)
33+
db_session.delete(store_inventory)
34+
db_session.commit()
35+
2736
def update_inventory(self, inventory: Inventory):
37+
store_inventory = (
38+
db_session.query(Inventory)
39+
.filter(
40+
Inventory.name == inventory.name, Inventory.store_id == self.store_id
41+
)
42+
.first()
43+
)
44+
store_inventory.quantity = inventory.quantity
45+
store_inventory.price = inventory.price
2846
db_session.commit()

models/service/store_service.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,3 +182,17 @@ def find_nearby_stores(
182182
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
183183
detail=f"Error finding nearby stores: {str(e)}",
184184
)
185+
186+
def get_store_user_specs(self, store_id: UUID) -> dict:
187+
store = self.get_store(store_id)
188+
user = db_session.query(User).filter(User.id == store.user_id).first()
189+
return {
190+
"name": user.name,
191+
"email": user.email,
192+
"phone": user.phone,
193+
"address": user.address,
194+
}
195+
196+
def get_store_email(self, store_id: UUID) -> str:
197+
store = self.get_store(store_id)
198+
return db_session.query(User).filter(User.id == store.user_id).first().email

models/service/user_service.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import sqlalchemy
66

77
from helpers.exceptions import GrocerorError
8+
89
# from helpers.jwt import JWT
910
from models.db import db_session
1011
from models.entity.user_entity import User

0 commit comments

Comments
 (0)