340 lines
16 KiB
Python
340 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""Validate no-generation normal-camera atlas maintenance artifacts."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Directory two levels up from this file's resolved location (the parent of
# the folder that contains this script).
ROOT = Path(__file__).resolve().parents[1]

# Put ROOT at the front of the import path exactly once; presumably this is
# what lets the project-local import that follows be resolved.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
|
|
|
|
import normal_camera_atlas_prep as prep # noqa: E402
|
|
|
|
|
|
# Pinned catalog totals. The validators in this module compare the loaded
# catalog and the built artifacts against these numbers and report an error
# whenever a value differs.
EXPECTED_VARIANT_COUNT = 131  # total number of entries in the variants list
EXPECTED_PRE_AB_COUNT = 55  # variants with status "pre_ab_candidate"
EXPECTED_NEEDS_SAMPLES_COUNT = 76  # variants with status "needs_samples"
EXPECTED_COVERED_NON_EMPTY_FOLDER_COUNT = 101  # inventory.covered_non_empty_folder_count
EXPECTED_UNCOVERED_NON_EMPTY_FOLDER_COUNT = 0  # inventory.uncovered_non_empty_folder_count
|
|
|
|
# JSON artifact file name (relative to the artifacts directory) mapped to the
# schema identifier its top-level "schema" field is expected to carry.
JSON_ARTIFACT_SCHEMAS = {
    "normal_camera_priority_plan.json": prep.PRIORITY_PLAN_SCHEMA,
    "normal_camera_prompt_cue_batch.json": prep.PROMPT_CUE_BATCH_SCHEMA,
    "normal_camera_score_sheet.json": prep.SCORE_SHEET_SCHEMA,
    "normal_camera_unused_pool_backlog.json": prep.UNUSED_POOL_BACKLOG_SCHEMA,
    "normal_camera_needs_samples_acquisition.json": prep.NEEDS_SAMPLES_ACQUISITION_SCHEMA,
}
|
|
|
|
# Markdown/HTML artifacts (relative to the artifacts directory) that must
# exist and still contain the "No-generation" wording.
TEXT_ARTIFACTS = (
    "normal_camera_acceptance_gates.md",
    "normal_camera_needs_samples_acquisition.md",
    "review/index.md",
    "review/index.html",
)
|
|
|
|
|
|
def _add_error(errors: list[str], message: str) -> None:
    """Record *message* at the end of the shared *errors* list (mutated in place)."""
    errors.append(message)
|
|
|
|
|
|
def _json_file(path: Path, errors: list[str]) -> dict[str, Any]:
    """Load *path* as a JSON object, recording any problem in *errors*.

    Args:
        path: File expected to contain a UTF-8 encoded JSON object.
        errors: Shared error list; a message is appended on every failure.

    Returns:
        The decoded object on success. An empty dict when the file is
        missing, cannot be read, is not valid JSON, or decodes to something
        other than an object, so callers can treat a falsy result as
        "nothing further to validate".
    """
    if not path.is_file():
        _add_error(errors, f"missing artifact: {path}")
        return {}

    # Reading is kept separate from parsing: an unreadable or non-UTF-8 file
    # is reported like any other validation failure instead of aborting the
    # whole run with a traceback.
    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        _add_error(errors, f"unreadable artifact {path}: {exc}")
        return {}

    try:
        payload = json.loads(text)
    except json.JSONDecodeError as exc:
        _add_error(errors, f"invalid JSON artifact {path}: {exc}")
        return {}

    if not isinstance(payload, dict):
        _add_error(errors, f"JSON artifact is not an object: {path}")
        return {}
    return payload
|
|
|
|
|
|
def _expect_object(value: Any, name: str, errors: list[str]) -> dict[str, Any]:
    """Return *value* when it is a dict; otherwise record an error and return ``{}``.

    *name* is only used to label the error message.
    """
    if isinstance(value, dict):
        return value
    _add_error(errors, f"{name} must be an object")
    return {}
