|
3 | 3 |
|
4 | 4 | This module provides functions to initialize default roles and permissions |
5 | 5 | in the database when the application starts for the first time. |
| 6 | +It also ensures ADMIN_EMAILS users always have the admin role. |
6 | 7 | """ |
7 | 8 |
|
8 | 9 | import logging |
@@ -185,3 +186,72 @@ async def init_default_roles_and_permissions(db: AsyncSession) -> None: |
185 | 186 | logger.error(f"[SEED] Error initializing roles and permissions: {e}") |
186 | 187 | await db.rollback() |
187 | 188 | raise |
| 189 | + |
| 190 | + |
| 191 | +async def sync_admin_users(db: AsyncSession) -> None: |
| 192 | + """ |
| 193 | + Ensure all users in ADMIN_EMAILS have the admin role. |
| 194 | +
|
| 195 | + This function runs at every startup to guarantee that configured |
| 196 | + admin emails always have admin privileges, even if their role |
| 197 | + was accidentally changed. |
| 198 | + """ |
| 199 | + from ..config import settings |
| 200 | + from .user import User |
| 201 | + |
| 202 | + if not settings.ADMIN_EMAILS: |
| 203 | + logger.info("[SEED] No ADMIN_EMAILS configured, skipping admin sync") |
| 204 | + return |
| 205 | + |
| 206 | + try: |
| 207 | + # Get admin role |
| 208 | + result = await db.execute(select(Role).where(Role.name == "admin")) |
| 209 | + admin_role = result.scalar_one_or_none() |
| 210 | + |
| 211 | + if not admin_role: |
| 212 | + logger.warning("[SEED] Admin role not found, cannot sync admin users") |
| 213 | + return |
| 214 | + |
| 215 | + # Parse admin emails |
| 216 | + admin_emails = [e.strip().lower() for e in settings.ADMIN_EMAILS.split(",") if e.strip()] |
| 217 | + |
| 218 | + if not admin_emails: |
| 219 | + return |
| 220 | + |
| 221 | + logger.info(f"[SEED] Syncing admin role for {len(admin_emails)} configured admin(s)...") |
| 222 | + |
| 223 | + updated_count = 0 |
| 224 | + for email in admin_emails: |
| 225 | + # Find user by email (case-insensitive) |
| 226 | + result = await db.execute( |
| 227 | + select(User).where(User.email.ilike(email)) |
| 228 | + ) |
| 229 | + user = result.scalar_one_or_none() |
| 230 | + |
| 231 | + if not user: |
| 232 | + logger.debug(f"[SEED] Admin user not found (not yet registered): {email}") |
| 233 | + continue |
| 234 | + |
| 235 | + # Check if user needs update |
| 236 | + needs_update = False |
| 237 | + if user.role_id != admin_role.id: |
| 238 | + user.role_id = admin_role.id |
| 239 | + needs_update = True |
| 240 | + if not user.is_admin: |
| 241 | + user.is_admin = True |
| 242 | + needs_update = True |
| 243 | + |
| 244 | + if needs_update: |
| 245 | + updated_count += 1 |
| 246 | + logger.info(f"[SEED] Admin role assigned to: {user.email}") |
| 247 | + |
| 248 | + if updated_count > 0: |
| 249 | + await db.commit() |
| 250 | + logger.info(f"[SEED] Admin sync complete: {updated_count} user(s) updated") |
| 251 | + else: |
| 252 | + logger.info("[SEED] Admin sync complete: all admin users already have correct role") |
| 253 | + |
| 254 | + except Exception as e: |
| 255 | + logger.error(f"[SEED] Error syncing admin users: {e}") |
| 256 | + await db.rollback() |
| 257 | + raise |
0 commit comments