# File-browser listing header captured together with the source:
#   Files
#   ComfyUI-Ethanfel-Prompt-Bui…/krea2_atlas_refine_manifest.py
#
#   4317 lines
#   206 KiB
#   Python

from __future__ import annotations
import argparse
import hashlib
import json
import re
from pathlib import Path
from typing import Any
try:
from . import krea2_pose_variant_catalog
except ImportError: # Allows local CLI/tests from the repository root.
import krea2_pose_variant_catalog
# Versioned schema identifiers. The builders in this module stamp the matching
# value into the "schema" key of the document they return, and readers such as
# build_reference_cue_candidate_draft reject a document whose "schema" is set
# to a different value.
# NOTE(review): only some of these identifiers are referenced by the functions
# defined right after the constants; the rest are presumably used by builders
# further down the file — confirm.
SCHEMA = "sxcp_krea2_atlas_refine_manifest_v1"
BATCH_SCHEMA = "sxcp_atlas_refine_prompt_batch_v1"
RESULT_SHEET_SCHEMA = "sxcp_atlas_refine_result_sheet_v1"
PROMOTION_REPORT_SCHEMA = "sxcp_atlas_refine_promotion_report_v1"
SIDECAR_UPDATE_DRAFT_SCHEMA = "sxcp_atlas_refine_sidecar_update_draft_v1"
SIDECAR_UPDATE_VALIDATION_SCHEMA = "sxcp_atlas_refine_sidecar_update_validation_v1"
SIDECAR_APPLY_REPORT_SCHEMA = "sxcp_atlas_refine_sidecar_apply_report_v1"
MATRIX_SIDECAR_UPDATE_DRAFT_SCHEMA = "sxcp_atlas_refine_matrix_sidecar_update_draft_v1"
MATRIX_SIDECAR_UPDATE_VALIDATION_SCHEMA = "sxcp_atlas_refine_matrix_sidecar_update_validation_v1"
MATRIX_SIDECAR_APPLY_REPORT_SCHEMA = "sxcp_atlas_refine_matrix_sidecar_apply_report_v1"
SEED_SELECTION_SCHEMA = "sxcp_atlas_refine_seed_selection_v1"
SEED_MATRIX_SCHEMA = "sxcp_atlas_refine_seed_matrix_v1"
SEED_MATRIX_RESULT_SHEET_SCHEMA = "sxcp_atlas_refine_seed_matrix_result_sheet_v1"
SEED_MATRIX_PROMOTION_REPORT_SCHEMA = "sxcp_atlas_refine_seed_matrix_promotion_report_v1"
CATALOG_CUE_DRAFT_SCHEMA = "sxcp_atlas_refine_catalog_cue_draft_v1"
COVERAGE_REPORT_SCHEMA = "sxcp_atlas_refine_coverage_report_v1"
# Reference-pool / reference-cue documents (built by the build_reference_*
# functions below).
REFERENCE_POOL_REPORT_SCHEMA = "sxcp_atlas_reference_pool_report_v1"
REFERENCE_CUE_REVIEW_SHEET_SCHEMA = "sxcp_atlas_reference_cue_review_sheet_v1"
REFERENCE_CUE_CANDIDATE_DRAFT_SCHEMA = "sxcp_atlas_reference_cue_candidate_draft_v1"
REFERENCE_CUE_SIDECAR_AUTHOR_DRAFT_SCHEMA = "sxcp_atlas_reference_cue_sidecar_author_draft_v1"
REFERENCE_CUE_SIDECAR_AUTHOR_VALIDATION_SCHEMA = "sxcp_atlas_reference_cue_sidecar_author_validation_v1"
REFERENCE_CUE_SIDECAR_AUTHOR_APPLY_REPORT_SCHEMA = "sxcp_atlas_reference_cue_sidecar_author_apply_report_v1"
SIDECAR_SCAFFOLD_SCHEMA = "sxcp_atlas_refine_sidecar_scaffold_v1"
BASELINE_SCORE_SHEET_SCHEMA = "sxcp_atlas_refine_baseline_score_sheet_v1"
BASELINE_SCORE_UPDATE_DRAFT_SCHEMA = "sxcp_atlas_refine_baseline_score_update_draft_v1"
BASELINE_SCORE_UPDATE_VALIDATION_SCHEMA = "sxcp_atlas_refine_baseline_score_update_validation_v1"
BASELINE_SCORE_APPLY_REPORT_SCHEMA = "sxcp_atlas_refine_baseline_score_apply_report_v1"
PROMPT_NOISE_REPORT_SCHEMA = "sxcp_atlas_refine_prompt_noise_report_v1"
PROMPT_CLEANUP_SHEET_SCHEMA = "sxcp_atlas_refine_prompt_cleanup_sheet_v1"
PROMPT_CLEANUP_VALIDATION_SCHEMA = "sxcp_atlas_refine_prompt_cleanup_validation_v1"
PROMPT_CLEANUP_APPLY_REPORT_SCHEMA = "sxcp_atlas_refine_prompt_cleanup_apply_report_v1"
# Channel names.
# NOTE(review): DEFAULT_OUT_CHANNEL / DEFAULT_IN_CHANNEL are not referenced by
# the functions defined right after the constants; presumably consumed by the
# batch builders further down — confirm.
DEFAULT_OUT_CHANNEL = "sxcp_eval_out"
DEFAULT_IN_CHANNEL = "sxcp_eval_in"
# _validate_no_negative_channel rejects any field value that equals or
# contains this channel name.
NEGATIVE_OUT_CHANNEL = "sxcp_eval_negative_out"
# Accepted values for a prompt variant's "prompt_order".
PROMPT_ORDERS = {"subject_first", "geometry_only", "prompt_order_test"}
# File suffixes build_manifest passes to _files_by_stem to collect prompt
# files and images (compared against the lower-cased suffix).
PROMPT_SUFFIXES = {".txt", ".prompt"}
IMAGE_SUFFIXES = {".png"}
# Suffix appended to a prompt/image stem to name its sidecar file.
SIDECAR_SUFFIX = ".json"
# Keys of the seed_metadata template; _seed_metadata() maps each one to None.
SEED_METADATA_KEYS = (
    "sampler_seed",
    "generator_seed",
    "atlas_cue_seed",
    "micro_position_seed",
    "workspace_seed",
)
# Every seed key except "sampler_seed"; _stable_matrix_evidence requires the
# evidence's "seed_slot" to be one of these.
SEED_SELECTION_SLOT_KEYS = tuple(key for key in SEED_METADATA_KEYS if key != "sampler_seed")
# Keys of the cue_axes template; _cue_axes() maps each one to None and
# _blank_review_cue_axes() maps each one to "".
CUE_AXIS_KEYS = (
    "contact_depth",
    "hand_position",
    "foot_position",
    "body_angle",
    "camera_height",
    "workspace_surface",
    "clothing_visibility",
    "expression_eye_detail",
    "anatomy_shape_detail",
)
# Keys of the score template; _score_template() maps each one to None.
SCORE_KEYS = (
    "atlas_pose_match",
    "contact_match",
    "pose_ownership",
    "workspace_continuity",
    "clothing_visibility",
    "subject_identity",
    "expression_eye_control",
    "anatomy_proportion",
    "prompt_noise",
)
# NOTE(review): the PROMOTION_* constants are not referenced by the functions
# defined right after the constants; presumably they drive _promotion_blockers
# (score keys that must hold a "pass" value vs. keys where any "progress"
# value is enough) — confirm.
PROMOTION_PASS_VALUES = {"pass"}
PROMOTION_PROGRESS_VALUES = {"pass", "partial", "baseline"}
PROMOTION_REQUIRED_PASS_KEYS = (
    "pose_ownership",
    "workspace_continuity",
    "clothing_visibility",
    "subject_identity",
    "prompt_noise",
)
PROMOTION_REQUIRED_PROGRESS_KEYS = (
    "atlas_pose_match",
    "contact_match",
    "expression_eye_control",
    "anatomy_proportion",
)
# Keys that _prompt_variants refuses to see on a prompt variant object.
FORBIDDEN_PROMPT_FIELDS = (
    "negative",
    "negative_prompt",
    "negative_text",
    "negative_channel",
)
# Case-insensitive whole-word match for "either", "or", "may", "optionally".
PROMPT_OPTION_WORD_RE = re.compile(r"\b(?:either|or|may|optionally)\b", re.IGNORECASE)
# Case-insensitive whole-word match for negating phrasing such as "do not",
# "never", "without" or "no".
PROMPT_NEGATIVE_CONDITIONING_RE = re.compile(
    r"\b(?:do not|must not|should not|never|without|no)\b",
    re.IGNORECASE,
)
# NOTE(review): presumably the phrases flagged as "meta_instruction" by the
# prompt-noise checker (_prompt_noise_issues, defined elsewhere) — confirm.
PROMPT_META_PHRASES = (
    "keep the visible partner",
    "visible partner and the action primary",
    "context stays",
    "camera layout",
    "pov foreground clothing cue",
    "pov foreground body cue",
    "beside or behind the bodies",
)
# Matches a run of characters that contains none of ". ! ? ;", followed by one
# of those terminators or the end of the text.
PROMPT_DUPLICATE_PHRASE_RE = re.compile(r"[^.!?;]+(?:[.!?;]|$)")
# NOTE(review): presumably the minimum phrase length (in words) considered by
# the duplicate-phrase check — confirm against _prompt_noise_issues.
PROMPT_DUPLICATE_MIN_WORDS = 6
# Minimum number of sampler seeds _stable_matrix_evidence requires before it
# accepts matrix evidence.
MIN_STABLE_MATRIX_SAMPLER_SEEDS = 2
# NOTE(review): presumably the issue codes emitted by the prompt-noise checker;
# the names line up with the constants above — confirm.
PROMPT_NOISE_CODES = (
    "option_word",
    "negative_conditioning",
    "meta_instruction",
    "duplicate_phrase",
)
def _sha256_text(text: str) -> str:
    """Return the hexadecimal SHA-256 digest of *text* encoded as UTF-8."""
    hasher = hashlib.sha256()
    hasher.update(text.encode("utf-8"))
    return hasher.hexdigest()
def _known_variant_keys() -> list[str]:
    """Return the catalog's variant keys, longest key first.

    Longest-first ordering lets prefix matching in _variant_key_from_stem pick
    the most specific key when one key is a prefix of another.
    """
    keys = list(krea2_pose_variant_catalog.variant_keys())
    keys.sort(key=len, reverse=True)
    return keys
def _variant_key_from_stem(stem: str, known_keys: list[str]) -> str:
    """Derive the variant key for a file stem.

    The first entry of *known_keys* that equals the stem, or that the stem
    starts with followed by an underscore, wins. Otherwise a trailing
    ``_<digits>`` (optionally followed by one more underscore) is stripped;
    a stem without such a suffix is returned unchanged.
    """
    matched_key = next(
        (key for key in known_keys if stem == key or stem.startswith(f"{key}_")),
        None,
    )
    if matched_key is not None:
        return matched_key
    numbered = re.match(r"^(?P<key>.+?)_\d+_?$", stem)
    if numbered is None:
        return stem
    return numbered.group("key")
def _files_by_stem(folder: Path, suffixes: set[str]) -> dict[str, Path]:
    """Map file stem to path for the regular files in *folder* with a wanted suffix.

    Entries are visited in case-insensitive name order and suffixes are
    compared lower-cased. When two files share a stem, the one visited later
    replaces the earlier one.
    """
    ordered_children = sorted(folder.iterdir(), key=lambda item: item.name.lower())
    return {
        child.stem: child
        for child in ordered_children
        if child.is_file() and child.suffix.lower() in suffixes
    }
def _seed_metadata() -> dict[str, None]:
    """Return a fresh seed-metadata template: every SEED_METADATA_KEYS entry set to None."""
    return dict.fromkeys(SEED_METADATA_KEYS)
def _cue_axes() -> dict[str, None]:
    """Return a fresh cue-axes template: every CUE_AXIS_KEYS entry set to None."""
    return dict.fromkeys(CUE_AXIS_KEYS)
def _score_template() -> dict[str, None]:
    """Return a fresh score template: every SCORE_KEYS entry set to None."""
    return dict.fromkeys(SCORE_KEYS)
def _merge_known_values(defaults: dict[str, Any], raw: Any) -> dict[str, Any]:
    """Overlay *raw* onto a copy of *defaults*, restricted to the default keys.

    Keys of *raw* that are not in *defaults* are ignored; a key present in
    *raw* overrides the default even when its value is None. A non-dict *raw*
    yields a plain copy of *defaults*.
    """
    if isinstance(raw, dict):
        return {
            name: (raw[name] if name in raw else fallback)
            for name, fallback in defaults.items()
        }
    return dict(defaults)
def _merge_non_null_known_values(defaults: dict[str, Any], raw: Any) -> dict[str, Any]:
    """Overlay the non-None values of *raw* onto a copy of *defaults*.

    Only keys already present in *defaults* are considered, and a None (or
    missing) value in *raw* leaves the default in place. A non-dict *raw*
    yields a plain copy of *defaults*.
    """
    result = dict(defaults)
    if isinstance(raw, dict):
        overrides = {name: raw.get(name) for name in defaults}
        result.update(
            {name: value for name, value in overrides.items() if value is not None}
        )
    return result
def _text(value: Any) -> str:
    """Return ``str(value)`` with surrounding whitespace stripped; None becomes ""."""
    if value is None:
        return ""
    return str(value).strip()
def _validate_no_negative_channel(value: Any, *, field: str) -> None:
    """Reject a value that is, or contains, the negative output channel name.

    Args:
        value: Any value; it is normalized with _text before checking.
        field: Label used as the prefix of the error message.

    Raises:
        ValueError: The normalized text equals NEGATIVE_OUT_CHANNEL ("must
            not use") or merely contains it ("must not mention").
    """
    candidate = _text(value)
    if NEGATIVE_OUT_CHANNEL not in candidate:
        return
    if candidate == NEGATIVE_OUT_CHANNEL:
        raise ValueError(f"{field} must not use {NEGATIVE_OUT_CHANNEL}")
    raise ValueError(f"{field} must not mention {NEGATIVE_OUT_CHANNEL}")
def _string_list(value: Any, *, field: str) -> list[str]:
    """Normalize *value* into a list of stripped, non-empty strings.

    Args:
        value: None (treated as an empty list) or a list of items.
        field: Label used as the prefix of error messages.

    Raises:
        ValueError: *value* is neither None nor a list, an item is empty after
            normalization, or an item names the negative output channel.
    """
    if value is None:
        return []
    if not isinstance(value, list):
        raise ValueError(f"{field} must be a list of strings")

    def normalize(position: int, raw_item: Any) -> str:
        cleaned = _text(raw_item)
        if not cleaned:
            raise ValueError(f"{field}[{position}] must be a non-empty string")
        _validate_no_negative_channel(cleaned, field=f"{field}[{position}]")
        return cleaned

    return [normalize(position, raw_item) for position, raw_item in enumerate(value)]
def _reference_images(value: Any, *, field: str) -> list[str]:
    """Validate a list of atlas-relative PNG reference paths.

    Args:
        value: None or a list of path strings (normalized by _string_list).
        field: Label used as the prefix of error messages.

    Returns:
        The normalized path strings, in input order.

    Raises:
        ValueError: An entry is absolute, contains a ``..`` segment, does not
            end in ``.png``, or — when the atlas root can be resolved — does
            not exist under the atlas root.
    """
    refs = _string_list(value, field=field)
    if not refs:
        # Nothing to check: skip _atlas_root_path(), which loads the catalog.
        # This function runs once per prompt variant and evidence object, and
        # most of those calls carry no reference images.
        return refs
    atlas_root = _atlas_root_path()
    for index, ref in enumerate(refs):
        path = Path(ref)
        if path.is_absolute():
            raise ValueError(f"{field}[{index}] must be relative to the atlas root")
        if ".." in path.parts:
            raise ValueError(f"{field}[{index}] must not contain .. path segments")
        if path.suffix.lower() != ".png":
            raise ValueError(f"{field}[{index}] must reference a PNG image")
        # The existence check is best-effort: it is skipped when no atlas root
        # is available.
        if atlas_root is not None and not (atlas_root / path).is_file():
            raise ValueError(f"{field}[{index}] missing atlas reference image: {atlas_root / path}")
    return refs
def _atlas_root_path() -> Path | None:
    """Return the catalog's ``atlas_root`` as a Path, or None when unusable.

    None is returned when loading the catalog raises, when the catalog is not
    a dict or has no ``atlas_root`` text, or when that path is not an existing
    directory.
    """
    try:
        catalog = krea2_pose_variant_catalog.load_catalog()
    except Exception:
        # Deliberately best-effort: callers treat "no atlas root" as a normal state.
        return None
    configured = catalog.get("atlas_root") if isinstance(catalog, dict) else ""
    root_text = _text(configured)
    if root_text:
        candidate = Path(root_text)
        if candidate.is_dir():
            return candidate
    return None
def _atlas_relative_path(path_value: str | Path, *, atlas_root: Path, field: str) -> Path:
path = Path(path_value)
if path.is_absolute():
try:
path = path.relative_to(atlas_root)
except ValueError as exc:
raise ValueError(f"{field} must be inside the atlas root {atlas_root}") from exc
if ".." in path.parts:
raise ValueError(f"{field} must not contain .. path segments")
return path
def _reference_image_id(path: Path) -> str:
    """Return the part of the file stem before its first underscore (the whole stem if none)."""
    image_id, _separator, _remainder = path.stem.partition("_")
    return image_id
def _atlas_folder_images(atlas_root: Path, folder: str | Path, *, field: str) -> list[dict[str, Any]]:
    """List the PNG files directly inside an atlas folder.

    Args:
        atlas_root: Root directory of the atlas.
        folder: Folder inside the atlas, relative or absolute.
        field: Label used as the prefix of error messages.

    Returns:
        One dict per PNG file, in case-insensitive filename order, with the
        keys ``id``, ``relative_path`` (POSIX style, relative to the atlas
        root), ``filename`` and ``size_bytes``.

    Raises:
        ValueError: The folder is outside the atlas root, contains ``..``, or
            is not an existing directory.
    """
    relative_folder = _atlas_relative_path(folder, atlas_root=atlas_root, field=field)
    folder_path = atlas_root / relative_folder
    if not folder_path.is_dir():
        raise ValueError(f"{field} is missing atlas folder: {folder_path}")
    png_files = [
        child
        for child in sorted(folder_path.iterdir(), key=lambda item: item.name.lower())
        if child.is_file() and child.suffix.lower() == ".png"
    ]
    return [
        {
            "id": _reference_image_id(child),
            "relative_path": (relative_folder / child.name).as_posix(),
            "filename": child.name,
            "size_bytes": child.stat().st_size,
        }
        for child in png_files
    ]
def build_reference_pool_report(variant_key: str, *, supplemental_folders: list[str] | None = None) -> dict[str, Any]:
    """Compare a variant's canonical atlas images with optional supplemental folders.

    Images are paired by their id (the filename stem up to the first
    underscore). The report lists every canonical and supplemental image, the
    pairs found in both pools, and the ids present in only one pool.

    Args:
        variant_key: Catalog key of the variant to report on.
        supplemental_folders: Extra atlas folders to compare against; blank
            entries are ignored.

    Raises:
        ValueError: The key is empty or unknown, the atlas root cannot be
            resolved, the variant has no atlas folders, a folder is invalid,
            or the variant's catalog reference images fail validation.
    """
    key = _text(variant_key)
    if not key:
        raise ValueError("variant_key is required")
    atlas_root = _atlas_root_path()
    if atlas_root is None:
        raise ValueError("catalog atlas_root is missing or not readable")
    variant = krea2_pose_variant_catalog.get_variant(key)
    if not variant:
        raise ValueError(f"unknown variant_key {key!r}")

    def usable(folders: Any) -> list[str]:
        # Keep only folder values that are non-blank after normalization.
        return [str(folder) for folder in folders or [] if _text(folder)]

    def scan(folders: list[str], label: str) -> list[dict[str, Any]]:
        # Collect the image rows of every folder, preserving folder order.
        rows: list[dict[str, Any]] = []
        for position, folder in enumerate(folders):
            rows += _atlas_folder_images(atlas_root, folder, field=f"{label}[{position}]")
        return rows

    def paths(rows: list[dict[str, Any]]) -> list[str]:
        return [row["relative_path"] for row in rows]

    canonical_folders = usable(variant.get("atlas_folders"))
    if not canonical_folders:
        raise ValueError(f"variant {key!r} has no atlas_folders")
    supplemental_folder_values = usable(supplemental_folders)

    canonical_images = scan(canonical_folders, "atlas_folders")
    supplemental_images = scan(supplemental_folder_values, "supplemental_folders")

    # When several images share an id, the last one seen represents that id.
    canonical_by_id = {row["id"]: row for row in canonical_images}
    supplemental_by_id = {row["id"]: row for row in supplemental_images}
    canonical_ids = set(canonical_by_id)
    supplemental_ids = set(supplemental_by_id)
    matched_ids = sorted(canonical_ids & supplemental_ids)
    supplemental_extra_ids = sorted(supplemental_ids - canonical_ids)
    canonical_missing_ids = sorted(canonical_ids - supplemental_ids)

    catalog_reference_images = _reference_images(variant.get("reference_images"), field=f"{key}.reference_images")

    matched_images = [
        {
            "id": image_id,
            "canonical_image": canonical_by_id[image_id]["relative_path"],
            "supplemental_image": supplemental_by_id[image_id]["relative_path"],
        }
        for image_id in matched_ids
    ]
    return {
        "schema": REFERENCE_POOL_REPORT_SCHEMA,
        "variant_key": key,
        "atlas_root": str(atlas_root),
        "canonical_folders": canonical_folders,
        "supplemental_folders": supplemental_folder_values,
        "catalog_reference_images": catalog_reference_images,
        "catalog_reference_count": len(catalog_reference_images),
        "canonical_image_count": len(canonical_images),
        "supplemental_image_count": len(supplemental_images),
        "matched_image_count": len(matched_ids),
        "supplemental_extra_count": len(supplemental_extra_ids),
        "canonical_missing_supplemental_count": len(canonical_missing_ids),
        "canonical_images": paths(canonical_images),
        "supplemental_images": paths(supplemental_images),
        "matched_images": matched_images,
        "supplemental_extra_images": paths([supplemental_by_id[image_id] for image_id in supplemental_extra_ids]),
        "canonical_missing_supplemental_images": paths([canonical_by_id[image_id] for image_id in canonical_missing_ids]),
    }
def _blank_review_cue_axes() -> dict[str, str]:
    """Return a fresh review template: every CUE_AXIS_KEYS entry set to ""."""
    return dict.fromkeys(CUE_AXIS_KEYS, "")
def _reference_review_item(
    *,
    image_id: str,
    role: str,
    canonical_image: str,
    supplemental_image: str,
    reference_images_template: list[str],
) -> dict[str, Any]:
    """Build one blank review-sheet row for a reference image.

    The row carries the image identity, empty review fields for a human to
    fill in, and a pre-populated ``prompt_variant_template``. The reference
    image list is copied separately into the row and into the template so the
    two never share a list object.
    """
    prompt_variant_template = {
        "id": "",
        "prompt_order": "subject_first",
        "append_cues": [],
        "reference_images": list(reference_images_template),
        "cue_axes": _cue_axes(),
        "seed_metadata": _seed_metadata(),
        "notes": "",
    }
    row: dict[str, Any] = {
        "id": image_id,
        "role": role,
        "canonical_image": canonical_image,
        "supplemental_image": supplemental_image,
        "reference_images_template": list(reference_images_template),
        "cue_axes": _blank_review_cue_axes(),
        "observed_positive_cues": [],
        "rejected_cues": [],
        "review_notes": "",
    }
    row["prompt_variant_template"] = prompt_variant_template
    return row
def build_reference_cue_review_sheet(variant_key: str, *, supplemental_folders: list[str] | None = None) -> dict[str, Any]:
    """Build a blank cue review sheet from the variant's reference pool report.

    Every canonical image gets a row (role ``catalog_reference`` when the
    catalog lists it as a reference image, ``canonical_reference``
    otherwise), followed by one ``supplemental_extra`` row per supplemental
    image that has no canonical counterpart.

    Raises:
        ValueError: Propagated from build_reference_pool_report.
    """
    report = build_reference_pool_report(variant_key, supplemental_folders=supplemental_folders)
    catalog_refs = set(report.get("catalog_reference_images") or [])

    # canonical image path -> paired supplemental image path
    supplemental_for: dict[str, str] = {}
    for match in report.get("matched_images") or []:
        if isinstance(match, dict):
            supplemental_for[_text(match.get("canonical_image"))] = _text(match.get("supplemental_image"))

    canonical_texts = [_text(image) for image in report.get("canonical_images") or []]
    review_items: list[dict[str, Any]] = [
        _reference_review_item(
            image_id=_reference_image_id(Path(canonical_text)),
            role="catalog_reference" if canonical_text in catalog_refs else "canonical_reference",
            canonical_image=canonical_text,
            supplemental_image=supplemental_for.get(canonical_text, ""),
            reference_images_template=[canonical_text],
        )
        for canonical_text in canonical_texts
        if canonical_text
    ]
    extra_texts = [_text(image) for image in report.get("supplemental_extra_images") or []]
    review_items.extend(
        _reference_review_item(
            image_id=_reference_image_id(Path(extra_text)),
            role="supplemental_extra",
            canonical_image="",
            supplemental_image=extra_text,
            reference_images_template=[],
        )
        for extra_text in extra_texts
        if extra_text
    )

    copied_keys = (
        "variant_key",
        "atlas_root",
        "canonical_folders",
        "supplemental_folders",
        "catalog_reference_count",
        "canonical_image_count",
        "supplemental_image_count",
        "matched_image_count",
        "supplemental_extra_count",
    )
    sheet: dict[str, Any] = {"schema": REFERENCE_CUE_REVIEW_SHEET_SCHEMA}
    for name in copied_keys:
        sheet[name] = report[name]
    sheet["review_item_count"] = len(review_items)
    sheet["instructions"] = (
        "Fill observed_positive_cues and cue_axes from visual review only. "
        "Use canonical/catalog items for sidecar reference_images; use supplemental_extra items as cue-mining evidence until promoted."
    )
    sheet["review_items"] = review_items
    return sheet
def _review_cue_axes(raw: Any, *, field: str) -> dict[str, Any]:
    """Normalize reviewer-entered cue axes onto the cue-axes template.

    Blank entries stay None; non-blank entries are stored stripped. A
    non-dict *raw* yields the untouched template.

    Raises:
        ValueError: A non-blank axis value names the negative output channel.
    """
    axes = _cue_axes()
    if isinstance(raw, dict):
        for axis in CUE_AXIS_KEYS:
            observed = _text(raw.get(axis))
            if not observed:
                continue
            _validate_no_negative_channel(observed, field=f"{field}.{axis}")
            axes[axis] = observed
    return axes
def _prompt_variant_id_from_review_item(item: dict[str, Any], *, field: str) -> str:
    """Return the prompt variant id chosen for a review row ("" when none).

    The row's own ``prompt_variant_id`` takes precedence; otherwise the
    ``id`` of its ``prompt_variant_template`` is used.

    Raises:
        ValueError: The chosen id names the negative output channel.
    """
    chosen = _text(item.get("prompt_variant_id"))
    if not chosen:
        template = item.get("prompt_variant_template")
        if isinstance(template, dict):
            chosen = _text(template.get("id"))
    if not chosen:
        return chosen
    _validate_no_negative_channel(chosen, field=f"{field}.prompt_variant_id")
    return chosen