|
|
|
|
|
|
def _expect_list(value: Any, name: str, errors: list[str]) -> list[Any]:
    """Return *value* when it is a list; otherwise record an error and return ``[]``.

    *name* is only used to label the error message.
    """
    if isinstance(value, list):
        return value
    _add_error(errors, f"{name} must be a list")
    return []
|
|
|
|
|
|
def _expect_schema(payload: dict[str, Any], schema: str, name: str, errors: list[str]) -> None:
    """Check the two header fields of an artifact payload.

    Records an error when ``payload["schema"]`` differs from *schema*, and
    another when ``payload["no_generation"]`` is anything other than the
    literal ``True``. *name* labels both messages.
    """
    found_schema = payload.get("schema")
    if found_schema != schema:
        _add_error(errors, f"{name} schema changed: {found_schema!r}")

    no_generation_flag = payload.get("no_generation")
    if no_generation_flag is not True:
        _add_error(errors, f"{name} must stay no_generation=true")
|
|
|
|
|
|
def _validate_variant_catalog(errors: list[str]) -> tuple[dict[str, dict[str, Any]], set[str]]:
    """Validate the atlas folder list and the variant catalog loaded via ``prep``.

    Checks the pinned totals, then every variant: unique non-empty key, a
    status listed in the catalog's ``status_values``, non-empty descriptive
    fields, non-empty list fields, atlas folders that exist in the atlas,
    reference paths without ``..`` components, and a generator hook whose
    notes still contain "Future hook only".

    Malformed entries (wrong container types, non-numeric counts, unhashable
    statuses) are reported through *errors* rather than raising.

    Args:
        errors: Shared error list; messages are appended in place.

    Returns:
        ``(variants_by_key, folder_names)``: the variant objects indexed by
        their key, and the set of non-blank atlas folder names.
    """
    atlas = prep.load_atlas()
    variants_catalog = prep.load_variants()
    folders = _expect_list(atlas.get("folders"), "normal_camera_atlas.folders", errors)
    variants = _expect_list(variants_catalog.get("variants"), "normal_camera_variants.variants", errors)
    inventory = _expect_object(variants_catalog.get("inventory"), "normal_camera_variants.inventory", errors)
    status_counts = _expect_object(
        inventory.get("status_counts"),
        "normal_camera_variants.inventory.status_counts",
        errors,
    )

    if len(variants) != EXPECTED_VARIANT_COUNT:
        _add_error(errors, f"expected {EXPECTED_VARIANT_COUNT} variants, got {len(variants)}")
    if _as_count(status_counts.get("pre_ab_candidate")) != EXPECTED_PRE_AB_COUNT:
        _add_error(errors, "pre_ab_candidate count changed")
    if _as_count(status_counts.get("needs_samples")) != EXPECTED_NEEDS_SAMPLES_COUNT:
        _add_error(errors, "needs_samples count changed")
    if _as_count(inventory.get("covered_non_empty_folder_count")) != EXPECTED_COVERED_NON_EMPTY_FOLDER_COUNT:
        _add_error(errors, "covered non-empty folder count changed")
    if _as_count(inventory.get("uncovered_non_empty_folder_count")) != EXPECTED_UNCOVERED_NON_EMPTY_FOLDER_COUNT:
        _add_error(errors, "normal-camera atlas has uncovered non-empty folders")

    # Collect folder names; a non-object entry is reported instead of crashing
    # on ``.get``. Entries with a blank name are skipped silently, as before.
    folder_names: set[str] = set()
    for index, folder in enumerate(folders, start=1):
        folder_obj = _expect_object(folder, f"normal_camera_atlas.folders[{index}]", errors)
        folder_value = folder_obj.get("folder")
        if str(folder_value or "").strip():
            folder_names.add(str(folder_value))

    # A missing ``status_values`` still yields an empty set without an extra
    # error; only a present-but-wrong-type value is reported.
    status_values = set(
        _expect_object(
            variants_catalog.get("status_values") or {},
            "normal_camera_variants.status_values",
            errors,
        )
    )

    variants_by_key: dict[str, dict[str, Any]] = {}
    seen_variant_keys: set[str] = set()
    for index, variant in enumerate(variants, start=1):
        variant_obj = _expect_object(variant, f"variant[{index}]", errors)
        key = str(variant_obj.get("key") or "")
        if not key:
            _add_error(errors, f"variant[{index}] has no key")
            continue
        if key in seen_variant_keys:
            _add_error(errors, f"duplicate variant key: {key}")
        seen_variant_keys.add(key)
        variants_by_key[key] = variant_obj

        status = variant_obj.get("status")
        try:
            status_known = status in status_values
        except TypeError:
            # Unhashable status (e.g. a list) can never be a known value.
            status_known = False
        if not status_known:
            _add_error(errors, f"{key} has unknown status: {status!r}")

        for field in ("family", "action_family", "camera_view", "canonical_geometry"):
            if not str(variant_obj.get(field) or "").strip():
                _add_error(errors, f"{key}.{field} is empty")

        # Keep the type-checked lists so the loops below never iterate a raw
        # value that already failed the list check.
        checked_lists: dict[str, list[Any]] = {}
        for field in ("atlas_folders", "reference_images", "prompt_cues", "avoid_cues"):
            values = _expect_list(variant_obj.get(field), f"{key}.{field}", errors)
            if not values:
                _add_error(errors, f"{key}.{field} is empty")
            checked_lists[field] = values

        for folder_name in checked_lists["atlas_folders"]:
            if str(folder_name) not in folder_names:
                _add_error(errors, f"{key} references unknown atlas folder: {folder_name!r}")

        for ref in checked_lists["reference_images"]:
            ref_text = str(ref or "")
            if ".." in Path(ref_text).parts:
                _add_error(errors, f"{key} reference escapes atlas root: {ref_text!r}")

        hook = _expect_object(variant_obj.get("generator_hook"), f"{key}.generator_hook", errors)
        if "Future hook only" not in str(hook.get("notes") or ""):
            _add_error(errors, f"{key} generator hook should remain future-only")

    return variants_by_key, folder_names


