Skip to content

Commit 373085b

Browse files
authored
Merge pull request #558 from onekey-sec/entropy
Report calculated entropies
2 parents c5837b7 + 282deff commit 373085b

File tree

6 files changed

+180
-44
lines changed

6 files changed

+180
-44
lines changed

tests/test_processing.py

Lines changed: 98 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,30 @@
22
import sys
33
import zipfile
44
from pathlib import Path
5-
from typing import Collection, List, Tuple
5+
from typing import Collection, List, Tuple, Type, TypeVar
66

77
import attr
88
import pytest
99

10+
from unblob import handlers
1011
from unblob.models import UnknownChunk, ValidChunk
1112
from unblob.processing import (
1213
ExtractionConfig,
1314
calculate_buffer_size,
1415
calculate_entropy,
1516
calculate_unknown_chunks,
16-
draw_entropy_plot,
17+
format_entropy_plot,
1718
process_file,
1819
remove_inner_chunks,
1920
)
20-
from unblob.report import ExtractDirectoryExistsReport, StatReport
21+
from unblob.report import (
22+
EntropyReport,
23+
ExtractDirectoryExistsReport,
24+
StatReport,
25+
UnknownChunkReport,
26+
)
27+
28+
T = TypeVar("T")
2129

2230

2331
def assert_same_chunks(expected, actual, explanation=None):
@@ -129,32 +137,34 @@ def test_calculate_buffer_size(
129137
)
130138

131139

132-
def test_draw_entropy_plot_error():
140+
def test_format_entropy_plot_error():
133141
with pytest.raises(TypeError):
134-
draw_entropy_plot([])
142+
format_entropy_plot(percentages=[], buffer_size=1024)
135143

136144

137145
@pytest.mark.parametrize(
138-
"percentages",
146+
"percentages, buffer_size",
139147
[
140-
pytest.param([0.0] * 100, id="zero-array"),
141-
pytest.param([99.99] * 100, id="99-array"),
142-
pytest.param([100.0] * 100, id="100-array"),
148+
pytest.param([0.0] * 100, 1024, id="zero-array"),
149+
pytest.param([99.99] * 100, 1024, id="99-array"),
150+
pytest.param([100.0] * 100, 1024, id="100-array"),
151+
pytest.param([100.0] * 100, -1, id="buffer_size-can-be-anything1"),
152+
pytest.param([100.0] * 100, None, id="buffer_size-can-be-anything2"),
153+
pytest.param([100.0] * 100, "None", id="buffer_size-can-be-anything3"),
143154
],
144155
)
145-
def test_draw_entropy_plot_no_exception(percentages: List[float]):
146-
assert draw_entropy_plot(percentages) is None
156+
def test_format_entropy_plot_no_exception(percentages: List[float], buffer_size: int):
157+
assert str(buffer_size) in format_entropy_plot(
158+
percentages=percentages, buffer_size=buffer_size
159+
)
147160

148161

149-
@pytest.mark.parametrize(
150-
"path, draw_plot",
151-
[
152-
pytest.param(Path(sys.executable), True, id="draw-plot"),
153-
pytest.param(Path(sys.executable), False, id="no-plot"),
154-
],
155-
)
156-
def test_calculate_entropy_no_exception(path: Path, draw_plot: bool):
157-
assert calculate_entropy(path, draw_plot=draw_plot) is None
162+
def test_calculate_entropy_no_exception():
163+
report = calculate_entropy(Path(sys.executable))
164+
format_entropy_plot(
165+
percentages=report.percentages,
166+
buffer_size=report.buffer_size,
167+
)
158168

159169

