Skip to content
16 changes: 9 additions & 7 deletions fickling/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,19 +243,14 @@ class UnsafeImportsML(Analysis):
"dill": "This module can load and execute arbitrary code.",
"code": "This module can compile and execute arbitrary code.",
"pty": "This module contains functions that can perform system operations and execute arbitrary code.",
"pickle": "This module can deserialize and execute arbitrary code through nested unpickling.",
"_pickle": "This module can deserialize and execute arbitrary code through nested unpickling.",
}

UNSAFE_IMPORTS = {
"torch": {
"load": "This function can load untrusted files and code from arbitrary web sources."
},
"numpy.testing._private.utils": {"runstring": "This function can execute arbitrary code."},
"operator": {
"getitem": "This function can lead to arbitrary code execution",
"attrgetter": "This function can lead to arbitrary code execution",
"itemgetter": "This function can lead to arbitrary code execution",
"methodcaller": "This function can lead to arbitrary code execution",
},
"torch.storage": {
"_load_from_bytes": "This function calls `torch.load()` which is unsafe as using a string argument would "
"allow to load and execute arbitrary code hosted on the internet. However, in this case, the "
Expand All @@ -264,6 +259,13 @@ class UnsafeImportsML(Analysis):
"underlying `torch.load()` call to unpickle that bytestring and execute arbitrary code through nested pickle calls. "
"So this import is safe only if restrictions on pickle (such as Fickling's hooks) have been set properly",
},
"numpy.testing._private.utils": {"runstring": "This function can execute arbitrary code."},
"numpy.f2py.crackfortran": {
"getlincoef": "This function can execute arbitrary code.",
"_eval_length": "This function can execute arbitrary code.",
},
"_io": {"FileIO": "This class can read/write arbitrary files."},
"io": {"FileIO": "This class can read/write arbitrary files."},
}

def analyze(self, context: AnalysisContext) -> Iterator[AnalysisResult]:
Expand Down
38 changes: 37 additions & 1 deletion fickling/fickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

UNSAFE_IMPORTS: frozenset[str] = frozenset(
[
# Core builtins and system modules
"__builtin__",
"__builtins__",
"builtins",
Expand All @@ -59,6 +60,39 @@
"importlib",
"code",
"multiprocessing",
# File and shell operations
"shutil",
"distutils",
"commands",
# Operator module bypasses
"_operator",
"operator",
"functools",
# Async subprocess execution
"asyncio",
# Code execution via profilers/debuggers
"profile",
"trace",
"pdb",
"bdb",
"timeit",
"doctest",
# Package and environment manipulation
"venv",
"pip",
"ensurepip",
# Network and web modules
"webbrowser",
"aiohttp",
"httplib",
"http",
"ssl",
"requests",
"urllib",
"urllib2",
# IDE and dev tools
"idlelib",
"lib2to3",
]
)

Expand Down Expand Up @@ -1267,7 +1301,9 @@ def run(self, interpreter: Interpreter):
f"Module: {type(module).__name__}, Attr: {type(attr).__name__}"
)

if not all(m.isidentifier() for m in module.split(".")) or not attr.isidentifier():
if not all(m.isidentifier() for m in module.split(".")) or not all(
a.isidentifier() for a in attr.split(".")
):
raise ValueError(
f"Extracted identifiers are not valid Python identifiers. "
f"Module: {module!r}, Attr: {attr!r}"
Expand Down
118 changes: 118 additions & 0 deletions test/test_bypasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,3 +445,121 @@ def test_unsafe_builtin_eval_still_flagged(self):
detailed = res.detailed_results().get("AnalysisResult", {})
self.assertIsNotNone(detailed.get("UnsafeImports"))
self.assertIsNotNone(detailed.get("UnsafeImportsML"))

# https://github.com/mmaitre314/picklescan/security/advisories/GHSA-955r-x9j8-7rhh
def test_operator_methodcaller(self):
"""Test detection of _operator.methodcaller bypass."""
pickled = Pickled(
[
op.Global.create("builtins", "__import__"),
op.Mark(),
op.Unicode("os"),
op.Tuple(),
op.Reduce(),
op.Put(0),
op.Pop(),
op.Global.create("_operator", "methodcaller"),
op.Mark(),
op.Unicode("system"),
op.Unicode('echo "pwned by _operator.methodcaller"'),
op.Tuple(),
op.Reduce(),
op.Mark(),
op.Get(0),
op.Tuple(),
op.Reduce(),
op.Stop(),
]
)
res = check_safety(pickled)
self.assertGreater(res.severity, Severity.LIKELY_SAFE)

# https://github.com/mmaitre314/picklescan/security/advisories/GHSA-m273-6v24-x4m4
def test_distutils_write_file(self):
"""Test detection of distutils.file_util.write_file bypass."""
pickled = Pickled(
[
op.Proto.create(4),
op.ShortBinUnicode("distutils.file_util"),
op.ShortBinUnicode("write_file"),
op.StackGlobal(),
op.ShortBinUnicode("/tmp/malicious.txt"),
op.Mark(),
op.ShortBinUnicode("malicious content"),
op.List(),
op.TupleTwo(),
op.Reduce(),
op.Stop(),
]
)
res = check_safety(pickled)
self.assertGreater(res.severity, Severity.LIKELY_SAFE)

def test_io_fileio(self):
"""Test detection of _io.FileIO bypass."""
pickled = Pickled(
[
op.Proto.create(4),
op.ShortBinUnicode("_io"),
op.ShortBinUnicode("FileIO"),
op.StackGlobal(),
op.ShortBinUnicode("/etc/passwd"),
op.TupleOne(),
op.Reduce(),
op.Stop(),
]
)
res = check_safety(pickled)
self.assertGreater(res.severity, Severity.LIKELY_SAFE)

# https://github.com/mmaitre314/picklescan/security/advisories/GHSA-r8g5-cgf2-4m4m
def test_numpy_f2py_getlincoef(self):
"""Test detection of numpy.f2py.crackfortran.getlincoef bypass."""
pickled = Pickled(
[
op.Proto.create(4),
op.ShortBinUnicode("numpy.f2py.crackfortran"),
op.ShortBinUnicode("getlincoef"),
op.StackGlobal(),
op.ShortBinUnicode("__import__('os').system('id')"),
op.EmptyDict(),
op.TupleTwo(),
op.Reduce(),
op.Stop(),
]
)
res = check_safety(pickled)
self.assertGreater(res.severity, Severity.LIKELY_SAFE)

# https://github.com/mmaitre314/picklescan/security/advisories/GHSA-f7qq-56ww-84cr
def test_asyncio_subprocess(self):
"""Test detection of asyncio subprocess execution bypass."""
pickled = Pickled(
[
op.Proto.create(4),
op.Frame(81),
op.ShortBinUnicode("asyncio.unix_events"),
op.Memoize(),
op.ShortBinUnicode("_UnixSubprocessTransport._start"),
op.Memoize(),
op.StackGlobal(),
op.Memoize(),
op.Mark(),
op.EmptyDict(),
op.Memoize(),
op.ShortBinUnicode("whoami"),
op.Memoize(),
op.NewTrue(),
op.NoneOpcode(),
op.NoneOpcode(),
op.NoneOpcode(),
op.BinInt1(0),
op.Tuple(),
op.Memoize(),
op.Reduce(),
op.Memoize(),
op.Stop(),
]
)
res = check_safety(pickled)
self.assertGreater(res.severity, Severity.LIKELY_SAFE)
Loading