def build_reference_cue_candidate_draft(reference_cue_review_sheet: dict[str, Any]) -> dict[str, Any]:
    """Turn a filled-in cue review sheet into ready prompt-variant candidates.

    Each review row either becomes a candidate or is recorded in ``skipped``
    with a reason. Rows are checked in this order: not an object, no observed
    cues, prompt-noise issues, supplemental-only row, empty reference image
    template, missing variant id, duplicate variant id.

    Raises:
        ValueError: The sheet is not an object, carries a different schema,
            has no ``review_items`` list, or a row contains an invalid cue,
            reference image, prompt order or negative-channel mention.
    """
    sheet = reference_cue_review_sheet
    if not isinstance(sheet, dict):
        raise ValueError("reference cue review sheet must be an object")
    schema = _text(sheet.get("schema"))
    if schema and schema != REFERENCE_CUE_REVIEW_SHEET_SCHEMA:
        raise ValueError(f"reference cue review sheet schema must be {REFERENCE_CUE_REVIEW_SHEET_SCHEMA}")
    review_items = sheet.get("review_items")
    if not isinstance(review_items, list):
        raise ValueError("reference cue review sheet review_items must be a list")

    variant_key = _text(sheet.get("variant_key"))
    candidates: list[dict[str, Any]] = []
    skipped: list[dict[str, Any]] = []
    used_variant_ids: set[str] = set()

    for index, item in enumerate(review_items):
        if not isinstance(item, dict):
            skipped.append({"index": index, "id": "", "reason": "invalid_review_item"})
            continue

        field = f"review_items[{index}]"
        image_id = _text(item.get("id"))
        role = _text(item.get("role"))
        canonical_image = _text(item.get("canonical_image"))
        supplemental_image = _text(item.get("supplemental_image"))
        # Shared leading keys of every skip record for this row.
        identity: dict[str, Any] = {
            "index": index,
            "id": image_id,
            "role": role,
            "canonical_image": canonical_image,
            "supplemental_image": supplemental_image,
        }

        cues = _string_list(item.get("observed_positive_cues"), field=f"{field}.observed_positive_cues")
        if not cues:
            skipped.append({**identity, "reason": "no_observed_positive_cues"})
            continue

        variant_id = _prompt_variant_id_from_review_item(item, field=field)
        raw_template = item.get("prompt_variant_template")
        template: dict[str, Any] = raw_template if isinstance(raw_template, dict) else {}
        exact_text = _text(template.get("text"))

        noise_issues: list[dict[str, Any]] = [
            issue
            for cue_index, cue in enumerate(cues)
            for issue in _prompt_noise_issues(
                cue,
                context="reference_cue_observed_positive_cue",
                prompt_variant_id=variant_id,
                cue_index=cue_index,
            )
        ]
        if exact_text:
            noise_issues.extend(
                _prompt_noise_issues(
                    exact_text,
                    context="reference_cue_exact_text",
                    prompt_variant_id=variant_id,
                )
            )
        if noise_issues:
            skipped.append(
                {
                    **identity,
                    "reason": "prompt_noise_issue",
                    "prompt_noise_issues": noise_issues,
                    "prompt_noise_code_counts": _prompt_noise_code_counts(noise_issues),
                }
            )
            continue

        reference_images_template = _reference_images(
            item.get("reference_images_template"),
            field=f"{field}.reference_images_template",
        )
        if role == "supplemental_extra" or not canonical_image:
            skipped.append(
                {
                    **identity,
                    "reason": "supplemental_extra_needs_canonical_reference",
                    "observed_positive_cues": cues,
                    "cue_axes": _review_cue_axes(item.get("cue_axes"), field=f"{field}.cue_axes"),
                }
            )
            continue
        if not reference_images_template:
            skipped.append(
                {
                    **identity,
                    "reason": "missing_reference_images_template",
                    "observed_positive_cues": cues,
                }
            )
            continue
        if not variant_id:
            skipped.append(
                {
                    **identity,
                    "reason": "missing_prompt_variant_id",
                    "observed_positive_cues": cues,
                }
            )
            continue
        if variant_id in used_variant_ids:
            skipped.append(
                {
                    **identity,
                    "prompt_variant_id": variant_id,
                    "reason": "duplicate_prompt_variant_id",
                    "observed_positive_cues": cues,
                }
            )
            continue
        used_variant_ids.add(variant_id)

        prompt_order = _text(template.get("prompt_order") or "subject_first")
        if prompt_order not in PROMPT_ORDERS:
            raise ValueError(f"{field}.prompt_variant_template.prompt_order must be one of {sorted(PROMPT_ORDERS)}")
        cue_axes = _review_cue_axes(item.get("cue_axes"), field=f"{field}.cue_axes")
        seed_metadata = _merge_known_values(_seed_metadata(), template.get("seed_metadata"))
        notes = _text(template.get("notes") or item.get("review_notes"))
        _validate_no_negative_channel(notes, field=f"{field}.notes")

        prompt_variant: dict[str, Any] = {
            "id": variant_id,
            "prompt_order": prompt_order,
            "reference_images": reference_images_template,
            "cue_axes": cue_axes,
            "seed_metadata": seed_metadata,
            "notes": notes,
        }
        # Exact text, when supplied, replaces the append-cues form.
        if exact_text:
            _validate_no_negative_channel(exact_text, field=f"{field}.prompt_variant_template.text")
            prompt_variant["text"] = exact_text
        else:
            prompt_variant["append_cues"] = cues

        candidates.append(
            {
                "variant_key": variant_key,
                "reference_item_id": image_id,
                "role": role,
                "canonical_image": canonical_image,
                "supplemental_image": supplemental_image,
                "prompt_variant_id": variant_id,
                "reference_images": reference_images_template,
                "observed_positive_cues": cues,
                "cue_axes": cue_axes,
                "review_notes": _text(item.get("review_notes")),
                "prompt_variant": prompt_variant,
            }
        )

    instructions = (
        "Copy reviewed prompt_variant objects into same-stem sidecars only after choosing the matching baseline deck; "
        "raw-only supplemental rows remain cue-mining evidence until paired with a canonical reference."
    )
    return {
        "schema": REFERENCE_CUE_CANDIDATE_DRAFT_SCHEMA,
        "variant_key": variant_key,
        "ready_candidate_count": len(candidates),
        "skipped_count": len(skipped),
        "instructions": instructions,
        "candidates": candidates,
        "skipped": skipped,
    }
def build_reference_cue_sidecar_author_draft(
    manifest: dict[str, Any],
    reference_cue_candidate_draft: dict[str, Any],
    *,
    variant_key: str = "",
) -> dict[str, Any]:
    """Plan sidecar updates that add the draft's candidate variants to manifest entries.

    Every manifest entry whose ``variant_key`` matches the requested key and
    is a known variant receives a copy of all usable candidate prompt
    variants. Candidates and entries that cannot be used are reported in
    ``skipped`` with a reason.

    Args:
        manifest: Manifest object with an ``entries`` list.
        reference_cue_candidate_draft: Candidate draft object.
        variant_key: Variant to author for; defaults to the draft's own
            ``variant_key``.

    Raises:
        ValueError: ``entries`` is not a list, the draft carries a different
            schema, no variant key can be determined, or a candidate's
            ``append_cues`` are invalid.
    """
    draft = reference_cue_candidate_draft
    entries = manifest.get("entries")
    if not isinstance(entries, list):
        raise ValueError("manifest entries must be a list")
    schema = _text(draft.get("schema"))
    if schema and schema != REFERENCE_CUE_CANDIDATE_DRAFT_SCHEMA:
        raise ValueError(f"reference cue candidate draft schema must be {REFERENCE_CUE_CANDIDATE_DRAFT_SCHEMA}")
    requested_variant_key = _text(variant_key or draft.get("variant_key"))
    if not requested_variant_key:
        raise ValueError("variant_key is required")

    candidate_variants: list[dict[str, Any]] = []
    skipped: list[dict[str, Any]] = []

    # Pass 1: collect the candidate prompt variants that belong to the requested key.
    for candidate_index, candidate in enumerate(draft.get("candidates") or []):
        if not isinstance(candidate, dict):
            skipped.append({"candidate_index": candidate_index, "reason": "invalid_candidate"})
            continue
        candidate_variant_key = _text(candidate.get("variant_key") or draft.get("variant_key"))
        if candidate_variant_key and candidate_variant_key != requested_variant_key:
            skipped.append(
                {
                    "candidate_index": candidate_index,
                    "prompt_variant_id": _text(candidate.get("prompt_variant_id")),
                    "variant_key": candidate_variant_key,
                    "reason": "variant_key_mismatch",
                }
            )
            continue
        prompt_variant = candidate.get("prompt_variant")
        if not isinstance(prompt_variant, dict):
            skipped.append(
                {
                    "candidate_index": candidate_index,
                    "prompt_variant_id": _text(candidate.get("prompt_variant_id")),
                    "reason": "missing_prompt_variant",
                }
            )
            continue

        variant_copy = dict(prompt_variant)
        variant_id = _text(variant_copy.get("id"))
        append_cues = _string_list(variant_copy.get("append_cues"), field=f"candidate prompt_variant {variant_id}.append_cues")
        exact_text = _text(variant_copy.get("text"))
        # Record where the prompt came from, unless the candidate already says so.
        default_source: dict[str, Any] | None = None
        if variant_id:
            if append_cues:
                default_source = {
                    "kind": "append_cues",
                    "prompt_variant_id": variant_id,
                    "append_cues": list(append_cues),
                }
            elif exact_text:
                default_source = {
                    "kind": "text",
                    "prompt_variant_id": variant_id,
                    "tested_text_sha256": _sha256_text(exact_text),
                }
        if default_source is not None:
            variant_copy.setdefault("prompt_source", default_source)
        candidate_variants.append(variant_copy)

    # Pass 2: attach the collected variants to every matching manifest entry.
    updates: list[dict[str, Any]] = []
    matching_entry_count = 0
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        entry_variant_key = _text(entry.get("variant_key"))
        if entry_variant_key != requested_variant_key:
            continue
        matching_entry_count += 1
        entry_id = _text(entry.get("id"))
        source_stem = _text(entry.get("source_stem") or entry_id)
        origin: dict[str, Any] = {
            "variant_key": entry_variant_key,
            "source_entry_id": entry_id,
            "source_stem": source_stem,
        }
        if not entry.get("known_variant"):
            skipped.append({**origin, "reason": "unknown_variant"})
            continue
        if not candidate_variants:
            skipped.append({**origin, "reason": "no_ready_candidates"})
            continue
        updates.append(
            {
                **origin,
                "sidecar_filename": f"{source_stem}{SIDECAR_SUFFIX}",
                "source_prompt_sha256": _text(entry.get("prompt_sha256")),
                "prompt_path": _text(entry.get("prompt_path")),
                "image_path": _text(entry.get("image_path")),
                "prompt_variants": [dict(variant) for variant in candidate_variants],
                "notes": "Pre-test sidecar variants from reviewed atlas reference cue candidates.",
            }
        )
    if matching_entry_count == 0:
        skipped.append(
            {
                "variant_key": requested_variant_key,
                "reason": "no_matching_manifest_entry",
            }
        )

    instructions = (
        "Validate, apply to the same manifest folder, then rebuild the manifest and run MCP fixed-seed prompt batches before promotion."
    )
    return {
        "schema": REFERENCE_CUE_SIDECAR_AUTHOR_DRAFT_SCHEMA,
        "subject_id": _text(manifest.get("subject_id")),
        "variant_key": requested_variant_key,
        "candidate_count": len(candidate_variants),
        "update_count": len(updates),
        "skipped_count": len(skipped),
        "instructions": instructions,
        "updates": updates,
        "skipped": skipped,
    }
def _prompt_variant_evidence(raw: Any, *, field: str) -> dict[str, Any]:
    """Validate and normalize a prompt variant's ``evidence`` object.

    Only the keys present in *raw* are carried over (``seed``, ``turn``,
    ``image_path``, ``score``); ``reference_images`` is included when
    non-empty. None yields an empty dict.

    Raises:
        ValueError: *raw* is not an object, ``turn`` is neither None nor a
            non-bool integer, or a nested value fails its own validation.
    """
    if raw is None:
        return {}
    if not isinstance(raw, dict):
        raise ValueError(f"{field} must be an object")

    collected: dict[str, Any] = {}
    if "seed" in raw:
        collected["seed"] = _int_seed(raw.get("seed"), field=f"{field}.seed")
    if "turn" in raw:
        turn_value = raw.get("turn")
        # bool is a subclass of int, so it has to be excluded explicitly.
        is_plain_int = isinstance(turn_value, int) and not isinstance(turn_value, bool)
        if not (turn_value is None or is_plain_int):
            raise ValueError(f"{field}.turn must be an integer when present")
        collected["turn"] = turn_value
    if "image_path" in raw:
        collected["image_path"] = _image_path(raw.get("image_path"), field=f"{field}.image_path")
    if "score" in raw:
        collected["score"] = _merge_known_values(_score_template(), raw.get("score"))
    references = _reference_images(raw.get("reference_images"), field=f"{field}.reference_images")
    if references:
        collected["reference_images"] = references
    return collected
def _stable_matrix_evidence(raw: Any) -> dict[str, Any]:
    """Return a shallow copy of *raw* when it is fully consistent "stable" matrix evidence.

    Any failed check yields an empty dict instead of an error. To be accepted,
    *raw* must be a dict with ``stable`` set to True whose seed slot, sampler
    seeds, counters and per-job records all agree with each other (see the
    inline comments for each rule).

    NOTE(review): _int_seed, _image_path and _promotion_blockers are defined
    outside this part of the file; this function only relies on them raising
    ValueError for invalid input (which is caught below) — confirm.
    """
    if not isinstance(raw, dict) or raw.get("stable") is not True:
        return {}
    try:
        selection_seed = _int_seed(raw.get("selection_seed"), field="matrix_evidence.selection_seed")
        # The seed slot must name a seed other than the sampler seed.
        seed_slot = _text(raw.get("seed_slot"))
        if seed_slot not in SEED_SELECTION_SLOT_KEYS:
            return {}
        sampler_seeds_raw = raw.get("sampler_seeds")
        if not isinstance(sampler_seeds_raw, list) or not sampler_seeds_raw:
            return {}
        sampler_seeds = [
            _int_seed(seed, field=f"matrix_evidence.sampler_seeds[{index}]")
            for index, seed in enumerate(sampler_seeds_raw)
        ]
        # Sampler seeds must be distinct and numerous enough.
        if len(set(sampler_seeds)) != len(sampler_seeds):
            return {}
        if len(sampler_seeds) < MIN_STABLE_MATRIX_SAMPLER_SEEDS:
            return {}
        jobs_raw = raw.get("jobs")
        if not isinstance(jobs_raw, list) or not jobs_raw:
            return {}
        # The summary counters must say that every listed job is promotion-ready.
        if raw.get("job_count") != len(jobs_raw) or raw.get("promotion_ready_count") != len(jobs_raw) or raw.get("blocked_count") != 0:
            return {}
        seen_job_ids: set[str] = set()
        job_sampler_seeds: list[int] = []
        for job_index, job in enumerate(jobs_raw):
            if not isinstance(job, dict):
                return {}
            # Job ids must be non-empty and unique.
            job_id = _text(job.get("id"))
            if not job_id or job_id in seen_job_ids:
                return {}
            seen_job_ids.add(job_id)
            if _text(job.get("decision")) != "seedable_candidate":
                return {}
            # Each job uses its own sampler seed but the shared selection seed.
            job_sampler_seed = _int_seed(job.get("sampler_seed"), field=f"matrix_evidence.jobs[{job_index}].sampler_seed")
            if job_sampler_seed in job_sampler_seeds:
                return {}
            job_sampler_seeds.append(job_sampler_seed)
            if _int_seed(job.get("selection_seed"), field=f"matrix_evidence.jobs[{job_index}].selection_seed") != selection_seed:
                return {}
            # Called for validation only; the returned value is discarded.
            _image_path(job.get("image_path"), field=f"matrix_evidence.jobs[{job_index}].image_path")
            # bool is a subclass of int, so it is excluded explicitly.
            turn = job.get("turn")
            if not isinstance(turn, int) or isinstance(turn, bool):
                return {}
            # Re-derive the decision from the recorded score rather than
            # trusting the stored "decision" alone.
            decision, _blockers = _promotion_blockers(_merge_known_values(_score_template(), job.get("score")))
            if decision != "seedable_candidate":
                return {}
        # The jobs must cover exactly the declared sampler seeds.
        if sorted(job_sampler_seeds) != sorted(sampler_seeds):
            return {}
    except ValueError:
        return {}
    return dict(raw)
def _stable_matrix_evidence_for_variant(variant: dict[str, Any], *, field: str) -> dict[str, Any]:
    """Return the variant's stable matrix evidence when it matches the variant's seeds.

    The evidence is accepted only if the variant's ``seed_metadata`` entry for
    the evidence's ``seed_slot`` equals the evidence's ``selection_seed``.
    Any failed or invalid check yields an empty dict.
    """
    evidence = _stable_matrix_evidence(variant.get("matrix_evidence"))
    if not evidence:
        return {}
    try:
        slot = _text(evidence.get("seed_slot"))
        expected_seed = _int_seed(evidence.get("selection_seed"), field=f"{field}.matrix_evidence.selection_seed")
        seeds = _merge_known_values(_seed_metadata(), variant.get("seed_metadata"))
        recorded_seed = _int_seed(seeds.get(slot), field=f"{field}.seed_metadata.{slot}")
    except ValueError:
        return {}
    if recorded_seed != expected_seed:
        return {}
    return evidence
def _prompt_source(raw: Any, *, field: str) -> dict[str, Any]:
    """Validate and normalize a ``prompt_source`` object.

    Returns an empty dict for None. Otherwise the result always has ``kind``
    and, when supplied, ``prompt_variant_id``, ``append_cues`` and
    ``tested_text_sha256``.

    Raises:
        ValueError: *raw* is not an object, ``kind`` is not one of baseline /
            text / append_cues, ``append_cues`` is invalid or missing for
            kind append_cues, or the variant id names the negative channel.
    """
    if raw is None:
        return {}
    if not isinstance(raw, dict):
        raise ValueError(f"{field} must be an object")
    kind = _text(raw.get("kind"))
    if kind not in ("baseline", "text", "append_cues"):
        raise ValueError(f"{field}.kind must be baseline, text, or append_cues")

    normalized: dict[str, Any] = {"kind": kind}
    variant_ref = _text(raw.get("prompt_variant_id"))
    if variant_ref:
        _validate_no_negative_channel(variant_ref, field=f"{field}.prompt_variant_id")
        normalized["prompt_variant_id"] = variant_ref

    cues = _string_list(raw.get("append_cues"), field=f"{field}.append_cues")
    if kind == "append_cues" and not cues:
        raise ValueError(f"{field}.append_cues is required when kind is append_cues")
    if cues:
        normalized["append_cues"] = cues

    recorded_hash = _text(raw.get("tested_text_sha256"))
    if recorded_hash:
        normalized["tested_text_sha256"] = recorded_hash
    return normalized
def _prompt_source_for_variant(variant: dict[str, Any], *, variant_id: str, text: str, append_cues: list[str]) -> dict[str, Any]:
    """Return the prompt source for a variant, synthesizing one when absent.

    A declared ``prompt_source`` is kept and only completed with the variant
    id and the SHA-256 of *text* where those are missing. Without a declared
    source, the kind is ``append_cues`` when cues are given and ``text``
    otherwise.

    Raises:
        ValueError: The declared ``prompt_source`` is invalid.
    """
    declared = _prompt_source(variant.get("prompt_source"), field=f"prompt variant {variant_id}.prompt_source")
    tested_hash = _sha256_text(text)
    if declared:
        declared.setdefault("prompt_variant_id", variant_id)
        declared.setdefault("tested_text_sha256", tested_hash)
        return declared

    synthesized: dict[str, Any] = {
        "kind": "append_cues" if append_cues else "text",
        "prompt_variant_id": variant_id,
    }
    if append_cues:
        synthesized["append_cues"] = list(append_cues)
    synthesized["tested_text_sha256"] = tested_hash
    return synthesized
def _prompt_variants(raw: Any) -> list[dict[str, Any]]:
    """Validate and normalize a sidecar's ``prompt_variants`` list.

    Each variant needs a unique, non-empty ``id``, a known ``prompt_order``
    (default ``subject_first``) and exactly one of ``text`` or
    ``append_cues``. Optional ``evidence``, ``reference_images``,
    ``matrix_evidence`` and ``prompt_source`` are carried over when present
    and valid. None yields an empty list.

    Raises:
        ValueError: The value is not a list, a variant is not an object, uses
            a forbidden negative-prompt field, has a missing or duplicated id,
            an unknown prompt order, both or neither of text/append_cues, a
            negative-channel mention, or an invalid nested value.
    """
    if raw is None:
        return []
    if not isinstance(raw, list):
        raise ValueError("prompt_variants must be a list")

    normalized: list[dict[str, Any]] = []
    used_ids: set[str] = set()
    for index, item in enumerate(raw):
        label = f"prompt_variants[{index}]"
        if not isinstance(item, dict):
            raise ValueError(f"{label} must be an object")
        banned = next((name for name in FORBIDDEN_PROMPT_FIELDS if name in item), None)
        if banned is not None:
            raise ValueError(f"{label} must not contain {banned}")

        variant_id = _text(item.get("id"))
        if not variant_id:
            raise ValueError(f"{label}.id is required")
        _validate_no_negative_channel(variant_id, field=f"{label}.id")
        if variant_id in used_ids:
            raise ValueError(f"{label}.id {variant_id!r} is duplicated")
        used_ids.add(variant_id)

        prompt_order = _text(item.get("prompt_order") or "subject_first")
        if prompt_order not in PROMPT_ORDERS:
            raise ValueError(f"{label}.prompt_order must be one of {sorted(PROMPT_ORDERS)}")

        text = _text(item.get("text"))
        append_cues = _string_list(item.get("append_cues"), field=f"{label}.append_cues")
        if text:
            _validate_no_negative_channel(text, field=f"{label}.text")
        has_text = bool(text)
        has_cues = bool(append_cues)
        if has_text == has_cues:
            raise ValueError(f"{label} must provide exactly one of text or append_cues")

        notes = _text(item.get("notes"))
        _validate_no_negative_channel(notes, field=f"{label}.notes")

        variant: dict[str, Any] = {
            "id": variant_id,
            "prompt_order": prompt_order,
            "cue_axes": _merge_known_values(_cue_axes(), item.get("cue_axes")),
            "seed_metadata": _merge_known_values(_seed_metadata(), item.get("seed_metadata")),
            "notes": notes,
        }

        evidence = _prompt_variant_evidence(item.get("evidence"), field=f"{label}.evidence")
        if evidence:
            variant["evidence"] = evidence
        reference_images = _reference_images(item.get("reference_images"), field=f"{label}.reference_images")
        if reference_images:
            variant["reference_images"] = reference_images
        # Matrix evidence is copied as-is here; it is not validated at this point.
        raw_matrix = item.get("matrix_evidence")
        if isinstance(raw_matrix, dict):
            variant["matrix_evidence"] = dict(raw_matrix)

        prompt_source = _prompt_source(item.get("prompt_source"), field=f"{label}.prompt_source")
        if prompt_source:
            source_variant_id = _text(prompt_source.get("prompt_variant_id"))
            if source_variant_id and source_variant_id != variant_id:
                raise ValueError(
                    f"{label}.prompt_source.prompt_variant_id {source_variant_id!r} must match id {variant_id!r}"
                )
            variant["prompt_source"] = prompt_source

        if has_text:
            variant["text"] = text
        else:
            variant["append_cues"] = append_cues
        normalized.append(variant)
    return normalized
def _sidecar_for_stem(folder: Path, stem: str) -> dict[str, Any]:
    """Load the JSON sidecar that belongs to ``stem`` inside ``folder``.

    The sidecar file name is ``stem`` followed by ``SIDECAR_SUFFIX``.

    Returns:
        The decoded JSON object, or an empty dict when the file does not exist
        or its top-level JSON value is not an object.
    """
    sidecar_path = folder / f"{stem}{SIDECAR_SUFFIX}"
    if sidecar_path.is_file():
        with sidecar_path.open("r", encoding="utf-8") as stream:
            payload = json.load(stream)
        if isinstance(payload, dict):
            return payload
    return {}
def build_manifest(folder: str | Path, *, subject_id: str = "") -> dict[str, Any]:
    """Scan ``folder`` and describe every prompt/image pair found in it.

    Files are grouped by stem through ``_files_by_stem``. Stems present in both
    the prompt group and the image group become ``entries`` (enriched from the
    stem's optional sidecar); stems present in only one group are reported
    under ``missing_pairs``.

    Args:
        folder: Directory to scan; it is resolved before use.
        subject_id: Optional subject label; the resolved folder name is used
            when this is empty.

    Returns:
        A manifest dict tagged with ``SCHEMA``.

    Raises:
        FileNotFoundError: If the resolved folder is not a directory.
    """
    root = Path(folder).resolve()
    if not root.is_dir():
        raise FileNotFoundError(f"atlas refine folder does not exist: {root}")

    prompts_by_stem = _files_by_stem(root, PROMPT_SUFFIXES)
    images_by_stem = _files_by_stem(root, IMAGE_SUFFIXES)
    known_keys = _known_variant_keys()
    known_lookup = set(known_keys)
    prompt_stems = set(prompts_by_stem)
    image_stems = set(images_by_stem)

    entries: list[dict[str, Any]] = []
    for stem in sorted(prompt_stems & image_stems):
        prompt_file = prompts_by_stem[stem].resolve()
        image_file = images_by_stem[stem].resolve()
        prompt_body = prompt_file.read_text(encoding="utf-8").strip()
        resolved_key = _variant_key_from_stem(stem, known_keys)
        sidecar = _sidecar_for_stem(root, stem)
        record: dict[str, Any] = {
            "id": stem.rstrip("_"),
            "source_stem": stem,
            "variant_key": resolved_key,
            "known_variant": resolved_key in known_lookup,
            "prompt_path": str(prompt_file),
            "image_path": str(image_file),
            "prompt_text": prompt_body,
            "prompt_sha256": _sha256_text(prompt_body),
            "image_size_bytes": image_file.stat().st_size,
            "seed_metadata": _merge_known_values(_seed_metadata(), sidecar.get("seed_metadata")),
            "cue_axes": _merge_known_values(_cue_axes(), sidecar.get("cue_axes")),
            "score": _merge_known_values(_score_template(), sidecar.get("score")),
            "prompt_variants": _prompt_variants(sidecar.get("prompt_variants")),
            "notes": str(sidecar.get("notes") or ""),
        }
        entries.append(record)

    # Stems that have a prompt or an image, but not both.
    missing_pairs: list[dict[str, str]] = []
    for stem in sorted(prompt_stems ^ image_stems):
        orphan_prompt = prompts_by_stem.get(stem)
        orphan_image = images_by_stem.get(stem)
        missing_pairs.append(
            {
                "stem": stem,
                "prompt_path": str(orphan_prompt.resolve()) if orphan_prompt else "",
                "image_path": str(orphan_image.resolve()) if orphan_image else "",
            }
        )

    unknown_total = len([record for record in entries if not record["known_variant"]])
    return {
        "schema": SCHEMA,
        "root": str(root),
        "subject_id": subject_id or root.name,
        "entry_count": len(entries),
        "missing_pair_count": len(missing_pairs),
        "unknown_variant_count": unknown_total,
        "entries": entries,
        "missing_pairs": missing_pairs,
    }
def _int_seed(value: Any, *, field: str) -> int:
if not isinstance(value, int) or isinstance(value, bool):
raise ValueError(f"{field} must be an integer sampler seed")
return value
def _probe_list(raw: Any, *, field: str) -> list[dict[str, Any]]:
if not isinstance(raw, list) or not raw:
raise ValueError(f"{field} must be a non-empty list")
probes: list[dict[str, Any]] = []
for index, item in enumerate(raw):
if not isinstance(item, dict):
raise ValueError(f"{field}[{index}] must be an object")
probes.append(item)
return probes
def _image_path(value: Any, *, field: str) -> str:
    """Validate an image path and return its normalized text form.

    The value is normalized with ``_text`` and must then be non-empty, an
    absolute path, and carry a ``.png`` suffix (compared case-insensitively).

    Raises:
        ValueError: If any of those three conditions is not met.
    """
    path_text = _text(value)
    if not path_text:
        raise ValueError(f"{field} is required")
    candidate = Path(path_text)
    if candidate.is_absolute():
        if candidate.suffix.lower() == ".png":
            return path_text
        raise ValueError(f"{field} must reference a PNG artifact")
    raise ValueError(f"{field} must be absolute")
def _entry_for_variant(manifest: dict[str, Any], variant_key: str) -> dict[str, Any]:
entries = manifest.get("entries")
if not isinstance(entries, list):
raise ValueError("manifest entries must be a list")
for entry in entries:
if isinstance(entry, dict) and entry.get("variant_key") == variant_key:
return entry
raise ValueError(f"manifest does not contain variant_key {variant_key!r}")
def _append_cues(base_text: str, cues: list[str]) -> str:
    """Append each cue to the base prompt as a trailing sentence.

    Before every cue a period is inserted unless the text built so far already
    ends in ``.``, ``!`` or ``?``. Runs of whitespace in the final text are
    collapsed to single spaces and the result is stripped.

    Raises:
        ValueError: If the normalized base text is empty.
    """
    combined = _text(base_text)
    if not combined:
        raise ValueError("source prompt text is required")
    _validate_no_negative_channel(combined, field="source prompt text")
    for cue in cues:
        terminator = "" if combined.endswith((".", "!", "?")) else "."
        combined = f"{combined}{terminator} {cue}"
    return re.sub(r"\s+", " ", combined).strip()
def _probe_id(entry_id: Any, variant_id: str) -> str:
    """Build a probe id of the form ``<entry id>__<variant id>``.

    Raises:
        ValueError: If the normalized entry id is empty.
    """
    base_id = _text(entry_id)
    if base_id:
        return f"{base_id}__{variant_id}"
    raise ValueError("source entry id is required")
def _variant_id_from_probe_id(probe_id: str, source_entry_id: str) -> str:
prefix = f"{source_entry_id}__"
if source_entry_id and probe_id.startswith(prefix):
return probe_id[len(prefix):]
if "__" in probe_id:
return probe_id.rsplit("__", 1)[-1]
return probe_id
def _variant_prompt_text(base_prompt: str, variant: dict[str, Any], *, field: str) -> str:
    """Resolve the prompt text a variant stands for.

    A variant with its own ``text`` uses that text (after the negative-channel
    validation); otherwise its ``append_cues`` are appended to ``base_prompt``.

    Args:
        base_prompt: Prompt text the cues are appended to.
        variant: Prompt variant dict.
        field: Label used as the prefix of validation field names.
    """
    exact = _text(variant.get("text"))
    if not exact:
        cues = _string_list(variant.get("append_cues"), field=f"{field}.append_cues")
        return _append_cues(base_prompt, cues)
    _validate_no_negative_channel(exact, field=f"{field}.text")
    return exact
