Files
ComfyUI-Ethanfel-Prompt-Bui…/tools/prompt_route_simulation.py
T

1585 lines
58 KiB
Python

#!/usr/bin/env python3
"""Run representative prompt-route simulations and report quality issues.
This is a diagnostic tool, not a golden snapshot test. It builds a small set of
metadata rows/pairs, sends them through the Krea2, SDXL, and caption routes, and
reports route/noise/seed-control problems in a JSON-friendly structure.
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
import caption_naturalizer # noqa: E402
import hardcore_action_metadata # noqa: E402
import hardcore_position_config # noqa: E402
import krea_formatter # noqa: E402
import prompt_builder as pb # noqa: E402
import sdxl_formatter # noqa: E402
import sdxl_tag_policy # noqa: E402
# Trigger token handed to the prompt builders and the caption naturalizer.
# _formatter_trigger_issues expects it exactly once in captions and never in
# Krea or SDXL prompts.
TRIGGER = "sxcppnl7"
# Trigger token handed to the SDXL formatter.  Expected exactly once in SDXL
# prompts and never in Krea prompts or captions.
SDXL_TRIGGER = "mythp0rt"
# Token that is never passed to a formatter in this file; it is only searched
# for as an unexpected leak.  NOTE(review): presumably a retired trigger, going
# by the name - confirm.
OLD_TRIGGER = "sxcpinup_coloredpencil"
# Lowercase substrings that _softcore_issues reports as ``softcore_noise``
# when they occur in the (lower-cased) text it is given.
SOFTCORE_NOISE_TERMS = (
    "the image focuses",
    "softcore version",
    "non-explicit teaser setup",
    "no sex act",
    "genital contact",
    "keep the softcore version",
    "focused on woman a alone",
)
# Lowercase "label:" prefixes that _formatter_issues reports as
# ``leaked_label`` when found in the lower-cased Krea, SDXL, or caption output.
FORMATTER_LABEL_LEAKS = (
    "body exposure:",
    "camera control:",
    "characters:",
    "clothing:",
    "clothing state:",
    "composition:",
    "facial expression:",
    "facial expressions:",
    "hardcore setup:",
    "outfit:",
    "pose:",
    "pov participant:",
    "role graph:",
    "setting:",
    "sexual pose:",
    "sexual scene:",
    "softcore setup:",
    "softcore visual reference:",
    "cast descriptors:",
    "shared cast descriptors:",
    "teaser outfit detail:",
    "visual clothing state:",
    "visible remaining styling:",
)
# Lowercase substrings that _formatter_issues reports as ``hardcore_noise``
# when found in the lower-cased Krea prompt.
HARDCORE_NOISE_TERMS = (
    "softcore visual reference",
    "the same visibly adult",
    "the scene contains",
)
def _json(value: Any) -> str:
return json.dumps(value, ensure_ascii=True, sort_keys=True)
def _clean_key(value: Any) -> str:
return re.sub(r"[^a-z0-9]+", " ", str(value or "").lower()).strip()
def _character_cast(*, pov_man: bool = False) -> str:
    """Build the fixed woman-then-man character cast used by the simulation cases.

    The woman slot is built first; the ``character_cast`` value it returns is
    passed into the second ``build_character_slot_json`` call that describes
    the man.  ``pov_man`` selects the man's ``presence_mode``: ``"pov"`` when
    true, ``"visible"`` otherwise.

    Returns the ``character_cast`` entry of the second builder call.
    """
    woman_slot = pb.build_character_slot_json(
        subject_type="woman",
        label="A",
        age="25-year-old adult",
        ethnicity="western_european",
        figure="balanced",
        body="slim busty",
        hair_color="blonde",
        hair_length="long",
        hair_style="loose_waves",
        descriptor_detail="full",
        expression_intensity=0.55,
        softcore_expression_intensity=0.35,
        hardcore_expression_intensity=0.75,
    )
    cast_so_far = woman_slot["character_cast"]
    man_presence = "pov" if pov_man else "visible"
    man_slot = pb.build_character_slot_json(
        subject_type="man",
        label="A",
        age="40-year-old adult",
        ethnicity="western_european",
        body="average",
        descriptor_detail="compact",
        expression_intensity=0.45,
        softcore_expression_intensity=0.25,
        hardcore_expression_intensity=0.65,
        presence_mode=man_presence,
        character_cast=cast_so_far,
    )
    return man_slot["character_cast"]
def _character_cast_subjects(subjects: list[str] | tuple[str, ...]) -> str:
    """Build a character cast with one slot per entry in *subjects*.

    Each subject is labelled per type in order of appearance ("A" for the
    first woman, "B" for the second, and likewise for men).  Only ``"woman"``
    and ``"man"`` are counted; any other subject string raises ``KeyError``.

    Returns the ``character_cast`` value from the last builder call, or ``""``
    when *subjects* is empty.
    """
    seen_per_type = {"woman": 0, "man": 0}
    running_cast = ""
    for raw_subject in subjects:
        kind = str(raw_subject)
        seen_per_type[kind] += 1
        is_woman = kind == "woman"
        # 1st of a type -> "A", 2nd -> "B", ...
        slot_label = chr(ord("A") + seen_per_type[kind] - 1)
        slot = pb.build_character_slot_json(
            subject_type=kind,
            label=slot_label,
            age="25-year-old adult" if is_woman else "40-year-old adult",
            ethnicity="western_european",
            figure="balanced",
            body="slim busty" if is_woman else "average",
            descriptor_detail="compact",
            expression_intensity=0.55,
            softcore_expression_intensity=0.35,
            hardcore_expression_intensity=0.75,
            character_cast=running_cast,
        )
        running_cast = slot["character_cast"]
    return running_cast
def _random_character_cast() -> str:
    """Build a woman-then-man cast whose descriptive attributes are all ``"random"``.

    The woman slot is built first and its ``character_cast`` value is passed
    into the builder call for the man slot.
    """
    woman_slot = pb.build_character_slot_json(
        subject_type="woman",
        label="A",
        age="random",
        ethnicity="random",
        figure="random",
        body="random",
        hair_color="random",
        hair_length="random",
        hair_style="random",
        descriptor_detail="full",
    )
    man_slot = pb.build_character_slot_json(
        subject_type="man",
        label="A",
        age="random",
        ethnicity="random",
        body="random",
        descriptor_detail="compact",
        character_cast=woman_slot["character_cast"],
    )
    return man_slot["character_cast"]
def _coworking_location_config() -> str:
    """Return a location-pool config holding a single custom coworking location."""
    coworking_entry = (
        "coworking_sim: coworking lounge with tall windows, warm desks, "
        "laptop tables, glass partition seams, repeated desk rows, plants, "
        "and soft shared-office depth"
    )
    return pb.build_location_pool_json(
        enabled=True,
        combine_mode="replace",
        preset="custom_only",
        custom_locations=coworking_entry,
    )
def _seed_probe_location_config() -> str:
    """Return a location-pool config with three newline-separated custom locations.

    Used by the seed probes so that a location choice exists to vary.
    """
    location_lines = (
        "seed_coworking_desk: coworking desk row with warm lamps and laptops",
        "seed_coworking_glass: coworking glass office with plants and partition seams",
        "seed_coworking_windows: coworking window lounge with repeated desks and city light",
    )
    return pb.build_location_pool_json(
        enabled=True,
        combine_mode="replace",
        preset="custom_only",
        custom_locations="\n".join(location_lines),
    )
def _seed_probe_composition_config() -> str:
    """Return a composition-pool config with three newline-separated custom compositions."""
    composition_lines = (
        "seed composition near a desk edge",
        "seed composition through a glass partition",
        "seed composition down repeating desk rows",
    )
    return pb.build_composition_pool_json(
        enabled=True,
        combine_mode="replace",
        preset="custom_only",
        custom_compositions="\n".join(composition_lines),
    )
def _orbit_camera(horizontal_angle: int = 45, vertical_angle: int = 0, zoom: float = 6.0) -> str:
    """Return an enabled orbit-camera config for the given angles and zoom.

    Every other camera option is fixed to the values listed below.
    """
    settings = {
        "enabled": True,
        "camera_mode": "standard",
        "horizontal_angle": horizontal_angle,
        "vertical_angle": vertical_angle,
        "zoom": zoom,
        "framing": "from_zoom",
        "subject_focus": "action",
        "lens": "auto",
        "orientation": "auto",
        "phone_visibility": "auto",
        "priority": "soft_hint",
        "camera_detail": "compact",
        "include_degrees": True,
    }
    return pb.build_camera_orbit_config_json(**settings)
def _position_filter(focus: str, family: str, positions: list[str] | tuple[str, ...] | str) -> str:
    """Build an action-filter config restricted to *positions* of *family*.

    A position pool is built first (``combine_mode="replace"``) and handed to
    the action-filter builder together with *focus*.  Toys and double contact
    are always disallowed.  Each remaining ``allow_<kind>`` flag is true only
    when *focus* is ``"<kind>_only"`` or ``"keep_pool"``.
    """
    pool_json = pb.build_hardcore_position_pool_json(
        combine_mode="replace",
        family=family,
        selected_positions=positions,
    )
    allow_flags: dict[str, bool] = {"allow_toys": False, "allow_double": False}
    focus_kinds = (
        "penetration",
        "foreplay",
        "interaction",
        "manual",
        "oral",
        "outercourse",
        "anal",
        "climax",
    )
    for kind in focus_kinds:
        allow_flags[f"allow_{kind}"] = focus in (f"{kind}_only", "keep_pool")
    return pb.build_hardcore_action_filter_json(
        hardcore_position_config=pool_json,
        focus=focus,
        **allow_flags,
    )
def _insta_options() -> str:
    """Return the fixed options JSON shared by every softcore/hardcore pair case."""
    options = {
        "softcore_cast": "same_as_hardcore",
        "hardcore_cast": "couple",
        "hardcore_women_count": 1,
        "hardcore_men_count": 1,
        "softcore_level": "lingerie_tease",
        "hardcore_level": "hardcore",
        "softcore_expression_enabled": True,
        "hardcore_expression_enabled": True,
        "softcore_expression_intensity": 0.35,
        "hardcore_expression_intensity": 0.75,
        "platform_style": "hybrid",
        "continuity": "same_creator_same_room",
        "hardcore_clothing_continuity": "explicit_nude",
        "softcore_camera_mode": "from_camera_config",
        "hardcore_camera_mode": "from_camera_config",
        "camera_detail": "compact",
        "hardcore_detail_density": "balanced",
    }
    return pb.build_insta_of_options_json(**options)
# One entry per broad hardcore action family.  Each entry carries:
#   name            - report name for the case
#   subcategory     - prompt-builder subcategory string
#   focus / family  - values matching the parameters of _position_filter
#   positions       - optional explicit position keys (absent on most entries)
#   expected_route  - action_family / position_family the built row should carry
#   expected_terms  - per-formatter substrings checked case-insensitively by
#                     _formatter_expectation_issues
# NOTE(review): the consumer of this table is outside this part of the file;
# the keys line up with _hardcore_single_case and _case_report - confirm.
HARDCORE_ROUTE_CASES = (
    {
        "name": "hardcore.single.oral",
        "subcategory": "Oral sex",
        "focus": "oral_only",
        "family": "oral",
        "expected_route": {"action_family": "oral", "position_family": "oral"},
        "expected_terms": {
            "krea": ("mouth",),
            "sdxl": ("oral sex",),
            "caption": ("oral action",),
        },
    },
    {
        "name": "hardcore.single.manual",
        "subcategory": "Manual stimulation",
        "focus": "manual_only",
        "family": "manual",
        "expected_route": {"action_family": "manual", "position_family": "manual"},
        "expected_terms": {
            "krea": ("hand",),
            "sdxl": ("manual stimulation",),
            "caption": ("manual action",),
        },
    },
    {
        "name": "hardcore.single.outercourse",
        "subcategory": "Outercourse and genital teasing",
        "focus": "outercourse_only",
        "family": "outercourse",
        "expected_route": {"action_family": "outercourse", "position_family": "outercourse"},
        "expected_terms": {
            "krea": ("penis",),
            "sdxl": ("outercourse",),
            "caption": ("non-penetrative action",),
        },
    },
    {
        "name": "hardcore.single.foreplay",
        "subcategory": "Foreplay and teasing",
        "focus": "foreplay_only",
        "family": "foreplay",
        "positions": ("undressing",),
        "expected_route": {"action_family": "foreplay", "position_family": "foreplay"},
        "expected_terms": {
            "krea": ("clothing",),
            "sdxl": ("foreplay",),
            "caption": ("foreplay action",),
        },
    },
    {
        "name": "hardcore.single.interaction",
        "subcategory": "Clothing and position transitions",
        "focus": "interaction_only",
        "family": "interaction",
        "positions": ("position_transition",),
        # The interaction position family is expected to route to the foreplay
        # action family.
        "expected_route": {"action_family": "foreplay", "position_family": "interaction"},
        "expected_terms": {
            "krea": ("clothing",),
            "sdxl": ("interaction",),
            "caption": ("interaction beat",),
        },
    },
    {
        "name": "hardcore.single.anal",
        "subcategory": "Anal and double penetration",
        "focus": "anal_only",
        "family": "anal",
        "positions": ("doggy", "face_down_ass_up"),
        "expected_route": {"action_family": "anal", "position_family": "anal"},
        "expected_terms": {
            "krea": ("ass",),
            "sdxl": ("anal sex",),
            "caption": ("anal action",),
        },
    },
    {
        "name": "hardcore.single.threesome",
        "subcategory": "Threesomes",
        "focus": "threesome_only",
        "family": "threesome",
        "expected_route": {"action_family": "threesome", "position_family": "threesome"},
        "expected_terms": {
            "krea": ("three",),
            "sdxl": ("threesome",),
            "caption": ("three-person action",),
        },
    },
    {
        "name": "hardcore.single.group",
        "subcategory": "Group sex and orgy",
        "focus": "group_only",
        "family": "group",
        "expected_route": {"action_family": "group", "position_family": "group"},
        "expected_terms": {
            "krea": ("group",),
            "sdxl": ("group sex",),
            "caption": ("group action",),
        },
    },
    {
        "name": "hardcore.single.climax",
        "subcategory": "Cumshot and climax",
        "focus": "climax_only",
        "family": "climax",
        "expected_route": {"action_family": "climax", "position_family": "climax"},
        "expected_terms": {
            "krea": ("semen",),
            "sdxl": ("climax", "semen"),
            "caption": ("climax action",),
        },
    },
)
# Action-family names excluded from the route simulation.
# NOTE(review): the code that consults these sets is outside this part of the
# file; presumably a coverage check over HARDCORE_ROUTE_CASES - confirm.
ROUTE_SIM_ACTION_FAMILY_EXCLUSIONS = {
    "default",
    # Dedicated double-contact route assertions live in prompt_smoke because they
    # need a multi-person, position-specific fixture rather than a broad family case.
    "toy_double",
}
# Position-family names excluded from the route simulation.
ROUTE_SIM_POSITION_FAMILY_EXCLUSIONS = {"any"}
def _format_metadata(metadata: dict[str, Any], target: str) -> dict[str, Any]:
    """Run *metadata* through the Krea2, SDXL, and caption formatters for *target*.

    The metadata is serialized once with ``_json`` and passed to every
    formatter as ``metadata_json`` with an empty text input.

    Returns a dict with keys ``"krea"`` and ``"sdxl"`` (the formatter return
    values as-is) and ``"caption"`` (a dict with ``natural_caption``,
    ``method``, and ``route_trace_json`` taken from the naturalizer's 3-tuple).
    """
    # Arguments common to all three formatter calls.
    shared = {
        "metadata_json": _json(metadata),
        "input_hint": "metadata_json",
        "target": target,
    }
    krea_result = krea_formatter.format_krea2_prompt(
        "",
        detail_level="balanced",
        style_mode="preserve",
        **shared,
    )
    sdxl_result = sdxl_formatter.format_sdxl_prompt(
        "",
        formatter_profile="manual_controls",
        style_preset="flat_vector_pony",
        quality_preset="pony_high",
        trigger=SDXL_TRIGGER,
        prepend_trigger=True,
        preserve_trigger=False,
        nude_weight=1.29,
        **shared,
    )
    caption_result = caption_naturalizer.naturalize_caption_with_trace(
        "",
        trigger=TRIGGER,
        include_trigger=True,
        detail_level="balanced",
        style_policy="drop_style_tail",
        caption_profile="training_dense",
        **shared,
    )
    caption_text, caption_method, caption_trace = caption_result
    caption_payload = {
        "natural_caption": caption_text,
        "method": caption_method,
        "route_trace_json": caption_trace,
    }
    return {"krea": krea_result, "sdxl": sdxl_result, "caption": caption_payload}
def _duplicate_comma_items(value: Any) -> list[str]:
    """Return the sorted normalized comma-separated items that occur more than once.

    *value* is stringified (falsy values become ``""``), split on commas, and
    each part is normalized with ``_clean_key``.  Parts that normalize to the
    empty string are ignored.

    One pass with two sets replaces the earlier ``list.count`` call per item,
    which was quadratic in the number of items.
    """
    seen: set[str] = set()
    repeated: set[str] = set()
    for part in str(value or "").split(","):
        key = _clean_key(part)
        if not key:
            continue
        if key in seen:
            repeated.add(key)
        else:
            seen.add(key)
    return sorted(repeated)
def _text_issues(label: str, value: Any, *, min_len: int = 8) -> list[str]:
text = str(value or "")
issues: list[str] = []
if len(text.strip()) < min_len:
issues.append(f"{label}: empty_or_short")
if "None" in text:
issues.append(f"{label}: leaked_None")
if " " in text:
issues.append(f"{label}: repeated_spaces")
if " ," in text or " ." in text:
issues.append(f"{label}: bad_punctuation_spacing")
return issues
def _contains_all(text: str, required: tuple[str, ...]) -> bool:
lower = text.lower()
return all(term.lower() in lower for term in required)
def _trigger_count(text: str, trigger: str) -> int:
return len(re.findall(rf"(?<![a-z0-9_]){re.escape(trigger)}(?![a-z0-9_])", text, flags=re.IGNORECASE))
def _formatter_trigger_issues(name: str, prompts: dict[str, str]) -> list[str]:
    """Check trigger-token placement across the three formatter outputs.

    *prompts* must contain the keys ``"krea"``, ``"sdxl"``, and ``"caption"``.
    Rules, checked in this order:
      * the Krea prompt must contain none of the three known triggers;
      * the SDXL prompt must contain ``SDXL_TRIGGER`` exactly once and neither
        ``TRIGGER`` nor ``OLD_TRIGGER``;
      * the caption must contain ``TRIGGER`` exactly once and neither
        ``SDXL_TRIGGER`` nor ``OLD_TRIGGER``.
    """
    # (label suffix, text, trigger required exactly once or None, forbidden triggers)
    rules = (
        ("krea_prompt", prompts["krea"], None, (TRIGGER, SDXL_TRIGGER, OLD_TRIGGER)),
        ("sdxl_prompt", prompts["sdxl"], SDXL_TRIGGER, (TRIGGER, OLD_TRIGGER)),
        ("caption", prompts["caption"], TRIGGER, (SDXL_TRIGGER, OLD_TRIGGER)),
    )
    found: list[str] = []
    for suffix, text, required, forbidden in rules:
        if required is not None:
            occurrences = _trigger_count(text, required)
            if occurrences != 1:
                found.append(f"{name}.{suffix}: trigger_count:{required}:{occurrences}")
        for banned in forbidden:
            if _trigger_count(text, banned):
                found.append(f"{name}.{suffix}: unexpected_trigger:{banned}")
    return found
def _formatter_expectation_issues(
    name: str,
    formats: dict[str, Any],
    expected_terms: dict[str, tuple[str, ...]] | None,
) -> list[str]:
    """Report formatters whose output lacks any of its expected route terms.

    *expected_terms* maps a formatter name (``"krea"``, ``"sdxl"``,
    ``"caption"``) to substrings that must all occur, case-insensitively, in
    that formatter's prompt text.  Unknown formatter names are checked against
    an empty string.  Returns ``[]`` when *expected_terms* is empty or None.
    """
    if not expected_terms:
        return []
    prompt_fields = (
        ("krea", "krea_prompt"),
        ("sdxl", "sdxl_prompt"),
        ("caption", "natural_caption"),
    )
    prompts = {key: str(formats[key].get(field) or "") for key, field in prompt_fields}
    return [
        f"{name}.{formatter_name}: missing_route_terms:{required}"
        for formatter_name, required in expected_terms.items()
        if required and not _contains_all(prompts.get(formatter_name, ""), required)
    ]
def _caption_cast_descriptor_issues(name: str, row: dict[str, Any] | None, caption_text: str) -> list[str]:
    """Report a caption that repeats the naturalized cast descriptor.

    The descriptor comes from ``row["cast_descriptor_text"]``, falling back to
    ``row["shared_cast_descriptors"]``.  A list value is joined with ``"; "``
    after dropping blank items.  The text is converted with the caption
    naturalizer's ``_natural_cast_descriptor_text`` and an issue is reported
    when the result occurs more than once in *caption_text*.
    """
    if not isinstance(row, dict):
        return []
    raw_descriptor = row.get("cast_descriptor_text") or row.get("shared_cast_descriptors")
    if isinstance(raw_descriptor, list):
        cleaned = (str(item or "").strip() for item in raw_descriptor)
        descriptor_text = "; ".join(piece for piece in cleaned if piece)
    else:
        descriptor_text = str(raw_descriptor or "").strip()
    if not descriptor_text:
        return []
    natural = caption_naturalizer._natural_cast_descriptor_text(descriptor_text)
    repeated = bool(natural) and caption_text.count(natural) > 1
    return [f"{name}.caption: repeated_cast_descriptor"] if repeated else []
def _trace_dict(formatter_name: str, payload: dict[str, Any]) -> tuple[dict[str, Any], str]:
trace_text = str(payload.get("route_trace_json") or "")
if not trace_text:
return {}, f"{formatter_name}: missing_route_trace"
try:
trace = json.loads(trace_text)
except json.JSONDecodeError as exc:
return {}, f"{formatter_name}: invalid_route_trace:{exc}"
if not isinstance(trace, dict):
return {}, f"{formatter_name}: route_trace_not_object"
return trace, ""
def _formatter_trace_metadata_issues(name: str, trace: dict[str, Any], row: dict[str, Any] | None) -> list[str]:
if not isinstance(row, dict):
return []
issues: list[str] = []
expected_fields = {
"metadata_category": row.get("main_category") or row.get("category"),
"metadata_subcategory": row.get("subcategory"),
"action_family": row.get("action_family"),
"position_family": row.get("position_family"),
"position_key": row.get("position_key"),
"scene_profile": row.get("scene_camera_profile_key"),
}
for key, expected in expected_fields.items():
if expected in (None, "", []):
continue
if trace.get(key) != expected:
issues.append(f"{name}: trace_{key}_mismatch:{trace.get(key)} != {expected}")
expected_position_keys = [str(value) for value in (row.get("position_keys") or []) if str(value or "").strip()]
if expected_position_keys:
trace_position_keys = [str(value) for value in (trace.get("position_keys") or [])]
for key in expected_position_keys:
if key not in trace_position_keys:
issues.append(f"{name}: trace_missing_position_key:{key}")
expected_pov_labels = [str(value) for value in (row.get("pov_character_labels") or []) if str(value or "").strip()]
if expected_pov_labels:
trace_pov_labels = [str(value) for value in (trace.get("pov_labels") or [])]
for label in expected_pov_labels:
if label not in trace_pov_labels:
issues.append(f"{name}: trace_missing_pov_label:{label}")
return issues
def _formatter_trace_issues(
    name: str,
    formats: dict[str, Any],
    *,
    target: str,
    row: dict[str, Any] | None = None,
) -> list[str]:
    """Validate the route trace emitted by each formatter.

    For each of ``krea``, ``sdxl``, and ``caption`` the trace is parsed with
    ``_trace_dict``; a parse error is reported and that formatter is skipped.
    Otherwise the trace's formatter id, method, target, input hint, and branch
    are checked, followed by branch rules that differ between pair
    (``insta_of_pair``) and single-row methods, and finally the metadata
    comparison in ``_formatter_trace_metadata_issues``.
    """
    # formats key -> formatter id expected inside the trace
    expected_ids = (("krea", "krea2"), ("sdxl", "sdxl"), ("caption", "caption"))
    issues: list[str] = []
    for formatter_name, expected_formatter in expected_ids:
        payload = formats[formatter_name]
        scope = f"{name}.{formatter_name}"
        trace, error = _trace_dict(scope, payload)
        if error:
            issues.append(error)
            continue

        method = str(payload.get("method") or "")
        branch = str(trace.get("branch") or "")

        traced_formatter = trace.get("formatter")
        if traced_formatter != expected_formatter:
            issues.append(f"{scope}: trace_formatter_mismatch:{traced_formatter} != {expected_formatter}")
        traced_method = trace.get("method")
        if traced_method != method:
            issues.append(f"{scope}: trace_method_mismatch:{traced_method} != {method}")
        traced_target = trace.get("target")
        if traced_target != target:
            issues.append(f"{scope}: trace_target_mismatch:{traced_target} != {target}")
        traced_hint = trace.get("input_hint")
        if traced_hint != "metadata_json":
            issues.append(f"{scope}: trace_input_hint_mismatch:{traced_hint}")
        if branch in ("", "fallback", "text"):
            issues.append(f"{scope}: trace_branch_not_metadata:{branch}")
        if "metadata" not in method:
            issues.append(f"{scope}: trace_method_not_metadata:{method}")

        if "insta_of_pair" in method:
            # Pair routes: Krea/SDXL expose the pair through the branch name,
            # every other formatter through its method string.
            if formatter_name not in ("krea", "sdxl"):
                if "metadata(insta_of_pair)" not in method:
                    issues.append(f"{scope}: trace_caption_pair_method_mismatch:{method}")
            elif branch != "insta_of_pair":
                issues.append(f"{scope}: trace_pair_branch_mismatch:{branch}")
            selected_side = trace.get("selected_side")
            if selected_side != target:
                issues.append(f"{scope}: trace_selected_side_mismatch:{selected_side} != {target}")
        elif formatter_name == "krea":
            if not branch.startswith("metadata("):
                issues.append(f"{scope}: trace_krea_metadata_branch_mismatch:{branch}")
        elif formatter_name in ("sdxl", "caption") and branch != "metadata":
            issues.append(f"{scope}: trace_metadata_branch_mismatch:{branch}")

        issues.extend(_formatter_trace_metadata_issues(scope, trace, row))
    return issues
def _formatter_issues(
    name: str,
    formats: dict[str, Any],
    *,
    row: dict[str, Any] | None = None,
    target: str,
    expected_terms: dict[str, tuple[str, ...]] | None = None,
    is_pov: bool = False,
) -> list[str]:
    """Collect every formatter-level issue for one simulated case.

    Checks run in this order: generic text quality, trigger placement,
    metadata-route method names, route traces, repeated cast descriptor in the
    caption, duplicate negative-prompt items, leaked section labels,
    hardcore noise terms in the Krea prompt, SDXL tags incompatible with the
    row's action/position family, expected route terms, and POV wording when
    *is_pov* is set.
    """
    krea, sdxl, caption = formats["krea"], formats["sdxl"], formats["caption"]
    krea_prompt = str(krea.get("krea_prompt") or "")
    sdxl_prompt = str(sdxl.get("sdxl_prompt") or "")
    caption_text = str(caption.get("natural_caption") or "")
    prompts = {"krea": krea_prompt, "sdxl": sdxl_prompt, "caption": caption_text}

    issues: list[str] = []
    labelled_texts = (
        ("krea_prompt", krea_prompt),
        ("sdxl_prompt", sdxl_prompt),
        ("caption", caption_text),
    )
    for suffix, text in labelled_texts:
        issues.extend(_text_issues(f"{name}.{suffix}", text, min_len=20))
    issues.extend(_formatter_trigger_issues(name, prompts))

    for formatter_name, payload in (("krea", krea), ("sdxl", sdxl), ("caption", caption)):
        method = payload.get("method")
        if "metadata" not in str(method or ""):
            issues.append(f"{name}.{formatter_name}: not_metadata_route:{method}")

    issues.extend(_formatter_trace_issues(name, formats, target=target, row=row))
    issues.extend(_caption_cast_descriptor_issues(name, row, caption_text))

    negatives = (
        (f"{name}.krea_negative", krea.get("negative_prompt")),
        (f"{name}.sdxl_negative", sdxl.get("negative_prompt")),
    )
    for label, negative in negatives:
        repeated = _duplicate_comma_items(negative)
        if repeated:
            # Only the first five duplicates are shown.
            issues.append(f"{label}: duplicate_comma_items:{repeated[:5]}")

    for formatter_name, prompt in prompts.items():
        lowered = prompt.lower()
        issues.extend(
            f"{name}.{formatter_name}: leaked_label:{leak}"
            for leak in FORMATTER_LABEL_LEAKS
            if leak in lowered
        )

    lower_krea = krea_prompt.lower()
    issues.extend(
        f"{name}.krea_prompt: hardcore_noise:{noise}"
        for noise in HARDCORE_NOISE_TERMS
        if noise in lower_krea
    )

    if isinstance(row, dict):
        # Wrap in ", " so that only whole comma-separated tags match.
        padded_sdxl = f", {sdxl_prompt.lower()}, "
        families = (("action", row.get("action_family")), ("position", row.get("position_family")))
        for scope, family in families:
            route_key = f"{scope}:{str(family or '').strip()}"
            for tag in sdxl_tag_policy.INCOMPATIBLE_ROUTE_TAGS.get(route_key, ()):
                if f", {tag}, " in padded_sdxl:
                    issues.append(f"{name}.sdxl_prompt: incompatible_family_tag:{route_key}:{tag}")

    issues.extend(_formatter_expectation_issues(name, formats, expected_terms))

    if is_pov:
        has_pov_wording = "viewer" in lower_krea and "first-person" in lower_krea
        if not has_pov_wording:
            issues.append(f"{name}.krea_prompt: pov_wording_missing")
        # NOTE(review): this test runs on the original-case prompt, unlike the
        # lower-cased checks above - confirm that is intended.
        if "camera:" in krea_prompt:
            issues.append(f"{name}.krea_prompt: pov_emitted_third_person_camera")
    return issues
def _softcore_issues(name: str, text: Any) -> list[str]:
    """Report each ``SOFTCORE_NOISE_TERMS`` entry found in the lower-cased *text*."""
    haystack = str(text or "").lower()
    hits: list[str] = []
    for term in SOFTCORE_NOISE_TERMS:
        if term in haystack:
            hits.append(f"{name}: softcore_noise:{term}")
    return hits
def _row_summary(row: dict[str, Any]) -> dict[str, Any]:
return {
"category": row.get("main_category"),
"subcategory": row.get("subcategory"),
"scene": row.get("scene"),
"scene_profile": row.get("scene_camera_profile_key"),
"action_family": row.get("action_family"),
"position_family": row.get("position_family"),
"position_key": row.get("position_key"),
"position_keys": row.get("position_keys") or [],
"pov_labels": row.get("pov_character_labels") or [],
}
def _route_metadata_issues(name: str, row: dict[str, Any]) -> list[str]:
config = row.get("hardcore_position_config") if isinstance(row.get("hardcore_position_config"), dict) else {}
configured = [str(value) for value in (config.get("positions") or [])]
if not configured:
return []
available = set(str(value) for value in (row.get("position_keys") or []))
selected_available = [value for value in configured if value in available]
if selected_available and row.get("position_key") not in selected_available:
return [
f"{name}: selected_position_not_primary:{row.get('position_key')} not in {selected_available}"
]
return []
def _route_expectation_issues(name: str, row: dict[str, Any], expected_route: dict[str, Any] | None) -> list[str]:
if not expected_route:
return []
issues: list[str] = []
for key in ("action_family", "position_family", "position_key"):
expected = expected_route.get(key)
if expected and row.get(key) != expected:
issues.append(f"{name}: {key}_mismatch:{row.get(key)} != {expected}")
for key, expected_values in (("position_keys", expected_route.get("position_keys") or ()),):
current = set(str(value) for value in (row.get(key) or []))
for value in expected_values:
if str(value) not in current:
issues.append(f"{name}: missing_{key}:{value}")
return issues
def _case_report(
    name: str,
    metadata: dict[str, Any],
    *,
    target: str,
    include_prompts: bool,
    expected_route: dict[str, Any] | None = None,
    expected_terms: dict[str, tuple[str, ...]] | None = None,
    is_pov: bool = False,
) -> dict[str, Any]:
    """Format one metadata row and return its report.

    The report holds the case name, target, a row summary, the method each
    formatter reported, and all collected issues.  Softcore noise is checked
    on the Krea prompt only when *target* is ``"softcore"``.  With
    *include_prompts* the raw and formatted prompt texts are attached under
    ``"prompts"``.
    """
    formats = _format_metadata(metadata, target)
    krea, sdxl, caption = formats["krea"], formats["sdxl"], formats["caption"]

    issues = _formatter_issues(
        name,
        formats,
        row=metadata,
        target=target,
        expected_terms=expected_terms,
        is_pov=is_pov,
    )
    issues += _route_metadata_issues(name, metadata)
    issues += _route_expectation_issues(name, metadata, expected_route)
    if target == "softcore":
        issues += _softcore_issues(f"{name}.krea_prompt", krea.get("krea_prompt"))

    report: dict[str, Any] = {
        "name": name,
        "target": target,
        "summary": _row_summary(metadata),
        "methods": {
            "krea": krea.get("method"),
            "sdxl": sdxl.get("method"),
            "caption": caption.get("method"),
        },
        "issues": issues,
    }
    if not include_prompts:
        return report
    report["prompts"] = {
        "raw": metadata.get("prompt", ""),
        "krea": krea.get("krea_prompt", ""),
        "sdxl": sdxl.get("sdxl_prompt", ""),
        "caption": caption.get("natural_caption", ""),
    }
    return report
def _pair_reports(name: str, pair: dict[str, Any], *, include_prompts: bool) -> list[dict[str, Any]]:
    """Format a softcore/hardcore pair and return ``[softcore_report, hardcore_report]``.

    The whole *pair* is sent through the formatters once per target.  Issues
    are evaluated against the matching ``softcore_row`` / ``hardcore_row``.
    The hardcore side is treated as POV when its row carries any
    ``pov_character_labels``.  With *include_prompts*, each report also gets
    the raw prompt and the formatted texts, preferring the side-specific keys
    (for example ``krea_softcore_prompt``) over the generic ones.
    """
    soft_row = dict(pair.get("softcore_row") or {})
    hard_row = dict(pair.get("hardcore_row") or {})
    soft_formats = _format_metadata(pair, "softcore")
    hard_formats = _format_metadata(pair, "hardcore")

    soft_name = f"{name}.softcore"
    soft_issues = _formatter_issues(soft_name, soft_formats, row=soft_row, target="softcore")
    soft_issues.extend(_route_metadata_issues(soft_name, soft_row))
    soft_issues.extend(_softcore_issues(f"{name}.softcore.krea_prompt", soft_formats["krea"].get("krea_prompt")))

    hard_name = f"{name}.hardcore"
    hard_issues = _formatter_issues(
        hard_name,
        hard_formats,
        row=hard_row,
        target="hardcore",
        is_pov=bool(hard_row.get("pov_character_labels")),
    )
    hard_issues.extend(_route_metadata_issues(hard_name, hard_row))

    sides = (
        ("softcore", soft_row, soft_formats, soft_issues),
        ("hardcore", hard_row, hard_formats, hard_issues),
    )
    reports: list[dict[str, Any]] = []
    for side, side_row, side_formats, side_issues in sides:
        krea, sdxl, caption = side_formats["krea"], side_formats["sdxl"], side_formats["caption"]
        entry: dict[str, Any] = {
            "name": f"{name}.{side}",
            "target": side,
            "summary": _row_summary(side_row),
            "methods": {
                "krea": krea.get("method"),
                "sdxl": sdxl.get("method"),
                "caption": caption.get("method"),
            },
            "issues": side_issues,
        }
        if include_prompts:
            entry["prompts"] = {
                "raw": pair.get(f"{side}_prompt", ""),
                "krea": krea.get(f"krea_{side}_prompt", "") or krea.get("krea_prompt", ""),
                "sdxl": sdxl.get(f"sdxl_{side}_prompt", "") or sdxl.get("sdxl_prompt", ""),
                "caption": caption.get("natural_caption", ""),
            }
        reports.append(entry)
    return reports
def _regular_single_case(seed: int) -> dict[str, Any]:
    """Build the non-hardcore single-row case: a solo woman in casual streetwear.

    Uses the fixed character cast, the coworking location, an orbit camera,
    and a seed-lock config based on *seed*.
    """
    configs = {
        "category_config": pb.build_category_config_json("Casual clothes", "Casual clothes / Streetwear"),
        "cast_config": pb.build_cast_config_json("solo_woman", 1, 0),
        "seed_config": pb.build_seed_lock_config_json(base_seed=seed),
        "camera_config": _orbit_camera(horizontal_angle=45, vertical_angle=0, zoom=5.5),
        "character_cast": _character_cast(),
        "location_config": _coworking_location_config(),
    }
    return pb.build_prompt_from_configs(
        row_number=1,
        start_index=1,
        seed=seed,
        extra_positive="simulation marker",
        **configs,
    )
def _hardcore_single_case(
    seed: int,
    subcategory: str,
    focus: str,
    family: str,
    positions: list[str] | tuple[str, ...] | str = (),
) -> dict[str, Any]:
    """Build one hardcore single-row case for *subcategory*.

    The cast size depends on *family*: ``"threesome"`` uses one woman and two
    men, ``"group"`` two women and two men, and every other family the default
    woman/man couple.  The action filter is derived from *focus*, *family*,
    and *positions* through ``_position_filter``.
    """
    # All three casts are built on every call, in this order, as before.
    family_casts = {
        "threesome": (1, 2, _character_cast_subjects(("woman", "man", "man"))),
        "group": (2, 2, _character_cast_subjects(("woman", "woman", "man", "man"))),
    }
    couple_cast = (1, 1, _character_cast())
    women_count, men_count, character_cast = family_casts.get(family, couple_cast)

    seed_lock = pb.build_seed_lock_config_json(base_seed=seed)
    action_filter = _position_filter(focus, family, positions)
    location = _coworking_location_config()
    camera = _orbit_camera(horizontal_angle=35, vertical_angle=0, zoom=6.5)
    return pb.build_prompt(
        category="Hardcore sexual poses",
        subcategory=subcategory,
        row_number=1,
        start_index=1,
        seed=seed,
        clothing="random",
        ethnicity="any",
        poses="random",
        backside_bias=0.0,
        figure="random",
        no_plus_women=False,
        no_black=False,
        minimal_clothing_ratio=-1,
        standard_pose_ratio=-1,
        trigger=TRIGGER,
        prepend_trigger_to_prompt=True,
        extra_positive="",
        extra_negative="",
        seed_config=seed_lock,
        women_count=women_count,
        men_count=men_count,
        character_cast=character_cast,
        hardcore_position_config=action_filter,
        location_config=location,
        camera_config=camera,
    )
def _insta_pair_case(seed: int, *, pov: bool, position: str, focus: str, family: str) -> dict[str, Any]:
    """Build a softcore/hardcore pair restricted to a single *position*.

    With *pov* the man in the cast uses POV presence and the hardcore camera
    sits at a 68-degree horizontal angle; otherwise it sits at 135 degrees.
    """
    hardcore_angle = 68 if pov else 135
    configs = {
        "seed_config": pb.build_seed_lock_config_json(base_seed=seed),
        "options_json": _insta_options(),
        "character_cast": _character_cast(pov_man=pov),
        "hardcore_position_config": _position_filter(focus, family, [position]),
        "location_config": _coworking_location_config(),
        "camera_config": _orbit_camera(horizontal_angle=45, vertical_angle=0, zoom=6.0),
        "softcore_camera_config": _orbit_camera(horizontal_angle=45, vertical_angle=0, zoom=5.5),
        "hardcore_camera_config": _orbit_camera(horizontal_angle=hardcore_angle, vertical_angle=20, zoom=7.5),
    }
    return pb.build_insta_of_pair(
        row_number=1,
        start_index=1,
        seed=seed,
        ethnicity="any",
        figure="random",
        no_plus_women=False,
        no_black=False,
        trigger=TRIGGER,
        prepend_trigger_to_prompt=True,
        **configs,
    )
def _pair_seed_probe(seed: int, *, reroll_axis: str = "none", reroll_seed: int = -1) -> dict[str, Any]:
    """Build the pair used by the seed probes.

    Uses the random character cast, three candidate penetration positions,
    and the multi-entry seed-probe location and composition pools.
    *reroll_axis* and *reroll_seed* are forwarded to the seed-lock config.
    """
    configs = {
        "seed_config": pb.build_seed_lock_config_json(
            base_seed=seed,
            reroll_axis=reroll_axis,
            reroll_seed=reroll_seed,
        ),
        "options_json": _insta_options(),
        "character_cast": _random_character_cast(),
        "hardcore_position_config": _position_filter(
            "penetration_only",
            "penetration",
            ["missionary", "doggy", "cowgirl"],
        ),
        "location_config": _seed_probe_location_config(),
        "composition_config": _seed_probe_composition_config(),
        "camera_config": _orbit_camera(horizontal_angle=45, vertical_angle=0, zoom=6.0),
        "softcore_camera_config": _orbit_camera(horizontal_angle=45, vertical_angle=0, zoom=5.5),
        "hardcore_camera_config": _orbit_camera(horizontal_angle=135, vertical_angle=20, zoom=7.5),
    }
    return pb.build_insta_of_pair(
        row_number=1,
        start_index=1,
        seed=seed,
        ethnicity="any",
        figure="random",
        no_plus_women=False,
        no_black=False,
        trigger=TRIGGER,
        prepend_trigger_to_prompt=True,
        **configs,
    )
def _seed_probe_row(seed: int, *, reroll_axis: str = "none", reroll_seed: int = -1) -> dict[str, Any]:
    """Build the single hardcore row used by the seed probes.

    Uses the random character cast, three candidate penetration positions,
    and the multi-entry seed-probe location and composition pools.
    *reroll_axis* and *reroll_seed* are forwarded to the seed-lock config.
    """
    seed_lock = pb.build_seed_lock_config_json(
        base_seed=seed,
        reroll_axis=reroll_axis,
        reroll_seed=reroll_seed,
    )
    cast = _random_character_cast()
    action_filter = _position_filter(
        "penetration_only",
        "penetration",
        ["missionary", "doggy", "cowgirl"],
    )
    locations = _seed_probe_location_config()
    compositions = _seed_probe_composition_config()
    return pb.build_prompt(
        category="Hardcore sexual poses",
        subcategory="Penetrative sex",
        row_number=1,
        start_index=1,
        seed=seed,
        clothing="random",
        ethnicity="any",
        poses="random",
        backside_bias=0.0,
        figure="random",
        no_plus_women=False,
        no_black=False,
        minimal_clothing_ratio=-1,
        standard_pose_ratio=-1,
        trigger=TRIGGER,
        prepend_trigger_to_prompt=True,
        extra_positive="",
        extra_negative="",
        seed_config=seed_lock,
        women_count=1,
        men_count=1,
        character_cast=cast,
        hardcore_position_config=action_filter,
        location_config=locations,
        composition_config=compositions,
        expression_intensity=0.75,
    )
def _seed_probe_snapshot(row: dict[str, Any]) -> dict[str, Any]:
return {
"cast_descriptor_text": row.get("cast_descriptor_text"),
"scene": row.get("scene"),
"scene_text": row.get("scene_text"),
"position_key": row.get("position_key"),
"position_keys": row.get("position_keys") or [],
"item": row.get("item"),
"source_role_graph": row.get("source_role_graph"),
"character_expression_text": row.get("character_expression_text"),
"composition": row.get("composition"),
}
def _pair_seed_snapshot(pair: dict[str, Any]) -> dict[str, Any]:
soft_row = pair.get("softcore_row") if isinstance(pair.get("softcore_row"), dict) else {}
hard_row = pair.get("hardcore_row") if isinstance(pair.get("hardcore_row"), dict) else {}
return {
"shared_cast_descriptors": pair.get("shared_cast_descriptors"),
"soft_cast_descriptor_text": soft_row.get("cast_descriptor_text"),
"hard_cast_descriptor_text": hard_row.get("cast_descriptor_text"),
"soft_scene_text": soft_row.get("scene_text"),
"hard_scene_text": hard_row.get("scene_text"),
"soft_item": soft_row.get("item"),
"soft_pose": soft_row.get("pose"),
"hard_item": hard_row.get("item"),
"hard_position_key": hard_row.get("position_key"),
"hard_position_keys": hard_row.get("position_keys") or [],
"hard_source_role_graph": hard_row.get("source_role_graph"),
"soft_composition": soft_row.get("composition"),
"hard_composition": hard_row.get("composition"),
"soft_expression": soft_row.get("character_expression_text"),
"hard_expression": hard_row.get("character_expression_text"),
}
def _same_fields_issues(
name: str,
base: dict[str, Any],
rerolled: dict[str, Any],
fields: tuple[str, ...],
reroll_seed: int,
) -> list[str]:
return [
f"{name}: stable_field_changed:{field}:reroll_seed={reroll_seed}"
for field in fields
if base.get(field) != rerolled.get(field)
]
def _formatter_output_texts(row: dict[str, Any]) -> dict[str, str]:
    """Format ``row`` for the ``"single"`` target and return one prompt string per route.

    Missing or falsy prompt values become empty strings.
    """
    routes = _format_metadata(row, "single")
    # (route name, key holding that route's prompt text), in output order.
    prompt_keys = (
        ("krea", "krea_prompt"),
        ("sdxl", "sdxl_prompt"),
        ("caption", "natural_caption"),
    )
    texts: dict[str, str] = {}
    for route, prompt_key in prompt_keys:
        texts[route] = str(routes[route].get(prompt_key) or "")
    return texts
def _pair_formatter_output_texts(pair: dict[str, Any]) -> dict[str, str]:
    """Format ``pair`` for both targets and collect prompt strings keyed ``"<target>.<route>"``.

    For the krea and sdxl routes the target-specific prompt key is preferred,
    with the generic ``<route>_prompt`` key as fallback. Missing or falsy
    values become empty strings.
    """
    collected: dict[str, str] = {}
    for target in ("softcore", "hardcore"):
        routes = _format_metadata(pair, target)
        for route in ("krea", "sdxl"):
            payload = routes[route]
            prompt = payload.get(f"{route}_{target}_prompt") or payload.get(f"{route}_prompt") or ""
            collected[f"{target}.{route}"] = str(prompt)
        caption = routes["caption"].get("natural_caption") or ""
        collected[f"{target}.caption"] = str(caption)
    return collected
def _seed_determinism_check(seed: int) -> dict[str, Any]:
    """Build the seed probe row twice with the same seed and report any divergence.

    Both the raw row metadata and the formatter outputs derived from it are
    compared; each mismatch adds one issue string.
    """
    baseline = _seed_probe_row(seed)
    replay = _seed_probe_row(seed)
    comparisons = (
        (
            baseline != replay,
            "locked seed config did not reproduce identical row metadata",
        ),
        (
            _formatter_output_texts(baseline) != _formatter_output_texts(replay),
            "locked seed config did not reproduce identical formatter outputs",
        ),
    )
    problems: list[str] = [message for mismatch, message in comparisons if mismatch]
    return {
        "name": "seed_axis.locked_determinism",
        "base": _row_summary(baseline),
        "changed": False,
        "issues": problems,
    }
def _pair_seed_determinism_check(seed: int) -> dict[str, Any]:
    """Build the pair seed probe twice with the same seed and report any divergence.

    Both the pair metadata and its formatter outputs are compared; each
    mismatch adds one issue string. The ``base`` summary is taken from the
    pair's ``hardcore_row``.
    """
    baseline = _pair_seed_probe(seed)
    replay = _pair_seed_probe(seed)
    comparisons = (
        (
            baseline != replay,
            "locked seed config did not reproduce identical pair metadata",
        ),
        (
            _pair_formatter_output_texts(baseline) != _pair_formatter_output_texts(replay),
            "locked seed config did not reproduce identical pair formatter outputs",
        ),
    )
    problems: list[str] = [message for mismatch, message in comparisons if mismatch]
    hardcore_row = baseline.get("hardcore_row") or {}
    return {
        "name": "pair_seed.locked_determinism",
        "base": _row_summary(hardcore_row),
        "changed": False,
        "issues": problems,
    }
def _pair_seed_reroll_check(
    seed: int,
    *,
    name: str,
    reroll_axis: str,
    changed_fields: tuple[str, ...],
    stable_fields: tuple[str, ...],
    base_target: str,
    max_attempts: int = 15,
) -> dict[str, Any]:
    """Reroll one axis of the pair probe and verify only the expected fields move.

    Reroll seeds ``seed + 1`` .. ``seed + max_attempts`` are tried in order.
    The search stops at the first attempt where a stable field differs from
    the base snapshot (reported as an issue) or where at least one of
    ``changed_fields`` differs (reported as success).

    Args:
        seed: Base seed used for the reference pair and for every reroll.
        name: Check name used in the report and in issue strings.
        reroll_axis: Axis name forwarded to ``_pair_seed_probe``.
        changed_fields: Snapshot fields of which at least one must change.
        stable_fields: Snapshot fields that must stay equal to the base.
        base_target: ``"softcore"`` or ``"hardcore"``; selects which row of
            the base pair is summarized under ``"base"``.
        max_attempts: Number of reroll seeds to try before giving up.

    Returns:
        A report dict with ``name``, ``base``, ``changed``, ``changed_seed``,
        ``changed_fields`` and ``issues``.
    """
    base = _pair_seed_probe(seed)
    base_snapshot = _pair_seed_snapshot(base)
    changed = False
    changed_seed: int | None = None
    changed_field_names: list[str] = []
    issues: list[str] = []
    for reroll_seed in range(seed + 1, seed + 1 + max_attempts):
        rerolled = _pair_seed_probe(seed, reroll_axis=reroll_axis, reroll_seed=reroll_seed)
        rerolled_snapshot = _pair_seed_snapshot(rerolled)
        field_issues = _same_fields_issues(name, base_snapshot, rerolled_snapshot, stable_fields, reroll_seed)
        if field_issues:
            issues.extend(field_issues)
            break
        changed_field_names = [
            field for field in changed_fields if base_snapshot.get(field) != rerolled_snapshot.get(field)
        ]
        if changed_field_names:
            changed = True
            changed_seed = reroll_seed
            break
    else:
        # Only reached when every attempt ran without a break. An early abort
        # on a stable-field violation must not also claim that all attempts
        # were exhausted.
        issues.append(f"{name} did not change {', '.join(changed_fields)} within {max_attempts} attempts")
    return {
        "name": name,
        "base": _row_summary(base.get(f"{base_target}_row") or {}),
        "changed": changed,
        "changed_seed": changed_seed,
        "changed_fields": changed_field_names,
        "issues": issues,
    }
def _seed_reroll_check(
    seed: int,
    *,
    reroll_axis: str,
    changed_fields: tuple[str, ...],
    stable_fields: tuple[str, ...],
    max_attempts: int = 15,
) -> dict[str, Any]:
    """Reroll one axis of the single-row probe and verify only the expected fields move.

    Reroll seeds ``seed + 1`` .. ``seed + max_attempts`` are tried in order.
    The search stops at the first attempt where a stable field differs from
    the base snapshot (reported as an issue) or where at least one of
    ``changed_fields`` differs (reported as success).

    Args:
        seed: Base seed used for the reference row and for every reroll.
        reroll_axis: Axis name forwarded to ``_seed_probe_row``; also used to
            build the check name ``seed_axis.<axis>_reroll``.
        changed_fields: Snapshot fields of which at least one must change.
        stable_fields: Snapshot fields that must stay equal to the base.
        max_attempts: Number of reroll seeds to try before giving up.

    Returns:
        A report dict with ``name``, ``base``, ``changed``, ``changed_seed``,
        ``changed_fields`` and ``issues``.
    """
    name = f"seed_axis.{reroll_axis}_reroll"
    base = _seed_probe_row(seed)
    base_snapshot = _seed_probe_snapshot(base)
    changed = False
    changed_seed: int | None = None
    changed_field_names: list[str] = []
    issues: list[str] = []
    for reroll_seed in range(seed + 1, seed + 1 + max_attempts):
        rerolled = _seed_probe_row(seed, reroll_axis=reroll_axis, reroll_seed=reroll_seed)
        rerolled_snapshot = _seed_probe_snapshot(rerolled)
        field_issues = _same_fields_issues(name, base_snapshot, rerolled_snapshot, stable_fields, reroll_seed)
        if field_issues:
            issues.extend(field_issues)
            break
        changed_field_names = [
            field for field in changed_fields if base_snapshot.get(field) != rerolled_snapshot.get(field)
        ]
        if changed_field_names:
            changed = True
            changed_seed = reroll_seed
            break
    else:
        # Only reached when every attempt ran without a break. An early abort
        # on a stable-field violation must not also claim that all attempts
        # were exhausted.
        issues.append(
            f"{reroll_axis} reroll did not change {', '.join(changed_fields)} within {max_attempts} attempts"
        )
    return {
        "name": name,
        "base": _row_summary(base),
        "changed": changed,
        "changed_seed": changed_seed,
        "changed_fields": changed_field_names,
        "issues": issues,
    }
def _pair_seed_pose_reroll_check(seed: int) -> dict[str, Any]:
    """Run the pair reroll check for the ``pose`` axis.

    At least one hardcore position/item/role-graph field must change, while
    cast, scene, softcore content and composition fields must stay fixed.
    """
    must_change = ("hard_position_key", "hard_item", "hard_source_role_graph")
    must_hold = (
        "shared_cast_descriptors",
        "soft_cast_descriptor_text",
        "hard_cast_descriptor_text",
        "soft_scene_text",
        "hard_scene_text",
        "soft_item",
        "soft_pose",
        "soft_composition",
        "hard_composition",
    )
    return _pair_seed_reroll_check(
        seed,
        name="pair_seed.pose_reroll",
        reroll_axis="pose",
        changed_fields=must_change,
        stable_fields=must_hold,
        base_target="hardcore",
    )
def _pair_seed_content_reroll_check(seed: int) -> dict[str, Any]:
    """Run the pair reroll check for the ``content`` axis.

    At least one of the softcore item/pose fields must change, while cast,
    scene, hardcore content and composition fields must stay fixed. The base
    summary is taken from the softcore row.
    """
    must_change = ("soft_item", "soft_pose")
    must_hold = (
        "shared_cast_descriptors",
        "soft_cast_descriptor_text",
        "hard_cast_descriptor_text",
        "soft_scene_text",
        "hard_scene_text",
        "hard_item",
        "hard_position_key",
        "hard_source_role_graph",
        "soft_composition",
        "hard_composition",
    )
    return _pair_seed_reroll_check(
        seed,
        name="pair_seed.content_reroll",
        reroll_axis="content",
        changed_fields=must_change,
        stable_fields=must_hold,
        base_target="softcore",
    )
def _pair_seed_person_reroll_check(seed: int) -> dict[str, Any]:
    """Run the pair reroll check for the ``person`` axis.

    At least one cast descriptor field must change, while scene, content,
    composition and expression fields must stay fixed.
    """
    must_change = (
        "shared_cast_descriptors",
        "soft_cast_descriptor_text",
        "hard_cast_descriptor_text",
    )
    must_hold = (
        "soft_scene_text",
        "hard_scene_text",
        "soft_item",
        "soft_pose",
        "hard_item",
        "hard_position_key",
        "hard_source_role_graph",
        "soft_composition",
        "hard_composition",
        "soft_expression",
        "hard_expression",
    )
    return _pair_seed_reroll_check(
        seed,
        name="pair_seed.person_reroll",
        reroll_axis="person",
        changed_fields=must_change,
        stable_fields=must_hold,
        base_target="hardcore",
    )
def _pair_seed_scene_reroll_check(seed: int) -> dict[str, Any]:
    """Run the pair reroll check for the ``scene`` axis.

    At least one scene text field must change, while cast, content,
    composition and expression fields must stay fixed.
    """
    must_change = ("soft_scene_text", "hard_scene_text")
    must_hold = (
        "shared_cast_descriptors",
        "soft_cast_descriptor_text",
        "hard_cast_descriptor_text",
        "soft_item",
        "soft_pose",
        "hard_item",
        "hard_position_key",
        "hard_source_role_graph",
        "soft_composition",
        "hard_composition",
        "soft_expression",
        "hard_expression",
    )
    return _pair_seed_reroll_check(
        seed,
        name="pair_seed.scene_reroll",
        reroll_axis="scene",
        changed_fields=must_change,
        stable_fields=must_hold,
        base_target="hardcore",
    )
def _pair_seed_expression_reroll_check(seed: int) -> dict[str, Any]:
    """Run the pair reroll check for the ``expression`` axis.

    At least one expression field must change, while cast, scene, content
    and composition fields must stay fixed.
    """
    must_change = ("soft_expression", "hard_expression")
    must_hold = (
        "shared_cast_descriptors",
        "soft_cast_descriptor_text",
        "hard_cast_descriptor_text",
        "soft_scene_text",
        "hard_scene_text",
        "soft_item",
        "soft_pose",
        "hard_item",
        "hard_position_key",
        "hard_source_role_graph",
        "soft_composition",
        "hard_composition",
    )
    return _pair_seed_reroll_check(
        seed,
        name="pair_seed.expression_reroll",
        reroll_axis="expression",
        changed_fields=must_change,
        stable_fields=must_hold,
        base_target="hardcore",
    )
def _pair_seed_composition_reroll_check(seed: int) -> dict[str, Any]:
    """Run the pair reroll check for the ``composition`` axis.

    At least one composition field must change, while cast, scene, content
    and expression fields must stay fixed.
    """
    must_change = ("soft_composition", "hard_composition")
    must_hold = (
        "shared_cast_descriptors",
        "soft_cast_descriptor_text",
        "hard_cast_descriptor_text",
        "soft_scene_text",
        "hard_scene_text",
        "soft_item",
        "soft_pose",
        "hard_item",
        "hard_position_key",
        "hard_source_role_graph",
        "soft_expression",
        "hard_expression",
    )
    return _pair_seed_reroll_check(
        seed,
        name="pair_seed.composition_reroll",
        reroll_axis="composition",
        changed_fields=must_change,
        stable_fields=must_hold,
        base_target="hardcore",
    )
def _seed_axis_checks(seed: int) -> list[dict[str, Any]]:
    """Run the single-row determinism check followed by one reroll check per axis.

    The result order is: determinism, person, scene, pose, expression,
    composition.
    """
    # (axis, fields expected to change, fields expected to stay fixed)
    axis_plan = (
        (
            "person",
            ("cast_descriptor_text",),
            ("scene_text", "position_key", "item", "source_role_graph", "character_expression_text", "composition"),
        ),
        (
            "scene",
            ("scene", "scene_text"),
            ("cast_descriptor_text", "position_key", "item", "source_role_graph", "character_expression_text", "composition"),
        ),
        (
            "pose",
            ("position_key", "item", "source_role_graph"),
            ("cast_descriptor_text", "scene_text", "character_expression_text", "composition"),
        ),
        (
            "expression",
            ("character_expression_text",),
            ("cast_descriptor_text", "scene_text", "position_key", "item", "source_role_graph", "composition"),
        ),
        (
            "composition",
            ("composition",),
            ("cast_descriptor_text", "scene_text", "position_key", "item", "source_role_graph", "character_expression_text"),
        ),
    )
    results = [_seed_determinism_check(seed)]
    for axis, must_change, must_hold in axis_plan:
        results.append(
            _seed_reroll_check(
                seed,
                reroll_axis=axis,
                changed_fields=must_change,
                stable_fields=must_hold,
            )
        )
    return results
def _pair_seed_checks(seed: int) -> list[dict[str, Any]]:
    """Run every pair seed check with ``seed`` and return the reports in a fixed order."""
    ordered_checks = (
        _pair_seed_determinism_check,
        _pair_seed_person_reroll_check,
        _pair_seed_scene_reroll_check,
        _pair_seed_content_reroll_check,
        _pair_seed_pose_reroll_check,
        _pair_seed_expression_reroll_check,
        _pair_seed_composition_reroll_check,
    )
    return [check(seed) for check in ordered_checks]
def _route_family_coverage_check(
    name: str,
    *,
    expected: set[str],
    observed: set[str],
) -> dict[str, Any]:
    """Compare expected and observed family sets and report the differences.

    The report lists both sets sorted, the families that are expected but
    not observed (``missing``), the families observed but not expected
    (``unexpected``), and one issue string for each non-empty difference.
    """
    gaps = sorted(expected - observed)
    extras = sorted(observed - expected)
    report: dict[str, Any] = {
        "name": name,
        "expected": sorted(expected),
        "observed": sorted(observed),
        "missing": gaps,
        "unexpected": extras,
        "issues": [],
    }
    if gaps:
        report["issues"].append(f"{name}: missing_family_coverage:{gaps}")
    if extras:
        report["issues"].append(f"{name}: unexpected_family_coverage:{extras}")
    return report
def _route_family_coverage_checks(cases: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Check that the simulated cases cover every non-excluded action and position family.

    Only cases whose ``target`` is ``"single"`` or ``"hardcore"`` contribute.
    Their summary families are normalized, and empty results are dropped
    before comparison with the expected sets.
    """
    relevant_summaries: list[dict[str, Any]] = []
    for case in cases:
        if case.get("target") in ("single", "hardcore"):
            relevant_summaries.append(case.get("summary") or {})

    normalize_action = hardcore_action_metadata.normalize_hardcore_action_family
    observed_actions = set()
    for summary in relevant_summaries:
        observed_actions.add(normalize_action(summary.get("action_family"), ""))
    observed_actions.discard("")

    normalize_position = hardcore_position_config.normalize_hardcore_position_family
    observed_positions = set()
    for summary in relevant_summaries:
        observed_positions.add(normalize_position(summary.get("position_family"), ""))
    observed_positions.discard("")

    expected_actions = set(hardcore_action_metadata.HARDCORE_ACTION_FAMILY_CHOICES) - ROUTE_SIM_ACTION_FAMILY_EXCLUSIONS
    expected_positions = (
        set(hardcore_position_config.hardcore_position_family_choices()) - ROUTE_SIM_POSITION_FAMILY_EXCLUSIONS
    )

    comparisons = (
        ("route_coverage.action_families", expected_actions, observed_actions),
        ("route_coverage.position_families", expected_positions, observed_positions),
    )
    return [
        _route_family_coverage_check(check_name, expected=wanted, observed=seen)
        for check_name, wanted, seen in comparisons
    ]
def run_simulation(seed: int = 3901, *, include_prompts: bool = False) -> dict[str, Any]:
    """Run one full simulation pass and return the JSON-friendly report.

    The pass covers one regular single case, every entry of
    ``HARDCORE_ROUTE_CASES`` (seeded from ``seed + 10`` upward), two insta
    pair cases (``seed + 1`` and ``seed + 2``), the family coverage checks,
    the single-row seed axis checks (``seed + 3``) and the pair seed checks
    (``seed + 4``). Issues from every section are flattened into one list of
    ``{"case", "issue"}`` entries.
    """
    cases: list[dict[str, Any]] = [
        _case_report(
            "regular.single.casual",
            _regular_single_case(seed),
            target="single",
            include_prompts=include_prompts,
        )
    ]

    for offset, route_case in enumerate(HARDCORE_ROUTE_CASES, start=10):
        hardcore_row = _hardcore_single_case(
            seed + offset,
            str(route_case["subcategory"]),
            str(route_case["focus"]),
            str(route_case["family"]),
            route_case.get("positions") or (),
        )
        route_report = _case_report(
            str(route_case["name"]),
            hardcore_row,
            target="single",
            include_prompts=include_prompts,
            expected_route=route_case.get("expected_route"),
            expected_terms=route_case.get("expected_terms"),
        )
        cases.append(route_report)

    # (report prefix, seed offset, keyword arguments for the pair builder)
    pair_specs = (
        (
            "insta_pair.penetration",
            1,
            {"pov": False, "position": "doggy", "focus": "penetration_only", "family": "penetration"},
        ),
        (
            "insta_pair.pov_outercourse",
            2,
            {"pov": True, "position": "penis_licking", "focus": "outercourse_only", "family": "outercourse"},
        ),
    )
    for prefix, offset, pair_kwargs in pair_specs:
        pair = _insta_pair_case(seed + offset, **pair_kwargs)
        cases.extend(_pair_reports(prefix, pair, include_prompts=include_prompts))

    coverage_checks = _route_family_coverage_checks(cases)
    axis_checks = _seed_axis_checks(seed + 3)
    pair_seed_checks = _pair_seed_checks(seed + 4)

    # Flatten in section order: cases, coverage, axis, pair seed.
    issues = [
        {"case": entry["name"], "issue": issue}
        for section in (cases, coverage_checks, axis_checks, pair_seed_checks)
        for entry in section
        for issue in entry.get("issues", [])
    ]

    summary = {
        "seed": seed,
        "cases": len(cases),
        "coverage_checks": len(coverage_checks),
        "axis_checks": len(axis_checks),
        "pair_seed_checks": len(pair_seed_checks),
        "issues": len(issues),
    }
    return {
        "summary": summary,
        "issues": issues,
        "cases": cases,
        "coverage_checks": coverage_checks,
        "axis_checks": axis_checks,
        "pair_seed_checks": pair_seed_checks,
    }
def run_simulation_sweep(
    seed: int = 3901,
    *,
    count: int = 3,
    seed_step: int = 101,
    include_prompts: bool = False,
) -> dict[str, Any]:
    """Run ``run_simulation`` for several evenly spaced seeds and aggregate the results.

    ``count`` is coerced to an int and clamped to at least 1; ``seed_step``
    is coerced to an int. Each aggregated issue carries the seed of the run
    that produced it.
    """
    run_count = max(1, int(count))
    seed_step = int(seed_step)
    seeds = [seed + index * seed_step for index in range(run_count)]
    runs = [run_simulation(seed=current_seed, include_prompts=include_prompts) for current_seed in seeds]

    def _total(key: str) -> int:
        # Sum one counter across the per-run summaries.
        return sum((run.get("summary") or {}).get(key, 0) for run in runs)

    issues: list[dict[str, Any]] = []
    for run in runs:
        run_seed = (run.get("summary") or {}).get("seed")
        for issue in run.get("issues") or []:
            issues.append({"seed": run_seed, **issue})

    summary = {
        "seed": seed,
        "seed_step": seed_step,
        "seeds": seeds,
        "runs": len(runs),
        "cases": _total("cases"),
        "coverage_checks": _total("coverage_checks"),
        "axis_checks": _total("axis_checks"),
        "pair_seed_checks": _total("pair_seed_checks"),
        "issues": len(issues),
    }
    return {
        "summary": summary,
        "issues": issues,
        "runs": runs,
    }
def _print_text_report(report: dict[str, Any]) -> None:
    """Print a single-run report to stdout as plain text.

    Output order: a header line with the summary counters, one line per case
    (empty summary values are omitted), one line per coverage check, then
    one line per axis check and per pair seed check. Each entry's issues
    follow it on separate ``ISSUE`` lines.
    """
    totals = report.get("summary") or {}
    header = (
        f"Prompt route simulation: seed={totals.get('seed')} "
        f"cases={totals.get('cases')} coverage_checks={totals.get('coverage_checks')} "
        f"axis_checks={totals.get('axis_checks')} pair_seed_checks={totals.get('pair_seed_checks')} "
        f"issues={totals.get('issues')}"
    )
    print(header)

    def _emit_issues(entry: dict[str, Any]) -> None:
        for issue in entry.get("issues") or []:
            print(f" ISSUE {issue}")

    for case in report.get("cases") or []:
        case_summary = case.get("summary") or {}
        shown = [f"{key}={value}" for key, value in case_summary.items() if value not in (None, "", [])]
        route = ", ".join(shown)
        print(f"- {case.get('name')} [{case.get('target')}]: {route}")
        _emit_issues(case)

    for check in report.get("coverage_checks") or []:
        observed = ", ".join(check.get("observed") or [])
        print(f"- {check.get('name')}: " f"observed={observed}")
        _emit_issues(check)

    # Axis checks and pair seed checks share the same line format.
    for section in ("axis_checks", "pair_seed_checks"):
        for check in report.get(section) or []:
            print(f"- {check.get('name')}: changed={check.get('changed')}")
            _emit_issues(check)
def _print_sweep_report(report: dict[str, Any]) -> None:
    """Print a sweep report to stdout as plain text.

    Output order: a header line with the aggregated counters and the seed
    list, then one line per run followed by that run's issues on separate
    ``ISSUE`` lines.
    """
    totals = report.get("summary") or {}
    seed_list = ", ".join(map(str, totals.get("seeds") or []))
    header = (
        f"Prompt route simulation sweep: seed={totals.get('seed')} "
        f"seed_step={totals.get('seed_step')} runs={totals.get('runs')} "
        f"seeds={seed_list} cases={totals.get('cases')} coverage_checks={totals.get('coverage_checks')} "
        f"axis_checks={totals.get('axis_checks')} pair_seed_checks={totals.get('pair_seed_checks')} "
        f"issues={totals.get('issues')}"
    )
    print(header)
    for run in report.get("runs") or []:
        run_totals = run.get("summary") or {}
        run_line = (
            f"- seed {run_totals.get('seed')}: "
            f"cases={run_totals.get('cases')} issues={run_totals.get('issues')}"
        )
        print(run_line)
        for entry in run.get("issues") or []:
            print(f" ISSUE {entry.get('case')}: {entry.get('issue')}")
def main(argv: list[str] | None = None) -> int:
    """Parse command-line options, run the simulation, print the report, and return the exit code.

    A sweep is run when ``--sweep-count`` is greater than 1, otherwise a
    single simulation. ``--json`` prints the full report as JSON; otherwise
    the matching text printer is used. The return value is 1 only when
    ``--fail-on-issues`` is set and the report contains issues, else 0.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    # (flag, add_argument keyword options), in the order shown by --help.
    option_table = (
        ("--seed", {"type": int, "default": 3901, "help": "Base seed for deterministic simulations."}),
        ("--sweep-count", {"type": int, "default": 1, "help": "Run this many seed-spaced simulations."}),
        ("--seed-step", {"type": int, "default": 101, "help": "Seed increment used by --sweep-count."}),
        ("--json", {"action": "store_true", "help": "Print the full JSON report."}),
        (
            "--include-prompts",
            {"action": "store_true", "help": "Include raw and formatted prompt text in the report."},
        ),
        ("--fail-on-issues", {"action": "store_true", "help": "Exit with code 1 when any issue is reported."}),
    )
    for flag, options in option_table:
        parser.add_argument(flag, **options)
    args = parser.parse_args(argv)

    is_sweep = args.sweep_count > 1
    if is_sweep:
        report = run_simulation_sweep(
            seed=args.seed,
            count=args.sweep_count,
            seed_step=args.seed_step,
            include_prompts=args.include_prompts,
        )
    else:
        report = run_simulation(seed=args.seed, include_prompts=args.include_prompts)

    if args.json:
        print(json.dumps(report, ensure_ascii=True, indent=2, sort_keys=True))
    else:
        printer = _print_sweep_report if is_sweep else _print_text_report
        printer(report)

    if args.fail_on_issues and report.get("issues"):
        return 1
    return 0
# Script entry point: propagate main()'s return value as the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)