Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 128 additions & 1 deletion fickling/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,22 @@
from collections.abc import Iterable, Iterator
from enum import Enum

from fickling.exception import ResourceExhaustionError
from fickling.fickle import (
BUILTIN_MODULE_NAMES,
SAFE_BUILTINS,
BinGet,
BinPut,
Dup,
Get,
InterpretationError,
Interpreter,
LongBinGet,
LongBinPut,
Memoize,
Pickled,
Proto,
Put,
)


Expand All @@ -35,7 +44,29 @@ def __init__(self, pickled: Pickled):
self.results_by_analysis: dict[type[Analysis], list[AnalysisResult]] = defaultdict(list)

def analyze(self, analysis: Analysis) -> list[AnalysisResult]:
results = list(analysis.analyze(self))
try:
results = list(analysis.analyze(self))
except ResourceExhaustionError as e:
# Resource limits exceeded - this is a DoS attack indicator
results = [
AnalysisResult(
Severity.LIKELY_OVERTLY_MALICIOUS,
f"Resource exhaustion detected during analysis: {e}; "
f"this is indicative of an expansion attack (Billion Laughs style)",
"ResourceExhaustion",
trigger=f"{e.resource_type}: {e.actual}",
)
]
except (ValueError, IndexError, RecursionError) as e:
# Malformed pickle caused an interpretation error
results = [
AnalysisResult(
Severity.LIKELY_UNSAFE,
f"The pickle file has malformed opcode sequences ({type(e).__name__}: {e}); "
f"it is either corrupted or attempting to bypass the pickle security analysis",
"InterpretationError",
)
]
if not results:
self.results_by_analysis[type(analysis)].append(AnalysisResult(Severity.LIKELY_SAFE))
else:
Expand Down Expand Up @@ -415,6 +446,102 @@ def analyze(self, context: AnalysisContext) -> Iterator[AnalysisResult]:
)