def build_prompt_batch(
    manifest: dict[str, Any],
    variant_key: str,
    *,
    sampler_seed: int | None = None,
    include_baseline: bool = True,
) -> dict[str, Any]:
    """Assemble a probe batch for one manifest entry at a single sampler seed.

    The batch optionally starts with a ``baseline`` probe carrying the entry's
    own prompt text, followed by one probe per prompt variant of the entry.

    Args:
        manifest: Manifest dict whose ``entries`` contain ``variant_key``.
        variant_key: Key of the entry to expand.
        sampler_seed: Explicit sampler seed; when ``None`` the entry's own
            ``seed_metadata["sampler_seed"]`` is used instead.
        include_baseline: Whether to emit the baseline probe.

    Returns:
        A dict tagged with ``BATCH_SCHEMA`` that holds the probes.

    Raises:
        ValueError: If the seed is not an integer, a prompt variant is
            malformed, or the batch would contain no probes.
    """
    entry = _entry_for_variant(manifest, variant_key)
    base_seed_metadata = _merge_known_values(_seed_metadata(), entry.get("seed_metadata"))
    seed_input = base_seed_metadata.get("sampler_seed") if sampler_seed is None else sampler_seed
    seed = _int_seed(seed_input, field="sampler_seed")
    base_seed_metadata["sampler_seed"] = seed

    base_prompt = _text(entry.get("prompt_text"))
    _validate_no_negative_channel(base_prompt, field="prompt_text")
    entry_id = _text(entry.get("id"))
    source_stem = _text(entry.get("source_stem") or entry_id)
    base_cue_axes = _merge_known_values(_cue_axes(), entry.get("cue_axes"))

    probes: list[dict[str, Any]] = []
    if include_baseline:
        baseline_probe: dict[str, Any] = {
            "id": _probe_id(entry_id, "baseline"),
            "prompt_order": "subject_first",
            "text": base_prompt,
            "variant_key": variant_key,
            "source_entry_id": entry_id,
            "source_stem": source_stem,
            "cue_axes": base_cue_axes,
            "seed_metadata": base_seed_metadata,
            "prompt_source": {
                "kind": "baseline",
                "tested_text_sha256": _sha256_text(base_prompt),
            },
            "notes": "baseline",
        }
        probes.append(baseline_probe)

    for variant in entry.get("prompt_variants") or []:
        if not isinstance(variant, dict):
            raise ValueError("entry prompt_variants must contain objects")
        variant_id = _text(variant.get("id"))
        if not variant_id:
            raise ValueError("entry prompt variant id is required")
        prompt_order = _text(variant.get("prompt_order") or "subject_first")
        if prompt_order not in PROMPT_ORDERS:
            raise ValueError(f"entry prompt variant prompt_order must be one of {sorted(PROMPT_ORDERS)}")

        # Shared prefix for every field label / error message of this variant.
        label = f"entry prompt variant {variant_id}"
        exact_text = _text(variant.get("text"))
        append_cues = _string_list(variant.get("append_cues"), field=f"{label}.append_cues")
        has_text = bool(exact_text)
        has_cues = bool(append_cues)
        if has_text == has_cues:
            raise ValueError(f"{label} must provide exactly one of text or append_cues")

        text = _variant_prompt_text(base_prompt, variant, field=label)
        _validate_no_negative_channel(text, field=f"{label}.text")
        prompt_source = _prompt_source_for_variant(
            variant,
            variant_id=variant_id,
            text=text,
            append_cues=append_cues,
        )
        # The batch-wide sampler seed always wins over variant-level metadata.
        probe_seed_metadata = _merge_non_null_known_values(base_seed_metadata, variant.get("seed_metadata"))
        probe_seed_metadata["sampler_seed"] = seed

        probe: dict[str, Any] = {
            "id": _probe_id(entry_id, variant_id),
            "prompt_order": prompt_order,
            "text": text,
            "variant_key": variant_key,
            "source_entry_id": entry_id,
            "source_stem": source_stem,
            "cue_axes": _merge_non_null_known_values(base_cue_axes, variant.get("cue_axes")),
            "seed_metadata": probe_seed_metadata,
            "evidence": _prompt_variant_evidence(variant.get("evidence"), field=f"{label}.evidence"),
            "prompt_source": prompt_source,
            "notes": _text(variant.get("notes")),
        }
        reference_images = _reference_images(variant.get("reference_images"), field=f"{label}.reference_images")
        if reference_images:
            probe["reference_images"] = reference_images
        matrix_evidence = _stable_matrix_evidence_for_variant(variant, field=label)
        if matrix_evidence:
            probe["matrix_evidence"] = matrix_evidence
        probes.append(probe)

    if not probes:
        raise ValueError("prompt batch would contain no probes")

    return {
        "schema": BATCH_SCHEMA,
        "seed": seed,
        "channel_out": DEFAULT_OUT_CHANNEL,
        "channel_in": DEFAULT_IN_CHANNEL,
        "subject_id": _text(manifest.get("subject_id")),
        "variant_key": variant_key,
        "source_entry_id": entry_id,
        "source_stem": source_stem,
        "source_prompt_sha256": _text(entry.get("prompt_sha256")),
        "probes": probes,
    }
def select_seeded_prompt_variant(
    manifest: dict[str, Any],
    variant_key: str,
    *,
    selection_seed: int,
    seed_slot: str = "atlas_cue_seed",
) -> dict[str, Any]:
    """Pick one seedable prompt variant of an entry from ``selection_seed``.

    Each prompt variant of the entry is classified as eligible or ineligible
    based on its evidence score and its matrix evidence. Both lists are sorted
    by prompt variant id, and the selected candidate is the eligible one at
    index ``selection_seed % len(eligible)``.

    Args:
        manifest: Manifest dict whose ``entries`` contain ``variant_key``.
        variant_key: Key of the entry whose variants are considered.
        selection_seed: Integer seed that drives the choice.
        seed_slot: Seed slot name; must be in ``SEED_SELECTION_SLOT_KEYS``.

    Returns:
        A dict tagged with ``SEED_SELECTION_SCHEMA``. ``selected`` is an empty
        dict and ``selected_index`` is ``None`` when nothing is eligible.

    Raises:
        ValueError: If the seed is not an integer or the slot is not allowed.
    """
    seed = _int_seed(selection_seed, field="selection_seed")
    if seed_slot not in SEED_SELECTION_SLOT_KEYS:
        raise ValueError(f"seed_slot must be one of {list(SEED_SELECTION_SLOT_KEYS)} and must not be sampler_seed")

    entry = _entry_for_variant(manifest, variant_key)
    base_prompt = _text(entry.get("prompt_text"))
    entry_id = _text(entry.get("id"))
    source_stem = _text(entry.get("source_stem") or entry_id)

    eligible: list[dict[str, Any]] = []
    ineligible: list[dict[str, Any]] = []
    for variant in entry.get("prompt_variants") or []:
        # Malformed or anonymous variants are skipped silently.
        if not isinstance(variant, dict):
            continue
        variant_id = _text(variant.get("id"))
        if not variant_id:
            continue

        label = f"prompt variant {variant_id}"
        evidence = _prompt_variant_evidence(variant.get("evidence"), field=f"{label}.evidence")
        score = _merge_known_values(_score_template(), evidence.get("score"))
        decision, blockers = _promotion_blockers(score)
        if decision != "seedable_candidate":
            reason = "not_seedable"
            if blockers:
                reason = "missing_seedable_evidence" + f": {', '.join(blockers)}"
            ineligible.append(
                {
                    "prompt_variant_id": variant_id,
                    "reason": reason,
                    "cue_axes": _merge_known_values(_cue_axes(), variant.get("cue_axes")),
                    "evidence": evidence,
                }
            )
            continue

        matrix_evidence = _stable_matrix_evidence_for_variant(variant, field=label)
        # A variant that declares matrix evidence which does not come back
        # from the stability helper is excluded.
        if "matrix_evidence" in variant and not matrix_evidence:
            rejected: dict[str, Any] = {
                "prompt_variant_id": variant_id,
                "reason": "unstable_matrix_evidence",
                "cue_axes": _merge_known_values(_cue_axes(), variant.get("cue_axes")),
                "evidence": evidence,
            }
            declared_matrix = variant.get("matrix_evidence")
            if isinstance(declared_matrix, dict):
                rejected["matrix_evidence"] = dict(variant["matrix_evidence"])
            ineligible.append(rejected)
            continue

        append_cues = _string_list(variant.get("append_cues"), field=f"{label}.append_cues")
        text = _variant_prompt_text(base_prompt, variant, field=label)
        prompt_source = _prompt_source_for_variant(
            variant,
            variant_id=variant_id,
            text=text,
            append_cues=append_cues,
        )
        candidate: dict[str, Any] = {
            "prompt_variant_id": variant_id,
            "prompt_order": _text(variant.get("prompt_order") or "subject_first"),
            "text": text,
            "variant_key": variant_key,
            "source_entry_id": entry_id,
            "source_stem": source_stem,
            "cue_axes": _merge_known_values(_cue_axes(), variant.get("cue_axes")),
            "seed_metadata": _merge_known_values(_seed_metadata(), variant.get("seed_metadata")),
            "evidence": evidence,
            "prompt_source": prompt_source,
            "notes": _text(variant.get("notes")),
        }
        reference_images = _reference_images(variant.get("reference_images"), field=f"{label}.reference_images")
        if reference_images:
            candidate["reference_images"] = reference_images
        if matrix_evidence:
            candidate["matrix_evidence"] = matrix_evidence
        eligible.append(candidate)

    def _by_variant_id(item: dict[str, Any]) -> str:
        return _text(item.get("prompt_variant_id"))

    # Sorting makes the seed -> candidate mapping independent of sidecar order.
    eligible.sort(key=_by_variant_id)
    ineligible.sort(key=_by_variant_id)

    if eligible:
        selected_index = seed % len(eligible)
        selected = eligible[selected_index]
    else:
        selected_index = None
        selected = {}

    return {
        "schema": SEED_SELECTION_SCHEMA,
        "subject_id": _text(manifest.get("subject_id")),
        "variant_key": variant_key,
        "source_entry_id": entry_id,
        "source_stem": source_stem,
        "selection_seed": seed,
        "seed_slot": seed_slot,
        "eligible_candidate_count": len(eligible),
        "ineligible_candidate_count": len(ineligible),
        "selected_index": selected_index,
        "selected": selected,
        "eligible": eligible,
        "ineligible": ineligible,
    }
def build_seed_selected_prompt_batch(
    manifest: dict[str, Any],
    variant_key: str,
    *,
    selection_seed: int,
    sampler_seed: int,
    seed_slot: str = "atlas_cue_seed",
    include_baseline: bool = True,
) -> dict[str, Any]:
    """Build a batch holding the seed-selected variant of one entry.

    The variant is chosen by ``select_seeded_prompt_variant``; the batch
    contains an optional baseline probe followed by the selected probe, and
    embeds the full selection record under ``selection``.

    Args:
        manifest: Manifest dict whose ``entries`` contain ``variant_key``.
        variant_key: Key of the entry to expand.
        selection_seed: Seed used to choose among the eligible variants.
        sampler_seed: Sampler seed stamped onto every probe.
        seed_slot: Seed-metadata key that records ``selection_seed``.
        include_baseline: Whether to emit the baseline probe.

    Returns:
        A dict tagged with ``BATCH_SCHEMA``.

    Raises:
        ValueError: If a seed is not an integer, no seedable variant exists,
            or the selected variant lacks an id or text.
    """
    seed = _int_seed(sampler_seed, field="sampler_seed")
    selection = select_seeded_prompt_variant(
        manifest,
        variant_key,
        selection_seed=selection_seed,
        seed_slot=seed_slot,
    )
    selected = selection.get("selected")
    if not (isinstance(selected, dict) and selected):
        raise ValueError(f"no seedable prompt variant is available for {variant_key!r}")

    entry = _entry_for_variant(manifest, variant_key)
    entry_id = _text(entry.get("id"))
    source_stem = _text(entry.get("source_stem") or entry_id)
    base_prompt = _text(entry.get("prompt_text"))
    _validate_no_negative_channel(base_prompt, field="prompt_text")

    baseline_seed_metadata = _merge_known_values(_seed_metadata(), entry.get("seed_metadata"))
    baseline_seed_metadata["sampler_seed"] = seed
    # NOTE(review): build_prompt_batch layers variant seed metadata with
    # _merge_non_null_known_values, while this path uses _merge_known_values;
    # confirm the difference is intended.
    chosen_seed_metadata = _merge_known_values(baseline_seed_metadata, selected.get("seed_metadata"))
    chosen_seed_metadata["sampler_seed"] = seed
    chosen_seed_metadata[seed_slot] = selection["selection_seed"]

    probes: list[dict[str, Any]] = []
    if include_baseline:
        baseline_probe: dict[str, Any] = {
            "id": _probe_id(entry_id, "baseline"),
            "prompt_order": "subject_first",
            "text": base_prompt,
            "variant_key": variant_key,
            "source_entry_id": entry_id,
            "source_stem": source_stem,
            "cue_axes": _merge_known_values(_cue_axes(), entry.get("cue_axes")),
            "seed_metadata": baseline_seed_metadata,
            "prompt_source": {
                "kind": "baseline",
                "tested_text_sha256": _sha256_text(base_prompt),
            },
            "notes": "baseline",
        }
        probes.append(baseline_probe)

    selected_id = _text(selected.get("prompt_variant_id"))
    selected_text = _text(selected.get("text"))
    if not (selected_id and selected_text):
        raise ValueError("selected prompt variant id and text are required")
    _validate_no_negative_channel(selected_text, field="selected prompt text")

    label = f"selected prompt variant {selected_id}"
    chosen_probe: dict[str, Any] = {
        "id": _probe_id(entry_id, selected_id),
        "prompt_order": _text(selected.get("prompt_order") or "subject_first"),
        "text": selected_text,
        "variant_key": variant_key,
        "source_entry_id": entry_id,
        "source_stem": source_stem,
        "cue_axes": _merge_known_values(_cue_axes(), selected.get("cue_axes")),
        "seed_metadata": chosen_seed_metadata,
        "evidence": _prompt_variant_evidence(selected.get("evidence"), field=f"{label}.evidence"),
        "prompt_source": _prompt_source(selected.get("prompt_source"), field=f"{label}.prompt_source"),
        "selection": {
            "selection_seed": selection["selection_seed"],
            "seed_slot": selection["seed_slot"],
            "selected_index": selection["selected_index"],
            "prompt_variant_id": selected_id,
        },
        "notes": _text(selected.get("notes")),
    }
    reference_images = _reference_images(selected.get("reference_images"), field=f"{label}.reference_images")
    if reference_images:
        chosen_probe["reference_images"] = reference_images
    matrix_evidence = _stable_matrix_evidence_for_variant(selected, field=label)
    if matrix_evidence:
        chosen_probe["matrix_evidence"] = matrix_evidence
    probes.append(chosen_probe)

    return {
        "schema": BATCH_SCHEMA,
        "seed": seed,
        "channel_out": DEFAULT_OUT_CHANNEL,
        "channel_in": DEFAULT_IN_CHANNEL,
        "subject_id": _text(manifest.get("subject_id")),
        "variant_key": variant_key,
        "source_entry_id": entry_id,
        "source_stem": source_stem,
        "source_prompt_sha256": _text(entry.get("prompt_sha256")),
        "selection": selection,
        "probes": probes,
    }
def build_seed_matrix(
    manifest: dict[str, Any],
    variant_key: str,
    *,
    selection_seeds: list[int],
    sampler_seeds: list[int],
    seed_slot: str = "atlas_cue_seed",
) -> dict[str, Any]:
    """Expand ``sampler_seeds`` x ``selection_seeds`` into seed-selected jobs.

    One job is produced per (sampler seed, selection seed) pair, sampler seeds
    in the outer loop. Each job embeds the batch returned by
    ``build_seed_selected_prompt_batch`` together with its last probe (the
    selected candidate) and the selected variant record.

    Args:
        manifest: Manifest dict whose ``entries`` contain ``variant_key``.
        variant_key: Key of the entry to expand.
        selection_seeds: Non-empty, duplicate-free integer cue seeds.
        sampler_seeds: Non-empty, duplicate-free integer sampler seeds.
        seed_slot: Seed slot name forwarded to the batch builder.

    Returns:
        A dict tagged with ``SEED_MATRIX_SCHEMA``.

    Raises:
        ValueError: If either seed list is empty, contains a non-integer
            (including ``bool``), or contains duplicates.
    """
    if not selection_seeds:
        raise ValueError("selection_seeds must contain at least one cue seed")
    if not sampler_seeds:
        raise ValueError("sampler_seeds must contain at least one sampler seed")

    # Validate element types BEFORE the duplicate checks and before any batch
    # is built. Building a set from unvalidated values raises TypeError for
    # unhashable elements and treats True and 1 as the same seed, which would
    # hide the real problem behind a misleading error.
    sampler_values = [
        _int_seed(sampler_seed, field=f"sampler_seeds[{sampler_index}]")
        for sampler_index, sampler_seed in enumerate(sampler_seeds)
    ]
    selection_values = [
        _int_seed(selection_seed, field=f"selection_seeds[{selection_index}]")
        for selection_index, selection_seed in enumerate(selection_seeds)
    ]
    if len(set(selection_values)) != len(selection_values):
        raise ValueError("selection_seeds must not contain duplicate cue seeds")
    if len(set(sampler_values)) != len(sampler_values):
        raise ValueError("sampler_seeds must not contain duplicate sampler seeds")

    jobs: list[dict[str, Any]] = []
    for sampler_seed_value in sampler_values:
        for selection_seed_value in selection_values:
            batch = build_seed_selected_prompt_batch(
                manifest,
                variant_key,
                selection_seed=selection_seed_value,
                sampler_seed=sampler_seed_value,
                seed_slot=seed_slot,
            )
            probes = [probe for probe in batch.get("probes") or [] if isinstance(probe, dict)]
            # The selected candidate is appended last by the batch builder.
            candidate_probe = probes[-1] if probes else {}
            selection = dict(batch.get("selection")) if isinstance(batch.get("selection"), dict) else {}
            selected = dict(selection.get("selected")) if isinstance(selection.get("selected"), dict) else {}
            jobs.append(
                {
                    "id": f"{variant_key}__sampler_{sampler_seed_value}__{seed_slot}_{selection_seed_value}",
                    "variant_key": variant_key,
                    "sampler_seed": sampler_seed_value,
                    "selection_seed": selection_seed_value,
                    "seed_slot": seed_slot,
                    "selected": selected,
                    "candidate_probe": candidate_probe,
                    "batch": batch,
                }
            )

    return {
        "schema": SEED_MATRIX_SCHEMA,
        "subject_id": _text(manifest.get("subject_id")),
        "variant_key": variant_key,
        "seed_slot": seed_slot,
        "sampler_seeds": list(sampler_values),
        "selection_seeds": list(selection_values),
        "sampler_seed_count": len(sampler_values),
        "selection_seed_count": len(selection_values),
        "job_count": len(jobs),
        "jobs": jobs,
    }
def _score_value(score: dict[str, Any], key: str) -> str:
    """Return the score stored under ``key``, normalized by ``_text`` and lower-cased."""
    raw_value = score.get(key)
    normalized = _text(raw_value)
    return normalized.lower()
def _promotion_blockers(score: dict[str, Any]) -> tuple[str, list[str]]:
    """Classify a score dict and list what blocks its promotion.

    The keys in ``PROMOTION_REQUIRED_PASS_KEYS`` are checked first, then the
    keys in ``PROMOTION_REQUIRED_PROGRESS_KEYS``, each against its own set of
    accepted values.

    Returns:
        ``("needs_visual_score", missing_keys)`` when any required key has no
        value; otherwise ``("rejected", ["key=value", ...])`` when any value
        is not accepted; otherwise ``("seedable_candidate", [])``.
    """
    missing: list[str] = []
    failed: list[str] = []
    requirement_groups = (
        (PROMOTION_REQUIRED_PASS_KEYS, PROMOTION_PASS_VALUES),
        (PROMOTION_REQUIRED_PROGRESS_KEYS, PROMOTION_PROGRESS_VALUES),
    )
    for required_keys, accepted_values in requirement_groups:
        for key in required_keys:
            value = _score_value(score, key)
            if not value:
                missing.append(key)
            elif value not in accepted_values:
                failed.append(f"{key}={value}")

    # Missing scores take precedence over failed ones.
    if missing:
        return "needs_visual_score", missing
    if failed:
        return "rejected", failed
    return "seedable_candidate", []
def build_promotion_report(result_sheet: dict[str, Any]) -> dict[str, Any]:
    """Turn a scored result sheet into a per-probe promotion report.

    Every probe except the baseline becomes a candidate with a decision and a
    list of blockers. A candidate that would be seedable is downgraded to
    ``rejected`` when its prompt has noise issues, or when it declares matrix
    evidence that does not come back from the stability helper.

    Args:
        result_sheet: Sheet dict with ``probes`` (non-empty list of dicts) and
            an integer ``seed``. The baseline probe id defaults to the id of
            the first probe.

    Returns:
        A dict tagged with ``PROMOTION_REPORT_SCHEMA``.

    Raises:
        ValueError: If the probe list or seed is invalid, or a non-baseline
            probe lacks an id, text, or valid image path.
    """
    probes = _probe_list(result_sheet.get("probes"), field="result sheet probes")
    seed = _int_seed(result_sheet.get("seed"), field="result sheet seed")
    baseline_probe_id = _text(result_sheet.get("baseline_probe_id") or probes[0].get("id"))
    sheet_entry_id = _text(result_sheet.get("source_entry_id"))
    sheet_stem = _text(result_sheet.get("source_stem") or sheet_entry_id)

    candidates: list[dict[str, Any]] = []
    for probe in probes:
        probe_id = _text(probe.get("id"))
        if not probe_id:
            raise ValueError("result sheet probe id is required")
        if probe_id == baseline_probe_id:
            continue

        label = f"result sheet probe {probe_id}"
        text = _text(probe.get("text"))
        if not text:
            raise ValueError(f"{label}.text is required")
        _validate_no_negative_channel(text, field=f"{label}.text")

        entry_id = _text(probe.get("source_entry_id") or sheet_entry_id)
        prompt_variant_id = _variant_id_from_probe_id(probe_id, entry_id)
        noise_issues = _prompt_noise_issues(
            text,
            context="result_sheet_probe",
            prompt_variant_id=prompt_variant_id,
        )
        score = _merge_known_values(_score_template(), probe.get("score"))
        decision, blockers = _promotion_blockers(score)
        matrix_evidence = _stable_matrix_evidence_for_variant(probe, field=label)

        # Only an otherwise-seedable candidate can be downgraded; the noise
        # check has priority over the matrix-evidence check.
        if decision == "seedable_candidate":
            if noise_issues:
                decision = "rejected"
                blockers = ["prompt_noise_issue"]
            elif "matrix_evidence" in probe and not matrix_evidence:
                decision = "rejected"
                blockers = ["unstable_matrix_evidence"]

        stem = _text(probe.get("source_stem") or sheet_stem or entry_id)
        candidate: dict[str, Any] = {
            "id": probe_id,
            "prompt_variant_id": prompt_variant_id,
            "decision": decision,
            "blockers": blockers,
            "variant_key": _text(probe.get("variant_key") or result_sheet.get("variant_key")),
            "source_entry_id": entry_id,
            "source_stem": stem,
            "seed": seed,
            "prompt_order": _text(probe.get("prompt_order") or "subject_first"),
            "text": text,
            "turn": probe.get("turn"),
            "image_path": _image_path(probe.get("image_path"), field=f"{label}.image_path"),
            "cue_axes": _merge_known_values(_cue_axes(), probe.get("cue_axes")),
            "seed_metadata": _merge_known_values(_seed_metadata(), probe.get("seed_metadata")),
            "score": score,
            "prompt_source": _prompt_source(probe.get("prompt_source"), field=f"{label}.prompt_source"),
            "analysis_notes": _text(probe.get("analysis_notes")),
        }
        reference_images = _reference_images(probe.get("reference_images"), field=f"{label}.reference_images")
        if reference_images:
            candidate["reference_images"] = reference_images
        if noise_issues:
            candidate["prompt_noise_issues"] = noise_issues
            candidate["prompt_noise_code_counts"] = _prompt_noise_code_counts(noise_issues)
        if matrix_evidence:
            candidate["matrix_evidence"] = matrix_evidence
        candidates.append(candidate)

    ready_total = len([item for item in candidates if item["decision"] == "seedable_candidate"])
    return {
        "schema": PROMOTION_REPORT_SCHEMA,
        "seed": seed,
        "subject_id": _text(result_sheet.get("subject_id")),
        "variant_key": _text(result_sheet.get("variant_key")),
        "source_entry_id": sheet_entry_id,
        "source_stem": sheet_stem,
        "baseline_probe_id": baseline_probe_id,
        "candidate_count": len(candidates),
        "promotion_ready_count": ready_total,
        "blocked_count": len(candidates) - ready_total,
        "required_pass_keys": list(PROMOTION_REQUIRED_PASS_KEYS),
        "required_progress_keys": list(PROMOTION_REQUIRED_PROGRESS_KEYS),
        "candidates": candidates,
    }
def build_sidecar_update_draft(promotion_report: dict[str, Any]) -> dict[str, Any]:
    """Draft sidecar prompt-variant updates from a promotion report.

    Only candidates whose decision is ``seedable_candidate`` are used. They
    are grouped by source stem, one update per sidecar file, and the updates
    are returned sorted by stem.

    Args:
        promotion_report: Report dict with ``candidates`` (non-empty list of
            dicts) and an integer ``seed``.

    Returns:
        A dict tagged with ``SIDECAR_UPDATE_DRAFT_SCHEMA``.

    Raises:
        ValueError: If the candidate list or seed is invalid, or a seedable
            candidate lacks an id, prompt variant id, text, source stem, or
            valid image path.
    """
    candidates = _probe_list(promotion_report.get("candidates"), field="promotion report candidates")
    seed = _int_seed(promotion_report.get("seed"), field="promotion report seed")
    ready_candidates = [item for item in candidates if item.get("decision") == "seedable_candidate"]

    updates_by_stem: dict[str, dict[str, Any]] = {}
    for candidate in ready_candidates:
        candidate_id = _text(candidate.get("id"))
        prompt_variant_id = _text(candidate.get("prompt_variant_id"))
        if not (candidate_id and prompt_variant_id):
            raise ValueError("seedable candidate id and prompt_variant_id are required")

        label = f"seedable candidate {candidate_id}"
        text = _text(candidate.get("text"))
        if not text:
            raise ValueError(f"{label}.text is required")
        _validate_no_negative_channel(text, field=f"{label}.text")

        source_entry_id = _text(candidate.get("source_entry_id") or promotion_report.get("source_entry_id"))
        source_stem = _text(candidate.get("source_stem") or promotion_report.get("source_stem") or source_entry_id)
        if not source_stem:
            raise ValueError(f"{label}.source_stem is required")

        # The first candidate seen for a stem defines that update's header.
        fresh_update: dict[str, Any] = {
            "source_entry_id": source_entry_id,
            "source_stem": source_stem,
            "sidecar_filename": f"{source_stem}{SIDECAR_SUFFIX}",
            "variant_key": _text(candidate.get("variant_key") or promotion_report.get("variant_key")),
            "prompt_variants": [],
        }
        update = updates_by_stem.setdefault(source_stem, fresh_update)

        prompt_variant: dict[str, Any] = {
            "id": prompt_variant_id,
            "prompt_order": _text(candidate.get("prompt_order") or "subject_first"),
            "text": text,
            "cue_axes": _merge_known_values(_cue_axes(), candidate.get("cue_axes")),
            "seed_metadata": _merge_known_values(_seed_metadata(), candidate.get("seed_metadata")),
            "notes": _text(candidate.get("analysis_notes")),
            "prompt_source": _prompt_source(candidate.get("prompt_source"), field=f"{label}.prompt_source"),
            "evidence": {
                "seed": seed,
                "turn": candidate.get("turn"),
                "image_path": _image_path(candidate.get("image_path"), field=f"{label}.image_path"),
                "score": _merge_known_values(_score_template(), candidate.get("score")),
            },
        }
        reference_images = _reference_images(candidate.get("reference_images"), field=f"{label}.reference_images")
        if reference_images:
            prompt_variant["reference_images"] = reference_images
            prompt_variant["evidence"]["reference_images"] = reference_images
        matrix_evidence = _stable_matrix_evidence_for_variant(candidate, field=label)
        if matrix_evidence:
            prompt_variant["matrix_evidence"] = matrix_evidence
        update["prompt_variants"].append(prompt_variant)

    updates = [updates_by_stem[stem] for stem in sorted(updates_by_stem)]
    ready_total = len(ready_candidates)
    return {
        "schema": SIDECAR_UPDATE_DRAFT_SCHEMA,
        "seed": seed,
        "subject_id": _text(promotion_report.get("subject_id")),
        "variant_key": _text(promotion_report.get("variant_key")),
        "ready_candidate_count": ready_total,
        "skipped_candidate_count": len(candidates) - ready_total,
        "update_count": len(updates),
        "updates": updates,
    }