def _as_count(value: Any) -> int | None:
    """Return *value* as an int (falsy values count as 0), or None if it cannot be converted."""
    try:
        return int(value or 0)
    except (TypeError, ValueError):
        return None
|
|
|
|
|
|
def _validate_built_artifacts(errors: list[str]) -> None:
    """Build every artifact in memory via ``prep`` and sanity-check the results.

    Verifies each payload's schema / no_generation header, the pinned item
    counts, and that the needs-samples acquisition rows are ordered by
    descending ``missing_reference_count`` with the expected target count.
    """
    plan = prep.build_priority_plan()
    cue_batch = prep.build_prompt_cue_batch()
    sheet = prep.build_score_sheet()
    backlog = prep.build_unused_pool_backlog()
    needs_samples = prep.build_needs_samples_acquisition()

    header_checks = (
        (plan, prep.PRIORITY_PLAN_SCHEMA, "built priority plan"),
        (cue_batch, prep.PROMPT_CUE_BATCH_SCHEMA, "built prompt cue batch"),
        (sheet, prep.SCORE_SHEET_SCHEMA, "built score sheet"),
        (backlog, prep.UNUSED_POOL_BACKLOG_SCHEMA, "built unused-pool backlog"),
        (needs_samples, prep.NEEDS_SAMPLES_ACQUISITION_SCHEMA, "built needs-samples acquisition"),
    )
    for built, schema, label in header_checks:
        _expect_schema(built, schema, label, errors)

    selected_total = int(plan.get("selected_count") or 0)
    if selected_total != EXPECTED_PRE_AB_COUNT:
        _add_error(errors, "priority plan should include all pre-A/B candidates")

    cue_items = cue_batch.get("items") or []
    if len(cue_items) != EXPECTED_PRE_AB_COUNT:
        _add_error(errors, "prompt cue batch should include all pre-A/B candidates")

    sheet_rows = sheet.get("rows") or []
    if len(sheet_rows) != EXPECTED_PRE_AB_COUNT:
        _add_error(errors, "score sheet should include all pre-A/B candidates")

    available_total = int(needs_samples.get("available_variant_count") or 0)
    if available_total != EXPECTED_NEEDS_SAMPLES_COUNT:
        _add_error(errors, "needs-samples acquisition should include all needs_samples variants")

    rows = needs_samples.get("variants") or []
    # A missing count is treated as 0 for the ordering check; the per-row
    # loop below reports it separately.
    observed_order = [int(row.get("missing_reference_count") or 0) for row in rows]
    if sorted(observed_order, reverse=True) != observed_order:
        _add_error(errors, "needs-samples acquisition must be sorted by missing reference count")

    for row in rows:
        if row.get("target_reference_count") != prep.NEEDS_SAMPLES_TARGET_REFERENCE_COUNT:
            _add_error(errors, f"{row.get('variant_key')} has wrong acquisition target")
        if row.get("missing_reference_count") is None:
            _add_error(errors, f"{row.get('variant_key')} has no missing reference count")
