#!/usr/bin/env python3
"""Validate no-generation normal-camera atlas maintenance artifacts."""

from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path
from typing import Any

ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

import normal_camera_atlas_prep as prep  # noqa: E402

EXPECTED_VARIANT_COUNT = 131
EXPECTED_PRE_AB_COUNT = 55
EXPECTED_NEEDS_SAMPLES_COUNT = 76
EXPECTED_COVERED_NON_EMPTY_FOLDER_COUNT = 101
EXPECTED_UNCOVERED_NON_EMPTY_FOLDER_COUNT = 0

ACCEPTANCE_GATES_ARTIFACT = "normal_camera_acceptance_gates.md"
UNUSED_POOL_BACKLOG_ARTIFACT = "normal_camera_unused_pool_backlog.json"

JSON_ARTIFACT_SCHEMAS = {
    "normal_camera_priority_plan.json": prep.PRIORITY_PLAN_SCHEMA,
    "normal_camera_prompt_cue_batch.json": prep.PROMPT_CUE_BATCH_SCHEMA,
    "normal_camera_score_sheet.json": prep.SCORE_SHEET_SCHEMA,
    UNUSED_POOL_BACKLOG_ARTIFACT: prep.UNUSED_POOL_BACKLOG_SCHEMA,
    "normal_camera_needs_samples_acquisition.json": prep.NEEDS_SAMPLES_ACQUISITION_SCHEMA,
}
TEXT_ARTIFACTS = (
    ACCEPTANCE_GATES_ARTIFACT,
    "normal_camera_needs_samples_acquisition.md",
    "review/index.md",
    "review/index.html",
)


def _add_error(errors: list[str], message: str) -> None:
    """Append ``message`` to the shared ``errors`` accumulator."""
    errors.append(message)


def _text_file(path: Path, errors: list[str], *, label: str = "artifact") -> str | None:
    """Return the UTF-8 text of ``path``, or ``None`` after recording an error.

    A file that is missing, unreadable, or not valid UTF-8 is reported as a
    validation error instead of raising, so one bad artifact cannot abort the
    whole run.
    """
    if not path.is_file():
        _add_error(errors, f"missing {label}: {path}")
        return None
    try:
        return path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        _add_error(errors, f"unreadable {label} {path}: {exc}")
        return None


def _json_file(path: Path, errors: list[str]) -> dict[str, Any]:
    """Load ``path`` as a JSON object.

    Returns an empty dict (and records an error) when the file is missing,
    unreadable, not valid JSON, or its top-level value is not an object.
    """
    text = _text_file(path, errors)
    if text is None:
        return {}
    try:
        payload = json.loads(text)
    except json.JSONDecodeError as exc:
        _add_error(errors, f"invalid JSON artifact {path}: {exc}")
        return {}
    if not isinstance(payload, dict):
        _add_error(errors, f"JSON artifact is not an object: {path}")
        return {}
    return payload


def _expect_object(value: Any, name: str, errors: list[str]) -> dict[str, Any]:
    """Return ``value`` if it is a dict; otherwise record an error and return ``{}``."""
    if not isinstance(value, dict):
        _add_error(errors, f"{name} must be an object")
        return {}
    return value


def _expect_list(value: Any, name: str, errors: list[str]) -> list[Any]:
    """Return ``value`` if it is a list; otherwise record an error and return ``[]``."""
    if not isinstance(value, list):
        _add_error(errors, f"{name} must be a list")
        return []
    return value


def _optional_list(value: Any, name: str, errors: list[str]) -> list[Any]:
    """Like ``_expect_list``, but a missing or empty value is silently ``[]``."""
    if not value:
        return []
    return _expect_list(value, name, errors)


