Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion fact_extractor/helperFunctions/file_system.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import logging
import lzma
from contextlib import contextmanager
Expand Down Expand Up @@ -31,7 +33,7 @@ def get_fact_bin_dir() -> str:
return str(SRC_DIR_PATH / 'bin')


def file_is_empty(file_path) -> bool:
def file_is_empty(file_path: str | Path) -> bool:
"""
Returns True if file in file_path has 0 Bytes
Returns False otherwise
Expand Down
42 changes: 28 additions & 14 deletions fact_extractor/helperFunctions/statistics.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from __future__ import annotations

import logging
from contextlib import suppress
from typing import TYPE_CHECKING

from common_helper_files import safe_rglob
from common_helper_unpacking_classifier import avg_entropy, get_binary_size_without_padding, is_compressed
from common_helper_unpacking_classifier import (
avg_entropy,
get_file_size_without_padding,
)

from helperFunctions import magic
from helperFunctions.config import read_list_from_config
Expand All @@ -13,6 +17,10 @@
from configparser import ConfigParser
from pathlib import Path

SMALL_SIZE_THRESHOLD = 255
VERY_SMALL_SIZE_THRESHOLD = 50
COMPRESS_ENTROPY_THRESHOLD_SMALL_FILE = 0.65


def add_unpack_statistics(extraction_dir: Path, meta_data: dict):
unpacked_files, unpacked_directories = 0, 0
Expand All @@ -26,30 +34,26 @@ def add_unpack_statistics(extraction_dir: Path, meta_data: dict):
meta_data['number_of_unpacked_directories'] = unpacked_directories


def get_unpack_status(
file_path: str, binary: bytes, extracted_files: list[Path], meta_data: dict, config: ConfigParser
):
def get_unpack_status(file_path: Path, extracted_files: list[Path], meta_data: dict, config: ConfigParser):
meta_data['summary'] = []
meta_data['entropy'] = avg_entropy(binary)
meta_data['entropy'] = avg_entropy(file_path)

if not extracted_files and meta_data.get('number_of_excluded_files', 0) == 0:
if magic.from_file(file_path, mime=True) in read_list_from_config(
config, 'ExpertSettings', 'compressed_file_types'
) or not is_compressed(
binary,
compress_entropy_threshold=config.getfloat('ExpertSettings', 'unpack_threshold'),
classifier=avg_entropy,
compressed_types = read_list_from_config(config, 'ExpertSettings', 'compressed_file_types')
mime = magic.from_file(file_path, mime=True)
if mime in compressed_types or not _is_probably_compressed(
file_path.stat().st_size, meta_data['entropy'], config
):
meta_data['summary'] = ['unpacked']
else:
meta_data['summary'] = ['packed']
else:
_detect_unpack_loss(binary, extracted_files, meta_data, config.getint('ExpertSettings', 'header_overhead'))
_detect_unpack_loss(file_path, extracted_files, meta_data, config.getint('ExpertSettings', 'header_overhead'))


def _detect_unpack_loss(binary: bytes, extracted_files: list[Path], meta_data: dict, header_overhead: int):
def _detect_unpack_loss(file_path: Path, extracted_files: list[Path], meta_data: dict, header_overhead: int):
decoding_overhead = 1 - meta_data.get('encoding_overhead', 0)
cleaned_size = get_binary_size_without_padding(binary) * decoding_overhead - header_overhead
cleaned_size = get_file_size_without_padding(file_path, blocksize=1024) * decoding_overhead - header_overhead
size_of_extracted_files = _total_size_of_extracted_files(extracted_files)
meta_data['size_packed'] = cleaned_size
meta_data['size_unpacked'] = size_of_extracted_files
Expand All @@ -62,3 +66,13 @@ def _total_size_of_extracted_files(extracted_files: list[Path]) -> int:
with suppress(OSError):
total_size += item.stat().st_size
return total_size


def _is_probably_compressed(file_size: int, entropy: float, config: ConfigParser) -> bool:
    """
    Classify a file as compressed or not, based on its size and entropy.

    Files of at most VERY_SMALL_SIZE_THRESHOLD bytes are never reported as compressed.
    Files of at most SMALL_SIZE_THRESHOLD bytes are compared against
    COMPRESS_ENTROPY_THRESHOLD_SMALL_FILE; all larger files are compared against the
    ``unpack_threshold`` option of the ``ExpertSettings`` config section.

    :param file_size: size of the file in bytes
    :param entropy: entropy value of the file contents
    :param config: configuration providing ``ExpertSettings.unpack_threshold``
    :return: True if the entropy exceeds the threshold selected for this file size
    """
    if file_size <= VERY_SMALL_SIZE_THRESHOLD:
        logging.debug('could not determine compression: file too small')
        return False

    is_small_file = file_size <= SMALL_SIZE_THRESHOLD
    if is_small_file:
        logging.debug('compression classification might be wrong: file is small')
        entropy_limit = COMPRESS_ENTROPY_THRESHOLD_SMALL_FILE
    else:
        # the config is only consulted for files above the small-file limit
        entropy_limit = config.getfloat('ExpertSettings', 'unpack_threshold')
    return entropy > entropy_limit
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def test_unpacker_selection_generic(self):
self.check_unpacker_selection('generic/carver', 'generic_carver')

def test_extraction(self):
in_file = f'{get_test_data_dir()}/generic_carver_test'
in_file = Path(get_test_data_dir()) / 'generic_carver_test'
files, meta_data = self.unpacker.base._extract_files_from_file_using_specific_unpacker(
in_file, self.tmp_dir.name, self.unpacker.base.unpacker_plugins['generic/carver']
)
Expand All @@ -40,7 +40,7 @@ def test_filter(self):
in_file = TEST_DATA_DIR / 'carving_test_file'
assert Path(in_file).is_file()
files, meta_data = self.unpacker.base._extract_files_from_file_using_specific_unpacker(
str(in_file), self.tmp_dir.name, self.unpacker.base.unpacker_plugins['generic/carver']
in_file, self.tmp_dir.name, self.unpacker.base.unpacker_plugins['generic/carver']
)
files = set(files)
assert len(files) == 4, 'file number incorrect'
Expand Down
6 changes: 3 additions & 3 deletions fact_extractor/plugins/unpacking/raw/test/test_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def test_unpacker_selection(self):

def test_extraction(self):
input_file = Path(TEST_DATA_DIR, 'raw.bin')
unpacked_files, meta_data = self.unpacker.extract_files_from_file(str(input_file), self.tmp_dir.name)
unpacked_files, meta_data = self.unpacker.extract_files_from_file(input_file, self.tmp_dir.name)
assert meta_data['padding seperated sections'] == 3
assert meta_data['LZMA'] == 1
assert len(unpacked_files) == 4
Expand All @@ -22,7 +22,7 @@ def test_extraction(self):
def test_extraction_encoded(self):
input_file = Path(TEST_DATA_DIR, 'encoded.bin')
unpacked_files, meta_data = self.unpacker.base._extract_files_from_file_using_specific_unpacker(
str(input_file), self.tmp_dir.name, self.unpacker.base.unpacker_plugins['data/raw']
input_file, self.tmp_dir.name, self.unpacker.base.unpacker_plugins['data/raw']
)
assert meta_data['Intel Hex'] == 1
assert meta_data['Motorola S-Record'] == 1
Expand All @@ -32,5 +32,5 @@ def test_extraction_encoded(self):

def test_extraction_nothing_included(self):
input_file = Path(TEST_DATA_DIR, 'nothing.bin')
unpacked_files, _ = self.unpacker.extract_files_from_file(str(input_file), self.tmp_dir.name)
unpacked_files, _ = self.unpacker.extract_files_from_file(input_file, self.tmp_dir.name)
assert len(unpacked_files) == 0
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def test_extraction(self):
in_file = TEST_DATA_DIR / 'testfw_1.enc'
assert in_file.is_file(), 'test file is missing'
files, meta_data = self.unpacker.base._extract_files_from_file_using_specific_unpacker(
str(in_file),
in_file,
self.tmp_dir.name,
self.unpacker.base.unpacker_plugins['firmware/senao-v2b'],
)
Expand Down
2 changes: 1 addition & 1 deletion fact_extractor/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def get(self, folder):
try:
input_file = list(input_dir.iterdir())[0]
unpacker = Unpacker(config, folder=folder, base=self.unpacking_base)
unpacker.unpack(str(input_file))
unpacker.unpack(input_file)
if self.owner:
change_owner_of_output_files(input_dir.parent, self.owner)
except Exception: # pylint: disable=broad-except
Expand Down
32 changes: 12 additions & 20 deletions fact_extractor/test/unit/helperFunctions/test_statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,22 @@ def test_unpack_status_packed_file(config_fixture):
test_packed_file_path = Path(get_test_data_dir(), 'container/test.7z')

result = {}
get_unpack_status(test_packed_file_path, test_packed_file_path.read_bytes(), [], result, config_fixture)
get_unpack_status(test_packed_file_path, [], result, config_fixture)

assert result['entropy'] > 0.7, 'entropy not valid'
assert result['summary'] == ['packed'], '7z file should be packed'

result = {}
config_fixture.set('ExpertSettings', 'compressed_file_types', 'application/x-7z-compressed, ')
get_unpack_status(test_packed_file_path, test_packed_file_path.read_bytes(), [], result, config_fixture)
get_unpack_status(test_packed_file_path, [], result, config_fixture)
assert result['summary'] == ['unpacked'], 'Unpacking Whitelist does not work'


def test_unpack_status_unpacked_file(config_fixture):
def test_unpack_status_unpacked_file(config_fixture, common_tmpdir):
result = {}
get_unpack_status(Path('/dev/null'), b'aaaaa', [], result, config_fixture)
test_file = Path(common_tmpdir) / 'testfile'
test_file.write_bytes(b'aaaaa')
get_unpack_status(test_file, [], result, config_fixture)

assert result['entropy'] < 0.7, 'entropy not valid'
assert result['summary'] == ['unpacked']
Expand All @@ -49,15 +51,10 @@ def test_detect_unpack_loss_data_lost(config_fixture, common_tmpdir):
included_file = Path(str(common_tmpdir), 'inner')
included_file.write_bytes(256 * b'ABCDEFGH')
result = {'summary': []}
test_file = Path(common_tmpdir) / 'testfile'
test_file.write_bytes(512 * b'ABCDEFGH')

_detect_unpack_loss(
512 * b'ABCDEFGH',
[
included_file,
],
result,
256,
)
_detect_unpack_loss(test_file, [included_file], result, 256)
assert 'data lost' in result['summary']
assert result['size_packed'] == 512 * len(b'ABCDEFGH') - 256
assert result['size_unpacked'] == 256 * len(b'ABCDEFGH')
Expand All @@ -67,13 +64,8 @@ def test_detect_unpack_loss_no_data_lost(config_fixture, common_tmpdir):
included_file = Path(str(common_tmpdir), 'inner')
included_file.write_bytes(512 * b'ABCDEFGH')
result = {'summary': []}
test_file = Path(common_tmpdir) / 'testfile'
test_file.write_bytes(512 * b'ABCDEFGH')

_detect_unpack_loss(
512 * b'ABCDEFGH',
[
included_file,
],
result,
256,
)
_detect_unpack_loss(test_file, [included_file], result, 256)
assert 'no data lost' in result['summary']
2 changes: 1 addition & 1 deletion fact_extractor/test/unit/unpacker/test_unpacker.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def check_unpacking_of_standard_unpack_set(
output: bool = True,
ignore: set[str] | None = None,
):
files, meta_data = self.unpacker.base.extract_files_from_file(str(in_file), self.tmp_dir.name)
files, meta_data = self.unpacker.extract_files_from_file(in_file, self.tmp_dir.name)
files = {f for f in files if not any(rule in f for rule in ignore or set())}
assert len(files) == 3, f'file number incorrect: {meta_data}'
assert files == {
Expand Down
16 changes: 9 additions & 7 deletions fact_extractor/unpacker/unpack.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ def __init__(
self._file_folder = data_folder / 'files'
self._report_folder = data_folder / 'reports'

def unpack(self, file_path):
def unpack(self, file_path: str | Path):
file_path = Path(file_path)
if self.base.should_ignore(file_path):
meta_data = {
'plugin_used': None,
Expand All @@ -42,7 +43,7 @@ def unpack(self, file_path):
}
extracted_files = []
else:
logging.debug(f'Extracting {Path(file_path).name}')
logging.debug(f'Extracting {file_path.name}')

tmp_dir = TemporaryDirectory(prefix='fact_unpack_')

Expand All @@ -55,9 +56,8 @@ def unpack(self, file_path):

compute_stats = self.config.getboolean('ExpertSettings', 'statistics', fallback=True)
if compute_stats:
binary = Path(file_path).read_bytes()
add_unpack_statistics(self._file_folder, meta_data)
get_unpack_status(file_path, binary, extracted_files, meta_data, self.config)
get_unpack_status(file_path, extracted_files, meta_data, self.config)

self.cleanup(tmp_dir)

Expand All @@ -66,7 +66,7 @@ def unpack(self, file_path):
return extracted_files

def _do_fallback_if_necessary(
self, extracted_files: List, meta_data: Dict, tmp_dir: str, file_path: str
self, extracted_files: List, meta_data: Dict, tmp_dir: str, file_path: Path
) -> Tuple[List, Dict]:
if meta_data.get('number_of_excluded_files', 0) > 0:
# If files have been excluded, extracted_files might be empty, but
Expand All @@ -90,7 +90,7 @@ def cleanup(tmp_dir: TemporaryDirectory):
try:
tmp_dir.cleanup()
except OSError as error:
logging.error(f'Could not CleanUp tmp_dir: {error}', exc_info=True)
logging.error(f'Could not clean up tmp_dir: {error}', exc_info=True)

def move_extracted_files(self, file_paths: List[str], extraction_dir: Path) -> List[Path]:
extracted_files = []
Expand All @@ -110,10 +110,12 @@ def move_extracted_files(self, file_paths: List[str], extraction_dir: Path) -> L

def extract_files_from_file(self, file_path: str | Path, tmp_dir) -> Tuple[List, Dict]:
"""For backwards compatibility of tests"""
if not isinstance(file_path, Path):
file_path = Path(file_path)
return self.base.extract_files_from_file(file_path, tmp_dir)


def unpack(file_path: str, config, extract_everything: bool = False, folder: str | None = None):
def unpack(file_path: str | Path, config, extract_everything: bool = False, folder: str | None = None):
extracted_objects = Unpacker(config, extract_everything, folder).unpack(file_path)
logging.info(f'{len(extracted_objects)} files extracted')
path_extracted_files = '\n'.join(str(path) for path in extracted_objects)
Expand Down
23 changes: 12 additions & 11 deletions fact_extractor/unpacker/unpackBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@
import fnmatch
import logging
from os import getgid, getuid
from pathlib import Path
from subprocess import PIPE, Popen
from time import time
from typing import Callable, Dict, List, Tuple
from typing import TYPE_CHECKING, Callable, Dict, List, Tuple

from common_helper_files import get_files_in_dir

from helperFunctions import magic
from helperFunctions.config import read_list_from_config
from helperFunctions.plugin import import_plugins

if TYPE_CHECKING:
from pathlib import Path


class UnpackBase:
"""
Expand Down Expand Up @@ -52,11 +54,11 @@ def get_unpacker(self, mime_type: str):
return self.unpacker_plugins[mime_type]
return self.unpacker_plugins['generic/carver']