|
|
|
|
|
|
def _validate_artifact_files(artifacts_dir: Path, errors: list[str]) -> None:
    """Check the generated files under *artifacts_dir*.

    Each JSON artifact must load and carry its expected schema header; each
    text artifact must exist and still contain "No-generation"; the
    acceptance-gates document must still mention ``body_proportion_control``.
    """
    for json_name, expected_schema in JSON_ARTIFACT_SCHEMAS.items():
        loaded = _json_file(artifacts_dir / json_name, errors)
        if not loaded:
            # Missing or unparsable: _json_file has already recorded why.
            continue
        _expect_schema(loaded, expected_schema, json_name, errors)

    for text_name in TEXT_ARTIFACTS:
        text_path = artifacts_dir / text_name
        if not text_path.is_file():
            _add_error(errors, f"missing artifact: {text_path}")
            continue
        body = text_path.read_text(encoding="utf-8")
        # HTML and Markdown artifacts share the same wording requirement.
        if text_name.endswith((".html", ".md")) and "No-generation" not in body:
            _add_error(errors, f"{text_name} lost no-generation language")

    gates_path = artifacts_dir / "normal_camera_acceptance_gates.md"
    if gates_path.is_file():
        gates_text = gates_path.read_text(encoding="utf-8")
        if "body_proportion_control" not in gates_text:
            _add_error(errors, "acceptance gates lost body_proportion_control")
|
|
|
|
|
|
def _validate_alias_metadata(artifacts_dir: Path, errors: list[str]) -> None:
    """Check ``prep.SOURCE_FOLDER_ALIASES`` and the alias data in the unused-pool backlog.

    Every aliased folder must be known to the atlas or to some variant and
    must name a canonical folder and a reason. Every backlog row must carry a
    ``folder_alias``; rows for aliased folders must have ``alias_applied``
    set to ``True``.
    """
    # Folder names known from either source: the atlas itself or any variant.
    known_folder_names: set[str] = set()
    for folder in prep.load_atlas().get("folders") or []:
        known_folder_names.add(str(folder.get("folder")))
    for variant in prep.load_variants().get("variants") or []:
        for folder in variant.get("atlas_folders") or []:
            known_folder_names.add(str(folder))

    for source_folder, alias_info in prep.SOURCE_FOLDER_ALIASES.items():
        if source_folder not in known_folder_names:
            _add_error(errors, f"alias folder is not present in atlas/variants: {source_folder!r}")
        canonical = str(alias_info.get("canonical_folder") or "")
        if not canonical.strip():
            _add_error(errors, f"alias has no canonical folder: {source_folder!r}")
        reason = str(alias_info.get("alias_reason") or "")
        if not reason.strip():
            _add_error(errors, f"alias has no reason: {source_folder!r}")

    backlog = _json_file(artifacts_dir / "normal_camera_unused_pool_backlog.json", errors)
    for row in backlog.get("folders") or []:
        row_folder = str(row.get("folder") or "")
        row_alias = row.get("folder_alias") or {}
        if not row_alias:
            _add_error(errors, f"unused-pool row has no folder_alias: {row_folder!r}")
            continue
        is_aliased = row_folder in prep.SOURCE_FOLDER_ALIASES
        if is_aliased and row_alias.get("alias_applied") is not True:
            _add_error(errors, f"known alias folder is not marked alias_applied in backlog: {row_folder!r}")