class ExpansionAttackAnalysis(Analysis):
    """Flags pickle streams whose opcode mix suggests an exponential
    expansion (Billion Laughs style) denial-of-service attempt.

    Two heuristics are combined:
    - The ratio of memo GET opcodes to memo PUT opcodes: replaying memoized
      values far more often than they are stored is the signature of an
      expansion attack.
    - The raw count of DUP opcodes, which repeatedly duplicate stack items.
    """

    # Tunable detection thresholds; override via the keyword-only
    # constructor arguments of the same (lowercased) names.
    DEFAULT_GET_PUT_RATIO_THRESHOLD = 10  # GETs per PUT that is suspicious
    DEFAULT_HIGH_GET_PUT_RATIO_THRESHOLD = 50  # Extremely high ratio
    DEFAULT_DUP_COUNT_THRESHOLD = 100  # Number of DUPs that is suspicious

    def __init__(
        self,
        *,
        get_put_ratio_threshold: int = DEFAULT_GET_PUT_RATIO_THRESHOLD,
        high_get_put_ratio_threshold: int = DEFAULT_HIGH_GET_PUT_RATIO_THRESHOLD,
        dup_count_threshold: int = DEFAULT_DUP_COUNT_THRESHOLD,
    ):
        self._get_put_ratio_threshold = get_put_ratio_threshold
        self._high_get_put_ratio_threshold = high_get_put_ratio_threshold
        self._dup_count_threshold = dup_count_threshold

    def analyze(self, context: AnalysisContext) -> Iterator[AnalysisResult]:
        # Single pass over the opcode stream to tally the three opcode families.
        get_count = put_count = dup_count = 0
        get_opcodes = (BinGet, LongBinGet, Get)
        put_opcodes = (BinPut, LongBinPut, Put, Memoize)
        for op in context.pickled:
            if isinstance(op, get_opcodes):
                get_count += 1
            elif isinstance(op, put_opcodes):
                put_count += 1
            elif isinstance(op, Dup):
                dup_count += 1

        findings: list[AnalysisResult] = []

        # GET/PUT ratio heuristics: two tiers when PUTs exist, plus a special
        # case for GETs that can never resolve because nothing was ever PUT.
        if put_count > 0:
            ratio = get_count / put_count
            if ratio > self._high_get_put_ratio_threshold:
                findings.append(
                    AnalysisResult(
                        Severity.LIKELY_UNSAFE,
                        f"Extremely high GET/PUT ratio ({ratio:.1f}:1) detected; "
                        f"this pattern is indicative of an exponential expansion attack "
                        f"(Billion Laughs style) that could cause DoS",
                        "ExpansionAttackAnalysis",
                        trigger=f"GET/PUT ratio: {ratio:.1f}:1",
                    )
                )
            elif ratio > self._get_put_ratio_threshold:
                findings.append(
                    AnalysisResult(
                        Severity.SUSPICIOUS,
                        f"High GET/PUT ratio ({ratio:.1f}:1) detected; "
                        f"this may indicate an expansion attack pattern",
                        "ExpansionAttackAnalysis",
                        trigger=f"GET/PUT ratio: {ratio:.1f}:1",
                    )
                )
        elif get_count > self._get_put_ratio_threshold:
            # GETs with no PUTs is inherently malformed/malicious
            findings.append(
                AnalysisResult(
                    Severity.LIKELY_UNSAFE,
                    f"GET operations ({get_count}) with no PUT operations detected; "
                    f"this is indicative of a malformed or malicious pickle",
                    "ExpansionAttackAnalysis",
                    trigger=f"GET count: {get_count}, PUT count: 0",
                )
            )

        # Independent check: an unusual volume of stack duplications.
        if dup_count > self._dup_count_threshold:
            findings.append(
                AnalysisResult(
                    Severity.SUSPICIOUS,
                    f"Excessive DUP operations ({dup_count}) detected; "
                    f"this may indicate a stack duplication attack",
                    "ExpansionAttackAnalysis",
                    trigger=f"DUP count: {dup_count}",
                )
            )

        # When several indicators fire together, escalate anything weaker
        # than LIKELY_UNSAFE up to that severity.
        if len(findings) > 1:
            for finding in findings:
                if finding.severity < Severity.LIKELY_UNSAFE:
                    finding.severity = Severity.LIKELY_UNSAFE

        yield from findings


class AnalysisResults:
def __init__(self, pickled: Pickled, results: Iterable[AnalysisResult]):
self.pickled: Pickled = pickled
Expand Down
19 changes: 14 additions & 5 deletions fickling/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from . import __version__, fickle, tracing
from .analysis import Severity, check_safety
from .constants import EXIT_CLEAN, EXIT_ERROR, EXIT_UNSAFE
from .exception import ResourceExhaustionError

DEFAULT_JSON_OUTPUT_FILE = "safety_results.json"

Expand Down Expand Up @@ -183,11 +184,19 @@ def main(argv: list[str] | None = None) -> int:
interpreter = fickle.Interpreter(
pickled, first_variable_id=var_id, result_variable=f"result{i}"
)
if args.trace:
trace = tracing.Trace(interpreter)
print(unparse(trace.run()))
else:
print(unparse(interpreter.to_ast()))
try:
if args.trace:
trace = tracing.Trace(interpreter)
print(unparse(trace.run()))
else:
print(unparse(interpreter.to_ast()))
except ResourceExhaustionError as e:
sys.stderr.write(
f"Error: {e}\n"
"This pickle file may contain an expansion attack. "
"Use --check-safety to analyze it.\n"
)
return 1
var_id = interpreter.next_variable_id
else:
pickled = fickle.Pickled(
Expand Down
19 changes: 19 additions & 0 deletions fickling/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,22 @@ def __init__(self, msg):

def __str__(self):
return self.msg


class ResourceExhaustionError(Exception):
    """Signals that a configured resource limit was exceeded during analysis.

    The offending resource name, the configured limit, and the observed
    value are kept as attributes so callers can build their own diagnostics.
    """

    def __init__(self, resource_type: str, limit: int, actual: int):
        self.resource_type = resource_type
        self.limit = limit
        self.actual = actual
        message = f"Resource limit exceeded: {resource_type} (limit={limit}, actual={actual})"
        super().__init__(message)


class ExpansionAttackError(ResourceExhaustionError):
    """Raised when exponential expansion attack (Billion Laughs style) is detected."""

    def __init__(self, limit: int, actual: int):
        # The GET/PUT ratio is the resource that exceeded its limit here;
        # "get_ratio" labels it in the generic ResourceExhaustionError fields.
        super().__init__("get_ratio", limit, actual)
73 changes: 71 additions & 2 deletions fickling/fickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import sys
from abc import ABC, abstractmethod
from collections.abc import Iterable, Iterator, MutableSequence, Sequence
from dataclasses import dataclass
from enum import Enum
from io import BytesIO
from pickletools import OpcodeInfo, genops, opcodes
Expand All @@ -19,7 +20,7 @@
overload,
)

