3
3
4
4
from __future__ import annotations
5
5
6
- import glob
7
6
import json
8
7
import logging
9
- import re
10
8
from pathlib import Path
11
9
12
10
import aiohttp
13
- from bs4 import BeautifulSoup , NavigableString , ResultSet
14
11
15
12
from cve_bin_tool .async_utils import FileIO , RateLimiter
16
13
from cve_bin_tool .data_sources import (
17
14
DISK_LOCATION_BACKUP ,
18
15
DISK_LOCATION_DEFAULT ,
19
16
Data_Source ,
20
17
)
21
- from cve_bin_tool .error_handler import (
22
- CVEDataForCurlVersionNotInCache ,
23
- ErrorHandler ,
24
- ErrorMode ,
25
- )
18
+ from cve_bin_tool .error_handler import ErrorMode
26
19
from cve_bin_tool .log import LOGGER
27
20
28
21
logging .basicConfig (level = logging .DEBUG )
@@ -33,7 +26,7 @@ class Curl_Source(Data_Source):
33
26
CACHEDIR = DISK_LOCATION_DEFAULT
34
27
BACKUPCACHEDIR = DISK_LOCATION_BACKUP
35
28
LOGGER = LOGGER .getChild ("CVEDB" )
36
- CURL_CVE_FILENAME_TEMPLATE = "curlcve-{} .json"
29
+ DATA_SOURCE_LINK = "https://curl.se/docs/vuln .json"
37
30
38
31
def __init__ (self , error_mode = ErrorMode .TruncTrace ):
39
32
self .cve_list = None
@@ -43,6 +36,7 @@ def __init__(self, error_mode=ErrorMode.TruncTrace):
43
36
self .session = None
44
37
self .affected_data = None
45
38
self .source_name = self .SOURCE
39
+ self .vulnerbility_data = []
46
40
47
41
async def get_cve_data (self ):
48
42
await self .fetch_cves ()
@@ -56,101 +50,34 @@ async def fetch_cves(self):
56
50
self .session = RateLimiter (
57
51
aiohttp .ClientSession (connector = connector , trust_env = True )
58
52
)
59
-
60
- versions = await self .get_curl_versions (self .session )
61
-
62
- for version in versions :
63
- await self .download_curl_version (self .session , version )
64
-
53
+ await self .download_curl_vulnerabilities (self .session )
65
54
await self .session .close ()
66
55
67
- @staticmethod
68
- async def get_curl_versions (session : RateLimiter ) -> list [str ]:
69
- regex = re .compile (r"vuln-(\d+.\d+.\d+)\.html" )
70
- async with await session .get (
71
- "https://curl.haxx.se/docs/vulnerabilities.html"
72
- ) as response :
73
- response .raise_for_status ()
74
- html = await response .text ()
75
- matches = regex .finditer (html )
76
- return [match .group (1 ) for match in matches ]
77
-
78
- async def download_curl_version (self , session : RateLimiter , version : str ) -> None :
79
- async with await session .get (
80
- f"https://curl.haxx.se/docs/vuln-{ version } .html"
81
- ) as response :
56
async def download_curl_vulnerabilities(self, session: RateLimiter) -> None:
    """Download the curl vulnerability feed and cache it on disk.

    The JSON document at ``self.DATA_SOURCE_LINK`` is fetched through
    *session*, kept in memory as ``self.vulnerbility_data`` and then written,
    pretty-printed with a four-space indent, to ``vuln.json`` inside
    ``self.cachedir``.

    Args:
        session: Rate-limited HTTP session used to perform the GET request.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an HTTP error
        status.
    """
    request = await session.get(self.DATA_SOURCE_LINK)
    async with request as reply:
        reply.raise_for_status()
        self.vulnerbility_data = await reply.json()

    # Absolute location of the on-disk copy of the feed.
    cache_file = (Path(self.cachedir) / "vuln.json").resolve()
    async with FileIO(cache_file, "w") as handle:
        await handle.write(json.dumps(self.vulnerbility_data, indent=4))
135
64
136
65
def get_cve_list(self):
    """Convert the downloaded curl advisories into affected-version records.

    Reads ``self.vulnerbility_data`` and rebuilds ``self.affected_data`` with
    one dictionary per advisory. Every record uses vendor ``"haxx"``, product
    ``"curl"`` and the wildcard version ``"*"``; the version range is taken
    from the advisory itself.

    An advisory that lacks any of the fields read here (no alias, no
    ``affected`` entry, no ``introduced`` event, no ``versions``) is skipped
    with a warning, so that a single malformed entry does not abort the
    conversion of the whole feed.

    Returns:
        The rebuilt ``self.affected_data`` list.
    """
    self.affected_data = []

    # ``or []`` keeps an unset (``None``) payload from raising on iteration.
    for cve in self.vulnerbility_data or []:
        try:
            package = cve["affected"][0]
            affected = {
                "cve_id": cve["aliases"][0],
                "vendor": "haxx",
                "product": "curl",
                "version": "*",
                "versionStartIncluding": package["ranges"][0]["events"][0][
                    "introduced"
                ],
                "versionStartExcluding": "",
                # NOTE(review): assumes the first listed version is the last
                # affected release -- confirm against the feed's ordering.
                "versionEndIncluding": package["versions"][0],
                "versionEndExcluding": "",
            }
        except (KeyError, IndexError, TypeError) as error:
            # The feed is external data: tolerate incomplete entries.
            self.LOGGER.warning(
                "Skipping malformed curl advisory (%s: %s)",
                type(error).__name__,
                error,
            )
            continue
        self.affected_data.append(affected)

    return self.affected_data
0 commit comments