def _object_rows(value: Any, name: str, errors: list[str]) -> list[dict[str, Any]]:
    """Return the dict entries of an optional list.

    Entries that are not objects are reported (1-based index) and dropped so
    callers can safely use ``.get`` on every returned row.
    """
    rows: list[dict[str, Any]] = []
    for index, row in enumerate(_optional_list(value, name, errors), start=1):
        if isinstance(row, dict):
            rows.append(row)
        else:
            _add_error(errors, f"{name}[{index}] must be an object")
    return rows


def _expect_count(value: Any, name: str, errors: list[str]) -> int:
    """Coerce ``value`` to ``int`` (falsy values become 0).

    A value that cannot be converted is reported and treated as 0 rather than
    raising.
    """
    try:
        return int(value or 0)
    except (TypeError, ValueError, OverflowError):
        _add_error(errors, f"{name} must be an integer, got {value!r}")
        return 0


def _expect_schema(payload: dict[str, Any], schema: str, name: str, errors: list[str]) -> None:
    """Check that ``payload`` carries the expected schema id and ``no_generation`` is ``True``."""
    if payload.get("schema") != schema:
        _add_error(errors, f"{name} schema changed: {payload.get('schema')!r}")
    if payload.get("no_generation") is not True:
        _add_error(errors, f"{name} must stay no_generation=true")


def _validate_variant_catalog(errors: list[str]) -> tuple[dict[str, dict[str, Any]], set[str]]:
    """Validate the atlas and variant catalogs returned by ``prep``.

    Checks the pinned inventory counts, then each variant's key uniqueness,
    status, required text and list fields, atlas-folder references, reference
    image paths, and generator-hook note.

    Returns:
        ``(variants_by_key, folder_names)``: variants indexed by their key, and
        the set of non-blank atlas folder names.
    """
    atlas = prep.load_atlas()
    variants_catalog = prep.load_variants()
    folders = _expect_list(atlas.get("folders"), "normal_camera_atlas.folders", errors)
    variants = _expect_list(variants_catalog.get("variants"), "normal_camera_variants.variants", errors)
    inventory_name = "normal_camera_variants.inventory"
    inventory = _expect_object(variants_catalog.get("inventory"), inventory_name, errors)
    status_counts_name = f"{inventory_name}.status_counts"
    status_counts = _expect_object(inventory.get("status_counts"), status_counts_name, errors)

    if len(variants) != EXPECTED_VARIANT_COUNT:
        _add_error(errors, f"expected {EXPECTED_VARIANT_COUNT} variants, got {len(variants)}")

    count_checks = (
        (status_counts, status_counts_name, "pre_ab_candidate", EXPECTED_PRE_AB_COUNT,
         "pre_ab_candidate count changed"),
        (status_counts, status_counts_name, "needs_samples", EXPECTED_NEEDS_SAMPLES_COUNT,
         "needs_samples count changed"),
        (inventory, inventory_name, "covered_non_empty_folder_count",
         EXPECTED_COVERED_NON_EMPTY_FOLDER_COUNT, "covered non-empty folder count changed"),
        (inventory, inventory_name, "uncovered_non_empty_folder_count",
         EXPECTED_UNCOVERED_NON_EMPTY_FOLDER_COUNT,
         "normal-camera atlas has uncovered non-empty folders"),
    )
    for source, source_name, field, expected, message in count_checks:
        if _expect_count(source.get(field), f"{source_name}.{field}", errors) != expected:
            _add_error(errors, message)

    folder_names: set[str] = set()
    for index, folder in enumerate(folders, start=1):
        if not isinstance(folder, dict):
            _add_error(errors, f"normal_camera_atlas.folders[{index}] must be an object")
            continue
        # Blank or missing folder names are not valid reference targets.
        if str(folder.get("folder") or "").strip():
            folder_names.add(str(folder.get("folder")))

    variants_by_key: dict[str, dict[str, Any]] = {}
    status_values = set(variants_catalog.get("status_values") or {})
    for index, variant in enumerate(variants, start=1):
        variant_obj = _expect_object(variant, f"variant[{index}]", errors)
        key = str(variant_obj.get("key") or "")
        if not key:
            _add_error(errors, f"variant[{index}] has no key")
            continue
        if key in variants_by_key:
            _add_error(errors, f"duplicate variant key: {key}")
        # On duplicates the later entry wins.
        variants_by_key[key] = variant_obj

        if variant_obj.get("status") not in status_values:
            _add_error(errors, f"{key} has unknown status: {variant_obj.get('status')!r}")

        for field in ("family", "action_family", "camera_view", "canonical_geometry"):
            if not str(variant_obj.get(field) or "").strip():
                _add_error(errors, f"{key}.{field} is empty")

        list_fields: dict[str, list[Any]] = {}
        for field in ("atlas_folders", "reference_images", "prompt_cues", "avoid_cues"):
            values = _expect_list(variant_obj.get(field), f"{key}.{field}", errors)
            if not values:
                _add_error(errors, f"{key}.{field} is empty")
            list_fields[field] = values

        # Iterate the validated lists so a malformed field (already reported
        # above) is not walked element by element.
        for folder_name in list_fields["atlas_folders"]:
            if str(folder_name) not in folder_names:
                _add_error(errors, f"{key} references unknown atlas folder: {folder_name!r}")
        for ref in list_fields["reference_images"]:
            ref_text = str(ref or "")
            if ".." in Path(ref_text).parts:
                _add_error(errors, f"{key} reference escapes atlas root: {ref_text!r}")

        hook = _expect_object(variant_obj.get("generator_hook"), f"{key}.generator_hook", errors)
        if "Future hook only" not in str(hook.get("notes") or ""):
            _add_error(errors, f"{key} generator hook should remain future-only")

    return variants_by_key, folder_names