160170
@pytest.mark.parametrize(
@@ -311,3 +321,71 @@ def test_processing_with_non_posix_paths(tmp_path: Path):
311321
is_link=False,
312322
link_target=None,
313323
)
324+
325+
326+
def test_entropy_calculation(tmp_path: Path):
327+
"""Process a file with an unknown chunk and a zip file with entropy calculation enabled.
328+
329+
The input file structure is
330+
- zip-chunk
331+
- empty.txt
332+
- 0-255.bin
333+
- unknown_chunk
334+
"""
335+
#
336+
# ** input
337+
338+
input_file = tmp_path / "input-file"
339+
with zipfile.ZipFile(input_file, "w") as zf:
340+
zf.writestr("empty.txt", data=b"")
341+
zf.writestr("0-255.bin", data=bytes(range(256)))
342+
343+
# entropy is calculated in 1 KiB blocks for files smaller than 80 KiB
344+
# so let's have 1 block with 0 entropy, 1 with 6 bit entropy, the rest with 8 bit entropy
345+
unknown_chunk_content = (
346+
bytes(1024) + bytes(range(64)) * 4 * 4 + bytes(range(256)) * 4 * 62
347+
)
348+
with input_file.open("ab") as f:
349+
f.write(unknown_chunk_content)
350+
351+
config = ExtractionConfig(
352+
extract_root=tmp_path / "extract_root",
353+
entropy_depth=100,
354+
entropy_plot=True,
355+
handlers=(handlers.archive.zip.ZIPHandler,),
356+
)
357+
358+
# ** action
359+
360+
process_result = process_file(config, input_file)
361+
362+
task_result_by_name = {r.task.path.name: r for r in process_result.results}
363+
364+
def get_all(file_name, report_type: Type[T]) -> List[T]:
365+
return [
366+
r
367+
for r in task_result_by_name[file_name].reports
368+
if isinstance(r, report_type)
369+
]
370+
371+
# ** verification
372+
373+
# the unknown chunk report for the second chunk for the input file should have an entropy report
374+
# with a percentages list (bits scaled up to percent) of 64 items, for 0, 6, 8, 8, ... bits of entropy
375+
[unknown_chunk_report] = get_all("input-file", UnknownChunkReport)
376+
unknown_entropy = unknown_chunk_report.entropy
377+
assert unknown_entropy == EntropyReport(
378+
percentages=[0.0, 75.0] + [100.0] * 62, buffer_size=1024
379+
)
380+
assert (
381+
unknown_entropy is not None
382+
) # removes pyright complaints for the below 3 lines :(
383+
assert round(unknown_entropy.mean, 2) == 98.05 # noqa: PLR2004
384+
assert unknown_entropy.highest == 100.0 # noqa: PLR2004
385+
assert unknown_entropy.lowest == 0.0 # noqa: PLR2004
386+
387+
# we should have entropy calculated for files without extractions, except for empty files
388+
assert [] == get_all("empty.txt", EntropyReport)
389+
assert [EntropyReport(percentages=[100.0], buffer_size=1024)] == get_all(
390+
"0-255.bin", EntropyReport
391+
)