def extract_files_from_file(self, file_path: str | Path, tmp_dir) -> Tuple[List, Dict]:
def extract_files_from_file(self, file_path: Path, tmp_dir) -> Tuple[List, Dict]:
current_unpacker = self.get_unpacker(magic.from_file(file_path, mime=True))
return self._extract_files_from_file_using_specific_unpacker(str(file_path), tmp_dir, current_unpacker)
return self._extract_files_from_file_using_specific_unpacker(file_path, tmp_dir, current_unpacker)

def unpacking_fallback(self, file_path, tmp_dir, old_meta, fallback_plugin_mime) -> Tuple[List, Dict]:
def unpacking_fallback(self, file_path: Path, tmp_dir, old_meta, fallback_plugin_mime) -> Tuple[List, Dict]:
fallback_plugin = self.unpacker_plugins[fallback_plugin_mime]
old_meta[f"""0_FALLBACK_{old_meta['plugin_used']}"""] = (
f"""{old_meta['plugin_used']} (failed) -> {fallback_plugin_mime} (fallback)"""
Expand All @@ -67,12 +69,11 @@ def unpacking_fallback(self, file_path, tmp_dir, old_meta, fallback_plugin_mime)
file_path, tmp_dir, fallback_plugin, meta_data=old_meta
)

