8b567cb531
For JSON-producing system prompts (e.g. LTX prompt-relay), json_output=true pulls the JSON object out of the reply (strips reasoning/prose/code-fences via _parse_json, which handles nested schemas and reasoning-then-JSON) and returns it re-serialized; falls back to raw text if none parses. agent_bridge gains --json-output. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
193 lines
8.3 KiB
Python
193 lines
8.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
agent_bridge.py — drive one calibration iteration from a CLI agent.
|
|
|
|
The external agent (controller/brain) calls this once per loop step:
|
|
|
|
python agent_bridge.py \
|
|
--workflow workflow_api.json \
|
|
--prompt "1 woman, red lingerie, bedroom, full body, warm light" \
|
|
--run-tag iter003 \
|
|
--analysis-dir /path/to/ComfyUI/output/calibrator
|
|
|
|
It injects the prompt into the `CalibratorPromptReceptor` node, queues the graph
|
|
on a running ComfyUI (`POST /prompt`), waits for completion (`GET /history/{id}`),
|
|
then prints the Qwen3-VL Judge's analysis JSON to stdout for the agent to read.
|
|
|
|
Stdlib only — no third-party deps, so any agent can shell out to it.
|
|
|
|
Loop, from the agent's side:
|
|
1. build a prompt (calibrate from the previous analysis)
|
|
2. run this script -> capture stdout (the analysis JSON)
|
|
3. read overall_score + per-axis {score, ref, gen}
|
|
4. adjust the prompt and go to 1, until overall_score >= target
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
import uuid
|
|
|
|
RECEPTOR_CLASS = "CalibratorPromptReceptor"
|
|
JUDGE_CLASS = "QwenVLImageJudge"
|
|
|
|
|
|
def _http_json(url: str, payload: dict | None = None, timeout: int = 30):
|
|
data = json.dumps(payload).encode("utf-8") if payload is not None else None
|
|
req = urllib.request.Request(
|
|
url, data=data, headers={"Content-Type": "application/json"} if data else {})
|
|
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
|
body = resp.read().decode("utf-8")
|
|
return json.loads(body) if body else {}
|
|
|
|
|
|
def _inject(graph: dict, prompt: str, negative: str, seed: int, run_tag: str, mode: str,
|
|
reference_description: str = "", profile: str = "", model_select: str = "",
|
|
model_path: str = "", system_prompt: str = "", user_prompt: str = "",
|
|
json_output: bool = False):
|
|
"""Set the receptor's prompt/seed and the judge's mode/run_tag in-place.
|
|
|
|
compare mode needs a receptor (to inject the prompt). describe mode is the first
|
|
pass over the reference only, so no receptor is required. reference_description, if
|
|
given, anchors compare on the canonical reference text from the describe pass."""
|
|
found_receptor = False
|
|
for node in graph.values():
|
|
ctype = node.get("class_type")
|
|
inputs = node.setdefault("inputs", {})
|
|
if ctype == RECEPTOR_CLASS:
|
|
inputs["prompt"] = prompt
|
|
inputs["negative"] = negative
|
|
inputs["seed"] = int(seed)
|
|
found_receptor = True
|
|
elif ctype == JUDGE_CLASS:
|
|
inputs["mode"] = mode
|
|
inputs["run_tag"] = run_tag
|
|
if reference_description:
|
|
inputs["reference_description"] = reference_description
|
|
if profile:
|
|
inputs["profile"] = profile
|
|
if model_select:
|
|
inputs["model_select"] = model_select
|
|
if model_path:
|
|
inputs["model_path"] = model_path
|
|
if system_prompt:
|
|
inputs["system_prompt"] = system_prompt
|
|
if user_prompt:
|
|
inputs["user_prompt"] = user_prompt
|
|
if json_output:
|
|
inputs["json_output"] = True
|
|
if mode == "compare" and not found_receptor:
|
|
raise SystemExit(
|
|
f"[agent_bridge] no '{RECEPTOR_CLASS}' node in the workflow — add the "
|
|
f"'SxCP External Prompt (Receptor)' node and feed the sampler from it.")
|
|
|
|
|
|
def _wait_for_history(server: str, prompt_id: str, timeout: int):
|
|
deadline = time.time() + timeout
|
|
while time.time() < deadline:
|
|
hist = _http_json(f"http://{server}/history/{prompt_id}")
|
|
if prompt_id in hist:
|
|
entry = hist[prompt_id]
|
|
status = entry.get("status", {})
|
|
# ComfyUI marks completed=True (or status_str) when the run is done.
|
|
if status.get("completed", True):
|
|
return entry
|
|
time.sleep(1.0)
|
|
raise SystemExit(f"[agent_bridge] timed out after {timeout}s waiting for {prompt_id}")
|
|
|
|
|
|
def _read_report(analysis_file: str, analysis_dir: str, run_tag: str):
|
|
candidates = []
|
|
if analysis_file:
|
|
candidates.append(analysis_file)
|
|
if analysis_dir:
|
|
if run_tag:
|
|
safe = "".join(c if c.isalnum() or c in "._-" else "_" for c in run_tag)
|
|
candidates.append(os.path.join(analysis_dir, f"calib_{safe}.json"))
|
|
candidates.append(os.path.join(analysis_dir, "latest.json"))
|
|
for path in candidates:
|
|
if os.path.isfile(path):
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
return json.load(f), path
|
|
return None, None
|
|
|
|
|
|
def main(argv=None):
|
|
ap = argparse.ArgumentParser(description="Drive one ComfyUI calibration iteration.")
|
|
ap.add_argument("--server", default="127.0.0.1:8188")
|
|
ap.add_argument("--workflow", required=True, help="API-format workflow JSON")
|
|
ap.add_argument("--mode", choices=["compare", "describe", "chat"], default="compare",
|
|
help="describe = first pass over the reference; chat = general VLM with your prompts")
|
|
ap.add_argument("--system-prompt", default="", help="chat mode: system prompt")
|
|
ap.add_argument("--user-prompt", default="", help="chat mode: user prompt over the image(s)")
|
|
ap.add_argument("--json-output", action="store_true",
|
|
help="chat mode: extract & return clean JSON from the reply")
|
|
ap.add_argument("--prompt", default="", help="generation prompt (required for compare)")
|
|
ap.add_argument("--negative", default="")
|
|
ap.add_argument("--seed", type=int, default=0)
|
|
ap.add_argument("--run-tag", default="")
|
|
ap.add_argument("--profile", default="",
|
|
help="analysis profile on the judge (general/oral/penetration/handjob/solo)")
|
|
ap.add_argument("--model-select", default="", help="judge model dropdown label (overrides workflow)")
|
|
ap.add_argument("--model-path", default="", help="manual judge model path/repo (overrides dropdown)")
|
|
ap.add_argument("--ref-desc", default="",
|
|
help="canonical reference text to anchor compare on (from the describe pass)")
|
|
ap.add_argument("--ref-desc-file", default="",
|
|
help="path to a describe report JSON; uses its canonical_reference to anchor compare")
|
|
ap.add_argument("--analysis-file", default="",
|
|
help="explicit path to the report JSON the Judge writes")
|
|
ap.add_argument("--analysis-dir", default="",
|
|
help="dir holding calib_<tag>.json / latest.json (Judge report_dir)")
|
|
ap.add_argument("--timeout", type=int, default=600)
|
|
args = ap.parse_args(argv)
|
|
|
|
if args.mode == "compare" and not args.prompt:
|
|
raise SystemExit("[agent_bridge] --prompt is required in compare mode.")
|
|
|
|
ref_desc = args.ref_desc
|
|
if args.ref_desc_file:
|
|
with open(args.ref_desc_file, "r", encoding="utf-8") as f:
|
|
rep = json.load(f)
|
|
ref_desc = rep.get("canonical_reference") or rep.get("caption") or ref_desc
|
|
|
|
with open(args.workflow, "r", encoding="utf-8") as f:
|
|
graph = json.load(f)
|
|
|
|
_inject(graph, args.prompt, args.negative, args.seed, args.run_tag, args.mode, ref_desc,
|
|
args.profile, args.model_select, args.model_path, args.system_prompt, args.user_prompt,
|
|
args.json_output)
|
|
|
|
client_id = uuid.uuid4().hex
|
|
try:
|
|
queued = _http_json(f"http://{args.server}/prompt",
|
|
{"prompt": graph, "client_id": client_id})
|
|
except urllib.error.URLError as e:
|
|
raise SystemExit(f"[agent_bridge] cannot reach ComfyUI at {args.server}: {e}")
|
|
prompt_id = queued.get("prompt_id")
|
|
if not prompt_id:
|
|
raise SystemExit(f"[agent_bridge] queue rejected: {json.dumps(queued)[:400]}")
|
|
|
|
_wait_for_history(args.server, prompt_id, args.timeout)
|
|
|
|
report, path = _read_report(args.analysis_file, args.analysis_dir, args.run_tag)
|
|
if report is None:
|
|
raise SystemExit(
|
|
"[agent_bridge] run finished but no report file found. Set the Judge "
|
|
"node's report_dir and pass --analysis-dir (or --analysis-file).")
|
|
|
|
report["_prompt_id"] = prompt_id
|
|
report["_report_path"] = path
|
|
json.dump(report, sys.stdout, ensure_ascii=False, indent=2)
|
|
sys.stdout.write("\n")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|