Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions docs/entitlements.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# Entitlements System

The entitlements system provides a pluggable backend architecture for checking user access rights and mailbox storage quotas. It integrates with the DeployCenter (Espace Operateur) API in production and uses a dummy backend for development.

## Architecture

```
┌─────────────────────────────────────────────┐
│ Django Cache Layer │
│ get_user_entitlements() │
│ get_mailbox_entitlements() │
└──────────────┬──────────────────────────────┘
┌──────────────▼──────────────────────────────┐
│ Backend Factory (singleton) │
│ get_entitlements_backend() │
└──────────────┬──────────────────────────────┘
┌───────┴───────┐
│ │
┌──────▼─────┐ ┌──────▼───────────────┐
│ Dummy │ │ DeployCenter │
│ Backend │ │ Backend │
│ (dev/test) │ │ (production) │
└────────────┘ └──────────────────────┘
```

### Components

- **Cached service layer** (`core/entitlements/__init__.py`): Public functions with Django cache. TTL configurable via `ENTITLEMENTS_CACHE_TIMEOUT`.
- **Backend factory** (`core/entitlements/factory.py`): `@functools.cache` singleton that imports and instantiates the configured backend class.
- **Abstract base** (`core/entitlements/backends/base.py`): Defines the `EntitlementsBackend` interface.
- **Dummy backend** (`core/entitlements/backends/dummy.py`): Always grants access, returns no storage info.
- **DeployCenter backend** (`core/entitlements/backends/deploycenter.py`): Calls the DeployCenter API.
- **OIDC access check** (`core/authentication/backends.py`): Enforces `can_access` at login time.
- **API endpoint** (`core/api/viewsets/entitlements.py`): `GET /api/v1.0/entitlements/` for the frontend.

### Error Handling

All backend methods raise `EntitlementsUnavailableError` on failure. The access check at login is **fail-open**: if the entitlements service is unavailable during OIDC login, the user is allowed in. The entitlements API endpoint returns 503 if the service is unavailable.

## Configuration

### Environment Variables

| Variable | Default | Description |
|---|---|---|
| `ENTITLEMENTS_BACKEND` | `core.entitlements.backends.dummy.DummyEntitlementsBackend` | Python import path of the backend class |
| `ENTITLEMENTS_BACKEND_PARAMETERS` | `{}` | JSON object of parameters passed to the backend constructor |
| `ENTITLEMENTS_CACHE_TIMEOUT` | `300` | Cache TTL in seconds |

### DeployCenter Backend Parameters

When using `core.entitlements.backends.deploycenter.DeployCenterEntitlementsBackend`, provide these in `ENTITLEMENTS_BACKEND_PARAMETERS`:

```json
{
"base_url": "https://deploycenter.example.com",
"service_id": "messages-prod",
"api_key": "your-api-key",
"timeout": 10
}
```

### Example Production Configuration

```bash
ENTITLEMENTS_BACKEND=core.entitlements.backends.deploycenter.DeployCenterEntitlementsBackend
ENTITLEMENTS_BACKEND_PARAMETERS={"base_url":"https://deploycenter.example.com","service_id":"messages-prod","api_key":"secret-key","timeout":10}
ENTITLEMENTS_CACHE_TIMEOUT=300
```

## Backend Interface

Custom backends must extend `EntitlementsBackend` and implement:

```python
class MyBackend(EntitlementsBackend):
def __init__(self, **kwargs):
# Receive ENTITLEMENTS_BACKEND_PARAMETERS as kwargs
pass

def get_user_entitlements(self, user_sub, user_email, access_token=None):
# Return: {"can_access": bool, "can_admin_maildomains": [str], "operator": dict|None}
# Raise EntitlementsUnavailableError on failure
pass

def get_mailbox_entitlements(self, mailbox_email, access_token=None):
# Return: {"max_storage": int|None, "storage_used": int|None}
# Raise EntitlementsUnavailableError on failure
pass
```

## DeployCenter API

The DeployCenter backend calls:

