|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | | -import sys |
4 | 3 | from pathlib import Path |
5 | | -from typing import TYPE_CHECKING |
| 4 | +from typing import TYPE_CHECKING, List |
| 5 | + |
| 6 | +from pydantic import BaseModel |
| 7 | +from semver import Version |
6 | 8 |
|
7 | 9 | import config |
8 | | -from analysis.PluginBase import AnalysisBasePlugin |
| 10 | +from analysis.plugin import AnalysisPluginV0, Tag |
9 | 11 | from helperFunctions.tag import TagColor |
| 12 | +from plugins.analysis.cve_lookup.internal.database.db_connection import DbConnection |
| 13 | +from plugins.analysis.cve_lookup.internal.lookup import CveMatch, CvssScore, Lookup |
10 | 14 | from plugins.mime_blacklists import MIME_BLACKLIST_NON_EXECUTABLE |
11 | 15 |
|
12 | 16 | if TYPE_CHECKING: |
13 | | - from objects.file import FileObject |
| 17 | + from io import FileIO |
14 | 18 |
|
15 | | -try: |
16 | | - from ..internal.database.db_connection import DbConnection |
17 | | - from ..internal.lookup import Lookup |
18 | | -except ImportError: |
19 | | - sys.path.append(str(Path(__file__).parent.parent / 'internal')) |
20 | | - from database.db_connection import DbConnection |
21 | | - from lookup import Lookup |
| 19 | + from plugins.analysis.software_components.code.software_components import AnalysisPlugin as SoftwarePlugin |
22 | 20 |
|
23 | 21 | DB_PATH = str(Path(__file__).parent / '../internal/database/cve_cpe.db') |
24 | 22 |
|
25 | 23 |
|
26 | | -class AnalysisPlugin(AnalysisBasePlugin): |
| 24 | +class CveResult(BaseModel): |
| 25 | + software_name: str |
| 26 | + cve_list: List[CveMatch] |
| 27 | + |
| 28 | + def __lt__(self, other): |
| 29 | + if not isinstance(other, self.__class__): |
| 30 | + raise TypeError(f'Wrong type: {type(other)}') |
| 31 | + return self.software_name < other.software_name # to enable sorting |
| 32 | + |
| 33 | + |
| 34 | +class AnalysisPlugin(AnalysisPluginV0): |
27 | 35 | """ |
28 | 36 | lookup vulnerabilities from CVE feeds using ID from CPE dictionary |
29 | 37 | """ |
30 | 38 |
|
31 | | - NAME = 'cve_lookup' |
32 | | - DESCRIPTION = 'lookup CVE vulnerabilities' |
33 | | - MIME_BLACKLIST = MIME_BLACKLIST_NON_EXECUTABLE |
34 | | - DEPENDENCIES = ['software_components'] # noqa: RUF012 |
35 | | - VERSION = '0.2.0' |
36 | | - FILE = __file__ |
37 | | - |
38 | | - def additional_setup(self): |
| 39 | + class Schema(BaseModel): |
| 40 | + cve_results: List[CveResult] |
| 41 | + |
| 42 | + def __init__(self): |
| 43 | + super().__init__( |
| 44 | + metadata=( |
| 45 | + self.MetaData( |
| 46 | + name='cve_lookup', |
| 47 | + description='lookup CVE vulnerabilities', |
| 48 | + mime_blacklist=MIME_BLACKLIST_NON_EXECUTABLE, |
| 49 | + version=Version(1, 0, 0), |
| 50 | + dependencies=['software_components'], |
| 51 | + Schema=self.Schema, |
| 52 | + ) |
| 53 | + ) |
| 54 | + ) |
39 | 55 | self.min_crit_score = getattr(config.backend.plugin.get(self.NAME, {}), 'min-critical-score', 9.0) |
40 | 56 | self.match_any = getattr(config.backend.plugin.get(self.NAME, {}), 'match-any', False) |
41 | 57 |
|
42 | | - def process_object(self, file_object: FileObject) -> FileObject: |
| 58 | + def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel]) -> Schema: |
43 | 59 | """ |
44 | 60 | Process the given file object and look up vulnerabilities for each software component. |
45 | 61 | """ |
46 | | - cves = {'cve_results': {}} |
| 62 | + del virtual_file_path |
47 | 63 | connection = DbConnection(f'sqlite:///{DB_PATH}') |
48 | | - lookup = Lookup(file_object, connection, match_any=self.match_any) |
49 | | - for sw_dict in file_object.processed_analysis['software_components']['result'].get('software_components', []): |
50 | | - product = sw_dict['name'] |
51 | | - version = sw_dict['versions'][0] if sw_dict['versions'] else None |
| 64 | + |
| 65 | + cve_results = [] |
| 66 | + lookup = Lookup(file_handle.name, connection, match_any=self.match_any) |
| 67 | + sw_analysis: SoftwarePlugin.Schema = analyses['software_components'] |
| 68 | + for sw_dict in sw_analysis.software_components: |
| 69 | + product = sw_dict.name |
| 70 | + version = sw_dict.versions[0] if sw_dict.versions else None |
52 | 71 | if product and version: |
53 | 72 | vulnerabilities = lookup.lookup_vulnerabilities(product, version) |
54 | 73 | if vulnerabilities: |
55 | 74 | component = f'{product} {version}' |
56 | | - cves['cve_results'][component] = vulnerabilities |
57 | | - |
58 | | - cves['summary'] = self._create_summary(cves['cve_results']) |
59 | | - file_object.processed_analysis[self.NAME] = cves |
60 | | - self.add_tags(cves['cve_results'], file_object) |
61 | | - return file_object |
62 | | - |
63 | | - def _create_summary(self, cve_results: dict[str, dict[str, dict[str, str]]]) -> list[str]: |
64 | | - """ |
65 | | - Creates a summary of the CVE results. |
66 | | - """ |
67 | | - return list( |
68 | | - { |
69 | | - software if not self._software_has_critical_cve(entry) else f'{software} (CRITICAL)' |
70 | | - for software, entry in cve_results.items() |
71 | | - } |
72 | | - ) |
73 | | - |
74 | | - def _software_has_critical_cve(self, cve_dict: dict[str, dict[str, str]]) -> bool: |
| 75 | + cve_results.append( |
| 76 | + CveResult( |
| 77 | + software_name=component, |
| 78 | + cve_list=vulnerabilities, |
| 79 | + ) |
| 80 | + ) |
| 81 | + |
| 82 | + return self.Schema(cve_results=cve_results) |
| 83 | + |
| 84 | + def summarize(self, result: Schema) -> list[str]: |
| 85 | + summary = { |
| 86 | + entry.software_name |
| 87 | + if not self._software_has_critical_cve(entry.cve_list) |
| 88 | + else f'{entry.software_name} (CRITICAL)' |
| 89 | + for entry in result.cve_results |
| 90 | + } |
| 91 | + return sorted(summary) |
| 92 | + |
| 93 | + def get_tags(self, result: Schema, summary: list[str]) -> list[Tag]: |
| 94 | + del summary |
| 95 | + return [ |
| 96 | + Tag(name='CVE', value='critical CVE', color=TagColor.RED, propagate=True) |
| 97 | + for component in result.cve_results |
| 98 | + for cve in component.cve_list |
| 99 | + if self._entry_has_critical_rating(cve.scores) |
| 100 | + ] |
| 101 | + |
| 102 | + def _software_has_critical_cve(self, cve_list: List[CveMatch]) -> bool: |
75 | 103 | """ |
76 | 104 | Check if any entry in the given dictionary of CVEs has a critical rating. |
77 | 105 | """ |
78 | | - return any(self._entry_has_critical_rating(entry) for entry in cve_dict.values()) |
79 | | - |
80 | | - def add_tags(self, cve_results: dict[str, dict[str, dict[str, str]]], file_object: FileObject): |
81 | | - """ |
82 | | - Adds analysis tags to a file object based on the critical CVE results. |
| 106 | + return any(self._entry_has_critical_rating(entry.scores) for entry in cve_list) |
83 | 107 |
|
84 | | - Results structure: {'component': {'cve_id': {'score2': '6.4', 'score3': 'N/A'}}} |
85 | | - """ |
86 | | - for component in cve_results: |
87 | | - for cve_id in cve_results[component]: |
88 | | - entry = cve_results[component][cve_id] |
89 | | - if self._entry_has_critical_rating(entry): |
90 | | - self.add_analysis_tag(file_object, 'CVE', 'critical CVE', TagColor.RED, True) |
91 | | - return |
92 | | - |
93 | | - def _entry_has_critical_rating(self, entry: dict[str, dict[str, str]]) -> bool: |
| 108 | + def _entry_has_critical_rating(self, scores: list[CvssScore]) -> bool: |
94 | 109 | """ |
95 | 110 | Check if the given entry has a critical rating. |
96 | 111 | """ |
97 | | - return any(value != 'N/A' and float(value) >= self.min_crit_score for value in entry['scores'].values()) |
| 112 | + return any(entry.score != 'N/A' and float(entry.score) >= self.min_crit_score for entry in scores) |
0 commit comments