|
|
|
|
|
|
def _validate_review_artifacts(artifacts_dir: Path, variants_by_key: dict[str, dict[str, Any]], errors: list[str]) -> None:
    """Validate the per-folder review manifest and contact sheet under ``review/``.

    For every folder in ``prep.DEFAULT_REVIEW_FOLDERS`` the contact sheet must
    exist and keep its no-generation / review-decision markers. The manifest
    must match the folder, preserve alias metadata, agree with the variant
    catalog on selected references, classify every review item consistently,
    and list the same references as its contact-sheet pages.

    A folder whose contact sheet is missing, or whose manifest could not be
    loaded, is skipped after the corresponding error has been recorded.
    """
    review_dir = artifacts_dir / "review"
    for folder_name in prep.DEFAULT_REVIEW_FOLDERS:
        stem = prep._safe_artifact_stem(folder_name)
        manifest = _json_file(review_dir / f"{stem}_review_manifest.json", errors)

        sheet_path = review_dir / f"{stem}_contact_sheet.html"
        if not sheet_path.is_file():
            _add_error(errors, f"missing contact sheet: {sheet_path}")
            continue
        sheet_html = sheet_path.read_text(encoding="utf-8")
        required_markers = (
            ("No-generation contact sheet", f"{folder_name} contact sheet lost no-generation scope"),
            ("data-review-decision=", f"{folder_name} contact sheet has no review-decision attributes"),
        )
        for marker, complaint in required_markers:
            if marker not in sheet_html:
                _add_error(errors, complaint)
        if not manifest:
            continue

        _expect_schema(manifest, prep.REVIEW_MANIFEST_SCHEMA, f"{folder_name} review manifest", errors)
        if manifest.get("folder") != folder_name:
            _add_error(errors, f"{folder_name} manifest folder changed: {manifest.get('folder')!r}")

        alias_info = manifest.get("folder_alias") or {}
        if alias_info.get("exact_source_folder") != folder_name:
            _add_error(errors, f"{folder_name} manifest alias does not preserve exact source folder")
        if folder_name in prep.SOURCE_FOLDER_ALIASES and alias_info.get("alias_applied") is not True:
            _add_error(errors, f"{folder_name} manifest should mark alias_applied")

        review_items = _expect_list(manifest.get("review_items"), f"{folder_name}.review_items", errors)
        declared_count = int(manifest.get("image_count") or 0)
        if len(review_items) != declared_count:
            _add_error(errors, f"{folder_name} review_items count does not match image_count")
        # Stringified references of all review items (a missing reference
        # shows up here as the string "None").
        item_refs = {str(item.get("reference_image")) for item in review_items}

        # References claimed by selected subvariants; each must also be listed
        # by the corresponding catalog variant.
        selected_refs: set[str] = set()
        for subvariant in manifest.get("selected_subvariants") or []:
            variant_key = str(subvariant.get("variant_key") or "")
            if variant_key not in variants_by_key:
                _add_error(errors, f"{folder_name} selected subvariant is missing from catalog: {variant_key}")
                continue
            catalog_refs = {str(ref) for ref in variants_by_key[variant_key].get("reference_images") or []}
            for raw_ref in subvariant.get("reference_images") or []:
                ref_text = str(raw_ref)
                selected_refs.add(ref_text)
                if ref_text not in catalog_refs:
                    _add_error(errors, f"{folder_name} selected ref is not in catalog variant {variant_key}: {ref_text}")

        for item in review_items:
            ref = str(item.get("reference_image") or "")
            decision = str(item.get("review_decision") or "")
            bucket = str(item.get("review_bucket") or "")
            if not ref:
                _add_error(errors, f"{folder_name} review item has no reference_image")
            if ".." in Path(ref).parts:
                _add_error(errors, f"{folder_name} review item escapes atlas root: {ref!r}")

            if decision == "selected_reference":
                if ref not in selected_refs:
                    _add_error(errors, f"{folder_name} selected review item is not in selected_subvariants: {ref}")
                if bucket in ("", "unassigned"):
                    _add_error(errors, f"{folder_name} selected review item has no selected bucket: {ref}")
                if not item.get("variant_key"):
                    _add_error(errors, f"{folder_name} selected review item has no variant key: {ref}")
            elif decision == "residual_unassigned":
                if bucket != "unassigned":
                    _add_error(errors, f"{folder_name} residual review item should use unassigned bucket: {ref}")
                if not item.get("exclusion_reason"):
                    _add_error(errors, f"{folder_name} residual review item has no exclusion reason: {ref}")
            else:
                _add_error(errors, f"{folder_name} review item has unknown decision: {decision!r}")

        page_refs = {
            str(image.get("reference_image"))
            for page in manifest.get("contact_sheet_pages") or []
            for image in page.get("images") or []
        }
        if page_refs != item_refs:
            _add_error(errors, f"{folder_name} contact-sheet pages and review_items disagree")
