|
3 | 3 | and malicious open source software packages. |
4 | 4 | """ |
5 | 5 |
|
| 6 | +import functools |
6 | 7 | import logging |
7 | 8 |
|
8 | 9 | import requests |
9 | 10 |
|
10 | 11 | from scfw.ecosystem import ECOSYSTEM |
11 | 12 | from scfw.target import InstallTarget |
12 | 13 | from scfw.verifier import FindingSeverity, InstallTargetVerifier |
| 14 | +from scfw.verifiers.osv_verifier.osv_advisory import OsvAdvisory |
13 | 15 |
|
14 | 16 | _log = logging.getLogger(__name__) |
15 | 17 |
|
@@ -53,16 +55,12 @@ def verify(self, target: InstallTarget) -> list[tuple[FindingSeverity, str]]: |
53 | 55 | requests.HTTPError: |
54 | 56 | An error occurred while querying an installation target against the OSV.dev API. |
55 | 57 | """ |
56 | | - def mal_finding(id: str) -> str: |
| 58 | + def finding(osv: OsvAdvisory) -> str: |
| 59 | + kind = "malicious package " if osv.id.startswith("MAL") else "" |
| 60 | + severity_tag = f"[{osv.severity}] " if osv.severity else "" |
57 | 61 | return ( |
58 | | - f"An OSV.dev malicious package disclosure exists for package {target}:\n" |
59 | | - f" * {_OSV_DEV_VULN_URL_PREFIX}/{id}" |
60 | | - ) |
61 | | - |
62 | | - def non_mal_finding(id: str) -> str: |
63 | | - return ( |
64 | | - f"An OSV.dev disclosure exists for package {target}:\n" |
65 | | - f" * {_OSV_DEV_VULN_URL_PREFIX}/{id}" |
| 62 | + f"An OSV.dev {kind}disclosure exists for package {target}:\n" |
| 63 | + f" * {severity_tag}{_OSV_DEV_VULN_URL_PREFIX}/{osv.id}" |
66 | 64 | ) |
67 | 65 |
|
68 | 66 | def error_message(e: str) -> str: |
@@ -102,19 +100,27 @@ def error_message(e: str) -> str: |
102 | 100 | if not vulns: |
103 | 101 | return [] |
104 | 102 |
|
105 | | - osv_ids = set(filter(lambda id: id is not None, map(lambda vuln: vuln.get("id"), vulns))) |
106 | | - mal_ids = set(filter(lambda id: id.startswith("MAL"), osv_ids)) |
107 | | - non_mal_ids = osv_ids - mal_ids |
| 103 | + osvs = set(map(OsvAdvisory.from_json, filter(lambda vuln: vuln.get("id"), vulns))) |
| 104 | + mal_osvs = set(filter(lambda osv: osv.id.startswith("MAL"), osvs)) |
| 105 | + non_mal_osvs = osvs - mal_osvs |
| 106 | + |
| 107 | + osv_sort_key = functools.cmp_to_key(OsvAdvisory.compare_severities) |
| 108 | + sorted_mal_osvs = sorted(mal_osvs, reverse=True, key=osv_sort_key) |
| 109 | + sorted_non_mal_osvs = sorted(non_mal_osvs, reverse=True, key=osv_sort_key) |
108 | 110 |
|
109 | 111 | return ( |
110 | | - [(FindingSeverity.CRITICAL, mal_finding(id)) for id in mal_ids] |
111 | | - + [(FindingSeverity.WARNING, non_mal_finding(id)) for id in non_mal_ids] |
| 112 | + [(FindingSeverity.CRITICAL, finding(osv)) for osv in sorted_mal_osvs] |
| 113 | + + [(FindingSeverity.WARNING, finding(osv)) for osv in sorted_non_mal_osvs] |
112 | 114 | ) |
113 | 115 |
|
114 | 116 | except requests.exceptions.RequestException as e: |
115 | 117 | _log.warning(f"Failed to query OSV.dev API: returning WARNING finding for target {target}") |
116 | 118 | return [(FindingSeverity.WARNING, error_message(str(e)))] |
117 | 119 |
|
| 120 | + except Exception as e: |
| 121 | + _log.warning(f"Target verification failed: returning WARNING finding for target {target}") |
| 122 | + return [(FindingSeverity.WARNING, error_message(str(e)))] |
| 123 | + |
118 | 124 |
|
119 | 125 | def load_verifier() -> InstallTargetVerifier: |
120 | 126 | """ |
|
0 commit comments