```
GET {base_url}/api/v1.0/entitlements?service_id=X&account_type=X&account_id=X
```

Headers:
- `X-Service-Auth: Bearer {api_key}`
- `Authorization: Bearer {access_token}` (if provided)

### User Entitlements Request

- `account_type=user`
- `account_id=<user_email>`

Response: `{"operator": {...}, "entitlements": {"can_access": bool, "can_admin_maildomains": [str], ...}}`

### Mailbox Entitlements Request

- `account_type=mailbox`
- `account_id=<mailbox_email>`

Response: `{"operator": {...}, "entitlements": {"max_storage": int, "storage_used": int, ...}}`

## OIDC Login Integration

During OIDC login (`post_get_or_create_user`), the system:

1. Fetches user entitlements with `force_refresh=True`
2. Syncs `MailDomainAccess` ADMIN records based on `can_admin_maildomains`:
- Creates missing admin accesses for entitled domains
- Removes admin accesses for domains not in the entitled list
3. If `can_admin_maildomains` is `None` (e.g. dummy backend), sync is skipped entirely
4. Checks `can_access` and denies login if `False` (raises `SuspiciousOperation`)
- If the entitlements service is unavailable, login is allowed (fail-open)

### Deployment Consideration

Before enabling the DeployCenter backend in production, ensure that existing domain admin assignments are synced in DeployCenter. The entitlements sync will **remove** admin accesses that are not in the DeployCenter response.

## Frontend Quota Widget

The frontend includes a quota widget that displays mailbox storage usage in the header. It:

- Calls `GET /api/v1.0/entitlements/?mailbox_id=<uuid>` when a mailbox is selected
- Displays a progress bar with `storage_used / max_storage`
- Hides itself when no storage data is available (dummy backend)
- Caches data for 5 minutes on the client side
68 changes: 68 additions & 0 deletions src/backend/core/api/viewsets/entitlements.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""API ViewSet for entitlements."""

import logging

from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView

from core.entitlements import (
EntitlementsUnavailableError,
get_mailbox_entitlements,
get_user_entitlements,
)
from core.models import Mailbox

logger = logging.getLogger(__name__)


class EntitlementsView(APIView):
"""API endpoint for retrieving user and mailbox entitlements.

GET /api/v1.0/entitlements/
Returns user entitlements from cache.

