Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions minecode_pipelines/pipelines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,164 @@
# See https://github.com/aboutcode-org/purldb for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#


import shutil
from collections.abc import Iterable
import logging
from minecode_pipelines import pipes
from minecode_pipelines.pipes import write_packageurls_to_file

from aboutcode.hashid import get_package_base_dir
from aboutcode.pipeline import LoopProgress
from scanpipe.pipelines import Pipeline
from scanpipe.pipes import federatedcode

module_logger = logging.getLogger(__name__)


class MineCodeBasePipeline(Pipeline):
"""
Base pipeline for mining PackageURLs.

Uses:
Subclass this Pipeline and implement ``mine_packageurls`` and ``packages_count``
method. Also override the ``steps`` and ``commit_message`` as needed.
"""

download_inputs = False

@classmethod
def steps(cls):
return (
cls.check_federatedcode_eligibility,
cls.create_federatedcode_working_dir,
# Add step(s) for downloading/cloning resource as required.
cls.mine_and_publish_packageurls,
cls.delete_working_dir,
)

def mine_packageurls(self) -> Iterable[tuple[str, list[str]]]:
"""
Yield (base_purl, package_urls_list) tuples,

where `base_purl` is a versionless PURL string,
and `package_urls_list` is a list of versioned PURL strings.
"""
raise NotImplementedError

def packages_count(self) -> int:
"""
Return the estimated number of packages for which PackageURLs are to be mined.

Used by ``mine_and_publish_packageurls`` to log the progress of PackageURL mining.
"""
raise NotImplementedError

def check_federatedcode_eligibility(self):
"""
Check if the project fulfills the following criteria for
pushing the project result to FederatedCode.
"""
federatedcode.check_federatedcode_configured_and_available()

def create_federatedcode_working_dir(self):
"""Create temporary working dir."""
self.working_path = federatedcode.create_federatedcode_working_dir()

def mine_and_publish_packageurls(self):
"""Mine and publish PackageURLs."""
checked_out_repos = {}
batch_size = 4000
total_file_processed_count = 0
total_commit_count = 0
package_count = self.packages_count()
progress = LoopProgress(
total_iterations=package_count,
logger=self.log,
progress_step=1,
)

self.log(f"Mine PackageURL for {package_count:,d} packages.")
for base, purls in progress.iter(self.mine_packageurls()):
package_base_dir = get_package_base_dir(purl=base)

package_root_dir = package_base_dir.parts[0]
package_group = pipes.get_package_destination_repo(package_root_dir)

if package_group not in checked_out_repos:
checked_out_repos[package_group] = pipes.init_local_checkout(
repo_name=package_group,
working_path=self.working_path,
logger=self.log,
)

checkout = checked_out_repos[package_group]

purl_file = write_packageurls_to_file(
repo=checkout["repo"],
base_dir=package_base_dir,
packageurls=sorted(purls),
)
checkout["file_to_commit"].append(purl_file)
checkout["file_processed_count"] += 1

if len(checkout["file_to_commit"]) > batch_size:
pipes.commit_and_push_checkout(
local_checkout=checkout,
commit_message=self.commit_message(checkout["commit_count"] + 1),
logger=self.log,
)

for checkout in checked_out_repos.values():
final_commit_count = checkout["commit_count"] + 1
pipes.commit_and_push_checkout(
local_checkout=checkout,
commit_message=self.commit_message(
commit_count=final_commit_count,
total_commit_count=final_commit_count,
),
logger=self.log,
)
total_commit_count += checkout["commit_count"]
total_file_processed_count += checkout["file_processed_count"]

self.log(f"Processed PackageURL for {total_file_processed_count:,d} NuGet packages.")
self.log(
f"Pushed new PackageURL in {total_commit_count:,d} commits in {len(checked_out_repos):,d} repos."
)

def delete_working_dir(self):
"""Remove temporary working dir."""
shutil.rmtree(self.working_path)

def commit_message(self, commit_count, total_commit_count="many"):
"""Return default commit message for pushing mined PackageURLs."""
from django.conf import settings
from scancodeio import VERSION

