Skip to content
This repository was archived by the owner on Jun 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 59 additions & 3 deletions codecov_auth/authentication/repo_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from django.http import HttpRequest
from django.utils import timezone
from jwt import PyJWTError
from rest_framework import authentication, exceptions
from rest_framework import authentication, exceptions, serializers
from rest_framework.exceptions import NotAuthenticated
from rest_framework.views import exception_handler
from shared.django_apps.codecov_auth.models import Owner
Expand Down Expand Up @@ -345,7 +345,7 @@ def get_branch(self, request, repoid=None, commitid=None):
body = json.loads(str(request.body, "utf8"))

# If commit is not created yet (ie first upload for this commit), we just validate branch format.
# However if a commit exists already (ie not the first upload for this commit), we must additionally
# However, if a commit exists already (ie not the first upload for this commit), we must additionally
# validate the saved commit branch matches what is requested in this upload call.
commit = Commit.objects.filter(repository_id=repoid, commitid=commitid).first()
if commit and commit.branch != body.get("branch"):
Expand Down Expand Up @@ -381,10 +381,15 @@ def _get_info_from_request_path(

return repository, owner

def get_repository_and_owner(
self, request: HttpRequest
) -> tuple[Repository | None, Owner | None]:
return self._get_info_from_request_path(request)

def authenticate(
self, request: HttpRequest
) -> tuple[RepositoryAsUser, TokenlessAuth] | None:
repository, owner = self._get_info_from_request_path(request)
repository, owner = self.get_repository_and_owner(request)

if (
repository is None
Expand All @@ -398,3 +403,54 @@ def authenticate(
RepositoryAsUser(repository),
TokenlessAuth(repository),
)


class UploadTokenRequiredGetFromBodySerializer(serializers.Serializer):
slug = serializers.CharField(required=True)
service = serializers.CharField(required=False) # git_service from TA
git_service = serializers.CharField(required=False) # git_service from BA


class UploadTokenRequiredGetFromBodyAuthenticationCheck(
UploadTokenRequiredAuthenticationCheck
):
"""
Get Repository and Owner from request body instead of path,
then use the same authenticate() as parent class.
"""

def _get_git(self, validated_data):
"""
BA sends this in as git_service, TA sends this in as service.
Use this function so this Check class can be used by both views.
"""
git_service = validated_data.get("git_service") or validated_data.get("service")
return git_service

def _get_info_from_request_body(
self, request: HttpRequest
) -> tuple[Repository | None, Owner | None]:
try:
body = json.loads(str(request.body, "utf8"))

serializer = UploadTokenRequiredGetFromBodySerializer(data=body)

if serializer.is_valid():
git_service = self._get_git(validated_data=serializer.validated_data)
service_enum = Service(git_service)
return get_repository_and_owner_from_string(
service=service_enum,
repo_identifier=serializer.validated_data["slug"],
)

except (json.JSONDecodeError, ValueError):
# exceptions raised by json.loads() and Service()
# catch rather than raise to continue to next auth class
pass

return None, None # continue to next auth class

def get_repository_and_owner(
self, request: HttpRequest
) -> tuple[Repository | None, Owner | None]:
return self._get_info_from_request_body(request)
231 changes: 229 additions & 2 deletions codecov_auth/tests/unit/test_repo_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from jwt import PyJWTError
from rest_framework import exceptions
from rest_framework.test import APIRequestFactory
from shared.django_apps.codecov_auth.models import Owner, Service
from shared.django_apps.core.models import Repository
from shared.django_apps.core.tests.factories import (
CommitFactory,
OwnerFactory,
Expand All @@ -27,6 +29,7 @@
TokenlessAuth,
TokenlessAuthentication,
UploadTokenRequiredAuthenticationCheck,
UploadTokenRequiredGetFromBodyAuthenticationCheck,
)
from codecov_auth.models import SERVICE_GITHUB, OrganizationLevelToken, RepositoryToken

Expand Down Expand Up @@ -597,7 +600,7 @@ def test_token_not_required_unknown_owner(self, db):
@pytest.mark.parametrize("request_uri,repo_slug,commitid", valid_params_to_test)
@pytest.mark.parametrize("private", [False, True])
@pytest.mark.parametrize("token_required", [False, True])
def test_token_not_required_matches_paths(
def test_get_repository_and_owner(
self, request_uri, repo_slug, commitid, private, token_required, db
):
author_name, repo_name = repo_slug.split("/")
Expand All @@ -612,7 +615,7 @@ def test_token_not_required_matches_paths(
request_uri, {"branch": "fork:branch"}, format="json"
)
authentication = UploadTokenRequiredAuthenticationCheck()
assert authentication._get_info_from_request_path(request) == (
assert authentication.get_repository_and_owner(request) == (
repo,
repo.author,
)
Expand Down Expand Up @@ -670,3 +673,227 @@ def test_token_not_required_fork_branch_public_private(
else:
res = authentication.authenticate(request)
assert res is None


class TestUploadTokenRequiredGetFromBodyAuthenticationCheck(object):
def test_token_not_required_invalid_data(self):
request = APIRequestFactory().post(
"/endpoint",
data={"slug": 123, "git_service": "github"},
format="json",
)
authentication = UploadTokenRequiredGetFromBodyAuthenticationCheck()
res = authentication.authenticate(request)
assert res is None

def test_token_not_required_no_data(self):
request = APIRequestFactory().post(
"/endpoint",
format="json",
)
authentication = UploadTokenRequiredGetFromBodyAuthenticationCheck()
res = authentication.authenticate(request)
assert res is None

def test_token_not_required_no_git_service(self, db):
owner = OwnerFactory(upload_token_required_for_public_repos=False)
# their repo
repo = RepositoryFactory(author=owner, private=False)
request_uri = f"/upload/github/{owner.username}::::{repo.name}/commits"
request = APIRequestFactory().post(
request_uri,
data={
"slug": f"{owner.username}::::{repo.name}",
},
format="json",
)
authentication = UploadTokenRequiredGetFromBodyAuthenticationCheck()
assert authentication.get_repository_and_owner(request) == (None, None)
res = authentication.authenticate(request)
assert res is None

def test_token_not_required_unknown_repository(self, db):
an_owner = OwnerFactory(upload_token_required_for_public_repos=False)
# their repo
RepositoryFactory(author=an_owner, private=False)
request_uri = f"/upload/github/{an_owner.username}::::bad/commits"
request = APIRequestFactory().post(
request_uri,
data={
"slug": f"{an_owner.username}::::bad",
"service": an_owner.service,
},
format="json",
)
authentication = UploadTokenRequiredGetFromBodyAuthenticationCheck()
assert authentication.get_repository_and_owner(request) == (None, None)
res = authentication.authenticate(request)
assert res is None

def test_token_not_required_unknown_owner(self, db):
repo = RepositoryFactory(
private=False, author__upload_token_required_for_public_repos=False
)
request_uri = f"/upload/github/bad::::{repo.name}/commits"
request = APIRequestFactory().post(
request_uri,
data={
"slug": f"bad::::{repo.name}",
"service": "github",
},
format="json",
)
authentication = UploadTokenRequiredGetFromBodyAuthenticationCheck()
res = authentication.authenticate(request)
assert res is None

@pytest.mark.parametrize("request_uri,repo_slug,commitid", valid_params_to_test)
@pytest.mark.parametrize("private", [False, True])
@pytest.mark.parametrize("token_required", [False, True])
def test_get_repository_and_owner(
self, request_uri, repo_slug, commitid, private, token_required, db
):
author_name, repo_name = repo_slug.split("/")
repo = RepositoryFactory(
name=repo_name,
author__username=author_name,
private=private,
author__upload_token_required_for_public_repos=token_required,
)
assert repo.service == "github"
request = APIRequestFactory().post(
request_uri,
data={"slug": f"{author_name}::::{repo_name}", "service": "github"},
format="json",
)
authentication = UploadTokenRequiredGetFromBodyAuthenticationCheck()

assert authentication.get_repository_and_owner(request) == (
repo,
repo.author,
)

def test_get_repository_and_owner_with_service(self, db):
authentication = UploadTokenRequiredGetFromBodyAuthenticationCheck()
repo_name = "the-repo"
owner_username = "the-author"
for name, _ in Service.choices:
owner = OwnerFactory(service=name, username=owner_username)
RepositoryFactory(name=repo_name, author=owner)

request = APIRequestFactory().post(
"endpoint/",
data={
"slug": f"{owner_username}::::{repo_name}",
"git_service": Service.BITBUCKET.value,
},
format="json",
)
matching_owner = Owner.objects.get(
service=Service.BITBUCKET.value, username=owner_username
)
matching_repo = Repository.objects.get(
name=repo_name, author__service=Service.BITBUCKET.value
)
assert authentication.get_repository_and_owner(request) == (
matching_repo,
matching_owner,
)

request = APIRequestFactory().post(
"endpoint/",
data={
"slug": f"{owner_username}::::{repo_name}",
"git_service": Service.GITLAB.value,
},
format="json",
)
matching_owner = Owner.objects.get(
service=Service.GITLAB.value, username=owner_username
)
matching_repo = Repository.objects.get(
name=repo_name, author__service=Service.GITLAB.value
)
assert authentication.get_repository_and_owner(request) == (
matching_repo,
matching_owner,
)

request = APIRequestFactory().post(
"endpoint/",
data={
"slug": f"{owner_username}::::{repo_name}",
"git_service": Service.GITHUB.value,
},
format="json",
)
matching_owner = Owner.objects.get(
service=Service.GITHUB.value, username=owner_username
)
matching_repo = Repository.objects.get(
name=repo_name, author__service=Service.GITHUB.value
)
assert authentication.get_repository_and_owner(request) == (
matching_repo,
matching_owner,
)

@pytest.mark.parametrize("private", [False, True])
@pytest.mark.parametrize("branch", ["branch", "fork:branch"])
@pytest.mark.parametrize(
"existing_commit,commit_branch",
[(False, None), (True, "branch"), (True, "fork:branch")],
)
@pytest.mark.parametrize("token_required", [False, True])
def test_token_not_required_fork_branch_public_private(
self,
db,
mocker,
private,
branch,
existing_commit,
commit_branch,
token_required,
):
repo = RepositoryFactory(
private=private,
author__upload_token_required_for_public_repos=token_required,
)

if existing_commit:
commit = CommitFactory()
commit.branch = commit_branch
commit.repository = repo
commit.save()

request = APIRequestFactory().post(
f"/upload/github/{repo.author.username}::::{repo.name}/commits/{commit.commitid}/reports/report_code/uploads",
data={
"slug": f"{repo.author.username}::::{repo.name}",
"git_service": repo.author.service,
},
format="json",
)

else:
request = APIRequestFactory().post(
f"/upload/github/{repo.author.username}::::{repo.name}/commits",
data={
"slug": f"{repo.author.username}::::{repo.name}",
"git_service": repo.author.service,
},
format="json",
)

authentication = UploadTokenRequiredGetFromBodyAuthenticationCheck()

if not private and not token_required:
res = authentication.authenticate(request)
assert res is not None
repo_as_user, auth_class = res

assert repo_as_user.is_authenticated() is True
assert isinstance(auth_class, TokenlessAuth)
else:
res = authentication.authenticate(request)
assert res is None
Loading
Loading