Skip to content
This repository was archived by the owner on Jun 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions core/commands/branch/interactors/fetch_branches.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from django.db.models import OuterRef, Q, Subquery
from typing import Any

from django.db.models import OuterRef, Q, QuerySet, Subquery
from shared.django_apps.core.models import Repository

from codecov.commands.base import BaseInteractor
from codecov.db import sync_to_async
Expand All @@ -7,13 +10,17 @@

class FetchRepoBranchesInteractor(BaseInteractor):
@sync_to_async
def execute(self, repository, filters):
def execute(self, repository: Repository, filters: dict[str, Any]) -> QuerySet:
queryset = repository.branches.all()

filters = filters or {}
search_value = filters.get("search_value")
if search_value:
queryset = queryset.filter(name__icontains=search_value)
# force use of ILIKE to optimize search; django icontains doesn't work
# see https://github.com/codecov/engineering-team/issues/2537
queryset = queryset.extra(
where=['"branches"."branch" ILIKE %s'], params=[f"%{search_value}%"]
)

merged = filters.get("merged_branches", False)
if not merged:
Expand Down
39 changes: 34 additions & 5 deletions core/commands/branch/interactors/tests/test_fetch_branches.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any

from asgiref.sync import async_to_sync
from django.test import TransactionTestCase
from shared.django_apps.core.tests.factories import (
Expand All @@ -11,7 +13,7 @@


class FetchRepoBranchesInteractorTest(TransactionTestCase):
def setUp(self):
def setUp(self) -> None:
self.org = OwnerFactory(username="codecov")
self.repo = RepositoryFactory(
author=self.org, name="gazebo", private=False, branch="main"
Expand All @@ -23,20 +25,20 @@ def setUp(self):
BranchFactory(repository=self.repo, head=self.head.commitid, name="test2"),
]

def execute(self, owner, repository, filters):
def execute(self, owner, repository, filters) -> FetchRepoBranchesInteractor:
service = owner.service if owner else "github"
return FetchRepoBranchesInteractor(owner, service).execute(repository, filters)

def test_fetch_branches(self):
def test_fetch_branches(self) -> None:
repository = self.repo
filters = {}
filters: dict[str, Any] = {}
branches = async_to_sync(self.execute)(None, repository, filters)
assert any(branch.name == "main" for branch in branches)
assert any(branch.name == "test1" for branch in branches)
assert any(branch.name == "test2" for branch in branches)
assert len(branches) == 3

def test_fetch_branches_unmerged(self):
def test_fetch_branches_unmerged(self) -> None:
merged = CommitFactory(repository=self.repo, merged=True)
BranchFactory(repository=self.repo, head=merged.commitid, name="merged")
branches = [
Expand All @@ -50,3 +52,30 @@ def test_fetch_branches_unmerged(self):
)
]
assert "merged" in branches

def test_fetch_branches_filtered_by_name(self) -> None:
repository = self.repo
filters = {"search_value": "tESt", "merged_branches": True}
branches = async_to_sync(self.execute)(None, repository, filters)
assert not any(branch.name == "main" for branch in branches)
assert any(branch.name == "test1" for branch in branches)
assert any(branch.name == "test2" for branch in branches)
assert len(branches) == 2

def test_fetch_branches_filtered_by_name_no_sql_injection(self) -> None:
repository = self.repo
malicious_filters = {
"search_value": "'; DROP TABLE branches; --",
"merged_branches": True,
}
find_branches_sql_injection_attempt = async_to_sync(self.execute)(
None, repository, malicious_filters
)
assert (
# assert no branches found with that branch name
len(find_branches_sql_injection_attempt) == 0
)

# confirm data is unaltered after sql injection attempt
find_branches = async_to_sync(self.execute)(None, repository, {})
assert len(find_branches) == 3
Loading