Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ Unreleased
----------
* nothing unreleased

[6.7.0] - 2026-03-10
---------------------
* feat: Invite admin endpoints with validation (ENT-11238)

[6.6.9] - 2026-03-10
---------------------
* fix: handle duplicate enterprise group name validation error (ENT-11506)
Expand Down
2 changes: 1 addition & 1 deletion enterprise/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Your project description goes here.
"""

__version__ = "6.6.9"
__version__ = "6.7.0"
260 changes: 260 additions & 0 deletions enterprise/api/utils.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,285 @@
"""
Utility functions for the Enterprise API.
"""
import logging
from typing import List, Set

from django.conf import settings
from django.contrib import auth
from django.db import DatabaseError, transaction
from django.db.models import F
from django.db.models.functions import Lower
from django.utils.translation import gettext as _

from enterprise.constants import (
BRAZE_ADMIN_INVITE_CAMPAIGN_SETTING,
BRAZE_LEARNER_INVITE_CAMPAIGN_SETTING,
ENTERPRISE_ADMIN_ROLE,
ENTERPRISE_CATALOG_ADMIN_ROLE,
ENTERPRISE_DASHBOARD_ADMIN_ROLE,
ENTERPRISE_REPORTING_CONFIG_ADMIN_ROLE,
AdminInviteStatus,
)
from enterprise.models import (
EnterpriseCustomer,
EnterpriseCustomerAdmin,
EnterpriseCustomerCatalog,
EnterpriseCustomerInviteKey,
EnterpriseCustomerReportingConfiguration,
EnterpriseCustomerUser,
EnterpriseFeatureRole,
EnterpriseFeatureUserRoleAssignment,
EnterpriseGroup,
PendingEnterpriseCustomerAdminUser,
SystemWideEnterpriseUserRoleAssignment,
)
from enterprise.tasks import send_enterprise_admin_invite_email

logger = logging.getLogger(__name__)


def get_existing_admin_emails(enterprise_customer: EnterpriseCustomer) -> Set[str]:
"""
Retrieve normalized email addresses of existing ACTIVE enterprise admins.

Only includes admins who have:
1. An EnterpriseCustomerAdmin record
2. An active EnterpriseCustomerUser (active=True)
3. An active admin role assignment in SystemWideEnterpriseUserRoleAssignment

Args:
enterprise_customer: The enterprise customer instance.

Returns:
Set of lowercased email addresses of active admins with valid role assignments.

Raises:
DatabaseError: If database query fails.

Example:
>>> emails = get_existing_admin_emails(customer)
>>> 'admin@example.com' in emails
True
"""
try:
# Get user IDs with active admin role assignments
users_with_admin_role = set(
SystemWideEnterpriseUserRoleAssignment.objects.filter(
enterprise_customer=enterprise_customer,
role__name=ENTERPRISE_ADMIN_ROLE,
).values_list('user_id', flat=True)
)

# Return emails of admins who have active ECU AND active role assignment
return set(
EnterpriseCustomerAdmin.objects.filter(
enterprise_customer_user__enterprise_customer=enterprise_customer,
enterprise_customer_user__active=True,
enterprise_customer_user__user_id__in=users_with_admin_role,
)
.annotate(email_l=Lower(F("enterprise_customer_user__user_fk__email")))
.values_list("email_l", flat=True)
)
except DatabaseError:
logger.exception(
"Database error retrieving existing admin emails for enterprise customer: %s",
enterprise_customer.uuid,
)
raise


def get_existing_pending_emails(
enterprise_customer: EnterpriseCustomer,
normalized_emails: List[str]
) -> Set[str]:
"""
Retrieve normalized email addresses of pending admin invitations.

Args:
enterprise_customer: The enterprise customer instance.
normalized_emails: List of normalized email addresses to check.

Returns:
Set of lowercased email addresses that have pending invitations.

Raises:
DatabaseError: If database query fails.