GET /api/v1.0/entitlements/?mailbox_id=<uuid>
Also fetches mailbox entitlements on-demand.
"""

permission_classes = [IsAuthenticated]

def get(self, request):
"""Return entitlements for the authenticated user."""
try:
user_entitlements = get_user_entitlements(
request.user.sub, request.user.email
)
except EntitlementsUnavailableError:
return Response(
{"detail": "Entitlements service unavailable"}, status=503
)

response_data = {
"can_access": user_entitlements.get("can_access", False),
"can_admin_maildomains": user_entitlements.get(
"can_admin_maildomains", []
),
"operator": user_entitlements.get("operator"),
"mailbox": None,
}

mailbox_id = request.query_params.get("mailbox_id")
if mailbox_id:
try:
mailbox = Mailbox.objects.select_related("domain").get(id=mailbox_id)
mailbox_email = str(mailbox)
mailbox_data = get_mailbox_entitlements(mailbox_email)
response_data["mailbox"] = {
"max_storage": mailbox_data.get("max_storage"),
"storage_used": mailbox_data.get("storage_used"),
}
except Mailbox.DoesNotExist:
response_data["mailbox"] = None
Comment on lines +51 to +62
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Enforce mailbox access and validate mailbox_id.

Any authenticated user can query any mailbox id, and malformed ids can surface 500s. Gate mailbox lookups by user access (e.g., MailboxAccess) and handle invalid ids (400/404) before calling the backend.

🔧 Proposed fix
+from django.core.exceptions import ValidationError
@@
-from core.models import Mailbox
+from core.models import Mailbox, MailboxAccess
@@
-        if mailbox_id:
-            try:
-                mailbox = Mailbox.objects.select_related("domain").get(id=mailbox_id)
-                mailbox_email = str(mailbox)
-                mailbox_data = get_mailbox_entitlements(mailbox_email)
-                response_data["mailbox"] = {
-                    "max_storage": mailbox_data.get("max_storage"),
-                    "storage_used": mailbox_data.get("storage_used"),
-                }
-            except Mailbox.DoesNotExist:
-                response_data["mailbox"] = None
-            except EntitlementsUnavailableError:
-                return Response(
-                    {"detail": "Entitlements service unavailable"}, status=503
-                )
+        if mailbox_id:
+            try:
+                if not MailboxAccess.objects.filter(
+                    user=request.user, mailbox_id=mailbox_id
+                ).exists():
+                    return Response({"detail": "Mailbox not found"}, status=404)
+                mailbox = Mailbox.objects.select_related("domain").get(id=mailbox_id)
+                mailbox_email = str(mailbox)
+                mailbox_data = get_mailbox_entitlements(mailbox_email)
+                response_data["mailbox"] = {
+                    "max_storage": mailbox_data.get("max_storage"),
+                    "storage_used": mailbox_data.get("storage_used"),
+                }
+            except (Mailbox.DoesNotExist, ValidationError, ValueError):
+                return Response({"detail": "Invalid mailbox_id"}, status=400)
+            except EntitlementsUnavailableError:
+                return Response(
+                    {"detail": "Entitlements service unavailable"}, status=503
+                )
🤖 Prompt for AI Agents
In `@src/backend/core/api/viewsets/entitlements.py` around lines 51 - 62, Validate
the incoming mailbox_id format and enforce access before fetching entitlements:
first check mailbox_id is a valid identifier (return 400 for malformed ids),
then try to load the Mailbox via
Mailbox.objects.select_related("domain").get(id=mailbox_id) and if it doesn't
exist return 404; after loading, verify the caller has access using the
MailboxAccess relationship (e.g.,
MailboxAccess.objects.filter(user=request.user, mailbox=mailbox).exists()) and
return 404/403 when access is missing; only then call
get_mailbox_entitlements(mailbox_email) and populate response_data["mailbox"]
(keep Mailbox.DoesNotExist handling to map to 404 and avoid raising 500).

except EntitlementsUnavailableError:
return Response(
{"detail": "Entitlements service unavailable"}, status=503
)

return Response(response_data)
79 changes: 78 additions & 1 deletion src/backend/core/authentication/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
OIDCAuthenticationBackend as LaSuiteOIDCAuthenticationBackend,
)

from core.enums import MailboxRoleChoices
from core.enums import MailboxRoleChoices, MailDomainAccessRoleChoices
from core.models import (
Contact,
DuplicateEmailError,
Mailbox,
MailboxAccess,
MailDomain,
MailDomainAccess,
User,
)

Expand Down Expand Up @@ -88,6 +89,82 @@ def post_get_or_create_user(self, user, claims, _user_created):
"""Post-get or create user."""
if user:
self.autojoin_mailbox(user)
self._sync_entitlements(user)
self._check_can_access(user)

def _check_can_access(self, user):
"""Check if the user has access to the app via entitlements.