def build_matrix_sidecar_update_draft(matrix_promotion_report: dict[str, Any]) -> dict[str, Any]:
    """Draft sidecar prompt-variant updates from a seed-matrix promotion report.

    Each group marked ``stable`` is cross-checked against the jobs it lists
    (ids, identity fields, sampler-seed coverage, declared counts). A group
    that passes contributes one prompt variant, built from its first seedable
    job, to the update of that job's source stem. Groups that are not stable,
    or that cannot produce a usable candidate, are recorded under ``skipped``.

    Args:
        matrix_promotion_report: Report dict with ``jobs`` and ``groups``
            lists. A blank ``schema`` is tolerated; a non-blank one must equal
            ``SEED_MATRIX_PROMOTION_REPORT_SCHEMA``.

    Returns:
        A dict tagged with ``MATRIX_SIDECAR_UPDATE_DRAFT_SCHEMA`` whose
        ``updates`` are sorted by source stem.

    Raises:
        ValueError: If the schema mismatches, or a stable group lists
            duplicated or unknown job ids, disagrees with its jobs on an
            identity field, declares sampler seeds that differ from the jobs'
            seeds, covers fewer than ``MIN_STABLE_MATRIX_SAMPLER_SEEDS``
            sampler seeds, or declares counts that differ from the jobs.
            Seed and image-path validation helpers also raise ``ValueError``.
    """
    schema = _text(matrix_promotion_report.get("schema"))
    if schema and schema != SEED_MATRIX_PROMOTION_REPORT_SCHEMA:
        raise ValueError(f"seed matrix promotion report schema must be {SEED_MATRIX_PROMOTION_REPORT_SCHEMA}")
    jobs = [job for job in matrix_promotion_report.get("jobs") or [] if isinstance(job, dict)]
    # Jobs without an id cannot be referenced by a group and are left out.
    jobs_by_id = {_text(job.get("id")): job for job in jobs if _text(job.get("id"))}
    updates_by_stem: dict[str, dict[str, Any]] = {}
    skipped: list[dict[str, Any]] = []
    ready_group_count = 0
    for group in matrix_promotion_report.get("groups") or []:
        if not isinstance(group, dict):
            continue
        prompt_variant_id = _text(group.get("prompt_variant_id"))
        selection_seed = group.get("selection_seed")
        blockers = [_text(blocker) for blocker in group.get("blockers") or [] if _text(blocker)]
        group_context = {
            "variant_key": _text(group.get("variant_key") or matrix_promotion_report.get("variant_key")),
            "source_entry_id": _text(group.get("source_entry_id")),
            "source_stem": _text(group.get("source_stem") or group.get("source_entry_id")),
            "prompt_variant_id": prompt_variant_id,
            "prompt_text_sha256": _text(group.get("prompt_text_sha256")),
            "selection_seed": selection_seed,
            "seed_slot": _text(group.get("seed_slot") or matrix_promotion_report.get("seed_slot")),
            "sampler_seeds": list(group.get("sampler_seeds") or []),
            "blockers": blockers,
        }
        # Only an explicit ``True`` counts as stable.
        if group.get("stable") is not True:
            skipped.append({**group_context, "reason": "unstable_matrix_group"})
            continue
        group_job_ids = [_text(job_id) for job_id in group.get("job_ids") or [] if _text(job_id)]
        duplicate_job_ids = sorted({job_id for job_id in group_job_ids if group_job_ids.count(job_id) > 1})
        if duplicate_job_ids:
            raise ValueError(
                f"stable matrix group {prompt_variant_id!r} job_ids contain duplicated ids: {', '.join(duplicate_job_ids)}"
            )
        missing_job_ids = [job_id for job_id in group_job_ids if job_id not in jobs_by_id]
        if missing_job_ids:
            raise ValueError(
                f"stable matrix group {prompt_variant_id!r} job_ids reference missing jobs: {', '.join(missing_job_ids)}"
            )
        group_jobs = [jobs_by_id[job_id] for job_id in group_job_ids if job_id in jobs_by_id]
        expected_selection_seed = _int_seed(selection_seed, field=f"stable matrix group {prompt_variant_id}.selection_seed")
        expected_prompt_text_sha256 = group_context["prompt_text_sha256"]
        # Without a declared prompt hash, the first job's candidate text is
        # the reference every other job is compared against.
        if not expected_prompt_text_sha256 and group_jobs:
            first_candidate = group_jobs[0].get("candidate") if isinstance(group_jobs[0].get("candidate"), dict) else {}
            first_text = _text(first_candidate.get("text")) if isinstance(first_candidate, dict) else ""
            expected_prompt_text_sha256 = _sha256_text(first_text) if first_text else ""
        for job in group_jobs:
            job_id = _text(job.get("id"))
            job_candidate = job.get("candidate") if isinstance(job.get("candidate"), dict) else {}
            job_text = _text(job_candidate.get("text")) if isinstance(job_candidate, dict) else ""
            job_prompt_text_sha256 = _sha256_text(job_text) if job_text else _text(job.get("prompt_text_sha256"))
            declared_job_text_sha256 = _text(job.get("prompt_text_sha256"))
            if declared_job_text_sha256 and job_prompt_text_sha256 and declared_job_text_sha256 != job_prompt_text_sha256:
                raise ValueError(
                    f"stable matrix group {prompt_variant_id!r} job {job_id!r} candidate prompt text "
                    f"{job_prompt_text_sha256!r} does not match job prompt_text_sha256; "
                    f"expected {declared_job_text_sha256!r}"
                )
            identity_checks = (
                ("prompt_variant_id", prompt_variant_id, _text(job.get("prompt_variant_id"))),
                ("prompt text", expected_prompt_text_sha256, job_prompt_text_sha256),
                ("selection_seed", expected_selection_seed, _int_seed(job.get("selection_seed"), field=f"matrix job {job_id}.selection_seed")),
                ("seed_slot", group_context["seed_slot"], _text(job.get("seed_slot"))),
                ("variant_key", group_context["variant_key"], _text(job.get("variant_key"))),
                ("source_entry_id", group_context["source_entry_id"], _text(job.get("source_entry_id"))),
                ("source_stem", group_context["source_stem"], _text(job.get("source_stem") or job.get("source_entry_id"))),
            )
            for field, expected_value, actual_value in identity_checks:
                if field == "selection_seed":
                    # Both sides are validated integers and 0 is a valid
                    # seed, so a truthiness guard would silently skip the
                    # comparison whenever either seed is 0.
                    mismatch = actual_value != expected_value
                else:
                    # For text fields a blank value on either side means
                    # "not declared" and is not treated as a conflict.
                    mismatch = bool(expected_value and actual_value and actual_value != expected_value)
                if mismatch:
                    raise ValueError(
                        f"stable matrix group {prompt_variant_id!r} job_ids include job {job_id!r} "
                        f"with {field} {actual_value!r}, expected {expected_value!r}"
                    )
        declared_sampler_seeds = sorted(
            {_int_seed(seed, field=f"stable matrix group {prompt_variant_id}.sampler_seeds") for seed in group_context["sampler_seeds"]}
        )
        job_sampler_seeds = sorted(
            {_int_seed(job.get("sampler_seed"), field=f"stable matrix group {prompt_variant_id}.job_ids sampler_seed") for job in group_jobs}
        )
        if declared_sampler_seeds != job_sampler_seeds:
            raise ValueError(
                f"stable matrix group {prompt_variant_id!r} sampler_seeds {declared_sampler_seeds} "
                f"do not match job_ids sampler coverage {job_sampler_seeds}"
            )
        if len(job_sampler_seeds) < MIN_STABLE_MATRIX_SAMPLER_SEEDS:
            raise ValueError(
                f"stable matrix group {prompt_variant_id!r} sampler_seeds must include at least "
                f"{MIN_STABLE_MATRIX_SAMPLER_SEEDS} unique sampler seeds"
            )
        actual_job_count = len(group_jobs)
        actual_promotion_ready_count = sum(1 for job in group_jobs if job.get("decision") == "seedable_candidate")
        actual_blocked_count = actual_job_count - actual_promotion_ready_count
        # Counts are optional on the group; any that are declared must agree
        # with the counts recomputed from the referenced jobs.
        count_mismatches: list[str] = []
        for field, actual_value in (
            ("job_count", actual_job_count),
            ("promotion_ready_count", actual_promotion_ready_count),
            ("blocked_count", actual_blocked_count),
        ):
            if field in group and group.get(field) is not None:
                try:
                    declared_value = int(group.get(field))
                except (TypeError, ValueError) as exc:
                    raise ValueError(f"stable matrix group {prompt_variant_id!r} {field} must be an integer") from exc
                if declared_value != actual_value:
                    count_mismatches.append(f"{field} {declared_value} != job_ids count {actual_value}")
        if count_mismatches:
            raise ValueError(f"stable matrix group {prompt_variant_id!r} count mismatch: {'; '.join(count_mismatches)}")
        ready_jobs = [job for job in group_jobs if job.get("decision") == "seedable_candidate"]
        if not ready_jobs:
            skipped.append({**group_context, "reason": "no_seedable_jobs"})
            continue
        # The first seedable job supplies the prompt variant's text and evidence.
        representative_job = ready_jobs[0]
        candidate = representative_job.get("candidate")
        if not isinstance(candidate, dict):
            skipped.append({**group_context, "reason": "missing_representative_candidate"})
            continue
        source_entry_id = _text(candidate.get("source_entry_id"))
        source_stem = _text(candidate.get("source_stem") or source_entry_id)
        if not source_stem:
            skipped.append({**group_context, "reason": "missing_source_stem"})
            continue
        text = _text(candidate.get("text"))
        if not text:
            skipped.append({**group_context, "reason": "missing_candidate_text"})
            continue
        _validate_no_negative_channel(text, field=f"matrix candidate {prompt_variant_id}.text")
        matrix_jobs: list[dict[str, Any]] = []
        for job in ready_jobs:
            job_candidate = job.get("candidate") if isinstance(job.get("candidate"), dict) else {}
            matrix_jobs.append(
                {
                    "id": _text(job.get("id")),
                    "sampler_seed": _int_seed(job.get("sampler_seed"), field=f"matrix job {job.get('id')}.sampler_seed"),
                    "selection_seed": _int_seed(job.get("selection_seed"), field=f"matrix job {job.get('id')}.selection_seed"),
                    "decision": _text(job.get("decision")),
                    "turn": job_candidate.get("turn"),
                    "image_path": _image_path(job_candidate.get("image_path"), field=f"matrix job {job.get('id')}.image_path"),
                    "score": _merge_known_values(_score_template(), job_candidate.get("score")),
                }
            )
        matrix_evidence = {
            "stable": True,
            "selection_seed": expected_selection_seed,
            "seed_slot": group_context["seed_slot"],
            "sampler_seeds": declared_sampler_seeds,
            "job_count": actual_job_count,
            "promotion_ready_count": actual_promotion_ready_count,
            "blocked_count": actual_blocked_count,
            "jobs": matrix_jobs,
        }
        update = updates_by_stem.setdefault(
            source_stem,
            {
                "source_entry_id": source_entry_id,
                "source_stem": source_stem,
                "sidecar_filename": f"{source_stem}{SIDECAR_SUFFIX}",
                "variant_key": group_context["variant_key"],
                "prompt_variants": [],
            },
        )
        update["prompt_variants"].append(
            {
                "id": prompt_variant_id,
                "prompt_order": _text(candidate.get("prompt_order") or "subject_first"),
                "text": text,
                "cue_axes": _merge_known_values(_cue_axes(), candidate.get("cue_axes")),
                "seed_metadata": _merge_known_values(_seed_metadata(), candidate.get("seed_metadata")),
                "notes": f"stable matrix evidence for {group_context['seed_slot']}={matrix_evidence['selection_seed']}",
                "prompt_source": _prompt_source(candidate.get("prompt_source"), field=f"matrix candidate {prompt_variant_id}.prompt_source"),
                "evidence": {
                    "seed": _int_seed(representative_job.get("sampler_seed"), field="representative matrix sampler_seed"),
                    "turn": candidate.get("turn"),
                    "image_path": _image_path(candidate.get("image_path"), field=f"matrix candidate {prompt_variant_id}.image_path"),
                    "score": _merge_known_values(_score_template(), candidate.get("score")),
                },
                "matrix_evidence": matrix_evidence,
            }
        )
        reference_images = _reference_images(candidate.get("reference_images"), field=f"matrix candidate {prompt_variant_id}.reference_images")
        if reference_images:
            update["prompt_variants"][-1]["reference_images"] = reference_images
            update["prompt_variants"][-1]["evidence"]["reference_images"] = reference_images
        ready_group_count += 1
    updates = [updates_by_stem[key] for key in sorted(updates_by_stem)]
    return {
        "schema": MATRIX_SIDECAR_UPDATE_DRAFT_SCHEMA,
        "subject_id": _text(matrix_promotion_report.get("subject_id")),
        "variant_key": _text(matrix_promotion_report.get("variant_key")),
        "ready_group_count": ready_group_count,
        "skipped_group_count": len(skipped),
        "update_count": len(updates),
        "updates": updates,
        "skipped": skipped,
    }
def build_catalog_cue_draft(manifest: dict[str, Any], *, variant_key: str = "") -> dict[str, Any]:
    """Collect seedable append-cue prompt variants into a catalog cue draft.

    Each prompt variant with an id, taken from every selected manifest entry,
    either becomes a candidate or is recorded in ``skipped`` with one of these
    reasons:

    * ``missing_seedable_evidence`` / ``not_seedable``: the evidence score does
      not produce a ``seedable_candidate`` decision (the first reason is used
      when blockers were reported);
    * ``unstable_matrix_evidence``: the variant carries a ``matrix_evidence``
      key but no stable matrix evidence was derived from it;
    * ``not_append_cues``: the prompt source is not an ``append_cues`` source
      with at least one cue.

    Entries and variants that are not objects, and variants without an id, are
    ignored without being listed in ``skipped``.

    Args:
        manifest: Manifest object; ``manifest["entries"]`` must be a list.
        variant_key: When non-empty, only entries with this variant key are
            considered.

    Returns:
        A dict tagged with ``CATALOG_CUE_DRAFT_SCHEMA`` holding the candidates,
        the skipped records and their counts.

    Raises:
        ValueError: If ``manifest["entries"]`` is not a list.
    """
    entries = manifest.get("entries")
    if not isinstance(entries, list):
        raise ValueError("manifest entries must be a list")
    requested_variant_key = _text(variant_key)
    candidates: list[dict[str, Any]] = []
    skipped: list[dict[str, Any]] = []
    for entry in entries:
        # Same guard as the other manifest builders: a malformed (non-object)
        # entry is ignored instead of failing on ``entry.get``.
        if not isinstance(entry, dict):
            continue
        entry_variant_key = _text(entry.get("variant_key"))
        if requested_variant_key and entry_variant_key != requested_variant_key:
            continue
        prompt_text = _text(entry.get("prompt_text"))
        entry_id = _text(entry.get("id"))
        source_stem = _text(entry.get("source_stem") or entry_id)
        for variant in entry.get("prompt_variants") or []:
            if not isinstance(variant, dict):
                continue
            variant_id = _text(variant.get("id"))
            if not variant_id:
                continue
            append_cues = _string_list(variant.get("append_cues"), field=f"catalog cue variant {variant_id}.append_cues")
            tested_text = _variant_prompt_text(prompt_text, variant, field=f"catalog cue variant {variant_id}")
            prompt_source = _prompt_source_for_variant(
                variant,
                variant_id=variant_id,
                text=tested_text,
                append_cues=append_cues,
            )
            evidence = _prompt_variant_evidence(variant.get("evidence"), field=f"catalog cue variant {variant_id}.evidence")
            score = _merge_known_values(_score_template(), evidence.get("score"))
            decision, blockers = _promotion_blockers(score)
            # Identification fields shared by every skip record for this variant.
            skip_base = {
                "variant_key": entry_variant_key,
                "source_entry_id": entry_id,
                "source_stem": source_stem,
                "prompt_variant_id": variant_id,
            }
            if decision != "seedable_candidate":
                skipped.append(
                    {
                        **skip_base,
                        "reason": "missing_seedable_evidence" if blockers else "not_seedable",
                        "blockers": blockers,
                    }
                )
                continue
            matrix_evidence = _stable_matrix_evidence_for_variant(variant, field=f"catalog cue variant {variant_id}")
            if "matrix_evidence" in variant and not matrix_evidence:
                skipped_item = {
                    **skip_base,
                    "reason": "unstable_matrix_evidence",
                    "blockers": ["unstable_matrix_evidence"],
                }
                # Keep a shallow copy of the raw evidence so the skip can be reviewed.
                if isinstance(variant.get("matrix_evidence"), dict):
                    skipped_item["matrix_evidence"] = dict(variant["matrix_evidence"])
                skipped.append(skipped_item)
                continue
            if prompt_source.get("kind") != "append_cues" or not prompt_source.get("append_cues"):
                skipped.append({**skip_base, "reason": "not_append_cues"})
                continue
            candidate = {
                "variant_key": entry_variant_key,
                "source_entry_id": entry_id,
                "source_stem": source_stem,
                "sidecar_filename": f"{source_stem}{SIDECAR_SUFFIX}",
                "prompt_variant_id": variant_id,
                "prompt_variant_cues": list(prompt_source.get("append_cues") or []),
                "tested_text": tested_text,
                "tested_text_sha256": _sha256_text(tested_text),
                "cue_axes": _merge_known_values(_cue_axes(), variant.get("cue_axes")),
                "seed_metadata": _merge_known_values(_seed_metadata(), variant.get("seed_metadata")),
                "evidence": evidence,
                "notes": _text(variant.get("notes")),
            }
            reference_images = _reference_images(variant.get("reference_images"), field=f"catalog cue variant {variant_id}.reference_images")
            if reference_images:
                candidate["reference_images"] = reference_images
            if matrix_evidence:
                candidate["matrix_evidence"] = matrix_evidence
            candidates.append(candidate)
    return {
        "schema": CATALOG_CUE_DRAFT_SCHEMA,
        "subject_id": _text(manifest.get("subject_id")),
        "variant_key": requested_variant_key,
        "ready_cue_count": len(candidates),
        "skipped_count": len(skipped),
        "candidates": candidates,
        "skipped": skipped,
    }
def _coverage_state(
*,
known_variant: bool,
prompt_noise_issue_count: int,
prompt_variant_count: int,
seedable_count: int,
catalog_cue_count: int,
unscored_count: int,
rejected_count: int,
) -> tuple[str, str]:
if not known_variant:
return "unknown_variant", "map the prompt/image stem to a catalog variant before seed testing"
if prompt_noise_issue_count:
return "needs_prompt_cleanup", "clean option/meta/negative prompt wording before visual scoring or seed promotion"
if prompt_variant_count == 0:
return "baseline_only", "add reviewed sidecar prompt_variants from MCP atlas probes"
if catalog_cue_count:
return "ready_for_catalog_review", "review catalog cue draft before editing prompt_variant_cues"
if seedable_count:
return "ready_for_seed_selection", "use atlas_cue_seed selection or create catalog cue draft if append_cues are available"
if unscored_count:
return "needs_visual_score", "score returned images against atlas preservation gates"
if rejected_count:
return "rejected_only", "try new prompt variants; current variants failed preservation gates"
return "needs_prompt_variants", "add explicit prompt variants before seed selection"
def _score_state(score: dict[str, Any]) -> str:
    """Classify a score dict as passed, rejected, partially scored or unscored.

    The decision comes from ``_promotion_blockers``; a ``needs_visual_score``
    decision is reported as ``partially_scored`` when at least one of the
    ``SCORE_KEYS`` already has a non-empty text value.
    """
    decision, _ = _promotion_blockers(score)
    if decision == "seedable_candidate":
        return "scored_pass"
    if decision != "needs_visual_score":
        return "scored_rejected"
    has_some_score = any(_text(score.get(score_key)) for score_key in SCORE_KEYS)
    return "partially_scored" if has_some_score else "needs_visual_score"
def build_baseline_score_sheet(manifest: dict[str, Any], *, variant_key: str = "") -> dict[str, Any]:
    """Build a score sheet with one row per selected manifest entry.

    Args:
        manifest: Manifest object; ``manifest["entries"]`` must be a list.
        variant_key: When non-empty, only entries with this variant key are
            included.

    Returns:
        A dict tagged with ``BASELINE_SCORE_SHEET_SCHEMA`` containing the rows
        and per-state counters. ``unscored_count`` mirrors
        ``needs_visual_score_count``.

    Raises:
        ValueError: If ``manifest["entries"]`` is not a list.
    """
    entries = manifest.get("entries")
    if not isinstance(entries, list):
        raise ValueError("manifest entries must be a list")
    requested_variant_key = _text(variant_key)
    state_counts = {
        "scored_pass_count": 0,
        "needs_visual_score_count": 0,
        "partially_scored_count": 0,
        "scored_rejected_count": 0,
    }
    rows: list[dict[str, Any]] = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        entry_variant_key = _text(entry.get("variant_key"))
        if requested_variant_key and entry_variant_key != requested_variant_key:
            continue
        score = _merge_known_values(_score_template(), entry.get("score"))
        score_state = _score_state(score)
        # Any state without its own counter is tallied as rejected.
        counter_key = f"{score_state}_count"
        if counter_key not in state_counts:
            counter_key = "scored_rejected_count"
        state_counts[counter_key] += 1
        entry_id = _text(entry.get("id"))
        row = {
            "id": entry_id,
            "source_stem": _text(entry.get("source_stem") or entry_id),
            "variant_key": entry_variant_key,
            "known_variant": bool(entry.get("known_variant")),
            "prompt_path": _text(entry.get("prompt_path")),
            "image_path": _text(entry.get("image_path")),
            "prompt_text": _text(entry.get("prompt_text")),
            "prompt_sha256": _text(entry.get("prompt_sha256")),
            "seed_metadata": _merge_known_values(_seed_metadata(), entry.get("seed_metadata")),
            "cue_axes": _merge_known_values(_cue_axes(), entry.get("cue_axes")),
            "score": score,
            "score_state": score_state,
            "analysis_notes": "",
        }
        rows.append(row)
    return {
        "schema": BASELINE_SCORE_SHEET_SCHEMA,
        "subject_id": _text(manifest.get("subject_id")),
        "variant_key": requested_variant_key,
        "entry_count": len(rows),
        "score_keys": list(SCORE_KEYS),
        "unscored_count": state_counts["needs_visual_score_count"],
        **state_counts,
        "entries": rows,
    }
def _prompt_noise_excerpt(text: str, start: int, end: int, *, radius: int = 56) -> str:
prefix_start = max(0, start - radius)
suffix_end = min(len(text), end + radius)
excerpt = text[prefix_start:suffix_end].strip()
if prefix_start:
excerpt = f"...{excerpt}"
if suffix_end < len(text):
excerpt = f"{excerpt}..."
return re.sub(r"\s+", " ", excerpt)
def _normalized_prompt_phrase(text: str) -> str:
    """Lower-case a phrase, drop trailing ``.!?;`` and collapse whitespace runs."""
    lowered = _text(text).lower()
    without_terminal_punctuation = re.sub(r"[.!?;]+$", "", lowered)
    return re.sub(r"\s+", " ", without_terminal_punctuation.strip())
def _prompt_noise_issues(
    text: str,
    *,
    context: str,
    prompt_variant_id: str = "",
    cue_index: int | None = None,
) -> list[dict[str, Any]]:
    """Scan one prompt text and return a list of prompt-noise issue records.

    Issues are emitted in this order: option-list wording, negative
    conditioning wording, meta phrases (searched case-insensitively via the
    lower-cased text), then repeated phrases. ``context``,
    ``prompt_variant_id`` and ``cue_index`` are copied onto every record.
    Empty text yields an empty list.
    """
    prompt_text = _text(text)
    if not prompt_text:
        return []

    def make_issue(code: str, matched: str, message: str, start: int, end: int) -> dict[str, Any]:
        # All issue records share this shape and key order.
        return {
            "context": context,
            "prompt_variant_id": prompt_variant_id,
            "cue_index": cue_index,
            "code": code,
            "match": matched,
            "message": message,
            "excerpt": _prompt_noise_excerpt(prompt_text, start, end),
        }

    issues: list[dict[str, Any]] = [
        make_issue(
            "option_word",
            found.group(0),
            "option-list wording makes atlas geometry ambiguous for Krea2",
            found.start(),
            found.end(),
        )
        for found in PROMPT_OPTION_WORD_RE.finditer(prompt_text)
    ]
    issues.extend(
        make_issue(
            "negative_conditioning",
            found.group(0),
            "negative or policy wording should not be placed in positive atlas conditioning",
            found.start(),
            found.end(),
        )
        for found in PROMPT_NEGATIVE_CONDITIONING_RE.finditer(prompt_text)
    )

    lower_text = prompt_text.lower()
    for phrase in PROMPT_META_PHRASES:
        position = lower_text.find(phrase)
        while position != -1:
            phrase_end = position + len(phrase)
            issues.append(
                make_issue(
                    "meta_instruction",
                    prompt_text[position:phrase_end],
                    "meta or policy wording should be rewritten as direct visible image description",
                    position,
                    phrase_end,
                )
            )
            position = lower_text.find(phrase, phrase_end)

    # Only the second and later occurrences of a normalized phrase are reported.
    already_seen: set[str] = set()
    for found in PROMPT_DUPLICATE_PHRASE_RE.finditer(prompt_text):
        phrase_text = found.group(0).strip()
        normalized = _normalized_prompt_phrase(phrase_text)
        if not normalized:
            continue
        word_count = len(re.findall(r"[a-z0-9']+", normalized))
        if word_count < PROMPT_DUPLICATE_MIN_WORDS:
            continue
        if normalized not in already_seen:
            already_seen.add(normalized)
            continue
        issues.append(
            make_issue(
                "duplicate_phrase",
                phrase_text,
                "repeated prompt phrase makes atlas geometry noisy for Krea2",
                found.start(),
                found.end(),
            )
        )
    return issues
def _prompt_noise_issues_for_entry(entry: dict[str, Any]) -> list[dict[str, Any]]:
    """Gather prompt-noise issues for an entry's baseline prompt and its variants.

    The baseline ``prompt_text`` is scanned first; then, for every variant that
    is an object, its exact ``text`` (when non-empty) and each of its
    ``append_cues`` are scanned in turn.
    """
    collected: list[dict[str, Any]] = list(
        _prompt_noise_issues(_text(entry.get("prompt_text")), context="baseline_prompt")
    )
    for variant in entry.get("prompt_variants") or []:
        if not isinstance(variant, dict):
            continue
        prompt_variant_id = _text(variant.get("id"))
        variant_text = _text(variant.get("text"))
        if variant_text:
            collected += _prompt_noise_issues(
                variant_text,
                context="prompt_variant_text",
                prompt_variant_id=prompt_variant_id,
            )
        cues = _string_list(variant.get("append_cues"), field=f"prompt noise variant {prompt_variant_id}.append_cues")
        for cue_index, cue in enumerate(cues):
            collected += _prompt_noise_issues(
                cue,
                context="prompt_variant_append_cue",
                prompt_variant_id=prompt_variant_id,
                cue_index=cue_index,
            )
    return collected
def _prompt_noise_code_counts(issues: list[dict[str, Any]]) -> dict[str, int]:
    """Count issues per code; codes outside ``PROMPT_NOISE_CODES`` are ignored."""
    tally = dict.fromkeys(PROMPT_NOISE_CODES, 0)
    for issue in issues:
        issue_code = _text(issue.get("code"))
        if issue_code in tally:
            tally[issue_code] += 1
    return tally
def build_prompt_noise_report(manifest: dict[str, Any], *, variant_key: str = "") -> dict[str, Any]:
    """Report the prompt-noise issues found in the selected manifest entries.

    Args:
        manifest: Manifest object; ``manifest["entries"]`` must be a list.
        variant_key: When non-empty, only entries with this variant key are
            scanned.

    Returns:
        A dict tagged with ``PROMPT_NOISE_REPORT_SCHEMA``. Only entries with at
        least one issue are listed; ``entry_count`` counts every scanned entry.

    Raises:
        ValueError: If ``manifest["entries"]`` is not a list.
    """
    entries = manifest.get("entries")
    if not isinstance(entries, list):
        raise ValueError("manifest entries must be a list")
    requested_variant_key = _text(variant_key)
    noisy_entries: list[dict[str, Any]] = []
    issue_code_counts = {code: 0 for code in PROMPT_NOISE_CODES}
    scanned_entry_count = 0
    total_issue_count = 0
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        entry_variant_key = _text(entry.get("variant_key"))
        if requested_variant_key and entry_variant_key != requested_variant_key:
            continue
        scanned_entry_count += 1
        entry_id = _text(entry.get("id"))
        source_stem = _text(entry.get("source_stem") or entry_id)
        entry_issues = _prompt_noise_issues_for_entry(entry)
        if not entry_issues:
            continue
        per_code = _prompt_noise_code_counts(entry_issues)
        for code in per_code:
            issue_code_counts[code] += per_code[code]
        total_issue_count += len(entry_issues)
        noisy_entries.append(
            {
                "variant_key": entry_variant_key,
                "source_entry_id": entry_id,
                "source_stem": source_stem,
                "known_variant": bool(entry.get("known_variant")),
                "issue_count": len(entry_issues),
                "issues": entry_issues,
            }
        )
    return {
        "schema": PROMPT_NOISE_REPORT_SCHEMA,
        "subject_id": _text(manifest.get("subject_id")),
        "variant_key": requested_variant_key,
        "entry_count": scanned_entry_count,
        "clean_entry_count": scanned_entry_count - len(noisy_entries),
        "issue_entry_count": len(noisy_entries),
        "issue_count": total_issue_count,
        "issue_code_counts": issue_code_counts,
        "entries": noisy_entries,
    }
