Add route simulation quality summary

This commit is contained in:
2026-06-27 20:08:11 +02:00
parent eb1bdbf305
commit de6615c024
4 changed files with 192 additions and 0 deletions
+165
View File
@@ -1403,6 +1403,147 @@ def _route_family_coverage_checks(cases: list[dict[str, Any]]) -> list[dict[str,
]
def _issue_bucket(issue: Any) -> str:
text = str(issue or "").strip()
if not text:
return "empty_issue"
if ":" not in text:
return _clean_key(text.split()[0] if text.split() else text) or "message"
parts = [part.strip() for part in text.split(":") if part.strip()]
if len(parts) >= 2:
return _clean_key(parts[1]).replace(" ", "_") or "message"
return _clean_key(parts[0]).replace(" ", "_") or "message"
def _counter_increment(table: dict[str, int], key: Any, amount: int = 1) -> None:
label = str(key or "").strip()
if not label:
return
table[label] = table.get(label, 0) + amount
def _quality_group_increment(groups: dict[str, dict[str, int]], key: Any, *, issues: int) -> None:
label = str(key or "").strip()
if not label:
return
group = groups.setdefault(label, {"cases": 0, "issues": 0})
group["cases"] += 1
group["issues"] += issues
def _quality_summary(
cases: list[dict[str, Any]],
coverage_checks: list[dict[str, Any]],
axis_checks: list[dict[str, Any]],
pair_seed_checks: list[dict[str, Any]],
) -> dict[str, Any]:
issue_buckets: dict[str, int] = {}
targets: dict[str, dict[str, int]] = {}
action_families: dict[str, dict[str, int]] = {}
position_families: dict[str, dict[str, int]] = {}
weakest_cases: list[dict[str, Any]] = []
route_issues = 0
for case in cases:
issues = list(case.get("issues") or [])
issue_count = len(issues)
route_issues += issue_count
summary = case.get("summary") if isinstance(case.get("summary"), dict) else {}
_quality_group_increment(targets, case.get("target"), issues=issue_count)
_quality_group_increment(action_families, summary.get("action_family"), issues=issue_count)
_quality_group_increment(position_families, summary.get("position_family"), issues=issue_count)
for issue in issues:
_counter_increment(issue_buckets, _issue_bucket(issue))
if issue_count:
weakest_cases.append(
{
"name": case.get("name"),
"target": case.get("target"),
"issues": issue_count,
"action_family": summary.get("action_family"),
"position_family": summary.get("position_family"),
"position_key": summary.get("position_key"),
}
)
check_groups = {
"coverage": coverage_checks,
"axis": axis_checks,
"pair_seed": pair_seed_checks,
}
check_issues_by_group: dict[str, int] = {}
for group_name, checks in check_groups.items():
issue_count = sum(len(check.get("issues") or []) for check in checks)
check_issues_by_group[group_name] = issue_count
for check in checks:
for issue in check.get("issues") or []:
_counter_increment(issue_buckets, _issue_bucket(issue))
weakest_cases.sort(key=lambda item: (-int(item.get("issues") or 0), str(item.get("name") or "")))
return {
"route_cases": len(cases),
"route_issues": route_issues,
"check_issues": sum(check_issues_by_group.values()),
"check_issues_by_group": check_issues_by_group,
"issue_buckets": dict(sorted(issue_buckets.items())),
"targets": dict(sorted(targets.items())),
"action_families": dict(sorted(action_families.items())),
"position_families": dict(sorted(position_families.items())),
"weakest_cases": weakest_cases[:8],
}
def _merge_quality_groups(target: dict[str, dict[str, int]], source: dict[str, Any]) -> None:
for key, raw_group in source.items():
if not isinstance(raw_group, dict):
continue
group = target.setdefault(str(key), {"cases": 0, "issues": 0})
group["cases"] += int(raw_group.get("cases") or 0)
group["issues"] += int(raw_group.get("issues") or 0)
def _sweep_quality_summary(runs: list[dict[str, Any]]) -> dict[str, Any]:
totals = {
"route_cases": 0,
"route_issues": 0,
"check_issues": 0,
}
check_issues_by_group: dict[str, int] = {}
issue_buckets: dict[str, int] = {}
targets: dict[str, dict[str, int]] = {}
action_families: dict[str, dict[str, int]] = {}
position_families: dict[str, dict[str, int]] = {}
weakest_cases: list[dict[str, Any]] = []
for run in runs:
run_seed = (run.get("summary") or {}).get("seed")
quality = run.get("quality") if isinstance(run.get("quality"), dict) else {}
for key in totals:
totals[key] += int(quality.get(key) or 0)
for key, value in (quality.get("check_issues_by_group") or {}).items():
_counter_increment(check_issues_by_group, key, int(value or 0))
for key, value in (quality.get("issue_buckets") or {}).items():
_counter_increment(issue_buckets, key, int(value or 0))
_merge_quality_groups(targets, quality.get("targets") or {})
_merge_quality_groups(action_families, quality.get("action_families") or {})
_merge_quality_groups(position_families, quality.get("position_families") or {})
for case in quality.get("weakest_cases") or []:
if not isinstance(case, dict):
continue
weakest_cases.append({"seed": run_seed, **case})
weakest_cases.sort(key=lambda item: (-int(item.get("issues") or 0), int(item.get("seed") or 0), str(item.get("name") or "")))
return {
**totals,
"check_issues_by_group": dict(sorted(check_issues_by_group.items())),
"issue_buckets": dict(sorted(issue_buckets.items())),
"targets": dict(sorted(targets.items())),
"action_families": dict(sorted(action_families.items())),
"position_families": dict(sorted(position_families.items())),
"weakest_cases": weakest_cases[:12],
}
def run_simulation(seed: int = 3901, *, include_prompts: bool = False) -> dict[str, Any]:
cases: list[dict[str, Any]] = []
regular = _regular_single_case(seed)
@@ -1452,6 +1593,7 @@ def run_simulation(seed: int = 3901, *, include_prompts: bool = False) -> dict[s
for check in pair_seed_checks
for issue in check.get("issues", [])
)
quality = _quality_summary(cases, coverage_checks, axis_checks, pair_seed_checks)
return {
"summary": {
"seed": seed,
@@ -1461,6 +1603,7 @@ def run_simulation(seed: int = 3901, *, include_prompts: bool = False) -> dict[s
"pair_seed_checks": len(pair_seed_checks),
"issues": len(issues),
},
"quality": quality,
"issues": issues,
"cases": cases,
"coverage_checks": coverage_checks,
@@ -1484,6 +1627,7 @@ def run_simulation_sweep(
for run in runs:
run_seed = (run.get("summary") or {}).get("seed")
issues.extend({"seed": run_seed, **issue} for issue in run.get("issues") or [])
quality = _sweep_quality_summary(runs)
return {
"summary": {
"seed": seed,
@@ -1496,6 +1640,7 @@ def run_simulation_sweep(
"pair_seed_checks": sum((run.get("summary") or {}).get("pair_seed_checks", 0) for run in runs),
"issues": len(issues),
},
"quality": quality,
"issues": issues,
"runs": runs,
}
@@ -1503,12 +1648,22 @@ def run_simulation_sweep(
def _print_text_report(report: dict[str, Any]) -> None:
summary = report.get("summary") or {}
quality = report.get("quality") or {}
print(
f"Prompt route simulation: seed={summary.get('seed')} "
f"cases={summary.get('cases')} coverage_checks={summary.get('coverage_checks')} "
f"axis_checks={summary.get('axis_checks')} pair_seed_checks={summary.get('pair_seed_checks')} "
f"issues={summary.get('issues')}"
)
print(
f"Quality: route_issues={quality.get('route_issues')} "
f"check_issues={quality.get('check_issues')} "
f"targets={quality.get('targets')}"
)
if quality.get("issue_buckets"):
print(f"Quality issue buckets: {quality.get('issue_buckets')}")
if quality.get("weakest_cases"):
print(f"Quality weakest cases: {quality.get('weakest_cases')}")
for case in report.get("cases") or []:
summary_text = case.get("summary") or {}
route = ", ".join(f"{key}={value}" for key, value in summary_text.items() if value not in (None, "", []))
@@ -1534,6 +1689,7 @@ def _print_text_report(report: dict[str, Any]) -> None:
def _print_sweep_report(report: dict[str, Any]) -> None:
summary = report.get("summary") or {}
quality = report.get("quality") or {}
seeds = ", ".join(str(seed) for seed in (summary.get("seeds") or []))
print(
f"Prompt route simulation sweep: seed={summary.get('seed')} "
@@ -1542,6 +1698,15 @@ def _print_sweep_report(report: dict[str, Any]) -> None:
f"axis_checks={summary.get('axis_checks')} pair_seed_checks={summary.get('pair_seed_checks')} "
f"issues={summary.get('issues')}"
)
print(
f"Quality: route_issues={quality.get('route_issues')} "
f"check_issues={quality.get('check_issues')} "
f"targets={quality.get('targets')}"
)
if quality.get("issue_buckets"):
print(f"Quality issue buckets: {quality.get('issue_buckets')}")
if quality.get("weakest_cases"):
print(f"Quality weakest cases: {quality.get('weakest_cases')}")
for run in report.get("runs") or []:
run_summary = run.get("summary") or {}
print(
+16
View File
@@ -7926,11 +7926,20 @@ def smoke_seed_config_policy() -> None:
def smoke_prompt_route_simulation_policy() -> None:
report = prompt_route_simulation.run_simulation(seed=3901, include_prompts=False)
summary = report.get("summary") or {}
quality = report.get("quality") or {}
_expect(summary.get("cases") == 14, "Prompt route simulation case count changed unexpectedly")
_expect(summary.get("coverage_checks") == 2, "Prompt route simulation lost family coverage checks")
_expect(summary.get("axis_checks") == 6, "Prompt route simulation lost axis check coverage")
_expect(summary.get("pair_seed_checks") == 7, "Prompt route simulation lost pair seed check coverage")
_expect(summary.get("issues") == 0, f"Prompt route simulation reported issues: {report.get('issues')}")
_expect(quality.get("route_cases") == 14, "Prompt route simulation quality summary lost route case count")
_expect(quality.get("route_issues") == 0, f"Prompt route simulation quality reported route issues: {quality}")
_expect(quality.get("check_issues") == 0, f"Prompt route simulation quality reported check issues: {quality}")
_expect((quality.get("targets") or {}).get("single", {}).get("cases") == 10, "Prompt route simulation quality lost single target count")
_expect((quality.get("targets") or {}).get("softcore", {}).get("cases") == 2, "Prompt route simulation quality lost softcore target count")
_expect((quality.get("targets") or {}).get("hardcore", {}).get("cases") == 2, "Prompt route simulation quality lost hardcore target count")
_expect(not quality.get("issue_buckets"), "Prompt route simulation quality should have no issue buckets on clean baseline")
_expect(not quality.get("weakest_cases"), "Prompt route simulation quality should have no weak cases on clean baseline")
cases = {case.get("name"): case for case in report.get("cases") or []}
for route_name in (
"hardcore.single.oral",
@@ -8027,10 +8036,17 @@ def smoke_prompt_route_simulation_policy() -> None:
)
sweep = prompt_route_simulation.run_simulation_sweep(seed=3901, count=3, seed_step=101, include_prompts=False)
sweep_summary = sweep.get("summary") or {}
sweep_quality = sweep.get("quality") or {}
_expect(sweep_summary.get("runs") == 3, "Prompt route simulation sweep lost run coverage")
_expect(sweep_summary.get("seeds") == [3901, 4002, 4103], "Prompt route simulation sweep seed sequence changed")
_expect(sweep_summary.get("cases") == 42, "Prompt route simulation sweep case count changed")
_expect(sweep_summary.get("issues") == 0, f"Prompt route simulation sweep reported issues: {sweep.get('issues')}")
_expect(sweep_quality.get("route_cases") == 42, "Prompt route simulation sweep quality lost route case count")
_expect(sweep_quality.get("route_issues") == 0, f"Prompt route simulation sweep quality reported route issues: {sweep_quality}")
_expect(sweep_quality.get("check_issues") == 0, f"Prompt route simulation sweep quality reported check issues: {sweep_quality}")
_expect((sweep_quality.get("targets") or {}).get("single", {}).get("cases") == 30, "Prompt route simulation sweep quality lost single target count")
_expect((sweep_quality.get("targets") or {}).get("softcore", {}).get("cases") == 6, "Prompt route simulation sweep quality lost softcore target count")
_expect((sweep_quality.get("targets") or {}).get("hardcore", {}).get("cases") == 6, "Prompt route simulation sweep quality lost hardcore target count")
def smoke_node_camera_registration() -> None: