Fail closed on dynamic node mapping extraction
This commit is contained in:
@@ -364,6 +364,98 @@ NODE_CLASS_MAPPINGS = {
|
|||||||
self.assertEqual(["MASK"], result["nodes"]["FinalReturnTypesNode"]["outputs"])
|
self.assertEqual(["MASK"], result["nodes"]["FinalReturnTypesNode"]["outputs"])
|
||||||
self.assertEqual("ok", result["pack"]["status"])
|
self.assertEqual("ok", result["pack"]["status"])
|
||||||
|
|
||||||
|
def test_dynamic_node_class_mapping_reassignment_skips_node(self):
|
||||||
|
source = '''
|
||||||
|
def build_mappings():
|
||||||
|
return {"DynamicMappingNode": DynamicMappingNode}
|
||||||
|
|
||||||
|
|
||||||
|
class DynamicMappingNode:
|
||||||
|
RETURN_TYPES = ("IMAGE",)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def INPUT_TYPES(cls):
|
||||||
|
return {
|
||||||
|
"required": {
|
||||||
|
"image": ("IMAGE",),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
NODE_CLASS_MAPPINGS = {
|
||||||
|
"DynamicMappingNode": DynamicMappingNode,
|
||||||
|
}
|
||||||
|
NODE_CLASS_MAPPINGS = build_mappings()
|
||||||
|
'''
|
||||||
|
result = self._extract_source(source, "dynamic-mapping-pack")
|
||||||
|
|
||||||
|
self.assertEqual({}, result["nodes"])
|
||||||
|
self.assertEqual("no_static_nodes", result["pack"]["status"])
|
||||||
|
|
||||||
|
def test_dynamic_display_mapping_reassignment_falls_back_to_node_type(self):
|
||||||
|
source = '''
|
||||||
|
def build_displays():
|
||||||
|
return {"DisplayInvalidatedNode": "Dynamic Display"}
|
||||||
|
|
||||||
|
|
||||||
|
class DisplayInvalidatedNode:
|
||||||
|
RETURN_TYPES = ("IMAGE",)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def INPUT_TYPES(cls):
|
||||||
|
return {
|
||||||
|
"required": {
|
||||||
|
"image": ("IMAGE",),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
NODE_CLASS_MAPPINGS = {
|
||||||
|
"DisplayInvalidatedNode": DisplayInvalidatedNode,
|
||||||
|
}
|
||||||
|
NODE_DISPLAY_NAME_MAPPINGS = {
|
||||||
|
"DisplayInvalidatedNode": "Stale Display",
|
||||||
|
}
|
||||||
|
NODE_DISPLAY_NAME_MAPPINGS = build_displays()
|
||||||
|
'''
|
||||||
|
result = self._extract_source(source, "dynamic-display-pack")
|
||||||
|
|
||||||
|
self.assertEqual("DisplayInvalidatedNode", result["nodes"]["DisplayInvalidatedNode"]["display"])
|
||||||
|
self.assertEqual("ok", result["pack"]["status"])
|
||||||
|
|
||||||
|
def test_input_types_with_dynamic_control_flow_is_skipped(self):
|
||||||
|
source = '''
|
||||||
|
def something():
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def dynamic_inputs():
|
||||||
|
return {"required": {"image": ("IMAGE",)}}
|
||||||
|
|
||||||
|
|
||||||
|
class DynamicBranchInputNode:
|
||||||
|
RETURN_TYPES = ("IMAGE",)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def INPUT_TYPES(cls):
|
||||||
|
if something():
|
||||||
|
return dynamic_inputs()
|
||||||
|
return {
|
||||||
|
"required": {
|
||||||
|
"image": ("IMAGE",),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
NODE_CLASS_MAPPINGS = {
|
||||||
|
"DynamicBranchInputNode": DynamicBranchInputNode,
|
||||||
|
}
|
||||||
|
'''
|
||||||
|
result = self._extract_source(source, "dynamic-branch-input-pack")
|
||||||
|
|
||||||
|
self.assertEqual({}, result["nodes"])
|
||||||
|
self.assertEqual("no_static_nodes", result["pack"]["status"])
|
||||||
|
|
||||||
def test_write_artifact_is_deterministic(self):
|
def test_write_artifact_is_deterministic(self):
|
||||||
with tempfile.TemporaryDirectory() as tmp:
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
out_one = Path(tmp, "one.json")
|
out_one = Path(tmp, "one.json")
|
||||||
|
|||||||
@@ -191,10 +191,10 @@ def _input_types(cls, env):
|
|||||||
for stmt in cls.body:
|
for stmt in cls.body:
|
||||||
if not isinstance(stmt, ast.FunctionDef) or stmt.name != "INPUT_TYPES":
|
if not isinstance(stmt, ast.FunctionDef) or stmt.name != "INPUT_TYPES":
|
||||||
continue
|
continue
|
||||||
for child in stmt.body:
|
if len(stmt.body) != 1 or not isinstance(stmt.body[0], ast.Return):
|
||||||
if isinstance(child, ast.Return):
|
return None
|
||||||
try:
|
try:
|
||||||
value = _literal(child.value, env)
|
value = _literal(stmt.body[0].value, env)
|
||||||
except UnsupportedStaticExpression:
|
except UnsupportedStaticExpression:
|
||||||
return None
|
return None
|
||||||
return value if isinstance(value, dict) else None
|
return value if isinstance(value, dict) else None
|
||||||
@@ -209,46 +209,69 @@ def _mapping_value_name(value):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _node_class_mappings(tree, env):
|
def _name_is_assigned(stmt, name):
|
||||||
|
return name in _assignment_target_names(stmt)
|
||||||
|
|
||||||
|
|
||||||
|
def _module_dict_entries(node, env, value_converter):
|
||||||
|
if not isinstance(node, ast.Dict):
|
||||||
|
raise UnsupportedStaticExpression(type(node).__name__)
|
||||||
|
result = {}
|
||||||
|
for key, value in zip(node.keys, node.values):
|
||||||
|
if key is None:
|
||||||
|
raise UnsupportedStaticExpression("dict unpacking is not supported")
|
||||||
|
converted_value = value_converter(value)
|
||||||
|
if converted_value is None:
|
||||||
|
continue
|
||||||
|
result[_literal(key, env)] = converted_value
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def _final_module_dict(tree, env, name, value_converter):
|
||||||
|
value = _MISSING
|
||||||
for stmt in tree.body:
|
for stmt in tree.body:
|
||||||
if not isinstance(stmt, ast.Assign):
|
if isinstance(stmt, ast.Assign):
|
||||||
|
if not _name_is_assigned(stmt, name):
|
||||||
continue
|
continue
|
||||||
if not any(
|
if len(stmt.targets) == 1 and isinstance(stmt.targets[0], ast.Name):
|
||||||
isinstance(target, ast.Name) and target.id == "NODE_CLASS_MAPPINGS"
|
|
||||||
for target in stmt.targets
|
|
||||||
):
|
|
||||||
continue
|
|
||||||
if not isinstance(stmt.value, ast.Dict):
|
|
||||||
continue
|
|
||||||
mappings = {}
|
|
||||||
for key, value in zip(stmt.value.keys, stmt.value.values):
|
|
||||||
try:
|
try:
|
||||||
node_type = _literal(key, env)
|
value = _module_dict_entries(stmt.value, env, value_converter)
|
||||||
except UnsupportedStaticExpression:
|
except UnsupportedStaticExpression:
|
||||||
|
value = _INVALID
|
||||||
|
else:
|
||||||
|
value = _INVALID
|
||||||
continue
|
continue
|
||||||
class_name = _mapping_value_name(value)
|
if isinstance(stmt, ast.AnnAssign):
|
||||||
if node_type and class_name:
|
if not _name_is_assigned(stmt, name):
|
||||||
mappings[str(node_type)] = class_name
|
continue
|
||||||
return mappings
|
if isinstance(stmt.target, ast.Name) and stmt.value is not None:
|
||||||
|
try:
|
||||||
|
value = _module_dict_entries(stmt.value, env, value_converter)
|
||||||
|
except UnsupportedStaticExpression:
|
||||||
|
value = _INVALID
|
||||||
|
else:
|
||||||
|
value = _INVALID
|
||||||
|
continue
|
||||||
|
if isinstance(stmt, ast.AugAssign):
|
||||||
|
if _name_is_assigned(stmt, name):
|
||||||
|
value = _INVALID
|
||||||
|
continue
|
||||||
|
if isinstance(stmt, (ast.If, ast.For, ast.AsyncFor, ast.While, ast.Try)):
|
||||||
|
if name in _assigned_names_in_control_flow(stmt):
|
||||||
|
value = _INVALID
|
||||||
|
if value in (_MISSING, _INVALID):
|
||||||
return {}
|
return {}
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
|
def _node_class_mappings(tree, env):
|
||||||
|
mappings = _final_module_dict(tree, env, "NODE_CLASS_MAPPINGS", _mapping_value_name)
|
||||||
|
return {str(node_type): class_name for node_type, class_name in mappings.items() if node_type and class_name}
|
||||||
|
|
||||||
|
|
||||||
def _display_mappings(tree, env):
|
def _display_mappings(tree, env):
|
||||||
for stmt in tree.body:
|
displays = _final_module_dict(tree, env, "NODE_DISPLAY_NAME_MAPPINGS", lambda value: _literal(value, env))
|
||||||
if not isinstance(stmt, ast.Assign):
|
return {str(k): str(v) for k, v in displays.items()}
|
||||||
continue
|
|
||||||
if not any(
|
|
||||||
isinstance(target, ast.Name) and target.id == "NODE_DISPLAY_NAME_MAPPINGS"
|
|
||||||
for target in stmt.targets
|
|
||||||
):
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
value = _literal(stmt.value, env)
|
|
||||||
except UnsupportedStaticExpression:
|
|
||||||
return {}
|
|
||||||
if isinstance(value, dict):
|
|
||||||
return {str(k): str(v) for k, v in value.items()}
|
|
||||||
return {}
|
|
||||||
|
|
||||||
|
|
||||||
def _signature_from_class(node_type, cls, display, pack_meta, env):
|
def _signature_from_class(node_type, cls, display, pack_meta, env):
|
||||||
|
|||||||
Reference in New Issue
Block a user