Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions ninja_extra/permissions/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ def __invert__( # type:ignore[misc]
return SingleOperandHolder(NOT, self)


class BasePermissionMetaclass(OperationHolderMixin, ABCMeta):
...
class BasePermissionMetaclass(OperationHolderMixin, ABCMeta): ...


class BasePermission(ABC, metaclass=BasePermissionMetaclass): # pragma: no cover
Expand Down
1 change: 1 addition & 0 deletions ninja_extra/permissions/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class IsAdminUser(BasePermission):
"""
Allows access only to admin users.
"""

message: str = "You must be an admin user to access this resource."

def has_permission(
Expand Down
10 changes: 8 additions & 2 deletions ninja_extra/security/session.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from django.conf import settings
from django.http import HttpRequest
from ninja.signature import is_async

from ninja_extra.security.api_key import AsyncAPIKeyCookie

Expand All @@ -16,6 +17,11 @@ class AsyncSessionAuth(AsyncAPIKeyCookie):
async def authenticate(
self, request: HttpRequest, key: Optional[str]
) -> Optional[Any]:
if request.user.is_authenticated:
return request.user
if hasattr(request, "auser") and is_async(request.auser):
current_user = await request.auser()
else:
current_user = request.user

if current_user.is_authenticated:
return current_user
return None
61 changes: 30 additions & 31 deletions tests/test_permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@


class TestPermissionsCompositions:

@classmethod
def get_real_user_request(cls):
_request = Mock()
Expand Down Expand Up @@ -43,8 +42,8 @@ def test_is_authenticated_and_read_only(self, method, auth, result):
request = self.get_real_user_request()
request.method = method
assert (
permissions.IsAuthenticatedOrReadOnly().has_permission(request, Mock())
== result
permissions.IsAuthenticatedOrReadOnly().has_permission(request, Mock())
== result
)

def test_and_false(self):
Expand Down Expand Up @@ -83,8 +82,8 @@ def test_not_false(self):
composed_perm = ~permissions.IsAuthenticated
assert composed_perm().has_permission(anonymous_request, None) is True
assert (
composed_perm().has_object_permission(anonymous_request, None, None)
is False
composed_perm().has_object_permission(anonymous_request, None, None)
is False
)
# Message
assert composed_perm().message == permissions.IsAuthenticated.message
Expand All @@ -99,10 +98,10 @@ def test_not_true(self):
def test_several_levels_without_negation(self):
request = self.get_real_user_request()
composed_perm = (
permissions.IsAuthenticated
& permissions.IsAuthenticated
& permissions.IsAuthenticated
& permissions.IsAuthenticated
permissions.IsAuthenticated
& permissions.IsAuthenticated
& permissions.IsAuthenticated
& permissions.IsAuthenticated
)
assert composed_perm().has_permission(request, None) is True
assert composed_perm().has_object_permission(request, None, None) is True
Expand All @@ -111,28 +110,28 @@ def test_several_levels_without_negation(self):
def test_several_levels_and_precedence_with_negation(self):
request = self.get_real_user_request()
composed_perm = (
permissions.IsAuthenticated
& ~permissions.IsAdminUser
& permissions.IsAuthenticated
& ~(permissions.IsAdminUser & permissions.IsAdminUser)
permissions.IsAuthenticated
& ~permissions.IsAdminUser
& permissions.IsAuthenticated
& ~(permissions.IsAdminUser & permissions.IsAdminUser)
)
assert composed_perm().has_permission(request, None) is True

@pytest.mark.django_db
def test_several_levels_and_precedence(self):
request = self.get_real_user_request()
composed_perm = (
permissions.IsAuthenticated & permissions.IsAuthenticated
| permissions.IsAuthenticated & permissions.IsAuthenticated
permissions.IsAuthenticated & permissions.IsAuthenticated
| permissions.IsAuthenticated & permissions.IsAuthenticated
)
assert composed_perm().has_permission(request, None) is True

def test_or_lazyness(self):
with mock.patch.object(
permissions.AllowAny, "has_permission", return_value=True
permissions.AllowAny, "has_permission", return_value=True
) as mock_allow:
with mock.patch.object(
permissions.IsAuthenticated, "has_permission", return_value=False
permissions.IsAuthenticated, "has_permission", return_value=False
) as mock_deny:
composed_perm = permissions.AllowAny | permissions.IsAuthenticated
hasperm = composed_perm().has_permission(anonymous_request, None)
Expand All @@ -141,10 +140,10 @@ def test_or_lazyness(self):
mock_deny.assert_not_called()

with mock.patch.object(
permissions.AllowAny, "has_permission", return_value=True
permissions.AllowAny, "has_permission", return_value=True
) as mock_allow:
with mock.patch.object(
permissions.IsAuthenticated, "has_permission", return_value=False
permissions.IsAuthenticated, "has_permission", return_value=False
) as mock_deny:
composed_perm = permissions.IsAuthenticated | permissions.AllowAny
hasperm = composed_perm().has_permission(anonymous_request, None)
Expand All @@ -154,10 +153,10 @@ def test_or_lazyness(self):

def test_object_or_lazyness(self):
with mock.patch.object(
permissions.AllowAny, "has_object_permission", return_value=True
permissions.AllowAny, "has_object_permission", return_value=True
) as mock_allow:
with mock.patch.object(
permissions.IsAuthenticated, "has_object_permission", return_value=False
permissions.IsAuthenticated, "has_object_permission", return_value=False
) as mock_deny:
composed_perm = permissions.AllowAny | permissions.IsAuthenticated
hasperm = composed_perm().has_object_permission(
Expand All @@ -168,10 +167,10 @@ def test_object_or_lazyness(self):
mock_deny.assert_not_called()

with mock.patch.object(
permissions.AllowAny, "has_object_permission", return_value=True
permissions.AllowAny, "has_object_permission", return_value=True
) as mock_allow:
with mock.patch.object(
permissions.IsAuthenticated, "has_object_permission", return_value=False
permissions.IsAuthenticated, "has_object_permission", return_value=False
) as mock_deny:
composed_perm = permissions.IsAuthenticated | permissions.AllowAny
hasperm = composed_perm().has_object_permission(
Expand All @@ -183,10 +182,10 @@ def test_object_or_lazyness(self):

def test_and_lazyness(self):
with mock.patch.object(
permissions.AllowAny, "has_permission", return_value=True
permissions.AllowAny, "has_permission", return_value=True
) as mock_allow:
with mock.patch.object(
permissions.IsAuthenticated, "has_permission", return_value=False
permissions.IsAuthenticated, "has_permission", return_value=False
) as mock_deny:
composed_perm = permissions.AllowAny & permissions.IsAuthenticated
hasperm = composed_perm().has_permission(anonymous_request, None)
Expand All @@ -195,10 +194,10 @@ def test_and_lazyness(self):
assert mock_deny.call_count == 1

with mock.patch.object(
permissions.AllowAny, "has_permission", return_value=True
permissions.AllowAny, "has_permission", return_value=True
) as mock_allow:
with mock.patch.object(
permissions.IsAuthenticated, "has_permission", return_value=False
permissions.IsAuthenticated, "has_permission", return_value=False
) as mock_deny:
composed_perm = permissions.IsAuthenticated & permissions.AllowAny
hasperm = composed_perm().has_permission(anonymous_request, None)
Expand All @@ -208,10 +207,10 @@ def test_and_lazyness(self):

def test_object_and_lazyness(self):
with mock.patch.object(
permissions.AllowAny, "has_object_permission", return_value=True
permissions.AllowAny, "has_object_permission", return_value=True
) as mock_allow:
with mock.patch.object(
permissions.IsAuthenticated, "has_object_permission", return_value=False
permissions.IsAuthenticated, "has_object_permission", return_value=False
) as mock_deny:
composed_perm = permissions.AllowAny & permissions.IsAuthenticated
hasperm = composed_perm().has_object_permission(
Expand All @@ -222,10 +221,10 @@ def test_object_and_lazyness(self):
assert mock_deny.call_count == 1

with mock.patch.object(
permissions.AllowAny, "has_object_permission", return_value=True
permissions.AllowAny, "has_object_permission", return_value=True
) as mock_allow:
with mock.patch.object(
permissions.IsAuthenticated, "has_object_permission", return_value=False
permissions.IsAuthenticated, "has_object_permission", return_value=False
) as mock_deny:
composed_perm = permissions.IsAuthenticated & permissions.AllowAny
hasperm = composed_perm().has_object_permission(
Expand Down
40 changes: 40 additions & 0 deletions tests/test_security/test_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from unittest.mock import AsyncMock, Mock

import pytest
from django.http import HttpRequest

from ninja_extra.security.session import AsyncSessionAuth


@pytest.mark.asyncio
async def test_async_session_auth():
auth = AsyncSessionAuth()
request = HttpRequest()

# Test async authenticated user
async_user = AsyncMock()
async_user.is_authenticated = True
request.auser = AsyncMock(return_value=async_user)

authenticated_user = await auth.authenticate(request, None)
assert authenticated_user == async_user
request.auser.assert_called_once()

# Test async non-authenticated user
async_user.is_authenticated = False
authenticated_user = await auth.authenticate(request, None)
assert authenticated_user is None

# Test non-async authenticated user
delattr(request, "auser")
sync_user = Mock()
sync_user.is_authenticated = True
request.user = sync_user

authenticated_user = await auth.authenticate(request, None)
assert authenticated_user == sync_user

# Test non-async non-authenticated user
sync_user.is_authenticated = False
authenticated_user = await auth.authenticate(request, None)
assert authenticated_user is None