Called after _sync_entitlements which populates the cache.
Raises SuspiciousOperation to deny login if can_access is False.
"""
from core.entitlements import (
EntitlementsUnavailableError,
get_user_entitlements,
)

try:
entitlements = get_user_entitlements(user.sub, user.email)
except EntitlementsUnavailableError:
# Fail open at login: if entitlements service is down,
# allow login (sync already logged the error)
return

if not entitlements.get("can_access", False):
raise SuspiciousOperation(_("Access denied by entitlements policy"))

def _sync_entitlements(self, user):
"""Fetch user entitlements and sync MailDomainAccess ADMIN records."""
from core.entitlements import (
EntitlementsUnavailableError,
get_user_entitlements,
)

try:
entitlements = get_user_entitlements(
user.sub, user.email, force_refresh=True
)
except EntitlementsUnavailableError:
logger.error(
"Entitlements service unavailable during login for user %s",
user.sub,
)
return

admin_domains = entitlements.get("can_admin_maildomains")
if admin_domains is None:
# Backend doesn't support this field (e.g. dummy), skip sync
return

# Resolve domain names to MailDomain objects that exist in DB
entitled_domains = MailDomain.objects.filter(name__in=admin_domains)
entitled_domain_ids = set(entitled_domains.values_list("id", flat=True))

# Create missing MailDomainAccess ADMIN records
existing_accesses = MailDomainAccess.objects.filter(
user=user, role=MailDomainAccessRoleChoices.ADMIN
)
existing_domain_ids = set(
existing_accesses.values_list("maildomain_id", flat=True)
)

# Add new accesses
for domain in entitled_domains:
if domain.id not in existing_domain_ids:
MailDomainAccess.objects.create(
user=user,
maildomain=domain,
role=MailDomainAccessRoleChoices.ADMIN,
)
Comment on lines +143 to +158
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid IntegrityError when upgrading admin access.

If the user already has a MailDomainAccess for the domain with a non-admin role, .create will violate the (maildomain, user) uniqueness constraint and can fail login. Use update_or_create (or get_or_create + role update) to safely upgrade roles and avoid races.

🔧 Proposed fix
-        for domain in entitled_domains:
-            if domain.id not in existing_domain_ids:
-                MailDomainAccess.objects.create(
-                    user=user,
-                    maildomain=domain,
-                    role=MailDomainAccessRoleChoices.ADMIN,
-                )
+        for domain in entitled_domains:
+            MailDomainAccess.objects.update_or_create(
+                user=user,
+                maildomain=domain,
+                defaults={"role": MailDomainAccessRoleChoices.ADMIN},
+            )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Create missing MailDomainAccess ADMIN records
existing_accesses = MailDomainAccess.objects.filter(
user=user, role=MailDomainAccessRoleChoices.ADMIN
)
existing_domain_ids = set(
existing_accesses.values_list("maildomain_id", flat=True)
)
# Add new accesses
for domain in entitled_domains:
if domain.id not in existing_domain_ids:
MailDomainAccess.objects.create(
user=user,
maildomain=domain,
role=MailDomainAccessRoleChoices.ADMIN,
)
# Create missing MailDomainAccess ADMIN records
existing_accesses = MailDomainAccess.objects.filter(
user=user, role=MailDomainAccessRoleChoices.ADMIN
)
existing_domain_ids = set(
existing_accesses.values_list("maildomain_id", flat=True)
)
# Add new accesses
for domain in entitled_domains:
MailDomainAccess.objects.update_or_create(
user=user,
maildomain=domain,
defaults={"role": MailDomainAccessRoleChoices.ADMIN},
)
🤖 Prompt for AI Agents
In `@src/backend/core/authentication/backends.py` around lines 121 - 136, The
current loop uses MailDomainAccess.objects.create which can raise IntegrityError
if a non-admin record for (user, maildomain) already exists; change the logic in
the for domain in entitled_domains loop to call
MailDomainAccess.objects.update_or_create (keyed by user=user,
maildomain=domain) with defaults={'role': MailDomainAccessRoleChoices.ADMIN} so
existing non-admin accesses are upgraded safely and races are handled; keep
references to user, entitled_domains, MailDomainAccess, and
MailDomainAccessRoleChoices.ADMIN when making this change.


# Remove stale accesses (domains not in the entitled list)
stale_domain_ids = existing_domain_ids - entitled_domain_ids
if stale_domain_ids:
MailDomainAccess.objects.filter(
user=user,
maildomain_id__in=stale_domain_ids,
role=MailDomainAccessRoleChoices.ADMIN,
).delete()

def get_extra_claims(self, user_info):
"""Get extra claims."""
Expand Down
66 changes: 66 additions & 0 deletions src/backend/core/entitlements/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""Entitlements service layer with Django cache."""