|
|
|
|
|
|
def validate_normal_camera_atlas(
    artifacts_dir: Path | str | None = None,
    *,
    require_artifacts: bool = True,
) -> list[str]:
    """Run all validators and return the collected error messages.

    Args:
        artifacts_dir: Directory holding the generated artifacts; falls back
            to ``prep.DEFAULT_OUTPUT_DIR`` when ``None``.
        require_artifacts: When false, only the catalog and the in-memory
            builders are checked and no files are read from *artifacts_dir*.

    Returns:
        A list of error messages; empty when every check passed.
    """
    if artifacts_dir is None:
        artifacts_path = prep.DEFAULT_OUTPUT_DIR
    else:
        artifacts_path = Path(artifacts_dir)

    errors: list[str] = []
    variants_by_key, _folder_names = _validate_variant_catalog(errors)
    _validate_built_artifacts(errors)
    if not require_artifacts:
        return errors

    _validate_artifact_files(artifacts_path, errors)
    _validate_alias_metadata(artifacts_path, errors)
    _validate_review_artifacts(artifacts_path, variants_by_key, errors)
    return errors
|
|
|
|
|
|
def _parser() -> argparse.ArgumentParser:
    """Build the command-line parser; the module docstring is used as its description."""
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument(
        "--artifacts-dir",
        type=Path,
        default=prep.DEFAULT_OUTPUT_DIR,
        help="Normal-camera artifact directory to validate.",
    )
    cli.add_argument(
        "--skip-artifacts",
        action="store_true",
        help="Validate in-memory builders and catalogs without requiring generated files.",
    )
    return cli
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point.

    Parses *argv* (argparse falls back to ``sys.argv`` when it is ``None``),
    runs the validation, and prints either one ``ERROR:`` line per problem to
    stderr or a single OK line to stdout.

    Returns:
        ``0`` when validation passed, ``1`` otherwise.
    """
    options = _parser().parse_args(argv)
    problems = validate_normal_camera_atlas(
        options.artifacts_dir,
        require_artifacts=not options.skip_artifacts,
    )
    if not problems:
        print("OK: normal camera atlas validation passed")
        return 0

    for problem in problems:
        print(f"ERROR: {problem}", file=sys.stderr)
    return 1
|
|
|
|
|
|
# When executed as a script, exit with the status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())
|