#!/usr/bin/env python3
r"""
agent_bridge.py — drive one calibration iteration from a CLI agent.

The external agent (controller/brain) calls this once per loop step:

    python agent_bridge.py \
        --workflow workflow_api.json \
        --prompt "1 woman, red lingerie, bedroom, full body, warm light" \
        --run-tag iter003 \
        --analysis-dir /path/to/ComfyUI/output/calibrator

It injects the prompt into the `CalibratorPromptReceptor` node, queues the
graph on a running ComfyUI (`POST /prompt`), waits for completion
(`GET /history/{id}`), then prints the Qwen3-VL Judge's analysis JSON to
stdout for the agent to read. Stdlib only — no third-party deps, so any agent
can shell out to it.

Loop, from the agent's side:
  1. build a prompt (calibrate from the previous analysis)
  2. run this script -> capture stdout (the analysis JSON)
  3. read overall_score + per-axis {score, ref, gen}
  4. adjust the prompt and go to 1, until overall_score >= target

Every failure is reported as a one-line ``[agent_bridge] ...`` message through
``SystemExit`` (non-zero exit status, message on stderr), so stdout only ever
carries the analysis JSON.
"""
from __future__ import annotations

import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.request
import uuid

RECEPTOR_CLASS = "CalibratorPromptReceptor"
JUDGE_CLASS = "QwenVLImageJudge"

# History message names treated as the cause of a failed run.
# NOTE(review): names taken from ComfyUI's execution events — confirm against
# the server version in use; unknown layouts fall back to `status_str`.
_FAILURE_EVENTS = ("execution_error", "execution_interrupted")


def _http_json(url: str, payload: dict | None = None, timeout: int = 30):
    """Fetch `url` and return its body decoded as JSON.

    With `payload` the request carries it as a JSON body (urllib then issues a
    POST); without it the request has no body. An empty response body decodes
    to ``{}``. urllib / JSON errors propagate to the caller.
    """
    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    headers = {"Content-Type": "application/json"} if data is not None else {}
    req = urllib.request.Request(url, data=data, headers=headers)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        body = resp.read().decode("utf-8")
    return json.loads(body) if body else {}


def _inject(graph: dict, prompt: str, negative: str, seed: int, run_tag: str, mode: str,
            reference_description: str = "", profile: str = "",
            model_select: str = "", model_path: str = "",
            system_prompt: str = "", user_prompt: str = ""):
    """Set the receptor's prompt/seed and the judge's mode/run_tag in-place.

    compare mode needs a receptor (to inject the prompt). describe mode is the
    first pass over the reference only, so no receptor is required.
    reference_description, if given, anchors compare on the canonical reference
    text from the describe pass.

    The remaining judge inputs (profile, model_select, model_path,
    system_prompt, user_prompt) are only written when non-empty, so an empty
    value leaves whatever the workflow file already holds.

    Raises:
        SystemExit: in compare mode when the graph has no receptor node.
    """
    judge_overrides = {
        "reference_description": reference_description,
        "profile": profile,
        "model_select": model_select,
        "model_path": model_path,
        "system_prompt": system_prompt,
        "user_prompt": user_prompt,
    }
    found_receptor = False
    for node in graph.values():
        if not isinstance(node, dict):
            # Not a node record (e.g. a non-API-format export); leave it for
            # the server to reject instead of crashing on `.get` here.
            continue
        ctype = node.get("class_type")
        inputs = node.setdefault("inputs", {})
        if ctype == RECEPTOR_CLASS:
            inputs["prompt"] = prompt
            inputs["negative"] = negative
            inputs["seed"] = int(seed)
            found_receptor = True
        elif ctype == JUDGE_CLASS:
            inputs["mode"] = mode
            inputs["run_tag"] = run_tag
            inputs.update({key: value for key, value in judge_overrides.items() if value})
    if mode == "compare" and not found_receptor:
        raise SystemExit(
            f"[agent_bridge] no '{RECEPTOR_CLASS}' node in the workflow — add the "
            f"'SxCP External Prompt (Receptor)' node and feed the sampler from it.")


def _failure_detail(status: dict) -> str:
    """Return a one-line reason for a failed run, taken from its status messages.

    Expects ``status["messages"]`` to be a list of ``[event, data]`` pairs and
    reports the most recent failure event; any other layout falls back to the
    bare ``status_str``.
    """
    for item in reversed(status.get("messages") or []):
        if not (isinstance(item, (list, tuple)) and len(item) == 2):
            continue
        event, data = item
        if event not in _FAILURE_EVENTS or not isinstance(data, dict):
            continue
        where = data.get("node_type") or data.get("node_id") or "?"
        reason = str(data.get("exception_message") or event).strip()
        return f"{event} at node {where}: {reason}"
    return str(status.get("status_str") or "error")


def _run_finished(entry: dict, prompt_id: str) -> bool:
    """Tell whether a history entry describes a finished run.

    Returns True when the entry is complete (an entry without a usable
    ``status`` counts as complete, as before) and False when the server says it
    is not completed yet.

    Raises:
        SystemExit: when the entry's ``status_str`` is ``"error"`` — such an
            entry never becomes ``completed``, so waiting longer is pointless.
    """
    status = entry.get("status") or {}  # tolerate a missing or null status
    if status.get("status_str") == "error":
        raise SystemExit(
            f"[agent_bridge] run {prompt_id} failed: {_failure_detail(status)}")
    return bool(status.get("completed", True))


def _wait_for_history(server: str, prompt_id: str, timeout: int):
    """Poll ``/history/{prompt_id}`` once a second until the run is finished.

    Returns the history entry of the finished run.

    Raises:
        SystemExit: when the run failed, the server cannot be queried, or
            `timeout` seconds pass without a finished entry.
    """
    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            hist = _http_json(f"http://{server}/history/{prompt_id}")
        except (OSError, ValueError) as e:
            # OSError covers URLError and bare socket timeouts; ValueError
            # covers a body that is not valid JSON.
            raise SystemExit(
                f"[agent_bridge] cannot poll ComfyUI at {server}: {e}") from e
        if prompt_id in hist:
            entry = hist[prompt_id]
            if _run_finished(entry, prompt_id):
                return entry
        time.sleep(1.0)
    raise SystemExit(f"[agent_bridge] timed out after {timeout}s waiting for {prompt_id}")


def _read_report(analysis_file: str, analysis_dir: str, run_tag: str):
    """Load the first report JSON that exists and return ``(report, path)``.

    Lookup order: the explicit `analysis_file`, then
    ``<analysis_dir>/calib_<sanitised run_tag>.json``, then
    ``<analysis_dir>/latest.json``. Characters of `run_tag` other than
    alphanumerics and ``._-`` are replaced by ``_`` to form the file name.
    Returns ``(None, None)`` when none of the candidates is a file.
    """
    candidates = []
    if analysis_file:
        candidates.append(analysis_file)
    if analysis_dir:
        if run_tag:
            safe = "".join(c if c.isalnum() or c in "._-" else "_" for c in run_tag)
            candidates.append(os.path.join(analysis_dir, f"calib_{safe}.json"))
        candidates.append(os.path.join(analysis_dir, "latest.json"))
    for path in candidates:
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f), path
    return None, None


def main(argv=None):
    """Run one calibration iteration and print the judge's report to stdout.

    Parses `argv` (``sys.argv[1:]`` when None), injects the inputs into the
    workflow, queues it, waits for the run, then prints the report JSON with
    ``_prompt_id`` and ``_report_path`` added. Returns 0 on success; every
    failure handled here raises ``SystemExit`` with a message.
    """
    ap = argparse.ArgumentParser(description="Drive one ComfyUI calibration iteration.")
    ap.add_argument("--server", default="127.0.0.1:8188")
    ap.add_argument("--workflow", required=True, help="API-format workflow JSON")
    ap.add_argument("--mode", choices=["compare", "describe", "chat"], default="compare",
                    help="describe = first pass over the reference; chat = general VLM with your prompts")
    ap.add_argument("--system-prompt", default="", help="chat mode: system prompt")
    ap.add_argument("--user-prompt", default="", help="chat mode: user prompt over the image(s)")
    ap.add_argument("--prompt", default="", help="generation prompt (required for compare)")
    ap.add_argument("--negative", default="")
    ap.add_argument("--seed", type=int, default=0)
    ap.add_argument("--run-tag", default="")
    ap.add_argument("--profile", default="",
                    help="analysis profile on the judge (general/oral/penetration/handjob/solo)")
    ap.add_argument("--model-select", default="",
                    help="judge model dropdown label (overrides workflow)")
    ap.add_argument("--model-path", default="",
                    help="manual judge model path/repo (overrides dropdown)")
    ap.add_argument("--ref-desc", default="",
                    help="canonical reference text to anchor compare on (from the describe pass)")
    ap.add_argument("--ref-desc-file", default="",
                    help="path to a describe report JSON; uses its canonical_reference to anchor compare")
    ap.add_argument("--analysis-file", default="",
                    help="explicit path to the report JSON the Judge writes")
    ap.add_argument("--analysis-dir", default="",
                    help="dir holding calib_<run-tag>.json / latest.json (Judge report_dir)")
    ap.add_argument("--timeout", type=int, default=600)
    args = ap.parse_args(argv)

    if args.mode == "compare" and not args.prompt:
        raise SystemExit("[agent_bridge] --prompt is required in compare mode.")

    # A describe report, when given, takes precedence over the literal --ref-desc.
    ref_desc = args.ref_desc
    if args.ref_desc_file:
        with open(args.ref_desc_file, "r", encoding="utf-8") as f:
            rep = json.load(f)
        ref_desc = rep.get("canonical_reference") or rep.get("caption") or ref_desc

    with open(args.workflow, "r", encoding="utf-8") as f:
        graph = json.load(f)
    _inject(graph, args.prompt, args.negative, args.seed, args.run_tag, args.mode, ref_desc,
            args.profile, args.model_select, args.model_path,
            args.system_prompt, args.user_prompt)

    client_id = uuid.uuid4().hex
    try:
        queued = _http_json(f"http://{args.server}/prompt",
                            {"prompt": graph, "client_id": client_id})
    except urllib.error.HTTPError as e:
        # The server answered but refused the request; its body carries the
        # reason, which "cannot reach" would hide. Must precede the OSError
        # handler because HTTPError is a subclass of it.
        detail = e.read().decode("utf-8", "replace")[:400]
        raise SystemExit(
            f"[agent_bridge] queue rejected (HTTP {e.code}): {detail}") from e
    except OSError as e:
        # URLError subclasses OSError; this also covers bare socket timeouts.
        raise SystemExit(f"[agent_bridge] cannot reach ComfyUI at {args.server}: {e}")
    prompt_id = queued.get("prompt_id")
    if not prompt_id:
        raise SystemExit(f"[agent_bridge] queue rejected: {json.dumps(queued)[:400]}")

    _wait_for_history(args.server, prompt_id, args.timeout)

    report, path = _read_report(args.analysis_file, args.analysis_dir, args.run_tag)
    if report is None:
        raise SystemExit(
            "[agent_bridge] run finished but no report file found. Set the Judge "
            "node's report_dir and pass --analysis-dir (or --analysis-file).")
    report["_prompt_id"] = prompt_id
    report["_report_path"] = path
    json.dump(report, sys.stdout, ensure_ascii=False, indent=2)
    sys.stdout.write("\n")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())