def _validate_built_artifacts(errors: list[str]) -> None:
    """Validate the payloads produced in memory by the ``prep`` builders.

    Checks schema ids, the pinned item counts, and that the needs-samples
    acquisition rows are sorted by descending ``missing_reference_count`` with
    the expected per-row target.
    """
    priority_plan = prep.build_priority_plan()
    prompt_batch = prep.build_prompt_cue_batch()
    score_sheet = prep.build_score_sheet()
    unused_pool_backlog = prep.build_unused_pool_backlog()
    acquisition = prep.build_needs_samples_acquisition()

    built_payloads = (
        (priority_plan, prep.PRIORITY_PLAN_SCHEMA, "built priority plan"),
        (prompt_batch, prep.PROMPT_CUE_BATCH_SCHEMA, "built prompt cue batch"),
        (score_sheet, prep.SCORE_SHEET_SCHEMA, "built score sheet"),
        (unused_pool_backlog, prep.UNUSED_POOL_BACKLOG_SCHEMA, "built unused-pool backlog"),
        (acquisition, prep.NEEDS_SAMPLES_ACQUISITION_SCHEMA, "built needs-samples acquisition"),
    )
    for payload, schema, name in built_payloads:
        _expect_schema(payload, schema, name, errors)

    if int(priority_plan.get("selected_count") or 0) != EXPECTED_PRE_AB_COUNT:
        _add_error(errors, "priority plan should include all pre-A/B candidates")
    if len(prompt_batch.get("items") or []) != EXPECTED_PRE_AB_COUNT:
        _add_error(errors, "prompt cue batch should include all pre-A/B candidates")
    if len(score_sheet.get("rows") or []) != EXPECTED_PRE_AB_COUNT:
        _add_error(errors, "score sheet should include all pre-A/B candidates")
    if int(acquisition.get("available_variant_count") or 0) != EXPECTED_NEEDS_SAMPLES_COUNT:
        _add_error(errors, "needs-samples acquisition should include all needs_samples variants")

    acquisition_rows = acquisition.get("variants") or []
    missing_counts = [int(row.get("missing_reference_count") or 0) for row in acquisition_rows]
    if missing_counts != sorted(missing_counts, reverse=True):
        _add_error(errors, "needs-samples acquisition must be sorted by missing reference count")
    for row in acquisition_rows:
        if row.get("target_reference_count") != prep.NEEDS_SAMPLES_TARGET_REFERENCE_COUNT:
            _add_error(errors, f"{row.get('variant_key')} has wrong acquisition target")
        if row.get("missing_reference_count") is None:
            _add_error(errors, f"{row.get('variant_key')} has no missing reference count")


