|
| 1 | +""" |
| 2 | +Auth protocol definitions for PraisonAI Agents. |
| 3 | +
|
| 4 | +Defines the structural contracts for authentication and authorization |
| 5 | +backends via typing.Protocol. Any class implementing the required methods |
| 6 | +satisfies the protocol without explicit inheritance (structural subtyping). |
| 7 | +
|
| 8 | +Design influenced by: |
| 9 | +- Multica: JWT-based auth with email codes + Google OAuth, role-based |
| 10 | + workspace membership (owner/admin/member), agents as assignees. |
| 11 | +- Paperclip: better-auth sessions, company memberships with principal |
| 12 | + types (user/agent), fine-grained permission grants. |
| 13 | +- PraisonAI patterns: ApprovalProtocol, GatewayProtocol, DbAdapter. |
| 14 | +""" |
| 15 | + |
| 16 | +from __future__ import annotations |
| 17 | + |
| 18 | +from dataclasses import dataclass, field |
| 19 | +from typing import Any, Dict, List, Optional, Protocol, runtime_checkable |
| 20 | + |
| 21 | + |
| 22 | +@dataclass |
| 23 | +class AuthIdentity: |
| 24 | + """Authenticated identity (user or agent). |
| 25 | +
|
| 26 | + Attributes: |
| 27 | + id: Unique identifier for this principal. |
| 28 | + type: Principal type — ``"user"`` or ``"agent"``. |
| 29 | + workspace_id: Active workspace/tenant scope (optional). |
| 30 | + roles: Roles within the workspace (e.g. ``["owner"]``). |
| 31 | + email: Email address (users only, optional). |
| 32 | + name: Display name (optional). |
| 33 | + metadata: Arbitrary extra data for backend-specific info. |
| 34 | + """ |
| 35 | + |
| 36 | + id: str |
| 37 | + type: str = "user" |
| 38 | + workspace_id: Optional[str] = None |
| 39 | + roles: List[str] = field(default_factory=list) |
| 40 | + email: Optional[str] = None |
| 41 | + name: Optional[str] = None |
| 42 | + metadata: Dict[str, Any] = field(default_factory=dict) |
| 43 | + |
| 44 | + @property |
| 45 | + def is_user(self) -> bool: |
| 46 | + """Check if this identity is a human user.""" |
| 47 | + return self.type == "user" |
| 48 | + |
| 49 | + @property |
| 50 | + def is_agent(self) -> bool: |
| 51 | + """Check if this identity is an agent.""" |
| 52 | + return self.type == "agent" |
| 53 | + |
| 54 | + def has_role(self, role: str) -> bool: |
| 55 | + """Check if this identity has a specific role.""" |
| 56 | + return role in self.roles |
| 57 | + |
| 58 | + def has_any_role(self, *roles: str) -> bool: |
| 59 | + """Check if this identity has any of the specified roles.""" |
| 60 | + return bool(set(roles) & set(self.roles)) |
| 61 | + |
| 62 | + |
| 63 | +@dataclass |
| 64 | +class AuthConfig: |
| 65 | + """Configuration for authentication behaviour. |
| 66 | +
|
| 67 | + Follows PraisonAI's ``False/True/Config`` progressive-disclosure pattern: |
| 68 | +
|
| 69 | + - ``auth=False`` — disabled (no auth checks). |
| 70 | + - ``auth=True`` — use default backend from registry. |
| 71 | + - ``auth=AuthConfig(...)`` — full control. |
| 72 | +
|
| 73 | + Attributes: |
| 74 | + backend: An :class:`AuthBackendProtocol` backend. |
| 75 | + ``None`` falls back to registry default. |
| 76 | + token_ttl: Token time-to-live in seconds (default: 30 days). |
| 77 | + require_workspace: Whether workspace_id is required for authorization. |
| 78 | + """ |
| 79 | + |
| 80 | + backend: Any = None |
| 81 | + token_ttl: int = 30 * 24 * 3600 |
| 82 | + require_workspace: bool = False |
| 83 | + |
| 84 | + |
| 85 | +@runtime_checkable |
| 86 | +class AuthBackendProtocol(Protocol): |
| 87 | + """Protocol for authentication/authorization backends. |
| 88 | +
|
| 89 | + Implement ``authenticate`` and ``authorize`` to create custom auth: |
| 90 | + - JWT token validation |
| 91 | + - API key lookup |
| 92 | + - OAuth token exchange |
| 93 | + - Session cookie resolution |
| 94 | +
|
| 95 | + Example:: |
| 96 | +
|
| 97 | + class JWTAuthBackend: |
| 98 | + async def authenticate( |
| 99 | + self, credentials: dict |
| 100 | + ) -> AuthIdentity | None: |
| 101 | + token = credentials.get("token") |
| 102 | + payload = decode_jwt(token) |
| 103 | + return AuthIdentity( |
| 104 | + id=payload["sub"], |
| 105 | + type="user", |
| 106 | + email=payload.get("email"), |
| 107 | + roles=payload.get("roles", []), |
| 108 | + ) |
| 109 | +
|
| 110 | + async def authorize( |
| 111 | + self, identity: AuthIdentity, resource: str, action: str |
| 112 | + ) -> bool: |
| 113 | + return identity.has_role("admin") |
| 114 | + """ |
| 115 | + |
| 116 | + async def authenticate( |
| 117 | + self, |
| 118 | + credentials: Dict[str, Any], |
| 119 | + ) -> Optional[AuthIdentity]: |
| 120 | + """Authenticate credentials and return an identity. |
| 121 | +
|
| 122 | + Args: |
| 123 | + credentials: Backend-specific credentials dict. |
| 124 | + Common keys: ``token``, ``api_key``, ``email``, ``password``, |
| 125 | + ``session_id``. |
| 126 | +
|
| 127 | + Returns: |
| 128 | + AuthIdentity if authentication succeeds, None otherwise. |
| 129 | + """ |
| 130 | + ... |
| 131 | + |
| 132 | + async def authorize( |
| 133 | + self, |
| 134 | + identity: AuthIdentity, |
| 135 | + resource: str, |
| 136 | + action: str, |
| 137 | + ) -> bool: |
| 138 | + """Check if an identity is authorized for an action on a resource. |
| 139 | +
|
| 140 | + Args: |
| 141 | + identity: The authenticated identity. |
| 142 | + resource: Resource identifier (e.g. ``"workspace:ws-123"`` |
| 143 | + or ``"issue:ISS-42"``). |
| 144 | + action: Action to perform (e.g. ``"read"``, ``"write"``, |
| 145 | + ``"delete"``, ``"admin"``). |
| 146 | +
|
| 147 | + Returns: |
| 148 | + True if authorized, False otherwise. |
| 149 | + """ |
| 150 | + ... |
| 151 | + |
| 152 | + |
| 153 | +@runtime_checkable |
| 154 | +class WorkspaceContextProtocol(Protocol): |
| 155 | + """Protocol for providing workspace context to agents. |
| 156 | +
|
| 157 | + Enables agents to access workspace-specific settings, instructions, |
| 158 | + and agent configurations from the platform layer. |
| 159 | +
|
| 160 | + Example:: |
| 161 | +
|
| 162 | + class PlatformWorkspaceContext: |
| 163 | + async def get_workspace_context(self, workspace_id): |
| 164 | + ws = await db.get_workspace(workspace_id) |
| 165 | + return ws.context |
| 166 | +
|
| 167 | + async def get_agent_config(self, workspace_id, agent_id): |
| 168 | + agent = await db.get_agent(workspace_id, agent_id) |
| 169 | + return {"system_prompt": agent.system_prompt, ...} |
| 170 | + """ |
| 171 | + |
| 172 | + async def get_workspace_context( |
| 173 | + self, |
| 174 | + workspace_id: str, |
| 175 | + ) -> Optional[str]: |
| 176 | + """Get workspace-level context/instructions for agents. |
| 177 | +
|
| 178 | + Args: |
| 179 | + workspace_id: The workspace identifier. |
| 180 | +
|
| 181 | + Returns: |
| 182 | + Context string if available, None otherwise. |
| 183 | + """ |
| 184 | + ... |
| 185 | + |
| 186 | + async def get_agent_config( |
| 187 | + self, |
| 188 | + workspace_id: str, |
| 189 | + agent_id: str, |
| 190 | + ) -> Optional[Dict[str, Any]]: |
| 191 | + """Get agent configuration from the platform. |
| 192 | +
|
| 193 | + Args: |
| 194 | + workspace_id: The workspace identifier. |
| 195 | + agent_id: The agent identifier. |
| 196 | +
|
| 197 | + Returns: |
| 198 | + Agent configuration dict if found, None otherwise. |
| 199 | + Expected keys: ``system_prompt``, ``model``, ``tools``, |
| 200 | + ``max_concurrent_tasks``. |
| 201 | + """ |
| 202 | + ... |
0 commit comments