2424from core .models import Commit , Repository
2525from upload .helpers import get_global_tokens , get_repo_with_github_actions_oidc_token
2626from upload .views .helpers import (
27+ get_repository_and_owner_from_slug_and_commit ,
2728 get_repository_and_owner_from_string ,
2829 get_repository_from_string ,
2930)
@@ -345,7 +346,7 @@ def get_branch(self, request, repoid=None, commitid=None):
345346 body = json .loads (str (request .body , "utf8" ))
346347
347348 # If commit is not created yet (ie first upload for this commit), we just validate branch format.
348- # However if a commit exists already (ie not the first upload for this commit), we must additionally
349+ # However, if a commit exists already (ie not the first upload for this commit), we must additionally
349350 # validate the saved commit branch matches what is requested in this upload call.
350351 commit = Commit .objects .filter (repository_id = repoid , commitid = commitid ).first ()
351352 if commit and commit .branch != body .get ("branch" ):
@@ -381,10 +382,15 @@ def _get_info_from_request_path(
381382
382383 return repository , owner
383384
385+ def get_repository_and_owner (
386+ self , request : HttpRequest
387+ ) -> tuple [Repository | None , Owner | None ]:
388+ return self ._get_info_from_request_path (request )
389+
384390 def authenticate (
385391 self , request : HttpRequest
386392 ) -> tuple [RepositoryAsUser , TokenlessAuth ] | None :
387- repository , owner = self ._get_info_from_request_path (request )
393+ repository , owner = self .get_repository_and_owner (request )
388394
389395 if (
390396 repository is None
@@ -398,3 +404,86 @@ def authenticate(
398404 RepositoryAsUser (repository ),
399405 TokenlessAuth (repository ),
400406 )
407+
408+
409+ class TestResultsUploadTokenRequiredAuthenticationCheck (
410+ UploadTokenRequiredAuthenticationCheck
411+ ):
412+ """
413+ Repository and Owner are not in the path for this endpoint, have to get them another way,
414+ then use the same authenticate() as parent class.
415+ """
416+
417+ def _get_info_from_request_data (
418+ self , request : HttpRequest
419+ ) -> tuple [Repository | None , Owner | None ]:
420+ from upload .views .test_results import UploadSerializer # circular imports
421+
422+ try :
423+ body = json .loads (str (request .body , "utf8" ))
424+ except json .JSONDecodeError :
425+ return None , None # continue to next auth class
426+
427+ serializer = UploadSerializer (data = body )
428+ if not serializer .is_valid ():
429+ return None , None # continue to next auth class
430+
431+ # these fields are required=True
432+ slug = serializer .validated_data ["slug" ]
433+ commitid = serializer .validated_data ["commit" ]
434+
435+ return get_repository_and_owner_from_slug_and_commit (
436+ slug = slug , commitid = commitid
437+ )
438+
439+ def get_repository_and_owner (
440+ self , request : HttpRequest
441+ ) -> tuple [Repository | None , Owner | None ]:
442+ return self ._get_info_from_request_data (request )
443+
444+
445+ class BundleAnalysisUploadTokenRequiredAuthenticationCheck (
446+ UploadTokenRequiredAuthenticationCheck
447+ ):
448+ """
449+ Repository and Owner are not in the path for this endpoint, have to get them another way,
450+ then use the same authenticate() as parent class.
451+ """
452+
453+ def _get_info_from_request_data (
454+ self , request : HttpRequest
455+ ) -> tuple [Repository | None , Owner | None ]:
456+ from upload .views .bundle_analysis import UploadSerializer # circular imports
457+
458+ try :
459+ body = json .loads (str (request .body , "utf8" ))
460+ except json .JSONDecodeError :
461+ return None , None # continue to next auth class
462+
463+ serializer = UploadSerializer (data = body )
464+ if not serializer .is_valid ():
465+ return None , None # continue to next auth class
466+
467+ # these fields are required=True
468+ slug = serializer .validated_data ["slug" ]
469+ commitid = serializer .validated_data ["commit" ]
470+
471+ # this field is required=False but is much better for getting repository and owner
472+ service = serializer .validated_data .get ("git_service" )
473+ if service :
474+ try :
475+ service_enum = Service (service )
476+ except ValueError :
477+ return None , None
478+ return get_repository_and_owner_from_string (
479+ service = service_enum , repo_identifier = slug
480+ )
481+
482+ return get_repository_and_owner_from_slug_and_commit (
483+ slug = slug , commitid = commitid
484+ )
485+
486+ def get_repository_and_owner (
487+ self , request : HttpRequest
488+ ) -> tuple [Repository | None , Owner | None ]:
489+ return self ._get_info_from_request_data (request )
0 commit comments