def _validate_artifact_files(artifacts_dir: Path, errors: list[str]) -> None:
    """Validate the generated JSON and text artifacts under ``artifacts_dir``.

    JSON artifacts must exist and carry their expected schema; text artifacts
    must exist and still contain the "No-generation" wording; the acceptance
    gates file must still mention ``body_proportion_control``.
    """
    for relative_path, schema in JSON_ARTIFACT_SCHEMAS.items():
        payload = _json_file(artifacts_dir / relative_path, errors)
        if payload:
            _expect_schema(payload, schema, relative_path, errors)

    texts: dict[str, str] = {}
    for relative_path in TEXT_ARTIFACTS:
        text = _text_file(artifacts_dir / relative_path, errors)
        if text is None:
            continue
        texts[relative_path] = text
        if relative_path.endswith((".html", ".md")) and "No-generation" not in text:
            _add_error(errors, f"{relative_path} lost no-generation language")

    # Checked after the loop so this error keeps its position at the end.
    gates_text = texts.get(ACCEPTANCE_GATES_ARTIFACT)
    if gates_text is not None and "body_proportion_control" not in gates_text:
        _add_error(errors, "acceptance gates lost body_proportion_control")


def _validate_alias_metadata(artifacts_dir: Path, errors: list[str]) -> None:
    """Validate ``prep.SOURCE_FOLDER_ALIASES`` and alias marks in the unused-pool backlog.

    Every alias must name a folder known to the atlas or variant catalog and
    carry a canonical folder and a reason. Every backlog row must have a
    ``folder_alias``, marked ``alias_applied`` when the folder is a known alias.
    """
    atlas_folders = {
        str(folder.get("folder"))
        for folder in prep.load_atlas().get("folders") or []
        if isinstance(folder, dict)
    }
    variant_folders = {
        str(folder)
        for variant in prep.load_variants().get("variants") or []
        if isinstance(variant, dict) and isinstance(variant.get("atlas_folders"), list)
        for folder in variant["atlas_folders"]
    }
    known_folder_names = atlas_folders | variant_folders

    for folder_name, alias in prep.SOURCE_FOLDER_ALIASES.items():
        if folder_name not in known_folder_names:
            _add_error(errors, f"alias folder is not present in atlas/variants: {folder_name!r}")
        if not str(alias.get("canonical_folder") or "").strip():
            _add_error(errors, f"alias has no canonical folder: {folder_name!r}")
        if not str(alias.get("alias_reason") or "").strip():
            _add_error(errors, f"alias has no reason: {folder_name!r}")

    # The backlog file is also loaded by _validate_artifact_files; collect its
    # load errors separately so the same message is not reported twice.
    load_errors: list[str] = []
    backlog = _json_file(artifacts_dir / UNUSED_POOL_BACKLOG_ARTIFACT, load_errors)
    for message in load_errors:
        if message not in errors:
            _add_error(errors, message)

    for row in _object_rows(backlog.get("folders"), "unused-pool backlog folders", errors):
        folder_name = str(row.get("folder") or "")
        alias = row.get("folder_alias")
        if not alias:
            _add_error(errors, f"unused-pool row has no folder_alias: {folder_name!r}")
            continue
        alias_obj = _expect_object(alias, f"unused-pool row {folder_name!r} folder_alias", errors)
        if folder_name in prep.SOURCE_FOLDER_ALIASES and alias_obj.get("alias_applied") is not True:
            _add_error(errors, f"known alias folder is not marked alias_applied in backlog: {folder_name!r}")


