Skip to content

Commit 1bb7ecb

Browse files
committed
increasing coverage
1 parent 5ba0176 commit 1bb7ecb

File tree

2 files changed

+120
-11
lines changed

2 files changed

+120
-11
lines changed

services/web/server/src/simcore_service_webserver/security/_authz_access_model.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
""" hierarchical role-based access control (HRBAC)
1+
"""hierarchical role-based access control (HRBAC)
22
33
4-
References:
5-
https://b_logger.nodeswat.com/implement-access-control-in-node-js-8567e7b484d1
4+
References:
5+
https://b_logger.nodeswat.com/implement-access-control-in-node-js-8567e7b484d1
66
"""
77

88
import inspect
99
import logging
1010
import re
11-
from collections.abc import Callable
11+
from collections.abc import Awaitable, Callable
1212
from dataclasses import dataclass, field
1313
from typing import TypeAlias, TypedDict
1414

@@ -27,16 +27,32 @@ class AuthContextDict(TypedDict, total=False):
2727

2828
OptionalContext: TypeAlias = AuthContextDict | dict | None
2929

30+
CheckFunction: TypeAlias = (
31+
# Type for check functions that can be either sync or async
32+
Callable[[OptionalContext], bool]
33+
| Callable[[OptionalContext], Awaitable[bool]]
34+
)
35+
3036

3137
@dataclass
3238
class _RolePermissions:
3339
role: UserRole
34-
# named permissions allowed
35-
allowed: list[str] = field(default_factory=list)
36-
# checked permissions: permissions with conditions
37-
check: dict[str, Callable[[OptionalContext], bool]] = field(default_factory=dict)
38-
# inherited permission
39-
inherits: list[UserRole] = field(default_factory=list)
40+
41+
allowed: list[str] = field(
42+
default_factory=list, metadata={"description": "list of allowed operations"}
43+
)
44+
check: dict[str, CheckFunction] = field(
45+
default_factory=dict,
46+
metadata={
47+
"description": "checked permissions: dict of operations with conditions"
48+
},
49+
)
50+
inherits: list[UserRole] = field(
51+
default_factory=list,
52+
metadata={
53+
"description": "list of parent roles that inherit permissions from this role"
54+
},
55+
)
4056

4157
@classmethod
4258
def from_rawdata(cls, role: str | UserRole, value: dict) -> "_RolePermissions":

services/web/server/tests/unit/isolated/test_security__authz.py

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,38 @@ def _can_update_inputs(context):
9090
return RoleBasedAccessModel.from_rawdata(fake_roles_permissions)
9191

9292

93+
async def test_operation_in_role_check(access_model: RoleBasedAccessModel):
94+
"""Tests the branch where operation is in role_access.check in the can method"""
95+
R = UserRole # alias
96+
97+
# The "study.pipeline.node.inputs.update" operation has a check function in ANONYMOUS role
98+
99+
# Test with proper context
100+
current_data = {"workbench": {}}
101+
candidate_data = {"workbench": {}} # no changes
102+
context = {"current": current_data, "candidate": candidate_data}
103+
104+
assert await access_model.can(
105+
R.ANONYMOUS, "study.pipeline.node.inputs.update", context=context
106+
)
107+
108+
# Test with invalid context that would make the check function fail
109+
invalid_context = {"wrong_key": "value"} # missing expected keys
110+
assert not await access_model.can(
111+
R.ANONYMOUS, "study.pipeline.node.inputs.update", context=invalid_context
112+
)
113+
114+
# Test with None context (should fail safely)
115+
assert not await access_model.can(
116+
R.ANONYMOUS, "study.pipeline.node.inputs.update", context=None
117+
)
118+
119+
# Test inheritance - USER role inherits ANONYMOUS role's check function
120+
assert await access_model.can(
121+
R.USER, "study.pipeline.node.inputs.update", context=context
122+
)
123+
124+
93125
def test_unique_permissions():
94126
used = []
95127
for role in ROLES_PERMISSIONS:
@@ -292,7 +324,8 @@ async def test_authorization_policy_cache(mocker: MockerFixture, mock_db: MagicM
292324

293325
# new value in db
294326
mock_db.users_db["[email protected]"]["id"] = 2
295-
assert (await autz_cache.get("_get_auth_or_none/[email protected]"))["id"] == 1
327+
got = await autz_cache.get("_get_auth_or_none/[email protected]")
328+
assert got["id"] == 1
296329

297330
# gets cache, db is NOT called
298331
got = await authz_policy._get_authorized_user_or_none(email="[email protected]")
@@ -319,3 +352,63 @@ async def test_authorization_policy_cache(mocker: MockerFixture, mock_db: MagicM
319352
# should raise web.HTTPServiceUnavailable on db failure
320353
with pytest.raises(web.HTTPServiceUnavailable):
321354
await authz_policy._get_authorized_user_or_none(email="[email protected]")
355+
356+
357+
async def test_operation_with_check_callbacks(access_model: RoleBasedAccessModel):
358+
"""Tests operations with different types of check callbacks"""
359+
R = UserRole # alias
360+
361+
# Add a synchronous check callback
362+
def sync_check(context) -> bool:
363+
return context.get("allowed", False) if context else False
364+
365+
# Add an async check callback
366+
async def async_check(context) -> bool:
367+
return context.get("allowed", False) if context else False
368+
369+
# Add a callback that raises an exception
370+
def failing_check(context) -> bool:
371+
raise ValueError("This check always fails")
372+
373+
# Register the callbacks for different operations
374+
access_model.roles[R.USER].check["operation.sync.check"] = sync_check
375+
access_model.roles[R.USER].check["operation.async.check"] = async_check
376+
access_model.roles[R.USER].check["operation.failing.check"] = failing_check
377+
378+
# Test synchronous check callback
379+
assert await access_model.can(
380+
R.USER, "operation.sync.check", context={"allowed": True}
381+
)
382+
assert not await access_model.can(
383+
R.USER, "operation.sync.check", context={"allowed": False}
384+
)
385+
assert not await access_model.can(R.USER, "operation.sync.check", context=None)
386+
387+
# Test asynchronous check callback
388+
assert await access_model.can(
389+
R.USER, "operation.async.check", context={"allowed": True}
390+
)
391+
assert not await access_model.can(
392+
R.USER, "operation.async.check", context={"allowed": False}
393+
)
394+
395+
# Test exception handling in check callback
396+
assert not await access_model.can(
397+
R.USER, "operation.failing.check", context={"allowed": True}
398+
)
399+
400+
# Test inheritance of checked operations
401+
assert await access_model.can(
402+
R.TESTER, "operation.sync.check", context={"allowed": True}
403+
)
404+
assert not await access_model.can(
405+
R.ANONYMOUS, "operation.sync.check", context={"allowed": True}
406+
)
407+
408+
# Test who_can with checked operations
409+
who_can = await access_model.who_can(
410+
"operation.sync.check", context={"allowed": True}
411+
)
412+
assert R.USER in who_can
413+
assert R.TESTER in who_can
414+
assert R.ANONYMOUS not in who_can

0 commit comments

Comments
 (0)