Example:
>>> pending = get_existing_pending_emails(customer, ['user@example.com'])
>>> 'user@example.com' in pending
True
"""
try:
return set(
PendingEnterpriseCustomerAdminUser.objects.filter(
enterprise_customer=enterprise_customer,
)
.annotate(email_l=Lower(F("user_email")))
.filter(email_l__in=normalized_emails)
.values_list("email_l", flat=True)
)
except DatabaseError:
logger.exception(
"Database error retrieving existing pending emails for enterprise customer: %s",
enterprise_customer.uuid,
)
raise


def create_pending_invites(
enterprise_customer: EnterpriseCustomer,
emails_to_invite: List[str]
) -> List[PendingEnterpriseCustomerAdminUser]:
"""
Create pending admin invitations and trigger email notifications.

Creates PendingEnterpriseCustomerAdminUser records for new admin invites
and enqueues Braze email tasks to be sent after transaction commits.

Args:
enterprise_customer: The enterprise customer instance.
emails_to_invite: List of normalized email addresses to invite.

Returns:
List of created PendingEnterpriseCustomerAdminUser instances.

Raises:
DatabaseError: If database operation fails.
ValueError: If emails_to_invite is empty.
RuntimeError: If called outside a transaction.atomic block.

Note:
- Caller must wrap in transaction.atomic() to ensure atomicity
- Uses get_or_create per email to avoid duplicate invite emails in race conditions
- Emails are queued via transaction.on_commit() to send after transaction commits
- Emails are routed to different Braze campaigns based on EnterpriseCustomerUser existence
determined at invite creation time (before transaction commits)
- This ensures emails only send if database changes succeed

Example:
>>> with transaction.atomic():
... invites = create_pending_invites(customer, ['new@example.com'])
>>> len(invites) > 0
True
"""
if not emails_to_invite:
raise ValueError("emails_to_invite cannot be empty")

if not transaction.get_connection().in_atomic_block:
raise RuntimeError("create_pending_invites must be called inside transaction.atomic().")

def _enqueue_email_jobs(customer, invites, ecu_emails):
"""Enqueue invite emails, routing to appropriate Braze campaigns."""
if not invites:
return

try:
created_invite_emails = [invite.user_email for invite in invites]

# Split emails based on pre-determined ECU existence
# Using existing_ecu_emails captured before transaction to avoid race conditions
learner_emails = []
new_admin_emails = []
for email in created_invite_emails:
if email in ecu_emails:
learner_emails.append(email)
else:
new_admin_emails.append(email)

# Send to existing learners with learner campaign
if learner_emails:
send_enterprise_admin_invite_email.delay(
str(customer.uuid),
learner_emails,
campaign_setting_name=BRAZE_LEARNER_INVITE_CAMPAIGN_SETTING
)

# Send to new admins with admin campaign
if new_admin_emails:
send_enterprise_admin_invite_email.delay(
str(customer.uuid),
new_admin_emails,
campaign_setting_name=BRAZE_ADMIN_INVITE_CAMPAIGN_SETTING
)

except Exception: # pylint: disable=broad-except
# Log email queueing failures but don't fail the transaction
# Invites are created successfully, emails can be re-sent manually
logger.exception(
"Failed to enqueue admin invite emails for enterprise customer: %s. "
"Invite count: %d",
customer.uuid,
len(invites)
)

try:
# Query existing active EnterpriseCustomerUsers BEFORE creating invites to determine routing
# This prevents race conditions where ECU is created between invite and email sending
existing_ecu_emails = set(
EnterpriseCustomerUser.objects.filter(
enterprise_customer=enterprise_customer,
user_id__isnull=False,
active=True,
).select_related('user_fk').annotate(
email_lower=Lower('user_fk__email')
).filter(
email_lower__in=emails_to_invite
).values_list('email_lower', flat=True)
)

created_invites = []
for email in emails_to_invite:
pending_invite, created = PendingEnterpriseCustomerAdminUser.objects.get_or_create(
enterprise_customer=enterprise_customer,
user_email=email,
)
if created:
created_invites.append(pending_invite)

transaction.on_commit(
lambda: _enqueue_email_jobs(enterprise_customer, created_invites, existing_ecu_emails)
)
return created_invites

except DatabaseError:
logger.exception(
"Database error creating pending invites for enterprise customer: %s",
enterprise_customer.uuid,
)
raise


def get_invite_status(
email: str,
existing_admin_emails: Set[str],
existing_pending_emails: Set[str]
) -> str:
"""
Determine the invitation status for a given email address.

