Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions descope/management/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class MgmtV1:
user_create_batch_path = "/v1/mgmt/user/create/batch"
user_update_path = "/v1/mgmt/user/update"
user_patch_path = "/v1/mgmt/user/patch"
user_patch_batch_path = "/v1/mgmt/user/patch/batch"
user_delete_path = "/v1/mgmt/user/delete"
user_logout_path = "/v1/mgmt/user/logout"
user_delete_all_test_users_path = "/v1/mgmt/user/test/delete/all"
Expand Down
83 changes: 83 additions & 0 deletions descope/management/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ def patch(
verified_email: Optional[bool] = None,
verified_phone: Optional[bool] = None,
sso_app_ids: Optional[List[str]] = None,
status: Optional[str] = None,
test: bool = False,
) -> dict:
"""
Expand All @@ -443,6 +444,7 @@ def patch(
picture (str): Optional url for user picture
custom_attributes (dict): Optional, set the different custom attributes values of the keys that were previously configured in Descope console app
sso_app_ids (List[str]): Optional, list of SSO applications IDs to be associated with the user.
status (str): Optional status field. Can be one of: "enabled", "disabled", "invited".
test (bool, optional): Set to True to update a test user. Defaults to False.

Return value (dict):
Expand All @@ -453,6 +455,12 @@ def patch(
Raise:
AuthException: raised if patch operation fails
"""
if status is not None and status not in ["enabled", "disabled", "invited"]:
raise AuthException(
400,
ERROR_TYPE_INVALID_ARGUMENT,
f"Invalid status value: {status}. Must be one of: enabled, disabled, invited",
)
response = self._auth.do_patch(
MgmtV1.user_patch_path,
User._compose_patch_body(
Expand All @@ -470,12 +478,55 @@ def patch(
verified_email,
verified_phone,
sso_app_ids,
status,
test,
),
pswd=self._auth.management_key,
)
return response.json()

def patch_batch(
self,
users: List[UserObj],
test: bool = False,
) -> dict:
"""
Patch users in batch. Only the provided fields will be updated for each user.

Args:
users (List[UserObj]): A list of UserObj instances representing users to be patched.
Each UserObj should have a login_id and the fields to be updated.
test (bool, optional): Set to True to patch test users. Defaults to False.

Return value (dict):
Return dict in the format
{"patchedUsers": [...], "failedUsers": [...]}
"patchedUsers" contains successfully patched users,
"failedUsers" contains users that failed to be patched with error details.

Raise:
AuthException: raised if patch batch operation fails
"""
# Validate status fields for all users
for user in users:
if user.status is not None and user.status not in [
"enabled",
"disabled",
"invited",
]:
raise AuthException(
400,
ERROR_TYPE_INVALID_ARGUMENT,
f"Invalid status value: {user.status} for user {user.login_id}. Must be one of: enabled, disabled, invited",
)

response = self._auth.do_patch(
MgmtV1.user_patch_batch_path,
User._compose_patch_batch_body(users, test),
pswd=self._auth.management_key,
)
return response.json()

def delete(
self,
login_id: str,
Expand Down Expand Up @@ -1968,6 +2019,7 @@ def _compose_patch_body(
verified_email: Optional[bool],
verified_phone: Optional[bool],
sso_app_ids: Optional[List[str]],
status: Optional[str],
test: bool = False,
) -> dict:
res: dict[str, Any] = {
Expand Down Expand Up @@ -1999,6 +2051,37 @@ def _compose_patch_body(
res["verifiedPhone"] = verified_phone
if sso_app_ids is not None:
res["ssoAppIds"] = sso_app_ids
if status is not None:
res["status"] = status
if test:
res["test"] = test
return res

@staticmethod
def _compose_patch_batch_body(
users: List[UserObj],
test: bool = False,
) -> dict:
users_body = []
for user in users:
user_body = User._compose_patch_body(
login_id=user.login_id,
email=user.email,
phone=user.phone,
display_name=user.display_name,
given_name=user.given_name,
middle_name=user.middle_name,
family_name=user.family_name,
role_names=user.role_names,
user_tenants=user.user_tenants,
picture=user.picture,
custom_attributes=user.custom_attributes,
verified_email=user.verified_email,
verified_phone=user.verified_phone,
sso_app_ids=user.sso_app_ids,
status=user.status,
test=test,
)
users_body.append(user_body)

return {"users": users_body}
201 changes: 193 additions & 8 deletions tests/management/test_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,175 @@ def test_patch(self):
timeout=DEFAULT_TIMEOUT_SECONDS,
)

def test_patch_with_status(self):
# Test invalid status value
with self.assertRaises(AuthException) as context:
self.client.mgmt.user.patch("valid-id", status="invalid_status")

self.assertEqual(context.exception.status_code, 400)
self.assertIn("Invalid status value: invalid_status", str(context.exception))

# Test valid status values
valid_statuses = ["enabled", "disabled", "invited"]

for status in valid_statuses:
with patch("requests.patch") as mock_patch:
network_resp = mock.Mock()
network_resp.ok = True
network_resp.json.return_value = json.loads(
"""{"user": {"id": "u1"}}"""
)
mock_patch.return_value = network_resp

resp = self.client.mgmt.user.patch("id", status=status)
user = resp["user"]
self.assertEqual(user["id"], "u1")

mock_patch.assert_called_with(
f"{common.DEFAULT_BASE_URL}{MgmtV1.user_patch_path}",
headers={
**common.default_headers,
"Authorization": f"Bearer {self.dummy_project_id}:{self.dummy_management_key}",
"x-descope-project-id": self.dummy_project_id,
},
params=None,
json={
"loginId": "id",
"status": status,
},
allow_redirects=False,
verify=True,
timeout=DEFAULT_TIMEOUT_SECONDS,
)

# Test that status is not included when None
with patch("requests.patch") as mock_patch:
network_resp = mock.Mock()
network_resp.ok = True
network_resp.json.return_value = json.loads("""{"user": {"id": "u1"}}""")
mock_patch.return_value = network_resp

resp = self.client.mgmt.user.patch("id", display_name="test", status=None)
user = resp["user"]
self.assertEqual(user["id"], "u1")

# Verify that status is not in the JSON payload
call_args = mock_patch.call_args
json_payload = call_args[1]["json"]
self.assertNotIn("status", json_payload)
self.assertEqual(json_payload["displayName"], "test")

def test_patch_batch(self):
# Test invalid status value in batch
users_with_invalid_status = [
UserObj(login_id="user1", status="invalid_status"),
UserObj(login_id="user2", status="enabled"),
]

with self.assertRaises(AuthException) as context:
self.client.mgmt.user.patch_batch(users_with_invalid_status)

self.assertEqual(context.exception.status_code, 400)
self.assertIn(
"Invalid status value: invalid_status for user user1",
str(context.exception),
)

# Test successful batch patch
users = [
UserObj(login_id="user1", email="[email protected]", status="enabled"),
UserObj(login_id="user2", display_name="User Two", status="disabled"),
UserObj(login_id="user3", phone="+123456789", status="invited"),
]

with patch("requests.patch") as mock_patch:
network_resp = mock.Mock()
network_resp.ok = True
network_resp.json.return_value = json.loads(
"""{"patchedUsers": [{"id": "u1"}, {"id": "u2"}, {"id": "u3"}], "failedUsers": []}"""
)
mock_patch.return_value = network_resp

resp = self.client.mgmt.user.patch_batch(users)

self.assertEqual(len(resp["patchedUsers"]), 3)
self.assertEqual(len(resp["failedUsers"]), 0)

mock_patch.assert_called_with(
f"{common.DEFAULT_BASE_URL}{MgmtV1.user_patch_batch_path}",
headers={
**common.default_headers,
"Authorization": f"Bearer {self.dummy_project_id}:{self.dummy_management_key}",
"x-descope-project-id": self.dummy_project_id,
},
params=None,
json={
"users": [
{
"loginId": "user1",
"email": "[email protected]",
"status": "enabled",
},
{
"loginId": "user2",
"displayName": "User Two",
"status": "disabled",
},
{
"loginId": "user3",
"phone": "+123456789",
"status": "invited",
},
]
},
allow_redirects=False,
verify=True,
timeout=DEFAULT_TIMEOUT_SECONDS,
)

# Test batch with mixed success/failure response
with patch("requests.patch") as mock_patch:
network_resp = mock.Mock()
network_resp.ok = True
network_resp.json.return_value = json.loads(
"""{"patchedUsers": [{"id": "u1"}], "failedUsers": [{"failure": "User not found", "user": {"loginId": "user2"}}]}"""
)
mock_patch.return_value = network_resp

resp = self.client.mgmt.user.patch_batch(
[UserObj(login_id="user1"), UserObj(login_id="user2")]
)

self.assertEqual(len(resp["patchedUsers"]), 1)
self.assertEqual(len(resp["failedUsers"]), 1)
self.assertEqual(resp["failedUsers"][0]["failure"], "User not found")

# Test failed batch operation
with patch("requests.patch") as mock_patch:
mock_patch.return_value.ok = False
self.assertRaises(
AuthException,
self.client.mgmt.user.patch_batch,
[UserObj(login_id="user1")],
)

# Test with test users flag
with patch("requests.patch") as mock_patch:
network_resp = mock.Mock()
network_resp.ok = True
network_resp.json.return_value = json.loads(
"""{"patchedUsers": [{"id": "u1"}], "failedUsers": []}"""
)
mock_patch.return_value = network_resp

resp = self.client.mgmt.user.patch_batch(
[UserObj(login_id="test_user1")], test=True
)

call_args = mock_patch.call_args
json_payload = call_args[1]["json"]
self.assertTrue(json_payload["users"][0]["test"])

def test_delete(self):
# Test failed flows
with patch("requests.post") as mock_post:
Expand Down Expand Up @@ -1056,8 +1225,12 @@ def test_search_all(self):
)
mock_post.return_value = network_resp
resp = self.client.mgmt.user.search_all(
tenant_role_ids={"tenant1": {"values": ["roleA", "roleB"], "and": True}},
tenant_role_names={"tenant2": {"values": ["admin", "user"], "and": False}},
tenant_role_ids={
"tenant1": {"values": ["roleA", "roleB"], "and": True}
},
tenant_role_names={
"tenant2": {"values": ["admin", "user"], "and": False}
},
)
users = resp["users"]
self.assertEqual(len(users), 2)
Expand All @@ -1078,8 +1251,12 @@ def test_search_all(self):
"page": 0,
"testUsersOnly": False,
"withTestUser": False,
"tenantRoleIds": {"tenant1": {"values": ["roleA", "roleB"], "and": True}},
"tenantRoleNames": {"tenant2": {"values": ["admin", "user"], "and": False}},
"tenantRoleIds": {
"tenant1": {"values": ["roleA", "roleB"], "and": True}
},
"tenantRoleNames": {
"tenant2": {"values": ["admin", "user"], "and": False}
},
},
allow_redirects=False,
verify=True,
Expand Down Expand Up @@ -1302,8 +1479,12 @@ def test_search_all_test_users(self):
)
mock_post.return_value = network_resp
resp = self.client.mgmt.user.search_all_test_users(
tenant_role_ids={"tenant1": {"values": ["roleA", "roleB"], "and": True}},
tenant_role_names={"tenant2": {"values": ["admin", "user"], "and": False}},
tenant_role_ids={
"tenant1": {"values": ["roleA", "roleB"], "and": True}
},
tenant_role_names={
"tenant2": {"values": ["admin", "user"], "and": False}
},
)
users = resp["users"]
self.assertEqual(len(users), 2)
Expand All @@ -1324,8 +1505,12 @@ def test_search_all_test_users(self):
"page": 0,
"testUsersOnly": True,
"withTestUser": True,
"tenantRoleIds": {"tenant1": {"values": ["roleA", "roleB"], "and": True}},
"tenantRoleNames": {"tenant2": {"values": ["admin", "user"], "and": False}},
"tenantRoleIds": {
"tenant1": {"values": ["roleA", "roleB"], "and": True}
},
"tenantRoleNames": {
"tenant2": {"values": ["admin", "user"], "and": False}
},
},
allow_redirects=False,
verify=True,
Expand Down
Loading