Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions scanpipe/pipelines/publish_to_federatedcode.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
# Visit https://github.com/aboutcode-org/scancode.io for support and download.


import shutil

from scanpipe.pipelines import Pipeline
from scanpipe.pipes import federatedcode

Expand All @@ -41,11 +43,12 @@ class PublishToFederatedCode(Pipeline):
def steps(cls):
return (
cls.check_federatedcode_eligibility,
cls.create_federatedcode_working_dir,
cls.get_package_repository,
cls.clone_repository,
cls.add_scan_result,
cls.commit_and_push_changes,
cls.delete_local_clone,
cls.delete_working_dir,
)

def check_federatedcode_eligibility(self):
Expand All @@ -55,9 +58,12 @@ def check_federatedcode_eligibility(self):
"""
federatedcode.check_federatedcode_eligibility(project=self.project)

def create_federatedcode_working_dir(self):
self.working_path = federatedcode.create_federatedcode_working_dir()

def get_package_repository(self):
"""Get the Git repository URL and scan path for a given package."""
self.package_git_repo, self.package_scan_file = (
self.package_repo_name, self.package_git_repo, self.package_scan_file = (
federatedcode.get_package_repository(
project_purl=self.project.purl, logger=self.log
)
Expand All @@ -67,6 +73,7 @@ def clone_repository(self):
"""Clone repository to local_path."""
self.repo = federatedcode.clone_repository(
repo_url=self.package_git_repo,
clone_path=self.working_path / self.package_repo_name,
logger=self.log,
)

Expand All @@ -91,6 +98,6 @@ def commit_and_push_changes(self):
f"Scan result for '{self.project.purl}' pushed to '{self.package_git_repo}'"
)

def delete_local_clone(self):
"""Remove local clone."""
federatedcode.delete_local_clone(repo=self.repo)
def delete_working_dir(self):
"""Remove temporary working dir."""
shutil.rmtree(self.working_dir)
125 changes: 110 additions & 15 deletions scanpipe/pipes/federatedcode.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import textwrap
from pathlib import Path
from urllib.parse import urljoin
from urllib.parse import urlparse

from django.conf import settings

Expand All @@ -43,6 +44,21 @@
logger = logging.getLogger(__name__)


def url_exists(url, timeout=5):
"""
Check if the given `url` is reachable by doing head request.
Return True if response status is 200, else False.
"""
try:
response = requests.head(url, timeout=timeout)
response.raise_for_status()
except requests.exceptions.RequestException as request_exception:
logger.debug(f"Error while checking {url}: {request_exception}")
return False

return response.status_code == requests.codes.ok


def is_configured():
"""Return True if the required FederatedCode settings have been set."""
if all(
Expand All @@ -57,19 +73,17 @@ def is_configured():
return False


def create_federatedcode_working_dir():
"""Create temporary working dir for cloning federatedcode repositories."""
return Path(tempfile.mkdtemp())


def is_available():
"""Return True if the configured Git account is available."""
if not is_configured():
return False

try:
response = requests.head(settings.FEDERATEDCODE_GIT_ACCOUNT_URL, timeout=5)
response.raise_for_status()
except requests.exceptions.RequestException as request_exception:
logger.debug(f"FederatedCode is_available() error: {request_exception}")
return False

return response.status_code == requests.codes.ok
return url_exists(settings.FEDERATEDCODE_GIT_ACCOUNT_URL)


def get_package_repository(project_purl, logger=None):
Expand All @@ -85,7 +99,7 @@ def get_package_repository(project_purl, logger=None):
)
package_git_repo_url = urljoin(git_account_url, f"{package_repo_name}.git")

return package_git_repo_url, package_scan_path
return package_repo_name, package_git_repo_url, package_scan_path


def check_federatedcode_eligibility(project):
Expand Down Expand Up @@ -146,27 +160,108 @@ def check_federatedcode_configured_and_available(logger=None):
logger("Federatedcode repositories are configured and available.")


def clone_repository(repo_url, logger=None):
"""Clone repository to local_path."""
local_dir = tempfile.mkdtemp()
def clone_repository(repo_url, clone_path, logger, shallow_clone=True):
"""Clone repository to clone_path."""
logger(f"Cloning repository {repo_url}")

authenticated_repo_url = repo_url.replace(
"https://",
f"https://{settings.FEDERATEDCODE_GIT_SERVICE_TOKEN}@",
)
repo = Repo.clone_from(url=authenticated_repo_url, to_path=local_dir, depth=1)

clone_args = {
"url": authenticated_repo_url,
"to_path": clone_path,
}
if shallow_clone:
clone_args["depth"] = 1

repo = Repo.clone_from(**clone_args)
repo.config_writer(config_level="repository").set_value(
"user", "name", settings.FEDERATEDCODE_GIT_SERVICE_NAME
).release()

repo.config_writer(config_level="repository").set_value(
"user", "email", settings.FEDERATEDCODE_GIT_SERVICE_EMAIL
).release()

return repo


def get_github_org(url):
"""Return org username from GitHub account URL."""
github_account_url = urlparse(url)
path_after_domain = github_account_url.path.lstrip("/")
org_name = path_after_domain.split("/")[0]
return org_name


def create_repository(repo_name, clone_path, logger, shallow_clone=True):
"""
Create and initialize remote FederatedCode `repo_name` repository,
perform local checkout, and return it.
"""
account_url = f"{settings.FEDERATEDCODE_GIT_ACCOUNT_URL}/"
repo_url = urljoin(account_url, repo_name)

headers = {
"Authorization": f"token {settings.FEDERATEDCODE_GIT_SERVICE_TOKEN}",
"Accept": "application/vnd.github+json",
}

data = {
"name": repo_name,
"private": False,
"auto_init": True,
"CC-BY-4.0": "cc-by-4.0",
}
org_name = get_github_org(account_url)
create_repo_api = f"https://api.github.com/orgs/{org_name}/repos"
response = requests.post(
create_repo_api,
headers=headers,
json=data,
timeout=5,
)
response.raise_for_status()
return clone_repository(
repo_url=repo_url,
clone_path=clone_path,
shallow_clone=shallow_clone,
logger=logger,
)


def get_or_create_repository(repo_name, working_path, logger, shallow_clone=True):
"""
Return local checkout of the FederatedCode `repo_name` repository.

- If local checkout for `repo_name` already exists in `working_path`, return it.
- If no local checkout exists but the remote repository `repo_name` exists,
clone it locally and return the checkout.
- If the remote repository does not exist, create and initialize `repo_name`
repository, perform local checkout, and return it.
"""
account_url = f"{settings.FEDERATEDCODE_GIT_ACCOUNT_URL}/"
repo_url = urljoin(account_url, repo_name)
clone_path = working_path / repo_name

if clone_path.exists():
return False, Repo(clone_path)
if url_exists(repo_url):
return False, clone_repository(
repo_url=repo_url,
clone_path=clone_path,
logger=logger,
shallow_clone=shallow_clone,
)

return True, create_repository(
repo_name=repo_name,
clone_path=clone_path,
logger=logger,
shallow_clone=shallow_clone,
)


def add_scan_result(project, repo, package_scan_file, logger=None):
"""Add package scan result to the local Git repository."""
relative_scan_file_path = Path(*package_scan_file.parts[1:])
Expand Down
4 changes: 3 additions & 1 deletion scanpipe/tests/pipes/test_federatedcode.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,16 @@ def test_scanpipe_pipes_federatedcode_get_package_repository(self):
version="v.1.2.3",
)
project_purl = "pkg:npm/[email protected]"
expected_repo_name = "aboutcode-packages-npm-3f1"
expected_git_repo = "https://github.com/test/aboutcode-packages-npm-3f1.git"
expected_scan_path = (
"aboutcode-packages-npm-3f1/npm/foobar/v1.2.3/scancodeio.json"
)
git_repo, scan_path = federatedcode.get_package_repository(
repo_name, git_repo, scan_path = federatedcode.get_package_repository(
project_purl=project_purl
)

self.assertEqual(expected_repo_name, repo_name)
self.assertEqual(expected_git_repo, git_repo)
self.assertEqual(expected_scan_path, str(scan_path))

Expand Down