def _validate_contact_sheet(path: Path, folder_name: str, errors: list[str]) -> None:
    """Check that a review contact sheet exists and keeps its required markers."""
    contact_sheet = _text_file(path, errors, label="contact sheet")
    if contact_sheet is None:
        return
    if "No-generation contact sheet" not in contact_sheet:
        _add_error(errors, f"{folder_name} contact sheet lost no-generation scope")
    if "data-review-decision=" not in contact_sheet:
        _add_error(errors, f"{folder_name} contact sheet has no review-decision attributes")


def _validate_review_manifest(
    manifest: dict[str, Any],
    folder_name: str,
    variants_by_key: dict[str, dict[str, Any]],
    errors: list[str],
) -> None:
    """Validate one loaded review manifest against the variant catalog.

    Checks schema, folder identity, alias metadata, the review-item count,
    that selected references exist in the catalog, per-item decision/bucket
    consistency, and that contact-sheet pages list the same references as
    ``review_items``.
    """
    _expect_schema(manifest, prep.REVIEW_MANIFEST_SCHEMA, f"{folder_name} review manifest", errors)
    if manifest.get("folder") != folder_name:
        _add_error(errors, f"{folder_name} manifest folder changed: {manifest.get('folder')!r}")

    alias = manifest.get("folder_alias")
    if not isinstance(alias, dict):
        # A missing or malformed alias fails the exact-source check below.
        alias = {}
    if alias.get("exact_source_folder") != folder_name:
        _add_error(errors, f"{folder_name} manifest alias does not preserve exact source folder")
    if folder_name in prep.SOURCE_FOLDER_ALIASES and alias.get("alias_applied") is not True:
        _add_error(errors, f"{folder_name} manifest should mark alias_applied")

    items_name = f"{folder_name}.review_items"
    raw_items = _expect_list(manifest.get("review_items"), items_name, errors)
    image_count = _expect_count(manifest.get("image_count"), f"{folder_name}.image_count", errors)
    if len(raw_items) != image_count:
        _add_error(errors, f"{folder_name} review_items count does not match image_count")
    review_items = _object_rows(raw_items, items_name, errors)

    selected_refs: set[str] = set()
    selected_name = f"{folder_name}.selected_subvariants"
    for index, selected in enumerate(
        _object_rows(manifest.get("selected_subvariants"), selected_name, errors), start=1
    ):
        variant_key = str(selected.get("variant_key") or "")
        if variant_key not in variants_by_key:
            _add_error(errors, f"{folder_name} selected subvariant is missing from catalog: {variant_key}")
            continue
        catalog_value = variants_by_key[variant_key].get("reference_images")
        catalog_refs = {str(ref) for ref in catalog_value} if isinstance(catalog_value, list) else set()
        selected_images = _optional_list(
            selected.get("reference_images"), f"{selected_name}[{index}].reference_images", errors
        )
        for ref in selected_images:
            ref_text = str(ref)
            selected_refs.add(ref_text)
            if ref_text not in catalog_refs:
                _add_error(
                    errors,
                    f"{folder_name} selected ref is not in catalog variant {variant_key}: {ref_text}",
                )

    for item in review_items:
        ref = str(item.get("reference_image") or "")
        decision = str(item.get("review_decision") or "")
        bucket = str(item.get("review_bucket") or "")
        if not ref:
            _add_error(errors, f"{folder_name} review item has no reference_image")
        if ".." in Path(ref).parts:
            _add_error(errors, f"{folder_name} review item escapes atlas root: {ref!r}")
        if decision not in {"selected_reference", "residual_unassigned"}:
            _add_error(errors, f"{folder_name} review item has unknown decision: {decision!r}")
        if decision == "selected_reference":
            if ref not in selected_refs:
                _add_error(errors, f"{folder_name} selected review item is not in selected_subvariants: {ref}")
            if bucket == "unassigned" or not bucket:
                _add_error(errors, f"{folder_name} selected review item has no selected bucket: {ref}")
            if not item.get("variant_key"):
                _add_error(errors, f"{folder_name} selected review item has no variant key: {ref}")
        elif decision == "residual_unassigned":
            if bucket != "unassigned":
                _add_error(errors, f"{folder_name} residual review item should use unassigned bucket: {ref}")
            if not item.get("exclusion_reason"):
                _add_error(errors, f"{folder_name} residual review item has no exclusion reason: {ref}")

    pages_name = f"{folder_name}.contact_sheet_pages"
    page_refs: set[str] = set()
    for index, page in enumerate(
        _object_rows(manifest.get("contact_sheet_pages"), pages_name, errors), start=1
    ):
        images = _object_rows(page.get("images"), f"{pages_name}[{index}].images", errors)
        page_refs.update(str(image.get("reference_image")) for image in images)
    item_refs = {str(item.get("reference_image")) for item in review_items}
    if page_refs != item_refs:
        _add_error(errors, f"{folder_name} contact-sheet pages and review_items disagree")