author_name = settings.FEDERATEDCODE_GIT_SERVICE_NAME
author_email = settings.FEDERATEDCODE_GIT_SERVICE_EMAIL

tool_name = "pkg:github/aboutcode-org/scancode.io"

return f"""\
Add newly mined PackageURLs ({commit_count}/{total_commit_count})

Tool: {tool_name}@v{VERSION}
Reference: https://{settings.ALLOWED_HOSTS[0]}

Signed-off-by: {author_name} <{author_email}>
"""

def log(self, message, level=logging.INFO):
"""Log the given `message` to the current module logger and execution_log."""
from datetime import datetime
from datetime import timezone

now_local = datetime.now(timezone.utc).astimezone()
timestamp = now_local.strftime("%Y-%m-%d %T.%f %Z")
message = f"{timestamp} {message}"
module_logger.log(level, message)
self.append_to_log(message)
print(message)
40 changes: 17 additions & 23 deletions minecode_pipelines/pipelines/mine_nuget.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,42 +25,36 @@

from minecode_pipelines.pipes import nuget

from scanpipe.pipelines import Pipeline
from minecode_pipelines.pipelines import MineCodeBasePipeline
from scanpipe.pipes import federatedcode


class MineNuGet(Pipeline):
class MineNuGet(MineCodeBasePipeline):
"""
Mine and Publish NuGet PackageURLs.

Mine PackageURLs from AboutCode NuGet catalog mirror and publish
them to FederatedCode Git repository.
"""

download_inputs = False
CATALOG_REPO_URL = "https://github.com/aboutcode-org/aboutcode-mirror-nuget-catalog.git"

@classmethod
def steps(cls):
return (
cls.check_federatedcode_eligibility,
cls.create_federatedcode_working_dir,
cls.fetch_nuget_catalog,
cls.mine_nuget_package_versions,
cls.mine_and_publish_nuget_packageurls,
cls.delete_cloned_repos,
cls.mine_and_publish_packageurls,
cls.delete_working_dir,
)

def check_federatedcode_eligibility(self):
"""
Check if the project fulfills the following criteria for
pushing the project result to FederatedCode.
"""
federatedcode.check_federatedcode_configured_and_available()

def fetch_nuget_catalog(self):
"""Fetch NuGet package catalog from AboutCode mirror."""
self.catalog_repo = federatedcode.clone_repository(
repo_url=self.CATALOG_REPO_URL,
clone_path=self.working_path / "aboutcode-mirror-nuget-catalog",
logger=self.log,
)

Expand All @@ -71,15 +65,15 @@ def mine_nuget_package_versions(self):
logger=self.log,
)

def mine_and_publish_nuget_packageurls(self):
"""Mine and publish PackageURLs from NuGet package versions."""
nuget.mine_and_publish_nuget_packageurls(
package_versions=self.package_versions,
logger=self.log,
)
def packages_count(self):
return len(self.package_versions)

def mine_packageurls(self):
"""Yield PackageURLs from NuGet package versions."""

