Add Krea2 POV routing and eval tooling

This commit is contained in:
2026-06-30 19:28:10 +02:00
parent 284c6279e6
commit f5ba07e340
29 changed files with 6331 additions and 400 deletions
+81 -1
View File
@@ -2,6 +2,7 @@ from __future__ import annotations
from collections import Counter
from pathlib import Path
import sys
from typing import Any
try:
@@ -31,11 +32,13 @@ def _latest_evidence(entries: list[dict[str, Any]], *, result: str | None = None
return {
"id": entry.get("id") or "",
"seed": entry.get("seed"),
"generator_seed": entry.get("generator_seed"),
"result": entry.get("result") or "",
"decision": entry.get("decision") or "",
"baseline_prompt_summary": entry.get("baseline_prompt_summary") or "",
"candidate_prompt_summary": entry.get("candidate_prompt_summary") or "",
"observation": entry.get("observation") or "",
"needs_expansion": bool(entry.get("needs_expansion")),
"commit": entry.get("commit") or "",
}
@@ -228,6 +231,45 @@ def next_test_plans() -> list[dict[str, Any]]:
return plans
def guide_expansion_plans() -> list[dict[str, Any]]:
plans: list[dict[str, Any]] = []
for row in coverage_rows():
latest_accepted = row.get("latest_accepted_evidence") or {}
decision = str(latest_accepted.get("decision") or "")
if decision not in {"prompt_guide_rule", "needs_more_tests"} and not (
decision == "provisional_generator_patch" and latest_accepted.get("needs_expansion")
):
continue
key = str(row.get("key") or "")
variant = krea2_pose_variant_catalog.get_variant(key)
if not variant:
continue
evidence = variant.get("evidence") or {}
plans.append(
{
"key": key,
"family": variant.get("family") or "",
"action_family": variant.get("action_family") or "",
"status": variant.get("status") or "",
"coverage_state": row.get("coverage_state") or "",
"target": "multi_seed_multi_woman_matrix",
"latest_accepted_id": latest_accepted.get("id") or "",
"latest_accepted_seed": latest_accepted.get("seed"),
"latest_accepted_decision": decision,
"accepted_evidence_count": row.get("accepted_evidence_count") or 0,
"total_evidence_count": row.get("total_evidence_count") or 0,
"canonical_geometry": variant.get("canonical_geometry") or "",
"prompt_cues": list(variant.get("prompt_cues") or []),
"avoid_cues": list(variant.get("avoid_cues") or []),
"reference_paths": [str(path) for path in krea2_pose_variant_catalog.reference_paths(key)],
"generator_hook": variant.get("generator_hook") or {},
"guide_section": evidence.get("guide_section") or "",
"notes": evidence.get("notes") or "",
}
)
return plans
def next_eval_template_commands(*, seed_token: str = "<fixed_seed>") -> list[dict[str, str]]:
commands: list[dict[str, str]] = []
for plan in next_test_plans():
@@ -261,14 +303,32 @@ def markdown_report(atlas_root: str | Path | None = None) -> str:
evidence = row.get("latest_evidence") or {}
seed = evidence.get("seed")
seed_text = f"seed {seed}" if isinstance(seed, int) else "seed unknown"
generator_seed = evidence.get("generator_seed")
generator_seed_text = f", generator seed {generator_seed}" if isinstance(generator_seed, int) else ""
commit = evidence.get("commit") or "uncommitted"
lines.append(
f"- {row['key']}: {evidence.get('id') or 'unnamed'} ({evidence.get('result') or 'unknown'}, {seed_text}, {evidence.get('decision') or 'unknown'}, commit {commit})"
f"- {row['key']}: {evidence.get('id') or 'unnamed'} ({evidence.get('result') or 'unknown'}, {seed_text}{generator_seed_text}, {evidence.get('decision') or 'unknown'}, commit {commit})"
)
if evidence.get("candidate_prompt_summary"):
lines.append(f" Candidate: {evidence['candidate_prompt_summary']}")
if evidence.get("observation"):
lines.append(f" Observation: {evidence['observation']}")
accepted = row.get("latest_accepted_evidence") or {}
if accepted and accepted.get("id") != evidence.get("id"):
accepted_seed = accepted.get("seed")
accepted_seed_text = f"seed {accepted_seed}" if isinstance(accepted_seed, int) else "seed unknown"
accepted_generator_seed = accepted.get("generator_seed")
accepted_generator_seed_text = (
f", generator seed {accepted_generator_seed}" if isinstance(accepted_generator_seed, int) else ""
)
accepted_commit = accepted.get("commit") or "uncommitted"
lines.append(
f" Latest accepted: {accepted.get('id') or 'unnamed'} ({accepted.get('result') or 'unknown'}, {accepted_seed_text}{accepted_generator_seed_text}, {accepted.get('decision') or 'unknown'}, commit {accepted_commit})"
)
if accepted.get("candidate_prompt_summary"):
lines.append(f" Accepted candidate: {accepted['candidate_prompt_summary']}")
if accepted.get("observation"):
lines.append(f" Accepted observation: {accepted['observation']}")
summary = coverage_summary()
if summary["next_test_candidates"]:
lines.extend(
@@ -294,6 +354,16 @@ def markdown_report(atlas_root: str | Path | None = None) -> str:
lines.append(
f"- {row['key']}: {difficulty}, {priority} priority, {control_requirement}"
)
expansion_plans = guide_expansion_plans()
if expansion_plans:
lines.extend(["", "## Guide/Fragile Evidence Expansion", ""])
for plan in expansion_plans:
seed = plan.get("latest_accepted_seed")
seed_text = f"seed {seed}" if isinstance(seed, int) else "seed unknown"
lines.append(
f"- {plan['key']}: {plan['target']} after {plan['latest_accepted_decision']} "
f"({plan['latest_accepted_id']}, {seed_text})"
)
plans = next_test_plans()
if plans:
lines.extend(["", "## Next Test Plans"])
@@ -344,3 +414,13 @@ def markdown_report(atlas_root: str | Path | None = None) -> str:
]
)
return "\n".join(lines)
def main(argv: list[str] | None = None) -> int:
_ = argv
print(markdown_report())
return 0
if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))