def _sidecar_path_text(manifest: dict[str, Any], source_stem: str) -> str:
    """Return the sidecar path under the resolved manifest root, or ``""``.

    The empty string is returned when the manifest has no ``root`` or the stem
    is empty.
    """
    root_text = _text(manifest.get("root"))
    if root_text and source_stem:
        sidecar_path = Path(root_text).resolve() / f"{source_stem}{SIDECAR_SUFFIX}"
        return str(sidecar_path)
    return ""
def _cleanup_source_type(context: str) -> str:
if context == "baseline_prompt":
return "prompt_file"
if context == "prompt_variant_text":
return "sidecar_prompt_variant_text"
if context == "prompt_variant_append_cue":
return "sidecar_prompt_variant_append_cue"
return "unknown"
def _cleanup_item_for_context(
    *,
    manifest: dict[str, Any],
    entry: dict[str, Any],
    context: str,
    prompt_variant_id: str = "",
    cue_index: int | None = None,
) -> dict[str, Any]:
    """Create an empty cleanup item for one prompt text location of an entry.

    For ``baseline_prompt`` the current text and source path come from the
    entry itself and no sidecar filename is set. For the other contexts the
    source path is the sidecar path, and the current text is looked up in the
    first variant whose id equals ``prompt_variant_id`` (it stays empty when
    nothing matches). ``replacement_text`` and ``issues`` start out empty.
    """
    entry_id = _text(entry.get("id"))
    source_stem = _text(entry.get("source_stem") or entry_id)
    source_type = _cleanup_source_type(context)
    current_text = ""
    if context == "baseline_prompt":
        current_text = _text(entry.get("prompt_text"))
        source_path = _text(entry.get("prompt_path"))
        sidecar_filename = ""
    else:
        sidecar_filename = f"{source_stem}{SIDECAR_SUFFIX}" if source_stem else ""
        source_path = _sidecar_path_text(manifest, source_stem)
        # Only the first variant with a matching id is consulted.
        matching_variant = next(
            (
                candidate
                for candidate in entry.get("prompt_variants") or []
                if isinstance(candidate, dict) and _text(candidate.get("id")) == prompt_variant_id
            ),
            None,
        )
        if matching_variant is not None:
            if context == "prompt_variant_text":
                current_text = _text(matching_variant.get("text"))
            elif context == "prompt_variant_append_cue":
                cues = _string_list(matching_variant.get("append_cues"), field=f"cleanup prompt variant {prompt_variant_id}.append_cues")
                if cue_index is not None and 0 <= cue_index < len(cues):
                    current_text = cues[cue_index]
    return {
        "variant_key": _text(entry.get("variant_key")),
        "source_entry_id": entry_id,
        "source_stem": source_stem,
        "source_prompt_sha256": _text(entry.get("prompt_sha256")),
        "context": context,
        "source_type": source_type,
        "source_path": source_path,
        "sidecar_filename": sidecar_filename,
        "prompt_variant_id": prompt_variant_id,
        "cue_index": cue_index,
        "current_text": current_text,
        "current_text_sha256": _sha256_text(current_text),
        "replacement_text": "",
        "cleanup_notes": "",
        "manual_review_required": True,
        "issues": [],
    }
def build_prompt_cleanup_sheet(manifest: dict[str, Any], *, variant_key: str = "") -> dict[str, Any]:
    """Build a manual cleanup sheet from the prompt-noise issues in a manifest.

    Issues are grouped per entry by ``(context, prompt_variant_id, cue_index)``
    so each affected prompt text gets one cleanup item listing all of its
    issues. Items of one entry are emitted in sorted key order.

    Args:
        manifest: Manifest object; ``manifest["entries"]`` must be a list.
        variant_key: When non-empty, only entries with this variant key are
            scanned.

    Returns:
        A dict tagged with ``PROMPT_CLEANUP_SHEET_SCHEMA``.

    Raises:
        ValueError: If ``manifest["entries"]`` is not a list.
    """
    entries = manifest.get("entries")
    if not isinstance(entries, list):
        raise ValueError("manifest entries must be a list")
    requested_variant_key = _text(variant_key)
    cleanup_items: list[dict[str, Any]] = []
    issue_code_counts = {code: 0 for code in PROMPT_NOISE_CODES}
    total_issue_count = 0
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        if requested_variant_key and _text(entry.get("variant_key")) != requested_variant_key:
            continue
        issues = _prompt_noise_issues_for_entry(entry)
        if not issues:
            continue
        per_code = _prompt_noise_code_counts(issues)
        for code in per_code:
            issue_code_counts[code] += per_code[code]
        items_by_location: dict[tuple[str, str, int | None], dict[str, Any]] = {}
        for issue in issues:
            context = _text(issue.get("context"))
            prompt_variant_id = _text(issue.get("prompt_variant_id"))
            raw_cue_index = issue.get("cue_index")
            # bool is a subclass of int, so it has to be excluded explicitly.
            is_plain_int = isinstance(raw_cue_index, int) and not isinstance(raw_cue_index, bool)
            cue_index = raw_cue_index if is_plain_int else None
            location = (context, prompt_variant_id, cue_index)
            item = items_by_location.get(location)
            if item is None:
                item = _cleanup_item_for_context(
                    manifest=manifest,
                    entry=entry,
                    context=context,
                    prompt_variant_id=prompt_variant_id,
                    cue_index=cue_index,
                )
                items_by_location[location] = item
            item["issues"].append(issue)
        for location in sorted(items_by_location):
            item = items_by_location[location]
            item["issue_count"] = len(item.get("issues") or [])
            total_issue_count += item["issue_count"]
            cleanup_items.append(item)
    return {
        "schema": PROMPT_CLEANUP_SHEET_SCHEMA,
        "subject_id": _text(manifest.get("subject_id")),
        "variant_key": requested_variant_key,
        "cleanup_item_count": len(cleanup_items),
        "issue_count": total_issue_count,
        "issue_code_counts": issue_code_counts,
        "instructions": "Fill replacement_text manually with direct positive visual wording; do not use this sheet to auto-invent cues.",
        "cleanup_items": cleanup_items,
    }
def validate_prompt_cleanup_sheet(sheet: dict[str, Any]) -> dict[str, Any]:
    """Validate a filled-in prompt cleanup sheet without touching any file.

    Each cleanup item is checked for a supported context and matching source
    type, required identification fields, a ``current_text_sha256`` that
    matches ``current_text``, a replacement that differs from the current text
    and is free of prompt-noise issues, and the context-specific location
    fields. A false ``manual_review_required`` only produces a warning.

    Returns:
        A dict tagged with ``PROMPT_CLEANUP_VALIDATION_SCHEMA``; ``valid`` is
        true when no errors were collected.
    """
    errors: list[str] = []
    warnings: list[str] = []

    def is_plain_int(value: Any) -> bool:
        # bool is a subclass of int, so it has to be excluded explicitly.
        return isinstance(value, int) and not isinstance(value, bool)

    declared_schema = _text(sheet.get("schema"))
    if declared_schema and declared_schema != PROMPT_CLEANUP_SHEET_SCHEMA:
        errors.append(f"schema must be {PROMPT_CLEANUP_SHEET_SCHEMA}")
    cleanup_items_raw = sheet.get("cleanup_items")
    if not isinstance(cleanup_items_raw, list):
        errors.append("cleanup_items must be a list")
        cleanup_items_raw = []

    validated_item_count = 0
    for item_index, item in enumerate(cleanup_items_raw):
        prefix = f"cleanup_items[{item_index}]"
        if not isinstance(item, dict):
            errors.append(f"{prefix} must be an object")
            continue
        validated_item_count += 1

        # Context and source type must agree.
        context = _text(item.get("context"))
        declared_source_type = _text(item.get("source_type"))
        expected_source_type = _cleanup_source_type(context)
        if expected_source_type == "unknown":
            errors.append(f"{prefix}.context is unsupported")
        elif declared_source_type != expected_source_type:
            errors.append(f"{prefix}.source_type must be {expected_source_type}")

        # Required identification fields.
        for required_field in ("variant_key", "source_stem"):
            if not _text(item.get(required_field)):
                errors.append(f"{prefix}.{required_field} is required")
        source_prompt_hash = _text(item.get("source_prompt_sha256"))
        if not source_prompt_hash:
            errors.append(f"{prefix}.source_prompt_sha256 is required")

        # Current text and its hash.
        current_text = _text(item.get("current_text"))
        if not current_text:
            errors.append(f"{prefix}.current_text is required")
        current_text_hash = _text(item.get("current_text_sha256"))
        if not current_text_hash:
            errors.append(f"{prefix}.current_text_sha256 is required")
        elif current_text and current_text_hash != _sha256_text(current_text):
            errors.append(f"{prefix}.current_text_sha256 must match current_text")
        hashes_present = bool(source_prompt_hash) and bool(current_text_hash)
        if context == "baseline_prompt" and hashes_present and source_prompt_hash != current_text_hash:
            errors.append(f"{prefix}.source_prompt_sha256 must match current_text_sha256 for baseline prompt cleanup")

        # The replacement must exist, change the text, and be noise-free.
        replacement_text = _text(item.get("replacement_text"))
        if not replacement_text:
            errors.append(f"{prefix}.replacement_text is required")
        elif replacement_text == current_text:
            errors.append(f"{prefix}.replacement_text must change current_text")
        else:
            raw_cue_index = item.get("cue_index")
            replacement_issues = _prompt_noise_issues(
                replacement_text,
                context=context or "cleanup_replacement",
                prompt_variant_id=_text(item.get("prompt_variant_id")),
                cue_index=raw_cue_index if is_plain_int(raw_cue_index) else None,
            )
            if replacement_issues:
                errors.append(f"{prefix}.replacement_text still has prompt-noise issues")

        # Context-specific location fields.
        if context == "baseline_prompt":
            source_path = _text(item.get("source_path"))
            if not source_path:
                errors.append(f"{prefix}.source_path is required for baseline prompt cleanup")
            elif Path(source_path).suffix.lower() not in PROMPT_SUFFIXES:
                errors.append(f"{prefix}.source_path must reference a prompt file")
        elif context == "prompt_variant_text":
            if not _text(item.get("prompt_variant_id")):
                errors.append(f"{prefix}.prompt_variant_id is required for sidecar text cleanup")
            if not _text(item.get("sidecar_filename")):
                errors.append(f"{prefix}.sidecar_filename is required for sidecar text cleanup")
        elif context == "prompt_variant_append_cue":
            if not _text(item.get("prompt_variant_id")):
                errors.append(f"{prefix}.prompt_variant_id is required for sidecar append-cue cleanup")
            cue_index = item.get("cue_index")
            if not is_plain_int(cue_index) or cue_index < 0:
                errors.append(f"{prefix}.cue_index must be a non-negative integer")
            if not _text(item.get("sidecar_filename")):
                errors.append(f"{prefix}.sidecar_filename is required for sidecar append-cue cleanup")

        if not item.get("manual_review_required"):
            warnings.append(f"{prefix}.manual_review_required is not true")

    return {
        "schema": PROMPT_CLEANUP_VALIDATION_SCHEMA,
        "valid": not errors,
        "error_count": len(errors),
        "warning_count": len(warnings),
        "cleanup_item_count": len(cleanup_items_raw),
        "validated_item_count": validated_item_count,
        "errors": errors,
        "warnings": warnings,
    }
def _path_is_under_root(path: Path, root: Path) -> bool:
try:
path.resolve().relative_to(root.resolve())
except ValueError:
return False
return True
def _cleanup_target_path(item: dict[str, Any], root: Path) -> Path:
    """Resolve the file a cleanup item targets and require it to be inside ``root``.

    Baseline items use their ``source_path``; every other context uses
    ``sidecar_filename``, which must be a bare filename, joined onto ``root``.

    Raises:
        ValueError: If the sidecar filename is missing or contains path parts,
            or if the resolved target is not inside ``root``.
    """
    if _text(item.get("context")) == "baseline_prompt":
        target = Path(_text(item.get("source_path"))).resolve()
    else:
        filename = _text(item.get("sidecar_filename"))
        is_plain_filename = bool(filename) and Path(filename).name == filename
        if not is_plain_filename:
            raise ValueError(f"sidecar filename must be a plain filename: {filename!r}")
        target = (root / filename).resolve()
    if _path_is_under_root(target, root):
        return target
    raise ValueError(f"cleanup target must be inside {root}: {target}")
def _replace_sidecar_prompt_variant_text(sidecar: dict[str, Any], item: dict[str, Any]) -> tuple[dict[str, Any], str]:
    """Apply one cleanup item to a sidecar object, mutating it in place.

    The first variant whose id matches the item is updated: its ``text`` for
    the ``prompt_variant_text`` context, or one ``append_cues`` element for
    ``prompt_variant_append_cue``. The stored text must equal either the
    item's ``current_text`` or its ``replacement_text``.

    Returns:
        The sidecar and the source type of the edit.

    Raises:
        ValueError: If ``prompt_variants`` is not a list, the stored text has
            drifted, the cue index is out of range, or no variant was updated.
    """
    variants = sidecar.get("prompt_variants")
    if not isinstance(variants, list):
        raise ValueError("sidecar prompt_variants must be a list")
    prompt_variant_id = _text(item.get("prompt_variant_id"))
    expected_text = _text(item.get("current_text"))
    new_text = _text(item.get("replacement_text"))
    context = _text(item.get("context"))
    # Accepting the replacement as well keeps a repeated apply from failing.
    acceptable_texts = {expected_text, new_text}
    for variant in variants:
        if not (isinstance(variant, dict) and _text(variant.get("id")) == prompt_variant_id):
            continue
        if context == "prompt_variant_text":
            if _text(variant.get("text")) not in acceptable_texts:
                raise ValueError(f"sidecar variant {prompt_variant_id}.text has drifted")
            variant["text"] = new_text
            return sidecar, "sidecar_prompt_variant_text"
        if context == "prompt_variant_append_cue":
            cues = _string_list(variant.get("append_cues"), field=f"cleanup sidecar variant {prompt_variant_id}.append_cues")
            cue_index = item.get("cue_index")
            index_is_valid = (
                isinstance(cue_index, int)
                and not isinstance(cue_index, bool)
                and 0 <= cue_index < len(cues)
            )
            if not index_is_valid:
                raise ValueError(f"sidecar variant {prompt_variant_id}.append_cues index is out of range")
            if cues[cue_index] not in acceptable_texts:
                raise ValueError(f"sidecar variant {prompt_variant_id}.append_cues[{cue_index}] has drifted")
            cues[cue_index] = new_text
            variant["append_cues"] = cues
            return sidecar, "sidecar_prompt_variant_append_cue"
    raise ValueError(f"sidecar prompt variant {prompt_variant_id!r} was not found")
def apply_prompt_cleanup_sheet(sheet: dict[str, Any], folder: str | Path) -> dict[str, Any]:
    """Apply a validated prompt cleanup sheet to the files under ``folder``.

    The sheet is validated first; an invalid sheet changes nothing and yields a
    report with ``applied`` set to false. Otherwise all cleanup items are
    checked and their edits computed in memory before any file is written, so
    a drifted prompt file, a drifted sidecar text, a missing variant or a
    target outside ``folder`` raises while every file is still untouched.
    (Writing files as each item was processed could leave the folder
    half-updated when a later item failed.)

    Args:
        sheet: Cleanup sheet with filled-in ``replacement_text`` values.
        folder: Root folder that must contain every target file.

    Returns:
        A dict tagged with ``PROMPT_CLEANUP_APPLY_REPORT_SCHEMA`` listing the
        updated files (in first-touched order) and the validation result.

    Raises:
        FileNotFoundError: If ``folder`` is not an existing directory.
        ValueError: If a target lies outside ``folder`` or its current content
            matches neither the expected nor the replacement text.
    """
    root = Path(folder).resolve()
    validation = validate_prompt_cleanup_sheet(sheet)
    if not validation["valid"]:
        return {
            "schema": PROMPT_CLEANUP_APPLY_REPORT_SCHEMA,
            "applied": False,
            "root": str(root),
            "updated_file_count": 0,
            "updated_files": [],
            "validation": validation,
        }
    if not root.is_dir():
        raise FileNotFoundError(f"cleanup folder does not exist: {root}")

    # Phase 1: check every item and stage the resulting file contents.
    staged_prompt_texts: dict[Path, str] = {}
    staged_sidecars: dict[Path, dict[str, Any]] = {}
    updated_by_path: dict[str, dict[str, Any]] = {}
    for item in sheet.get("cleanup_items", []):
        if not isinstance(item, dict):
            continue
        target_path = _cleanup_target_path(item, root)
        context = _text(item.get("context"))
        current_text = _text(item.get("current_text"))
        replacement_text = _text(item.get("replacement_text"))
        if context == "baseline_prompt":
            # A later item for the same file sees the text staged by an earlier one.
            if target_path in staged_prompt_texts:
                actual_text = staged_prompt_texts[target_path].strip()
            else:
                actual_text = target_path.read_text(encoding="utf-8").strip()
            if actual_text not in {current_text, replacement_text}:
                raise ValueError(f"prompt file has drifted: {target_path}")
            staged_prompt_texts[target_path] = replacement_text
            source_type = "prompt_file"
        else:
            # Several items may edit one sidecar; load it once and keep editing it.
            if target_path in staged_sidecars:
                sidecar = staged_sidecars[target_path]
            else:
                sidecar = _read_json_object_if_present(target_path)
            sidecar, source_type = _replace_sidecar_prompt_variant_text(sidecar, item)
            staged_sidecars[target_path] = sidecar
        path_key = str(target_path)
        if path_key not in updated_by_path:
            updated_by_path[path_key] = {
                "path": path_key,
                "source_type": source_type,
                "cleanup_item_count": 0,
            }
        updated_by_path[path_key]["cleanup_item_count"] += 1

    # Phase 2: every item checked out, so write the staged contents.
    for prompt_path, prompt_text in staged_prompt_texts.items():
        prompt_path.write_text(prompt_text, encoding="utf-8")
    for sidecar_path, sidecar in staged_sidecars.items():
        sidecar_path.write_text(json.dumps(sidecar, ensure_ascii=True, indent=2, sort_keys=True) + "\n", encoding="utf-8")

    updated_files = list(updated_by_path.values())
    return {
        "schema": PROMPT_CLEANUP_APPLY_REPORT_SCHEMA,
        "applied": True,
        "root": str(root),
        "updated_file_count": len(updated_files),
        "updated_files": updated_files,
        "validation": validation,
    }
def build_coverage_report(manifest: dict[str, Any]) -> dict[str, Any]:
    """Summarize how far each manifest entry is through the refine workflow.

    For every entry the prompt variants with an id are classified as seedable,
    unscored or rejected (a seedable variant with unstable matrix evidence is
    downgraded to rejected), prompt-noise issues are counted, and
    ``_coverage_state`` picks the entry state and next action.

    Returns:
        A dict tagged with ``COVERAGE_REPORT_SCHEMA`` holding the per-entry
        summaries and the totals across entries.

    Raises:
        ValueError: If ``manifest["entries"]`` is not a list.
    """
    entries = manifest.get("entries")
    if not isinstance(entries, list):
        raise ValueError("manifest entries must be a list")
    # Entry states that have a "<state>_count" total of their own.
    counted_states = (
        "baseline_only",
        "needs_prompt_cleanup",
        "needs_visual_score",
        "ready_for_seed_selection",
        "ready_for_catalog_review",
        "unknown_variant",
        "rejected_only",
    )
    totals = {
        "baseline_only_count": 0,
        "needs_prompt_cleanup_count": 0,
        "needs_visual_score_count": 0,
        "ready_for_seed_selection_count": 0,
        "ready_for_catalog_review_count": 0,
        "unknown_variant_count": 0,
        "rejected_only_count": 0,
        "prompt_variant_count": 0,
        "seedable_variant_count": 0,
        "catalog_cue_candidate_count": 0,
        "unscored_variant_count": 0,
        "rejected_variant_count": 0,
        "prompt_noise_issue_count": 0,
        "prompt_noise_entry_count": 0,
    }
    report_entries: list[dict[str, Any]] = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        variant_key = _text(entry.get("variant_key"))
        entry_id = _text(entry.get("id"))
        source_stem = _text(entry.get("source_stem") or entry_id)
        known_variant = bool(entry.get("known_variant"))
        prompt_text = _text(entry.get("prompt_text"))
        prompt_variants = [candidate for candidate in entry.get("prompt_variants") or [] if isinstance(candidate, dict)]
        prompt_noise_issues = _prompt_noise_issues_for_entry(entry)
        prompt_noise_issue_count = len(prompt_noise_issues)
        prompt_noise_code_counts = _prompt_noise_code_counts(prompt_noise_issues)

        seedable_count = 0
        catalog_cue_count = 0
        unscored_count = 0
        rejected_count = 0
        variant_summaries: list[dict[str, Any]] = []
        for variant in prompt_variants:
            variant_id = _text(variant.get("id"))
            if not variant_id:
                continue
            append_cues = _string_list(variant.get("append_cues"), field=f"coverage prompt variant {variant_id}.append_cues")
            tested_text = _variant_prompt_text(prompt_text, variant, field=f"coverage prompt variant {variant_id}")
            prompt_source = _prompt_source_for_variant(
                variant,
                variant_id=variant_id,
                text=tested_text,
                append_cues=append_cues,
            )
            evidence = _prompt_variant_evidence(variant.get("evidence"), field=f"coverage prompt variant {variant_id}.evidence")
            score = _merge_known_values(_score_template(), evidence.get("score"))
            decision, blockers = _promotion_blockers(score)
            matrix_evidence = _stable_matrix_evidence_for_variant(variant, field=f"coverage prompt variant {variant_id}")
            has_matrix_evidence = "matrix_evidence" in variant
            # Matrix evidence that is present but not stable overrides a passing score.
            if decision == "seedable_candidate" and has_matrix_evidence and not matrix_evidence:
                decision = "rejected"
                blockers = ["unstable_matrix_evidence"]
            if decision == "seedable_candidate":
                seedable_count += 1
                if prompt_source.get("kind") == "append_cues" and prompt_source.get("append_cues"):
                    catalog_cue_count += 1
            elif decision == "needs_visual_score":
                unscored_count += 1
            elif decision == "rejected":
                rejected_count += 1
            variant_summaries.append(
                {
                    "prompt_variant_id": variant_id,
                    "decision": decision,
                    "blockers": blockers,
                    "prompt_source_kind": prompt_source.get("kind") or "",
                    "has_append_cues": bool(prompt_source.get("append_cues")),
                    "has_evidence": bool(evidence),
                    "has_matrix_evidence": has_matrix_evidence,
                    "matrix_evidence_stable": bool(matrix_evidence),
                }
            )

        state, next_action = _coverage_state(
            known_variant=known_variant,
            prompt_noise_issue_count=prompt_noise_issue_count,
            prompt_variant_count=len(prompt_variants),
            seedable_count=seedable_count,
            catalog_cue_count=catalog_cue_count,
            unscored_count=unscored_count,
            rejected_count=rejected_count,
        )
        totals["prompt_variant_count"] += len(prompt_variants)
        totals["seedable_variant_count"] += seedable_count
        totals["catalog_cue_candidate_count"] += catalog_cue_count
        totals["unscored_variant_count"] += unscored_count
        totals["rejected_variant_count"] += rejected_count
        totals["prompt_noise_issue_count"] += prompt_noise_issue_count
        if prompt_noise_issue_count:
            totals["prompt_noise_entry_count"] += 1
        if state in counted_states:
            totals[f"{state}_count"] += 1
        report_entries.append(
            {
                "id": entry_id,
                "source_stem": source_stem,
                "variant_key": variant_key,
                "known_variant": known_variant,
                "state": state,
                "next_action": next_action,
                "prompt_variant_count": len(prompt_variants),
                "seedable_variant_count": seedable_count,
                "catalog_cue_candidate_count": catalog_cue_count,
                "unscored_variant_count": unscored_count,
                "rejected_variant_count": rejected_count,
                "prompt_noise_issue_count": prompt_noise_issue_count,
                "prompt_noise_code_counts": prompt_noise_code_counts,
                "prompt_variants": variant_summaries,
            }
        )
    return {
        "schema": COVERAGE_REPORT_SCHEMA,
        "subject_id": _text(manifest.get("subject_id")),
        "entry_count": len(report_entries),
        "missing_pair_count": int(manifest.get("missing_pair_count") or 0),
        "manifest_unknown_variant_count": int(manifest.get("unknown_variant_count") or 0),
        **totals,
        "entries": report_entries,
    }
def build_sidecar_scaffold(manifest: dict[str, Any], *, variant_key: str = "") -> dict[str, Any]:
    """Build empty sidecar scaffolds for known entries that have no prompt variants.

    Entries with an unknown variant, or that already have at least one prompt
    variant object, are listed in ``skipped`` with the reason.

    Args:
        manifest: Manifest object; ``manifest["entries"]`` must be a list.
        variant_key: When non-empty, only entries with this variant key are
            considered.

    Returns:
        A dict tagged with ``SIDECAR_SCAFFOLD_SCHEMA``.

    Raises:
        ValueError: If ``manifest["entries"]`` is not a list.
    """
    entries = manifest.get("entries")
    if not isinstance(entries, list):
        raise ValueError("manifest entries must be a list")
    requested_variant_key = _text(variant_key)
    scaffolds: list[dict[str, Any]] = []
    skipped: list[dict[str, Any]] = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        entry_variant_key = _text(entry.get("variant_key"))
        if requested_variant_key and entry_variant_key != requested_variant_key:
            continue
        entry_id = _text(entry.get("id"))
        source_stem = _text(entry.get("source_stem") or entry_id)
        prompt_variant_count = sum(1 for candidate in entry.get("prompt_variants") or [] if isinstance(candidate, dict))
        # Identification fields shared by both kinds of skip record.
        skip_base = {
            "variant_key": entry_variant_key,
            "source_entry_id": entry_id,
            "source_stem": source_stem,
        }
        if not bool(entry.get("known_variant")):
            skipped.append({**skip_base, "reason": "unknown_variant"})
            continue
        if prompt_variant_count:
            skipped.append(
                {
                    **skip_base,
                    "reason": "has_prompt_variants",
                    "prompt_variant_count": prompt_variant_count,
                }
            )
            continue
        seed_metadata = _merge_known_values(_seed_metadata(), entry.get("seed_metadata"))
        cue_axes = _merge_known_values(_cue_axes(), entry.get("cue_axes"))
        score = _merge_known_values(_score_template(), entry.get("score"))
        sidecar_json = {
            "seed_metadata": seed_metadata,
            "cue_axes": cue_axes,
            "score": score,
            "prompt_variants": [],
            "notes": "Add user-authored prompt_variants here; do not add negative-conditioning fields.",
        }
        prompt_variant_template = {
            "id": "",
            "prompt_order": "subject_first",
            "append_cues": [],
            "reference_images": [],
            "cue_axes": _cue_axes(),
            "seed_metadata": _seed_metadata(),
            "notes": "",
        }
        scaffolds.append(
            {
                "variant_key": entry_variant_key,
                "source_entry_id": entry_id,
                "source_stem": source_stem,
                "sidecar_filename": f"{source_stem}{SIDECAR_SUFFIX}",
                "source_prompt_sha256": _text(entry.get("prompt_sha256")),
                "prompt_path": _text(entry.get("prompt_path")),
                "image_path": _text(entry.get("image_path")),
                "sidecar_json": sidecar_json,
                "prompt_variant_template": prompt_variant_template,
            }
        )
    return {
        "schema": SIDECAR_SCAFFOLD_SCHEMA,
        "subject_id": _text(manifest.get("subject_id")),
        "variant_key": requested_variant_key,
        "scaffold_count": len(scaffolds),
        "skipped_count": len(skipped),
        "scaffolds": scaffolds,
        "skipped": skipped,
    }
