Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 70 additions & 11 deletions ee/api/scim/test/test_users_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from ee.api.scim.auth import generate_scim_token
from ee.api.test.base import APILicensedTest
from ee.models.rbac.role import RoleMembership
from ee.models.scim_provisioned_user import SCIMProvisionedUser


class TestSCIMUsersAPI(APILicensedTest):
Expand Down Expand Up @@ -56,13 +57,27 @@ def test_users_list_filter_exact_match(self):
OrganizationMembership.objects.create(
user=user_a, organization=self.organization, level=OrganizationMembership.Level.MEMBER
)
SCIMProvisionedUser.objects.create(
user=user_a,
organization_domain=self.domain,
username="[email protected]",
identity_provider=SCIMProvisionedUser.IdentityProvider.OTHER,
active=True,
)

user_b = User.objects.create_user(
email="[email protected]", password=None, first_name="Alex", last_name="Other", is_email_verified=True
)
OrganizationMembership.objects.create(
user=user_b, organization=self.organization, level=OrganizationMembership.Level.MEMBER
)
SCIMProvisionedUser.objects.create(
user=user_b,
organization_domain=self.domain,
username="[email protected]",
identity_provider=SCIMProvisionedUser.IdentityProvider.OTHER,
active=True,
)

# Exact match should return only [email protected]
response = self.client.get(
Expand Down Expand Up @@ -129,9 +144,9 @@ def test_users_list_filter_unrecognized_returns_empty_list(self):
def test_create_user(self):
user_data = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"userName": "newuser@example.com",
"userName": "Newuser@example.com",
"name": {"givenName": "New", "familyName": "User"},
"emails": [{"value": "newuser@example.com", "primary": True}],
"emails": [{"value": "Newuser@example.com", "primary": True}],
"active": True,
}

Expand All @@ -141,11 +156,11 @@ def test_create_user(self):

assert response.status_code == status.HTTP_201_CREATED
data = response.json()
assert data["userName"] == "newuser@example.com"
assert data["userName"] == "Newuser@example.com"
assert data["name"]["givenName"] == "New"
assert data["name"]["familyName"] == "User"

# Verify user was created
# Verify user was created with lowercase email
user = User.objects.get(email="[email protected]")
assert user.first_name == "New"
assert user.last_name == "User"
Expand All @@ -155,6 +170,12 @@ def test_create_user(self):
membership = OrganizationMembership.objects.get(user=user, organization=self.organization)
assert membership.level == OrganizationMembership.Level.MEMBER

# Verify SCIM provisioned user record was created
scim_user = SCIMProvisionedUser.objects.get(user=user, organization_domain=self.domain)
assert scim_user.username == "[email protected]"
assert scim_user.active is True
assert scim_user.identity_provider == SCIMProvisionedUser.IdentityProvider.OTHER

def test_existing_user_is_added_to_org(self):
# Create user in different org
other_org = Organization.objects.create(name="Other Org")
Expand Down Expand Up @@ -184,9 +205,11 @@ def test_existing_user_is_added_to_org(self):
assert OrganizationMembership.objects.filter(user=existing_user, organization=self.organization).exists()
assert OrganizationMembership.objects.filter(user=existing_user, organization=other_org).exists()

def test_repeated_post_does_not_create_duplicate_user(self):
# In case the IdP failed to match user by id, it can send POST request to create a new user.
# The user should be merged with existing one by email, not create a duplicate.
# Verify SCIM provisioned user record was created for this domain
scim_user = SCIMProvisionedUser.objects.get(user=existing_user, organization_domain=self.domain)
assert scim_user.active is True

def test_repeated_post_returns_409_for_already_provisioned_user(self):
user_data_first = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"userName": "[email protected]",
Expand All @@ -202,7 +225,7 @@ def test_repeated_post_does_not_create_duplicate_user(self):
assert response.status_code == status.HTTP_201_CREATED
first_user = User.objects.get(email="[email protected]")

# IdP sends POST request again with same email
# IdP sends POST request again with same email - should fail with 409
user_data_second = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"userName": "[email protected]",
Expand All @@ -215,14 +238,14 @@ def test_repeated_post_does_not_create_duplicate_user(self):
f"/scim/v2/{self.domain.id}/Users", data=user_data_second, content_type="application/scim+json"
)

assert response.status_code == status.HTTP_201_CREATED
assert response.status_code == status.HTTP_409_CONFLICT

# Should NOT create duplicate user
assert User.objects.filter(email="[email protected]").count() == 1

# User should be updated with new data from second POST
# User should NOT be updated (still has first POST data)
first_user.refresh_from_db()
assert first_user.first_name == "Second"
assert first_user.first_name == "First"
assert first_user.last_name == "Time"

# User should have only one membership
Expand Down Expand Up @@ -261,6 +284,14 @@ def test_deactivate_user(self):
OrganizationMembership.objects.create(
user=user, organization=self.organization, level=OrganizationMembership.Level.MEMBER
)
# Create SCIM provisioned user record
SCIMProvisionedUser.objects.create(
user=user,
organization_domain=self.domain,
username="[email protected]",
identity_provider=SCIMProvisionedUser.IdentityProvider.OTHER,
active=True,
)

patch_data = {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
Expand All @@ -280,13 +311,25 @@ def test_deactivate_user(self):
user.refresh_from_db()
assert user.is_active is True # User is still active globally

# Verify SCIM provisioned user record still exists but is marked inactive
scim_user = SCIMProvisionedUser.objects.get(user=user, organization_domain=self.domain)
assert scim_user.active is False

def test_delete_user(self):
user = User.objects.create_user(
email="[email protected]", password=None, first_name="Delete", is_email_verified=True
)
OrganizationMembership.objects.create(
user=user, organization=self.organization, level=OrganizationMembership.Level.MEMBER
)
# Create SCIM provisioned user record
SCIMProvisionedUser.objects.create(
user=user,
organization_domain=self.domain,
username="[email protected]",
identity_provider=SCIMProvisionedUser.IdentityProvider.OTHER,
active=True,
)

response = self.client.delete(f"/scim/v2/{self.domain.id}/Users/{user.id}")

Expand All @@ -295,13 +338,24 @@ def test_delete_user(self):
# Verify membership was removed
assert not OrganizationMembership.objects.filter(user=user, organization=self.organization).exists()

# Verify SCIM provisioned user record was deleted
assert not SCIMProvisionedUser.objects.filter(user=user, organization_domain=self.domain).exists()

def test_put_user(self):
user = User.objects.create_user(
email="[email protected]", password=None, first_name="Old", last_name="Name", is_email_verified=True
)
OrganizationMembership.objects.create(
user=user, organization=self.organization, level=OrganizationMembership.Level.MEMBER
)
# Create SCIM provisioned user record
SCIMProvisionedUser.objects.create(
user=user,
organization_domain=self.domain,
username="[email protected]",
identity_provider=SCIMProvisionedUser.IdentityProvider.OTHER,
active=True,
)

put_data = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
Expand All @@ -321,6 +375,11 @@ def test_put_user(self):
assert user.last_name == "User"
assert user.email == "[email protected]"

# Verify SCIM provisioned user was updated
scim_user = SCIMProvisionedUser.objects.get(user=user, organization_domain=self.domain)
assert scim_user.username == "[email protected]"
assert scim_user.active is True

def test_put_user_not_found(self):
put_data = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
Expand Down
Loading
Loading