|
| 1 | +import logging |
| 2 | +from abc import ABC |
| 3 | +from typing import List, Optional |
| 4 | + |
| 5 | +from PySquashfsImage import SquashFsImage |
| 6 | + |
| 7 | +from credsweeper.credentials.candidate import Candidate |
| 8 | +from credsweeper.deep_scanner.abstract_scanner import AbstractScanner |
| 9 | +from credsweeper.file_handler.data_content_provider import DataContentProvider |
| 10 | +from credsweeper.file_handler.file_path_extractor import FilePathExtractor |
| 11 | +from credsweeper.utils.util import Util |
| 12 | + |
| 13 | +logger = logging.getLogger(__name__) |
| 14 | + |
| 15 | + |
class SquashfsScanner(AbstractScanner, ABC):
    """Implements squash file system scanning"""

    def data_scan(
            self,  #
            data_provider: DataContentProvider,  #
            depth: int,  #
            recursive_limit_size: int) -> Optional[List[Candidate]]:
        """Extracts regular files one by one from a SquashFS image and launches recursive_scan on each

        Args:
            data_provider: provider whose ``data`` holds the raw bytes of the SquashFS image
            depth: value passed through unchanged to ``recursive_scan`` for every extracted file
            recursive_limit_size: size budget in bytes; an entry whose declared size exceeds it is
                skipped, otherwise the budget minus the extracted size is passed to ``recursive_scan``

        Return:
            list of candidates collected from all scanned entries, or None when any exception is
            raised while reading the image or scanning its entries (partial results are discarded)

        """
        try:
            candidates = []
            with SquashFsImage.from_bytes(data_provider.data) as image:
                for entry in image:
                    # only regular files are scanned: directories, symlinks and other entries are skipped
                    if not entry.is_file or entry.is_symlink:
                        continue
                    # per-entry trace is diagnostic only, so keep it out of WARNING level
                    logger.debug("%s", entry.path)
                    if FilePathExtractor.check_exclude_file(self.config, entry.path):
                        continue
                    if entry.size > recursive_limit_size:
                        logger.error(f"{entry.name}: size {entry.size}"
                                     f" is over limit {recursive_limit_size} depth:{depth}")
                        continue
                    logger.debug("%s %s", entry.path, entry.name)
                    hsqs_content_provider = DataContentProvider(data=image.read_file(entry.inode),
                                                                file_path=entry.path,
                                                                file_type=Util.get_extension(entry.path),
                                                                info=f"{data_provider.info}|HSQS:{entry.path}")
                    # Nevertheless, use extracted data size
                    extracted_size = len(hsqs_content_provider.data)
                    new_limit = recursive_limit_size - extracted_size
                    logger.info(f"{entry.name}: size {extracted_size}")
                    hsqs_candidates = self.recursive_scan(hsqs_content_provider, depth, new_limit)
                    candidates.extend(hsqs_candidates)
            return candidates
        except Exception as hsqs_exc:
            # best effort: a broken or unsupported image must not abort the whole scan
            logger.error(f"{data_provider.file_path}:{hsqs_exc}")
        return None
0 commit comments