def _has_filled_axis(values: dict[str, Any], keys: tuple[str, ...]) -> bool:
return any(values.get(key) not in (None, "", [], {}) for key in keys)
def build_baseline_score_update_draft(baseline_score_sheet: dict[str, Any]) -> dict[str, Any]:
    """Turn a baseline score sheet into a draft of per-sidecar score updates.

    Each sheet entry either becomes one item in ``updates`` or is recorded in
    ``skipped`` together with the reason it was passed over.

    Args:
        baseline_score_sheet: Sheet object with an ``entries`` list. A
            ``schema`` value, when present, must equal
            ``BASELINE_SCORE_SHEET_SCHEMA``.

    Returns:
        A draft object tagged with ``BASELINE_SCORE_UPDATE_DRAFT_SCHEMA`` that
        carries the updates, the skipped entries and their counts.

    Raises:
        ValueError: If the schema does not match, ``entries`` is not a list,
            or ``_validate_no_negative_channel`` rejects an entry's
            ``analysis_notes``.
    """
    sheet_schema = _text(baseline_score_sheet.get("schema"))
    if sheet_schema and sheet_schema != BASELINE_SCORE_SHEET_SCHEMA:
        raise ValueError(f"baseline score sheet schema must be {BASELINE_SCORE_SHEET_SCHEMA}")
    rows = baseline_score_sheet.get("entries")
    if not isinstance(rows, list):
        raise ValueError("baseline score sheet entries must be a list")

    accepted: list[dict[str, Any]] = []
    rejected: list[dict[str, Any]] = []
    requested_variant_key = _text(baseline_score_sheet.get("variant_key"))

    for position, row in enumerate(rows):
        if not isinstance(row, dict):
            rejected.append({"entry_index": position, "reason": "not_object"})
            continue

        row_id = _text(row.get("id"))
        # The entry id doubles as the stem when no explicit source_stem is given.
        stem = _text(row.get("source_stem") or row_id)
        row_variant_key = _text(row.get("variant_key"))

        # Resolve the first applicable skip reason; the score is only merged
        # once the cheaper identity checks have passed.
        skip_reason = ""
        filled_score: dict[str, Any] = {}
        if not stem:
            skip_reason = "missing_source_stem"
        elif not bool(row.get("known_variant")):
            skip_reason = "unknown_variant"
        else:
            filled_score = _merge_known_values(_score_template(), row.get("score"))
            if not _has_filled_axis(filled_score, SCORE_KEYS):
                skip_reason = "no_score"
        if skip_reason:
            rejected.append(
                {
                    "entry_index": position,
                    "variant_key": row_variant_key,
                    "source_entry_id": row_id,
                    "source_stem": stem,
                    "reason": skip_reason,
                }
            )
            continue

        notes = _text(row.get("analysis_notes"))
        _validate_no_negative_channel(notes, field=f"baseline score entry {stem}.analysis_notes")
        state = _score_state(filled_score)
        prompt_sha256 = _text(row.get("prompt_sha256") or row.get("source_prompt_sha256"))
        prompt_path = _text(row.get("prompt_path"))
        image_path = _text(row.get("image_path"))
        seed_metadata = _merge_known_values(_seed_metadata(), row.get("seed_metadata"))
        cue_axes = _merge_known_values(_cue_axes(), row.get("cue_axes"))
        accepted.append(
            {
                "variant_key": row_variant_key,
                "source_entry_id": row_id,
                "source_stem": stem,
                "sidecar_filename": f"{stem}{SIDECAR_SUFFIX}",
                "source_prompt_sha256": prompt_sha256,
                "prompt_path": prompt_path,
                "image_path": image_path,
                "seed_metadata": seed_metadata,
                "cue_axes": cue_axes,
                "score": filled_score,
                "score_state": state,
                "analysis_notes": notes,
            }
        )

    return {
        "schema": BASELINE_SCORE_UPDATE_DRAFT_SCHEMA,
        "subject_id": _text(baseline_score_sheet.get("subject_id")),
        "variant_key": requested_variant_key,
        "update_count": len(accepted),
        "skipped_count": len(rejected),
        "updates": accepted,
        "skipped": rejected,
    }
def validate_baseline_score_update_draft(draft: dict[str, Any]) -> dict[str, Any]:
    """Check a baseline score update draft and report every problem found.

    Nothing is raised for invalid content; problems are accumulated as
    messages so the caller receives the complete list in one pass.

    Args:
        draft: Draft object with an ``updates`` list.

    Returns:
        A report tagged with ``BASELINE_SCORE_UPDATE_VALIDATION_SCHEMA``.
        ``valid`` is true only when ``errors`` is empty; ``update_count`` is
        the length of the updates list and ``validated_update_count`` the
        number of items in it that were objects.
    """
    errors: list[str] = []
    warnings: list[str] = []

    declared_schema = _text(draft.get("schema"))
    if declared_schema and declared_schema != BASELINE_SCORE_UPDATE_DRAFT_SCHEMA:
        errors.append(f"schema must be {BASELINE_SCORE_UPDATE_DRAFT_SCHEMA}")

    updates_raw = draft.get("updates")
    if not isinstance(updates_raw, list):
        errors.append("updates must be a list")
        updates_raw = []

    validated_update_count = 0
    for position, item in enumerate(updates_raw):
        label = f"updates[{position}]"
        if not isinstance(item, dict):
            errors.append(f"updates[{position}] must be an object")
            continue
        validated_update_count += 1

        # Baseline score updates carry scores only: neither the forbidden
        # prompt fields nor prompt_variants may appear on them.
        errors.extend(
            f"{label} must not contain {name}"
            for name in (*FORBIDDEN_PROMPT_FIELDS, "prompt_variants")
            if name in item
        )

        if not _text(item.get("variant_key")):
            errors.append(f"{label}.variant_key is required")
        stem = _text(item.get("source_stem"))
        if not stem:
            errors.append(f"{label}.source_stem is required")

        sidecar_name = _text(item.get("sidecar_filename"))
        if not sidecar_name:
            errors.append(f"{label}.sidecar_filename is required")
        elif Path(sidecar_name).name != sidecar_name:
            errors.append(f"{label}.sidecar_filename must be a plain filename")
        elif stem and sidecar_name != f"{stem}{SIDECAR_SUFFIX}":
            # The expected name can only be derived when a stem was supplied.
            errors.append(f"{label}.sidecar_filename must be {stem}{SIDECAR_SUFFIX}")

        if not _text(item.get("source_prompt_sha256")):
            errors.append(f"{label}.source_prompt_sha256 is required")

        # image_path is optional; it is only checked when one was given.
        image_path = _text(item.get("image_path"))
        if image_path:
            try:
                _image_path(image_path, field=f"{label}.image_path")
            except ValueError as exc:
                errors.append(str(exc))

        merged_score = _merge_known_values(_score_template(), item.get("score"))
        if not _has_filled_axis(merged_score, SCORE_KEYS):
            errors.append(f"{label}.score must include at least one filled score")
            continue

        actual_state = _score_state(merged_score)
        declared_state = _text(item.get("score_state"))
        if declared_state and declared_state != actual_state:
            errors.append(f"{label}.score_state must be {actual_state}")
        if actual_state == "partially_scored":
            warnings.append(f"{label}.score is partially scored")
        elif actual_state == "scored_rejected":
            warnings.append(f"{label}.score is rejected baseline evidence")

        notes = _text(item.get("analysis_notes"))
        try:
            _validate_no_negative_channel(notes, field=f"{label}.analysis_notes")
        except ValueError as exc:
            errors.append(str(exc))

    return {
        "schema": BASELINE_SCORE_UPDATE_VALIDATION_SCHEMA,
        "valid": not errors,
        "error_count": len(errors),
        "warning_count": len(warnings),
        "update_count": len(updates_raw),
        "validated_update_count": validated_update_count,
        "errors": errors,
        "warnings": warnings,
    }
def validate_reference_cue_sidecar_author_draft(draft: dict[str, Any]) -> dict[str, Any]:
    """Validate a reference-cue sidecar author draft and report all problems.

    Problems are collected as messages rather than raised, so a caller always
    gets a report back. ``ValueError`` raised by the field-parsing helpers is
    converted into an error message.

    Args:
        draft: Draft object with an ``updates`` list; every update must carry
            a non-empty ``prompt_variants`` list.

    Returns:
        A report tagged with ``REFERENCE_CUE_SIDECAR_AUTHOR_VALIDATION_SCHEMA``.
        ``valid`` is true only when ``errors`` is empty.
        ``validated_variant_count`` counts the prompt variants that were
        objects and therefore inspected.
    """
    errors: list[str] = []
    warnings: list[str] = []
    schema = _text(draft.get("schema"))
    if schema and schema != REFERENCE_CUE_SIDECAR_AUTHOR_DRAFT_SCHEMA:
        errors.append(f"schema must be {REFERENCE_CUE_SIDECAR_AUTHOR_DRAFT_SCHEMA}")
    updates_raw = draft.get("updates")
    if not isinstance(updates_raw, list):
        errors.append("updates must be a list")
        updates_raw = []
    validated_variant_count = 0
    for update_index, update in enumerate(updates_raw):
        if not isinstance(update, dict):
            errors.append(f"updates[{update_index}] must be an object")
            continue
        prefix = f"updates[{update_index}]"
        variant_key = _text(update.get("variant_key"))
        if not variant_key:
            errors.append(f"{prefix}.variant_key is required")
        source_stem = _text(update.get("source_stem"))
        if not source_stem:
            errors.append(f"{prefix}.source_stem is required")
        # The sidecar name can only be cross-checked when a stem was supplied.
        expected_sidecar = f"{source_stem}{SIDECAR_SUFFIX}" if source_stem else ""
        sidecar_filename = _text(update.get("sidecar_filename"))
        if not sidecar_filename:
            errors.append(f"{prefix}.sidecar_filename is required")
        elif Path(sidecar_filename).name != sidecar_filename:
            errors.append(f"{prefix}.sidecar_filename must be a plain filename")
        elif expected_sidecar and sidecar_filename != expected_sidecar:
            errors.append(f"{prefix}.sidecar_filename must be {expected_sidecar}")
        if not _text(update.get("source_prompt_sha256")):
            errors.append(f"{prefix}.source_prompt_sha256 is required")
        # image_path is optional at update level; validate it only when given.
        image_path = _text(update.get("image_path"))
        if image_path:
            try:
                _image_path(image_path, field=f"{prefix}.image_path")
            except ValueError as exc:
                errors.append(str(exc))
        variants_raw = update.get("prompt_variants")
        if not isinstance(variants_raw, list) or not variants_raw:
            errors.append(f"{prefix}.prompt_variants must be a non-empty list")
            continue
        seen_variant_ids: set[str] = set()
        for variant_index, variant in enumerate(variants_raw):
            variant_prefix = f"{prefix}.prompt_variants[{variant_index}]"
            if not isinstance(variant, dict):
                errors.append(f"{variant_prefix} must be an object")
                continue
            validated_variant_count += 1
            for forbidden in FORBIDDEN_PROMPT_FIELDS:
                if forbidden in variant:
                    errors.append(f"{variant_prefix} must not contain {forbidden}")
            variant_id = _text(variant.get("id"))
            if not variant_id:
                errors.append(f"{variant_prefix}.id is required")
            elif variant_id in seen_variant_ids:
                errors.append(f"{variant_prefix}.id {variant_id!r} is duplicated in this sidecar author draft")
            seen_variant_ids.add(variant_id)
            if variant_id:
                _validate_prompt_source_identity(variant, variant_id=variant_id, prefix=variant_prefix, errors=errors)
            prompt_order = _text(variant.get("prompt_order") or "subject_first")
            if prompt_order not in PROMPT_ORDERS:
                errors.append(f"{variant_prefix}.prompt_order must be one of {sorted(PROMPT_ORDERS)}")
            text = _text(variant.get("text"))
            append_cues: list[str] = []
            try:
                append_cues = _string_list(variant.get("append_cues"), field=f"{variant_prefix}.append_cues")
            except ValueError as exc:
                errors.append(str(exc))
            # A variant is authored either as full text or as appended cues,
            # never both and never neither.
            if bool(text) == bool(append_cues):
                errors.append(f"{variant_prefix} must provide exactly one of text or append_cues")
            if text:
                try:
                    _validate_no_negative_channel(text, field=f"{variant_prefix}.text")
                except ValueError as exc:
                    errors.append(str(exc))
            for cue_index, cue in enumerate(append_cues):
                prompt_noise_issues = _prompt_noise_issues(
                    cue,
                    context="reference_cue_sidecar_author_append_cue",
                    prompt_variant_id=variant_id,
                    cue_index=cue_index,
                )
                for issue in prompt_noise_issues:
                    errors.append(
                        f"{variant_prefix}.append_cues[{cue_index}] prompt_noise {issue.get('code')}: {issue.get('match')}"
                    )
            # Treat _reference_images like the other helpers that take a
            # field= argument: a ValueError becomes a reported error instead
            # of escaping the validator. The emptiness check only runs when
            # parsing succeeded, so a malformed value yields a single message.
            try:
                reference_images = _reference_images(
                    variant.get("reference_images"),
                    field=f"{variant_prefix}.reference_images",
                )
            except ValueError as exc:
                errors.append(str(exc))
            else:
                if not reference_images:
                    errors.append(f"{variant_prefix}.reference_images must include at least one canonical atlas reference")
            cue_axes = _merge_known_values(_cue_axes(), variant.get("cue_axes"))
            if not _has_filled_axis(cue_axes, CUE_AXIS_KEYS):
                errors.append(f"{variant_prefix}.cue_axes must include at least one filled cue axis")
            if not _text(variant.get("notes")):
                warnings.append(f"{variant_prefix}.notes is empty")
    return {
        "schema": REFERENCE_CUE_SIDECAR_AUTHOR_VALIDATION_SCHEMA,
        "valid": not errors,
        "error_count": len(errors),
        "warning_count": len(warnings),
        "update_count": len(updates_raw),
        "validated_variant_count": validated_variant_count,
        "errors": errors,
        "warnings": warnings,
    }
def validate_sidecar_update_draft(draft: dict[str, Any]) -> dict[str, Any]:
    """Validate a single-seed sidecar update draft and report all problems.

    Problems are collected as messages rather than raised. The draft carries
    one sampler ``seed``; every prompt variant's evidence must reference that
    same seed and its score must pass the promotion gate.

    ``sidecar_filename`` must be a plain filename (no directory component).
    Rejecting path-like names here keeps a draft that validates cleanly from
    failing later while sidecar files are being written.

    Args:
        draft: Draft object with an integer ``seed`` and an ``updates`` list.

    Returns:
        A report tagged with ``SIDECAR_UPDATE_VALIDATION_SCHEMA``. ``valid``
        is true only when ``errors`` is empty. ``validated_variant_count``
        counts the prompt variants that were objects and therefore inspected.
    """
    errors: list[str] = []
    warnings: list[str] = []
    schema = _text(draft.get("schema"))
    if schema and schema != SIDECAR_UPDATE_DRAFT_SCHEMA:
        errors.append(f"schema must be {SIDECAR_UPDATE_DRAFT_SCHEMA}")
    seed = draft.get("seed")
    # bool is a subclass of int, so it has to be excluded explicitly.
    if not isinstance(seed, int) or isinstance(seed, bool):
        errors.append("seed must be an integer sampler seed")
    updates_raw = draft.get("updates")
    if not isinstance(updates_raw, list):
        errors.append("updates must be a list")
        updates_raw = []
    validated_variant_count = 0
    for update_index, update in enumerate(updates_raw):
        if not isinstance(update, dict):
            errors.append(f"updates[{update_index}] must be an object")
            continue
        source_stem = _text(update.get("source_stem"))
        if not source_stem:
            errors.append(f"updates[{update_index}].source_stem is required")
        # The sidecar name can only be cross-checked when a stem was supplied.
        expected_sidecar = f"{source_stem}{SIDECAR_SUFFIX}" if source_stem else ""
        sidecar_filename = _text(update.get("sidecar_filename"))
        if not sidecar_filename:
            errors.append(f"updates[{update_index}].sidecar_filename is required")
        elif Path(sidecar_filename).name != sidecar_filename:
            # A stem containing a path separator would otherwise satisfy the
            # expected-name comparison below and reach the write step.
            errors.append(f"updates[{update_index}].sidecar_filename must be a plain filename")
        elif expected_sidecar and sidecar_filename != expected_sidecar:
            errors.append(f"updates[{update_index}].sidecar_filename must be {expected_sidecar}")
        variants_raw = update.get("prompt_variants")
        if not isinstance(variants_raw, list) or not variants_raw:
            errors.append(f"updates[{update_index}].prompt_variants must be a non-empty list")
            continue
        seen_variant_ids: set[str] = set()
        for variant_index, variant in enumerate(variants_raw):
            prefix = f"updates[{update_index}].prompt_variants[{variant_index}]"
            if not isinstance(variant, dict):
                errors.append(f"{prefix} must be an object")
                continue
            validated_variant_count += 1
            for forbidden in FORBIDDEN_PROMPT_FIELDS:
                if forbidden in variant:
                    errors.append(f"{prefix} must not contain {forbidden}")
            variant_id = _text(variant.get("id"))
            if not variant_id:
                errors.append(f"{prefix}.id is required")
            elif variant_id in seen_variant_ids:
                errors.append(f"{prefix}.id {variant_id!r} is duplicated in this sidecar update")
            seen_variant_ids.add(variant_id)
            if variant_id:
                _validate_prompt_source_identity(variant, variant_id=variant_id, prefix=prefix, errors=errors)
            prompt_order = _text(variant.get("prompt_order") or "subject_first")
            if prompt_order not in PROMPT_ORDERS:
                errors.append(f"{prefix}.prompt_order must be one of {sorted(PROMPT_ORDERS)}")
            text = _text(variant.get("text"))
            if not text:
                errors.append(f"{prefix}.text is required")
            elif NEGATIVE_OUT_CHANNEL in text:
                errors.append(f"{prefix}.text must not mention {NEGATIVE_OUT_CHANNEL}")
            cue_axes = _merge_known_values(_cue_axes(), variant.get("cue_axes"))
            if not _has_filled_axis(cue_axes, CUE_AXIS_KEYS):
                errors.append(f"{prefix}.cue_axes must include at least one filled cue axis")
            evidence = variant.get("evidence")
            if not isinstance(evidence, dict):
                # Without evidence none of the remaining checks apply.
                errors.append(f"{prefix}.evidence is required")
                continue
            evidence_seed = evidence.get("seed")
            if not isinstance(evidence_seed, int) or isinstance(evidence_seed, bool):
                errors.append(f"{prefix}.evidence.seed must be an integer sampler seed")
            elif isinstance(seed, int) and not isinstance(seed, bool) and evidence_seed != seed:
                # Only compared when the draft seed itself is a valid integer.
                errors.append(f"{prefix}.evidence.seed {evidence_seed} does not match draft seed {seed}")
            try:
                _image_path(evidence.get("image_path"), field=f"{prefix}.evidence.image_path")
            except ValueError as exc:
                errors.append(str(exc))
            score = _merge_known_values(_score_template(), evidence.get("score"))
            decision, blockers = _promotion_blockers(score)
            if decision != "seedable_candidate":
                for blocker in blockers:
                    errors.append(f"{prefix}.evidence.score failed promotion gate: {blocker}")
            if not _text(variant.get("notes")):
                warnings.append(f"{prefix}.notes is empty")
    return {
        "schema": SIDECAR_UPDATE_VALIDATION_SCHEMA,
        "valid": not errors,
        "error_count": len(errors),
        "warning_count": len(warnings),
        "update_count": len(updates_raw),
        "validated_variant_count": validated_variant_count,
        "errors": errors,
        "warnings": warnings,
    }
def validate_matrix_sidecar_update_draft(draft: dict[str, Any]) -> dict[str, Any]:
    """Validate a seed-matrix sidecar update draft and report all problems.

    Problems are collected as messages rather than raised. Besides the
    per-variant checks (id, prompt order, text, cue axes, evidence), every
    prompt variant must carry a ``matrix_evidence`` object whose jobs, sampler
    seeds, counts and selection seed agree with each other and with the
    variant's representative ``evidence``.

    Args:
        draft: Draft object with an ``updates`` list.

    Returns:
        A report tagged with ``MATRIX_SIDECAR_UPDATE_VALIDATION_SCHEMA``.
        ``valid`` is true only when ``errors`` is empty.
        ``validated_variant_count`` counts the prompt variants that were
        objects and therefore inspected.
    """
    errors: list[str] = []
    warnings: list[str] = []
    schema = _text(draft.get("schema"))
    if schema and schema != MATRIX_SIDECAR_UPDATE_DRAFT_SCHEMA:
        errors.append(f"schema must be {MATRIX_SIDECAR_UPDATE_DRAFT_SCHEMA}")
    updates_raw = draft.get("updates")
    if not isinstance(updates_raw, list):
        errors.append("updates must be a list")
        updates_raw = []
    validated_variant_count = 0
    for update_index, update in enumerate(updates_raw):
        if not isinstance(update, dict):
            errors.append(f"updates[{update_index}] must be an object")
            continue
        source_stem = _text(update.get("source_stem"))
        if not source_stem:
            errors.append(f"updates[{update_index}].source_stem is required")
        # The sidecar name can only be cross-checked when a stem was supplied.
        expected_sidecar = f"{source_stem}{SIDECAR_SUFFIX}" if source_stem else ""
        sidecar_filename = _text(update.get("sidecar_filename"))
        if not sidecar_filename:
            errors.append(f"updates[{update_index}].sidecar_filename is required")
        elif Path(sidecar_filename).name != sidecar_filename:
            errors.append(f"updates[{update_index}].sidecar_filename must be a plain filename")
        elif expected_sidecar and sidecar_filename != expected_sidecar:
            errors.append(f"updates[{update_index}].sidecar_filename must be {expected_sidecar}")
        variants_raw = update.get("prompt_variants")
        if not isinstance(variants_raw, list) or not variants_raw:
            errors.append(f"updates[{update_index}].prompt_variants must be a non-empty list")
            continue
        seen_variant_ids: set[str] = set()
        for variant_index, variant in enumerate(variants_raw):
            prefix = f"updates[{update_index}].prompt_variants[{variant_index}]"
            if not isinstance(variant, dict):
                errors.append(f"{prefix} must be an object")
                continue
            validated_variant_count += 1
            for forbidden in FORBIDDEN_PROMPT_FIELDS:
                if forbidden in variant:
                    errors.append(f"{prefix} must not contain {forbidden}")
            variant_id = _text(variant.get("id"))
            if not variant_id:
                errors.append(f"{prefix}.id is required")
            elif variant_id in seen_variant_ids:
                errors.append(f"{prefix}.id {variant_id!r} is duplicated in this sidecar update")
            seen_variant_ids.add(variant_id)
            if variant_id:
                _validate_prompt_source_identity(variant, variant_id=variant_id, prefix=prefix, errors=errors)
            prompt_order = _text(variant.get("prompt_order") or "subject_first")
            if prompt_order not in PROMPT_ORDERS:
                errors.append(f"{prefix}.prompt_order must be one of {sorted(PROMPT_ORDERS)}")
            text = _text(variant.get("text"))
            if not text:
                errors.append(f"{prefix}.text is required")
            else:
                try:
                    _validate_no_negative_channel(text, field=f"{prefix}.text")
                except ValueError as exc:
                    errors.append(str(exc))
            cue_axes = _merge_known_values(_cue_axes(), variant.get("cue_axes"))
            if not _has_filled_axis(cue_axes, CUE_AXIS_KEYS):
                errors.append(f"{prefix}.cue_axes must include at least one filled cue axis")
            # --- Representative evidence -------------------------------------
            # The parsed evidence values are kept (with "unknown" defaults) so
            # they can be cross-checked against the matrix jobs further down,
            # even when part of the evidence failed to validate.
            evidence = variant.get("evidence")
            evidence_seed: int | None = None
            evidence_image_path = ""
            evidence_turn: Any = None
            evidence_score: dict[str, Any] | None = None
            if not isinstance(evidence, dict):
                errors.append(f"{prefix}.evidence is required")
            else:
                try:
                    evidence_seed = _int_seed(evidence.get("seed"), field=f"{prefix}.evidence.seed")
                except ValueError as exc:
                    errors.append(str(exc))
                evidence_turn = evidence.get("turn")
                # bool is a subclass of int, so it has to be excluded explicitly.
                if not isinstance(evidence_turn, int) or isinstance(evidence_turn, bool):
                    errors.append(f"{prefix}.evidence.turn must be an integer")
                try:
                    evidence_image_path = _image_path(evidence.get("image_path"), field=f"{prefix}.evidence.image_path")
                except ValueError as exc:
                    errors.append(str(exc))
                evidence_score = _merge_known_values(_score_template(), evidence.get("score"))
                decision, blockers = _promotion_blockers(evidence_score)
                if decision != "seedable_candidate":
                    for blocker in blockers:
                        errors.append(f"{prefix}.evidence.score failed promotion gate: {blocker}")
            # --- Matrix evidence ---------------------------------------------
            matrix_evidence = variant.get("matrix_evidence")
            if not isinstance(matrix_evidence, dict):
                # Without matrix evidence none of the remaining checks apply
                # (this also skips the empty-notes warning for the variant).
                errors.append(f"{prefix}.matrix_evidence is required")
                continue
            if matrix_evidence.get("stable") is not True:
                errors.append(f"{prefix}.matrix_evidence.stable must be true")
            try:
                selection_seed = _int_seed(matrix_evidence.get("selection_seed"), field=f"{prefix}.matrix_evidence.selection_seed")
            except ValueError as exc:
                errors.append(str(exc))
                selection_seed = None
            seed_slot = _text(matrix_evidence.get("seed_slot"))
            if seed_slot not in SEED_SELECTION_SLOT_KEYS:
                errors.append(
                    f"{prefix}.matrix_evidence.seed_slot must be one of {list(SEED_SELECTION_SLOT_KEYS)} and must not be sampler_seed"
                )
            elif selection_seed is not None:
                # The variant's seed_metadata must hold the selection seed in
                # the slot named by matrix_evidence.seed_slot.
                seed_metadata = _merge_known_values(_seed_metadata(), variant.get("seed_metadata"))
                try:
                    seed_metadata_value = _int_seed(
                        seed_metadata.get(seed_slot),
                        field=f"{prefix}.seed_metadata.{seed_slot}",
                    )
                except ValueError as exc:
                    errors.append(str(exc))
                else:
                    if seed_metadata_value != selection_seed:
                        errors.append(
                            f"{prefix}.seed_metadata.{seed_slot} {seed_metadata_value} "
                            f"must match matrix_evidence.selection_seed {selection_seed}"
                        )
            # Declared sampler seeds: each must parse, be unique, and there
            # must be at least MIN_STABLE_MATRIX_SAMPLER_SEEDS distinct ones.
            sampler_seeds_raw = matrix_evidence.get("sampler_seeds")
            sampler_seeds: list[int] = []
            if not isinstance(sampler_seeds_raw, list) or not sampler_seeds_raw:
                errors.append(f"{prefix}.matrix_evidence.sampler_seeds must be a non-empty list")
            else:
                seen_declared_sampler_seeds: set[int] = set()
                for seed_index, sampler_seed in enumerate(sampler_seeds_raw):
                    try:
                        declared_sampler_seed = _int_seed(
                            sampler_seed,
                            field=f"{prefix}.matrix_evidence.sampler_seeds[{seed_index}]",
                        )
                        sampler_seeds.append(declared_sampler_seed)
                        if declared_sampler_seed in seen_declared_sampler_seeds:
                            errors.append(
                                f"{prefix}.matrix_evidence.sampler_seeds value {declared_sampler_seed} is duplicated"
                            )
                        seen_declared_sampler_seeds.add(declared_sampler_seed)
                    except ValueError as exc:
                        errors.append(str(exc))
                if len(seen_declared_sampler_seeds) < MIN_STABLE_MATRIX_SAMPLER_SEEDS:
                    errors.append(
                        f"{prefix}.matrix_evidence.sampler_seeds must include at least "
                        f"{MIN_STABLE_MATRIX_SAMPLER_SEEDS} unique sampler seeds"
                    )
            jobs_raw = matrix_evidence.get("jobs")
            if not isinstance(jobs_raw, list) or not jobs_raw:
                errors.append(f"{prefix}.matrix_evidence.jobs must be a non-empty list")
                jobs_raw = []
            # Both counters must equal the number of listed jobs, i.e. every
            # listed job is expected to be promotion-ready.
            for count_field, expected_count in (
                ("job_count", len(jobs_raw)),
                ("promotion_ready_count", len(jobs_raw)),
            ):
                count_value = matrix_evidence.get(count_field)
                if not isinstance(count_value, int) or isinstance(count_value, bool):
                    errors.append(f"{prefix}.matrix_evidence.{count_field} must be an integer")
                elif count_value != expected_count:
                    errors.append(f"{prefix}.matrix_evidence.{count_field} must equal matrix_evidence.jobs count")
            blocked_count = matrix_evidence.get("blocked_count")
            if blocked_count != 0:
                errors.append(f"{prefix}.matrix_evidence.blocked_count must be 0")
            # --- Per-job checks ----------------------------------------------
            job_sampler_seeds: list[int] = []
            seen_job_ids: set[str] = set()
            seen_job_sampler_seeds: set[int] = set()
            # First job seen for each sampler seed; used to locate the job that
            # the representative evidence refers to.
            jobs_by_sampler_seed: dict[int, dict[str, Any]] = {}
            for job_index, job in enumerate(jobs_raw):
                job_prefix = f"{prefix}.matrix_evidence.jobs[{job_index}]"
                if not isinstance(job, dict):
                    errors.append(f"{job_prefix} must be an object")
                    continue
                job_id = _text(job.get("id"))
                if not job_id:
                    errors.append(f"{job_prefix}.id is required")
                elif job_id in seen_job_ids:
                    errors.append(f"{prefix}.matrix_evidence.jobs id {job_id!r} is duplicated")
                seen_job_ids.add(job_id)
                if _text(job.get("decision")) != "seedable_candidate":
                    errors.append(f"{job_prefix}.decision must be seedable_candidate")
                try:
                    job_sampler_seed = _int_seed(job.get("sampler_seed"), field=f"{job_prefix}.sampler_seed")
                    job_sampler_seeds.append(job_sampler_seed)
                    if job_sampler_seed in seen_job_sampler_seeds:
                        errors.append(f"{prefix}.matrix_evidence.jobs sampler_seed {job_sampler_seed} is duplicated")
                    else:
                        jobs_by_sampler_seed[job_sampler_seed] = job
                    seen_job_sampler_seeds.add(job_sampler_seed)
                    # Only checked when at least one declared seed parsed.
                    if sampler_seeds and job_sampler_seed not in sampler_seeds:
                        errors.append(f"{job_prefix}.sampler_seed must be listed in matrix_evidence.sampler_seeds")
                except ValueError as exc:
                    errors.append(str(exc))
                try:
                    job_selection_seed = _int_seed(job.get("selection_seed"), field=f"{job_prefix}.selection_seed")
                    if selection_seed is not None and job_selection_seed != selection_seed:
                        errors.append(f"{job_prefix}.selection_seed must match matrix_evidence.selection_seed")
                except ValueError as exc:
                    errors.append(str(exc))
                try:
                    _image_path(job.get("image_path"), field=f"{job_prefix}.image_path")
                except ValueError as exc:
                    errors.append(str(exc))
                turn = job.get("turn")
                if not isinstance(turn, int) or isinstance(turn, bool):
                    errors.append(f"{job_prefix}.turn must be an integer")
                job_score = _merge_known_values(_score_template(), job.get("score"))
                decision, blockers = _promotion_blockers(job_score)
                if decision != "seedable_candidate":
                    for blocker in blockers:
                        errors.append(f"{job_prefix}.score failed promotion gate: {blocker}")
            # --- Cross-checks between declared seeds, jobs and evidence -------
            if sampler_seeds and sorted(set(job_sampler_seeds)) != sorted(set(sampler_seeds)):
                errors.append(f"{prefix}.matrix_evidence.jobs must cover every sampler seed")
            if evidence_seed is not None and sampler_seeds and evidence_seed not in sampler_seeds:
                errors.append(f"{prefix}.evidence.seed must be one of matrix_evidence.sampler_seeds")
            if evidence_seed is not None:
                representative_job = jobs_by_sampler_seed.get(evidence_seed)
                if representative_job is None:
                    errors.append(f"{prefix}.evidence.seed must match a matrix_evidence.jobs sampler_seed")
                else:
                    representative_prefix = f"{prefix}.matrix_evidence.jobs entry for evidence.seed {evidence_seed}"
                    try:
                        representative_image_path = _image_path(
                            representative_job.get("image_path"),
                            field=f"{representative_prefix}.image_path",
                        )
                    except ValueError:
                        # An invalid job image path was already reported in the
                        # per-job loop; here it only disables the comparison.
                        representative_image_path = ""
                    if evidence_image_path and representative_image_path and evidence_image_path != representative_image_path:
                        errors.append(f"{prefix}.evidence.image_path must match {representative_prefix}.image_path")
                    if evidence_turn != representative_job.get("turn"):
                        errors.append(f"{prefix}.evidence.turn must match {representative_prefix}.turn")
                    representative_score = _merge_known_values(_score_template(), representative_job.get("score"))
                    if evidence_score is not None and evidence_score != representative_score:
                        errors.append(f"{prefix}.evidence.score must match {representative_prefix}.score")
            if not _text(variant.get("notes")):
                warnings.append(f"{prefix}.notes is empty")
    return {
        "schema": MATRIX_SIDECAR_UPDATE_VALIDATION_SCHEMA,
        "valid": not errors,
        "error_count": len(errors),
        "warning_count": len(warnings),
        "update_count": len(updates_raw),
        "validated_variant_count": validated_variant_count,
        "errors": errors,
        "warnings": warnings,
    }
