Fail closed on dynamic static extraction inputs
This commit is contained in:
@@ -16,6 +16,10 @@ class UnsupportedStaticExpression(Exception):
|
||||
pass
|
||||
|
||||
|
||||
_MISSING = object()
|
||||
_INVALID = object()
|
||||
|
||||
|
||||
def _literal(node, env):
|
||||
if isinstance(node, ast.Constant):
|
||||
return node.value
|
||||
@@ -35,19 +39,104 @@ def _literal(node, env):
|
||||
raise UnsupportedStaticExpression(type(node).__name__)
|
||||
|
||||
|
||||
def _target_names(target):
|
||||
if isinstance(target, ast.Name):
|
||||
return {target.id}
|
||||
if isinstance(target, (ast.List, ast.Tuple)):
|
||||
names = set()
|
||||
for item in target.elts:
|
||||
names.update(_target_names(item))
|
||||
return names
|
||||
if isinstance(target, ast.Starred):
|
||||
return _target_names(target.value)
|
||||
if isinstance(target, (ast.Attribute, ast.Subscript)):
|
||||
return _target_names(target.value)
|
||||
return set()
|
||||
|
||||
|
||||
def _assignment_target_names(stmt):
|
||||
if isinstance(stmt, ast.Assign):
|
||||
names = set()
|
||||
for target in stmt.targets:
|
||||
names.update(_target_names(target))
|
||||
return names
|
||||
if isinstance(stmt, (ast.AnnAssign, ast.AugAssign)):
|
||||
return _target_names(stmt.target)
|
||||
if isinstance(stmt, (ast.For, ast.AsyncFor)):
|
||||
return _target_names(stmt.target)
|
||||
return set()
|
||||
|
||||
|
||||
def _assigned_names_in_control_flow(stmt):
|
||||
names = set()
|
||||
|
||||
class AssignmentVisitor(ast.NodeVisitor):
|
||||
def visit_FunctionDef(self, node):
|
||||
return None
|
||||
|
||||
def visit_AsyncFunctionDef(self, node):
|
||||
return None
|
||||
|
||||
def visit_ClassDef(self, node):
|
||||
return None
|
||||
|
||||
def visit_Assign(self, node):
|
||||
names.update(_assignment_target_names(node))
|
||||
|
||||
def visit_AnnAssign(self, node):
|
||||
names.update(_assignment_target_names(node))
|
||||
|
||||
def visit_AugAssign(self, node):
|
||||
names.update(_assignment_target_names(node))
|
||||
|
||||
def visit_For(self, node):
|
||||
names.update(_assignment_target_names(node))
|
||||
self.generic_visit(node)
|
||||
|
||||
def visit_AsyncFor(self, node):
|
||||
names.update(_assignment_target_names(node))
|
||||
self.generic_visit(node)
|
||||
|
||||
AssignmentVisitor().visit(stmt)
|
||||
return names
|
||||
|
||||
|
||||
def _collect_module_env(tree):
|
||||
env = {}
|
||||
for stmt in tree.body:
|
||||
if not isinstance(stmt, ast.Assign):
|
||||
if isinstance(stmt, ast.Assign):
|
||||
names = _assignment_target_names(stmt)
|
||||
if len(stmt.targets) == 1 and isinstance(stmt.targets[0], ast.Name):
|
||||
name = stmt.targets[0].id
|
||||
try:
|
||||
env[name] = _literal(stmt.value, env)
|
||||
except UnsupportedStaticExpression:
|
||||
env.pop(name, None)
|
||||
else:
|
||||
for name in names:
|
||||
env.pop(name, None)
|
||||
continue
|
||||
if len(stmt.targets) != 1 or not isinstance(stmt.targets[0], ast.Name):
|
||||
if isinstance(stmt, ast.AnnAssign):
|
||||
names = _assignment_target_names(stmt)
|
||||
if stmt.value is None:
|
||||
continue
|
||||
if isinstance(stmt.target, ast.Name):
|
||||
name = stmt.target.id
|
||||
try:
|
||||
env[name] = _literal(stmt.value, env)
|
||||
except UnsupportedStaticExpression:
|
||||
env.pop(name, None)
|
||||
else:
|
||||
for name in names:
|
||||
env.pop(name, None)
|
||||
continue
|
||||
name = stmt.targets[0].id
|
||||
try:
|
||||
env[name] = _literal(stmt.value, env)
|
||||
except UnsupportedStaticExpression:
|
||||
env.pop(name, None)
|
||||
if isinstance(stmt, ast.AugAssign):
|
||||
for name in _assignment_target_names(stmt):
|
||||
env.pop(name, None)
|
||||
continue
|
||||
if isinstance(stmt, (ast.If, ast.For, ast.AsyncFor, ast.While, ast.Try)):
|
||||
for name in _assigned_names_in_control_flow(stmt):
|
||||
env.pop(name, None)
|
||||
return env
|
||||
|
||||
|
||||
@@ -63,16 +152,39 @@ def _class_defs(tree):
|
||||
|
||||
|
||||
def _class_attr(cls, name, env):
|
||||
value = _MISSING
|
||||
for stmt in cls.body:
|
||||
if not isinstance(stmt, ast.Assign):
|
||||
continue
|
||||
for target in stmt.targets:
|
||||
if isinstance(target, ast.Name) and target.id == name:
|
||||
if isinstance(stmt, ast.Assign):
|
||||
if not any(isinstance(target, ast.Name) and target.id == name for target in stmt.targets):
|
||||
continue
|
||||
if len(stmt.targets) == 1 and isinstance(stmt.targets[0], ast.Name):
|
||||
try:
|
||||
return _literal(stmt.value, env)
|
||||
value = _literal(stmt.value, env)
|
||||
except UnsupportedStaticExpression:
|
||||
return None
|
||||
return None
|
||||
value = _INVALID
|
||||
else:
|
||||
value = _INVALID
|
||||
continue
|
||||
if isinstance(stmt, ast.AnnAssign):
|
||||
if not isinstance(stmt.target, ast.Name) or stmt.target.id != name:
|
||||
continue
|
||||
if stmt.value is None:
|
||||
continue
|
||||
try:
|
||||
value = _literal(stmt.value, env)
|
||||
except UnsupportedStaticExpression:
|
||||
value = _INVALID
|
||||
continue
|
||||
if isinstance(stmt, ast.AugAssign):
|
||||
if isinstance(stmt.target, ast.Name) and stmt.target.id == name:
|
||||
value = _INVALID
|
||||
continue
|
||||
if isinstance(stmt, (ast.If, ast.For, ast.AsyncFor, ast.While, ast.Try)):
|
||||
if name in _assigned_names_in_control_flow(stmt):
|
||||
value = _INVALID
|
||||
if value in (_MISSING, _INVALID):
|
||||
return None
|
||||
return value
|
||||
|
||||
|
||||
def _input_types(cls, env):
|
||||
|
||||
Reference in New Issue
Block a user