Skip to content
This repository was archived by the owner on Jun 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions api/internal/tests/test_permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
GetAdminErrorProviderAdapter,
GetAdminProviderAdapter,
)
from api.shared.permissions import RepositoryPermissionsService, UserIsAdminPermissions
from api.shared.permissions import (
RepositoryPermissionsService,
UserIsAdminPermissions,
is_admin_on_provider,
)


class MockedPermissionsAdapter:
Expand Down Expand Up @@ -178,7 +182,7 @@ def test_is_admin_on_provider_invokes_torngit_adapter_when_user_not_in_admin_arr
user = OwnerFactory()

mocked_get_adapter.return_value = GetAdminProviderAdapter()
self.permissions_class._is_admin_on_provider(user, org)
is_admin_on_provider(user, org)
assert mocked_get_adapter.return_value.last_call_args == {
"username": user.username,
"service_id": user.service_id,
Expand All @@ -192,4 +196,4 @@ def test_is_admin_on_provider_handles_torngit_exception(self, mock_get_provider)
user = OwnerFactory()

with self.assertRaises(APIException):
self.permissions_class._is_admin_on_provider(user, org)
is_admin_on_provider(user, org)
36 changes: 16 additions & 20 deletions api/shared/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,21 @@ def has_permission(self, request: HttpRequest, view: Any) -> bool:
return True


@torngit_safe
def is_admin_on_provider(current_user: Owner, owner: Owner) -> bool:
torngit_provider_adapter = get_provider(
owner.service,
{
**get_generic_adapter_params(current_user, owner.service),
"owner": {"username": owner.username, "service_id": owner.service_id},
},
)

return async_to_sync(torngit_provider_adapter.get_is_admin)(
user={"username": current_user.username, "service_id": current_user.service_id}
)


class UserIsAdminPermissions(BasePermission):
"""
Permissions class for asserting the user is an admin of the 'owner'
Expand All @@ -158,29 +173,10 @@ def has_permission(self, request: HttpRequest, view: Any) -> bool:
and request.current_owner
and (
view.owner.is_admin(request.current_owner)
or self._is_admin_on_provider(request.current_owner, view.owner)
or is_admin_on_provider(request.current_owner, view.owner)
)
)

@torngit_safe
def _is_admin_on_provider(self, user: Owner, owner: Owner) -> bool:
torngit_provider_adapter = get_provider(
owner.service,
{
**get_generic_adapter_params(user, owner.service),
**{
"owner": {
"username": owner.username,
"service_id": owner.service_id,
}
},
},
)

return async_to_sync(torngit_provider_adapter.get_is_admin)(
user={"username": user.username, "service_id": user.service_id}
)


class MemberOfOrgPermissions(BasePermission):
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,32 +1,9 @@
from asgiref.sync import async_to_sync, sync_to_async
from asgiref.sync import sync_to_async
from django.conf import settings

import services.self_hosted as self_hosted
from api.shared.permissions import is_admin_on_provider
from codecov.commands.base import BaseInteractor
from services.decorators import torngit_safe
from services.repo_providers import get_generic_adapter_params, get_provider


@torngit_safe
@sync_to_async
def _is_admin_on_provider(owner, current_user):
torngit_provider_adapter = get_provider(
owner.service,
{
**get_generic_adapter_params(current_user, owner.service),
**{
"owner": {
"username": owner.username,
"service_id": owner.service_id,
}
},
},
)

isAdmin = async_to_sync(torngit_provider_adapter.get_is_admin)(
user={"username": current_user.username, "service_id": current_user.service_id}
)
return isAdmin


class GetIsCurrentUserAnAdminInteractor(BaseInteractor):
Expand All @@ -44,7 +21,7 @@ def execute(self, owner, current_owner):
return True
else:
try:
isAdmin = async_to_sync(_is_admin_on_provider)(owner, current_owner)
isAdmin = is_admin_on_provider(current_owner, owner)
if isAdmin:
# save admin provider in admins list
owner.add_admin(current_owner)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@
OwnerFactory,
)

from ..get_is_current_user_an_admin import (
GetIsCurrentUserAnAdminInteractor,
_is_admin_on_provider,
)
from api.shared.permissions import is_admin_on_provider

from ..get_is_current_user_an_admin import GetIsCurrentUserAnAdminInteractor


class GetIsCurrentUserAnAdminInteractorTest(TestCase):
Expand Down Expand Up @@ -43,22 +42,18 @@ def test_user_not_a_provider_admin(self):
)(owner, current_user)
assert isAdmin == False

@patch(
"codecov_auth.commands.owner.interactors.get_is_current_user_an_admin.get_provider"
)
@patch("api.shared.permissions.get_provider")
def test_is_admin_on_provider_invokes_torngit_adapter(self, mocked_get_adapter):
current_user = OwnerFactory(ownerid=3)
owner = self.owner_has_no_admins
mocked_get_adapter.return_value = GetAdminProviderAdapter()
async_to_sync(_is_admin_on_provider)(owner, current_user)
is_admin_on_provider(current_user, owner)
assert mocked_get_adapter.return_value.last_call_args == {
"username": current_user.username,
"service_id": current_user.service_id,
}

@patch(
"codecov_auth.commands.owner.interactors.get_is_current_user_an_admin.get_provider"
)
@patch("api.shared.permissions.get_provider")
def test_is_admin_in_org_not_on_provider(self, mocked_get_adapter):
current_user = OwnerFactory(ownerid=2)
owner = self.owner_has_admins
Expand All @@ -68,9 +63,7 @@ def test_is_admin_in_org_not_on_provider(self, mocked_get_adapter):
)(owner, current_user)
assert isAdmin == True

@patch(
"codecov_auth.commands.owner.interactors.get_is_current_user_an_admin.get_provider"
)
@patch("api.shared.permissions.get_provider")
def test_is_admin_on_provider(self, mocked_get_adapter):
current_user = OwnerFactory(ownerid=3)
owner = self.owner_has_no_admins
Expand All @@ -81,9 +74,7 @@ def test_is_admin_on_provider(self, mocked_get_adapter):
assert current_user.ownerid in owner.admins
assert isAdmin == True

@patch(
"codecov_auth.commands.owner.interactors.get_is_current_user_an_admin.get_provider"
)
@patch("api.shared.permissions.get_provider")
def test_is_admin_on_provider_only_once(self, mocked_get_adapter):
# Ensure duplicate ownerids won't be saved in admins upon multiple calls
current_user = OwnerFactory(ownerid=3)
Expand All @@ -98,9 +89,7 @@ def test_is_admin_on_provider_only_once(self, mocked_get_adapter):
assert owner.admins == [3]
assert current_user.ownerid in owner.admins

@patch(
"codecov_auth.commands.owner.interactors.get_is_current_user_an_admin.get_provider"
)
@patch("api.shared.permissions.get_provider")
def test_admin_on_provider_initially_is_null(self, mocked_get_adapter):
# Owner model defaults admins to null, check can handle first update
current_user = OwnerFactory(ownerid=3)
Expand All @@ -113,9 +102,7 @@ def test_admin_on_provider_initially_is_null(self, mocked_get_adapter):
assert current_user.ownerid in owner.admins
assert isAdmin == True

@patch(
"codecov_auth.commands.owner.interactors.get_is_current_user_an_admin.get_provider"
)
@patch("api.shared.permissions.get_provider")
def test_is_admin_not_in_org_or_on_provider(self, mocked_get_adapter):
current_user = OwnerFactory(ownerid=3)
owner = self.owner_has_no_admins
Expand Down
4 changes: 1 addition & 3 deletions graphql_api/tests/test_owner.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,7 @@ def test_is_current_user_not_an_admin(self):
data = self.gql_request(query, owner=user, with_errors=True)
assert data["data"]["owner"]["isAdmin"] is None

@patch(
"codecov_auth.commands.owner.interactors.get_is_current_user_an_admin.get_provider"
)
@patch("api.shared.permissions.get_provider")
def test_is_current_user_an_admin(self, mocked_get_adapter):
query_current_user_is_admin = """{
owner(username: "%s") {
Expand Down
Loading