Skip to content
This repository was archived by the owner on Jun 13, 2025. It is now read-only.
5 changes: 4 additions & 1 deletion api/public/v2/component/views.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from typing import Any

from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiParameter, extend_schema
from rest_framework import viewsets
from rest_framework.request import Request
from rest_framework.response import Response

from api.public.v2.component.serializers import ComponentSerializer
Expand Down Expand Up @@ -34,7 +37,7 @@ class ComponentViewSet(viewsets.ViewSet, RepoPropertyMixin):
permission_classes = [RepositoryArtifactPermissions]

@extend_schema(summary="Component list")
def list(self, request, *args, **kwargs):
def list(self, request: Request, *args: Any, **kwargs: Any) -> Response:
"""
Returns a list of components for the specified repository
"""
Expand Down
4 changes: 2 additions & 2 deletions billing/views.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from datetime import datetime
from typing import List
from typing import Any, List

import stripe
from django.conf import settings
Expand Down Expand Up @@ -410,7 +410,7 @@ def checkout_session_completed(

self._log_updated([owner])

def post(self, request: HttpRequest, *args, **kwargs) -> Response:
def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> Response:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we know what the args/kwargs are from stripe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code doesn't use args/kwargs at all, I think we just add it in as part of the DRF APIView interface.

if settings.STRIPE_ENDPOINT_SECRET is None:
log.critical(
"Stripe endpoint secret improperly configured -- webhooks will not be processed."
Expand Down
81 changes: 54 additions & 27 deletions codecov_auth/authentication/repo_auth.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import logging
from typing import List
from typing import Any, Dict, List, Optional, Tuple
from uuid import UUID

from django.core.exceptions import ObjectDoesNotExist
Expand All @@ -10,6 +10,7 @@
from jwt import PyJWTError
from rest_framework import authentication, exceptions, serializers
from rest_framework.exceptions import NotAuthenticated
from rest_framework.response import Response
from rest_framework.views import exception_handler
from shared.django_apps.codecov_auth.models import Owner

Expand All @@ -32,7 +33,9 @@
log = logging.getLogger(__name__)


def repo_auth_custom_exception_handler(exc, context):
def repo_auth_custom_exception_handler(
exc: Exception, context: Dict[str, Any]
) -> Response:
"""
User arrives here if they have correctly supplied a Token or the Tokenless Headers,
but their Token has not matched with any of our Authentication methods. The goal is to
Expand Down Expand Up @@ -60,17 +63,17 @@ def repo_auth_custom_exception_handler(exc, context):


class LegacyTokenRepositoryAuth(RepositoryAuthInterface):
def __init__(self, repository, auth_data):
def __init__(self, repository: Repository, auth_data: Dict[str, Any]) -> None:
self._auth_data = auth_data
self._repository = repository

def get_scopes(self):
def get_scopes(self) -> List[TokenTypeChoices]:
return [TokenTypeChoices.UPLOAD]

def get_repositories(self):
def get_repositories(self) -> List[Repository]:
return [self._repository]

def allows_repo(self, repository):
def allows_repo(self, repository: Repository) -> bool:
return repository in self.get_repositories()


Expand All @@ -79,17 +82,17 @@ class OIDCTokenRepositoryAuth(LegacyTokenRepositoryAuth):


class TableTokenRepositoryAuth(RepositoryAuthInterface):
def __init__(self, repository, token):
def __init__(self, repository: Repository, token: RepositoryToken) -> None:
self._token = token
self._repository = repository

def get_scopes(self):
def get_scopes(self) -> List[str]:
return [self._token.token_type]

def get_repositories(self):
def get_repositories(self) -> List[Repository]:
return [self._repository]

def allows_repo(self, repository):
def allows_repo(self, repository: Repository) -> bool:
return repository in self.get_repositories()


Expand All @@ -98,10 +101,10 @@ def __init__(self, token: OrganizationLevelToken) -> None:
self._token = token
self._org = token.owner

def get_scopes(self):
def get_scopes(self) -> List[str]:
return [self._token.token_type]

def allows_repo(self, repository):
def allows_repo(self, repository: Repository) -> bool:
return repository.author.ownerid == self._org.ownerid

def get_repositories_queryset(self) -> QuerySet:
Expand All @@ -120,18 +123,20 @@ class TokenlessAuth(RepositoryAuthInterface):
def __init__(self, repository: Repository) -> None:
self._repository = repository

def get_scopes(self):
def get_scopes(self) -> List[TokenTypeChoices]:
return [TokenTypeChoices.UPLOAD]

def allows_repo(self, repository):
def allows_repo(self, repository: Repository) -> bool:
return repository in self.get_repositories()

def get_repositories(self) -> List[Repository]:
return [self._repository]


class RepositoryLegacyQueryTokenAuthentication(authentication.BaseAuthentication):
def authenticate(self, request):
def authenticate(
self, request: HttpRequest
) -> Optional[Tuple[RepositoryAsUser, LegacyTokenRepositoryAuth]]:
token = request.GET.get("token")
if not token:
return None
Expand All @@ -150,22 +155,26 @@ def authenticate(self, request):


class RepositoryLegacyTokenAuthentication(authentication.TokenAuthentication):
def authenticate_credentials(self, token):
def authenticate_credentials(
self, token: str
) -> Optional[Tuple[RepositoryAsUser, LegacyTokenRepositoryAuth]]:
try:
token = UUID(token)
repository = Repository.objects.get(upload_token=token)
token_uuid = UUID(token)
repository = Repository.objects.get(upload_token=token_uuid)
except (ValueError, TypeError, Repository.DoesNotExist):
return None # continue to next auth class
return (
RepositoryAsUser(repository),
LegacyTokenRepositoryAuth(repository, {"token": token}),
LegacyTokenRepositoryAuth(repository, {"token": token_uuid}),
)


class RepositoryTokenAuthentication(authentication.TokenAuthentication):
keyword = "Repotoken"

def authenticate_credentials(self, key):
def authenticate_credentials(
self, key: str
) -> Optional[Tuple[RepositoryAsUser, TableTokenRepositoryAuth]]:
try:
token = RepositoryToken.objects.select_related("repository").get(key=key)
except RepositoryToken.DoesNotExist:
Expand All @@ -182,7 +191,9 @@ def authenticate_credentials(self, key):


class GlobalTokenAuthentication(authentication.TokenAuthentication):
def authenticate(self, request):
def authenticate(
self, request: HttpRequest
) -> Optional[Tuple[RepositoryAsUser, LegacyTokenRepositoryAuth]]:
global_tokens = get_global_tokens()
token = self.get_token(request)
using_global_token = token in global_tokens
Expand Down Expand Up @@ -219,7 +230,9 @@ def get_token(self, request: HttpRequest) -> str | None:


class OrgLevelTokenAuthentication(authentication.TokenAuthentication):
def authenticate_credentials(self, key):
def authenticate_credentials(
self, key: str
) -> Optional[Tuple[Owner, OrgLevelTokenRepositoryAuth]]:
if is_uuid(key): # else, continue to next auth class
# Actual verification for org level tokens
token = OrganizationLevelToken.objects.filter(token=key).first()
Expand All @@ -236,7 +249,9 @@ def authenticate_credentials(self, key):


class GitHubOIDCTokenAuthentication(authentication.TokenAuthentication):
def authenticate_credentials(self, token):
def authenticate_credentials(
self, token: str
) -> Optional[Tuple[RepositoryAsUser, OIDCTokenRepositoryAuth]]:
if not token or is_uuid(token):
return None # continue to next auth class

Expand Down Expand Up @@ -283,7 +298,12 @@ def _get_info_from_request_path(

return repo, commitid

def get_branch(self, request, repoid=None, commitid=None):
def get_branch(
self,
request: HttpRequest,
repoid: Optional[int] = None,
commitid: Optional[str] = None,
) -> Optional[str]:
if repoid and commitid:
commit = Commit.objects.filter(
repository_id=repoid, commitid=commitid
Expand All @@ -299,7 +319,9 @@ def get_branch(self, request, repoid=None, commitid=None):
else:
return body.get("branch")

def authenticate(self, request):
def authenticate(
self, request: HttpRequest
) -> Tuple[RepositoryAsUser, TokenlessAuth]:
repository, commitid = self._get_info_from_request_path(request)

if repository is None or repository.private:
Expand Down Expand Up @@ -341,7 +363,12 @@ def _get_info_from_request_path(
# Validate provider
raise exceptions.AuthenticationFailed(self.auth_failed_message)

def get_branch(self, request, repoid=None, commitid=None):
def get_branch(
self,
request: HttpRequest,
repoid: Optional[int] = None,
commitid: Optional[str] = None,
) -> str:
body = json.loads(str(request.body, "utf8"))

# If commit is not created yet (ie first upload for this commit), we just validate branch format.
Expand Down Expand Up @@ -419,7 +446,7 @@ class UploadTokenRequiredGetFromBodyAuthenticationCheck(
then use the same authenticate() as parent class.
"""

def _get_git(self, validated_data):
def _get_git(self, validated_data: Dict[str, str]) -> Optional[str]:
"""
BA sends this in as git_service, TA sends this in as service.
Use this function so this Check class can be used by both views.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from shared.plan.service import PlanService
from shared.upload.utils import query_monthly_coverage_measurements

Expand All @@ -11,7 +13,7 @@

class GetUploadsNumberPerUserInteractor(BaseInteractor):
@sync_to_async
def execute(self, owner: Owner):
def execute(self, owner: Owner) -> Optional[int]:
plan_service = PlanService(current_org=owner)
monthly_limit = plan_service.monthly_uploads_limit
if monthly_limit is not None:
Expand Down
30 changes: 16 additions & 14 deletions core/commands/repository/repository.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import uuid
from typing import Optional
from typing import Awaitable, Optional

from codecov.commands.base import BaseCommand
from codecov_auth.models import Owner
from codecov_auth.models import Owner, RepositoryToken
from core.models import Repository
from timeseries.models import MeasurementName
from timeseries.models import Dataset, MeasurementName

from .interactors.activate_measurements import ActivateMeasurementsInteractor
from .interactors.encode_secret_string import EncodeSecretStringInteractor
Expand All @@ -22,8 +22,8 @@
class RepositoryCommands(BaseCommand):
def fetch_repository(
self,
owner,
name,
owner: Owner,
name: str,
okta_authenticated_accounts: list[int],
exclude_okta_enforced_repos: bool = True,
) -> Repository:
Expand All @@ -38,7 +38,7 @@ def regenerate_repository_upload_token(
self,
repo_name: str,
owner_username: str,
) -> uuid.UUID:
) -> Awaitable[uuid.UUID]:
return self.get_interactor(RegenerateRepositoryUploadTokenInteractor).execute(
repo_name, owner_username
)
Expand All @@ -49,37 +49,39 @@ def update_repository(
owner: Owner,
default_branch: Optional[str],
activated: Optional[bool],
):
) -> None:
return self.get_interactor(UpdateRepositoryInteractor).execute(
repo_name, owner, default_branch, activated
)

def get_upload_token(self, repository):
def get_upload_token(self, repository: Repository) -> uuid.UUID:
return self.get_interactor(GetUploadTokenInteractor).execute(repository)

def get_repository_token(self, repository, token_type):
def get_repository_token(
self, repository: Repository, token_type: RepositoryToken.TokenType
) -> str:
return self.get_interactor(GetRepositoryTokenInteractor).execute(
repository, token_type
)

def regenerate_repository_token(
self, repo_name: str, owner_username: str, token_type: str
):
self, repo_name: str, owner_username: str, token_type: RepositoryToken.TokenType
) -> str:
return self.get_interactor(RegenerateRepositoryTokenInteractor).execute(
repo_name, owner_username, token_type
)

def activate_measurements(
self, repo_name: str, owner_name: str, measurement_type: MeasurementName
):
) -> Dataset:
return self.get_interactor(ActivateMeasurementsInteractor).execute(
repo_name, owner_name, measurement_type
)

def erase_repository(self, repo_name: str, owner: Owner):
def erase_repository(self, repo_name: str, owner: Owner) -> None:
return self.get_interactor(EraseRepositoryInteractor).execute(repo_name, owner)

def encode_secret_string(self, owner: Owner, repo_name: str, value: str):
def encode_secret_string(self, owner: Owner, repo_name: str, value: str) -> str:
return self.get_interactor(EncodeSecretStringInteractor).execute(
owner, repo_name, value
)
4 changes: 2 additions & 2 deletions core/signals.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Any, Dict, List, Type
from typing import Any, Dict, List, Type, cast

from django.db.models.signals import post_save
from django.dispatch import receiver
Expand All @@ -16,7 +16,7 @@ def update_repository(
sender: Type[Repository], instance: Repository, **kwargs: Dict[str, Any]
) -> None:
log.info(f"Signal triggered for repository {instance.repoid}")
created: bool = kwargs["created"]
created: bool = cast(bool, kwargs["created"])
changes: Dict[str, Any] = instance.tracker.changed()
tracked_fields: List[str] = ["name", "upload_token", "activated", "active"]

Expand Down
10 changes: 6 additions & 4 deletions graphql_api/types/account/account.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from typing import Any, Coroutine, Optional

from ariadne import ObjectType
from graphql import GraphQLResolveInfo

from codecov.db import sync_to_async
from codecov_auth.models import Account, OktaSettings
from codecov_auth.models import Account, OktaSettings, Owner
from graphql_api.helpers.ariadne import ariadne_load_local_graphql
from graphql_api.helpers.connection import (
build_connection_graphql,
Expand Down Expand Up @@ -43,9 +45,9 @@ def resolve_activated_user_count(account: Account, info: GraphQLResolveInfo) ->
def resolve_organizations(
account: Account,
info: GraphQLResolveInfo,
ordering_direction=OrderingDirection.ASC,
**kwargs,
):
ordering_direction: Optional[OrderingDirection] = OrderingDirection.ASC,
**kwargs: Any,
) -> Coroutine[Any, Any, Owner]:
return queryset_to_connection(
account.organizations,
ordering=("username",),
Expand Down
Loading
Loading