Skip to content

Commit c56e390

Browse files
authored
Add support for on-demand data collection in Alpine Linux APK. (#711)
* Add initial support for on-demand data collection in Alpine Linux APK. Signed-off-by: ziad hany <[email protected]> * Add test for alpine Signed-off-by: ziad hany <[email protected]> --------- Signed-off-by: ziad hany <[email protected]>
1 parent 32e3817 commit c56e390

File tree

8 files changed

+216713
-0
lines changed

8 files changed

+216713
-0
lines changed

minecode/collectors/alpine.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# purldb is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0
6+
# See https://github.com/aboutcode-org/purldb for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import logging
11+
from packageurl import PackageURL
12+
from minecode import priority_router
13+
from minecode.miners.alpine import build_packages
14+
from minecode.utils import fetch_http, get_temp_file
15+
from minecode.utils import extract_file
16+
from packagedb.models import PackageContentType
17+
18+
# Module-level logging setup: attach a stream handler to this module's logger
# and emit records at INFO level and above.
handler = logging.StreamHandler()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(handler)
22+
23+
24+
def map_apk_package(package_url, pipelines, priority=0):
    """
    Add an Alpine Linux (APK) distribution `package_url` to the PackageDB.

    The `arch`, `repo` and `alpine_version` qualifiers of `package_url` select
    the APKINDEX.tar.gz that is fetched and extracted. The extracted index is
    handed to `build_packages` together with the computed .apk download URL.
    Each package it yields is passed to `merge_or_create_package`, and the last
    resulting database package (if any) is queued for scanning with
    `pipelines` at `priority`.

    Return None when `package_url` lacks a name, a version or any of the three
    required qualifiers. Otherwise return the last `error` value reported by
    `merge_or_create_package`, or None when no package was built.
    """
    from minecode.model_utils import add_package_to_scan_queue
    from minecode.model_utils import merge_or_create_package

    name = package_url.name
    version = package_url.version
    arch = package_url.qualifiers.get("arch")
    repo = package_url.qualifiers.get("repo")
    alpine_version = package_url.qualifiers.get("alpine_version")

    # Every one of these is needed to build the index and package URLs.
    if not name or not version or not arch or not repo or not alpine_version:
        return None

    download_url = (
        f"https://dl-cdn.alpinelinux.org/alpine/{alpine_version}/{repo}/{arch}/APKINDEX.tar.gz"
    )
    apk_download_url = (
        f"https://dl-cdn.alpinelinux.org/alpine/{alpine_version}/{repo}/{arch}/{name}-{version}.apk"
    )

    # Save the fetched index to a temp file so that it can be extracted.
    content = fetch_http(download_url)
    location = get_temp_file("NonPersistentHttpVisitor")
    with open(location, "wb") as tmp:
        tmp.write(content)

    extracted_location = extract_file(location)

    packages = build_packages(extracted_location, apk_download_url, package_url)

    error = None
    # build_packages yields nothing when the package name is not in the index;
    # db_package must be bound before the check that follows the loop.
    db_package = None
    for package in packages:
        package.extra_data["package_content"] = PackageContentType.SOURCE_ARCHIVE
        db_package, _, _, error = merge_or_create_package(package, visit_level=0)
        if error:
            break

    if db_package:
        add_package_to_scan_queue(package=db_package, pipelines=pipelines, priority=priority)

    return error
67+
68+
69+
@priority_router.route("pkg:apk/.*")
def process_request(purl_str, **kwargs):
    """
    Handle an on-demand request for an Alpine Linux (APK) Package URL.

    `purl_str` is parsed into a PackageURL and mapped with `map_apk_package`.
    The optional `addon_pipelines` keyword is appended to DEFAULT_PIPELINES and
    the optional `priority` keyword (default 0) is forwarded as-is.

    Return the error reported by `map_apk_package` when it is truthy, otherwise
    return None.
    """
    from minecode.model_utils import DEFAULT_PIPELINES

    extra_pipelines = tuple(kwargs.get("addon_pipelines", []))
    scan_priority = kwargs.get("priority", 0)
    parsed_purl = PackageURL.from_string(purl_str)

    outcome = map_apk_package(
        parsed_purl,
        DEFAULT_PIPELINES + extra_pipelines,
        scan_priority,
    )
    return outcome if outcome else None

minecode/miners/alpine.py

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# purldb is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/purldb for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
from minecode import debutils
11+
from pathlib import Path
12+
from packagedcode import models as scan_models
13+
import base64
14+
15+
16+
def build_packages(extracted_location, apk_download_url, purl=None):
    """
    Yield a PackageData built from the APKINDEX entry whose name is `purl.name`.

    `extracted_location` is the directory holding the extracted `APKINDEX`
    file. `apk_download_url` is stored as the package download URL. `purl`
    supplies the name to look up, the qualifiers and the purl set on the
    yielded package. It is dereferenced unconditionally, so a purl is required
    in practice despite the None default.

    Nothing is yielded when the index has no entry for `purl.name`.
    """
    apk_index_file = Path(extracted_location) / "APKINDEX"

    with open(apk_index_file, encoding="utf-8") as f:
        parsed_pkginfo = parse_apkindex(f.read())

    extracted_pkginfo = get_package_by_name(parsed_pkginfo, purl.name)
    if not extracted_pkginfo:
        return

    description = extracted_pkginfo.get("description")
    version = extracted_pkginfo.get("version")
    extracted_license_statement = extracted_pkginfo.get("license")

    parties = []
    maintainers = extracted_pkginfo.get("maintainer")
    if maintainers:
        name, email = debutils.parse_email(maintainers)
        if name:
            party = scan_models.Party(name=name, role="maintainer", email=email)
            parties.append(party)

    repository_homepage_url = extracted_pkginfo.get("url")
    size = extracted_pkginfo.get("size")

    # An entry without a "C:" field has no checksum to convert; leave sha1
    # unset instead of passing None to the converter.
    apk_checksum = extracted_pkginfo.get("checksum")
    sha1 = apk_checksum_to_sha1(apk_checksum) if apk_checksum else None

    download_data = dict(
        type="apk",
        name=purl.name,
        version=version,
        qualifiers=purl.qualifiers,
        description=description,
        repository_homepage_url=repository_homepage_url,
        extracted_license_statement=extracted_license_statement,
        parties=parties,
        size=size,
        sha1=sha1,
        download_url=apk_download_url,
    )

    package = scan_models.PackageData.from_data(download_data)
    package.datasource_id = "alpine_metadata"
    package.set_purl(purl)
    yield package
65+
66+
67+
def parse_apkindex(data: str):
    """
    Parse an APKINDEX format string into a list of package dictionaries.
    https://wiki.alpinelinux.org/wiki/Apk_spec

    Entries are separated by blank lines and each line has the form
    `<key>:<value>`. Known single-letter keys are renamed to descriptive field
    names; unknown keys are kept as-is. Lines without a colon are ignored.

    The values of `D`, `p` and `i` are split on whitespace into lists. The
    values of `S`, `I`, `t` and `k` are converted to int when possible and kept
    as strings otherwise. All other values are stored as stripped strings.
    """
    # These lookups are loop-invariant; build them once rather than once per
    # line of the index.
    mapping = {
        "C": "checksum",
        "P": "name",
        "V": "version",
        "A": "arch",
        "S": "size",
        "I": "installed_size",
        "T": "description",
        "U": "url",
        "L": "license",
        "o": "origin",
        "m": "maintainer",
        "t": "build_time",
        "c": "commit",
        "k": "provider_priority",
        "D": "depends",
        "p": "provides",
        "i": "install_if",
    }
    list_keys = ("D", "p", "i")
    int_keys = ("S", "I", "t", "k")

    packages = []
    current_pkg = {}

    for line in data.splitlines():
        line = line.strip()
        if not line:
            # A blank line closes the entry collected so far.
            if current_pkg:
                packages.append(current_pkg)
                current_pkg = {}
            continue

        if ":" not in line:
            continue

        # Split on the first colon only: values such as URLs contain colons.
        key, value = line.split(":", 1)
        key, value = key.strip(), value.strip()

        field = mapping.get(key, key)

        if key in list_keys:
            current_pkg[field] = value.split()
        elif key in int_keys:
            try:
                current_pkg[field] = int(value)
            except ValueError:
                current_pkg[field] = value
        else:
            current_pkg[field] = value

    # The final entry may not be followed by a blank line.
    if current_pkg:
        packages.append(current_pkg)

    return packages
124+
125+
126+
def get_package_by_name(packages, name):
    """
    Return the first mapping in `packages` whose "name" value equals `name`,
    or None when there is no such mapping.

    Mappings without a "name" key are skipped rather than raising KeyError.
    """
    return next((pkg for pkg in packages if pkg.get("name") == name), None)
128+
129+
130+
def apk_checksum_to_sha1(apk_checksum: str) -> str:
    """
    Convert an Alpine APKINDEX package checksum (Q1... format)
    into its SHA-1 hex digest.

    The checksum is the literal prefix "Q1" followed by the base64-encoded
    SHA-1 digest.

    Raise ValueError when `apk_checksum` is not a string starting with "Q1",
    when the remainder is not valid base64 (binascii.Error is a ValueError
    subclass), or when it does not decode to exactly 20 bytes.
    """
    if not isinstance(apk_checksum, str) or not apk_checksum.startswith("Q1"):
        raise ValueError("Invalid checksum format: must start with 'Q1'")

    # Drop the "Q1" prefix
    b64_part = apk_checksum[2:]

    # Decode strictly: without validate=True, characters outside the base64
    # alphabet are silently discarded and a bogus digest would be returned.
    sha1_bytes = base64.b64decode(b64_part, validate=True)

    # A SHA-1 digest is always 20 bytes long.
    if len(sha1_bytes) != 20:
        raise ValueError("Invalid checksum format: expected a 20-byte SHA-1 digest")

    # Convert to hex
    return sha1_bytes.hex()
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# purldb is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/purldb for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import os
11+
from django.test import TestCase
12+
from packageurl import PackageURL
13+
import packagedb
14+
from minecode.collectors import alpine
15+
from minecode.utils_test import JsonBasedTesting
16+
17+
18+
class AlpinePriorityQueueTests(JsonBasedTesting, TestCase):
    """
    Check that mapping an Alpine purl through the collector stores exactly one
    package with the expected purl and download URL.
    """

    test_data_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "testfiles")

    def setUp(self):
        super().setUp()
        # Name and version match the .apk file name in the download URL below.
        self.package_url = PackageURL.from_string(
            "pkg:apk/ansible@1.6.7-r0?arch=x86&repo=main&alpine_version=v3.0"
        )
        self.download_url = (
            "https://dl-cdn.alpinelinux.org/alpine/v3.0/main/x86/ansible-1.6.7-r0.apk"
        )

    def test_map_alpine_package(self):
        package_count = packagedb.models.Package.objects.all().count()
        self.assertEqual(package_count, 0)

        # The trailing comma makes this a real one-element tuple; a bare
        # parenthesized string is just a string.
        alpine.map_apk_package(self.package_url, ("test_pipelines",))
        package_count = packagedb.models.Package.objects.all().count()
        self.assertEqual(package_count, 1)
        package = packagedb.models.Package.objects.all().first()
        expected_download_url = self.download_url

        self.assertEqual(package.purl, str(self.package_url))
        self.assertEqual(package.download_url, expected_download_url)
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# purldb is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/purldb for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import os
11+
from packageurl import PackageURL
12+
from minecode.miners import alpine
13+
from minecode.tests import FIXTURES_REGEN
14+
from minecode.utils_test import JsonBasedTesting
15+
from django.test import TestCase as DjangoTestCase
16+
17+
18+
class AlpineMapperTest(JsonBasedTesting, DjangoTestCase):
    """
    Check `alpine.build_packages` against stored APKINDEX fixtures and expected
    JSON results.
    """

    test_data_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "testfiles")

    def test_build_packages_metafile_alpine1(self):
        package_url = PackageURL.from_string(
            "pkg:apk/postgresql16-contrib@16.10-r0?arch=x86_64&repo=main&alpine_version=latest-stable"
        )
        apk_download_url = "https://dl-cdn.alpinelinux.org/alpine/latest-stable/main/x86_64/postgresql16-contrib-16.10-r0.apk"
        # NOTE(review): the fixture directory name mentions v3.14/community/armhf
        # while the purl above uses latest-stable/main/x86_64 - confirm the
        # fixture names of the two tests are not swapped.
        location = self.get_test_loc("alpine/postgresql16-contrib_v3.14-community-armhf")

        result = alpine.build_packages(location, apk_download_url, package_url)
        result = [p.to_dict() for p in result]
        expected_loc = self.get_test_loc("alpine/mapper_postgresql16_contrib_expected.json")
        self.check_expected_results(result, expected_loc, regen=FIXTURES_REGEN)

    def test_build_packages_metafile_alpine2(self):
        package_url = PackageURL.from_string(
            "pkg:apk/perf-bash-completion@5.10.42-r0?arch=armhf&repo=community&alpine_version=v3.14"
        )

        apk_download_url = "https://dl-cdn.alpinelinux.org/v3.14/community/armhf/perf-bash-completion-5.10.42-r0.apk"
        location = self.get_test_loc("alpine/perf-bash-completion_latest-stable_main_x86_64")

        result = alpine.build_packages(location, apk_download_url, package_url)
        result = [p.to_dict() for p in result]
        expected_loc = self.get_test_loc("alpine/mapper_perf_bash_completion_expected.json")
        self.check_expected_results(result, expected_loc, regen=FIXTURES_REGEN)

0 commit comments

Comments
 (0)