#!/usr/bin/env python3 """Run representative prompt-route simulations and report quality issues. This is a diagnostic tool, not a golden snapshot test. It builds a small set of metadata rows/pairs, sends them through the Krea2, SDXL, and caption routes, and reports route/noise/seed-control problems in a JSON-friendly structure. """ from __future__ import annotations import argparse import json import re import sys from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) import caption_naturalizer # noqa: E402 import hardcore_action_metadata # noqa: E402 import hardcore_position_config # noqa: E402 import krea_formatter # noqa: E402 import prompt_builder as pb # noqa: E402 import sdxl_formatter # noqa: E402 import sdxl_tag_policy # noqa: E402 TRIGGER = "sxcppnl7" SDXL_TRIGGER = "mythp0rt" OLD_TRIGGER = "sxcpinup_coloredpencil" SOFTCORE_NOISE_TERMS = ( "the image focuses", "softcore version", "non-explicit teaser setup", "no sex act", "genital contact", "keep the softcore version", "focused on woman a alone", ) FORMATTER_LABEL_LEAKS = ( "body exposure:", "camera:", "camera control:", "characters:", "clothing:", "clothing state:", "composition:", "facial expression:", "facial expressions:", "expressions:", "hardcore setup:", "outfit:", "pose:", "pov participant:", "role graph:", "setting:", "sexual pose:", "sexual scene:", "softcore setup:", "softcore visual reference:", "cast descriptors:", "shared cast descriptors:", "teaser outfit detail:", "visual clothing state:", "visible remaining styling:", ) HARDCORE_NOISE_TERMS = ( "softcore visual reference", "the same visibly adult", "the scene contains", ) def _json(value: Any) -> str: return json.dumps(value, ensure_ascii=True, sort_keys=True) def _clean_key(value: Any) -> str: return re.sub(r"[^a-z0-9]+", " ", str(value or "").lower()).strip() def _character_cast(*, pov_man: bool = False) -> str: cast = pb.build_character_slot_json( subject_type="woman", label="A", age="25-year-old adult", ethnicity="western_european", figure="balanced", body="slim busty", hair_color="blonde", hair_length="long", hair_style="loose_waves", descriptor_detail="full", expression_intensity=0.55, softcore_expression_intensity=0.35, hardcore_expression_intensity=0.75, )["character_cast"] return pb.build_character_slot_json( subject_type="man", label="A", age="40-year-old adult", ethnicity="western_european", body="average", descriptor_detail="compact", expression_intensity=0.45, softcore_expression_intensity=0.25, hardcore_expression_intensity=0.65, presence_mode="pov" if pov_man else "visible", character_cast=cast, )["character_cast"] def _character_cast_subjects(subjects: list[str] | tuple[str, ...]) -> str: cast = "" counts = {"woman": 0, "man": 0} for subject in subjects: subject = str(subject) counts[subject] += 1 label = chr(ord("A") + counts[subject] - 1) cast = pb.build_character_slot_json( subject_type=subject, label=label, age="25-year-old adult" if subject == "woman" else "40-year-old adult", ethnicity="western_european", figure="balanced", body="slim busty" if subject == "woman" else "average", descriptor_detail="compact", expression_intensity=0.55, softcore_expression_intensity=0.35, hardcore_expression_intensity=0.75, character_cast=cast, )["character_cast"] return cast def _random_character_cast() -> str: cast = pb.build_character_slot_json( subject_type="woman", label="A", age="random", ethnicity="random", figure="random", body="random", hair_color="random", hair_length="random", hair_style="random", descriptor_detail="full", )["character_cast"] return pb.build_character_slot_json( subject_type="man", label="A", age="random", ethnicity="random", body="random", descriptor_detail="compact", character_cast=cast, )["character_cast"] def _coworking_location_config() -> str: return pb.build_location_pool_json( enabled=True, combine_mode="replace", preset="custom_only", custom_locations=( "coworking_sim: coworking lounge with tall windows, warm desks, " "laptop tables, glass partition seams, repeated desk rows, plants, " "and soft shared-office depth" ), ) def _seed_probe_location_config() -> str: return pb.build_location_pool_json( enabled=True, combine_mode="replace", preset="custom_only", custom_locations=( "seed_coworking_desk: coworking desk row with warm lamps and laptops\n" "seed_coworking_glass: coworking glass office with plants and partition seams\n" "seed_coworking_windows: coworking window lounge with repeated desks and city light" ), ) def _seed_probe_composition_config() -> str: return pb.build_composition_pool_json( enabled=True, combine_mode="replace", preset="custom_only", custom_compositions=( "seed composition near a desk edge\n" "seed composition through a glass partition\n" "seed composition down repeating desk rows" ), ) def _orbit_camera(horizontal_angle: int = 45, vertical_angle: int = 0, zoom: float = 6.0) -> str: return pb.build_camera_orbit_config_json( enabled=True, camera_mode="standard", horizontal_angle=horizontal_angle, vertical_angle=vertical_angle, zoom=zoom, framing="from_zoom", subject_focus="action", lens="auto", orientation="auto", phone_visibility="auto", priority="soft_hint", camera_detail="compact", include_degrees=True, ) def _position_filter(focus: str, family: str, positions: list[str] | tuple[str, ...] | str) -> str: position_config = pb.build_hardcore_position_pool_json( combine_mode="replace", family=family, selected_positions=positions, ) kwargs = { "allow_toys": False, "allow_double": False, "allow_penetration": focus in ("penetration_only", "keep_pool"), "allow_foreplay": focus in ("foreplay_only", "keep_pool"), "allow_interaction": focus in ("interaction_only", "keep_pool"), "allow_manual": focus in ("manual_only", "keep_pool"), "allow_oral": focus in ("oral_only", "keep_pool"), "allow_outercourse": focus in ("outercourse_only", "keep_pool"), "allow_anal": focus in ("anal_only", "keep_pool"), "allow_climax": focus in ("climax_only", "keep_pool"), } return pb.build_hardcore_action_filter_json( hardcore_position_config=position_config, focus=focus, **kwargs, ) def _insta_options() -> str: return pb.build_insta_of_options_json( softcore_cast="same_as_hardcore", hardcore_cast="couple", hardcore_women_count=1, hardcore_men_count=1, softcore_level="lingerie_tease", hardcore_level="hardcore", softcore_expression_enabled=True, hardcore_expression_enabled=True, softcore_expression_intensity=0.35, hardcore_expression_intensity=0.75, platform_style="hybrid", continuity="same_creator_same_room", hardcore_clothing_continuity="explicit_nude", softcore_camera_mode="from_camera_config", hardcore_camera_mode="from_camera_config", camera_detail="compact", hardcore_detail_density="balanced", ) HARDCORE_ROUTE_CASES = ( { "name": "hardcore.single.oral", "subcategory": "Oral sex", "focus": "oral_only", "family": "oral", "expected_route": {"action_family": "oral", "position_family": "oral"}, "expected_terms": { "krea": ("mouth",), "sdxl": ("oral sex",), "caption": ("oral action",), }, }, { "name": "hardcore.single.manual", "subcategory": "Manual stimulation", "focus": "manual_only", "family": "manual", "expected_route": {"action_family": "manual", "position_family": "manual"}, "expected_terms": { "krea": ("hand",), "sdxl": ("manual stimulation",), "caption": ("manual action",), }, }, { "name": "hardcore.single.outercourse", "subcategory": "Outercourse and genital teasing", "focus": "outercourse_only", "family": "outercourse", "expected_route": {"action_family": "outercourse", "position_family": "outercourse"}, "expected_terms": { "krea": ("penis",), "sdxl": ("outercourse",), "caption": ("non-penetrative action",), }, }, { "name": "hardcore.single.foreplay", "subcategory": "Foreplay and teasing", "focus": "foreplay_only", "family": "foreplay", "positions": ("undressing",), "expected_route": {"action_family": "foreplay", "position_family": "foreplay"}, "expected_terms": { "krea": ("clothing",), "sdxl": ("foreplay",), "caption": ("foreplay action",), }, }, { "name": "hardcore.single.interaction", "subcategory": "Clothing and position transitions", "focus": "interaction_only", "family": "interaction", "positions": ("position_transition",), "expected_route": {"action_family": "foreplay", "position_family": "interaction"}, "expected_terms": { "krea": ("clothing",), "sdxl": ("interaction",), "caption": ("interaction beat",), }, }, { "name": "hardcore.single.anal", "subcategory": "Anal and double penetration", "focus": "anal_only", "family": "anal", "positions": ("doggy", "face_down_ass_up"), "expected_route": {"action_family": "anal", "position_family": "anal"}, "expected_terms": { "krea": ("ass",), "sdxl": ("anal sex",), "caption": ("anal action",), }, }, { "name": "hardcore.single.threesome", "subcategory": "Threesomes", "focus": "threesome_only", "family": "threesome", "expected_route": {"action_family": "threesome", "position_family": "threesome"}, "expected_terms": { "krea": ("three",), "sdxl": ("threesome",), "caption": ("three-person action",), }, }, { "name": "hardcore.single.group", "subcategory": "Group sex and orgy", "focus": "group_only", "family": "group", "expected_route": {"action_family": "group", "position_family": "group"}, "expected_terms": { "krea": ("group",), "sdxl": ("group sex",), "caption": ("group action",), }, }, { "name": "hardcore.single.climax", "subcategory": "Cumshot and climax", "focus": "climax_only", "family": "climax", "expected_route": {"action_family": "climax", "position_family": "climax"}, "expected_terms": { "krea": ("semen",), "sdxl": ("climax", "semen"), "caption": ("climax action",), }, }, ) ROUTE_SIM_ACTION_FAMILY_EXCLUSIONS = { "default", # Dedicated double-contact route assertions live in prompt_smoke because they # need a multi-person, position-specific fixture rather than a broad family case. "toy_double", } ROUTE_SIM_POSITION_FAMILY_EXCLUSIONS = {"any"} def _format_metadata(metadata: dict[str, Any], target: str) -> dict[str, Any]: metadata_json = _json(metadata) krea = krea_formatter.format_krea2_prompt( "", metadata_json=metadata_json, input_hint="metadata_json", target=target, detail_level="balanced", style_mode="preserve", ) sdxl = sdxl_formatter.format_sdxl_prompt( "", metadata_json=metadata_json, input_hint="metadata_json", target=target, formatter_profile="manual_controls", style_preset="flat_vector_pony", quality_preset="pony_high", trigger=SDXL_TRIGGER, prepend_trigger=True, preserve_trigger=False, nude_weight=1.29, ) caption, caption_method, caption_trace = caption_naturalizer.naturalize_caption_with_trace( "", metadata_json=metadata_json, input_hint="metadata_json", target=target, trigger=TRIGGER, include_trigger=True, detail_level="balanced", style_policy="drop_style_tail", caption_profile="training_dense", ) return { "krea": krea, "sdxl": sdxl, "caption": { "natural_caption": caption, "method": caption_method, "route_trace_json": caption_trace, }, } def _duplicate_comma_items(value: Any) -> list[str]: items = [_clean_key(part) for part in str(value or "").split(",")] items = [part for part in items if part] return sorted({part for part in items if items.count(part) > 1}) def _text_issues(label: str, value: Any, *, min_len: int = 8) -> list[str]: text = str(value or "") issues: list[str] = [] if len(text.strip()) < min_len: issues.append(f"{label}: empty_or_short") if "None" in text: issues.append(f"{label}: leaked_None") if " " in text: issues.append(f"{label}: repeated_spaces") if " ," in text or " ." in text: issues.append(f"{label}: bad_punctuation_spacing") return issues def _contains_all(text: str, required: tuple[str, ...]) -> bool: lower = text.lower() return all(term.lower() in lower for term in required) def _trigger_count(text: str, trigger: str) -> int: return len(re.findall(rf"(? list[str]: issues: list[str] = [] krea_prompt = prompts["krea"] sdxl_prompt = prompts["sdxl"] caption_text = prompts["caption"] for trigger in (TRIGGER, SDXL_TRIGGER, OLD_TRIGGER): if _trigger_count(krea_prompt, trigger): issues.append(f"{name}.krea_prompt: unexpected_trigger:{trigger}") sdxl_count = _trigger_count(sdxl_prompt, SDXL_TRIGGER) if sdxl_count != 1: issues.append(f"{name}.sdxl_prompt: trigger_count:{SDXL_TRIGGER}:{sdxl_count}") for trigger in (TRIGGER, OLD_TRIGGER): if _trigger_count(sdxl_prompt, trigger): issues.append(f"{name}.sdxl_prompt: unexpected_trigger:{trigger}") caption_count = _trigger_count(caption_text, TRIGGER) if caption_count != 1: issues.append(f"{name}.caption: trigger_count:{TRIGGER}:{caption_count}") for trigger in (SDXL_TRIGGER, OLD_TRIGGER): if _trigger_count(caption_text, trigger): issues.append(f"{name}.caption: unexpected_trigger:{trigger}") return issues def _formatter_expectation_issues( name: str, formats: dict[str, Any], expected_terms: dict[str, tuple[str, ...]] | None, ) -> list[str]: if not expected_terms: return [] prompts = { "krea": str(formats["krea"].get("krea_prompt") or ""), "sdxl": str(formats["sdxl"].get("sdxl_prompt") or ""), "caption": str(formats["caption"].get("natural_caption") or ""), } issues: list[str] = [] for formatter_name, required in expected_terms.items(): if required and not _contains_all(prompts.get(formatter_name, ""), required): issues.append(f"{name}.{formatter_name}: missing_route_terms:{required}") return issues def _caption_cast_descriptor_issues(name: str, row: dict[str, Any] | None, caption_text: str) -> list[str]: if not isinstance(row, dict): return [] descriptor = row.get("cast_descriptor_text") or row.get("shared_cast_descriptors") if isinstance(descriptor, list): descriptor_text = "; ".join(str(item or "").strip() for item in descriptor if str(item or "").strip()) else: descriptor_text = str(descriptor or "").strip() if not descriptor_text: return [] natural_descriptor = caption_naturalizer._natural_cast_descriptor_text(descriptor_text) if natural_descriptor and caption_text.count(natural_descriptor) > 1: return [f"{name}.caption: repeated_cast_descriptor"] return [] def _caption_expression_grammar_issues(name: str, caption_text: str) -> list[str]: if re.search( r"\b(?:with|are|show|shows|is|include|includes)\s+(?:woman|man) [a-z]\s+has\b", caption_text, flags=re.IGNORECASE, ): return [f"{name}.caption: character_expression_has_grammar"] return [] def _trace_dict(formatter_name: str, payload: dict[str, Any]) -> tuple[dict[str, Any], str]: trace_text = str(payload.get("route_trace_json") or "") if not trace_text: return {}, f"{formatter_name}: missing_route_trace" try: trace = json.loads(trace_text) except json.JSONDecodeError as exc: return {}, f"{formatter_name}: invalid_route_trace:{exc}" if not isinstance(trace, dict): return {}, f"{formatter_name}: route_trace_not_object" return trace, "" def _formatter_trace_metadata_issues(name: str, trace: dict[str, Any], row: dict[str, Any] | None) -> list[str]: if not isinstance(row, dict): return [] issues: list[str] = [] expected_fields = { "metadata_category": row.get("main_category") or row.get("category"), "metadata_subcategory": row.get("subcategory"), "action_family": row.get("action_family"), "position_family": row.get("position_family"), "position_key": row.get("position_key"), "scene_profile": row.get("scene_camera_profile_key"), } for key, expected in expected_fields.items(): if expected in (None, "", []): continue if trace.get(key) != expected: issues.append(f"{name}: trace_{key}_mismatch:{trace.get(key)} != {expected}") expected_position_keys = [str(value) for value in (row.get("position_keys") or []) if str(value or "").strip()] if expected_position_keys: trace_position_keys = [str(value) for value in (trace.get("position_keys") or [])] for key in expected_position_keys: if key not in trace_position_keys: issues.append(f"{name}: trace_missing_position_key:{key}") expected_pov_labels = [str(value) for value in (row.get("pov_character_labels") or []) if str(value or "").strip()] if expected_pov_labels: trace_pov_labels = [str(value) for value in (trace.get("pov_labels") or [])] for label in expected_pov_labels: if label not in trace_pov_labels: issues.append(f"{name}: trace_missing_pov_label:{label}") return issues def _formatter_trace_issues( name: str, formats: dict[str, Any], *, target: str, row: dict[str, Any] | None = None, ) -> list[str]: expected_formatters = { "krea": "krea2", "sdxl": "sdxl", "caption": "caption", } issues: list[str] = [] for formatter_name, expected_formatter in expected_formatters.items(): payload = formats[formatter_name] trace, error = _trace_dict(f"{name}.{formatter_name}", payload) if error: issues.append(error) continue method = str(payload.get("method") or "") branch = str(trace.get("branch") or "") if trace.get("formatter") != expected_formatter: issues.append(f"{name}.{formatter_name}: trace_formatter_mismatch:{trace.get('formatter')} != {expected_formatter}") if trace.get("method") != method: issues.append(f"{name}.{formatter_name}: trace_method_mismatch:{trace.get('method')} != {method}") if trace.get("target") != target: issues.append(f"{name}.{formatter_name}: trace_target_mismatch:{trace.get('target')} != {target}") if trace.get("input_hint") != "metadata_json": issues.append(f"{name}.{formatter_name}: trace_input_hint_mismatch:{trace.get('input_hint')}") if branch in ("", "fallback", "text"): issues.append(f"{name}.{formatter_name}: trace_branch_not_metadata:{branch}") if "metadata" not in method: issues.append(f"{name}.{formatter_name}: trace_method_not_metadata:{method}") if "insta_of_pair" in method: if formatter_name in ("krea", "sdxl"): if branch != "insta_of_pair": issues.append(f"{name}.{formatter_name}: trace_pair_branch_mismatch:{branch}") elif "metadata(insta_of_pair)" not in method: issues.append(f"{name}.{formatter_name}: trace_caption_pair_method_mismatch:{method}") if trace.get("selected_side") != target: issues.append(f"{name}.{formatter_name}: trace_selected_side_mismatch:{trace.get('selected_side')} != {target}") elif formatter_name == "krea" and not branch.startswith("metadata("): issues.append(f"{name}.{formatter_name}: trace_krea_metadata_branch_mismatch:{branch}") elif formatter_name in ("sdxl", "caption") and branch != "metadata": issues.append(f"{name}.{formatter_name}: trace_metadata_branch_mismatch:{branch}") issues.extend(_formatter_trace_metadata_issues(f"{name}.{formatter_name}", trace, row)) return issues def _formatter_issues( name: str, formats: dict[str, Any], *, row: dict[str, Any] | None = None, target: str, expected_terms: dict[str, tuple[str, ...]] | None = None, is_pov: bool = False, ) -> list[str]: issues: list[str] = [] krea = formats["krea"] sdxl = formats["sdxl"] caption = formats["caption"] krea_prompt = str(krea.get("krea_prompt") or "") sdxl_prompt = str(sdxl.get("sdxl_prompt") or "") caption_text = str(caption.get("natural_caption") or "") prompts = { "krea": krea_prompt, "sdxl": sdxl_prompt, "caption": caption_text, } for label, value in ( (f"{name}.krea_prompt", krea_prompt), (f"{name}.sdxl_prompt", sdxl_prompt), (f"{name}.caption", caption_text), ): issues.extend(_text_issues(label, value, min_len=20)) issues.extend(_formatter_trigger_issues(name, prompts)) for formatter_name, method in ( ("krea", krea.get("method")), ("sdxl", sdxl.get("method")), ("caption", caption.get("method")), ): if "metadata" not in str(method or ""): issues.append(f"{name}.{formatter_name}: not_metadata_route:{method}") issues.extend(_formatter_trace_issues(name, formats, target=target, row=row)) issues.extend(_caption_cast_descriptor_issues(name, row, caption_text)) issues.extend(_caption_expression_grammar_issues(name, caption_text)) for label, value in ( (f"{name}.krea_negative", krea.get("negative_prompt")), (f"{name}.sdxl_negative", sdxl.get("negative_prompt")), ): duplicates = _duplicate_comma_items(value) if duplicates: issues.append(f"{label}: duplicate_comma_items:{duplicates[:5]}") for formatter_name, prompt in prompts.items(): lower_prompt = prompt.lower() for leak in FORMATTER_LABEL_LEAKS: if leak in lower_prompt: issues.append(f"{name}.{formatter_name}: leaked_label:{leak}") lower_krea = krea_prompt.lower() for noise in HARDCORE_NOISE_TERMS: if noise in lower_krea: issues.append(f"{name}.krea_prompt: hardcore_noise:{noise}") if isinstance(row, dict): sdxl_lower = f", {sdxl_prompt.lower()}, " for scope, family in (("action", row.get("action_family")), ("position", row.get("position_family"))): route_key = f"{scope}:{str(family or '').strip()}" for tag in sdxl_tag_policy.INCOMPATIBLE_ROUTE_TAGS.get(route_key, ()): if f", {tag}, " in sdxl_lower: issues.append(f"{name}.sdxl_prompt: incompatible_family_tag:{route_key}:{tag}") issues.extend(_formatter_expectation_issues(name, formats, expected_terms)) if is_pov: if "viewer" not in lower_krea or "first-person" not in lower_krea: issues.append(f"{name}.krea_prompt: pov_wording_missing") if "camera:" in krea_prompt: issues.append(f"{name}.krea_prompt: pov_emitted_third_person_camera") return issues def _softcore_issues(name: str, text: Any) -> list[str]: lower = str(text or "").lower() return [f"{name}: softcore_noise:{term}" for term in SOFTCORE_NOISE_TERMS if term in lower] def _row_summary(row: dict[str, Any]) -> dict[str, Any]: return { "category": row.get("main_category"), "subcategory": row.get("subcategory"), "scene": row.get("scene"), "scene_profile": row.get("scene_camera_profile_key"), "action_family": row.get("action_family"), "position_family": row.get("position_family"), "position_key": row.get("position_key"), "position_keys": row.get("position_keys") or [], "pov_labels": row.get("pov_character_labels") or [], } def _route_metadata_issues(name: str, row: dict[str, Any]) -> list[str]: config = row.get("hardcore_position_config") if isinstance(row.get("hardcore_position_config"), dict) else {} configured = [str(value) for value in (config.get("positions") or [])] if not configured: return [] available = set(str(value) for value in (row.get("position_keys") or [])) selected_available = [value for value in configured if value in available] if selected_available and row.get("position_key") not in selected_available: return [ f"{name}: selected_position_not_primary:{row.get('position_key')} not in {selected_available}" ] return [] def _route_expectation_issues(name: str, row: dict[str, Any], expected_route: dict[str, Any] | None) -> list[str]: if not expected_route: return [] issues: list[str] = [] for key in ("action_family", "position_family", "position_key"): expected = expected_route.get(key) if expected and row.get(key) != expected: issues.append(f"{name}: {key}_mismatch:{row.get(key)} != {expected}") for key, expected_values in (("position_keys", expected_route.get("position_keys") or ()),): current = set(str(value) for value in (row.get(key) or [])) for value in expected_values: if str(value) not in current: issues.append(f"{name}: missing_{key}:{value}") return issues def _case_report( name: str, metadata: dict[str, Any], *, target: str, include_prompts: bool, expected_route: dict[str, Any] | None = None, expected_terms: dict[str, tuple[str, ...]] | None = None, is_pov: bool = False, ) -> dict[str, Any]: formats = _format_metadata(metadata, target) issues = _formatter_issues( name, formats, row=metadata, target=target, expected_terms=expected_terms, is_pov=is_pov, ) issues.extend(_route_metadata_issues(name, metadata)) issues.extend(_route_expectation_issues(name, metadata, expected_route)) if target == "softcore": issues.extend(_softcore_issues(f"{name}.krea_prompt", formats["krea"].get("krea_prompt"))) report = { "name": name, "target": target, "summary": _row_summary(metadata), "methods": { "krea": formats["krea"].get("method"), "sdxl": formats["sdxl"].get("method"), "caption": formats["caption"].get("method"), }, "issues": issues, } if include_prompts: report["prompts"] = { "raw": metadata.get("prompt", ""), "krea": formats["krea"].get("krea_prompt", ""), "sdxl": formats["sdxl"].get("sdxl_prompt", ""), "caption": formats["caption"].get("natural_caption", ""), } return report def _pair_reports(name: str, pair: dict[str, Any], *, include_prompts: bool) -> list[dict[str, Any]]: soft_row = dict(pair.get("softcore_row") or {}) hard_row = dict(pair.get("hardcore_row") or {}) soft_formats = _format_metadata(pair, "softcore") hard_formats = _format_metadata(pair, "hardcore") soft_issues = _formatter_issues(f"{name}.softcore", soft_formats, row=soft_row, target="softcore") soft_issues.extend(_route_metadata_issues(f"{name}.softcore", soft_row)) soft_issues.extend(_softcore_issues(f"{name}.softcore.krea_prompt", soft_formats["krea"].get("krea_prompt"))) hard_is_pov = bool(hard_row.get("pov_character_labels")) hard_issues = _formatter_issues( f"{name}.hardcore", hard_formats, row=hard_row, target="hardcore", is_pov=hard_is_pov, ) hard_issues.extend(_route_metadata_issues(f"{name}.hardcore", hard_row)) reports = [ { "name": f"{name}.softcore", "target": "softcore", "summary": _row_summary(soft_row), "methods": { "krea": soft_formats["krea"].get("method"), "sdxl": soft_formats["sdxl"].get("method"), "caption": soft_formats["caption"].get("method"), }, "issues": soft_issues, }, { "name": f"{name}.hardcore", "target": "hardcore", "summary": _row_summary(hard_row), "methods": { "krea": hard_formats["krea"].get("method"), "sdxl": hard_formats["sdxl"].get("method"), "caption": hard_formats["caption"].get("method"), }, "issues": hard_issues, }, ] if include_prompts: reports[0]["prompts"] = { "raw": pair.get("softcore_prompt", ""), "krea": soft_formats["krea"].get("krea_softcore_prompt", "") or soft_formats["krea"].get("krea_prompt", ""), "sdxl": soft_formats["sdxl"].get("sdxl_softcore_prompt", "") or soft_formats["sdxl"].get("sdxl_prompt", ""), "caption": soft_formats["caption"].get("natural_caption", ""), } reports[1]["prompts"] = { "raw": pair.get("hardcore_prompt", ""), "krea": hard_formats["krea"].get("krea_hardcore_prompt", "") or hard_formats["krea"].get("krea_prompt", ""), "sdxl": hard_formats["sdxl"].get("sdxl_hardcore_prompt", "") or hard_formats["sdxl"].get("sdxl_prompt", ""), "caption": hard_formats["caption"].get("natural_caption", ""), } return reports def _regular_single_case(seed: int) -> dict[str, Any]: return pb.build_prompt_from_configs( row_number=1, start_index=1, seed=seed, category_config=pb.build_category_config_json("Casual clothes", "Casual clothes / Streetwear"), cast_config=pb.build_cast_config_json("solo_woman", 1, 0), seed_config=pb.build_seed_lock_config_json(base_seed=seed), camera_config=_orbit_camera(horizontal_angle=45, vertical_angle=0, zoom=5.5), character_cast=_character_cast(), location_config=_coworking_location_config(), extra_positive="simulation marker", ) def _hardcore_single_case( seed: int, subcategory: str, focus: str, family: str, positions: list[str] | tuple[str, ...] | str = (), ) -> dict[str, Any]: women_count, men_count, character_cast = { "threesome": (1, 2, _character_cast_subjects(("woman", "man", "man"))), "group": (2, 2, _character_cast_subjects(("woman", "woman", "man", "man"))), }.get(family, (1, 1, _character_cast())) return pb.build_prompt( category="Hardcore sexual poses", subcategory=subcategory, row_number=1, start_index=1, seed=seed, clothing="random", ethnicity="any", poses="random", backside_bias=0.0, figure="random", no_plus_women=False, no_black=False, minimal_clothing_ratio=-1, standard_pose_ratio=-1, trigger=TRIGGER, prepend_trigger_to_prompt=True, extra_positive="", extra_negative="", seed_config=pb.build_seed_lock_config_json(base_seed=seed), women_count=women_count, men_count=men_count, character_cast=character_cast, hardcore_position_config=_position_filter(focus, family, positions), location_config=_coworking_location_config(), camera_config=_orbit_camera(horizontal_angle=35, vertical_angle=0, zoom=6.5), ) def _insta_pair_case(seed: int, *, pov: bool, position: str, focus: str, family: str) -> dict[str, Any]: return pb.build_insta_of_pair( row_number=1, start_index=1, seed=seed, ethnicity="any", figure="random", no_plus_women=False, no_black=False, trigger=TRIGGER, prepend_trigger_to_prompt=True, seed_config=pb.build_seed_lock_config_json(base_seed=seed), options_json=_insta_options(), character_cast=_character_cast(pov_man=pov), hardcore_position_config=_position_filter(focus, family, [position]), location_config=_coworking_location_config(), camera_config=_orbit_camera(horizontal_angle=45, vertical_angle=0, zoom=6.0), softcore_camera_config=_orbit_camera(horizontal_angle=45, vertical_angle=0, zoom=5.5), hardcore_camera_config=_orbit_camera(horizontal_angle=68 if pov else 135, vertical_angle=20, zoom=7.5), ) def _pair_seed_probe(seed: int, *, reroll_axis: str = "none", reroll_seed: int = -1) -> dict[str, Any]: return pb.build_insta_of_pair( row_number=1, start_index=1, seed=seed, ethnicity="any", figure="random", no_plus_women=False, no_black=False, trigger=TRIGGER, prepend_trigger_to_prompt=True, seed_config=pb.build_seed_lock_config_json(base_seed=seed, reroll_axis=reroll_axis, reroll_seed=reroll_seed), options_json=_insta_options(), character_cast=_random_character_cast(), hardcore_position_config=_position_filter("penetration_only", "penetration", ["missionary", "doggy", "cowgirl"]), location_config=_seed_probe_location_config(), composition_config=_seed_probe_composition_config(), camera_config=_orbit_camera(horizontal_angle=45, vertical_angle=0, zoom=6.0), softcore_camera_config=_orbit_camera(horizontal_angle=45, vertical_angle=0, zoom=5.5), hardcore_camera_config=_orbit_camera(horizontal_angle=135, vertical_angle=20, zoom=7.5), ) def _seed_probe_row(seed: int, *, reroll_axis: str = "none", reroll_seed: int = -1) -> dict[str, Any]: return pb.build_prompt( category="Hardcore sexual poses", subcategory="Penetrative sex", row_number=1, start_index=1, seed=seed, clothing="random", ethnicity="any", poses="random", backside_bias=0.0, figure="random", no_plus_women=False, no_black=False, minimal_clothing_ratio=-1, standard_pose_ratio=-1, trigger=TRIGGER, prepend_trigger_to_prompt=True, extra_positive="", extra_negative="", seed_config=pb.build_seed_lock_config_json(base_seed=seed, reroll_axis=reroll_axis, reroll_seed=reroll_seed), women_count=1, men_count=1, character_cast=_random_character_cast(), hardcore_position_config=_position_filter("penetration_only", "penetration", ["missionary", "doggy", "cowgirl"]), location_config=_seed_probe_location_config(), composition_config=_seed_probe_composition_config(), expression_intensity=0.75, ) def _seed_probe_snapshot(row: dict[str, Any]) -> dict[str, Any]: return { "cast_descriptor_text": row.get("cast_descriptor_text"), "scene": row.get("scene"), "scene_text": row.get("scene_text"), "position_key": row.get("position_key"), "position_keys": row.get("position_keys") or [], "item": row.get("item"), "source_role_graph": row.get("source_role_graph"), "character_expression_text": row.get("character_expression_text"), "composition": row.get("composition"), } def _pair_seed_snapshot(pair: dict[str, Any]) -> dict[str, Any]: soft_row = pair.get("softcore_row") if isinstance(pair.get("softcore_row"), dict) else {} hard_row = pair.get("hardcore_row") if isinstance(pair.get("hardcore_row"), dict) else {} return { "shared_cast_descriptors": pair.get("shared_cast_descriptors"), "soft_cast_descriptor_text": soft_row.get("cast_descriptor_text"), "hard_cast_descriptor_text": hard_row.get("cast_descriptor_text"), "soft_scene_text": soft_row.get("scene_text"), "hard_scene_text": hard_row.get("scene_text"), "soft_item": soft_row.get("item"), "soft_pose": soft_row.get("pose"), "hard_item": hard_row.get("item"), "hard_position_key": hard_row.get("position_key"), "hard_position_keys": hard_row.get("position_keys") or [], "hard_source_role_graph": hard_row.get("source_role_graph"), "soft_composition": soft_row.get("composition"), "hard_composition": hard_row.get("composition"), "soft_expression": soft_row.get("character_expression_text"), "hard_expression": hard_row.get("character_expression_text"), } def _same_fields_issues( name: str, base: dict[str, Any], rerolled: dict[str, Any], fields: tuple[str, ...], reroll_seed: int, ) -> list[str]: return [ f"{name}: stable_field_changed:{field}:reroll_seed={reroll_seed}" for field in fields if base.get(field) != rerolled.get(field) ] def _formatter_output_texts(row: dict[str, Any]) -> dict[str, str]: formatted = _format_metadata(row, "single") return { "krea": str(formatted["krea"].get("krea_prompt") or ""), "sdxl": str(formatted["sdxl"].get("sdxl_prompt") or ""), "caption": str(formatted["caption"].get("natural_caption") or ""), } def _pair_formatter_output_texts(pair: dict[str, Any]) -> dict[str, str]: texts: dict[str, str] = {} for target in ("softcore", "hardcore"): formatted = _format_metadata(pair, target) texts[f"{target}.krea"] = str( formatted["krea"].get(f"krea_{target}_prompt") or formatted["krea"].get("krea_prompt") or "" ) texts[f"{target}.sdxl"] = str( formatted["sdxl"].get(f"sdxl_{target}_prompt") or formatted["sdxl"].get("sdxl_prompt") or "" ) texts[f"{target}.caption"] = str(formatted["caption"].get("natural_caption") or "") return texts def _seed_determinism_check(seed: int) -> dict[str, Any]: first = _seed_probe_row(seed) second = _seed_probe_row(seed) issues: list[str] = [] if first != second: issues.append("locked seed config did not reproduce identical row metadata") if _formatter_output_texts(first) != _formatter_output_texts(second): issues.append("locked seed config did not reproduce identical formatter outputs") return { "name": "seed_axis.locked_determinism", "base": _row_summary(first), "changed": False, "issues": issues, } def _pair_seed_determinism_check(seed: int) -> dict[str, Any]: first = _pair_seed_probe(seed) second = _pair_seed_probe(seed) issues: list[str] = [] if first != second: issues.append("locked seed config did not reproduce identical pair metadata") if _pair_formatter_output_texts(first) != _pair_formatter_output_texts(second): issues.append("locked seed config did not reproduce identical pair formatter outputs") return { "name": "pair_seed.locked_determinism", "base": _row_summary(first.get("hardcore_row") or {}), "changed": False, "issues": issues, } def _pair_seed_reroll_check( seed: int, *, name: str, reroll_axis: str, changed_fields: tuple[str, ...], stable_fields: tuple[str, ...], base_target: str, ) -> dict[str, Any]: base = _pair_seed_probe(seed) base_snapshot = _pair_seed_snapshot(base) changed = False changed_seed = None changed_field_names: list[str] = [] issues: list[str] = [] for reroll_seed in range(seed + 1, seed + 16): rerolled = _pair_seed_probe(seed, reroll_axis=reroll_axis, reroll_seed=reroll_seed) rerolled_snapshot = _pair_seed_snapshot(rerolled) field_issues = _same_fields_issues(name, base_snapshot, rerolled_snapshot, stable_fields, reroll_seed) if field_issues: issues.extend(field_issues) break changed_field_names = [ field for field in changed_fields if base_snapshot.get(field) != rerolled_snapshot.get(field) ] if changed_field_names: changed = True changed_seed = reroll_seed break if not changed: issues.append(f"{name} did not change {', '.join(changed_fields)} within 15 attempts") return { "name": name, "base": _row_summary(base.get(f"{base_target}_row") or {}), "changed": changed, "changed_seed": changed_seed, "changed_fields": changed_field_names, "issues": issues, } def _seed_reroll_check( seed: int, *, reroll_axis: str, changed_fields: tuple[str, ...], stable_fields: tuple[str, ...], ) -> dict[str, Any]: name = f"seed_axis.{reroll_axis}_reroll" base = _seed_probe_row(seed) base_snapshot = _seed_probe_snapshot(base) changed = False changed_seed = None changed_field_names: list[str] = [] issues: list[str] = [] for reroll_seed in range(seed + 1, seed + 16): rerolled = _seed_probe_row(seed, reroll_axis=reroll_axis, reroll_seed=reroll_seed) rerolled_snapshot = _seed_probe_snapshot(rerolled) field_issues = _same_fields_issues(name, base_snapshot, rerolled_snapshot, stable_fields, reroll_seed) if field_issues: issues.extend(field_issues) break changed_field_names = [ field for field in changed_fields if base_snapshot.get(field) != rerolled_snapshot.get(field) ] if changed_field_names: changed = True changed_seed = reroll_seed break if not changed: issues.append(f"{reroll_axis} reroll did not change {', '.join(changed_fields)} within 15 attempts") return { "name": name, "base": _row_summary(base), "changed": changed, "changed_seed": changed_seed, "changed_fields": changed_field_names, "issues": issues, } def _pair_seed_pose_reroll_check(seed: int) -> dict[str, Any]: return _pair_seed_reroll_check( seed, name="pair_seed.pose_reroll", reroll_axis="pose", changed_fields=("hard_position_key", "hard_item", "hard_source_role_graph"), stable_fields=( "shared_cast_descriptors", "soft_cast_descriptor_text", "hard_cast_descriptor_text", "soft_scene_text", "hard_scene_text", "soft_item", "soft_pose", "soft_composition", "hard_composition", ), base_target="hardcore", ) def _pair_seed_content_reroll_check(seed: int) -> dict[str, Any]: return _pair_seed_reroll_check( seed, name="pair_seed.content_reroll", reroll_axis="content", changed_fields=("soft_item", "soft_pose"), stable_fields=( "shared_cast_descriptors", "soft_cast_descriptor_text", "hard_cast_descriptor_text", "soft_scene_text", "hard_scene_text", "hard_item", "hard_position_key", "hard_source_role_graph", "soft_composition", "hard_composition", ), base_target="softcore", ) def _pair_seed_person_reroll_check(seed: int) -> dict[str, Any]: return _pair_seed_reroll_check( seed, name="pair_seed.person_reroll", reroll_axis="person", changed_fields=("shared_cast_descriptors", "soft_cast_descriptor_text", "hard_cast_descriptor_text"), stable_fields=( "soft_scene_text", "hard_scene_text", "soft_item", "soft_pose", "hard_item", "hard_position_key", "hard_source_role_graph", "soft_composition", "hard_composition", "soft_expression", "hard_expression", ), base_target="hardcore", ) def _pair_seed_scene_reroll_check(seed: int) -> dict[str, Any]: return _pair_seed_reroll_check( seed, name="pair_seed.scene_reroll", reroll_axis="scene", changed_fields=("soft_scene_text", "hard_scene_text"), stable_fields=( "shared_cast_descriptors", "soft_cast_descriptor_text", "hard_cast_descriptor_text", "soft_item", "soft_pose", "hard_item", "hard_position_key", "hard_source_role_graph", "soft_composition", "hard_composition", "soft_expression", "hard_expression", ), base_target="hardcore", ) def _pair_seed_expression_reroll_check(seed: int) -> dict[str, Any]: return _pair_seed_reroll_check( seed, name="pair_seed.expression_reroll", reroll_axis="expression", changed_fields=("soft_expression", "hard_expression"), stable_fields=( "shared_cast_descriptors", "soft_cast_descriptor_text", "hard_cast_descriptor_text", "soft_scene_text", "hard_scene_text", "soft_item", "soft_pose", "hard_item", "hard_position_key", "hard_source_role_graph", "soft_composition", "hard_composition", ), base_target="hardcore", ) def _pair_seed_composition_reroll_check(seed: int) -> dict[str, Any]: return _pair_seed_reroll_check( seed, name="pair_seed.composition_reroll", reroll_axis="composition", changed_fields=("soft_composition", "hard_composition"), stable_fields=( "shared_cast_descriptors", "soft_cast_descriptor_text", "hard_cast_descriptor_text", "soft_scene_text", "hard_scene_text", "soft_item", "soft_pose", "hard_item", "hard_position_key", "hard_source_role_graph", "soft_expression", "hard_expression", ), base_target="hardcore", ) def _seed_axis_checks(seed: int) -> list[dict[str, Any]]: return [ _seed_determinism_check(seed), _seed_reroll_check( seed, reroll_axis="person", changed_fields=("cast_descriptor_text",), stable_fields=("scene_text", "position_key", "item", "source_role_graph", "character_expression_text", "composition"), ), _seed_reroll_check( seed, reroll_axis="scene", changed_fields=("scene", "scene_text"), stable_fields=("cast_descriptor_text", "position_key", "item", "source_role_graph", "character_expression_text", "composition"), ), _seed_reroll_check( seed, reroll_axis="pose", changed_fields=("position_key", "item", "source_role_graph"), stable_fields=("cast_descriptor_text", "scene_text", "character_expression_text", "composition"), ), _seed_reroll_check( seed, reroll_axis="expression", changed_fields=("character_expression_text",), stable_fields=("cast_descriptor_text", "scene_text", "position_key", "item", "source_role_graph", "composition"), ), _seed_reroll_check( seed, reroll_axis="composition", changed_fields=("composition",), stable_fields=("cast_descriptor_text", "scene_text", "position_key", "item", "source_role_graph", "character_expression_text"), ), ] def _pair_seed_checks(seed: int) -> list[dict[str, Any]]: return [ _pair_seed_determinism_check(seed), _pair_seed_person_reroll_check(seed), _pair_seed_scene_reroll_check(seed), _pair_seed_content_reroll_check(seed), _pair_seed_pose_reroll_check(seed), _pair_seed_expression_reroll_check(seed), _pair_seed_composition_reroll_check(seed), ] def _route_family_coverage_check( name: str, *, expected: set[str], observed: set[str], ) -> dict[str, Any]: missing = sorted(expected - observed) unexpected = sorted(observed - expected) issues: list[str] = [] if missing: issues.append(f"{name}: missing_family_coverage:{missing}") if unexpected: issues.append(f"{name}: unexpected_family_coverage:{unexpected}") return { "name": name, "expected": sorted(expected), "observed": sorted(observed), "missing": missing, "unexpected": unexpected, "issues": issues, } def _route_family_coverage_checks(cases: list[dict[str, Any]]) -> list[dict[str, Any]]: summaries = [ case.get("summary") or {} for case in cases if case.get("target") in ("single", "hardcore") ] observed_actions = { hardcore_action_metadata.normalize_hardcore_action_family(summary.get("action_family"), "") for summary in summaries } observed_actions.discard("") observed_positions = { hardcore_position_config.normalize_hardcore_position_family(summary.get("position_family"), "") for summary in summaries } observed_positions.discard("") expected_actions = set(hardcore_action_metadata.HARDCORE_ACTION_FAMILY_CHOICES) - ROUTE_SIM_ACTION_FAMILY_EXCLUSIONS expected_positions = set(hardcore_position_config.hardcore_position_family_choices()) - ROUTE_SIM_POSITION_FAMILY_EXCLUSIONS return [ _route_family_coverage_check( "route_coverage.action_families", expected=expected_actions, observed=observed_actions, ), _route_family_coverage_check( "route_coverage.position_families", expected=expected_positions, observed=observed_positions, ), ] def _issue_bucket(issue: Any) -> str: text = str(issue or "").strip() if not text: return "empty_issue" if ":" not in text: return _clean_key(text.split()[0] if text.split() else text) or "message" parts = [part.strip() for part in text.split(":") if part.strip()] if len(parts) >= 2: return _clean_key(parts[1]).replace(" ", "_") or "message" return _clean_key(parts[0]).replace(" ", "_") or "message" def _counter_increment(table: dict[str, int], key: Any, amount: int = 1) -> None: label = str(key or "").strip() if not label: return table[label] = table.get(label, 0) + amount def _quality_group_increment(groups: dict[str, dict[str, int]], key: Any, *, issues: int) -> None: label = str(key or "").strip() if not label: return group = groups.setdefault(label, {"cases": 0, "issues": 0}) group["cases"] += 1 group["issues"] += issues def _quality_summary( cases: list[dict[str, Any]], coverage_checks: list[dict[str, Any]], axis_checks: list[dict[str, Any]], pair_seed_checks: list[dict[str, Any]], ) -> dict[str, Any]: issue_buckets: dict[str, int] = {} targets: dict[str, dict[str, int]] = {} action_families: dict[str, dict[str, int]] = {} position_families: dict[str, dict[str, int]] = {} weakest_cases: list[dict[str, Any]] = [] route_issues = 0 for case in cases: issues = list(case.get("issues") or []) issue_count = len(issues) route_issues += issue_count summary = case.get("summary") if isinstance(case.get("summary"), dict) else {} _quality_group_increment(targets, case.get("target"), issues=issue_count) _quality_group_increment(action_families, summary.get("action_family"), issues=issue_count) _quality_group_increment(position_families, summary.get("position_family"), issues=issue_count) for issue in issues: _counter_increment(issue_buckets, _issue_bucket(issue)) if issue_count: weakest_cases.append( { "name": case.get("name"), "target": case.get("target"), "issues": issue_count, "action_family": summary.get("action_family"), "position_family": summary.get("position_family"), "position_key": summary.get("position_key"), } ) check_groups = { "coverage": coverage_checks, "axis": axis_checks, "pair_seed": pair_seed_checks, } check_issues_by_group: dict[str, int] = {} for group_name, checks in check_groups.items(): issue_count = sum(len(check.get("issues") or []) for check in checks) check_issues_by_group[group_name] = issue_count for check in checks: for issue in check.get("issues") or []: _counter_increment(issue_buckets, _issue_bucket(issue)) weakest_cases.sort(key=lambda item: (-int(item.get("issues") or 0), str(item.get("name") or ""))) return { "route_cases": len(cases), "route_issues": route_issues, "check_issues": sum(check_issues_by_group.values()), "check_issues_by_group": check_issues_by_group, "issue_buckets": dict(sorted(issue_buckets.items())), "targets": dict(sorted(targets.items())), "action_families": dict(sorted(action_families.items())), "position_families": dict(sorted(position_families.items())), "weakest_cases": weakest_cases[:8], } def _merge_quality_groups(target: dict[str, dict[str, int]], source: dict[str, Any]) -> None: for key, raw_group in source.items(): if not isinstance(raw_group, dict): continue group = target.setdefault(str(key), {"cases": 0, "issues": 0}) group["cases"] += int(raw_group.get("cases") or 0) group["issues"] += int(raw_group.get("issues") or 0) def _sweep_quality_summary(runs: list[dict[str, Any]]) -> dict[str, Any]: totals = { "route_cases": 0, "route_issues": 0, "check_issues": 0, } check_issues_by_group: dict[str, int] = {} issue_buckets: dict[str, int] = {} targets: dict[str, dict[str, int]] = {} action_families: dict[str, dict[str, int]] = {} position_families: dict[str, dict[str, int]] = {} weakest_cases: list[dict[str, Any]] = [] for run in runs: run_seed = (run.get("summary") or {}).get("seed") quality = run.get("quality") if isinstance(run.get("quality"), dict) else {} for key in totals: totals[key] += int(quality.get(key) or 0) for key, value in (quality.get("check_issues_by_group") or {}).items(): _counter_increment(check_issues_by_group, key, int(value or 0)) for key, value in (quality.get("issue_buckets") or {}).items(): _counter_increment(issue_buckets, key, int(value or 0)) _merge_quality_groups(targets, quality.get("targets") or {}) _merge_quality_groups(action_families, quality.get("action_families") or {}) _merge_quality_groups(position_families, quality.get("position_families") or {}) for case in quality.get("weakest_cases") or []: if not isinstance(case, dict): continue weakest_cases.append({"seed": run_seed, **case}) weakest_cases.sort(key=lambda item: (-int(item.get("issues") or 0), int(item.get("seed") or 0), str(item.get("name") or ""))) return { **totals, "check_issues_by_group": dict(sorted(check_issues_by_group.items())), "issue_buckets": dict(sorted(issue_buckets.items())), "targets": dict(sorted(targets.items())), "action_families": dict(sorted(action_families.items())), "position_families": dict(sorted(position_families.items())), "weakest_cases": weakest_cases[:12], } def run_simulation(seed: int = 3901, *, include_prompts: bool = False) -> dict[str, Any]: cases: list[dict[str, Any]] = [] regular = _regular_single_case(seed) cases.append(_case_report("regular.single.casual", regular, target="single", include_prompts=include_prompts)) for offset, route_case in enumerate(HARDCORE_ROUTE_CASES, start=10): row = _hardcore_single_case( seed + offset, str(route_case["subcategory"]), str(route_case["focus"]), str(route_case["family"]), route_case.get("positions") or (), ) cases.append( _case_report( str(route_case["name"]), row, target="single", include_prompts=include_prompts, expected_route=route_case.get("expected_route"), expected_terms=route_case.get("expected_terms"), ) ) penetration_pair = _insta_pair_case(seed + 1, pov=False, position="doggy", focus="penetration_only", family="penetration") cases.extend(_pair_reports("insta_pair.penetration", penetration_pair, include_prompts=include_prompts)) pov_pair = _insta_pair_case(seed + 2, pov=True, position="penis_licking", focus="outercourse_only", family="outercourse") cases.extend(_pair_reports("insta_pair.pov_outercourse", pov_pair, include_prompts=include_prompts)) coverage_checks = _route_family_coverage_checks(cases) axis_checks = _seed_axis_checks(seed + 3) pair_seed_checks = _pair_seed_checks(seed + 4) issues = [ {"case": case["name"], "issue": issue} for case in cases for issue in case.get("issues", []) ] issues.extend( {"case": check["name"], "issue": issue} for check in coverage_checks for issue in check.get("issues", []) ) issues.extend( {"case": check["name"], "issue": issue} for check in axis_checks for issue in check.get("issues", []) ) issues.extend( {"case": check["name"], "issue": issue} for check in pair_seed_checks for issue in check.get("issues", []) ) quality = _quality_summary(cases, coverage_checks, axis_checks, pair_seed_checks) return { "summary": { "seed": seed, "cases": len(cases), "coverage_checks": len(coverage_checks), "axis_checks": len(axis_checks), "pair_seed_checks": len(pair_seed_checks), "issues": len(issues), }, "quality": quality, "issues": issues, "cases": cases, "coverage_checks": coverage_checks, "axis_checks": axis_checks, "pair_seed_checks": pair_seed_checks, } def run_simulation_sweep( seed: int = 3901, *, count: int = 3, seed_step: int = 101, include_prompts: bool = False, ) -> dict[str, Any]: count = max(1, int(count)) seed_step = int(seed_step) seeds = [seed + index * seed_step for index in range(count)] runs = [run_simulation(seed=current_seed, include_prompts=include_prompts) for current_seed in seeds] issues: list[dict[str, Any]] = [] for run in runs: run_seed = (run.get("summary") or {}).get("seed") issues.extend({"seed": run_seed, **issue} for issue in run.get("issues") or []) quality = _sweep_quality_summary(runs) return { "summary": { "seed": seed, "seed_step": seed_step, "seeds": seeds, "runs": len(runs), "cases": sum((run.get("summary") or {}).get("cases", 0) for run in runs), "coverage_checks": sum((run.get("summary") or {}).get("coverage_checks", 0) for run in runs), "axis_checks": sum((run.get("summary") or {}).get("axis_checks", 0) for run in runs), "pair_seed_checks": sum((run.get("summary") or {}).get("pair_seed_checks", 0) for run in runs), "issues": len(issues), }, "quality": quality, "issues": issues, "runs": runs, } def _print_text_report(report: dict[str, Any]) -> None: summary = report.get("summary") or {} quality = report.get("quality") or {} print( f"Prompt route simulation: seed={summary.get('seed')} " f"cases={summary.get('cases')} coverage_checks={summary.get('coverage_checks')} " f"axis_checks={summary.get('axis_checks')} pair_seed_checks={summary.get('pair_seed_checks')} " f"issues={summary.get('issues')}" ) print( f"Quality: route_issues={quality.get('route_issues')} " f"check_issues={quality.get('check_issues')} " f"targets={quality.get('targets')}" ) if quality.get("issue_buckets"): print(f"Quality issue buckets: {quality.get('issue_buckets')}") if quality.get("weakest_cases"): print(f"Quality weakest cases: {quality.get('weakest_cases')}") for case in report.get("cases") or []: summary_text = case.get("summary") or {} route = ", ".join(f"{key}={value}" for key, value in summary_text.items() if value not in (None, "", [])) print(f"- {case.get('name')} [{case.get('target')}]: {route}") for issue in case.get("issues") or []: print(f" ISSUE {issue}") for check in report.get("coverage_checks") or []: print( f"- {check.get('name')}: " f"observed={', '.join(check.get('observed') or [])}" ) for issue in check.get("issues") or []: print(f" ISSUE {issue}") for check in report.get("axis_checks") or []: print(f"- {check.get('name')}: changed={check.get('changed')}") for issue in check.get("issues") or []: print(f" ISSUE {issue}") for check in report.get("pair_seed_checks") or []: print(f"- {check.get('name')}: changed={check.get('changed')}") for issue in check.get("issues") or []: print(f" ISSUE {issue}") def _print_sweep_report(report: dict[str, Any]) -> None: summary = report.get("summary") or {} quality = report.get("quality") or {} seeds = ", ".join(str(seed) for seed in (summary.get("seeds") or [])) print( f"Prompt route simulation sweep: seed={summary.get('seed')} " f"seed_step={summary.get('seed_step')} runs={summary.get('runs')} " f"seeds={seeds} cases={summary.get('cases')} coverage_checks={summary.get('coverage_checks')} " f"axis_checks={summary.get('axis_checks')} pair_seed_checks={summary.get('pair_seed_checks')} " f"issues={summary.get('issues')}" ) print( f"Quality: route_issues={quality.get('route_issues')} " f"check_issues={quality.get('check_issues')} " f"targets={quality.get('targets')}" ) if quality.get("issue_buckets"): print(f"Quality issue buckets: {quality.get('issue_buckets')}") if quality.get("weakest_cases"): print(f"Quality weakest cases: {quality.get('weakest_cases')}") for run in report.get("runs") or []: run_summary = run.get("summary") or {} print( f"- seed {run_summary.get('seed')}: " f"cases={run_summary.get('cases')} issues={run_summary.get('issues')}" ) for issue in run.get("issues") or []: print(f" ISSUE {issue.get('case')}: {issue.get('issue')}") def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--seed", type=int, default=3901, help="Base seed for deterministic simulations.") parser.add_argument("--sweep-count", type=int, default=1, help="Run this many seed-spaced simulations.") parser.add_argument("--seed-step", type=int, default=101, help="Seed increment used by --sweep-count.") parser.add_argument("--json", action="store_true", help="Print the full JSON report.") parser.add_argument("--include-prompts", action="store_true", help="Include raw and formatted prompt text in the report.") parser.add_argument("--fail-on-issues", action="store_true", help="Exit with code 1 when any issue is reported.") args = parser.parse_args(argv) if args.sweep_count > 1: report = run_simulation_sweep( seed=args.seed, count=args.sweep_count, seed_step=args.seed_step, include_prompts=args.include_prompts, ) else: report = run_simulation(seed=args.seed, include_prompts=args.include_prompts) if args.json: print(json.dumps(report, ensure_ascii=True, indent=2, sort_keys=True)) elif args.sweep_count > 1: _print_sweep_report(report) else: _print_text_report(report) return 1 if args.fail_on_issues and report.get("issues") else 0 if __name__ == "__main__": raise SystemExit(main())