Extract formatter input parsing policy

This commit is contained in:
2026-06-27 01:22:07 +02:00
parent b54b8b9421
commit 4c45d96472
7 changed files with 239 additions and 159 deletions
+9 -54
View File
@@ -1,10 +1,10 @@
from __future__ import annotations
import json
import re
from typing import Any
try:
from . import formatter_input as input_policy
from .krea_action_context import (
is_close_foreplay_text as _is_close_foreplay_text,
is_outercourse_text as _is_outercourse_text,
@@ -34,6 +34,7 @@ try:
from .krea_pov_actions import pov_action_phrase as _pov_action_phrase
from .prompt_hygiene import sanitize_negative_text, sanitize_prose_text
except ImportError: # Allows local smoke tests with `python -c`.
import formatter_input as input_policy
from krea_action_context import (
is_close_foreplay_text as _is_close_foreplay_text,
is_outercourse_text as _is_outercourse_text,
@@ -91,11 +92,7 @@ PROMPT_FIELD_LABELS = (
def _clean(value: Any) -> str:
text = "" if value is None else str(value)
text = text.replace("\n", " ")
text = re.sub(r"\s+", " ", text).strip()
text = re.sub(r"\s+([,.;:])", r"\1", text)
return text
return input_policy.clean_text(value)
def _is_false(value: Any) -> bool:
@@ -133,69 +130,27 @@ def _with_indefinite_article(text: str) -> str:
def _maybe_json(text: str) -> dict[str, Any] | None:
text = _clean(text)
if not text.startswith("{"):
return None
try:
value = json.loads(text)
except json.JSONDecodeError:
return None
return value if isinstance(value, dict) else None
return input_policy.maybe_json(text)
def _row_from_inputs(source_text: str, metadata_json: str, input_hint: str) -> tuple[dict[str, Any] | None, str]:
candidates: list[tuple[str, str]] = []
if input_hint in ("auto", "metadata_json"):
candidates.append((metadata_json, "metadata_json"))
candidates.append((source_text, "source_json"))
for text, method in candidates:
row = _maybe_json(text)
if row is not None:
return row, method
return None, "text"
return input_policy.row_from_inputs(source_text, metadata_json, input_hint)
def _strip_trigger(text: str, preserve_trigger: bool) -> str:
text = _clean(text)
if preserve_trigger:
return text
for trigger in TRIGGER_CANDIDATES:
if text.lower().startswith(trigger.lower() + ","):
return text[len(trigger) + 1 :].strip(" ,")
if text.lower().startswith(trigger.lower() + "."):
return text[len(trigger) + 1 :].strip(" ,")
return text
return input_policy.strip_trigger_prefix(text, TRIGGER_CANDIDATES, preserve_trigger=preserve_trigger)
def _split_avoid(text: str) -> tuple[str, str]:
match = re.search(r"\bAvoid:\s*(.*)$", text)
if not match:
return text, ""
return text[: match.start()].strip(" ."), match.group(1).strip(" .")
return input_policy.split_avoid(text)
def _prompt_field(text: str, label: str) -> str:
text = _clean(text)
if not text:
return ""
labels = "|".join(re.escape(name) for name in PROMPT_FIELD_LABELS)
pattern = rf"{re.escape(label)}:\s*(.*?)(?=\. (?:{labels}):|\. Use\b|\. Avoid\b|$)"
match = re.search(pattern, text)
if not match:
return ""
return _clean(match.group(1)).rstrip(".")
return input_policy.prompt_field(text, label, field_labels=PROMPT_FIELD_LABELS)
def _row_value(row: dict[str, Any], key: str, labels: tuple[str, ...] = ()) -> str:
value = _clean(row.get(key, ""))
if value:
return value
prompt = _clean(row.get("prompt", ""))
for label in labels:
value = _prompt_field(prompt, label)
if value:
return value
return ""
return input_policy.row_value(row, key, labels, field_labels=PROMPT_FIELD_LABELS)
def _body_phrase(body: Any, figure_note: Any = "") -> str: