Files
ComfyUI-Prompt-Calibrator/agent_bridge.py
T
Ethanfel 95198a15b5 Initial commit: VLM-as-judge prompt calibration loop
Qwen3-VL image-similarity judge node, external-prompt receptor node,
agent_bridge CLI, example SDXL workflow, and methodology/agent-loop/
calibration-policy docs.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-26 22:15:56 +02:00

147 lines
5.6 KiB
Python

#!/usr/bin/env python3
"""
agent_bridge.py — drive one calibration iteration from a CLI agent.
The external agent (controller/brain) calls this once per loop step:
python agent_bridge.py \
--workflow workflow_api.json \
--prompt "1 woman, red lingerie, bedroom, full body, warm light" \
--run-tag iter003 \
--analysis-dir /path/to/ComfyUI/output/calibrator
It injects the prompt into the `CalibratorPromptReceptor` node, queues the graph
on a running ComfyUI (`POST /prompt`), waits for completion (`GET /history/{id}`),
then prints the Qwen3-VL Judge's analysis JSON to stdout for the agent to read.
Stdlib only — no third-party deps, so any agent can shell out to it.
Loop, from the agent's side:
1. build a prompt (calibrate from the previous analysis)
2. run this script -> capture stdout (the analysis JSON)
3. read overall_score + per-axis diffs + fix_suggestions
4. adjust the prompt and go to 1, until overall_score >= target
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.request
import uuid
# class_type of the workflow node whose prompt/negative/seed inputs are overwritten.
RECEPTOR_CLASS = "CalibratorPromptReceptor"
# class_type of the workflow node whose run_tag/prompt_used inputs are overwritten.
JUDGE_CLASS = "QwenVLImageJudge"
def _http_json(url: str, payload: dict | None = None, timeout: int = 30):
    """Open ``url`` and return its response body decoded as JSON.

    When ``payload`` is not None it is serialized to JSON, UTF-8 encoded and
    sent as the request body together with a JSON Content-Type header;
    otherwise the request carries no body and no extra headers.

    Returns the parsed JSON value, or ``{}`` when the response body is empty.
    """
    if payload is None:
        encoded = None
    else:
        encoded = json.dumps(payload).encode("utf-8")

    # The header is only attached when there is a body to describe.
    headers = {"Content-Type": "application/json"} if encoded else {}
    request = urllib.request.Request(url, data=encoded, headers=headers)

    with urllib.request.urlopen(request, timeout=timeout) as response:
        text = response.read().decode("utf-8")

    if not text:
        return {}
    return json.loads(text)
def _inject(graph: dict, prompt: str, negative: str, seed: int, run_tag: str):
    """Set the receptor's prompt/negative/seed and the judge's run_tag in-place.

    Every node whose ``class_type`` equals ``RECEPTOR_CLASS`` gets its
    ``prompt``, ``negative`` and ``seed`` inputs overwritten; every node whose
    ``class_type`` equals ``JUDGE_CLASS`` gets ``run_tag`` and ``prompt_used``.
    A missing ``inputs`` mapping is created on each node that is visited.

    Entries of ``graph`` that are not mappings are skipped, so a file that is
    not a node-id -> node mapping ends in the "no receptor" error below
    instead of an AttributeError traceback.

    Raises:
        SystemExit: if no node of class ``RECEPTOR_CLASS`` is present.
    """
    found_receptor = False
    nodes = graph.values() if isinstance(graph, dict) else ()
    for node in nodes:
        if not isinstance(node, dict):
            # Not a node definition (e.g. a top-level number or list).
            continue
        ctype = node.get("class_type")
        inputs = node.setdefault("inputs", {})
        if ctype == RECEPTOR_CLASS:
            inputs["prompt"] = prompt
            inputs["negative"] = negative
            inputs["seed"] = int(seed)
            found_receptor = True
        elif ctype == JUDGE_CLASS:
            inputs["run_tag"] = run_tag
            # The judge also receives the prompt text that was used.
            inputs["prompt_used"] = prompt
    if not found_receptor:
        raise SystemExit(
            f"[agent_bridge] no '{RECEPTOR_CLASS}' node in the workflow — add the "
            f"'SxCP External Prompt (Receptor)' node and feed the sampler from it.")
def _wait_for_history(server: str, prompt_id: str, timeout: int):
    """Poll ``GET /history/{prompt_id}`` until the queued run has finished.

    The endpoint is queried about once per second for at most ``timeout``
    seconds.

    Returns:
        The history entry for ``prompt_id`` once its status reports
        ``completed`` (an entry without that flag counts as completed).

    Raises:
        SystemExit: if the entry reports ``status_str == "error"``, or if no
            completed entry shows up before the deadline.
    """
    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        hist = _http_json(f"http://{server}/history/{prompt_id}")
        if prompt_id in hist:
            entry = hist[prompt_id]
            status = entry.get("status") or {}
            # ComfyUI marks completed=True (or status_str) when the run is done.
            if status.get("completed", True):
                return entry
            # A failed run never turns completed; stop now instead of polling
            # until the timeout and hiding the failure from the agent.
            if status.get("status_str") == "error":
                raise SystemExit(
                    f"[agent_bridge] run {prompt_id} failed: "
                    f"{json.dumps(status)[:400]}")
        time.sleep(1.0)
    raise SystemExit(f"[agent_bridge] timed out after {timeout}s waiting for {prompt_id}")
def _read_report(analysis_file: str, analysis_dir: str, run_tag: str):
    """Find and load the report JSON for this run.

    Paths are tried in this order, and the first one that is an existing
    regular file wins:

    1. ``analysis_file``, when non-empty;
    2. ``<analysis_dir>/calib_<tag>.json``, when both ``analysis_dir`` and
       ``run_tag`` are non-empty (``<tag>`` is ``run_tag`` with every character
       that is neither alphanumeric nor one of ``._-`` replaced by ``_``);
    3. ``<analysis_dir>/latest.json``, when ``analysis_dir`` is non-empty.

    Returns:
        ``(parsed_json, path)`` for the file that was read, or
        ``(None, None)`` when none of the paths exists.
    """
    search_paths = [analysis_file] if analysis_file else []
    if analysis_dir:
        if run_tag:
            cleaned = "".join(
                ch if (ch.isalnum() or ch in "._-") else "_" for ch in run_tag
            )
            search_paths.append(os.path.join(analysis_dir, f"calib_{cleaned}.json"))
        search_paths.append(os.path.join(analysis_dir, "latest.json"))

    existing = next((p for p in search_paths if os.path.isfile(p)), None)
    if existing is None:
        return None, None

    with open(existing, "r", encoding="utf-8") as handle:
        return json.load(handle), existing
def main(argv=None):
    """Run one calibration iteration and print the report JSON to stdout.

    Steps: parse the CLI arguments, load the workflow JSON, inject the
    prompt/negative/seed/run-tag, queue the graph with ``POST /prompt``, wait
    for the run to finish, then load the report file and print it with the
    extra keys ``_prompt_id`` and ``_report_path``.

    Args:
        argv: argument list for argparse; ``None`` means ``sys.argv[1:]``.

    Returns:
        ``0`` on success.

    Raises:
        SystemExit: with an ``[agent_bridge] ...`` message when the workflow
            file cannot be read or parsed, the server cannot be reached, the
            queue request is rejected, or no report file is found.
    """
    ap = argparse.ArgumentParser(description="Drive one ComfyUI calibration iteration.")
    ap.add_argument("--server", default="127.0.0.1:8188")
    ap.add_argument("--workflow", required=True, help="API-format workflow JSON")
    ap.add_argument("--prompt", required=True)
    ap.add_argument("--negative", default="")
    ap.add_argument("--seed", type=int, default=0)
    ap.add_argument("--run-tag", default="")
    ap.add_argument("--analysis-file", default="",
                    help="explicit path to the report JSON the Judge writes")
    ap.add_argument("--analysis-dir", default="",
                    help="dir holding calib_<tag>.json / latest.json (Judge report_dir)")
    ap.add_argument("--timeout", type=int, default=600)
    args = ap.parse_args(argv)

    # A missing or malformed workflow file should give the caller a one-line
    # diagnostic like every other failure here, not a traceback.
    try:
        with open(args.workflow, "r", encoding="utf-8") as f:
            graph = json.load(f)
    except (OSError, ValueError) as e:
        raise SystemExit(
            f"[agent_bridge] cannot read workflow {args.workflow!r}: {e}") from e

    _inject(graph, args.prompt, args.negative, args.seed, args.run_tag)

    client_id = uuid.uuid4().hex
    try:
        queued = _http_json(f"http://{args.server}/prompt",
                            {"prompt": graph, "client_id": client_id})
    except urllib.error.HTTPError as e:
        # HTTPError subclasses URLError, so it must be caught first: the server
        # answered and refused the request, and its body says why.
        detail = e.read().decode("utf-8", "replace")[:400]
        raise SystemExit(
            f"[agent_bridge] queue rejected (HTTP {e.code}): {detail}") from e
    except urllib.error.URLError as e:
        raise SystemExit(
            f"[agent_bridge] cannot reach ComfyUI at {args.server}: {e}") from e

    prompt_id = queued.get("prompt_id")
    if not prompt_id:
        raise SystemExit(f"[agent_bridge] queue rejected: {json.dumps(queued)[:400]}")

    try:
        _wait_for_history(args.server, prompt_id, args.timeout)
    except OSError as e:
        # URLError and socket timeouts are OSError subclasses.
        raise SystemExit(
            f"[agent_bridge] lost contact with ComfyUI at {args.server} "
            f"while waiting for {prompt_id}: {e}") from e

    report, path = _read_report(args.analysis_file, args.analysis_dir, args.run_tag)
    if report is None:
        raise SystemExit(
            "[agent_bridge] run finished but no report file found. Set the Judge "
            "node's report_dir and pass --analysis-dir (or --analysis-file).")

    # Tell the caller which run produced the report and which file was read.
    report["_prompt_id"] = prompt_id
    report["_report_path"] = path
    json.dump(report, sys.stdout, ensure_ascii=False, indent=2)
    sys.stdout.write("\n")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())