def _validate_review_artifacts(
    artifacts_dir: Path,
    variants_by_key: dict[str, dict[str, Any]],
    errors: list[str],
) -> None:
    """Validate the review manifest and contact sheet of every default review folder.

    The manifest and the contact sheet are checked independently, so a missing
    contact sheet does not hide problems in the manifest (and vice versa).
    """
    review_dir = artifacts_dir / "review"
    for folder_name in prep.DEFAULT_REVIEW_FOLDERS:
        stem = prep._safe_artifact_stem(folder_name)
        manifest = _json_file(review_dir / f"{stem}_review_manifest.json", errors)
        _validate_contact_sheet(review_dir / f"{stem}_contact_sheet.html", folder_name, errors)
        if manifest:
            _validate_review_manifest(manifest, folder_name, variants_by_key, errors)


def validate_normal_camera_atlas(
    artifacts_dir: Path | str | None = None,
    *,
    require_artifacts: bool = True,
) -> list[str]:
    """Run every normal-camera atlas check and return the collected error messages.

    Args:
        artifacts_dir: Directory holding the generated artifacts; defaults to
            ``prep.DEFAULT_OUTPUT_DIR`` when ``None``.
        require_artifacts: When ``False``, only the catalogs and the in-memory
            builder outputs are validated; generated files are not read.

    Returns:
        A list of human-readable error strings; empty when validation passed.
    """
    errors: list[str] = []
    artifacts_path = Path(artifacts_dir) if artifacts_dir is not None else prep.DEFAULT_OUTPUT_DIR
    variants_by_key, _folder_names = _validate_variant_catalog(errors)
    _validate_built_artifacts(errors)
    if require_artifacts:
        _validate_artifact_files(artifacts_path, errors)
        _validate_alias_metadata(artifacts_path, errors)
        _validate_review_artifacts(artifacts_path, variants_by_key, errors)
    return errors


def _parser() -> argparse.ArgumentParser:
    """Build the command-line parser (``--artifacts-dir``, ``--skip-artifacts``)."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--artifacts-dir",
        type=Path,
        default=prep.DEFAULT_OUTPUT_DIR,
        help="Normal-camera artifact directory to validate.",
    )
    parser.add_argument(
        "--skip-artifacts",
        action="store_true",
        help="Validate in-memory builders and catalogs without requiring generated files.",
    )
    return parser


def main(argv: list[str] | None = None) -> int:
    """CLI entry point: print errors to stderr and return 1, or print OK and return 0."""
    args = _parser().parse_args(argv)
    errors = validate_normal_camera_atlas(args.artifacts_dir, require_artifacts=not args.skip_artifacts)
    if errors:
        for error in errors:
            print(f"ERROR: {error}", file=sys.stderr)
        return 1
    print("OK: normal camera atlas validation passed")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())