import logging

from django.core.cache import cache
from django.conf import settings

from core.entitlements.factory import get_entitlements_backend

logger = logging.getLogger(__name__)


class EntitlementsUnavailableError(Exception):
"""Raised when the entitlements backend cannot be reached or returns an error."""


def get_user_entitlements(
user_sub, user_email, access_token=None, force_refresh=False
):
"""Get user entitlements, using Django cache.

Returns:
dict: {"can_access": bool, "can_admin_maildomains": [str], "operator": dict|None}

Raises:
EntitlementsUnavailableError: If the backend cannot be reached and no cache exists.
"""
cache_key = f"entitlements:user:{user_sub}"

if not force_refresh:
cached = cache.get(cache_key)
if cached is not None:
return cached

backend = get_entitlements_backend()
result = backend.get_user_entitlements(
user_sub, user_email, access_token=access_token
)

cache.set(cache_key, result, settings.ENTITLEMENTS_CACHE_TIMEOUT)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe prefer referring a ENTITLEMENTS_BACKEND_PARAMETERS dict variable to store all the options?
https://github.com/suitenumerique/drive/blob/0c7c381eb74ed409cfb11935dbc2acdf446fbb34/src/backend/drive/settings.py#L1457-L1461

return result
Comment on lines +17 to +41
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

cat -n src/backend/core/entitlements/__init__.py | head -80

Repository: suitenumerique/messages

Length of output: 2418


Resolve docstring-behavior mismatch: add cache fallback on backend failure.
The docstring states EntitlementsUnavailableError is raised only when the backend cannot be reached and no cache exists. However, when force_refresh=True, the code bypasses the cache check and raises the exception immediately if the backend fails, even if cached data is available. Additionally, the coding guidelines require try-except blocks for business logic exception handling.

Either update the docstring to reflect this behavior, or implement cache fallback on backend failure as suggested below. Both functions (lines 17–41 and 44–66) have this issue.

💡 Suggested implementation
 def get_user_entitlements(
     user_sub, user_email, access_token=None, force_refresh=False
 ):
-    if not force_refresh:
-        cached = cache.get(cache_key)
-        if cached is not None:
-            return cached
+    cached = None
+    if not force_refresh:
+        cached = cache.get(cache_key)
+        if cached is not None:
+            return cached
+
+    try:
+        result = backend.get_user_entitlements(
+            user_sub, user_email, access_token=access_token
+        )
+    except EntitlementsUnavailableError:
+        if cached is not None:
+            return cached
+        raise
🤖 Prompt for AI Agents
In `@src/backend/core/entitlements/__init__.py` around lines 17 - 41, Wrap the
backend call in get_user_entitlements (and the other similar function around
lines 44–66) in a try/except that catches EntitlementsUnavailableError: call
get_entitlements_backend().get_user_entitlements(...) inside the try; on
exception, attempt to read the same cache_key via cache.get(cache_key) and
return cached value if present, otherwise re-raise the
EntitlementsUnavailableError; keep cache.set(cache_key, result,
settings.ENTITLEMENTS_CACHE_TIMEOUT) on success. This keeps force_refresh
semantics but provides the documented cache fallback and follows the guideline
to handle business-logic exceptions.



def get_mailbox_entitlements(mailbox_email, access_token=None, force_refresh=False):
"""Get mailbox entitlements, using Django cache.

Returns:
dict: {"max_storage": int|None, "storage_used": int|None}

Raises:
EntitlementsUnavailableError: If the backend cannot be reached and no cache exists.
"""
cache_key = f"entitlements:mailbox:{mailbox_email}"

if not force_refresh:
cached = cache.get(cache_key)
if cached is not None:
return cached

backend = get_entitlements_backend()
result = backend.get_mailbox_entitlements(
mailbox_email, access_token=access_token
)

cache.set(cache_key, result, settings.ENTITLEMENTS_CACHE_TIMEOUT)
return result
Empty file.
Loading
Loading