Args:
email (str): The email address to check.
existing_admin_emails (Set[str]): Set of existing active admin email addresses.
existing_pending_emails (Set[str]): Set of pending invitation email addresses.

Returns:
str: Status constant indicating email state:
- AdminInviteStatus.EXISTING_ADMIN if user is already an active admin
- AdminInviteStatus.PENDING_INVITE if invitation already sent
- AdminInviteStatus.NEW_INVITE if this is a new invitation

Example:
>>> status = get_invite_status('new@example.com', set(), set())
>>> status == 'invite sent'
True
"""
if email in existing_admin_emails:
return AdminInviteStatus.EXISTING_ADMIN
if email in existing_pending_emails:
return AdminInviteStatus.PENDING_INVITE
return AdminInviteStatus.NEW_INVITE


User = auth.get_user_model()
SERVICE_USERNAMES = (
Expand Down
56 changes: 56 additions & 0 deletions enterprise/api/v1/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2403,3 +2403,59 @@ class EnterpriseAdminMemberSerializer(serializers.Serializer):
format="%b %d, %Y",
)
status = serializers.CharField()


class AdminInviteSerializer(serializers.Serializer):
"""
Accepts a list of email addresses for processing.

Example::

{
"emails": ["a@x.com", "b@x.com"]
}

Validation:

- Emails are validated for proper format.
- Emails are stripped and lowercased.
- Empty lists are not allowed.
- Duplicate emails are not allowed.
- (Optional) Additional business rules such as domain restrictions can be applied.
"""
emails = serializers.ListField(
child=serializers.EmailField(),
allow_empty=False,
required=True,
error_messages={
"required": "The 'emails' field is required.",
"empty": "This list may not be empty.",
},
)

def validate_emails(self, value):
"""
Normalize emails and check for duplicates.

Args:
value: List of email strings

Returns:
List of normalized (stripped, lowercased) emails

Raises:
ValidationError: If duplicate emails exist
"""
normalized_emails = []

for email in value:
# Strip and lowercase
normalized_email = email.strip().lower()

normalized_emails.append(normalized_email)

# Check for duplicates
if len(normalized_emails) != len(set(normalized_emails)):
raise serializers.ValidationError("Duplicate emails are not allowed.")

return normalized_emails
10 changes: 5 additions & 5 deletions enterprise/api/v1/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@


urlpatterns = [
re_path(
r'^enterprise-customer/(?P<enterprise_customer_uuid>[A-Za-z0-9-]+)/admins/(?P<id>[^/.]+)/?$',
EnterpriseCustomerAdminViewSet.as_view({'delete': 'delete_admin'}),
name='enterprise-customer-delete-admin',
),
re_path(
r'^enterprise_customer_catalog/',
enterprise_customer_catalog.EnterpriseCustomerCatalogWriteViewSet.as_view(
Expand Down Expand Up @@ -249,11 +254,6 @@
enterprise_admin_members.EnterpriseAdminMembersViewSet.as_view({'get': 'list'}),
name='enterprise-admin-members',
),
re_path(
r'^enterprise-customer/(?P<enterprise_customer_uuid>[A-Za-z0-9-]+)/admins/(?P<admin_pk>[A-Za-z0-9-]+)/?$',
EnterpriseCustomerAdminViewSet.as_view({'delete': 'delete_admin'}),
name='enterprise-customer-admin-delete'
),
]

urlpatterns += router.urls
Loading
Loading