|  | 
| 20 | 20 | # ScanCode.io is a free software code scanning tool from nexB Inc. and others. | 
| 21 | 21 | # Visit https://github.com/nexB/scancode.io for support and download. | 
| 22 | 22 | 
 | 
|  | 23 | + | 
|  | 24 | +import json | 
|  | 25 | +from shutil import copytree | 
|  | 26 | + | 
|  | 27 | +from fetchcode import fetch | 
|  | 28 | +from fetchcode.vcs.git import fetch_via_git | 
| 23 | 29 | from packagedcode import alpine | 
| 24 | 30 | 
 | 
|  | 31 | +from scanpipe.models import DiscoveredPackage | 
|  | 32 | + | 
# Git repository holding the aports tree (the per-package APKBUILD directories).
APORTS_URL = "https://gitlab.alpinelinux.org/alpine/aports.git"

# Name of the directory the aports repository is checked out into.
APORTS_DIR_NAME = "aports"

# Top-level aports subdirectories that are searched, in this order, for a
# directory named after the package.
APORTS_SUBDIRS = [
    "main",
    "non-free",
    "testing",
    "community",
    "unmaintained",
]
|  | 36 | + | 
|  | 37 | + | 
def download_or_checkout_aports(aports_dir_path, alpine_version, commit_id=None):
    """
    Fetch the aports repository into the `APORTS_DIR_NAME` subdirectory of
    `aports_dir_path`, using the `<major>.<minor>-stable` branch derived from
    `alpine_version`.
    If `commit_id` is provided, fetch that commit into the same location too.
    Return the checkout location as a string.

    TODO: Proper fetchcode patch required (extending #54)
    """
    # Only the first two dot-separated components select the stable branch.
    major, minor = alpine_version.split(".")[:2]
    checkout_location = str(aports_dir_path / APORTS_DIR_NAME)

    branch_url = f"git+{APORTS_URL}@{major}.{minor}-stable"
    fetch_via_git(url=branch_url, location=checkout_location)

    if commit_id:
        commit_url = f"git+{APORTS_URL}@{commit_id}"
        fetch_via_git(url=commit_url, location=checkout_location)

    return checkout_location
|  | 53 | + | 
|  | 54 | + | 
def get_unscanned_packages_from_db(project, alpine_versions):
    """
    Return an iterator of 5-tuples
    (alpine_version, commit_id, scan_target_path, scan_result_path, package)
    where:
    `alpine_version` is the value found in the `alpine_versions` dict for the
    package's `extra_data["image_id"]` (None when the image id is unknown),
    `commit_id` is the part of the package's `vcs_url` that follows "id="
    (None when `vcs_url` is empty or carries no "id="),
    `scan_target_path` is the path of the directory on which a scan will be
    performed (`project.tmp_path / "<name>_<version>"`),
    `scan_result_path` is the path of the scan result json file
    (`project.output_path / "<name>_<version>.json"`),
    `package` is a DiscoveredPackage instance that belongs to `project` and has
    the "alpine" package type.
    Only packages that are not subpackages and that don't have an existing
    scan result file are returned.
    """
    packages = DiscoveredPackage.objects.filter(project=project, type="alpine")
    for package in packages:
        # A package counts as a subpackage when it lists source packages and
        # the first one does not appear in its own purl.
        source_packages = package.source_packages
        if source_packages and source_packages[0] not in package.purl:
            continue

        scan_id = f"{package.name}_{package.version}"
        scan_result_path = project.output_path / (scan_id + ".json")
        if scan_result_path.exists():
            continue

        # Derive the remaining values only for packages that are actually
        # returned, so that incomplete data on a skipped package cannot abort
        # the whole iteration.
        alpine_version = alpine_versions.get(package.extra_data["image_id"])
        vcs_url_parts = (package.vcs_url or "").split("id=")
        commit_id = vcs_url_parts[1] if len(vcs_url_parts) > 1 else None
        scan_target_path = project.tmp_path / scan_id
        yield alpine_version, commit_id, scan_target_path, scan_result_path, package
|  | 77 | + | 
|  | 78 | + | 
def prepare_scan_dir(package_name, scan_target_path, aports_dir_path=None):
    """
    Gather all the source files of a package in `scan_target_path`.

    Source files of an alpine package are obtained from its aports directory,
    whose location has to be guessed. Such a directory is present in one of
    the aports repository subdirectories listed in `APORTS_SUBDIRS` and its
    name is the same as the value of the package's `name` field (hence the
    `package_name` parameter). Here are some path examples:
    .../aports/main/acf-db
    .../aports/non-free/mongodb
    Inside, there are some extra files (patches) and an APKBUILD which
    contains urls to source tarballs.

    The first existing directory is copied (APKBUILD included) to
    `scan_target_path` and every source url listed in the APKBUILD is fetched
    into `scan_target_path` as well.

    `aports_dir_path` is the directory that contains the `APORTS_DIR_NAME`
    checkout; it defaults to the parent of `scan_target_path`.

    Return `scan_target_path` if the package's aports directory is found and
    is not empty, None otherwise.
    """
    if aports_dir_path is None:
        aports_dir_path = scan_target_path.parent

    for subdir_name in APORTS_SUBDIRS:
        apkbuild_dir = aports_dir_path / APORTS_DIR_NAME / subdir_name / package_name
        if not apkbuild_dir.exists():
            continue
        if not any(apkbuild_dir.iterdir()):
            # The first match is empty: nothing to scan, and the remaining
            # subdirectories are deliberately not searched.
            return None

        copytree(apkbuild_dir, scan_target_path)

        # NOTE(review): parse_apkbuild is assumed to possibly return None for
        # an APKBUILD it cannot handle - confirm against packagedcode.alpine.
        apkbuild = alpine.parse_apkbuild(scan_target_path / "APKBUILD")
        apkbuild_data = apkbuild.to_dict() if apkbuild is not None else {}
        # "extra_data" and "sources" may each be absent or None.
        extra_data = apkbuild_data.get("extra_data") or {}
        for source in extra_data.get("sources") or []:
            source_url = source.get("url")
            if source_url:
                fetch(source_url, scan_target_path)
        return scan_target_path

    # No aports subdirectory contains a directory named `package_name`.
    return None
|  | 114 | + | 
|  | 115 | + | 
def extract_summary_fields(scan_result_path, summary_field_names):
    """
    Extract values from the `summary` section of the scan result json file
    located at `scan_result_path`.

    For each name in `summary_field_names`, collect the `value` of every entry
    listed under that name in the summary, dropping the empty (falsy) ones.
    A name that is absent from the summary yields an empty list.

    Return a dict mapping each field name to its list of values.
    Raise KeyError if the file has no `summary` section.
    """
    # The context manager closes the file even when decoding fails or the
    # "summary" key is missing.
    with open(scan_result_path, encoding="utf-8") as scan_result:
        summaries = json.load(scan_result)["summary"]

    result = {}
    for field_name in summary_field_names:
        entries = summaries.get(field_name, [])
        result[field_name] = [entry["value"] for entry in entries if entry["value"]]
    return result
|  | 130 | + | 
| 25 | 131 | 
 | 
| 26 | 132 | def package_getter(root_dir, **kwargs): | 
| 27 | 133 |     """ | 
|  | 
0 commit comments