|
14 | 14 | from typing import List |
15 | 15 | from typing import Optional |
16 | 16 |
|
17 | | -import requests |
18 | | -from bs4 import BeautifulSoup |
19 | 17 | from cwe2.database import Database |
20 | 18 | from dateutil import parser as dateparser |
21 | 19 | from packageurl import PackageURL |
@@ -61,36 +59,19 @@ def steps(cls): |
61 | 59 | # "GO": "golang", |
62 | 60 | } |
63 | 61 |
|
64 | | - github_ecosystem_by_package_type = { |
65 | | - value: key for (key, value) in package_type_by_github_ecosystem.items() |
66 | | - } |
67 | | - |
68 | 62 | def advisories_count(self): |
69 | | - normalized_github_ecosystems = [ |
70 | | - k.lower() for k in self.package_type_by_github_ecosystem.keys() |
71 | | - ] |
72 | | - |
73 | | - try: |
74 | | - response = requests.get("https://github.com/advisories") |
75 | | - response.raise_for_status() |
76 | | - except requests.HTTPError as http_err: |
77 | | - self.log( |
78 | | - f"HTTP error occurred: {http_err} \n {traceback_format_exc()}", |
79 | | - level=logging.ERROR, |
80 | | - ) |
81 | | - return 0 |
82 | | - |
83 | | - soup = BeautifulSoup(response.text, "html.parser") |
| 63 | + advisory_query = """ |
| 64 | + query{ |
| 65 | + securityVulnerabilities(first: 0, ecosystem: %s) { |
| 66 | + totalCount |
| 67 | + } |
| 68 | + } |
| 69 | + """ |
84 | 70 | advisory_counts = 0 |
85 | | - for li in soup.select("ul.filter-list li") or []: |
86 | | - if link := li.find("a", class_="filter-item"): |
87 | | - ecosystem, _, _ = link.text.strip().rpartition(" ") |
88 | | - if count_span := li.find("span", class_="count"): |
89 | | - count = int(count_span.text.strip().replace(",", "")) |
90 | | - ecosystem = ecosystem.strip().lower() |
91 | | - if ecosystem in normalized_github_ecosystems: |
92 | | - advisory_counts += count |
93 | | - |
| 71 | + for ecosystem in self.package_type_by_github_ecosystem.keys(): |
| 72 | + graphql_query = {"query": advisory_query % (ecosystem)} |
| 73 | + response = utils.fetch_github_graphql_query(graphql_query) |
| 74 | + advisory_counts += get_item(response, "data", "securityVulnerabilities", "totalCount") |
94 | 75 | return advisory_counts |
95 | 76 |
|
96 | 77 | def collect_advisories(self) -> Iterable[AdvisoryData]: |
|
0 commit comments