from fickling.exception import WrongMethodError
from fickling.exception import ExpansionAttackError, ResourceExhaustionError, WrongMethodError

T = TypeVar("T")

Expand All @@ -29,6 +30,26 @@
from collections.abc import Buffer


@dataclass(frozen=True)
class InterpreterLimits:
    """Resource limits to prevent DoS attacks during pickle interpretation.

    All limits must be positive integers; construction raises ``ValueError``
    otherwise.  Instances are frozen, so a single default instance can be
    shared safely (see ``DEFAULT_INTERPRETER_LIMITS``).
    """

    max_opcodes: int = 1_000_000  # total opcodes executed per pickle
    max_stack_depth: int = 10_000  # interpreter stack height
    max_memo_size: int = 100_000  # entries allowed in the memo
    max_get_ratio: int = 50  # Maximum GETs per PUT threshold

    def __post_init__(self) -> None:
        # Validate every declared field rather than a hard-coded name list,
        # so any limit added to this dataclass later is checked automatically.
        for field_name in self.__dataclass_fields__:
            value = getattr(self, field_name)
            if value < 1:
                raise ValueError(f"{field_name} must be positive, got {value}")


# Default limits instance (frozen, so safe as a global singleton)
DEFAULT_INTERPRETER_LIMITS = InterpreterLimits()


OpcodeSequence = MutableSequence["Opcode"]
GenericSequence = Sequence[T]
make_constant = ast.Constant
Expand Down Expand Up @@ -1133,6 +1154,12 @@ def ast(self) -> ast.Module:
f"returning empty AST to continue analysis\n"
)
self._ast = ast.Module(body=[], type_ignores=[])
except ResourceExhaustionError:
sys.stderr.write(
"Warning: resource limits exceeded during interpretation; "
"returning empty AST to continue analysis\n"
)
self._ast = ast.Module(body=[], type_ignores=[])
return self._ast

@property
Expand Down Expand Up @@ -1215,7 +1242,11 @@ def __getitem__(self, index: int | slice) -> ast.stmt:

