e4dfaac63b
The 4B over-uses 'partial' (mislabels identical ref/gen and clear opposites) and also mis-identifies fine-grained content (e.g. names a position 'doggy'/'cowgirl' when it is neither). Deterministic fix: force verdict=match when normalized ref==gen. Prompt hardened to not default to 'partial' (opposites=mismatch). Docs: the 4B is only reliable for coarse attributes — use the 30B for fine-grained recognition; prefer grounded geometry axes over named-position labels. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
729 lines
33 KiB
Python
729 lines
33 KiB
Python
"""
|
|
Qwen3-VL Image-Similarity Judge node for ComfyUI.
|
|
|
|
The "vllm node" of the Prompt Calibrator. It takes a REFERENCE image and a
|
|
GENERATED image and asks a local Qwen3-VL model how close the generated image is
|
|
to the reference, returning a machine-readable score + per-axis difference
|
|
analysis that the calibration controller can act on.
|
|
|
|
Reuses the standard transformers Qwen3-VL plumbing (the same approach used by
|
|
ComfyUI-QwenVL-MultiImage / ComfyUI_Qwen3-VL-Instruct), but forces strict JSON
|
|
output so the result is usable by an automated loop rather than a human reader.
|
|
|
|
Default model is the locally converted huihui-ai Qwen3-VL-4B-Instruct
|
|
*abliterated* (uncensored) weights, which do not refuse to analyze adult imagery.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
|
|
import numpy as np
|
|
import torch
|
|
from PIL import Image
|
|
|
|
# Default to the model already converted on this machine (works out of the box).
|
|
DEFAULT_MODEL_PATH = "/media/p5/qwen3vl_4b_abliterated_comfy_convert/hf_bf16"
|
|
DEFAULT_MODEL_PATH_FP8 = "/media/p5/qwen3vl_4b_abliterated_comfy_convert/hf_fp8"
|
|
|
|
# Recommended abliterated upgrades for the RTX 5090 32 GB (latest Qwen VL family).
|
|
# Download with: hf download <repo> --local-dir <dir>, then point model_path at it.
|
|
RECOMMENDED_MODELS = {
|
|
# Best judge that fits 32 GB. MoE (3B active -> fast). Use precision="nf4"
|
|
# (~18 GB) on 32 GB, or the GGUF quants via a GGUF node. transformers class:
|
|
# Qwen3VLMoeForConditionalGeneration (auto-detected below).
|
|
"30b-a3b": "huihui-ai/Huihui-Qwen3-VL-30B-A3B-Instruct-abliterated",
|
|
# Easy middle ground: bf16 ~17 GB, no quantization hassle, drop-in here.
|
|
"8b": "huihui-ai/Huihui-Qwen3-VL-8B-Instruct-abliterated",
|
|
# Lightweight, already local.
|
|
"4b": "huihui-ai/Huihui-Qwen3-VL-4B-Instruct-abliterated",
|
|
}
|
|
|
|
# Difference axes + a one-line definition each. Definitions are injected into the
|
|
# prompt so the model fills the right axis (e.g. gender_mix = a count, not a position)
|
|
# and the action/pose cluster is captured in detail. Fully configurable on the node;
|
|
# any axis not in this map is still allowed (shown to the model by name only).
|
|
AXIS_DEFS = {
|
|
# identity / cast
|
|
"subject_count": "how many people are present (a count)",
|
|
"gender_mix": "composition BY GENDER as a count, e.g. '1 female, 1 male' (NOT positions)",
|
|
"age_appearance": "apparent age range of each subject",
|
|
"ethnicity_skin": "ethnicity and skin tone",
|
|
# body
|
|
"body_type": "overall physique / build (slim, curvy, athletic, BBW...)",
|
|
"breast_size": "breast size and shape of female subject(s)",
|
|
"distinctive_features": "tattoos, piercings, nail polish, scars — identity anchors",
|
|
"hair": "hair length, color, texture, and style",
|
|
# wardrobe
|
|
"clothing_state": "degree of undress and any garments / lingerie / accessories",
|
|
# action & pose cluster (the crux for explicit content — be specific)
|
|
"sexual_act": "type of activity: vaginal, anal, oral/blowjob, handjob, fingering, none...",
|
|
"position_name": "the named sex position if identifiable (doggy, missionary, cowgirl/reverse, spooning, 69...)",
|
|
"body_orientation": "how bodies are oriented: who is on top/bottom/side, facing each other or from behind",
|
|
"limb_arrangement": "placement of legs and arms (spread, bent, raised, over shoulder, kneeling) and hand placement",
|
|
"penetration": "penetration type, depth (shallow/full), angle, and how visible it is",
|
|
"contact_points": "where bodies touch: grip/hands location, mouth, points of contact",
|
|
"genital_visibility": "which genitals are visible and how explicitly the frame shows them",
|
|
"pose": "overall body posture not covered above (torso/head lean, arch, twist)",
|
|
# affect
|
|
"facial_expression": "facial expression / affect (eyes, mouth, brow)",
|
|
"gaze": "gaze direction / eye contact (at camera, partner, away, eyes closed)",
|
|
# camera
|
|
"framing": "shot type and crop (close-up, medium, full body) and what the frame centers on",
|
|
"camera_angle": "camera angle / POV (low, high, eye-level, POV/first-person)",
|
|
# render
|
|
"scene": "location, furniture, props, background",
|
|
"lighting_color": "lighting quality and color palette / grade",
|
|
"art_style": "rendering style and realism (photoreal, anime, illustration, 3D)",
|
|
}
|
|
DEFAULT_AXES = ", ".join(AXIS_DEFS)
|
|
|
|
# Cache loaded (model, processor) keyed by (path, precision) so the loop does not
|
|
# reload weights every iteration.
|
|
_MODEL_CACHE: dict[tuple[str, str], tuple] = {}
|
|
|
|
|
|
def _looks_like_repo_id(s: str) -> bool:
|
|
"""'org/name' HF repo id, not an absolute/local filesystem path."""
|
|
return ("/" in s) and (" " not in s) and (not os.path.isabs(s)) and (not s.startswith("."))
|
|
|
|
|
|
def _download_target_dir(repo_id: str) -> str:
|
|
"""Where to put downloaded weights — prefer ComfyUI's models/prompt_generator/."""
|
|
name = repo_id.split("/")[-1]
|
|
try:
|
|
import folder_paths # available when running inside ComfyUI
|
|
base = os.path.join(folder_paths.models_dir, "prompt_generator")
|
|
except Exception:
|
|
base = os.path.join(os.path.dirname(os.path.dirname(__file__)), "models")
|
|
return os.path.join(base, name)
|
|
|
|
|
|
def _resolve_model_source(model_path: str, auto_download: bool) -> str:
|
|
"""Turn model_path (local dir | short alias | HF repo id) into a local dir.
|
|
|
|
Downloads from the Hub on first use if needed (and auto_download is on).
|
|
"""
|
|
# Short alias -> full repo id (e.g. "30b-a3b", "8b", "4b").
|
|
if model_path in RECOMMENDED_MODELS:
|
|
model_path = RECOMMENDED_MODELS[model_path]
|
|
|
|
if os.path.isdir(model_path):
|
|
return model_path
|
|
|
|
if _looks_like_repo_id(model_path):
|
|
target = _download_target_dir(model_path)
|
|
# Already downloaded? (a config.json is enough to trust the local copy)
|
|
if os.path.isfile(os.path.join(target, "config.json")):
|
|
return target
|
|
if not auto_download:
|
|
raise FileNotFoundError(
|
|
f"[QwenVLImageJudge] '{model_path}' is not downloaded and auto_download is off. "
|
|
f"Enable auto_download or pre-fetch it to {target}.")
|
|
from huggingface_hub import snapshot_download
|
|
print(f"[QwenVLImageJudge] downloading {model_path} -> {target} (first run only, may be large)...")
|
|
local = snapshot_download(
|
|
repo_id=model_path,
|
|
local_dir=target,
|
|
# weights + processor/tokenizer/config/template; skip duplicate GGUF/onnx blobs.
|
|
allow_patterns=["*.json", "*.jinja", "*.safetensors", "*.txt", "*.model", "merges.txt", "*.py"],
|
|
)
|
|
print(f"[QwenVLImageJudge] download complete: {local}")
|
|
return local
|
|
|
|
# A local path that simply doesn't exist.
|
|
raise FileNotFoundError(
|
|
f"[QwenVLImageJudge] model_path not found: {model_path}. "
|
|
f"Use a local checkpoint dir, a HF repo id (org/name), or an alias "
|
|
f"({', '.join(RECOMMENDED_MODELS)}).")
|
|
|
|
|
|
def _tensor_to_pil(image: "torch.Tensor") -> Image.Image:
|
|
"""ComfyUI IMAGE tensor (B,H,W,C float 0..1) -> first-frame PIL.Image (RGB)."""
|
|
if image is None:
|
|
raise ValueError("Judge node received an empty image input.")
|
|
arr = image
|
|
if hasattr(arr, "detach"):
|
|
arr = arr.detach().cpu().numpy()
|
|
arr = np.asarray(arr)
|
|
if arr.ndim == 4: # batch -> take first frame
|
|
arr = arr[0]
|
|
arr = np.clip(arr * 255.0, 0, 255).astype(np.uint8)
|
|
if arr.ndim == 2:
|
|
arr = np.stack([arr] * 3, axis=-1)
|
|
if arr.shape[-1] == 4: # drop alpha
|
|
arr = arr[..., :3]
|
|
return Image.fromarray(arr, mode="RGB")
|
|
|
|
|
|
def _resolve_vl_class(model_path: str):
|
|
"""Pick the right transformers class. AutoModelForImageTextToText reads the
|
|
checkpoint's `architectures` and instantiates the correct dense
|
|
(Qwen3VLForConditionalGeneration) or MoE (Qwen3VLMoeForConditionalGeneration)
|
|
class automatically — so 4B/8B *and* 30B-A3B all work without branching."""
|
|
try:
|
|
from transformers import AutoModelForImageTextToText as _Auto
|
|
return _Auto
|
|
except ImportError: # pragma: no cover - older transformers
|
|
name = model_path.lower()
|
|
is_moe = any(t in name for t in ("a3b", "moe", "30b", "235b"))
|
|
if is_moe:
|
|
from transformers import Qwen3VLMoeForConditionalGeneration as _C
|
|
else:
|
|
from transformers import Qwen3VLForConditionalGeneration as _C
|
|
return _C
|
|
|
|
|
|
def _load_model(model_path: str, precision: str):
|
|
key = (model_path, precision)
|
|
if key in _MODEL_CACHE:
|
|
return _MODEL_CACHE[key]
|
|
|
|
# Imported lazily so the node can be registered even if transformers is old.
|
|
from transformers import AutoProcessor
|
|
|
|
_VLModel = _resolve_vl_class(model_path)
|
|
load_kwargs = dict(device_map="auto", trust_remote_code=True, low_cpu_mem_usage=True)
|
|
|
|
if precision == "nf4":
|
|
# 4-bit (bitsandbytes) — lets the 30B-A3B abliterated MoE fit in ~18 GB on 32 GB.
|
|
from transformers import BitsAndBytesConfig
|
|
load_kwargs["quantization_config"] = BitsAndBytesConfig(
|
|
load_in_4bit=True,
|
|
bnb_4bit_quant_type="nf4",
|
|
bnb_4bit_compute_dtype=torch.bfloat16,
|
|
bnb_4bit_use_double_quant=True,
|
|
)
|
|
elif precision == "fp8":
|
|
# Pre-quantized FP8 weights: let the checkpoint dictate dtype.
|
|
pass
|
|
else:
|
|
load_kwargs["dtype"] = torch.bfloat16 if precision == "bf16" else torch.float16
|
|
|
|
model = _VLModel.from_pretrained(model_path, **load_kwargs)
|
|
model.eval()
|
|
processor = AutoProcessor.from_pretrained(model_path, trust_remote_code=True)
|
|
_ensure_chat_template(processor, model_path)
|
|
_MODEL_CACHE[key] = (model, processor)
|
|
return model, processor
|
|
|
|
|
|
def _ensure_chat_template(processor, model_path: str):
|
|
"""Some ComfyUI-converted checkpoints ship the template as chat_template.jinja
|
|
(or only on the tokenizer), which AutoProcessor doesn't always pick up. Backfill
|
|
processor.chat_template from those sources so apply_chat_template works."""
|
|
if getattr(processor, "chat_template", None):
|
|
return
|
|
for fn in ("chat_template.jinja", "chat_template.json"):
|
|
fp = os.path.join(model_path, fn)
|
|
if os.path.isfile(fp):
|
|
try:
|
|
with open(fp, "r", encoding="utf-8") as f:
|
|
raw = f.read()
|
|
processor.chat_template = json.loads(raw).get("chat_template") if fn.endswith(".json") else raw
|
|
if processor.chat_template:
|
|
return
|
|
except (OSError, ValueError):
|
|
pass
|
|
tok = getattr(processor, "tokenizer", None)
|
|
if tok is not None and getattr(tok, "chat_template", None):
|
|
processor.chat_template = tok.chat_template
|
|
|
|
|
|
def _axis_definition_block(axes: list[str]) -> str:
|
|
return "\n".join(f" - {a}: {AXIS_DEFS.get(a, 'as named')}" for a in axes)
|
|
|
|
|
|
def _build_system_prompt(axes: list[str], reference_description: str = "") -> str:
|
|
axis_lines = "\n".join(
|
|
f' "{a}": {{"verdict": "match|partial|mismatch", "ref": "<ref value>", "gen": "<generated image>"}},'
|
|
for a in axes)
|
|
verdict_rule = (
|
|
" - verdict: 'match' if ref and gen are the same; 'mismatch' if they are "
|
|
"opposite or clearly different (e.g. 'on top' vs 'on bottom', 'doggy' vs "
|
|
"'cowgirl', 'short' vs 'long', 'eyes closed' vs 'at camera'); 'partial' ONLY "
|
|
"for a genuine middle ground (same category, minor difference). Do NOT default "
|
|
"to 'partial' — if the values are identical use 'match', if clearly different "
|
|
"use 'mismatch'.\n")
|
|
tail = (
|
|
"Reply with STRICT JSON only, no prose, no markdown fences, exactly:\n"
|
|
"{\n"
|
|
' "axes": {\n'
|
|
f"{axis_lines}\n"
|
|
" }\n"
|
|
"}\n")
|
|
|
|
if reference_description.strip():
|
|
# Anchored mode: the reference is a fixed canonical description (text), only the
|
|
# GENERATED image is shown. Keeps the ref side consistent across iterations.
|
|
return (
|
|
"You are a meticulous visual-similarity judge for an image-generation "
|
|
"calibration loop. You are given an AUTHORITATIVE REFERENCE description "
|
|
"(text — the target) and ONE GENERATED image. For every axis report:\n"
|
|
" - ref: the reference value taken FROM THE DESCRIPTION BELOW (quote it; do not invent)\n"
|
|
" - gen: concretely what the GENERATED image shows for this axis\n"
|
|
+ verdict_rule +
|
|
"Describe ONLY what you observe in the generated image; do NOT suggest fixes.\n\n"
|
|
"=== AUTHORITATIVE REFERENCE (the target) ===\n"
|
|
f"{reference_description.strip()}\n"
|
|
"=== end reference ===\n\n"
|
|
"Axes and exactly what each one means:\n"
|
|
f"{_axis_definition_block(axes)}\n\n"
|
|
+ tail +
|
|
"If the reference does not address an axis, verdict 'match' and ref/gen 'n/a'."
|
|
)
|
|
|
|
# Two-image mode: compare the reference image directly against the generated image.
|
|
return (
|
|
"You are a meticulous visual-similarity judge for an image-generation "
|
|
"calibration loop. You are shown two images: IMAGE 1 is the REFERENCE "
|
|
"(the target) and IMAGE 2 is the GENERATED candidate.\n\n"
|
|
"For every axis report THREE things:\n"
|
|
" - ref: concretely what IMAGE 1 (reference) shows for this axis\n"
|
|
" - gen: concretely what IMAGE 2 (generated) shows for this axis\n"
|
|
+ verdict_rule +
|
|
"Use specific concrete values (e.g. ref 'doggy style', gen 'cowgirl'), not "
|
|
"vague notes. Describe ONLY what you observe — do NOT suggest fixes.\n\n"
|
|
"Axes and exactly what each one means:\n"
|
|
f"{_axis_definition_block(axes)}\n\n"
|
|
+ tail +
|
|
"If an axis does not apply to either image, verdict 'match' and ref/gen 'n/a'."
|
|
)
|
|
|
|
|
|
def _format_chatml_qwenvl(messages):
|
|
"""Manual Qwen-VL ChatML prompt, used when the processor has no chat template
|
|
(e.g. checkpoints converted for ComfyUI that drop chat_template.json). Mirrors
|
|
apply_chat_template: each image -> <|vision_start|><|image_pad|><|vision_end|>,
|
|
which the processor then expands to the right number of image tokens."""
|
|
parts = []
|
|
for msg in messages:
|
|
parts.append(f"<|im_start|>{msg['role']}\n")
|
|
content = msg["content"]
|
|
if isinstance(content, str):
|
|
parts.append(content)
|
|
else:
|
|
for item in content:
|
|
if item.get("type") == "image":
|
|
parts.append("<|vision_start|><|image_pad|><|vision_end|>")
|
|
elif item.get("type") == "text":
|
|
parts.append(item.get("text", ""))
|
|
parts.append("<|im_end|>\n")
|
|
parts.append("<|im_start|>assistant\n")
|
|
return "".join(parts)
|
|
|
|
|
|
def _generate_from_messages(model, processor, messages, images, max_new_tokens, temperature):
|
|
"""Template + forward pass for a chat-message list; returns the decoded string."""
|
|
try:
|
|
text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
|
|
except (ValueError, AttributeError):
|
|
# Processor/tokenizer carries no chat template -> build ChatML by hand.
|
|
text = _format_chatml_qwenvl(messages)
|
|
inputs = processor(text=[text], images=images, return_tensors="pt")
|
|
inputs = inputs.to(model.device)
|
|
|
|
gen_kwargs = dict(max_new_tokens=max_new_tokens)
|
|
if temperature and temperature > 0:
|
|
gen_kwargs.update(do_sample=True, temperature=float(temperature))
|
|
else:
|
|
gen_kwargs.update(do_sample=False)
|
|
|
|
with torch.inference_mode():
|
|
out = model.generate(**inputs, **gen_kwargs)
|
|
trimmed = out[:, inputs.input_ids.shape[1]:]
|
|
decoded = processor.batch_decode(trimmed, skip_special_tokens=True)[0]
|
|
return decoded.strip()
|
|
|
|
|
|
def _run_once(model, processor, ref_pil, gen_pil, axes, max_new_tokens, temperature):
|
|
"""Compare pass: ref vs gen -> raw JSON judgement string."""
|
|
messages = [
|
|
{"role": "system", "content": _build_system_prompt(axes)},
|
|
{
|
|
"role": "user",
|
|
"content": [
|
|
{"type": "text", "text": "IMAGE 1 = REFERENCE (target):"},
|
|
{"type": "image", "image": ref_pil},
|
|
{"type": "text", "text": "IMAGE 2 = GENERATED candidate:"},
|
|
{"type": "image", "image": gen_pil},
|
|
{"type": "text", "text": "Now return the strict JSON judgement."},
|
|
],
|
|
},
|
|
]
|
|
return _generate_from_messages(model, processor, messages, [ref_pil, gen_pil],
|
|
max_new_tokens, temperature)
|
|
|
|
|
|
def _run_anchored(model, processor, gen_pil, axes, max_new_tokens, temperature, reference_description):
|
|
"""Anchored compare: fixed canonical reference text + one generated image."""
|
|
messages = [
|
|
{"role": "system", "content": _build_system_prompt(axes, reference_description)},
|
|
{
|
|
"role": "user",
|
|
"content": [
|
|
{"type": "text", "text": "GENERATED candidate image:"},
|
|
{"type": "image", "image": gen_pil},
|
|
{"type": "text", "text": "Compare it to the reference description and return the strict JSON."},
|
|
],
|
|
},
|
|
]
|
|
return _generate_from_messages(model, processor, messages, [gen_pil],
|
|
max_new_tokens, temperature)
|
|
|
|
|
|
def _build_describe_prompt(axes: list[str]) -> str:
|
|
axis_lines = "\n".join(f' "{a}": "<concrete value or n/a>",' for a in axes)
|
|
return (
|
|
"You are writing the ONE canonical description of a REFERENCE image that an "
|
|
"image generator must reproduce. This description is the single source of truth "
|
|
"for the whole calibration loop, so it must be coherent and internally "
|
|
"consistent: the per-axis values must agree with each other and with the "
|
|
"paragraph (e.g. if the woman is on top, every axis that mentions arrangement "
|
|
"must say so). Describe ONLY what you observe, concretely, in prompt-ready "
|
|
"phrasing (the words a text-to-image prompt would use).\n\n"
|
|
"Axes and exactly what each one means:\n"
|
|
f"{_axis_definition_block(axes)}\n\n"
|
|
"Reply with STRICT JSON only, no prose, no markdown fences, exactly:\n"
|
|
"{\n"
|
|
' "description": "<one detailed, self-consistent paragraph describing the whole scene as a generation prompt>",\n'
|
|
' "axes": {\n'
|
|
f"{axis_lines}\n"
|
|
" }\n"
|
|
"}\n"
|
|
"Each axis value is a concrete description of that aspect (or \"n/a\" if absent) "
|
|
"and must not contradict the paragraph. The description is directly usable as a prompt."
|
|
)
|
|
|
|
|
|
def _run_describe(model, processor, ref_pil, axes, max_new_tokens, temperature):
|
|
"""Describe pass: reference only -> raw JSON {caption, axes} string."""
|
|
messages = [
|
|
{"role": "system", "content": _build_describe_prompt(axes)},
|
|
{
|
|
"role": "user",
|
|
"content": [
|
|
{"type": "text", "text": "Describe this reference image:"},
|
|
{"type": "image", "image": ref_pil},
|
|
{"type": "text", "text": "Return the strict JSON description."},
|
|
],
|
|
},
|
|
]
|
|
return _generate_from_messages(model, processor, messages, [ref_pil],
|
|
max_new_tokens, temperature)
|
|
|
|
|
|
def _parse_json(raw: str) -> dict | None:
|
|
"""Best-effort: pull the first balanced JSON object out of the model output."""
|
|
# Strip code fences if present.
|
|
fenced = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", raw, re.DOTALL)
|
|
candidate = fenced.group(1) if fenced else None
|
|
if candidate is None:
|
|
start = raw.find("{")
|
|
if start == -1:
|
|
return None
|
|
depth = 0
|
|
for i in range(start, len(raw)):
|
|
if raw[i] == "{":
|
|
depth += 1
|
|
elif raw[i] == "}":
|
|
depth -= 1
|
|
if depth == 0:
|
|
candidate = raw[start:i + 1]
|
|
break
|
|
if candidate is None:
|
|
return None
|
|
try:
|
|
return json.loads(candidate)
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
|
|
_VERDICT_ORDINAL = {"match": 1.0, "partial": 0.5, "mismatch": 0.0}
|
|
|
|
|
|
def _verdict_ordinal(verdict) -> float:
|
|
return _VERDICT_ORDINAL.get(str(verdict).strip().lower(), 0.0)
|
|
|
|
|
|
def _ordinal_verdict(x: float) -> str:
|
|
return "match" if x >= 0.75 else ("partial" if x >= 0.25 else "mismatch")
|
|
|
|
|
|
def _normalize_value(s) -> str:
|
|
return re.sub(r"\s+", " ", str(s).strip().lower()).strip(" .,:;")
|
|
|
|
|
|
def _apply_identical_match(axes: dict) -> dict:
|
|
"""Deterministic correction: small VLMs over-use 'partial', mislabeling axes
|
|
where ref and gen are identical. Force 'match' when the texts are equal — this
|
|
doesn't depend on the model getting the verdict right."""
|
|
for v in axes.values():
|
|
ref = v.get("ref", "")
|
|
if ref and _normalize_value(ref) == _normalize_value(v.get("gen", "")):
|
|
v["verdict"] = "match"
|
|
return axes
|
|
|
|
|
|
def _score_from_axes(axes: dict) -> tuple[float, int]:
|
|
"""Deterministic overall score (mean verdict ordinal) + mismatch count.
|
|
Computed here, not by the model, so it's reliable and monotonic."""
|
|
if not axes:
|
|
return 0.0, 0
|
|
ordinals = [_verdict_ordinal(v.get("verdict")) for v in axes.values()]
|
|
mismatches = sum(1 for o in ordinals if o == 0.0)
|
|
return round(sum(ordinals) / len(ordinals), 4), mismatches
|
|
|
|
|
|
def _merge_swapped(a: dict, b: dict) -> dict:
|
|
"""Average two judgements (normal + order-swapped) to cut position bias."""
|
|
if not b:
|
|
return a
|
|
if not a:
|
|
return b
|
|
out = {"axes": {}}
|
|
axes = set(a.get("axes", {})) | set(b.get("axes", {}))
|
|
for ax in axes:
|
|
sa = a.get("axes", {}).get(ax, {})
|
|
sb = b.get("axes", {}).get(ax, {})
|
|
# Average the two passes' verdicts on a 0/0.5/1 scale, then re-bucket.
|
|
ord_avg = (_verdict_ordinal(sa.get("verdict")) + _verdict_ordinal(sb.get("verdict"))) / 2.0
|
|
# In pass b the images were swapped, so b.ref describes the generated image
|
|
# and b.gen the reference -> invert b when falling back.
|
|
ref = sa.get("ref") or sb.get("gen") or ""
|
|
gen = sa.get("gen") or sb.get("ref") or ""
|
|
out["axes"][ax] = {"verdict": _ordinal_verdict(ord_avg), "ref": ref, "gen": gen}
|
|
return out
|
|
|
|
|
|
def _report_base_dir(report_dir: str) -> str:
|
|
if report_dir:
|
|
return report_dir
|
|
try:
|
|
import folder_paths
|
|
return os.path.join(folder_paths.get_output_directory(), "calibrator")
|
|
except Exception:
|
|
return os.path.join(os.path.dirname(os.path.dirname(__file__)), "output", "calibrator")
|
|
|
|
|
|
def _write_report(report_dir, run_tag, overall, merged, diff_analysis, raw_all, prompt_used,
|
|
mismatch_count=0):
|
|
"""Persist the analysis so the external CLI agent can read it after a queue.
|
|
|
|
Writes a per-run file plus a stable `latest.json` the agent can always poll.
|
|
Returns the per-run file path (or "" on failure)."""
|
|
base = _report_base_dir(report_dir)
|
|
try:
|
|
os.makedirs(base, exist_ok=True)
|
|
except OSError as e:
|
|
print(f"[QwenVLImageJudge] could not create report dir {base}: {e}")
|
|
return ""
|
|
|
|
payload = {
|
|
"run_tag": run_tag,
|
|
"overall_score": round(float(overall), 4),
|
|
"mismatch_count": mismatch_count,
|
|
"axes": (merged or {}).get("axes", {}),
|
|
"diff_analysis": diff_analysis,
|
|
"prompt_used": prompt_used,
|
|
"raw": raw_all,
|
|
}
|
|
tag = re.sub(r"[^A-Za-z0-9._-]", "_", run_tag) if run_tag else "latest"
|
|
run_path = os.path.join(base, f"calib_{tag}.json")
|
|
for path in (run_path, os.path.join(base, "latest.json")):
|
|
try:
|
|
with open(path, "w", encoding="utf-8") as f:
|
|
json.dump(payload, f, ensure_ascii=False, indent=2)
|
|
except OSError as e:
|
|
print(f"[QwenVLImageJudge] failed writing report {path}: {e}")
|
|
# A markdown sibling is handy for the agent to read as plain text.
|
|
try:
|
|
md = (f"# Calibration analysis ({tag})\n\n"
|
|
f"**overall_score:** {payload['overall_score']}\n\n"
|
|
f"**prompt_used:**\n\n{prompt_used or '(not provided)'}\n\n"
|
|
f"## per-axis\n\n{diff_analysis}\n")
|
|
with open(os.path.join(base, f"calib_{tag}.md"), "w", encoding="utf-8") as f:
|
|
f.write(md)
|
|
except OSError:
|
|
pass
|
|
return run_path
|
|
|
|
|
|
def _format_canonical_reference(caption: str, axes_spec: dict) -> str:
|
|
"""One canonical reference description = the paragraph + the per-axis target
|
|
values. The compare pass anchors on this so the reference side stays consistent
|
|
across iterations (no re-describing the reference each time)."""
|
|
lines = [caption.strip()] if caption else []
|
|
if axes_spec:
|
|
lines.append("")
|
|
for ax, val in axes_spec.items():
|
|
lines.append(f"- {ax}: {val}")
|
|
return "\n".join(lines).strip()
|
|
|
|
|
|
def _write_describe_report(report_dir, run_tag, caption, axes_spec, raw, canonical=""):
|
|
"""Persist the first-pass canonical description (target spec) to seed from."""
|
|
base = _report_base_dir(report_dir)
|
|
try:
|
|
os.makedirs(base, exist_ok=True)
|
|
except OSError as e:
|
|
print(f"[QwenVLImageJudge] could not create report dir {base}: {e}")
|
|
return ""
|
|
payload = {
|
|
"mode": "describe",
|
|
"run_tag": run_tag,
|
|
"caption": caption,
|
|
"axes": axes_spec, # per-axis target values -> the agent's initial axis_state
|
|
"canonical_reference": canonical or _format_canonical_reference(caption, axes_spec),
|
|
"raw": raw,
|
|
}
|
|
tag = re.sub(r"[^A-Za-z0-9._-]", "_", run_tag) if run_tag else "describe"
|
|
run_path = os.path.join(base, f"calib_{tag}.json")
|
|
for path in (run_path, os.path.join(base, "latest.json")):
|
|
try:
|
|
with open(path, "w", encoding="utf-8") as f:
|
|
json.dump(payload, f, ensure_ascii=False, indent=2)
|
|
except OSError as e:
|
|
print(f"[QwenVLImageJudge] failed writing report {path}: {e}")
|
|
return run_path
|
|
|
|
|
|
class QwenVLImageJudge:
|
|
"""ComfyUI node: describe a reference, or score how close a generated image is to it."""
|
|
|
|
CATEGORY = "prompt_calibrator"
|
|
FUNCTION = "judge"
|
|
RETURN_TYPES = ("FLOAT", "STRING", "STRING", "STRING", "STRING")
|
|
RETURN_NAMES = ("overall_score", "axis_scores_json", "analysis", "raw", "report_path")
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(cls):
|
|
return {
|
|
"required": {
|
|
"reference_image": ("IMAGE",),
|
|
# describe = reference only -> target description (first pass, seeds the
|
|
# initial prompt). compare = ref vs generated -> per-axis scoring.
|
|
"mode": (["compare", "describe"], {"default": "compare"}),
|
|
"model_path": ("STRING", {"default": DEFAULT_MODEL_PATH}),
|
|
"precision": (["bf16", "fp16", "fp8", "nf4"], {"default": "bf16"}),
|
|
"axes": ("STRING", {"default": DEFAULT_AXES, "multiline": True}),
|
|
"max_new_tokens": ("INT", {"default": 1024, "min": 64, "max": 4096}),
|
|
"temperature": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.5, "step": 0.05}),
|
|
"swap_eval": ("BOOLEAN", {"default": True}),
|
|
},
|
|
"optional": {
|
|
"generated_image": ("IMAGE",), # required for compare, ignored for describe
|
|
"keep_loaded": ("BOOLEAN", {"default": True}),
|
|
"auto_download": ("BOOLEAN", {"default": True}),
|
|
# The agent reads the analysis from these files after each queue.
|
|
"report_dir": ("STRING", {"default": ""}),
|
|
"run_tag": ("STRING", {"default": ""}),
|
|
"prompt_used": ("STRING", {"default": "", "multiline": True}),
|
|
# compare: canonical reference text (from describe). When set, compare
|
|
# anchors on it instead of re-reading the reference image each time.
|
|
"reference_description": ("STRING", {"default": "", "multiline": True}),
|
|
},
|
|
}
|
|
|
|
def judge(self, reference_image, mode, model_path, precision, axes,
|
|
max_new_tokens, temperature, swap_eval, generated_image=None,
|
|
keep_loaded=True, auto_download=True,
|
|
report_dir="", run_tag="", prompt_used="", reference_description=""):
|
|
axis_list = [a.strip() for a in re.split(r"[,\n]", axes) if a.strip()]
|
|
if not axis_list:
|
|
axis_list = [a.strip() for a in DEFAULT_AXES.split(",")]
|
|
|
|
try:
|
|
resolved_path = _resolve_model_source(model_path, auto_download)
|
|
except Exception as e: # missing model / download failure -> surface as score 0
|
|
msg = str(e)
|
|
print(msg)
|
|
return (0.0, "{}", msg, msg, "")
|
|
|
|
ref_pil = _tensor_to_pil(reference_image)
|
|
model, processor = _load_model(resolved_path, precision)
|
|
|
|
if mode == "describe":
|
|
return self._describe(model, processor, ref_pil, axis_list, max_new_tokens,
|
|
temperature, resolved_path, precision, keep_loaded,
|
|
report_dir, run_tag)
|
|
|
|
if generated_image is None:
|
|
msg = "[QwenVLImageJudge] compare mode needs generated_image (or set mode=describe)."
|
|
print(msg)
|
|
return (0.0, "{}", msg, msg, "")
|
|
gen_pil = _tensor_to_pil(generated_image)
|
|
|
|
if reference_description.strip():
|
|
# Anchored: fixed canonical reference text + one generated image. No swap
|
|
# (single image), and the reference side stays identical across iterations.
|
|
raw_all = _run_anchored(model, processor, gen_pil, axis_list, max_new_tokens,
|
|
temperature, reference_description)
|
|
merged = _parse_json(raw_all) or {}
|
|
else:
|
|
raw1 = _run_once(model, processor, ref_pil, gen_pil, axis_list, max_new_tokens, temperature)
|
|
parsed1 = _parse_json(raw1) or {}
|
|
raw_all = raw1
|
|
merged = parsed1
|
|
if swap_eval:
|
|
# Swap which image is called REFERENCE to average out position bias.
|
|
raw2 = _run_once(model, processor, gen_pil, ref_pil, axis_list, max_new_tokens, temperature)
|
|
parsed2 = _parse_json(raw2) or {}
|
|
merged = _merge_swapped(parsed1, parsed2)
|
|
raw_all = raw1 + "\n--- SWAPPED ---\n" + raw2
|
|
|
|
if not keep_loaded:
|
|
_MODEL_CACHE.pop((resolved_path, precision), None)
|
|
del model
|
|
torch.cuda.empty_cache()
|
|
|
|
axes_map = merged.get("axes", {}) if merged else {}
|
|
# Correct the 4B's bias toward 'partial' on identical values, then score.
|
|
axes_map = _apply_identical_match(axes_map)
|
|
overall, mismatch_count = _score_from_axes(axes_map)
|
|
axis_scores = json.dumps(axes_map, ensure_ascii=False, indent=2) if axes_map else "{}"
|
|
|
|
# Summary worst-first: mismatch, then partial, then match.
|
|
items = sorted(axes_map.items(), key=lambda kv: _verdict_ordinal(kv[1].get("verdict")))
|
|
diff_lines = [
|
|
f"- {ax}: {str(info.get('verdict', '?')).upper():8} "
|
|
f"ref:[{info.get('ref', '')}] gen:[{info.get('gen', '')}]"
|
|
for ax, info in items
|
|
]
|
|
header = f"overall {overall:.2f} | {mismatch_count} mismatch(es) of {len(axes_map)} axes"
|
|
diff_analysis = header + "\n" + "\n".join(diff_lines) if diff_lines else "(no parseable judgement)"
|
|
|
|
report_path = _write_report(
|
|
report_dir, run_tag, overall, merged, diff_analysis, raw_all, prompt_used, mismatch_count)
|
|
|
|
return (round(overall, 4), axis_scores, diff_analysis, raw_all, report_path)
|
|
|
|
def _describe(self, model, processor, ref_pil, axis_list, max_new_tokens,
|
|
temperature, resolved_path, precision, keep_loaded, report_dir, run_tag):
|
|
"""First pass: describe the reference image the generator must reproduce.
|
|
Outputs the target spec (per-axis values) + a prompt-ready caption."""
|
|
raw = _run_describe(model, processor, ref_pil, axis_list, max_new_tokens, temperature)
|
|
parsed = _parse_json(raw) or {}
|
|
|
|
if not keep_loaded:
|
|
_MODEL_CACHE.pop((resolved_path, precision), None)
|
|
del model
|
|
torch.cuda.empty_cache()
|
|
|
|
caption = (parsed.get("description") or parsed.get("caption") or "").strip()
|
|
axes_spec = parsed.get("axes", {}) if isinstance(parsed.get("axes"), dict) else {}
|
|
axis_scores = json.dumps(axes_spec, ensure_ascii=False, indent=2)
|
|
# The canonical reference text the compare pass will anchor on: paragraph + axes.
|
|
canonical = _format_canonical_reference(caption, axes_spec)
|
|
analysis = canonical if caption else "(no parseable description)"
|
|
|
|
report_path = _write_describe_report(report_dir, run_tag, caption, axes_spec, raw, canonical)
|
|
# overall_score is n/a in describe mode; return 1.0 as a neutral placeholder.
|
|
return (1.0, axis_scores, analysis, raw, report_path)
|
|
|
|
|
|
NODE_CLASS_MAPPINGS = {"QwenVLImageJudge": QwenVLImageJudge}
|
|
NODE_DISPLAY_NAME_MAPPINGS = {"QwenVLImageJudge": "Qwen3-VL Image Judge (Calibrator)"}
|