def should_ignore(self, file):
path = str(file)
return any(fnmatch.fnmatchcase(path, pattern) for pattern in self.exclude)
def should_ignore(self, file: str | Path) -> bool:
return any(fnmatch.fnmatchcase(str(file), pattern) for pattern in self.exclude)

def _extract_files_from_file_using_specific_unpacker(
self, file_path: str, tmp_dir: str, selected_unpacker, meta_data: dict | None = None
self, file_path: Path, tmp_dir: str, selected_unpacker, meta_data: dict | None = None
) -> Tuple[List, Dict]:
unpack_function, name, version = (
selected_unpacker # TODO Refactor register method to directly use four parameters instead of three
Expand All @@ -83,10 +84,10 @@ def _extract_files_from_file_using_specific_unpacker(
meta_data['plugin_used'] = name
meta_data['plugin_version'] = version

logging.info(f'Trying to unpack "{Path(file_path).name}" with plugin {name}')
logging.info(f'Trying to unpack "{file_path.name}" with plugin {name}')

try:
additional_meta = unpack_function(file_path, tmp_dir)
additional_meta = unpack_function(str(file_path), tmp_dir)
except Exception as error:
logging.debug(f'Unpacking of {file_path} failed: {error}', exc_info=True)
additional_meta = {'error': f'{type(error)}: {error!s}'}
Expand Down
2 changes: 1 addition & 1 deletion requirements-unpackers.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# FixMe: deprecated
pluginbase~=1.0.1
git+https://github.com/fkie-cad/common_helper_unpacking_classifier.git
git+https://github.com/fkie-cad/common_helper_unpacking_classifier.git@0.6.0
python-magic
patool~=3.1.3
# jffs2: jefferson + deps
Expand Down
Loading