class Interpreter:
def __init__(
self, pickled: Pickled, first_variable_id: int = 0, result_variable: str = "result"
self,
pickled: Pickled,
first_variable_id: int = 0,
result_variable: str = "result",
limits: InterpreterLimits | None = None,
):
self.pickled: Pickled = pickled
self.memory: dict[int, ast.expr] = {}
Expand All @@ -1227,6 +1258,12 @@ def __init__(
self._opcodes: Iterator[Opcode] = iter(pickled)
self._has_cycle: bool = False

# Resource limits and tracking for DoS protection
self.limits: InterpreterLimits = limits or DEFAULT_INTERPRETER_LIMITS
self._opcode_count: int = 0
self._get_count: int = 0
self._put_count: int = 0

@property
def next_variable_id(self) -> int:
return self._var_counter
Expand Down Expand Up @@ -1294,10 +1331,36 @@ def step(self) -> Opcode:
stmt.col_offset = 0
self._module = ast.Module(list(self.module_body), type_ignores=[])
raise StopIteration()
self._opcode_count += 1
self.stack.opcode = opcode
opcode.run(self)
self._check_limits()
return opcode

def _check_limits(self) -> None:
    """Raise ResourceExhaustionError (or ExpansionAttackError) as soon as
    any configured interpreter limit is exceeded."""
    limits = self.limits
    if self._opcode_count > limits.max_opcodes:
        raise ResourceExhaustionError("opcodes", limits.max_opcodes, self._opcode_count)
    depth = len(self.stack)
    if depth > limits.max_stack_depth:
        raise ResourceExhaustionError("stack_depth", limits.max_stack_depth, depth)
    memo_entries = len(self.memory)
    if memo_entries > limits.max_memo_size:
        raise ResourceExhaustionError("memo_size", limits.max_memo_size, memo_entries)
    # A GET-heavy stream is the signature of an expansion (Billion Laughs) attack.
    if self._put_count > 0:
        get_put_ratio = self._get_count / self._put_count
        if get_put_ratio > limits.max_get_ratio:
            raise ExpansionAttackError(limits.max_get_ratio, round(get_put_ratio))

def track_get(self) -> None:
    """Record one memo GET opcode; feeds the expansion-attack ratio check."""
    self._get_count = self._get_count + 1

def track_put(self) -> None:
    """Record one memo PUT/MEMOIZE opcode; feeds the expansion-attack ratio check."""
    self._put_count = self._put_count + 1

def new_variable(self, value: ast.expr, name: str | None = None) -> str:
if name is None:
name = f"_var{self._var_counter}"
Expand Down Expand Up @@ -1503,6 +1566,7 @@ class Put(Opcode):
name = "PUT"

def run(self, interpreter: Interpreter):
    """Store the top of the stack in the memo under this opcode's argument."""
    interpreter.track_put()  # feed the expansion-attack (GET/PUT) counters
    interpreter.memory[self.arg] = interpreter.stack[-1]

def encode_body(self) -> bytes:
Expand All @@ -1514,6 +1578,7 @@ class BinPut(Opcode):
name = "BINPUT"

def run(self, interpreter: Interpreter):
    """Store the top of the stack in the memo under this opcode's (binary) argument."""
    interpreter.track_put()  # feed the expansion-attack (GET/PUT) counters
    interpreter.memory[self.arg] = interpreter.stack[-1]

def encode_body(self):
Expand Down Expand Up @@ -1822,6 +1887,7 @@ class BinGet(Opcode):
name = "BINGET"

def run(self, interpreter: Interpreter):
interpreter.track_get()
if self.arg not in interpreter.memory:
sys.stderr.write(
f"Warning: malformed pickle file. BINGET references non-existent memo key {self.arg}; "
Expand All @@ -1840,6 +1906,7 @@ class LongBinGet(Opcode):
name = "LONG_BINGET"

def run(self, interpreter: Interpreter):
interpreter.track_get()
if self.arg not in interpreter.memory:
sys.stderr.write(
f"Warning: malformed pickle file. LONG_BINGET references non-existent memo key {self.arg}; "
Expand All @@ -1858,6 +1925,7 @@ def memo_id(self) -> int:
return int(self.arg)

def run(self, interpreter: Interpreter):
interpreter.track_get()
if self.memo_id not in interpreter.memory:
sys.stderr.write(
f"Warning: malformed pickle file. BINGET references non-existent memo key {self.memo_id}; "
Expand Down Expand Up @@ -1991,6 +2059,7 @@ class Memoize(Opcode):
name = "MEMOIZE"

def run(self, interpreter: Interpreter):
    """Append the top of the stack to the memo at the next sequential key."""
    interpreter.track_put()  # feed the expansion-attack (GET/PUT) counters
    interpreter.memory[len(interpreter.memory)] = interpreter.stack[-1]


Expand Down
Loading
Loading