3
3
4
4
import json
5
5
import os
6
+ from collections import defaultdict
6
7
from datetime import datetime
7
8
from logging import Logger
8
9
from typing import Dict , List
9
10
11
+ from cve_bin_tool .cve_scanner import CVEScanner
12
+
10
13
from .cvedb import DISK_LOCATION_DEFAULT
11
14
from .error_handler import (
12
15
ErrorHandler ,
15
18
InvalidJsonError ,
16
19
MissingFieldsError ,
17
20
)
21
+ from .input_engine import TriageData
18
22
from .log import LOGGER
19
- from .util import DirWalk
23
+ from .util import DirWalk , ProductInfo , Remarks
20
24
21
25
REQUIRED_INTERMEDIATE_METADATA = {
22
26
"scanned_dir" ,
@@ -35,9 +39,11 @@ def __init__(
35
39
logger : Logger = None ,
36
40
error_mode = ErrorMode .TruncTrace ,
37
41
cache_dir = DISK_LOCATION_DEFAULT ,
42
+ score = 0 ,
38
43
):
39
44
self .logger = logger or LOGGER .getChild (self .__class__ .__name__ )
40
45
self .merge_files = merge_files
46
+ self .intermediate_cve_data = []
41
47
self .all_cve_data = []
42
48
self .file_stack = []
43
49
self .error_mode = error_mode
@@ -46,6 +52,8 @@ def __init__(
46
52
self .products_with_cve = 0
47
53
self .products_without_cve = 0
48
54
self .cache_dir = cache_dir
55
+ self .merged_files = ["tag" ]
56
+ self .score = score
49
57
50
58
self .walker = DirWalk (
51
59
pattern = ";" .join (
@@ -97,6 +105,9 @@ def scan_intermediate_file(self, filename):
97
105
f"Adding data from { os .path .basename (filename )} with timestamp { inter_data ['metadata' ]['timestamp' ]} "
98
106
)
99
107
self .total_inter_files += 1
108
+ inter_data ["metadata" ]["severity" ] = get_severity_count (
109
+ inter_data ["report" ]
110
+ )
100
111
return inter_data
101
112
102
113
if missing_fields != set ():
@@ -106,41 +117,41 @@ def scan_intermediate_file(self, filename):
106
117
with ErrorHandler (mode = self .error_mode ):
107
118
raise InvalidIntermediateJsonError (filename )
108
119
109
def merge_intermediate(self):
    """Merge all valid intermediate report dictionaries into one CVEScanner.

    Recursively scans ``self.merge_files`` for intermediate JSON reports,
    validates each via ``scan_intermediate_file``, orders them by their
    metadata timestamp (oldest first, so later scans win), de-duplicates
    CVE entries, and parses the merged data into a single CVEScanner.

    Returns:
        A CVEScanner holding the merged CVE data, with the accumulated
        ``products_with_cve`` / ``products_without_cve`` counters copied
        onto it; an empty dict if no valid intermediate report was found.
    """

    for inter_file in self.recursive_scan(self.merge_files):
        # Collect every valid intermediate report dictionary.
        self.intermediate_cve_data.append(self.scan_intermediate_file(inter_file))

    if self.intermediate_cve_data:
        # Sort on the basis of timestamp so duplicate removal keeps the
        # entry from the most recent scan.
        self.intermediate_cve_data.sort(
            key=lambda inter: datetime.strptime(
                inter["metadata"]["timestamp"], "%Y-%m-%d.%H-%M-%S"
            )
        )
        self.all_cve_data = self.remove_intermediate_duplicates()
        # Wrap the merged, de-duplicated data in a CVEScanner so callers
        # can treat the merge result like a fresh scan.
        merged_cve_scanner = self.get_intermediate_cve_scanner(
            [self.all_cve_data], self.score
        )[0]
        merged_cve_scanner.products_with_cve = self.products_with_cve
        merged_cve_scanner.products_without_cve = self.products_without_cve

        return merged_cve_scanner

    self.logger.error("No valid Intermediate reports found!")
    return {}
136
145
137
146
def remove_intermediate_duplicates (self ) -> List [Dict [str , str ]]:
138
147
"""Returns a list of dictionary with same format as cve-bin-tool json output"""
139
148
140
149
output = {}
141
- for inter_data in self .all_cve_data :
142
- self .products_with_cve += inter_data ["metadata" ]["products_with_cve" ]
143
- self .products_without_cve += inter_data ["metadata" ]["products_without_cve" ]
150
+ for inter_data in self .intermediate_cve_data :
151
+ self .products_with_cve += int (inter_data ["metadata" ]["products_with_cve" ])
152
+ self .products_without_cve += int (
153
+ inter_data ["metadata" ]["products_without_cve" ]
154
+ )
144
155
for cve in inter_data ["report" ]:
145
156
if cve ["cve_number" ] != "UNKNOWN" :
146
157
if cve ["cve_number" ] not in output :
@@ -156,3 +167,60 @@ def remove_intermediate_duplicates(self) -> List[Dict[str, str]]:
156
167
output [cve ["cve_number" ]]["path" ] = path_list
157
168
158
169
return list (output .values ())
170
+
171
@staticmethod
def get_intermediate_cve_scanner(cve_data_list, score) -> List[CVEScanner]:
    """Parse a list of cve_data-style dictionaries into CVEScanner objects.

    Args:
        cve_data_list: Iterable of intermediate report dicts (each either a
            full report with a ``"report"`` key, or a bare list of CVE rows).
        score: Minimum CVSS score forwarded to each CVEScanner.

    Returns:
        One CVEScanner per input entry, populated via ``get_cves``.
    """
    cve_scanner_list = []
    for inter_data in cve_data_list:
        with CVEScanner(score=score) as cve_scanner:
            triage_data: TriageData
            parsed_data: Dict[ProductInfo, TriageData] = {}

            # Accept either a wrapped intermediate report or a raw row list.
            parsed_data = parse_data_from_json(
                inter_data["report"] if "report" in inter_data else inter_data
            )

            for product_info, triage_data in parsed_data.items():
                LOGGER.debug(f"{product_info}, {triage_data}")
                cve_scanner.get_cves(product_info, triage_data)

        cve_scanner_list.append(cve_scanner)

    return cve_scanner_list
191
+
192
+
193
def parse_data_from_json(
    json_data: List[Dict[str, str]]
) -> Dict[ProductInfo, TriageData]:
    """Parse CVE JSON rows into a ``Dict[ProductInfo, TriageData]`` mapping.

    Each row must carry ``vendor``/``product``/``version``; optional
    ``cve_number``, ``remarks``, ``comments``, ``severity`` and comma-separated
    ``paths`` fields are normalized (stripped) into the triage entry. Rows
    without a CVE number are stored under the ``"default"`` key.
    """

    parsed_data: Dict[ProductInfo, TriageData] = defaultdict(dict)

    for row in json_data:
        product_info = ProductInfo(
            row["vendor"].strip(), row["product"].strip(), row["version"].strip()
        )
        # An empty/missing cve_number falls back to the "default" triage slot.
        parsed_data[product_info][row.get("cve_number", "").strip() or "default"] = {
            "remarks": Remarks(str(row.get("remarks", "")).strip()),
            "comments": row.get("comments", "").strip(),
            "severity": row.get("severity", "").strip(),
        }

        # NOTE(review): paths come from the last row seen for this product —
        # each row overwrites the previous set; confirm this is intended.
        parsed_data[product_info]["paths"] = set(
            map(lambda x: x.strip(), row.get("paths", "").split(","))
        )
    return parsed_data
214
+
215
+
216
def get_severity_count(reports: List[Dict[str, str]] = None) -> Dict[str, int]:
    """Return per-severity CVE counts for an intermediate report.

    Args:
        reports: CVE row dicts; each row's ``severity`` field is tallied.
            Defaults to an empty report. (A ``None`` sentinel is used instead
            of a mutable ``[]`` default, matching the file's ``logger`` style.)

    Returns:
        Dict mapping LOW/MEDIUM/HIGH/CRITICAL/UNKNOWN to their counts;
        missing or unrecognized severities count as UNKNOWN.
    """
    if reports is None:
        reports = []

    severity_count = {"LOW": 0, "MEDIUM": 0, "HIGH": 0, "CRITICAL": 0, "UNKNOWN": 0}

    for cve in reports:
        if "severity" in cve and cve["severity"] in severity_count:
            severity_count[cve["severity"]] += 1
        else:
            # Absent or non-standard severity strings are bucketed as UNKNOWN.
            severity_count["UNKNOWN"] += 1

    return severity_count
0 commit comments