 #
-#
 # Copyright (c) nexB Inc. and others. All rights reserved.
 # VulnerableCode is a trademark of nexB Inc.
 # SPDX-License-Identifier: Apache-2.0

 from vulnerabilities.importer import AdvisoryData
 from vulnerabilities.importer import AffectedPackage
-from vulnerabilities.importer import Importer
+from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline
 from vulnerabilities.references import WireSharkReference
 from vulnerabilities.references import XsaReference
 from vulnerabilities.references import ZbxReference
 from vulnerabilities.utils import fetch_response
 from vulnerabilities.utils import is_cve

-LOGGER = logging.getLogger(__name__)
-BASE_URL = "https://secdb.alpinelinux.org/"

+class AlpineLinuxImporterPipeline(VulnerableCodeBaseImporterPipeline):
+    """Collect Alpine Linux advisories."""
+
+    pipeline_id = "alpine_linux_importer"

-class AlpineImporter(Importer):
     spdx_license_expression = "CC-BY-SA-4.0"
     license_url = "https://secdb.alpinelinux.org/license.txt"
+    url = "https://secdb.alpinelinux.org/"
     importer_name = "Alpine Linux Importer"

-    def advisory_data(self) -> Iterable[AdvisoryData]:
-        page_response_content = fetch_response(BASE_URL).content
-        advisory_directory_links = fetch_advisory_directory_links(page_response_content)
+    @classmethod
+    def steps(cls):
+        return (
+            cls.collect_and_store_advisories,
+            cls.import_new_advisories,
+        )
+
+    def advisories_count(self) -> int:
+        return 0
+
+    def collect_advisories(self) -> Iterable[AdvisoryData]:
+        page_response_content = fetch_response(self.url).content
+        advisory_directory_links = fetch_advisory_directory_links(
+            page_response_content, self.url, self.log
+        )
         advisory_links = []
         for advisory_directory_link in advisory_directory_links:
             advisory_directory_page = fetch_response(advisory_directory_link).content
             advisory_links.extend(
-                fetch_advisory_links(advisory_directory_page, advisory_directory_link)
+                fetch_advisory_links(advisory_directory_page, advisory_directory_link, self.log)
             )
         for link in advisory_links:
             record = fetch_response(link).json()
             if not record["packages"]:
-                LOGGER.error(f'"packages" not found in {link!r}')
+                self.log(
+                    f'"packages" not found in {link!r}',
+                    level=logging.DEBUG,
+                )
                 continue
-            yield from process_record(record=record, url=link)
+            yield from process_record(record=record, url=link, logger=self.log)


-def fetch_advisory_directory_links(page_response_content: str) -> List[str]:
+def fetch_advisory_directory_links(
+    page_response_content: str,
+    base_url: str,
+    logger: callable = None,
+) -> List[str]:
     """
     Return a list of advisory directory links present in `page_response_content` html string
     """
@@ -66,60 +86,83 @@ def fetch_advisory_directory_links(page_response_content: str) -> List[str]:
     ]

     if not alpine_versions:
-        LOGGER.error(f"No versions found in {BASE_URL!r}")
+        if logger:
+            logger(
+                f"No versions found in {base_url!r}",
+                level=logging.DEBUG,
+            )
         return []

-    advisory_directory_links = [urljoin(BASE_URL, version) for version in alpine_versions]
+    advisory_directory_links = [urljoin(base_url, version) for version in alpine_versions]

     return advisory_directory_links


 def fetch_advisory_links(
-    advisory_directory_page: str, advisory_directory_link: str
+    advisory_directory_page: str,
+    advisory_directory_link: str,
+    logger: callable = None,
 ) -> Iterable[str]:
     """
     Yield json file urls present in `advisory_directory_page`
     """
     advisory_directory_page = BeautifulSoup(advisory_directory_page, features="lxml")
     anchor_tags = advisory_directory_page.find_all("a")
     if not anchor_tags:
-        LOGGER.error(f"No anchor tags found in {advisory_directory_link!r}")
+        if logger:
+            logger(
+                f"No anchor tags found in {advisory_directory_link!r}",
+                level=logging.DEBUG,
+            )
         return iter([])
     for anchor_tag in anchor_tags:
         if anchor_tag.text.endswith("json"):
             yield urljoin(advisory_directory_link, anchor_tag.text)


-def check_for_attributes(record) -> bool:
+def check_for_attributes(record, logger) -> bool:
     attributes = ["distroversion", "reponame", "archs"]
     for attribute in attributes:
         if attribute not in record:
-            LOGGER.error(f'"{attribute!r}" not found in {record!r}')
+            if logger:
+                logger(
+                    f'"{attribute!r}" not found in {record!r}',
+                    level=logging.DEBUG,
+                )
             return False
     return True


-def process_record(record: dict, url: str) -> Iterable[AdvisoryData]:
+def process_record(record: dict, url: str, logger: callable = None) -> Iterable[AdvisoryData]:
     """
     Return a list of AdvisoryData objects by processing data
     present in that `record`
     """
     if not record.get("packages"):
-        LOGGER.error(f'"packages" not found in this record {record!r}')
+        if logger:
+            logger(
+                f'"packages" not found in this record {record!r}',
+                level=logging.DEBUG,
+            )
         return []

     for package in record["packages"]:
         if not package["pkg"]:
-            LOGGER.error(f'"pkg" not found in this package {package!r}')
+            if logger:
+                logger(
+                    f'"pkg" not found in this package {package!r}',
+                    level=logging.DEBUG,
+                )
             continue
-        if not check_for_attributes(record):
+        if not check_for_attributes(record, logger):
             continue
         yield from load_advisories(
             pkg_infos=package["pkg"],
             distroversion=record["distroversion"],
             reponame=record["reponame"],
             archs=record["archs"],
             url=url,
+            logger=logger,
         )


@@ -129,24 +172,37 @@ def load_advisories(
     reponame: str,
     archs: List[str],
     url: str,
+    logger: callable = None,
 ) -> Iterable[AdvisoryData]:
     """
     Yield AdvisoryData by mapping data from `pkg_infos`
     and form PURL for AffectedPackages by using
     `distroversion`, `reponame`, `archs`
     """
     if not pkg_infos.get("name"):
-        LOGGER.error(f'"name" is not available in package {pkg_infos!r}')
+        if logger:
+            logger(
+                f'"name" is not available in package {pkg_infos!r}',
+                level=logging.DEBUG,
+            )
         return []

     for version, fixed_vulns in pkg_infos["secfixes"].items():
         if not fixed_vulns:
-            LOGGER.error(f"No fixed vulnerabilities in version {version!r}")
+            if logger:
+                logger(
+                    f"No fixed vulnerabilities in version {version!r}",
+                    level=logging.DEBUG,
+                )
             continue

         for vuln_ids in fixed_vulns:
             if not isinstance(vuln_ids, str):
-                LOGGER.error(f"{vuln_ids!r} is not of `str` instance")
+                if logger:
+                    logger(
+                        f"{vuln_ids!r} is not of `str` instance",
+                        level=logging.DEBUG,
+                    )
                 continue
             vuln_ids = vuln_ids.split()
             aliases = []
@@ -179,10 +235,18 @@ def load_advisories(
             try:
                 fixed_version = AlpineLinuxVersion(version)
             except Exception as e:
-                LOGGER.error(f"{version!r} is not a valid AlpineVersion {e!r}")
+                if logger:
+                    logger(
+                        f"{version!r} is not a valid AlpineVersion {e!r}",
+                        level=logging.DEBUG,
+                    )
                 continue
             if not isinstance(archs, List):
-                LOGGER.error(f"{archs!r} is not of `List` instance")
+                if logger:
+                    logger(
+                        f"{archs!r} is not of `List` instance",
+                        level=logging.DEBUG,
+                    )
                 continue
             if archs:
                 for arch in archs:
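
The module-level helpers changed above now take an optional `logger` callable instead of using the module-wide LOGGER. A minimal sketch of driving them by hand is shown below. It assumes the module in this diff is importable as vulnerabilities.pipelines.alpine_linux_importer (the file path is not visible in the diff), and the record is a hand-built illustration of only the keys this code reads ("packages", "pkg", "secfixes", "distroversion", "reponame", "archs"), not a verbatim secdb document.

# Sketch: feed one secdb-style record through process_record() and print the
# resulting AdvisoryData objects. Assumed import path; record is illustrative only.
import logging

from vulnerabilities.pipelines.alpine_linux_importer import process_record

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger("alpine-example")


def logger(message, level=logging.INFO):
    # Matches the callable contract used throughout the module:
    # a message plus a `level` keyword argument.
    log.log(level, message)


record = {
    "distroversion": "v3.20",
    "reponame": "main",
    "archs": ["x86_64", "aarch64"],
    "packages": [
        {
            "pkg": {
                "name": "openssl",
                # secfixes maps a fixed version to a list of id strings;
                # each string may hold several space-separated ids.
                "secfixes": {"3.3.2-r0": ["CVE-2024-6119"]},
            }
        }
    ],
}

for advisory in process_record(
    record=record,
    url="https://secdb.alpinelinux.org/v3.20/main.json",
    logger=logger,
):
    print(advisory)

The same `logger` callable can stand in for `self.log` when exercising fetch_advisory_directory_links or fetch_advisory_links outside a pipeline run, since all of these helpers treat it as optional and fall back to silence when it is None.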