def _read_json_object_if_present(path: Path) -> dict[str, Any]:
if not path.is_file():
return {}
with path.open("r", encoding="utf-8") as handle:
data = json.load(handle)
if not isinstance(data, dict):
raise ValueError(f"{path} must contain one JSON object")
return data
def _validate_prompt_source_identity(variant: dict[str, Any], *, variant_id: str, prefix: str, errors: list[str]) -> None:
prompt_source = variant.get("prompt_source")
if prompt_source is None:
return
if not isinstance(prompt_source, dict):
errors.append(f"{prefix}.prompt_source must be an object")
return
source_variant_id = _text(prompt_source.get("prompt_variant_id"))
if source_variant_id and source_variant_id != variant_id:
errors.append(f"{prefix}.prompt_source.prompt_variant_id {source_variant_id!r} must match id {variant_id!r}")
def apply_baseline_score_update_draft(draft: dict[str, Any], folder: str | Path) -> dict[str, Any]:
    """Write the baseline scores from a validated draft into sidecar files.

    The draft is validated first; an invalid draft writes nothing and yields a
    report with ``applied`` set to false. For a valid draft, each update's
    sidecar is loaded (or started empty when the file is absent), its
    seed metadata, cue axes, score and baseline bookkeeping fields are
    replaced, and the file is written back as sorted, indented JSON.

    Args:
        draft: Baseline score update draft.
        folder: Directory that holds the sidecar files.

    Returns:
        A report tagged with ``BASELINE_SCORE_APPLY_REPORT_SCHEMA`` listing
        the written files and embedding the validation report.

    Raises:
        FileNotFoundError: If the draft is valid but ``folder`` is not a
            directory.
        ValueError: If an update's ``sidecar_filename`` is empty or not a
            plain filename.
    """
    validation = validate_baseline_score_update_draft(draft)
    root = Path(folder).resolve()
    report: dict[str, Any] = {
        "schema": BASELINE_SCORE_APPLY_REPORT_SCHEMA,
        "applied": False,
        "root": str(root),
        "updated_file_count": 0,
        "updated_files": [],
        "validation": validation,
    }
    if not validation["valid"]:
        return report
    if not root.is_dir():
        raise FileNotFoundError(f"sidecar folder does not exist: {root}")

    written: list[dict[str, Any]] = []
    for update in draft.get("updates", []):
        sidecar_filename = _text(update.get("sidecar_filename"))
        # Guard against names that would resolve outside the sidecar folder.
        if not sidecar_filename or Path(sidecar_filename).name != sidecar_filename:
            raise ValueError(f"sidecar filename must be a plain filename: {sidecar_filename!r}")
        sidecar_path = root / sidecar_filename
        sidecar = _read_json_object_if_present(sidecar_path)

        merged_score = _merge_known_values(_score_template(), update.get("score"))
        sidecar["seed_metadata"] = _merge_known_values(_seed_metadata(), update.get("seed_metadata"))
        sidecar["cue_axes"] = _merge_known_values(_cue_axes(), update.get("cue_axes"))
        sidecar["score"] = merged_score
        state = _score_state(merged_score)
        sidecar["baseline_score_state"] = state
        sidecar["baseline_source_prompt_sha256"] = _text(update.get("source_prompt_sha256"))
        sidecar["baseline_analysis_notes"] = _text(update.get("analysis_notes"))

        serialized = json.dumps(sidecar, ensure_ascii=True, indent=2, sort_keys=True)
        sidecar_path.write_text(serialized + "\n", encoding="utf-8")
        written.append(
            {
                "sidecar_filename": sidecar_filename,
                "sidecar_path": str(sidecar_path),
                "score_state": state,
            }
        )

    report["applied"] = True
    report["updated_file_count"] = len(written)
    report["updated_files"] = written
    return report
def _upsert_prompt_variants(existing: Any, incoming: list[dict[str, Any]]) -> list[dict[str, Any]]:
if existing is None:
variants: list[dict[str, Any]] = []
elif not isinstance(existing, list):
raise ValueError("existing sidecar prompt_variants must be a list")
else:
variants = []
seen_existing_ids: set[str] = set()
for index, item in enumerate(existing):
if not isinstance(item, dict):
raise ValueError(f"existing sidecar prompt_variants[{index}] must be an object")
variant_id = _text(item.get("id"))
if not variant_id:
raise ValueError(f"existing sidecar prompt_variants[{index}].id is required")
if variant_id in seen_existing_ids:
raise ValueError(f"existing sidecar prompt_variants[{index}].id {variant_id!r} is duplicated")
seen_existing_ids.add(variant_id)
variants.append(dict(item))
index_by_id = {_text(variant.get("id")): index for index, variant in enumerate(variants)}
for variant in incoming:
variant_copy = dict(variant)
variant_id = _text(variant_copy.get("id"))
if variant_id in index_by_id:
variants[index_by_id[variant_id]] = variant_copy
else:
index_by_id[variant_id] = len(variants)
variants.append(variant_copy)
return variants
def _prompt_path_for_source_stem(root: Path, source_stem: str) -> Path:
for suffix in (".txt", ".prompt"):
path = root / f"{source_stem}{suffix}"
if path.is_file():
return path
raise FileNotFoundError(f"prompt file for source stem {source_stem!r} does not exist in {root}")
def apply_reference_cue_sidecar_author_draft(draft: dict[str, Any], folder: str | Path) -> dict[str, Any]:
    """Write the prompt variants of a validated author draft into sidecars.

    The draft is validated first; an invalid draft writes nothing and yields a
    report with ``applied`` set to false. For a valid draft, each update's
    prompt file is hashed and compared with the recorded
    ``source_prompt_sha256`` before its sidecar is loaded, merged by variant
    id and written back as sorted, indented JSON.

    Args:
        draft: Reference-cue sidecar author draft.
        folder: Directory that holds the prompt and sidecar files.

    Returns:
        A report tagged with
        ``REFERENCE_CUE_SIDECAR_AUTHOR_APPLY_REPORT_SCHEMA`` listing the
        written files and embedding the validation report.

    Raises:
        FileNotFoundError: If the draft is valid but ``folder`` is not a
            directory, or no prompt file exists for an update's stem.
        ValueError: If a prompt file's hash differs from the recorded one, a
            ``sidecar_filename`` is not a plain filename, or an existing
            sidecar is malformed.
    """
    validation = validate_reference_cue_sidecar_author_draft(draft)
    root = Path(folder).resolve()
    report: dict[str, Any] = {
        "schema": REFERENCE_CUE_SIDECAR_AUTHOR_APPLY_REPORT_SCHEMA,
        "applied": False,
        "root": str(root),
        "updated_file_count": 0,
        "updated_files": [],
        "validation": validation,
    }
    if not validation["valid"]:
        return report
    if not root.is_dir():
        raise FileNotFoundError(f"sidecar folder does not exist: {root}")

    written: list[dict[str, Any]] = []
    for update in draft.get("updates", []):
        stem = _text(update.get("source_stem"))
        recorded_sha256 = _text(update.get("source_prompt_sha256"))
        prompt_file = _prompt_path_for_source_stem(root, stem)
        # The hash is taken over the stripped prompt text.
        current_sha256 = _sha256_text(prompt_file.read_text(encoding="utf-8").strip())
        if recorded_sha256 and current_sha256 != recorded_sha256:
            raise ValueError(f"prompt file has drifted for {stem}: {prompt_file}")

        sidecar_filename = _text(update.get("sidecar_filename"))
        # Guard against names that would resolve outside the sidecar folder.
        if not sidecar_filename or Path(sidecar_filename).name != sidecar_filename:
            raise ValueError(f"sidecar filename must be a plain filename: {sidecar_filename!r}")
        sidecar_path = root / sidecar_filename
        sidecar = _read_json_object_if_present(sidecar_path)

        incoming_variants = [dict(item) for item in update.get("prompt_variants", []) if isinstance(item, dict)]
        sidecar["prompt_variants"] = _upsert_prompt_variants(sidecar.get("prompt_variants"), incoming_variants)
        sidecar["reference_cue_author_source_prompt_sha256"] = recorded_sha256
        sidecar["reference_cue_author_notes"] = _text(update.get("notes"))

        serialized = json.dumps(sidecar, ensure_ascii=True, indent=2, sort_keys=True)
        sidecar_path.write_text(serialized + "\n", encoding="utf-8")
        written.append(
            {
                "sidecar_filename": sidecar_filename,
                "sidecar_path": str(sidecar_path),
                "prompt_variant_count": len(incoming_variants),
            }
        )

    report["applied"] = True
    report["updated_file_count"] = len(written)
    report["updated_files"] = written
    return report
def apply_sidecar_update_draft(draft: dict[str, Any], folder: str | Path) -> dict[str, Any]:
    """Write the prompt variants of a validated sidecar update draft to disk.

    The draft is validated first; an invalid draft writes nothing and yields a
    report with ``applied`` set to false. For a valid draft, each update's
    sidecar is loaded (or started empty when the file is absent), its
    ``prompt_variants`` are merged by id, and the file is written back as
    sorted, indented JSON.

    Args:
        draft: Sidecar update draft.
        folder: Directory that holds the sidecar files.

    Returns:
        A report tagged with ``SIDECAR_APPLY_REPORT_SCHEMA`` listing the
        written files and embedding the validation report.

    Raises:
        FileNotFoundError: If the draft is valid but ``folder`` is not a
            directory.
        ValueError: If a ``sidecar_filename`` is not a plain filename or an
            existing sidecar is malformed.
    """
    validation = validate_sidecar_update_draft(draft)
    root = Path(folder).resolve()
    report: dict[str, Any] = {
        "schema": SIDECAR_APPLY_REPORT_SCHEMA,
        "applied": False,
        "root": str(root),
        "updated_file_count": 0,
        "updated_files": [],
        "validation": validation,
    }
    if not validation["valid"]:
        return report
    if not root.is_dir():
        raise FileNotFoundError(f"sidecar folder does not exist: {root}")

    written: list[dict[str, Any]] = []
    for update in draft.get("updates", []):
        sidecar_filename = _text(update.get("sidecar_filename"))
        # Guard against names that would resolve outside the sidecar folder.
        if not sidecar_filename or Path(sidecar_filename).name != sidecar_filename:
            raise ValueError(f"sidecar filename must be a plain filename: {sidecar_filename!r}")
        sidecar_path = root / sidecar_filename
        sidecar = _read_json_object_if_present(sidecar_path)

        incoming_variants = [dict(item) for item in update.get("prompt_variants", []) if isinstance(item, dict)]
        sidecar["prompt_variants"] = _upsert_prompt_variants(sidecar.get("prompt_variants"), incoming_variants)

        serialized = json.dumps(sidecar, ensure_ascii=True, indent=2, sort_keys=True)
        sidecar_path.write_text(serialized + "\n", encoding="utf-8")
        written.append(
            {
                "sidecar_filename": sidecar_filename,
                "sidecar_path": str(sidecar_path),
                "prompt_variant_count": len(incoming_variants),
            }
        )

    report["applied"] = True
    report["updated_file_count"] = len(written)
    report["updated_files"] = written
    return report
def apply_matrix_sidecar_update_draft(draft: dict[str, Any], folder: str | Path) -> dict[str, Any]:
    """Apply a matrix sidecar update draft to the JSON sidecars inside ``folder``.

    Validation runs before anything is written. When the draft is invalid the
    returned report has ``applied`` set to ``False`` and lists no files. When
    it is valid, every update's prompt variants are upserted into its sidecar
    and the sidecar is rewritten as sorted, indented JSON with a trailing
    newline.

    Raises:
        FileNotFoundError: the draft is valid but ``folder`` is not a directory.
        ValueError: an update's ``sidecar_filename`` is empty or contains path
            components.
    """
    validation = validate_matrix_sidecar_update_draft(draft)
    resolved_root = Path(folder).resolve()

    def _report(applied: bool, files: list[dict[str, Any]]) -> dict[str, Any]:
        # Both outcomes share one report layout; only these fields differ.
        return {
            "schema": MATRIX_SIDECAR_APPLY_REPORT_SCHEMA,
            "applied": applied,
            "root": str(resolved_root),
            "updated_file_count": len(files),
            "updated_files": files,
            "validation": validation,
        }

    if not validation["valid"]:
        return _report(False, [])
    if not resolved_root.is_dir():
        raise FileNotFoundError(f"sidecar folder does not exist: {resolved_root}")

    touched: list[dict[str, Any]] = []
    for item in draft.get("updates", []):
        filename = _text(item.get("sidecar_filename"))
        if not filename or Path(filename).name != filename:
            raise ValueError(f"sidecar filename must be a plain filename: {filename!r}")
        destination = resolved_root / filename
        document = _read_json_object_if_present(destination)
        new_variants = [
            dict(candidate)
            for candidate in item.get("prompt_variants", [])
            if isinstance(candidate, dict)
        ]
        document["prompt_variants"] = _upsert_prompt_variants(document.get("prompt_variants"), new_variants)
        rendered = json.dumps(document, ensure_ascii=True, indent=2, sort_keys=True) + "\n"
        destination.write_text(rendered, encoding="utf-8")
        touched.append(
            {
                "sidecar_filename": filename,
                "sidecar_path": str(destination),
                "prompt_variant_count": len(new_variants),
            }
        )
    return _report(True, touched)
def build_result_sheet(batch: dict[str, Any], results: dict[str, Any], *, notes: str = "") -> dict[str, Any]:
    """Pair a probe batch with its run results and emit a blank scoring sheet.

    The batch and the results must agree on seed, input channel, probe count
    and, probe by probe, on id and prompt order. Each result probe must also
    carry an integer ``turn`` and a ``returned_seed`` equal to the batch seed.
    Every matched pair becomes one sheet probe with an empty score template
    and empty analysis notes; the first probe is recorded as the baseline.

    Raises:
        ValueError: on any mismatch or missing/invalid field described above.
    """

    def _dict_copy(value: Any) -> dict[str, Any]:
        # Shallow-copy dict payloads; anything else collapses to an empty dict.
        return dict(value) if isinstance(value, dict) else {}

    batch_seed = _int_seed(batch.get("seed"), field="batch seed")
    reported_seed = _int_seed(results.get("seed"), field="result seed")
    if reported_seed != batch_seed:
        raise ValueError(f"result seed {reported_seed} does not match batch seed {batch_seed}")

    batch_channel = _text(batch.get("channel_in") or DEFAULT_IN_CHANNEL)
    reported_channel = _text(results.get("channel_in") or DEFAULT_IN_CHANNEL)
    _validate_no_negative_channel(batch_channel, field="batch channel_in")
    _validate_no_negative_channel(reported_channel, field="result channel_in")
    if reported_channel != batch_channel:
        raise ValueError(f"result channel_in {reported_channel!r} does not match batch channel_in {batch_channel!r}")

    planned_probes = _probe_list(batch.get("probes"), field="batch probes")
    returned_probes = _probe_list(results.get("probes"), field="result probes")
    if len(returned_probes) != len(planned_probes):
        raise ValueError("result probe count must match batch probe count")

    rows: list[dict[str, Any]] = []
    for position, pair in enumerate(zip(planned_probes, returned_probes)):
        planned, returned = pair
        batch_label = f"batch probes[{position}]"
        result_label = f"result probes[{position}]"

        probe_id = _text(planned.get("id"))
        if not probe_id:
            raise ValueError(f"{batch_label}.id is required")
        returned_id = _text(returned.get("id"))
        if returned_id != probe_id:
            raise ValueError(f"{result_label}.id {returned_id!r} does not match batch probe id {probe_id!r}")

        order = _text(planned.get("prompt_order") or "subject_first")
        returned_order = _text(returned.get("prompt_order") or "subject_first")
        if order not in PROMPT_ORDERS:
            raise ValueError(f"{batch_label}.prompt_order must be one of {sorted(PROMPT_ORDERS)}")
        if returned_order != order:
            raise ValueError(f"{result_label}.prompt_order does not match batch prompt_order {order!r}")

        prompt_text = _text(planned.get("text"))
        if not prompt_text:
            raise ValueError(f"{batch_label}.text is required")
        _validate_no_negative_channel(prompt_text, field=f"{batch_label}.text")

        turn = returned.get("turn")
        # bool is a subclass of int, so it has to be rejected explicitly.
        if isinstance(turn, bool) or not isinstance(turn, int):
            raise ValueError(f"{result_label}.turn must be an integer")

        returned_seed = _int_seed(returned.get("returned_seed"), field=f"{result_label}.returned_seed")
        if returned_seed != batch_seed:
            raise ValueError(f"{result_label}.returned_seed {returned_seed} does not match batch seed {batch_seed}")

        row: dict[str, Any] = {
            "id": probe_id,
            # Probe-level values win; batch-level values are the fallback.
            "variant_key": _text(planned.get("variant_key") or batch.get("variant_key")),
            "source_entry_id": _text(planned.get("source_entry_id") or batch.get("source_entry_id")),
            "source_stem": _text(
                planned.get("source_stem") or batch.get("source_stem") or planned.get("source_entry_id")
            ),
            "prompt_order": order,
            "text": prompt_text,
            "turn": turn,
            "image_path": _image_path(returned.get("image_path"), field=f"{result_label}.image_path"),
            "returned_seed": returned_seed,
            "cue_axes": _merge_known_values(_cue_axes(), planned.get("cue_axes")),
            "seed_metadata": _merge_known_values(_seed_metadata(), planned.get("seed_metadata")),
            "prompt_source": _prompt_source(planned.get("prompt_source"), field=f"{batch_label}.prompt_source"),
            "selection": _dict_copy(planned.get("selection")),
            "score": _score_template(),
            "analysis_notes": "",
        }

        # Optional sections are attached only when they are non-empty.
        references = _reference_images(planned.get("reference_images"), field=f"{batch_label}.reference_images")
        if references:
            row["reference_images"] = references
        evidence = _stable_matrix_evidence_for_variant(planned, field=batch_label)
        if evidence:
            row["matrix_evidence"] = evidence
        rows.append(row)

    return {
        "schema": RESULT_SHEET_SCHEMA,
        "seed": batch_seed,
        "channel_in": batch_channel,
        "subject_id": _text(batch.get("subject_id")),
        "variant_key": _text(batch.get("variant_key")),
        "source_entry_id": _text(batch.get("source_entry_id")),
        "source_stem": _text(batch.get("source_stem") or batch.get("source_entry_id")),
        "source_prompt_sha256": _text(batch.get("source_prompt_sha256")),
        "selection": _dict_copy(batch.get("selection")),
        "baseline_probe_id": rows[0]["id"],
        "probe_count": len(rows),
        "score_keys": list(SCORE_KEYS),
        "notes": _text(notes),
        "probes": rows,
    }
def _matrix_result_jobs(results: dict[str, Any]) -> dict[str, dict[str, Any]]:
    """Index the ``results`` object of each seed-matrix result job by job id.

    Raises:
        ValueError: ``jobs`` is not a list, a job is not an object, a job has
            no id or repeats an earlier id, or a job's ``results`` is not an
            object.
    """
    raw_jobs = results.get("jobs")
    if not isinstance(raw_jobs, list):
        raise ValueError("seed matrix results jobs must be a list")

    by_id: dict[str, dict[str, Any]] = {}
    for position, entry in enumerate(raw_jobs):
        label = f"seed matrix results jobs[{position}]"
        if not isinstance(entry, dict):
            raise ValueError(f"{label} must be an object")
        identifier = _text(entry.get("id"))
        if not identifier:
            raise ValueError(f"{label}.id is required")
        if identifier in by_id:
            raise ValueError(f"seed matrix results job id {identifier!r} is duplicated")
        payload = entry.get("results")
        if not isinstance(payload, dict):
            raise ValueError(f"{label}.results must be an object")
        by_id[identifier] = payload
    return by_id
def build_seed_matrix_result_sheet(seed_matrix: dict[str, Any], results: dict[str, Any], *, notes: str = "") -> dict[str, Any]:
    """Build one scoring sheet per seed-matrix job from the matrix and its results.

    Every matrix job must have a unique id, a ``batch`` object and a matching
    entry in ``results``; results may not contain job ids the matrix does not
    know. The per-job sheet is produced by ``build_result_sheet`` with the same
    ``notes``.

    Raises:
        ValueError: on a schema mismatch, an empty or malformed job list, a
            missing or duplicated job id, a missing result, or unknown result
            job ids.
    """
    declared_schema = _text(seed_matrix.get("schema"))
    # A missing schema is tolerated; a present one must match exactly.
    if declared_schema and declared_schema != SEED_MATRIX_SCHEMA:
        raise ValueError(f"seed matrix schema must be {SEED_MATRIX_SCHEMA}")

    planned_jobs = seed_matrix.get("jobs")
    if not (isinstance(planned_jobs, list) and planned_jobs):
        raise ValueError("seed matrix jobs must be a non-empty list")

    results_by_id = _matrix_result_jobs(results)
    known_ids: set[str] = set()
    sheets: list[dict[str, Any]] = []
    for position, planned in enumerate(planned_jobs):
        label = f"seed matrix jobs[{position}]"
        if not isinstance(planned, dict):
            raise ValueError(f"{label} must be an object")
        job_id = _text(planned.get("id"))
        if not job_id:
            raise ValueError(f"{label}.id is required")
        if job_id in known_ids:
            raise ValueError(f"{label}.id {job_id!r} is duplicated")
        known_ids.add(job_id)

        batch = planned.get("batch")
        if not isinstance(batch, dict):
            raise ValueError(f"{label}.batch must be an object")
        job_results = results_by_id.get(job_id)
        if not isinstance(job_results, dict):
            raise ValueError(f"seed matrix results missing job {job_id!r}")

        sheet = build_result_sheet(batch, job_results, notes=notes)
        selected = planned.get("selected")
        candidate_probe = planned.get("candidate_probe")
        sheets.append(
            {
                "id": job_id,
                "variant_key": _text(planned.get("variant_key") or seed_matrix.get("variant_key")),
                "sampler_seed": _int_seed(planned.get("sampler_seed"), field=f"{label}.sampler_seed"),
                "selection_seed": _int_seed(planned.get("selection_seed"), field=f"{label}.selection_seed"),
                "seed_slot": _text(planned.get("seed_slot") or seed_matrix.get("seed_slot")),
                "selected": dict(selected) if isinstance(selected, dict) else {},
                "candidate_probe": dict(candidate_probe) if isinstance(candidate_probe, dict) else {},
                "result_sheet": sheet,
            }
        )

    unexpected_ids = sorted(results_by_id.keys() - known_ids)
    if unexpected_ids:
        raise ValueError(f"seed matrix results contain unknown job ids: {', '.join(unexpected_ids)}")

    return {
        "schema": SEED_MATRIX_RESULT_SHEET_SCHEMA,
        "subject_id": _text(seed_matrix.get("subject_id")),
        "variant_key": _text(seed_matrix.get("variant_key")),
        "seed_slot": _text(seed_matrix.get("seed_slot")),
        "sampler_seeds": list(seed_matrix.get("sampler_seeds") or []),
        "selection_seeds": list(seed_matrix.get("selection_seeds") or []),
        "job_count": len(sheets),
        "score_keys": list(SCORE_KEYS),
        "notes": _text(notes),
        "jobs": sheets,
    }