def delete_cloned_repos(self):
"""Remove cloned catalog repository."""
if self.catalog_repo:
self.log("Removing cloned repository")
federatedcode.delete_local_clone(repo=self.catalog_repo)
for base, versions in self.package_versions.items():
packageurls = nuget.get_nuget_purls_from_versions(
base_purl=base,
versions=versions,
)
yield base, packageurls
37 changes: 37 additions & 0 deletions minecode_pipelines/pipes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,40 @@ def get_commit_at_distance_ahead(
if len(revs) < num_commits_ahead:
raise ValueError(f"Not enough commits ahead; only {len(revs)} available.")
return revs[-num_commits_ahead]


def get_package_destination_repo(repo_root_name):
# TODO: Replace this with new hashid sharding mechanism.

repo_rank = int(repo_root_name.rpartition("-")[-1], 16)
repo_count = 5
return f"aboutcode-packages-data-{repo_rank % repo_count}"


def init_local_checkout(repo_name, working_path, logger):
from scanpipe.pipes.federatedcode import get_or_create_repository

repo_obj = get_or_create_repository(
repo_name,
working_path,
logger,
)
return {
"repo": repo_obj[-1],
"file_to_commit": [],
"file_processed_count": 0,
"commit_count": 0,
}


def commit_and_push_checkout(local_checkout, commit_message, logger):
from scanpipe.pipes.federatedcode import commit_and_push_changes

if commit_and_push_changes(
commit_message=commit_message,
repo=local_checkout["repo"],
files_to_commit=local_checkout["file_to_commit"],
logger=logger,
):
local_checkout["commit_count"] += 1
local_checkout["file_to_commit"].clear()
77 changes: 1 addition & 76 deletions minecode_pipelines/pipes/nuget.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,10 @@
import json
import re

from django.conf import settings

from minecode_pipelines.pipes import write_packageurls_to_file
from packageurl import PackageURL

from aboutcode.hashid import get_package_base_dir
from aboutcode.pipeline import LoopProgress
from scancodeio import VERSION


NUGET_PURL_METADATA_REPO = "https://github.com/aboutcode-data/minecode-data-nuget-test"

Expand Down Expand Up @@ -86,79 +82,8 @@ def mine_nuget_package_versions(catalog_path, logger):
return package_versions, skipped_packages


def commit_message(commit_batch, total_commit_batch="many"):
author_name = settings.FEDERATEDCODE_GIT_SERVICE_NAME
author_email = settings.FEDERATEDCODE_GIT_SERVICE_EMAIL
tool_name = "pkg:github/aboutcode-org/scancode.io"

return f"""\
Collect PackageURLs from NuGet catalog ({commit_batch}/{total_commit_batch})

Tool: {tool_name}@v{VERSION}
Reference: https://{settings.ALLOWED_HOSTS[0]}

Signed-off-by: {author_name} <{author_email}>
"""


def get_nuget_purls_from_versions(base_purl, versions):
"""Return PURLs for a NuGet `base_purls` from set of `versions`."""
purl_dict = PackageURL.from_string(base_purl).to_dict()
del purl_dict["version"]
return [PackageURL(**purl_dict, version=v).to_string() for v in versions]


def mine_and_publish_nuget_packageurls(package_versions, logger):
"""Mine and publish PackageURLs from NuGet package versions."""
from scanpipe.pipes import federatedcode

cloned_repo = federatedcode.clone_repository(
repo_url=NUGET_PURL_METADATA_REPO,
logger=logger,
)
file_to_commit = []
batch_size = 4000
file_processed = 0
commit_count = 1
nuget_package_count = len(package_versions)
progress = LoopProgress(
total_iterations=nuget_package_count,
logger=logger,
progress_step=1,
)

logger(f"Mine packageURL for {nuget_package_count:,d} NuGet packages.")
for base, versions in progress.iter(package_versions.items()):
package_base_dir = get_package_base_dir(purl=base)
packageurls = get_nuget_purls_from_versions(base_purl=base, versions=versions)

purl_file = write_packageurls_to_file(
repo=cloned_repo,
base_dir=package_base_dir,
packageurls=sorted(packageurls),
)
file_to_commit.append(purl_file)
file_processed += 1

if len(file_to_commit) > batch_size:
if federatedcode.commit_and_push_changes(
commit_message=commit_message(commit_count),
repo=cloned_repo,
files_to_commit=file_to_commit,
logger=logger,
):
commit_count += 1
file_to_commit.clear()

federatedcode.commit_and_push_changes(
commit_message=commit_message(
commit_batch=commit_count,
total_commit_batch=commit_count,
),
repo=cloned_repo,
files_to_commit=file_to_commit,
logger=logger,
)
logger(f"Processed PackageURL for {file_processed:,d} NuGet packages.")
logger(f"Pushed new PackageURL in {commit_count:,d} commits.")
federatedcode.delete_local_clone(repo=cloned_repo)
Loading