tests/test_report.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -312,12 +312,26 @@ def hello_kitty_task_results(
312312
sha1="febca6ed75dc02e0def065e7b08f1cca87b57c74",
313313
sha256="144d8b2c949cb4943128aa0081153bcba4f38eb0ba26119cc06ca1563c4999e1",
314314
),
315-
UnknownChunkReport(chunk_id=ANY, start_offset=0, end_offset=6, size=6),
316315
UnknownChunkReport(
317-
chunk_id=ANY, start_offset=131, end_offset=138, size=7
316+
chunk_id=ANY,
317+
start_offset=0,
318+
end_offset=6,
319+
size=6,
320+
entropy=None,
321+
),
322+
UnknownChunkReport(
323+
chunk_id=ANY,
324+
start_offset=131,
325+
end_offset=138,
326+
size=7,
327+
entropy=None,
318328
),
319329
UnknownChunkReport(
320-
chunk_id=ANY, start_offset=263, end_offset=264, size=1
330+
chunk_id=ANY,
331+
start_offset=263,
332+
end_offset=264,
333+
size=1,
334+
entropy=None,
321335
),
322336
ChunkReport(
323337
chunk_id=hello_id,

unblob/math.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
from typing import Callable
2+
3+
shannon_entropy: Callable[[bytes], float]
4+
15
try:
26
from ._rust import shannon_entropy # type: ignore
37
except ImportError:

unblob/models.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from .file_utils import Endian, File, InvalidInputFormat, StructParser
1212
from .identifiers import new_id
1313
from .parser import hexstring2regex
14-
from .report import ChunkReport, ErrorReport, Report, UnknownChunkReport
14+
from .report import ChunkReport, EntropyReport, ErrorReport, Report, UnknownChunkReport
1515

1616
logger = get_logger()
1717

@@ -123,12 +123,13 @@ class UnknownChunk(Chunk):
123123
like most common bytes (like \x00 and \xFF), ASCII strings, high entropy, etc.
124124
"""
125125

126-
def as_report(self) -> UnknownChunkReport:
126+
def as_report(self, entropy: Optional[EntropyReport]) -> UnknownChunkReport:
127127
return UnknownChunkReport(
128128
chunk_id=self.chunk_id,
129129
start_offset=self.start_offset,
130130
end_offset=self.end_offset,
131131
size=self.size,
132+
entropy=entropy,
132133
)
133134

134135

unblob/processing.py

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import multiprocessing
22
import shutil
3-
import statistics
43
from operator import attrgetter
54
from pathlib import Path
65
from typing import Iterable, List, Optional, Sequence
@@ -30,6 +29,7 @@
3029
)
3130
from .pool import make_pool
3231
from .report import (
32+
EntropyReport,
3333
ExtractDirectoryExistsReport,
3434
FileMagicReport,
3535
HashReport,
@@ -332,7 +332,9 @@ def process(self):
332332
else:
333333
# we don't consider whole files as unknown chunks, but we still want to
334334
# calculate entropy for whole files which produced no valid chunks
335-
self._calculate_entropy(self.task.path)
335+
entropy = self._calculate_entropy(self.task.path)
336+
if entropy:
337+
self.result.add_report(entropy)
336338

337339
def _process_chunks(
338340
self,
@@ -345,15 +347,26 @@ def _process_chunks(
345347

346348
for chunk in unknown_chunks:
347349
carved_unknown_path = carve_unknown_chunk(self.carve_dir, file, chunk)
348-
self._calculate_entropy(carved_unknown_path)
349-
self.result.add_report(chunk.as_report())
350+
entropy = self._calculate_entropy(carved_unknown_path)
351+
self.result.add_report(chunk.as_report(entropy=entropy))
350352

351353
for chunk in outer_chunks:
352354
self._extract_chunk(file, chunk)
353355

354-
def _calculate_entropy(self, path: Path):
356+
def _calculate_entropy(self, path: Path) -> Optional[EntropyReport]:
355357
if self.task.depth < self.config.entropy_depth:
356-
calculate_entropy(path, draw_plot=self.config.entropy_plot)
358+
report = calculate_entropy(path)
359+
if self.config.entropy_plot:
360+
logger.debug(
361+
"Entropy chart",
362+
# New line so that chart title will be aligned correctly in the next line
363+
chart="\n"
364+
+ format_entropy_plot(report.percentages, report.buffer_size),
365+
path=path,
366+
_verbosity=3,
367+
)
368+
return report
369+
return None
357370

358371
def _extract_chunk(self, file, chunk: ValidChunk):
359372
skip_carving = chunk.is_whole_file
@@ -484,8 +497,8 @@ def calculate_unknown_chunks(
484497
return unknown_chunks
485498

486499

487-
def calculate_entropy(path: Path, *, draw_plot: bool):
488-
"""Calculate and log shannon entropy divided by 8 for the file in 1mB chunks.
500+
def calculate_entropy(path: Path) -> EntropyReport:
501+
"""Calculate and log shannon entropy divided by 8 for the file in chunks.
489502
490503
Shannon entropy returns the amount of information (in bits) of some numeric
491504
sequence. We calculate the average entropy of byte chunks, which in theory
@@ -499,7 +512,7 @@ def calculate_entropy(path: Path, *, draw_plot: bool):
499512
file_size = path.stat().st_size
500513
logger.debug("Calculating entropy for file", path=path, size=file_size)
501514

502-
# Smaller chuk size would be very slow to calculate.
515+
# Smaller chunk size would be very slow to calculate.
503516
# 1 MB chunk size takes ~3 sec for a 4.5 GB file.
504517
buffer_size = calculate_buffer_size(
505518
file_size, chunk_count=80, min_limit=1024, max_limit=1024 * 1024
@@ -511,15 +524,19 @@ def calculate_entropy(path: Path, *, draw_plot: bool):
511524
entropy_percentage = round(entropy / 8 * 100, 2)
512525
percentages.append(entropy_percentage)
513526

527+
report = EntropyReport(percentages=percentages, buffer_size=buffer_size)
528+
514529
logger.debug(
515530
"Entropy calculated",
516-
mean=round(statistics.mean(percentages), 2),
517-
highest=max(percentages),
518-
lowest=min(percentages),
531+
path=path,
532+
size=file_size,
533+
buffer_size=report.buffer_size,
534+
mean=round(report.mean, 2),
535+
highest=round(report.highest, 2),
536+
lowest=round(report.lowest, 2),
519537
)
520538

521-
if draw_plot:
522-
draw_entropy_plot(percentages)
539+
return report
523540

524541

525542
def calculate_buffer_size(
@@ -533,11 +550,14 @@ def calculate_buffer_size(
533550
return buffer_size
534551

535552

536-
def draw_entropy_plot(percentages: List[float]):
537-
plt.clear_data()
553+
def format_entropy_plot(percentages: List[float], buffer_size: int):
554+
# start from scratch
555+
plt.clear_figure()
556+
# go colorless
538557
plt.clear_color()
539558
plt.title("Entropy distribution")
540-
plt.xlabel("mB")
559+
# plt.xlabel(humanize.naturalsize(buffer_size))
560+
plt.xlabel(f"{buffer_size} bytes")
541561
plt.ylabel("entropy %")
542562

543563
plt.scatter(percentages, marker="dot")
@@ -549,5 +569,4 @@ def draw_entropy_plot(percentages: List[float]):
549569
# Always show 0% and 100%
550570
plt.yticks(range(0, 101, 10))
551571

552-
# New line so that chart title will be aligned correctly in the next line
553-
logger.debug("Entropy chart", chart="\n" + plt.build(), _verbosity=3)
572+
return plt.build()

unblob/report.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import hashlib
22
import os
33
import stat
4+
import statistics
45
import traceback
56
from enum import Enum
67
from pathlib import Path
@@ -172,6 +173,24 @@ class FileMagicReport(Report):
172173
mime_type: str
173174

174175

176+
@attr.define(kw_only=True)
177+
class EntropyReport(Report):
178+
percentages: List[float]
179+
buffer_size: int
180+
181+
@property
182+
def mean(self):
183+
return statistics.mean(self.percentages)
184+
185+
@property
186+
def highest(self):
187+
return max(self.percentages)
188+
189+
@property
190+
def lowest(self):
191+
return min(self.percentages)
192+
193+
175194
@final
176195
@attr.define(kw_only=True)
177196
class ChunkReport(Report):
@@ -191,3 +210,4 @@ class UnknownChunkReport(Report):
191210
start_offset: int
192211
end_offset: int
193212
size: int
213+
entropy: Optional[EntropyReport]

0 commit comments

Comments
 (0)