diff --git a/.github/actions/spelling/allow.txt b/.github/actions/spelling/allow.txt index 4161a88b03..29ab9134ca 100644 --- a/.github/actions/spelling/allow.txt +++ b/.github/actions/spelling/allow.txt @@ -310,6 +310,7 @@ https hunspell hur hwloc +hyperscan i icecast icu @@ -618,6 +619,7 @@ pybabel pycon pycqa pypa +pyperscan pypi pytest pythex @@ -796,6 +798,7 @@ uuid uwsgi v varnish +vectorscan venv VEXs vextype diff --git a/README.md b/README.md index 91bd126e04..cf55857191 100644 --- a/README.md +++ b/README.md @@ -543,6 +543,7 @@ Checkers: -s SKIPS, --skips SKIPS comma-separated list of checkers to disable -r RUNS, --runs RUNS comma-separated list of checkers to enable + --pyperscan use pyperscan for binary checkers (unsupported on Windows) Database Management: --import-json IMPORT_JSON diff --git a/cve_bin_tool/cli.py b/cve_bin_tool/cli.py index c08d8c3dea..13cff128d4 100644 --- a/cve_bin_tool/cli.py +++ b/cve_bin_tool/cli.py @@ -489,6 +489,12 @@ def main(argv=None): help="comma-separated list of checkers to enable", default="", ) + checker_group.add_argument( + "--pyperscan", + action="store_true", + help="use pyperscan for binary checkers (unsupported on Windows)", + default=False, + ) database_group = parser.add_argument_group("Database Management") database_group.add_argument( @@ -1129,10 +1135,12 @@ def main(argv=None): version_scanner = VersionScanner( should_extract=args["extract"], exclude_folders=args["exclude"], + logger=LOGGER, error_mode=error_mode, validate=not args["disable_validation_check"], sources=enabled_sources, no_scan=args["no_scan"], + pyperscan=args["pyperscan"], ) version_scanner.remove_skiplist(skips) LOGGER.info(f"Number of checkers: {version_scanner.number_of_checkers()}") diff --git a/cve_bin_tool/version_scanner.py b/cve_bin_tool/version_scanner.py index 7ee0686876..23c5cc57a7 100644 --- a/cve_bin_tool/version_scanner.py +++ b/cve_bin_tool/version_scanner.py @@ -8,7 +8,7 @@ from pathlib import Path from typing import Iterator 
-from cve_bin_tool.checkers import BUILTIN_CHECKERS, Checker +from cve_bin_tool.checkers import BUILTIN_CHECKERS, Checker, VersionMatchInfo from cve_bin_tool.cvedb import CVEDB from cve_bin_tool.egg_updater import IS_DEVELOP, update_egg from cve_bin_tool.error_handler import ErrorMode @@ -19,6 +19,9 @@ from cve_bin_tool.strings import parse_strings from cve_bin_tool.util import DirWalk, ProductInfo, ScanInfo, inpath +if sys.platform != "win32": + from pyperscan import Pattern, Scan, StreamDatabase + if sys.version_info >= (3, 10): from importlib import metadata as importlib_metadata else: @@ -45,6 +48,7 @@ def __init__( validate: bool = True, sources=None, no_scan=False, + pyperscan=False, ): self.no_scan = no_scan self.logger = logger or LOGGER.getChild(self.__class__.__name__) @@ -74,6 +78,17 @@ def __init__( ) self.language_checkers = valid_files self.language_checkers_names = self.available_language_checkers() + if pyperscan: + self.logger.warning("pyperscan requested") + if sys.platform == "win32": + self.logger.error("pyperscan unsupported on Windows") + self.pyperscan = False + else: + self.logger.warning("pyperscan enabled") + self.pyperscan = True + else: + self.pyperscan = False + self.pyperscan_db = None if self.no_scan: self.cve_db = None @@ -281,36 +296,85 @@ def scan_file(self, filename: str) -> Iterator[ScanInfo]: yield from self.run_checkers(filename, lines) + def build_pyperscan_database(self, checkers: dict) -> None: + """Build the pyperscan pattern database once, to improve performance.""" + if self.pyperscan_db is None: + patterns = [] + for dummy_checker_name, checker in checkers.items(): + checker = checker() + checker.dummy_checker_name = dummy_checker_name + for pattern in checker.VERSION_PATTERNS + checker.CONTAINS_PATTERNS: + patterns.append(Pattern(pattern.pattern.encode(), tag=checker)) + if patterns: + self.pyperscan_db = StreamDatabase(*patterns) + + @staticmethod + def pyperscan_match( + pyperscan_matches: dict, checker: Checker, offset: int, end: 
int + ) -> Scan: + """Scanner callback: remember the latest match of each checker.""" + pyperscan_matches[checker.dummy_checker_name] = checker, offset, end + return Scan.Continue + def run_checkers(self, filename: str, lines: str) -> Iterator[ScanInfo]: """process a Set of checker objects, run them on file lines, and yield information about detected products and versions. It uses logging to provide debug and error information along the way.""" LOGGER.info(f"filename = {filename}") - # tko - for dummy_checker_name, checker in self.checkers.items(): - checker = checker() - version_results = checker.get_versions(lines, filename) - - if version_results.matched_filename or version_results.matched_contains: - for version in version_results.versions: - if version == "UNKNOWN": - file_path = "".join(self.file_stack) - self.logger.debug( - f"{dummy_checker_name} was detected with version UNKNOWN in file {file_path}" - ) - else: - file_path = "".join(self.file_stack) - self.logger.debug( - f"{file_path} matched {dummy_checker_name} {version} ({version_results.matched_filename=}, {version_results.matched_contains=})" - ) - for vendor, product in checker.VENDOR_PRODUCT: - yield ScanInfo( - ProductInfo(vendor, product, version), - file_path, - ) + if self.pyperscan: + self.build_pyperscan_database(self.checkers) + + pyperscan_matches = dict() + # No database exists when no checker supplied a pattern + if self.pyperscan_db is not None: + scanner = self.pyperscan_db.build( + pyperscan_matches, self.pyperscan_match + ) + scanner.scan(lines.encode()) + + for dummy_checker_name, (checker, offset, end) in pyperscan_matches.items(): + # Confirm pyperscan match with get_versions as pyperscan doesn't support + # group capture. 
SOM_LEFTMOST is not enabled (offset is always 0) + version_results = checker.get_versions(lines[offset:end], filename) + yield from self.parse_version_match( + dummy_checker_name, checker, version_results + ) + else: + # tko + for dummy_checker_name, checker in self.checkers.items(): + checker = checker() + version_results = checker.get_versions(lines, filename) + yield from self.parse_version_match( + dummy_checker_name, checker, version_results + ) self.logger.debug(f"Done scanning file: {filename}") + def parse_version_match( + self, + dummy_checker_name: str, + checker: Checker, + version_results: VersionMatchInfo, + ) -> Iterator[ScanInfo]: + """Log each detected version; yield ScanInfo for versions that are known.""" + if version_results.matched_filename or version_results.matched_contains: + for version in version_results.versions: + if version == "UNKNOWN": + file_path = "".join(self.file_stack) + self.logger.debug( + f"{dummy_checker_name} was detected with version UNKNOWN in file {file_path}" + ) + else: + file_path = "".join(self.file_stack) + self.logger.debug( + f"{file_path} matched {dummy_checker_name} {version} ({version_results.matched_filename=}, {version_results.matched_contains=})" + ) + for vendor, product in checker.VENDOR_PRODUCT: + yield ScanInfo( + ProductInfo(vendor, product, version), + file_path, + ) + @staticmethod def clean_file_path(filepath: str) -> str: """Returns a cleaner filepath by removing temp path from filepath""" diff --git a/doc/MANUAL.md b/doc/MANUAL.md index cd5c8c6010..49f5779fb2 100644 --- a/doc/MANUAL.md +++ b/doc/MANUAL.md @@ -37,6 +37,7 @@ - [Checkers Arguments](#checkers-arguments) - [-s SKIPS, --skips SKIPS](#-s-skips---skips-skips) - [-r CHECKERS, --runs CHECKERS](#-r-checkers---runs-checkers) + - [--pyperscan](#--pyperscan) - [Input Arguments](#input-arguments) - [directory (positional argument)](#directory-positional-argument) - [-i INPUT\_FILE, --input-file INPUT\_FILE](#-i-input_file---input-file-input_file) @@ -214,6 +215,7 @@ which is useful if you're trying the latest code from -s SKIPS, --skips SKIPS 
comma-separated list of checkers to disable -r RUNS, --runs RUNS comma-separated list of checkers to enable + --pyperscan use pyperscan for binary checkers (unsupported on Windows) Database Management: --import-json IMPORT_JSON @@ -889,6 +891,19 @@ This option allows one to skip (disable) a comma-separated list of checkers and This option allows one to enable a comma-separated list of checkers. +### --pyperscan + +The pyperscan flag enables pyperscan support in the CVE Binary Tool. [pyperscan](https://github.com/vlaci/pyperscan) is an opinionated Python binding for [Hyperscan](https://www.hyperscan.io) focusing on ease of use and safety. + +When the pyperscan flag is enabled, the tool leverages the Hyperscan high-performance regular expression matching library to run all binary version checkers on a file simultaneously, which significantly reduces processing time. + +The pyperscan package is used instead of the better-known hyperscan package because pyperscan allows a tag to be attached to each pattern. This feature makes it easy to retrieve the binary checker associated with the matched pattern. + +> **Note**: pyperscan is unsupported on Windows. + +> **Note**: The [default](https://github.com/vlaci/pyperscan/issues/35) configuration of pyperscan uses [vectorscan](https://github.com/VectorCamp/vectorscan), a fork of Intel's Hyperscan, modified to run on more platforms. 
+ + ## Input Arguments ### directory (positional argument) diff --git a/requirements.csv b/requirements.csv index 3ad34e857e..9d2aa64713 100644 --- a/requirements.csv +++ b/requirements.csv @@ -26,3 +26,4 @@ the_purl_authors_not_in_db,packageurl-python h2non,filetype python,setuptools jaraco,zipp +vlaci_not_in_db,pyperscan diff --git a/requirements.txt b/requirements.txt index e411ca20d2..2ec2e4e55a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,6 +14,7 @@ lib4vex>=0.2.0 packageurl-python packaging>=22.0 plotly +pyperscan; sys_platform != 'win32' # pyperscan unsupported on Windows python-gnupg pyyaml>=5.4 requests>=2.32.2 diff --git a/test/test_cli.py b/test/test_cli.py index 3c65e44611..5e4abc518c 100644 --- a/test/test_cli.py +++ b/test/test_cli.py @@ -236,6 +236,18 @@ def test_runs(self, caplog): main(["cve-bin-tool", test_path, "-r", ",".join(runs)]) self.check_checkers_log(caplog, skip_checkers, runs) + def test_pyperscan(self, caplog): + test_path = str(Path(__file__).parent.resolve() / "csv") + + with caplog.at_level(logging.INFO): + main(["cve-bin-tool", "--pyperscan", test_path]) + assert ( + "cve_bin_tool", + logging.WARNING, + "pyperscan requested", + ) in caplog.record_tuples + caplog.clear() + @pytest.mark.skipif(not LONG_TESTS(), reason="Update flag tests are long tests") def test_update(self, caplog): test_path = str(Path(__file__).parent.resolve() / "csv")