-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathadmin_user_repository.py
More file actions
114 lines (89 loc) · 4.43 KB
/
admin_user_repository.py
File metadata and controls
114 lines (89 loc) · 4.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import re
from datetime import datetime, timezone
from beanie.odm.operators.find import BaseFindOperator
from beanie.operators import Eq, Or, RegEx
from app.core.security import SecurityService
from app.db.docs import (
EventDocument,
ExecutionDocument,
NotificationDocument,
SagaDocument,
SavedScriptDocument,
UserDocument,
UserSettingsDocument,
)
from app.domain.enums import UserRole
from app.domain.user import DomainUserCreate, PasswordReset, User, UserListResult, UserUpdate
class AdminUserRepository:
    """Persistence operations on user accounts for administrative use.

    Reads and writes ``UserDocument`` records (and, for cascade deletion, the
    related document collections) and converts stored documents into ``User``
    domain models before returning them.
    """

    def __init__(self, security_service: SecurityService) -> None:
        """Keep the service used to hash passwords on update and reset."""
        self.security_service = security_service

    @staticmethod
    def _to_domain(doc: UserDocument) -> User:
        """Convert a stored user document into the ``User`` domain model."""
        return User.model_validate(doc, from_attributes=True)

    @staticmethod
    async def _find_user_doc(user_id: str) -> UserDocument | None:
        """Fetch the user document whose ``user_id`` field equals *user_id*."""
        return await UserDocument.find_one({"user_id": user_id})

    async def create_user(self, create_data: DomainUserCreate) -> User:
        """Insert a new user built from *create_data* and return it as a ``User``."""
        doc = UserDocument(**create_data.model_dump())
        await doc.insert()
        return self._to_domain(doc)

    async def list_users(
        self, limit: int = 100, offset: int = 0, search: str | None = None, role: UserRole | None = None
    ) -> UserListResult:
        """Return one page of users plus the total number of matches.

        Args:
            limit: Maximum number of users in the returned page.
            offset: Number of matching users to skip before the page starts.
            search: Optional text matched case-insensitively against the
                username or the email; ignored when empty or ``None``.
            role: Optional role the returned users must have.

        Returns:
            A ``UserListResult`` carrying the page, the total match count and
            the ``offset``/``limit`` that were applied.
        """
        conditions: list[BaseFindOperator] = []

        if search:
            # Escape the input so it is matched literally rather than being
            # interpreted as a regular expression.
            pattern = re.escape(search)
            conditions.append(
                Or(
                    RegEx(UserDocument.username, pattern, options="i"),
                    RegEx(UserDocument.email, pattern, options="i"),
                )
            )

        if role:
            conditions.append(Eq(UserDocument.role, role))

        query = UserDocument.find(*conditions)
        total = await query.count()
        # NOTE(review): no sort is applied here, so the order of pages depends
        # on the database's default ordering - confirm this is acceptable.
        docs = await query.skip(offset).limit(limit).to_list()

        return UserListResult(
            users=[self._to_domain(doc) for doc in docs],
            total=total,
            offset=offset,
            limit=limit,
        )

    async def get_user_by_id(self, user_id: str) -> User | None:
        """Return the user with the given ``user_id``, or ``None`` if absent."""
        doc = await self._find_user_doc(user_id)
        return self._to_domain(doc) if doc else None

    async def update_user(self, user_id: str, update_data: UserUpdate) -> User | None:
        """Apply the non-``None`` fields of *update_data* to a user.

        A ``password`` field is never stored as-is: it is replaced by its hash
        under ``hashed_password``. ``updated_at`` is refreshed whenever at
        least one field is written.

        Returns:
            The user after the update, or ``None`` when no user has *user_id*.
        """
        doc = await self._find_user_doc(user_id)
        if not doc:
            return None

        update_dict = update_data.model_dump(exclude_none=True)

        # Swap the plaintext password for its hash before persisting.
        if "password" in update_dict:
            update_dict["hashed_password"] = self.security_service.get_password_hash(update_dict.pop("password"))

        # Skip the write (and the timestamp bump) when nothing was supplied.
        if update_dict:
            update_dict["updated_at"] = datetime.now(timezone.utc)
            await doc.set(update_dict)

        return self._to_domain(doc)

    async def delete_user(self, user_id: str, cascade: bool = True) -> dict[str, int]:
        """Delete a user and, optionally, the data that references them.

        Args:
            user_id: Identifier of the user to delete.
            cascade: When true, also delete executions, saved scripts,
                notifications, user settings, events and sagas that reference
                *user_id*.

        Returns:
            Deleted-document counts keyed by kind. ``"user"`` is always
            present (``1`` or ``0``); the other keys are present only when
            *cascade* is true.
        """
        deleted_counts: dict[str, int] = {}

        doc = await self._find_user_doc(user_id)
        if doc:
            await doc.delete()
            deleted_counts["user"] = 1
        else:
            deleted_counts["user"] = 0

        if not cascade:
            return deleted_counts

        # (result key, document class, field holding the user id).
        # The related data is removed even when no user document was found.
        # NOTE(review): presumably this is meant to clean up orphans - confirm.
        related = (
            ("executions", ExecutionDocument, "user_id"),
            ("saved_scripts", SavedScriptDocument, "user_id"),
            ("notifications", NotificationDocument, "user_id"),
            ("user_settings", UserSettingsDocument, "user_id"),
            ("events", EventDocument, "metadata.user_id"),
            ("sagas", SagaDocument, "context_data.user_id"),
        )
        for key, document_cls, field in related:
            result = await document_cls.find({field: user_id}).delete()
            deleted_counts[key] = result.deleted_count if result else 0

        return deleted_counts

    async def reset_user_password(self, reset_data: PasswordReset) -> bool:
        """Replace a user's password hash with the hash of a new password.

        Returns:
            ``True`` when the user was found and updated, ``False`` when no
            user has ``reset_data.user_id``.
        """
        doc = await self._find_user_doc(reset_data.user_id)
        if not doc:
            return False

        # Write only the two changed fields, as ``update_user`` does, instead
        # of saving the whole document: stale copies of other fields are not
        # written back over concurrent changes.
        await doc.set(
            {
                "hashed_password": self.security_service.get_password_hash(reset_data.new_password),
                "updated_at": datetime.now(timezone.utc),
            }
        )
        return True