@@ -60,6 +60,69 @@ class CVEDB:
6060 nvd_source .NVD_Source , # last to avoid data overwrites
6161 ]
6262
63+ TABLE_SCHEMAS = {
64+ "cve_severity" : """
65+ CREATE TABLE IF NOT EXISTS cve_severity (
66+ cve_number TEXT,
67+ severity TEXT,
68+ description TEXT,
69+ score INTEGER,
70+ cvss_version INTEGER,
71+ cvss_vector TEXT,
72+ data_source TEXT,
73+ last_modified TIMESTAMP,
74+ PRIMARY KEY(cve_number, data_source)
75+ )
76+ """ ,
77+ "cve_range" : """
78+ CREATE TABLE IF NOT EXISTS cve_range (
79+ cve_number TEXT,
80+ vendor TEXT,
81+ product TEXT,
82+ version TEXT,
83+ versionStartIncluding TEXT,
84+ versionStartExcluding TEXT,
85+ versionEndIncluding TEXT,
86+ versionEndExcluding TEXT,
87+ data_source TEXT,
88+ FOREIGN KEY(cve_number, data_source) REFERENCES cve_severity(cve_number, data_source)
89+ )
90+ """ ,
91+ "cve_exploited" : """
92+ CREATE TABLE IF NOT EXISTS cve_exploited (
93+ cve_number TEXT,
94+ product TEXT,
95+ description TEXT,
96+ PRIMARY KEY(cve_number)
97+ )
98+ """ ,
99+ "cve_metrics" : """
100+ CREATE TABLE IF NOT EXISTS cve_metrics (
101+ cve_number TEXT,
102+ metric_id INTEGER,
103+ metric_score REAL,
104+ metric_field TEXT,
105+ FOREIGN KEY(cve_number) REFERENCES cve_severity(cve_number),
106+ FOREIGN KEY(metric_id) REFERENCES metrics(metric_id)
107+ )
108+ """ ,
109+ "metrics" : """
110+ CREATE TABLE IF NOT EXISTS metrics (
111+ metrics_id INTEGER,
112+ metrics_name TEXT,
113+ PRIMARY KEY(metrics_id)
114+ )
115+ """ ,
116+ }
117+
118+ EMPTY_SELECT_QUERIES = {
119+ "cve_severity" : "SELECT * FROM cve_severity WHERE 1=0" ,
120+ "cve_range" : "SELECT * FROM cve_range WHERE 1=0" ,
121+ "cve_exploited" : "SELECT * FROM cve_exploited WHERE 1=0" ,
122+ "cve_metrics" : "SELECT * FROM cve_metrics WHERE 1=0" ,
123+ "metrics" : "SELECT * FROM metrics WHERE 1=0" ,
124+ }
125+
63126 INSERT_QUERIES = {
64127 "insert_severity" : """
65128 INSERT or REPLACE INTO cve_severity(
@@ -226,9 +289,15 @@ def get_cvelist_if_stale(self) -> None:
226289 severity_schema ,
227290 range_schema ,
228291 exploit_schema ,
229- cve_metrics_schema ,
230- metrics_schema ,
231- ) = self .table_schemas ()
292+ # cve_metrics_schema,
293+ # metrics_schema,
294+ ) = (
295+ self .TABLE_SCHEMAS ["cve_severity" ],
296+ self .TABLE_SCHEMAS ["cve_range" ],
297+ self .TABLE_SCHEMAS ["cve_exploited" ],
298+ # self.TABLE_SCHEMAS["cve_metrics"],
299+ # self.TABLE_SCHEMAS["metrics"],
300+ )
232301 if (
233302 not self .latest_schema ("cve_severity" , severity_schema )
234303 or not self .latest_schema ("cve_range" , range_schema )
@@ -248,7 +317,7 @@ def latest_schema(
248317 """Check database is using latest schema"""
249318 if table_name == "" :
250319 # If no table specified, check cve_range (the last one changed)
251- _ , range_schema , __ , _ , _ = self .table_schemas ()
320+ range_schema = self .TABLE_SCHEMAS [ "cve_range" ]
252321 return self .latest_schema ("cve_range" , range_schema )
253322
254323 self .LOGGER .debug ("Check database is using latest schema" )
@@ -310,69 +379,6 @@ async def get_data(self):
310379 for r in await asyncio .gather (* tasks ):
311380 self .data .append (r )
312381
313- def table_schemas (self ):
314- """Returns sql commands for creating cve_severity, cve_range and cve_exploited tables."""
315- cve_data_create = """
316- CREATE TABLE IF NOT EXISTS cve_severity (
317- cve_number TEXT,
318- severity TEXT,
319- description TEXT,
320- score INTEGER,
321- cvss_version INTEGER,
322- cvss_vector TEXT,
323- data_source TEXT,
324- last_modified TIMESTAMP,
325- PRIMARY KEY(cve_number, data_source)
326- )
327- """
328- version_range_create = """
329- CREATE TABLE IF NOT EXISTS cve_range (
330- cve_number TEXT,
331- vendor TEXT,
332- product TEXT,
333- version TEXT,
334- versionStartIncluding TEXT,
335- versionStartExcluding TEXT,
336- versionEndIncluding TEXT,
337- versionEndExcluding TEXT,
338- data_source TEXT,
339- FOREIGN KEY(cve_number, data_source) REFERENCES cve_severity(cve_number, data_source)
340- )
341- """
342- exploit_table_create = """
343- CREATE TABLE IF NOT EXISTS cve_exploited (
344- cve_number TEXT,
345- product TEXT,
346- description TEXT,
347- PRIMARY KEY(cve_number)
348- )
349- """
350- cve_metrics_table = """
351- CREATE TABLE IF NOT EXISTS cve_metrics (
352- cve_number TEXT,
353- metric_id INTEGER,
354- metric_score REAL,
355- metric_field TEXT,
356- FOREIGN KEY(cve_number) REFERENCES cve_severity(cve_number),
357- FOREIGN KEY(metric_id) REFERENCES metrics(metric_id)
358- )
359- """
360- metrics_table = """
361- CREATE TABLE IF NOT EXISTS metrics (
362- metrics_id INTEGER,
363- metrics_name TEXT,
364- PRIMARY KEY(metrics_id)
365- )
366- """
367-
368- return (
369- cve_data_create ,
370- version_range_create ,
371- exploit_table_create ,
372- cve_metrics_table ,
373- metrics_table ,
374- )
375-
376382 def init_database (self ) -> None :
377383 """Initialize db tables used for storing cve/version data."""
378384
@@ -383,7 +389,13 @@ def init_database(self) -> None:
383389 exploit_table_create ,
384390 cve_metrics_table_create ,
385391 metrics_table_create ,
386- ) = self .table_schemas ()
392+ ) = (
393+ self .TABLE_SCHEMAS ["cve_severity" ],
394+ self .TABLE_SCHEMAS ["cve_range" ],
395+ self .TABLE_SCHEMAS ["cve_exploited" ],
396+ self .TABLE_SCHEMAS ["cve_metrics" ],
397+ self .TABLE_SCHEMAS ["metrics" ],
398+ )
387399 index_range = "CREATE INDEX IF NOT EXISTS product_index ON cve_range (cve_number, vendor, product)"
388400 cursor .execute (cve_data_create )
389401 cursor .execute (version_range_create )
@@ -398,7 +410,13 @@ def init_database(self) -> None:
398410 exploit_schema ,
399411 cve_metrics_schema ,
400412 metrics_schema ,
401- ) = self .table_schemas ()
413+ ) = (
414+ self .TABLE_SCHEMAS ["cve_severity" ],
415+ self .TABLE_SCHEMAS ["cve_range" ],
416+ self .TABLE_SCHEMAS ["cve_exploited" ],
417+ self .TABLE_SCHEMAS ["cve_metrics" ],
418+ self .TABLE_SCHEMAS ["metrics" ],
419+ )
402420 # Check schema on cve_severity
403421 if not self .latest_schema ("cve_severity" , severity_schema , cursor ):
404422 # Recreate table using latest schema
@@ -831,7 +849,7 @@ def get_exploits_count(self) -> int:
831849 def create_exploit_db (self ):
832850 """Create table of exploits in database if it does not already exist."""
833851 cursor = self .db_open_and_get_cursor ()
834- ( _ , _ , create_exploit_table , _ , _ ) = self .table_schemas ()
852+ create_exploit_table = self .TABLE_SCHEMAS [ "cve_exploited" ]
835853 cursor = self .db_open_and_get_cursor ()
836854 cursor .execute (create_exploit_table )
837855 self .connection .commit ()
@@ -1063,7 +1081,13 @@ def json_to_db_wrapper(self, path, pubkey, ignore_signature, log_signature_error
10631081 exploit_table_create ,
10641082 cve_metrics_create ,
10651083 metrics_create ,
1066- ) = self .table_schemas ()
1084+ ) = (
1085+ self .TABLE_SCHEMAS ["cve_severity" ],
1086+ self .TABLE_SCHEMAS ["cve_range" ],
1087+ self .TABLE_SCHEMAS ["cve_exploited" ],
1088+ self .TABLE_SCHEMAS ["cve_metrics" ],
1089+ self .TABLE_SCHEMAS ["metrics" ],
1090+ )
10671091 index_range = "CREATE INDEX IF NOT EXISTS product_index ON cve_range (cve_number, vendor, product)"
10681092 cursor .execute (cve_data_create )
10691093 cursor .execute (version_range_create )
0 commit comments