
Commit 73efff3

feat: replaced binwalk with binary forensics plugin
which uses the new binwalk v3 instead of binwalk v2
1 parent fd69918 commit 73efff3

14 files changed: +471 −215 lines

File renamed without changes.
File renamed without changes.
Lines changed: 73 additions & 0 deletions
@@ -0,0 +1,73 @@
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from pydantic import BaseModel
from semver import Version

import config
from analysis.plugin import AnalysisPluginV0
from plugins.analysis.binary_forensics.internal.binwalk import BinwalkSignatureResult, get_binwalk_signature_analysis
from plugins.analysis.binary_forensics.internal.entropy import Entropy, get_entropy_analysis
from plugins.mime_blacklists import MIME_BLACKLIST_COMPRESSED

if TYPE_CHECKING:
    from io import FileIO


class AnalysisPlugin(AnalysisPluginV0):
    class Schema(BaseModel):
        entropy: Entropy
        file_matches: list[BinwalkSignatureResult]

    def __init__(self):
        super().__init__(
            metadata=AnalysisPluginV0.MetaData(
                name='binary_forensics',
                description='binary forensic analysis (entropy and Binwalk file signatures)',
                version=Version(1, 0, 0),
                Schema=self.Schema,
                mime_blacklist=['audio/', 'image/', 'video/', 'text/', *MIME_BLACKLIST_COMPRESSED],
            ),
        )
        self.thresholds = {
            'very high entropy': self._get_plugin_cfg_entry('very_high_entropy_threshold', 0.95),
            'high entropy': self._get_plugin_cfg_entry('high_entropy_threshold', 0.8),
            'medium high entropy': self._get_plugin_cfg_entry('medium_high_entropy_threshold', 0.6),
            'medium entropy': self._get_plugin_cfg_entry('medium_entropy_threshold', 0.4),
            'medium low entropy': self._get_plugin_cfg_entry('medium_low_entropy_threshold', 0.2),
            'low entropy': self._get_plugin_cfg_entry('low_entropy_threshold', 0.05),
        }

    def _get_plugin_cfg_entry(self, name: str, default: float) -> float:
        entry = getattr(config.backend.plugin.get(self.metadata.name, {}), name, default)
        try:
            return float(entry)
        except (TypeError, ValueError):
            logging.warning(f'Failed to parse config entry {name} of plugin {self.metadata.name} (should be float)')
            return default

    def analyze(self, file_handle: FileIO, virtual_file_path: dict[str, list[str]], analyses: dict) -> Schema:
        del virtual_file_path, analyses

        return self.Schema(
            entropy=get_entropy_analysis(file_handle),
            file_matches=get_binwalk_signature_analysis(file_handle, timeout=self.metadata.timeout),
        )

    def summarize(self, result: Schema) -> list:
        return [*self._summarize_entropy(result.entropy), *self._summarize_binwalk_result(result.file_matches)]

    def _summarize_entropy(self, result: Entropy) -> list[str]:
        for key, value in self.thresholds.items():
            if result.avg_entropy > value:
                return [key]
        return ['very low entropy']

    @staticmethod
    def _summarize_binwalk_result(binwalk_result: list[BinwalkSignatureResult]) -> list[str]:
        summary = []
        for item in binwalk_result:
            summary.append(item.name)
        return summary
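
For reference, a minimal standalone sketch of the threshold scan in _summarize_entropy, assuming the default thresholds above and stripped of the plugin framework; the dict is iterated in descending threshold order, so the first threshold that the average entropy exceeds determines the summary label:

# standalone sketch of the entropy classification above (default thresholds assumed)
THRESHOLDS = {
    'very high entropy': 0.95,
    'high entropy': 0.8,
    'medium high entropy': 0.6,
    'medium entropy': 0.4,
    'medium low entropy': 0.2,
    'low entropy': 0.05,
}


def classify(avg_entropy: float) -> str:
    # insertion order is descending, so the first match is the highest threshold exceeded
    for label, threshold in THRESHOLDS.items():
        if avg_entropy > threshold:
            return label
    return 'very low entropy'


assert classify(0.97) == 'very high entropy'  # typical for compressed or encrypted data
assert classify(0.5) == 'medium entropy'
assert classify(0.01) == 'very low entropy'  # e.g. zero padding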
File renamed without changes.
Lines changed: 104 additions & 0 deletions
@@ -0,0 +1,104 @@
from __future__ import annotations

import json
import logging
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING

from docker.errors import DockerException
from docker.types import Mount
from pydantic import BaseModel
from requests.exceptions import JSONDecodeError, ReadTimeout

from analysis.plugin.plugin import AnalysisFailedError
from helperFunctions.docker import run_docker_container

if TYPE_CHECKING:
    from io import FileIO


DOCKER_IMAGE = 'refirmlabs/binwalk:latest'


class BinwalkSignatureResult(BaseModel):
    offset: int
    id: str
    size: int
    name: str
    confidence: int
    description: str


def get_binwalk_signature_analysis(file: FileIO, timeout: int) -> list[BinwalkSignatureResult]:
    return _parse_binwalk_output(_get_docker_output(file, timeout))


def _parse_binwalk_output(binwalk_output: list[dict]) -> list[BinwalkSignatureResult]:
    """
    Expected result structure (binwalk 3.1.1):
    [
        {
            'Analysis': {
                'file_path': '/io/input',
                'file_map': [
                    {
                        'offset': <int>,
                        'id': <str>,
                        'size': <int>,
                        'name': <str>,
                        'confidence': <int>,
                        'description': <str>,
                        'always_display': <bool>,
                        'extraction_declined': <bool>,
                    },
                    ...
                ],
            }
        }
    ]
    The outer array has only one entry, since we analyze only one file.
    """
    try:
        return [
            BinwalkSignatureResult(
                offset=file_result['offset'],
                id=file_result['id'],
                size=file_result['size'],
                name=file_result['name'],
                confidence=file_result['confidence'],
                description=file_result['description'],
            )
            for file_result in binwalk_output[0]['Analysis']['file_map']
        ]
    except (KeyError, IndexError) as err:
        # FixMe: sadly, there are no tags for the docker container versions, so we can't pin it at the moment
        # this should not happen -- if it happens, the plugin needs to be fixed
        logging.exception('Failed to parse binwalk result')
        raise AnalysisFailedError('Failed to parse binwalk result') from err


def _get_docker_output(file: FileIO, timeout: int) -> list[dict]:
    container_input_path = '/io/input'
    container_output_path = '/io/output'
    with NamedTemporaryFile() as temp_file:
        # make sure the output file exists on disk so it can be bind-mounted into the container
        Path(temp_file.name).touch()
        try:
            run_docker_container(
                DOCKER_IMAGE,
                combine_stderr_stdout=True,
                timeout=timeout - 1,  # leave some headroom before the plugin's own timeout triggers
                command=f'{container_input_path} -l {container_output_path}',
                mounts=[
                    Mount(container_input_path, file.name, type='bind', read_only=True),
                    Mount(container_output_path, temp_file.name, type='bind', read_only=False),
                ],
                logging_label='binwalk',
            )
            return json.loads(Path(temp_file.name).read_text())
        except ReadTimeout as err:
            raise AnalysisFailedError('Docker container timed out') from err
        except (DockerException, OSError) as err:
            raise AnalysisFailedError('Docker process error') from err
        except JSONDecodeError as err:
            raise AnalysisFailedError('Docker output JSON parsing error') from err
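
As a sanity check of the parsing step, here is a self-contained sketch (pydantic only; the sample values are invented, and the model is re-declared so the snippet runs on its own) that feeds data shaped like the documented binwalk 3.1.1 output through the same field mapping:

from pydantic import BaseModel


class BinwalkSignatureResult(BaseModel):  # mirrors the model above
    offset: int
    id: str
    size: int
    name: str
    confidence: int
    description: str


# invented sample following the structure documented in _parse_binwalk_output
sample_output = [
    {
        'Analysis': {
            'file_path': '/io/input',
            'file_map': [
                {
                    'offset': 0,
                    'id': 'deadbeef',
                    'size': 64,
                    'name': 'gzip',
                    'confidence': 250,
                    'description': 'gzip compressed data',
                    'always_display': False,  # extra keys are deliberately not mapped
                    'extraction_declined': False,
                },
            ],
        }
    }
]

results = [
    BinwalkSignatureResult(
        offset=entry['offset'],
        id=entry['id'],
        size=entry['size'],
        name=entry['name'],
        confidence=entry['confidence'],
        description=entry['description'],
    )
    for entry in sample_output[0]['Analysis']['file_map']
]
assert results[0].name == 'gzip'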
Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
from __future__ import annotations

import math
from pathlib import Path
from typing import TYPE_CHECKING, List

from entropython import metric_entropy
from pydantic import BaseModel

if TYPE_CHECKING:
    from io import FileIO

BLOCK_SIZE_MIN = 2**10  # 1 KiB
BLOCK_SIZE_MAX = 2**20  # 1 MiB


class Block(BaseModel):
    offset: int
    entropy: float


class Entropy(BaseModel):
    avg_entropy: float
    blocks: List[Block]
    blocksize: int


def get_entropy_analysis(file_handle: FileIO) -> Entropy:
    file = Path(file_handle.name)
    size = file.stat().st_size
    if size == 0:
        return Entropy(avg_entropy=0, blocksize=0, blocks=[])

    blocksize = _get_blocksize(size)
    blocks = []
    offset = 0
    with file.open('rb') as fp:
        while block := fp.read(blocksize):
            blocks.append(Block(offset=offset, entropy=metric_entropy(block)))
            offset += len(block)
    avg_entropy = _calculate_avg_entropy(blocks, size, blocksize)
    return Entropy(avg_entropy=avg_entropy, blocksize=blocksize, blocks=blocks)


def _get_blocksize(file_size: int) -> int:
    # this will always give 32 to 64 points to plot (unless the file is smaller than 32 KiB or larger than 64 MiB)
    blocksize = 2 ** (math.floor(math.log2(file_size)) - 5)
    return min(BLOCK_SIZE_MAX, max(blocksize, BLOCK_SIZE_MIN))


def _calculate_avg_entropy(blocks: list[Block], file_size: int, blocksize: int) -> float:
    # weight each block's entropy by its byte count; only the last block can be shorter than `blocksize`
    avg_entropy = 0
    for block in blocks[:-1]:
        avg_entropy += block.entropy * blocksize
    last_block_size = file_size - blocks[-1].offset
    avg_entropy += blocks[-1].entropy * last_block_size
    return avg_entropy / file_size
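
A worked example of the block-size heuristic, using the same formula as _get_blocksize above: blocksize = 2 ** (floor(log2(file_size)) - 5), so the block count lands between 32 and 64 before the clamps apply.

import math

BLOCK_SIZE_MIN = 2**10  # 1 KiB
BLOCK_SIZE_MAX = 2**20  # 1 MiB


def get_blocksize(file_size: int) -> int:  # same formula as _get_blocksize above
    blocksize = 2 ** (math.floor(math.log2(file_size)) - 5)
    return min(BLOCK_SIZE_MAX, max(blocksize, BLOCK_SIZE_MIN))


assert get_blocksize(2**20) == 2**15  # 1 MiB file -> 32 KiB blocks -> exactly 32 blocks
assert get_blocksize(2**21 - 1) == 2**15  # just under 2 MiB -> still 32 KiB blocks (~64 blocks)
assert get_blocksize(2**14) == BLOCK_SIZE_MIN  # files under 32 KiB clamp to 1 KiB blocks
assert get_blocksize(2**30) == BLOCK_SIZE_MAX  # files over 64 MiB clamp to 1 MiB blocks

The weighted average in _calculate_avg_entropy then compensates for the final block, which is usually shorter than blocksize.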
