Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion fickling/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,12 @@ def analyze(self, context: AnalysisContext) -> Iterator[AnalysisResult]:
# NOTE(boyan): Special case with eval?
# Copy pasted from pickled.unsafe_imports() original implementation
elif "eval" in (n.name for n in node.names):
yield node
yield AnalysisResult(
Severity.LIKELY_OVERTLY_MALICIOUS,
f"`{shortened}` imports `eval` which can execute arbitrary code",
"UnsafeImportsML",
trigger=shortened,
)


class BadCalls(Analysis):
Expand Down
67 changes: 33 additions & 34 deletions fickling/fickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,30 @@
OPCODES_BY_NAME: dict[str, type[Opcode]] = {}
OPCODE_INFO_BY_NAME: dict[str, OpcodeInfo] = {opcode.name: opcode for opcode in opcodes}

UNSAFE_IMPORTS: frozenset[str] = frozenset(
[
"__builtin__",
"__builtins__",
"builtins",
"os",
"posix",
"nt",
"subprocess",
"sys",
"socket",
"pty",
"marshal",
"types",
"runpy",
"cProfile",
"ctypes",
"pydoc",
"importlib",
"code",
"multiprocessing",
]
)


def is_std_module(module_name: str) -> bool:
return in_stdlib(module_name) or module_name in BUILTIN_MODULE_NAMES
Expand Down Expand Up @@ -864,20 +888,8 @@ def is_likely_safe(self):

def unsafe_imports(self) -> Iterator[ast.Import | ast.ImportFrom]:
for node in self.properties.imports:
if node.module in (
"__builtin__",
"__builtins__",
"builtins",
"os",
"posix",
"nt",
"subprocess",
"sys",
"builtins",
"socket",
"pty",
"marshal",
"types",
if node.module and any(
component in UNSAFE_IMPORTS for component in node.module.split(".")
):
yield node
elif "eval" in (n.name for n in node.names):
Expand Down Expand Up @@ -1103,12 +1115,8 @@ def attr(self) -> str:

def run(self, interpreter: Interpreter):
module, attr = self.module, self.attr
if module in ("__builtin__", "__builtins__", "builtins"):
# no need to emit an import for builtins!
pass
else:
alias = ast.alias(attr)
interpreter.module_body.append(ast.ImportFrom(module=module, names=[alias], level=0))
alias = ast.alias(attr)
interpreter.module_body.append(ast.ImportFrom(module=module, names=[alias], level=0))
interpreter.stack.append(ast.Name(attr, ast.Load()))

def encode(self) -> bytes:
Expand Down Expand Up @@ -1153,19 +1161,14 @@ def run(self, interpreter: Interpreter):
f"Module: {type(module).__name__}, Attr: {type(attr).__name__}"
)

if not module.isidentifier() or not attr.isidentifier():
if not all(m.isidentifier() for m in module.split(".")) or not attr.isidentifier():
raise ValueError(
f"Extracted identifiers are not valid Python identifiers. "
f"Module: {module!r}, Attr: {attr!r}"
)

# Continue with normal processing
if module in ("__builtin__", "__builtins__", "builtins"):
# no need to emit an import for builtins!
pass
else:
alias = ast.alias(attr)
interpreter.module_body.append(ast.ImportFrom(module=module, names=[alias], level=0))
alias = ast.alias(attr)
interpreter.module_body.append(ast.ImportFrom(module=module, names=[alias], level=0))
interpreter.stack.append(ast.Name(attr, ast.Load()))


Expand All @@ -1187,12 +1190,8 @@ def cls(self) -> str:

def run(self, interpreter: Interpreter, stack_slice: List[ast.expr]):
module, classname = self.module, self.cls
if module in ("__builtin__", "__builtins__", "builtins"):
# no need to emit an import for builtins!
pass
else:
alias = ast.alias(classname)
interpreter.module_body.append(ast.ImportFrom(module=module, names=[alias], level=0))
alias = ast.alias(classname)
interpreter.module_body.append(ast.ImportFrom(module=module, names=[alias], level=0))
args = ast.Tuple(tuple(stack_slice))
call = ast.Call(ast.Name(classname, ast.Load()), list(args.elts), [])
var_name = interpreter.new_variable(call)
Expand Down
Loading
Loading