33
44import json
55import os
6+ from collections import defaultdict
67from datetime import datetime
78from logging import Logger
89from typing import Dict , List
910
11+ from cve_bin_tool .cve_scanner import CVEScanner
12+
1013from .cvedb import DISK_LOCATION_DEFAULT
1114from .error_handler import (
1215 ErrorHandler ,
1518 InvalidJsonError ,
1619 MissingFieldsError ,
1720)
21+ from .input_engine import TriageData
1822from .log import LOGGER
19- from .util import DirWalk
23+ from .util import DirWalk , ProductInfo , Remarks
2024
2125REQUIRED_INTERMEDIATE_METADATA = {
2226 "scanned_dir" ,
@@ -35,9 +39,11 @@ def __init__(
3539 logger : Logger = None ,
3640 error_mode = ErrorMode .TruncTrace ,
3741 cache_dir = DISK_LOCATION_DEFAULT ,
42+ score = 0 ,
3843 ):
3944 self .logger = logger or LOGGER .getChild (self .__class__ .__name__ )
4045 self .merge_files = merge_files
46+ self .intermediate_cve_data = []
4147 self .all_cve_data = []
4248 self .file_stack = []
4349 self .error_mode = error_mode
@@ -46,6 +52,8 @@ def __init__(
4652 self .products_with_cve = 0
4753 self .products_without_cve = 0
4854 self .cache_dir = cache_dir
55+ self .merged_files = ["tag" ]
56+ self .score = score
4957
5058 self .walker = DirWalk (
5159 pattern = ";" .join (
@@ -97,6 +105,9 @@ def scan_intermediate_file(self, filename):
97105 f"Adding data from { os .path .basename (filename )} with timestamp { inter_data ['metadata' ]['timestamp' ]} "
98106 )
99107 self .total_inter_files += 1
108+ inter_data ["metadata" ]["severity" ] = get_severity_count (
109+ inter_data ["report" ]
110+ )
100111 return inter_data
101112
102113 if missing_fields != set ():
@@ -106,41 +117,41 @@ def scan_intermediate_file(self, filename):
106117 with ErrorHandler (mode = self .error_mode ):
107118 raise InvalidIntermediateJsonError (filename )
108119
109- def merge_reports (self ):
120+ def merge_intermediate (self ):
110121 """Merge valid intermediate dictionaries"""
111122
112123 for inter_file in self .recursive_scan (self .merge_files ):
113- # Remove duplicate paths from cve-entries
114- self .all_cve_data .append (self .scan_intermediate_file (inter_file ))
115-
116- if self .all_cve_data :
124+ # Create a list of intermediate files dictionary
125+ self .intermediate_cve_data .append (self .scan_intermediate_file (inter_file ))
126+
127+ if self .intermediate_cve_data :
128+ # sort on basis of timestamp and scans
129+ self .intermediate_cve_data .sort (
130+ key = lambda inter : datetime .strptime (
131+ inter ["metadata" ]["timestamp" ], "%Y-%m-%d.%H-%M-%S"
132+ )
133+ )
117134 self .all_cve_data = self .remove_intermediate_duplicates ()
118- merged_file_path = self .save_merged_intermediate ()
119- return merged_file_path
120-
121- self .logger .error ("No valid Intermediate reports found!" )
122- return ""
123-
124- def save_merged_intermediate (self ):
125- """Save a temporary merged report in .cache/cve-bin-tool"""
135+ merged_cve_scanner = self .get_intermediate_cve_scanner (
136+ [self .all_cve_data ], self .score
137+ )[0 ]
138+ merged_cve_scanner .products_with_cve = self .products_with_cve
139+ merged_cve_scanner .products_without_cve = self .products_without_cve
126140
127- if not os .path .isdir (self .cache_dir ):
128- os .makedirs (self .cache_dir )
141+ return merged_cve_scanner
129142
130- now = datetime .now ().strftime ("%Y-%m-%d.%H-%M-%S" )
131- filename = os .path .join (self .cache_dir , f"merged-{ now } .json" )
132- with open (filename , "w" ) as f :
133- json .dump (self .all_cve_data , f , indent = " " )
134-
135- return filename
143+ self .logger .error ("No valid Intermediate reports found!" )
144+ return {}
136145
137146 def remove_intermediate_duplicates (self ) -> List [Dict [str , str ]]:
138147 """Returns a list of dictionary with same format as cve-bin-tool json output"""
139148
140149 output = {}
141- for inter_data in self .all_cve_data :
142- self .products_with_cve += inter_data ["metadata" ]["products_with_cve" ]
143- self .products_without_cve += inter_data ["metadata" ]["products_without_cve" ]
150+ for inter_data in self .intermediate_cve_data :
151+ self .products_with_cve += int (inter_data ["metadata" ]["products_with_cve" ])
152+ self .products_without_cve += int (
153+ inter_data ["metadata" ]["products_without_cve" ]
154+ )
144155 for cve in inter_data ["report" ]:
145156 if cve ["cve_number" ] != "UNKNOWN" :
146157 if cve ["cve_number" ] not in output :
@@ -156,3 +167,60 @@ def remove_intermediate_duplicates(self) -> List[Dict[str, str]]:
156167 output [cve ["cve_number" ]]["path" ] = path_list
157168
158169 return list (output .values ())
170+
171+ @staticmethod
172+ def get_intermediate_cve_scanner (cve_data_list , score ) -> List [CVEScanner ]:
173+ """Returns a list of CVEScanner parsed objects when a list of cve_data json like list is passed"""
174+ cve_scanner_list = []
175+ for inter_data in cve_data_list :
176+ with CVEScanner (score = score ) as cve_scanner :
177+ triage_data : TriageData
178+ parsed_data : Dict [ProductInfo , TriageData ] = {}
179+
180+ parsed_data = parse_data_from_json (
181+ inter_data ["report" ] if "report" in inter_data else inter_data
182+ )
183+
184+ for product_info , triage_data in parsed_data .items ():
185+ LOGGER .debug (f"{ product_info } , { triage_data } " )
186+ cve_scanner .get_cves (product_info , triage_data )
187+
188+ cve_scanner_list .append (cve_scanner )
189+
190+ return cve_scanner_list
191+
192+
193+ def parse_data_from_json (
194+ json_data : List [Dict [str , str ]]
195+ ) -> Dict [ProductInfo , TriageData ]:
196+ """Parse CVE JSON dictionary to Dict[ProductInfo, TriageData]"""
197+
198+ parsed_data : Dict [ProductInfo , TriageData ] = defaultdict (dict )
199+
200+ for row in json_data :
201+ product_info = ProductInfo (
202+ row ["vendor" ].strip (), row ["product" ].strip (), row ["version" ].strip ()
203+ )
204+ parsed_data [product_info ][row .get ("cve_number" , "" ).strip () or "default" ] = {
205+ "remarks" : Remarks (str (row .get ("remarks" , "" )).strip ()),
206+ "comments" : row .get ("comments" , "" ).strip (),
207+ "severity" : row .get ("severity" , "" ).strip (),
208+ }
209+
210+ parsed_data [product_info ]["paths" ] = set (
211+ map (lambda x : x .strip (), row .get ("paths" , "" ).split ("," ))
212+ )
213+ return parsed_data
214+
215+
216+ def get_severity_count (reports : List [Dict [str , str ]] = []) -> Dict [str , int ]:
217+ """Returns a list of Severity counts for intermediate report"""
218+ severity_count = {"LOW" : 0 , "MEDIUM" : 0 , "HIGH" : 0 , "CRITICAL" : 0 , "UNKNOWN" : 0 }
219+
220+ for cve in reports :
221+ if "severity" in cve and cve ["severity" ] in severity_count :
222+ severity_count [cve ["severity" ]] += 1
223+ else :
224+ severity_count ["UNKNOWN" ] += 1
225+
226+ return severity_count
0 commit comments