From 93945208edb14d853022dda5ea1745160de30eb4 Mon Sep 17 00:00:00 2001 From: Keshav Priyadarshi Date: Wed, 15 Oct 2025 14:56:36 +0530 Subject: [PATCH 1/2] Add base minecode pipeline - Abstract distributed git repo handling and metadata publication in base pipeline Signed-off-by: Keshav Priyadarshi --- minecode_pipelines/pipelines/__init__.py | 161 +++++++++++++++++++++++ minecode_pipelines/pipes/__init__.py | 37 ++++++ 2 files changed, 198 insertions(+) diff --git a/minecode_pipelines/pipelines/__init__.py b/minecode_pipelines/pipelines/__init__.py index e1521118..8d961b39 100644 --- a/minecode_pipelines/pipelines/__init__.py +++ b/minecode_pipelines/pipelines/__init__.py @@ -6,3 +6,164 @@ # See https://github.com/aboutcode-org/purldb for support or download. # See https://aboutcode.org for more information about nexB OSS projects. # + + +import shutil +from collections.abc import Iterable +import logging +from minecode_pipelines import pipes +from minecode_pipelines.pipes import write_packageurls_to_file + +from aboutcode.hashid import get_package_base_dir +from aboutcode.pipeline import LoopProgress +from scanpipe.pipelines import Pipeline +from scanpipe.pipes import federatedcode + +module_logger = logging.getLogger(__name__) + + +class MineCodeBasePipeline(Pipeline): + """ + Base pipeline for mining PackageURLs. + + Uses: + Subclass this Pipeline and implement ``mine_packageurls`` and ``packages_count`` + method. Also override the ``steps`` and ``commit_message`` as needed. + """ + + download_inputs = False + + @classmethod + def steps(cls): + return ( + cls.check_federatedcode_eligibility, + cls.create_federatedcode_working_dir, + # Add step(s) for downloading/cloning resource as required. + cls.mine_and_publish_packageurls, + cls.delete_working_dir, + ) + + def mine_packageurls(self) -> Iterable[tuple[str, list[str]]]: + """ + Yield (base_purl, package_urls_list) tuples, + + where `base_purl` is a versionless PURL string, + and `package_urls_list` is a list of versioned PURL strings. + """ + raise NotImplementedError + + def packages_count(self) -> int: + """ + Return the estimated number of packages for which PackageURLs are to be mined. + + Used by ``mine_and_publish_packageurls`` to log the progress of PackageURL mining. + """ + raise NotImplementedError + + def check_federatedcode_eligibility(self): + """ + Check if the project fulfills the following criteria for + pushing the project result to FederatedCode. + """ + federatedcode.check_federatedcode_configured_and_available() + + def create_federatedcode_working_dir(self): + """Create temporary working dir.""" + self.working_path = federatedcode.create_federatedcode_working_dir() + + def mine_and_publish_packageurls(self): + """Mine and publish PackageURLs.""" + checked_out_repos = {} + batch_size = 4000 + total_file_processed_count = 0 + total_commit_count = 0 + package_count = self.packages_count() + progress = LoopProgress( + total_iterations=package_count, + logger=self.log, + progress_step=1, + ) + + self.log(f"Mine PackageURL for {package_count:,d} packages.") + for base, purls in progress.iter(self.mine_packageurls()): + package_base_dir = get_package_base_dir(purl=base) + + package_root_dir = package_base_dir.parts[0] + package_group = pipes.get_package_destination_repo(package_root_dir) + + if package_group not in checked_out_repos: + checked_out_repos[package_group] = pipes.init_local_checkout( + repo_name=package_group, + working_path=self.working_path, + logger=self.log, + ) + + checkout = checked_out_repos[package_group] + + purl_file = write_packageurls_to_file( + repo=checkout["repo"], + base_dir=package_base_dir, + packageurls=sorted(purls), + ) + checkout["file_to_commit"].append(purl_file) + checkout["file_processed_count"] += 1 + + if len(checkout["file_to_commit"]) > batch_size: + pipes.commit_and_push_checkout( + local_checkout=checkout, + commit_message=self.commit_message(checkout["commit_count"] + 1), + logger=self.log, + ) + + for checkout in checked_out_repos.values(): + final_commit_count = checkout["commit_count"] + 1 + pipes.commit_and_push_checkout( + local_checkout=checkout, + commit_message=self.commit_message( + commit_count=final_commit_count, + total_commit_count=final_commit_count, + ), + logger=self.log, + ) + total_commit_count += checkout["commit_count"] + total_file_processed_count += checkout["file_processed_count"] + + self.log(f"Processed PackageURL for {total_file_processed_count:,d} NuGet packages.") + self.log( + f"Pushed new PackageURL in {total_commit_count:,d} commits in {len(checked_out_repos):,d} repos." + ) + + def delete_working_dir(self): + """Remove temporary working dir.""" + shutil.rmtree(self.working_path) + + def commit_message(self, commit_count, total_commit_count="many"): + """Return default commit message for pushing mined PackageURLs.""" + from django.conf import settings + from scancodeio import VERSION + + author_name = settings.FEDERATEDCODE_GIT_SERVICE_NAME + author_email = settings.FEDERATEDCODE_GIT_SERVICE_EMAIL + + tool_name = "pkg:github/aboutcode-org/scancode.io" + + return f"""\ + Add newly mined PackageURLs ({commit_count}/{total_commit_count}) + + Tool: {tool_name}@v{VERSION} + Reference: https://{settings.ALLOWED_HOSTS[0]} + + Signed-off-by: {author_name} <{author_email}> + """ + + def log(self, message, level=logging.INFO): + """Log the given `message` to the current module logger and execution_log.""" + from datetime import datetime + from datetime import timezone + + now_local = datetime.now(timezone.utc).astimezone() + timestamp = now_local.strftime("%Y-%m-%d %T.%f %Z") + message = f"{timestamp} {message}" + module_logger.log(level, message) + self.append_to_log(message) + print(message) diff --git a/minecode_pipelines/pipes/__init__.py b/minecode_pipelines/pipes/__init__.py index 43f32f68..4f49b49b 100644 --- a/minecode_pipelines/pipes/__init__.py +++ b/minecode_pipelines/pipes/__init__.py @@ -194,3 +194,40 @@ def get_commit_at_distance_ahead( if len(revs) < num_commits_ahead: raise ValueError(f"Not enough commits ahead; only {len(revs)} available.") return revs[-num_commits_ahead] + + +def get_package_destination_repo(repo_root_name): + # TODO: Replace this with new hashid sharding mechanism. + + repo_rank = int(repo_root_name.rpartition("-")[-1], 16) + repo_count = 5 + return f"aboutcode-packages-data-{repo_rank % repo_count}" + + +def init_local_checkout(repo_name, working_path, logger): + from scanpipe.pipes.federatedcode import get_or_create_repository + + repo_obj = get_or_create_repository( + repo_name, + working_path, + logger, + ) + return { + "repo": repo_obj[-1], + "file_to_commit": [], + "file_processed_count": 0, + "commit_count": 0, + } + + +def commit_and_push_checkout(local_checkout, commit_message, logger): + from scanpipe.pipes.federatedcode import commit_and_push_changes + + if commit_and_push_changes( + commit_message=commit_message, + repo=local_checkout["repo"], + files_to_commit=local_checkout["file_to_commit"], + logger=logger, + ): + local_checkout["commit_count"] += 1 + local_checkout["file_to_commit"].clear() From d0c33956da846b6770bd44a8e8194f360ea57b84 Mon Sep 17 00:00:00 2001 From: Keshav Priyadarshi Date: Wed, 15 Oct 2025 15:00:52 +0530 Subject: [PATCH 2/2] Refactor NuGet pipeline to use MineCodeBasePipeline Signed-off-by: Keshav Priyadarshi --- minecode_pipelines/pipelines/mine_nuget.py | 40 +++++------ minecode_pipelines/pipes/nuget.py | 77 +--------------------- 2 files changed, 18 insertions(+), 99 deletions(-) diff --git a/minecode_pipelines/pipelines/mine_nuget.py b/minecode_pipelines/pipelines/mine_nuget.py index 3ead2746..7fcd87e9 100644 --- a/minecode_pipelines/pipelines/mine_nuget.py +++ b/minecode_pipelines/pipelines/mine_nuget.py @@ -25,11 +25,11 @@ from minecode_pipelines.pipes import nuget -from scanpipe.pipelines import Pipeline +from minecode_pipelines.pipelines import MineCodeBasePipeline from scanpipe.pipes import federatedcode -class MineNuGet(Pipeline): +class MineNuGet(MineCodeBasePipeline): """ Mine and Publish NuGet PackageURLs. @@ -37,30 +37,24 @@ class MineNuGet(Pipeline): them to FederatedCode Git repository. """ - download_inputs = False CATALOG_REPO_URL = "https://github.com/aboutcode-org/aboutcode-mirror-nuget-catalog.git" @classmethod def steps(cls): return ( cls.check_federatedcode_eligibility, + cls.create_federatedcode_working_dir, cls.fetch_nuget_catalog, cls.mine_nuget_package_versions, - cls.mine_and_publish_nuget_packageurls, - cls.delete_cloned_repos, + cls.mine_and_publish_packageurls, + cls.delete_working_dir, ) - def check_federatedcode_eligibility(self): - """ - Check if the project fulfills the following criteria for - pushing the project result to FederatedCode. - """ - federatedcode.check_federatedcode_configured_and_available() - def fetch_nuget_catalog(self): """Fetch NuGet package catalog from AboutCode mirror.""" self.catalog_repo = federatedcode.clone_repository( repo_url=self.CATALOG_REPO_URL, + clone_path=self.working_path / "aboutcode-mirror-nuget-catalog", logger=self.log, ) @@ -71,15 +65,15 @@ def mine_nuget_package_versions(self): logger=self.log, ) - def mine_and_publish_nuget_packageurls(self): - """Mine and publish PackageURLs from NuGet package versions.""" - nuget.mine_and_publish_nuget_packageurls( - package_versions=self.package_versions, - logger=self.log, - ) + def packages_count(self): + return len(self.package_versions) + + def mine_packageurls(self): + """Yield PackageURLs from NuGet package versions.""" - def delete_cloned_repos(self): - """Remove cloned catalog repository.""" - if self.catalog_repo: - self.log("Removing cloned repository") - federatedcode.delete_local_clone(repo=self.catalog_repo) + for base, versions in self.package_versions.items(): + packageurls = nuget.get_nuget_purls_from_versions( + base_purl=base, + versions=versions, + ) + yield base, packageurls diff --git a/minecode_pipelines/pipes/nuget.py b/minecode_pipelines/pipes/nuget.py index 286caa21..b615a5dc 100644 --- a/minecode_pipelines/pipes/nuget.py +++ b/minecode_pipelines/pipes/nuget.py @@ -24,14 +24,10 @@ import json import re -from django.conf import settings - -from minecode_pipelines.pipes import write_packageurls_to_file from packageurl import PackageURL -from aboutcode.hashid import get_package_base_dir from aboutcode.pipeline import LoopProgress -from scancodeio import VERSION + NUGET_PURL_METADATA_REPO = "https://github.com/aboutcode-data/minecode-data-nuget-test" @@ -86,79 +82,8 @@ def mine_nuget_package_versions(catalog_path, logger): return package_versions, skipped_packages -def commit_message(commit_batch, total_commit_batch="many"): - author_name = settings.FEDERATEDCODE_GIT_SERVICE_NAME - author_email = settings.FEDERATEDCODE_GIT_SERVICE_EMAIL - tool_name = "pkg:github/aboutcode-org/scancode.io" - - return f"""\ - Collect PackageURLs from NuGet catalog ({commit_batch}/{total_commit_batch}) - - Tool: {tool_name}@v{VERSION} - Reference: https://{settings.ALLOWED_HOSTS[0]} - - Signed-off-by: {author_name} <{author_email}> - """ - - def get_nuget_purls_from_versions(base_purl, versions): """Return PURLs for a NuGet `base_purls` from set of `versions`.""" purl_dict = PackageURL.from_string(base_purl).to_dict() del purl_dict["version"] return [PackageURL(**purl_dict, version=v).to_string() for v in versions] - - -def mine_and_publish_nuget_packageurls(package_versions, logger): - """Mine and publish PackageURLs from NuGet package versions.""" - from scanpipe.pipes import federatedcode - - cloned_repo = federatedcode.clone_repository( - repo_url=NUGET_PURL_METADATA_REPO, - logger=logger, - ) - file_to_commit = [] - batch_size = 4000 - file_processed = 0 - commit_count = 1 - nuget_package_count = len(package_versions) - progress = LoopProgress( - total_iterations=nuget_package_count, - logger=logger, - progress_step=1, - ) - - logger(f"Mine packageURL for {nuget_package_count:,d} NuGet packages.") - for base, versions in progress.iter(package_versions.items()): - package_base_dir = get_package_base_dir(purl=base) - packageurls = get_nuget_purls_from_versions(base_purl=base, versions=versions) - - purl_file = write_packageurls_to_file( - repo=cloned_repo, - base_dir=package_base_dir, - packageurls=sorted(packageurls), - ) - file_to_commit.append(purl_file) - file_processed += 1 - - if len(file_to_commit) > batch_size: - if federatedcode.commit_and_push_changes( - commit_message=commit_message(commit_count), - repo=cloned_repo, - files_to_commit=file_to_commit, - logger=logger, - ): - commit_count += 1 - file_to_commit.clear() - - federatedcode.commit_and_push_changes( - commit_message=commit_message( - commit_batch=commit_count, - total_commit_batch=commit_count, - ), - repo=cloned_repo, - files_to_commit=file_to_commit, - logger=logger, - ) - logger(f"Processed PackageURL for {file_processed:,d} NuGet packages.") - logger(f"Pushed new PackageURL in {commit_count:,d} commits.") - federatedcode.delete_local_clone(repo=cloned_repo)