diff --git a/fickling/analysis.py b/fickling/analysis.py index 2155100..55327ed 100644 --- a/fickling/analysis.py +++ b/fickling/analysis.py @@ -243,19 +243,14 @@ class UnsafeImportsML(Analysis): "dill": "This module can load and execute arbitrary code.", "code": "This module can compile and execute arbitrary code.", "pty": "This module contains functions that can perform system operations and execute arbitrary code.", + "pickle": "This module can deserialize and execute arbitrary code through nested unpickling.", + "_pickle": "This module can deserialize and execute arbitrary code through nested unpickling.", } UNSAFE_IMPORTS = { "torch": { "load": "This function can load untrusted files and code from arbitrary web sources." }, - "numpy.testing._private.utils": {"runstring": "This function can execute arbitrary code."}, - "operator": { - "getitem": "This function can lead to arbitrary code execution", - "attrgetter": "This function can lead to arbitrary code execution", - "itemgetter": "This function can lead to arbitrary code execution", - "methodcaller": "This function can lead to arbitrary code execution", - }, "torch.storage": { "_load_from_bytes": "This function calls `torch.load()` which is unsafe as using a string argument would " "allow to load and execute arbitrary code hosted on the internet. However, in this case, the " @@ -264,6 +259,13 @@ class UnsafeImportsML(Analysis): "underlying `torch.load()` call to unpickle that bytestring and execute arbitrary code through nested pickle calls. " "So this import is safe only if restrictions on pickle (such as Fickling's hooks) have been set properly", }, + "numpy.testing._private.utils": {"runstring": "This function can execute arbitrary code."}, + "numpy.f2py.crackfortran": { + "getlincoef": "This function can execute arbitrary code.", + "_eval_length": "This function can execute arbitrary code.", + }, + "_io": {"FileIO": "This class can read/write arbitrary files."}, + "io": {"FileIO": "This class can read/write arbitrary files."}, } def analyze(self, context: AnalysisContext) -> Iterator[AnalysisResult]: diff --git a/fickling/fickle.py b/fickling/fickle.py index 65cbef0..ba792d5 100644 --- a/fickling/fickle.py +++ b/fickling/fickle.py @@ -40,6 +40,7 @@ UNSAFE_IMPORTS: frozenset[str] = frozenset( [ + # Core builtins and system modules "__builtin__", "__builtins__", "builtins", @@ -59,6 +60,39 @@ "importlib", "code", "multiprocessing", + # File and shell operations + "shutil", + "distutils", + "commands", + # Operator module bypasses + "_operator", + "operator", + "functools", + # Async subprocess execution + "asyncio", + # Code execution via profilers/debuggers + "profile", + "trace", + "pdb", + "bdb", + "timeit", + "doctest", + # Package and environment manipulation + "venv", + "pip", + "ensurepip", + # Network and web modules + "webbrowser", + "aiohttp", + "httplib", + "http", + "ssl", + "requests", + "urllib", + "urllib2", + # IDE and dev tools + "idlelib", + "lib2to3", ] ) @@ -1267,7 +1301,9 @@ def run(self, interpreter: Interpreter): f"Module: {type(module).__name__}, Attr: {type(attr).__name__}" ) - if not all(m.isidentifier() for m in module.split(".")) or not attr.isidentifier(): + if not all(m.isidentifier() for m in module.split(".")) or not all( + a.isidentifier() for a in attr.split(".") + ): raise ValueError( f"Extracted identifiers are not valid Python identifiers. " f"Module: {module!r}, Attr: {attr!r}" diff --git a/test/test_bypasses.py b/test/test_bypasses.py index e42c32e..68613b3 100644 --- a/test/test_bypasses.py +++ b/test/test_bypasses.py @@ -445,3 +445,121 @@ def test_unsafe_builtin_eval_still_flagged(self): detailed = res.detailed_results().get("AnalysisResult", {}) self.assertIsNotNone(detailed.get("UnsafeImports")) self.assertIsNotNone(detailed.get("UnsafeImportsML")) + + # https://github.com/mmaitre314/picklescan/security/advisories/GHSA-955r-x9j8-7rhh + def test_operator_methodcaller(self): + """Test detection of _operator.methodcaller bypass.""" + pickled = Pickled( + [ + op.Global.create("builtins", "__import__"), + op.Mark(), + op.Unicode("os"), + op.Tuple(), + op.Reduce(), + op.Put(0), + op.Pop(), + op.Global.create("_operator", "methodcaller"), + op.Mark(), + op.Unicode("system"), + op.Unicode('echo "pwned by _operator.methodcaller"'), + op.Tuple(), + op.Reduce(), + op.Mark(), + op.Get(0), + op.Tuple(), + op.Reduce(), + op.Stop(), + ] + ) + res = check_safety(pickled) + self.assertGreater(res.severity, Severity.LIKELY_SAFE) + + # https://github.com/mmaitre314/picklescan/security/advisories/GHSA-m273-6v24-x4m4 + def test_distutils_write_file(self): + """Test detection of distutils.file_util.write_file bypass.""" + pickled = Pickled( + [ + op.Proto.create(4), + op.ShortBinUnicode("distutils.file_util"), + op.ShortBinUnicode("write_file"), + op.StackGlobal(), + op.ShortBinUnicode("/tmp/malicious.txt"), + op.Mark(), + op.ShortBinUnicode("malicious content"), + op.List(), + op.TupleTwo(), + op.Reduce(), + op.Stop(), + ] + ) + res = check_safety(pickled) + self.assertGreater(res.severity, Severity.LIKELY_SAFE) + + def test_io_fileio(self): + """Test detection of _io.FileIO bypass.""" + pickled = Pickled( + [ + op.Proto.create(4), + op.ShortBinUnicode("_io"), + op.ShortBinUnicode("FileIO"), + op.StackGlobal(), + op.ShortBinUnicode("/etc/passwd"), + op.TupleOne(), + op.Reduce(), + op.Stop(), + ] + ) + res = check_safety(pickled) + self.assertGreater(res.severity, Severity.LIKELY_SAFE) + + # https://github.com/mmaitre314/picklescan/security/advisories/GHSA-r8g5-cgf2-4m4m + def test_numpy_f2py_getlincoef(self): + """Test detection of numpy.f2py.crackfortran.getlincoef bypass.""" + pickled = Pickled( + [ + op.Proto.create(4), + op.ShortBinUnicode("numpy.f2py.crackfortran"), + op.ShortBinUnicode("getlincoef"), + op.StackGlobal(), + op.ShortBinUnicode("__import__('os').system('id')"), + op.EmptyDict(), + op.TupleTwo(), + op.Reduce(), + op.Stop(), + ] + ) + res = check_safety(pickled) + self.assertGreater(res.severity, Severity.LIKELY_SAFE) + + # https://github.com/mmaitre314/picklescan/security/advisories/GHSA-f7qq-56ww-84cr + def test_asyncio_subprocess(self): + """Test detection of asyncio subprocess execution bypass.""" + pickled = Pickled( + [ + op.Proto.create(4), + op.Frame(81), + op.ShortBinUnicode("asyncio.unix_events"), + op.Memoize(), + op.ShortBinUnicode("_UnixSubprocessTransport._start"), + op.Memoize(), + op.StackGlobal(), + op.Memoize(), + op.Mark(), + op.EmptyDict(), + op.Memoize(), + op.ShortBinUnicode("whoami"), + op.Memoize(), + op.NewTrue(), + op.NoneOpcode(), + op.NoneOpcode(), + op.NoneOpcode(), + op.BinInt1(0), + op.Tuple(), + op.Memoize(), + op.Reduce(), + op.Memoize(), + op.Stop(), + ] + ) + res = check_safety(pickled) + self.assertGreater(res.severity, Severity.LIKELY_SAFE)