diff --git a/fickling/analysis.py b/fickling/analysis.py index f2c0764..fe6454c 100644 --- a/fickling/analysis.py +++ b/fickling/analysis.py @@ -275,7 +275,12 @@ def analyze(self, context: AnalysisContext) -> Iterator[AnalysisResult]: # NOTE(boyan): Special case with eval? # Copy pasted from pickled.unsafe_imports() original implementation elif "eval" in (n.name for n in node.names): - yield node + yield AnalysisResult( + Severity.LIKELY_OVERTLY_MALICIOUS, + f"`{shortened}` imports `eval` which can execute arbitrary code", + "UnsafeImportsML", + trigger=shortened, + ) class BadCalls(Analysis): diff --git a/fickling/fickle.py b/fickling/fickle.py index eb82c09..8520481 100644 --- a/fickling/fickle.py +++ b/fickling/fickle.py @@ -40,6 +40,30 @@ OPCODES_BY_NAME: dict[str, type[Opcode]] = {} OPCODE_INFO_BY_NAME: dict[str, OpcodeInfo] = {opcode.name: opcode for opcode in opcodes} +UNSAFE_IMPORTS: frozenset[str] = frozenset( + [ + "__builtin__", + "__builtins__", + "builtins", + "os", + "posix", + "nt", + "subprocess", + "sys", + "socket", + "pty", + "marshal", + "types", + "runpy", + "cProfile", + "ctypes", + "pydoc", + "importlib", + "code", + "multiprocessing", + ] +) + def is_std_module(module_name: str) -> bool: return in_stdlib(module_name) or module_name in BUILTIN_MODULE_NAMES @@ -864,20 +888,8 @@ def is_likely_safe(self): def unsafe_imports(self) -> Iterator[ast.Import | ast.ImportFrom]: for node in self.properties.imports: - if node.module in ( - "__builtin__", - "__builtins__", - "builtins", - "os", - "posix", - "nt", - "subprocess", - "sys", - "builtins", - "socket", - "pty", - "marshal", - "types", + if node.module and any( + component in UNSAFE_IMPORTS for component in node.module.split(".") ): yield node elif "eval" in (n.name for n in node.names): @@ -1103,12 +1115,8 @@ def attr(self) -> str: def run(self, interpreter: Interpreter): module, attr = self.module, self.attr - if module in ("__builtin__", "__builtins__", "builtins"): - # no need to emit an import for builtins! - pass - else: - alias = ast.alias(attr) - interpreter.module_body.append(ast.ImportFrom(module=module, names=[alias], level=0)) + alias = ast.alias(attr) + interpreter.module_body.append(ast.ImportFrom(module=module, names=[alias], level=0)) interpreter.stack.append(ast.Name(attr, ast.Load())) def encode(self) -> bytes: @@ -1153,19 +1161,14 @@ def run(self, interpreter: Interpreter): f"Module: {type(module).__name__}, Attr: {type(attr).__name__}" ) - if not module.isidentifier() or not attr.isidentifier(): + if not all(m.isidentifier() for m in module.split(".")) or not attr.isidentifier(): raise ValueError( f"Extracted identifiers are not valid Python identifiers. " f"Module: {module!r}, Attr: {attr!r}" ) - # Continue with normal processing - if module in ("__builtin__", "__builtins__", "builtins"): - # no need to emit an import for builtins! - pass - else: - alias = ast.alias(attr) - interpreter.module_body.append(ast.ImportFrom(module=module, names=[alias], level=0)) + alias = ast.alias(attr) + interpreter.module_body.append(ast.ImportFrom(module=module, names=[alias], level=0)) interpreter.stack.append(ast.Name(attr, ast.Load())) @@ -1187,12 +1190,8 @@ def cls(self) -> str: def run(self, interpreter: Interpreter, stack_slice: List[ast.expr]): module, classname = self.module, self.cls - if module in ("__builtin__", "__builtins__", "builtins"): - # no need to emit an import for builtins! - pass - else: - alias = ast.alias(classname) - interpreter.module_body.append(ast.ImportFrom(module=module, names=[alias], level=0)) + alias = ast.alias(classname) + interpreter.module_body.append(ast.ImportFrom(module=module, names=[alias], level=0)) args = ast.Tuple(tuple(stack_slice)) call = ast.Call(ast.Name(classname, ast.Load()), list(args.elts), []) var_name = interpreter.new_variable(call) diff --git a/test/test_bypasses.py b/test/test_bypasses.py index 3663dc5..f92b4c0 100644 --- a/test/test_bypasses.py +++ b/test/test_bypasses.py @@ -83,3 +83,296 @@ def test_missing_marshal_and_types(self): ) self.assertGreater(check_safety(opcodes).severity, Severity.LIKELY_SAFE) + + # https://github.com/trailofbits/fickling/security/advisories/GHSA-wfq2-52f7-7qvj + # https://github.com/trailofbits/fickling/security/advisories/GHSA-q5qq-mvfm-j35x + def test_missing_runpy(self): + pickled = Pickled( + [ + op.Proto.create(5), + op.Frame(46), + op.ShortBinUnicode("runpy"), + op.Memoize(), + op.ShortBinUnicode("run_path"), + op.Memoize(), + op.StackGlobal(), + op.Memoize(), + op.ShortBinUnicode("/tmp/malicious.py"), + op.Memoize(), + op.TupleOne(), + op.Memoize(), + op.Reduce(), + op.Memoize(), + op.Stop(), + ] + ) + res = check_safety(pickled) + self.assertGreater(res.severity, Severity.LIKELY_SAFE) + self.assertEqual( + res.detailed_results()["AnalysisResult"].get("UnsafeImports"), + "from runpy import run_path", + ) + + # https://github.com/trailofbits/fickling/security/advisories/GHSA-p523-jq9w-64x9 + def test_missing_cprofile(self): + pickled = Pickled( + [ + op.Proto.create(5), + op.Frame(58), + op.ShortBinUnicode("cProfile"), + op.Memoize(), + op.ShortBinUnicode("run"), + op.Memoize(), + op.StackGlobal(), + op.Memoize(), + op.ShortBinUnicode("print('CPROFILE_RCE_CONFIRMED')"), + op.Memoize(), + op.TupleOne(), + op.Memoize(), + op.Reduce(), + op.Memoize(), + op.Stop(), + ] + ) + res = check_safety(pickled) + self.assertGreater(res.severity, Severity.LIKELY_SAFE) + self.assertEqual( + res.detailed_results()["AnalysisResult"].get("UnsafeImports"), + "from cProfile import run", + ) + + # https://github.com/trailofbits/fickling/security/advisories/GHSA-q5qq-mvfm-j35x + # https://github.com/trailofbits/fickling/security/advisories/GHSA-5hvc-6wx8-mvv4 + def test_missing_ctypes(self): + pickled = Pickled( + [ + op.Proto.create(5), + op.ShortBinUnicode("builtins"), + op.Memoize(), + op.ShortBinUnicode("getattr"), + op.Memoize(), + op.StackGlobal(), + op.Memoize(), + op.ShortBinUnicode("ctypes"), + op.Memoize(), + op.ShortBinUnicode("CDLL"), + op.Memoize(), + op.StackGlobal(), + op.Memoize(), + op.ShortBinUnicode("libc.dylib"), + op.Memoize(), + op.TupleOne(), + op.Memoize(), + op.Reduce(), + op.Memoize(), + op.ShortBinUnicode("system"), + op.Memoize(), + op.TupleTwo(), + op.Memoize(), + op.Reduce(), + op.Memoize(), + op.ShortBinBytes(b"id"), + op.Memoize(), + op.TupleOne(), + op.Memoize(), + op.Reduce(), + op.Memoize(), + op.Stop(), + ] + ) + res = check_safety(pickled) + self.assertGreater(res.severity, Severity.LIKELY_SAFE) + self.assertEqual( + res.detailed_results()["AnalysisResult"].get("UnsafeImports"), + "from ctypes import CDLL", + ) + + # https://github.com/trailofbits/fickling/security/advisories/GHSA-5hvc-6wx8-mvv4 + def test_missing_pydoc(self): + pickled = Pickled( + [ + op.Proto.create(5), + op.ShortBinUnicode("pydoc"), + op.Memoize(), + op.ShortBinUnicode("locate"), + op.Memoize(), + op.StackGlobal(), + op.Memoize(), + op.ShortBinUnicode("os.system"), + op.Memoize(), + op.TupleOne(), + op.Memoize(), + op.Reduce(), + op.Memoize(), + op.ShortBinUnicode("id"), + op.Memoize(), + op.TupleOne(), + op.Memoize(), + op.Reduce(), + op.Memoize(), + op.Stop(), + ] + ) + res = check_safety(pickled) + self.assertGreater(res.severity, Severity.LIKELY_SAFE) + self.assertEqual( + res.detailed_results()["AnalysisResult"].get("UnsafeImports"), + "from pydoc import locate", + ) + + # https://github.com/trailofbits/fickling/security/advisories/GHSA-q5qq-mvfm-j35x + def test_missing_importlib(self): + pickled = Pickled( + [ + op.Proto.create(5), + op.ShortBinUnicode("builtins"), + op.Memoize(), + op.ShortBinUnicode("getattr"), + op.Memoize(), + op.StackGlobal(), + op.Memoize(), + op.ShortBinUnicode("importlib"), + op.Memoize(), + op.ShortBinUnicode("import_module"), + op.Memoize(), + op.StackGlobal(), + op.Memoize(), + op.ShortBinUnicode("os"), + op.Memoize(), + op.TupleOne(), + op.Memoize(), + op.Reduce(), + op.Memoize(), + op.ShortBinUnicode("system"), + op.Memoize(), + op.TupleTwo(), + op.Memoize(), + op.Reduce(), + op.Memoize(), + op.ShortBinUnicode("id"), + op.Memoize(), + op.TupleOne(), + op.Memoize(), + op.Reduce(), + op.Memoize(), + op.Stop(), + ] + ) + res = check_safety(pickled) + self.assertGreater(res.severity, Severity.LIKELY_SAFE) + self.assertEqual( + res.detailed_results()["AnalysisResult"].get("UnsafeImports"), + "from importlib import import_module", + ) + + # https://github.com/trailofbits/fickling/security/advisories/GHSA-q5qq-mvfm-j35x + def test_missing_code(self): + pickled = Pickled( + [ + op.Proto.create(5), + op.ShortBinUnicode("builtins"), + op.Memoize(), + op.ShortBinUnicode("getattr"), + op.Memoize(), + op.StackGlobal(), + op.Memoize(), + op.ShortBinUnicode("code"), + op.Memoize(), + op.ShortBinUnicode("InteractiveInterpreter"), + op.Memoize(), + op.StackGlobal(), + op.Memoize(), + op.EmptyTuple(), + op.Memoize(), + op.Reduce(), + op.Memoize(), + op.ShortBinUnicode("runsource"), + op.Memoize(), + op.TupleTwo(), + op.Memoize(), + op.Reduce(), + op.Memoize(), + op.ShortBinUnicode('import os; os.system("id")'), + op.Memoize(), + op.TupleOne(), + op.Memoize(), + op.Reduce(), + op.Memoize(), + op.Stop(), + ] + ) + res = check_safety(pickled) + self.assertGreater(res.severity, Severity.LIKELY_SAFE) + self.assertEqual( + res.detailed_results()["AnalysisResult"].get("UnsafeImports"), + "from code import InteractiveInterpreter", + ) + + # https://github.com/trailofbits/fickling/security/advisories/GHSA-q5qq-mvfm-j35x + def test_missing_multiprocessing(self): + pickled = Pickled( + [ + op.Proto.create(5), + op.Frame(74), + op.ShortBinUnicode("multiprocessing.util"), + op.Memoize(), + op.ShortBinUnicode("spawnv_passfds"), + op.Memoize(), + op.StackGlobal(), + op.Memoize(), + op.ShortBinBytes(b"/bin/sh"), + op.Memoize(), + op.EmptyList(), + op.Memoize(), + op.Mark(), + op.BinGet(3), + op.ShortBinBytes(b"-c"), + op.Memoize(), + op.ShortBinBytes(b"id"), + op.Memoize(), + op.Appends(), + op.EmptyTuple(), + op.TupleThree(), + op.Memoize(), + op.Reduce(), + op.Memoize(), + op.Stop(), + ] + ) + res = check_safety(pickled) + self.assertGreater(res.severity, Severity.LIKELY_SAFE) + self.assertEqual( + res.detailed_results()["AnalysisResult"].get("UnsafeImports"), + "from multiprocessing.util import spawnv_passfds", + ) + + # https://github.com/trailofbits/fickling/security/advisories/GHSA-h4rm-mm56-xf63 + def test_builtins_import_bypass(self): + pickled = Pickled( + [ + op.Global("builtins __import__"), + op.String("os"), + op.TupleOne(), + op.Reduce(), + op.Put(0), + op.Pop(), + op.Global("builtins getattr"), + op.Get(0), + op.String("system"), + op.TupleTwo(), + op.Reduce(), + op.Put(1), + op.Pop(), + op.Get(1), + op.String("whoami"), + op.TupleOne(), + op.Reduce(), + op.Stop(), + ] + ) + res = check_safety(pickled) + self.assertGreater(res.severity, Severity.LIKELY_SAFE) + self.assertEqual( + res.detailed_results()["AnalysisResult"].get("UnsafeImports"), + "from builtins import getattr", + ) diff --git a/test/test_pickle.py b/test/test_pickle.py index e71785a..beaefb4 100644 --- a/test/test_pickle.py +++ b/test/test_pickle.py @@ -87,7 +87,7 @@ def test_bytes(self): def test_call(self): pickled = Pickled( [ - fpickle.Global.create("__builtins__", "eval"), + fpickle.Global.create("builtins", "eval"), fpickle.Mark(), fpickle.Unicode("(lambda:1234)()"), fpickle.Tuple(), @@ -102,7 +102,7 @@ def test_inst(self): [ fpickle.Mark(), fpickle.Unicode("1234"), - fpickle.Inst.create("__builtins__", "int"), + fpickle.Inst.create("builtins", "int"), fpickle.Stop(), ] )