def build_seed_matrix_promotion_report(matrix_result_sheet: dict[str, Any]) -> dict[str, Any]:
    """Summarise a scored seed-matrix result sheet into per-job and per-group gates.

    Each job's ``result_sheet`` is passed through ``build_promotion_report`` and
    must yield exactly one candidate. Jobs are then grouped by
    ``(prompt_variant_id, selection_seed)``. A group is marked ``stable`` when
    it has at least one job, no blocked jobs, no sampler seed listed on the
    sheet that it failed to cover, and at least
    ``MIN_STABLE_MATRIX_SAMPLER_SEEDS`` distinct sampler seeds.

    Raises:
        ValueError: on a schema mismatch, an empty or malformed job list, an
            unknown or inconsistent ``seed_slot``, duplicate seeds or job ids,
            a job seed not listed on the sheet, a job without exactly one
            candidate, a missing prompt variant id, or a job that disagrees
            with its own candidate or with the group it falls into.
    """
    schema = _text(matrix_result_sheet.get("schema"))
    # A missing schema is accepted; a present one must match.
    if schema and schema != SEED_MATRIX_RESULT_SHEET_SCHEMA:
        raise ValueError(f"seed matrix result sheet schema must be {SEED_MATRIX_RESULT_SHEET_SCHEMA}")
    jobs_raw = matrix_result_sheet.get("jobs")
    if not isinstance(jobs_raw, list) or not jobs_raw:
        raise ValueError("seed matrix result sheet jobs must be a non-empty list")
    expected_seed_slot = _text(matrix_result_sheet.get("seed_slot"))
    if expected_seed_slot and expected_seed_slot not in SEED_SELECTION_SLOT_KEYS:
        raise ValueError(f"seed matrix result sheet seed_slot must be one of {list(SEED_SELECTION_SLOT_KEYS)}")
    # Sheet-level seed lists are optional; when absent (or not a list) the
    # per-job membership checks below are skipped because the list stays empty.
    expected_sampler_seeds_raw = matrix_result_sheet.get("sampler_seeds")
    expected_sampler_seeds: list[int] = []
    if isinstance(expected_sampler_seeds_raw, list):
        expected_sampler_seeds = [
            _int_seed(seed, field=f"seed matrix result sheet sampler_seeds[{index}]")
            for index, seed in enumerate(expected_sampler_seeds_raw)
        ]
        if len(set(expected_sampler_seeds)) != len(expected_sampler_seeds):
            raise ValueError("seed matrix result sheet sampler_seeds must not contain duplicate sampler seeds")
    expected_selection_seeds_raw = matrix_result_sheet.get("selection_seeds")
    expected_selection_seeds: list[int] = []
    if isinstance(expected_selection_seeds_raw, list):
        expected_selection_seeds = [
            _int_seed(seed, field=f"seed matrix result sheet selection_seeds[{index}]")
            for index, seed in enumerate(expected_selection_seeds_raw)
        ]
        if len(set(expected_selection_seeds)) != len(expected_selection_seeds):
            raise ValueError("seed matrix result sheet selection_seeds must not contain duplicate cue seeds")
    report_jobs: list[dict[str, Any]] = []
    # Groups are keyed by (prompt_variant_id, selection_seed).
    groups_by_key: dict[tuple[str, int], dict[str, Any]] = {}
    seen_job_ids: set[str] = set()
    for index, job in enumerate(jobs_raw):
        if not isinstance(job, dict):
            raise ValueError(f"seed matrix result sheet jobs[{index}] must be an object")
        job_id = _text(job.get("id"))
        if not job_id:
            raise ValueError(f"seed matrix result sheet jobs[{index}].id is required")
        if job_id in seen_job_ids:
            raise ValueError(f"seed matrix result sheet jobs[{index}].id {job_id!r} is duplicated")
        seen_job_ids.add(job_id)
        result_sheet = job.get("result_sheet")
        if not isinstance(result_sheet, dict):
            raise ValueError(f"seed matrix result sheet jobs[{index}].result_sheet must be an object")
        promotion_report = build_promotion_report(result_sheet)
        candidates = promotion_report.get("candidates") or []
        if len(candidates) != 1 or not isinstance(candidates[0], dict):
            raise ValueError(f"seed matrix result sheet jobs[{index}] must contain exactly one candidate")
        candidate = candidates[0]
        sampler_seed = _int_seed(job.get("sampler_seed"), field=f"seed matrix result sheet jobs[{index}].sampler_seed")
        if expected_sampler_seeds and sampler_seed not in expected_sampler_seeds:
            raise ValueError(
                f"seed matrix result sheet jobs[{index}].sampler_seed {sampler_seed} must be listed in sampler_seeds"
            )
        selection_seed = _int_seed(job.get("selection_seed"), field=f"seed matrix result sheet jobs[{index}].selection_seed")
        if expected_selection_seeds and selection_seed not in expected_selection_seeds:
            raise ValueError(
                f"seed matrix result sheet jobs[{index}].selection_seed {selection_seed} must be listed in selection_seeds"
            )
        # The job's own seed_slot wins; the sheet-level slot is the fallback.
        seed_slot = _text(job.get("seed_slot") or expected_seed_slot)
        if seed_slot not in SEED_SELECTION_SLOT_KEYS:
            raise ValueError(f"seed matrix result sheet jobs[{index}].seed_slot must be one of {list(SEED_SELECTION_SLOT_KEYS)}")
        if expected_seed_slot and seed_slot != expected_seed_slot:
            raise ValueError(
                f"seed matrix result sheet jobs[{index}].seed_slot {seed_slot!r} does not match matrix seed_slot {expected_seed_slot!r}"
            )
        selected = job.get("selected") if isinstance(job.get("selected"), dict) else {}
        selected_prompt_variant_id = _text(selected.get("prompt_variant_id"))
        candidate_prompt_variant_id = _text(candidate.get("prompt_variant_id"))
        # The two ids are only compared when both are present.
        if selected_prompt_variant_id and candidate_prompt_variant_id and selected_prompt_variant_id != candidate_prompt_variant_id:
            raise ValueError(
                f"seed matrix result sheet jobs[{index}].selected.prompt_variant_id {selected_prompt_variant_id!r} "
                f"does not match candidate prompt_variant_id {candidate_prompt_variant_id!r}"
            )
        # Prefer the candidate's id and fall back to the job's selected id.
        prompt_variant_id = _text(
            candidate_prompt_variant_id
            or selected_prompt_variant_id
        )
        if not prompt_variant_id:
            raise ValueError(f"seed matrix result sheet jobs[{index}] selected prompt_variant_id is required")
        source_entry_id = _text(candidate.get("source_entry_id"))
        source_stem = _text(candidate.get("source_stem") or source_entry_id)
        job_variant_key = _text(job.get("variant_key") or matrix_result_sheet.get("variant_key"))
        candidate_variant_key = _text(candidate.get("variant_key"))
        if job_variant_key and candidate_variant_key and candidate_variant_key != job_variant_key:
            raise ValueError(
                f"seed matrix result sheet jobs[{index}].candidate.variant_key {candidate_variant_key!r} "
                f"does not match job variant_key {job_variant_key!r}"
            )
        candidate_text = _text(candidate.get("text"))
        # An empty candidate text yields an empty hash rather than a hash of "".
        candidate_text_sha256 = _sha256_text(candidate_text) if candidate_text else ""
        decision = _text(candidate.get("decision"))
        blockers = [_text(blocker) for blocker in candidate.get("blockers") or [] if _text(blocker)]
        report_job = {
            "id": job_id,
            "variant_key": job_variant_key or candidate_variant_key,
            "source_entry_id": source_entry_id,
            "source_stem": source_stem,
            "sampler_seed": sampler_seed,
            "selection_seed": selection_seed,
            "seed_slot": seed_slot,
            "prompt_variant_id": prompt_variant_id,
            "prompt_text_sha256": candidate_text_sha256,
            "decision": decision,
            "blockers": blockers,
            "candidate": candidate,
        }
        report_jobs.append(report_job)
        group_key = (prompt_variant_id, selection_seed)
        group = groups_by_key.get(group_key)
        if group is None:
            # The first job of a group fixes the group's identifying fields.
            group = {
                "variant_key": report_job["variant_key"],
                "source_entry_id": source_entry_id,
                "source_stem": source_stem,
                "prompt_variant_id": prompt_variant_id,
                "prompt_text_sha256": candidate_text_sha256,
                "selection_seed": selection_seed,
                "seed_slot": report_job["seed_slot"],
                "sampler_seeds": [],
                "job_ids": [],
                "job_count": 0,
                "promotion_ready_count": 0,
                "blocked_count": 0,
                "blockers": [],
            }
            groups_by_key[group_key] = group
        else:
            # Later jobs must agree with the group on every field that both sides set.
            for field, value in (
                ("variant_key", report_job["variant_key"]),
                ("source_stem", source_stem),
                ("source_entry_id", source_entry_id),
                ("prompt_text_sha256", candidate_text_sha256),
            ):
                expected_value = _text(group.get(field))
                if expected_value and value and value != expected_value:
                    label = "prompt text" if field == "prompt_text_sha256" else field
                    raise ValueError(
                        f"seed matrix result sheet jobs[{index}].candidate.{label} {value!r} "
                        f"does not match group {label} {expected_value!r}"
                    )
        if sampler_seed in group["sampler_seeds"]:
            raise ValueError(
                f"seed matrix result sheet jobs[{index}].sampler_seed {sampler_seed} is duplicated in this cue group"
            )
        group["sampler_seeds"].append(sampler_seed)
        group["job_ids"].append(report_job["id"])
        group["job_count"] += 1
        # Any decision other than "seedable_candidate" counts as blocked.
        if decision == "seedable_candidate":
            group["promotion_ready_count"] += 1
        else:
            group["blocked_count"] += 1
        # NOTE(review): blockers are merged for every job regardless of decision;
        # the nesting of this loop could not be confirmed from the flattened
        # source - verify against the upstream file.
        for blocker in blockers:
            if blocker not in group["blockers"]:
                group["blockers"].append(blocker)
    groups = []
    # Groups are emitted ordered by selection seed, then prompt variant id.
    for key in sorted(groups_by_key, key=lambda item: (item[1], item[0])):
        group = groups_by_key[key]
        group["sampler_seeds"] = sorted(group["sampler_seeds"])
        group["sampler_seed_count"] = len(set(group["sampler_seeds"]))
        # Sampler seeds listed on the sheet that this group never ran.
        missing_sampler_seeds = sorted(set(expected_sampler_seeds) - set(group["sampler_seeds"]))
        if missing_sampler_seeds:
            group["missing_sampler_seeds"] = missing_sampler_seeds
            if "missing_sampler_coverage" not in group["blockers"]:
                group["blockers"].append("missing_sampler_coverage")
        insufficient_sampler_coverage = group["sampler_seed_count"] < MIN_STABLE_MATRIX_SAMPLER_SEEDS
        if insufficient_sampler_coverage and "insufficient_sampler_coverage" not in group["blockers"]:
            group["blockers"].append("insufficient_sampler_coverage")
        group["stable"] = (
            group["job_count"] > 0
            and group["blocked_count"] == 0
            and not missing_sampler_seeds
            and not insufficient_sampler_coverage
        )
        groups.append(group)
    return {
        "schema": SEED_MATRIX_PROMOTION_REPORT_SCHEMA,
        "subject_id": _text(matrix_result_sheet.get("subject_id")),
        "variant_key": _text(matrix_result_sheet.get("variant_key")),
        "seed_slot": _text(matrix_result_sheet.get("seed_slot")),
        "job_count": len(report_jobs),
        "promotion_ready_job_count": sum(1 for job in report_jobs if job["decision"] == "seedable_candidate"),
        "blocked_job_count": sum(1 for job in report_jobs if job["decision"] != "seedable_candidate"),
        "stable_group_count": sum(1 for group in groups if group.get("stable") is True),
        "unstable_group_count": sum(1 for group in groups if group.get("stable") is False),
        "required_pass_keys": list(PROMOTION_REQUIRED_PASS_KEYS),
        "required_progress_keys": list(PROMOTION_REQUIRED_PROGRESS_KEYS),
        "minimum_stable_sampler_seed_count": MIN_STABLE_MATRIX_SAMPLER_SEEDS,
        "jobs": report_jobs,
        "groups": groups,
    }
def _load_json_object(path: str | Path, *, field: str) -> dict[str, Any]:
    """Read the UTF-8 JSON file at ``path`` and return its top-level object.

    ``field`` is only used to name the input in the error message.

    Raises:
        ValueError: the file parses but its top-level value is not an object.
    """
    with Path(path).open("r", encoding="utf-8") as stream:
        raw = stream.read()
    payload = json.loads(raw)
    if isinstance(payload, dict):
        return payload
    raise ValueError(f"{field} must contain one JSON object")
def _parse_int_csv(value: str, *, field: str) -> list[int]:
    """Parse a comma-separated string of integers into a list of seeds.

    Each item is stripped, converted with ``int`` and then passed through
    ``_int_seed``; ``field`` prefixes every error message together with the
    item's position.

    Raises:
        ValueError: the input is empty, an item is blank, or an item is not an
            integer.
    """
    cleaned = _text(value)
    if not cleaned:
        raise ValueError(f"{field} must contain at least one integer")

    seeds: list[int] = []
    for position, chunk in enumerate(cleaned.split(",")):
        label = f"{field}[{position}]"
        token = chunk.strip()
        if not token:
            raise ValueError(f"{label} is empty")
        try:
            number = int(token)
        except ValueError as exc:
            raise ValueError(f"{label} must be an integer") from exc
        seeds.append(_int_seed(number, field=label))
    return seeds
def main(argv: list[str] | None = None) -> int:
    """Run the atlas-refine manifest command-line interface.

    Mode flags are checked in a fixed priority order and the first one that is
    set wins. Modes that work purely from JSON inputs (apply / validate /
    report builders) are handled first and return immediately. Every
    remaining mode needs ``--folder``: the manifest is built from it and then
    either printed as-is (the default; ``--print-manifest`` is accepted but
    not consulted) or fed to the selected manifest-based builder.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        ``0`` on success. Apply modes return ``1`` when the payload's
        ``"applied"`` entry is falsy, and validate modes return ``1`` when the
        payload's ``"valid"`` entry is falsy.

    Raises:
        SystemExit: Raised by ``parser.error`` when a required companion
            option is missing, or by argparse itself on malformed arguments.
    """
    parser = argparse.ArgumentParser(
        description="Build a manifest for a same-subject Krea2 atlas-refine prompt/image deck."
    )

    def switch(text):
        # Boolean mode flag.
        return {"action": "store_true", "help": text}

    def text_value(text, default=""):
        # String-valued option (paths, keys, CSV lists).
        return {"default": default, "help": text}

    def int_value(text, default=None):
        # Integer-valued option.
        return {"type": int, "default": default, "help": text}

    # Registration order is preserved because it determines --help ordering.
    option_table = (
        ("--folder", {"help": "Folder containing paired .txt/.png atlas refine artifacts."}),
        ("--subject-id", text_value("Stable subject id for this reference deck.")),
        ("--indent", int_value("JSON indentation level.", default=2)),
        ("--print-manifest", switch("Print the atlas refine manifest explicitly.")),
        ("--print-batch", switch("Print an sxcp_prompt_batch-compatible probe batch instead of the manifest.")),
        ("--print-seed-selection", switch("Print a deterministic seed-selected prompt variant from a manifest.")),
        ("--print-seed-selected-batch", switch("Print an sxcp prompt batch containing baseline and a deterministic seed-selected candidate.")),
        ("--print-seed-matrix", switch("Print seed-selected batches for every sampler/cue seed pair.")),
        ("--print-seed-matrix-result-sheet", switch("Print visual scoring sheets for completed seed-matrix jobs.")),
        ("--print-seed-matrix-promotion-report", switch("Print stability/promotion gates from a scored seed-matrix result sheet.")),
        ("--print-matrix-sidecar-update-draft", switch("Print sidecar prompt-variant updates from stable seed-matrix groups.")),
        ("--print-catalog-cue-draft", switch("Print review-only catalog prompt_variant_cues candidates from seedable append-cue sidecars.")),
        ("--print-reference-pool-report", switch("Print canonical/supplemental atlas reference-pool coverage for cue expansion.")),
        ("--print-reference-cue-review-sheet", switch("Print blank atlas reference cue-labeling slots for prompt-variant review.")),
        ("--print-reference-cue-candidate-draft", switch("Print sidecar-ready prompt-variant candidates from a filled reference cue-review sheet.")),
        ("--print-reference-cue-sidecar-author-draft", switch("Print same-stem sidecar authoring updates from reviewed reference cue candidates.")),
        ("--validate-reference-cue-sidecar-author-draft", switch("Validate pre-test reference cue sidecar authoring updates without writing sidecars.")),
        ("--apply-reference-cue-sidecar-author-draft", switch("Apply pre-test reference cue sidecar authoring updates to a folder.")),
        ("--print-coverage-report", switch("Print atlas refine readiness coverage by variant.")),
        ("--print-sidecar-scaffold", switch("Print review-only same-stem sidecar JSON scaffolds for known baseline-only entries.")),
        ("--print-baseline-score-sheet", switch("Print baseline image/prompt scoring slots for manifest entries.")),
        ("--print-prompt-noise-report", switch("Print read-only option/meta/negative prompt-noise findings for atlas prompts.")),
        ("--print-prompt-cleanup-sheet", switch("Print manual cleanup slots for prompt-noise findings.")),
        ("--validate-prompt-cleanup-sheet", switch("Validate manually filled prompt cleanup replacements without writing files.")),
        ("--apply-prompt-cleanup-sheet", switch("Apply validated prompt cleanup replacements to prompt files or sidecars.")),
        ("--print-baseline-score-update-draft", switch("Print sidecar baseline score updates from a manually scored baseline sheet.")),
        ("--validate-baseline-score-update-draft", switch("Validate baseline score sidecar updates without writing files.")),
        ("--apply-baseline-score-update-draft", switch("Apply baseline score sidecar updates to a folder.")),
        ("--variant-key", text_value("Variant key to export when --print-batch is set.")),
        (
            "--reference-pool-folder",
            {
                "action": "append",
                "default": [],
                "help": "Supplemental atlas-root-relative folder for --print-reference-pool-report. Can be repeated.",
            },
        ),
        ("--sampler-seed", int_value("Override sampler seed for --print-batch.")),
        ("--selection-seed", int_value("Cue seed for --print-seed-selection.")),
        ("--sampler-seeds", text_value("Comma-separated sampler seeds for --print-seed-matrix.")),
        ("--selection-seeds", text_value("Comma-separated cue seeds for --print-seed-matrix.")),
        ("--seed-slot", text_value("Seed slot label for --print-seed-selection.", default="atlas_cue_seed")),
        ("--print-result-sheet", switch("Print a visual scoring sheet from a batch JSON and result JSON.")),
        ("--print-promotion-report", switch("Print conservative seedable-candidate gates from a scored result sheet.")),
        ("--print-sidecar-update-draft", switch("Print reviewable sidecar prompt_variants from a promotion report.")),
        ("--validate-sidecar-update-draft", switch("Validate a sidecar update draft without writing sidecar files.")),
        ("--apply-sidecar-update-draft", switch("Apply a validated sidecar update draft to a folder.")),
        ("--validate-matrix-sidecar-update-draft", switch("Validate a matrix sidecar update draft without writing sidecar files.")),
        ("--apply-matrix-sidecar-update-draft", switch("Apply a validated matrix sidecar update draft to a folder.")),
        ("--batch-json", text_value("Prompt batch JSON path for --print-result-sheet.")),
        ("--result-json", text_value("Result JSON path for --print-result-sheet.")),
        ("--seed-matrix-json", text_value("Seed matrix JSON path for --print-seed-matrix-result-sheet.")),
        ("--seed-matrix-results-json", text_value("Seed matrix results JSON path for --print-seed-matrix-result-sheet.")),
        ("--seed-matrix-result-sheet-json", text_value("Scored seed matrix result sheet JSON path for --print-seed-matrix-promotion-report.")),
        ("--seed-matrix-promotion-report-json", text_value("Seed matrix promotion report JSON path for --print-matrix-sidecar-update-draft.")),
        ("--result-sheet-json", text_value("Scored result sheet JSON path for --print-promotion-report.")),
        ("--promotion-report-json", text_value("Promotion report JSON path for --print-sidecar-update-draft.")),
        ("--sidecar-update-draft-json", text_value("Sidecar update draft JSON path for --validate-sidecar-update-draft.")),
        ("--matrix-sidecar-update-draft-json", text_value("Matrix sidecar update draft JSON path for validation or apply.")),
        ("--baseline-score-sheet-json", text_value("Baseline score sheet JSON path for --print-baseline-score-update-draft.")),
        ("--baseline-score-update-draft-json", text_value("Baseline score update draft JSON path for validation or apply.")),
        ("--prompt-cleanup-sheet-json", text_value("Prompt cleanup sheet JSON path for validation or apply.")),
        ("--reference-cue-review-sheet-json", text_value("Filled reference cue-review sheet JSON path for --print-reference-cue-candidate-draft.")),
        ("--reference-cue-candidate-draft-json", text_value("Reference cue candidate draft JSON path for --print-reference-cue-sidecar-author-draft.")),
        ("--reference-cue-sidecar-author-draft-json", text_value("Reference cue sidecar author draft JSON path for validation or apply.")),
        ("--notes", text_value("Notes to include in --print-result-sheet output.")),
    )
    for option_name, settings in option_table:
        parser.add_argument(option_name, **settings)
    args = parser.parse_args(argv)

    def require(satisfied, message):
        # parser.error prints usage and exits, so control never continues past it.
        if not satisfied:
            parser.error(message)

    def read(json_path, field):
        # Load one JSON object; `field` labels the option in error messages.
        return _load_json_object(json_path, field=field)

    def emit(payload, status_key=None):
        # Print the payload as sorted, ASCII-only JSON and derive the exit code.
        print(json.dumps(payload, ensure_ascii=True, indent=args.indent, sort_keys=True))
        if status_key is None:
            return 0
        return 0 if payload[status_key] else 1

    # --- Modes driven purely by JSON inputs (no manifest build) -------------
    if args.apply_reference_cue_sidecar_author_draft:
        require(
            args.reference_cue_sidecar_author_draft_json and args.folder,
            "--reference-cue-sidecar-author-draft-json and --folder are required with --apply-reference-cue-sidecar-author-draft",
        )
        author_draft = read(
            args.reference_cue_sidecar_author_draft_json,
            "reference-cue-sidecar-author-draft-json",
        )
        return emit(apply_reference_cue_sidecar_author_draft(author_draft, args.folder), "applied")

    if args.validate_reference_cue_sidecar_author_draft:
        require(
            args.reference_cue_sidecar_author_draft_json,
            "--reference-cue-sidecar-author-draft-json is required with --validate-reference-cue-sidecar-author-draft",
        )
        author_draft = read(
            args.reference_cue_sidecar_author_draft_json,
            "reference-cue-sidecar-author-draft-json",
        )
        return emit(validate_reference_cue_sidecar_author_draft(author_draft), "valid")

    if args.apply_prompt_cleanup_sheet:
        require(
            args.prompt_cleanup_sheet_json and args.folder,
            "--prompt-cleanup-sheet-json and --folder are required with --apply-prompt-cleanup-sheet",
        )
        cleanup_sheet = read(args.prompt_cleanup_sheet_json, "prompt-cleanup-sheet-json")
        return emit(apply_prompt_cleanup_sheet(cleanup_sheet, args.folder), "applied")

    if args.validate_prompt_cleanup_sheet:
        require(
            args.prompt_cleanup_sheet_json,
            "--prompt-cleanup-sheet-json is required with --validate-prompt-cleanup-sheet",
        )
        cleanup_sheet = read(args.prompt_cleanup_sheet_json, "prompt-cleanup-sheet-json")
        return emit(validate_prompt_cleanup_sheet(cleanup_sheet), "valid")

    if args.apply_baseline_score_update_draft:
        require(
            args.baseline_score_update_draft_json and args.folder,
            "--baseline-score-update-draft-json and --folder are required with --apply-baseline-score-update-draft",
        )
        score_draft = read(args.baseline_score_update_draft_json, "baseline-score-update-draft-json")
        return emit(apply_baseline_score_update_draft(score_draft, args.folder), "applied")

    if args.validate_baseline_score_update_draft:
        require(
            args.baseline_score_update_draft_json,
            "--baseline-score-update-draft-json is required with --validate-baseline-score-update-draft",
        )
        score_draft = read(args.baseline_score_update_draft_json, "baseline-score-update-draft-json")
        return emit(validate_baseline_score_update_draft(score_draft), "valid")

    if args.print_baseline_score_update_draft:
        require(
            args.baseline_score_sheet_json,
            "--baseline-score-sheet-json is required with --print-baseline-score-update-draft",
        )
        score_sheet = read(args.baseline_score_sheet_json, "baseline-score-sheet-json")
        return emit(build_baseline_score_update_draft(score_sheet))

    if args.apply_matrix_sidecar_update_draft:
        require(
            args.matrix_sidecar_update_draft_json and args.folder,
            "--matrix-sidecar-update-draft-json and --folder are required with --apply-matrix-sidecar-update-draft",
        )
        matrix_draft = read(args.matrix_sidecar_update_draft_json, "matrix-sidecar-update-draft-json")
        return emit(apply_matrix_sidecar_update_draft(matrix_draft, args.folder), "applied")

    if args.validate_matrix_sidecar_update_draft:
        require(
            args.matrix_sidecar_update_draft_json,
            "--matrix-sidecar-update-draft-json is required with --validate-matrix-sidecar-update-draft",
        )
        matrix_draft = read(args.matrix_sidecar_update_draft_json, "matrix-sidecar-update-draft-json")
        return emit(validate_matrix_sidecar_update_draft(matrix_draft), "valid")

    if args.apply_sidecar_update_draft:
        require(
            args.sidecar_update_draft_json and args.folder,
            "--sidecar-update-draft-json and --folder are required with --apply-sidecar-update-draft",
        )
        sidecar_draft = read(args.sidecar_update_draft_json, "sidecar-update-draft-json")
        return emit(apply_sidecar_update_draft(sidecar_draft, args.folder), "applied")

    if args.validate_sidecar_update_draft:
        require(
            args.sidecar_update_draft_json,
            "--sidecar-update-draft-json is required with --validate-sidecar-update-draft",
        )
        sidecar_draft = read(args.sidecar_update_draft_json, "sidecar-update-draft-json")
        return emit(validate_sidecar_update_draft(sidecar_draft), "valid")

    if args.print_sidecar_update_draft:
        require(
            args.promotion_report_json,
            "--promotion-report-json is required with --print-sidecar-update-draft",
        )
        report = read(args.promotion_report_json, "promotion-report-json")
        return emit(build_sidecar_update_draft(report))

    if args.print_promotion_report:
        require(
            args.result_sheet_json,
            "--result-sheet-json is required with --print-promotion-report",
        )
        sheet = read(args.result_sheet_json, "result-sheet-json")
        return emit(build_promotion_report(sheet))

    if args.print_result_sheet:
        require(
            args.batch_json and args.result_json,
            "--batch-json and --result-json are required with --print-result-sheet",
        )
        batch_data = read(args.batch_json, "batch-json")
        result_data = read(args.result_json, "result-json")
        return emit(build_result_sheet(batch_data, result_data, notes=args.notes))

    if args.print_seed_matrix_result_sheet:
        require(
            args.seed_matrix_json and args.seed_matrix_results_json,
            "--seed-matrix-json and --seed-matrix-results-json are required with --print-seed-matrix-result-sheet",
        )
        matrix_data = read(args.seed_matrix_json, "seed-matrix-json")
        matrix_results = read(args.seed_matrix_results_json, "seed-matrix-results-json")
        return emit(build_seed_matrix_result_sheet(matrix_data, matrix_results, notes=args.notes))

    if args.print_seed_matrix_promotion_report:
        require(
            args.seed_matrix_result_sheet_json,
            "--seed-matrix-result-sheet-json is required with --print-seed-matrix-promotion-report",
        )
        matrix_sheet = read(args.seed_matrix_result_sheet_json, "seed-matrix-result-sheet-json")
        return emit(build_seed_matrix_promotion_report(matrix_sheet))

    if args.print_matrix_sidecar_update_draft:
        require(
            args.seed_matrix_promotion_report_json,
            "--seed-matrix-promotion-report-json is required with --print-matrix-sidecar-update-draft",
        )
        matrix_report = read(args.seed_matrix_promotion_report_json, "seed-matrix-promotion-report-json")
        return emit(build_matrix_sidecar_update_draft(matrix_report))

    if args.print_reference_pool_report:
        require(args.variant_key, "--variant-key is required with --print-reference-pool-report")
        pool_report = build_reference_pool_report(
            args.variant_key,
            supplemental_folders=list(args.reference_pool_folder or []),
        )
        return emit(pool_report)

    if args.print_reference_cue_review_sheet:
        require(args.variant_key, "--variant-key is required with --print-reference-cue-review-sheet")
        review_sheet = build_reference_cue_review_sheet(
            args.variant_key,
            supplemental_folders=list(args.reference_pool_folder or []),
        )
        return emit(review_sheet)

    if args.print_reference_cue_candidate_draft:
        require(
            args.reference_cue_review_sheet_json,
            "--reference-cue-review-sheet-json is required with --print-reference-cue-candidate-draft",
        )
        filled_sheet = read(args.reference_cue_review_sheet_json, "reference-cue-review-sheet-json")
        return emit(build_reference_cue_candidate_draft(filled_sheet))

    # --- Modes that need the manifest built from --folder -------------------
    require(args.folder, "--folder is required unless a JSON-only output mode is set")
    manifest = build_manifest(args.folder, subject_id=args.subject_id)

    if args.print_seed_selection:
        require(args.variant_key, "--variant-key is required with --print-seed-selection")
        require(
            args.selection_seed is not None,
            "--selection-seed is required with --print-seed-selection",
        )
        selection = select_seeded_prompt_variant(
            manifest,
            args.variant_key,
            selection_seed=args.selection_seed,
            seed_slot=args.seed_slot,
        )
        return emit(selection)

    if args.print_seed_selected_batch:
        require(args.variant_key, "--variant-key is required with --print-seed-selected-batch")
        require(
            args.selection_seed is not None and args.sampler_seed is not None,
            "--selection-seed and --sampler-seed are required with --print-seed-selected-batch",
        )
        selected_batch = build_seed_selected_prompt_batch(
            manifest,
            args.variant_key,
            selection_seed=args.selection_seed,
            sampler_seed=args.sampler_seed,
            seed_slot=args.seed_slot,
        )
        return emit(selected_batch)

    if args.print_seed_matrix:
        require(args.variant_key, "--variant-key is required with --print-seed-matrix")
        require(
            args.selection_seeds and args.sampler_seeds,
            "--selection-seeds and --sampler-seeds are required with --print-seed-matrix",
        )
        # Cue seeds are parsed before sampler seeds, matching keyword order.
        cue_seeds = _parse_int_csv(args.selection_seeds, field="selection-seeds")
        run_seeds = _parse_int_csv(args.sampler_seeds, field="sampler-seeds")
        matrix = build_seed_matrix(
            manifest,
            args.variant_key,
            selection_seeds=cue_seeds,
            sampler_seeds=run_seeds,
            seed_slot=args.seed_slot,
        )
        return emit(matrix)

    if args.print_reference_cue_sidecar_author_draft:
        require(
            args.reference_cue_candidate_draft_json,
            "--reference-cue-candidate-draft-json is required with --print-reference-cue-sidecar-author-draft",
        )
        candidate_draft = read(
            args.reference_cue_candidate_draft_json,
            "reference-cue-candidate-draft-json",
        )
        authored = build_reference_cue_sidecar_author_draft(
            manifest,
            candidate_draft,
            variant_key=args.variant_key,
        )
        return emit(authored)

    if args.print_catalog_cue_draft:
        return emit(build_catalog_cue_draft(manifest, variant_key=args.variant_key))

    if args.print_coverage_report:
        return emit(build_coverage_report(manifest))

    if args.print_sidecar_scaffold:
        return emit(build_sidecar_scaffold(manifest, variant_key=args.variant_key))

    if args.print_baseline_score_sheet:
        return emit(build_baseline_score_sheet(manifest, variant_key=args.variant_key))

    if args.print_prompt_noise_report:
        return emit(build_prompt_noise_report(manifest, variant_key=args.variant_key))

    if args.print_prompt_cleanup_sheet:
        return emit(build_prompt_cleanup_sheet(manifest, variant_key=args.variant_key))

    if args.print_batch:
        require(args.variant_key, "--variant-key is required with --print-batch")
        return emit(build_prompt_batch(manifest, args.variant_key, sampler_seed=args.sampler_seed))

    # No mode selected: the manifest itself is the output.
    return emit(manifest)
# Script entry point: run the CLI and surface main()'s integer return value
# as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())