Report index / reports-and-code

portfolio.py

Source: /Users/borker/dev/hybrid-blog-writer-26-voice-pipeline/experiments/same_author_lift/portfolio.py

Open raw file

"""Select the best same-author candidate from existing raw rewrites.

This is a research harness. It does not generate new text. It scans candidates
already written by the transformation experiments, repeats the production
same-author judge to reduce fluke passes, and accepts only candidates that keep
meaning/facts while not increasing obvious batch-level tells.
"""

from __future__ import annotations

import json
import sys
from dataclasses import asdict, dataclass
from pathlib import Path

ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(ROOT))

from simple_writer.pipeline import CHARACTER_PROMPT
from voice_pipeline.drift import detect_drift
from voice_pipeline.llm import call_llm_json
from voice_pipeline.metrics import _words
from voice_pipeline.slop import audit_text

from experiments.same_author_lift.run import (
    AFTER_DIR,
    EXP_DIR,
    SOURCE_DIR,
    clean_article,
    full_quality_guard,
    load_cache,
    make_seed,
    save_cache,
    scaffold_hits,
    sha,
    total_hits,
)


# Destination of the JSON dump of the full result dict built by run().
OUT_PATH = EXP_DIR / "portfolio_results.json"
# Destination of the Markdown summary table produced by write_report().
REPORT_PATH = EXP_DIR / "PORTFOLIO_REPORT.md"
# Directory scanned by raw_candidates_for() for existing "*.md" rewrites.
RAW_DIR = EXP_DIR / "raw_rewrites"


@dataclass
class CandidateScore:
    """Metrics and accept/reject verdict for one raw rewrite candidate.

    Instances are built by ``score_candidate`` and serialised with ``asdict``
    into the results JSON.
    """

    # Filesystem path of the candidate file, stored as a string.
    path: str
    # "Yes" verdicts from the same-author judge. Stays 0 when the candidate is
    # rejected before the judge runs, and holds the probe count when the probe
    # fails.
    same_author_votes: int
    # The configured number of repeated trials (not necessarily how many
    # judge calls were actually made for this candidate).
    same_author_trials: int
    # Number of words in the candidate text as counted by ``_words``.
    word_count: int
    # Candidate word count divided by the source word count, rounded to 3 dp.
    word_ratio: float
    # ``audit_text(...).slop_rate`` of the candidate, rounded to 4 dp.
    slop_rate: float
    # ``detect_drift(...).drift_score`` against the seed profile, rounded to
    # 4 dp. Used for reporting only; it never rejects a candidate.
    drift_score: float
    # Total scaffold hits found in the candidate text.
    scaffold_hits: int
    # ``voice_delta`` reported by the quality guard; None when the guard was
    # never run for this candidate.
    voice_delta: int | None
    # ``winner`` reported by the quality guard; None when the guard was never
    # run for this candidate.
    quality_winner: str | None
    # True only when every gate passed.
    accepted: bool
    # Human-readable outcome: "accepted" or the first gate that failed.
    reason: str


def same_author_votes(
    seed_excerpt: str,
    text: str,
    trials: int,
    cache: dict | None = None,
) -> tuple[int, list[str]]:
    """Ask the same-author judge ``trials`` times and count the "yes" verdicts.

    Args:
        seed_excerpt: Reference excerpt. The full string is hashed into the
            cache key, but only its first 1500 characters are sent to the
            model.
        text: Text to compare. Only its first 300 whitespace-separated words
            are judged (and hashed into the cache key).
        trials: Number of verdicts wanted.
        cache: Optional cache dict. Verdicts live under
            ``cache["portfolio_same_author"]`` and are persisted with
            ``save_cache`` after every successful judge call. When ``None``,
            nothing is read or stored.

    Returns:
        ``(votes, reasons)`` where ``votes`` is the number of truthy
        ``same_author`` verdicts and ``reasons`` holds one reasoning string
        per trial, cached verdicts first.

    A judge call that returns nothing usable counts as a "no" for this call
    only. It is deliberately *not* written to the cache, so a transient LLM
    failure is retried on the next run instead of being replayed forever as a
    negative vote.
    """
    article_excerpt = " ".join(text.split()[:300])
    key_suffix = f":{sha(seed_excerpt)}:{sha(article_excerpt)}"
    key = f"portfolio:same-author{key_suffix}"

    bucket: dict[str, list[dict]] | None = None
    if cache is not None:
        bucket = cache.setdefault("portfolio_same_author", {})
        if key not in bucket:
            # Adopt rows stored under an older key prefix for the same pair
            # of excerpts so earlier verdicts are not paid for again.
            legacy_key = next(
                (cached_key for cached_key in bucket if cached_key.endswith(key_suffix)),
                None,
            )
            if legacy_key:
                bucket[key] = bucket[legacy_key]

    # Replay whatever is already cached (at most ``trials`` rows).
    cached_rows = list(bucket.get(key, [])) if bucket is not None else []
    replayed = cached_rows[:trials]
    votes = sum(1 for row in replayed if row.get("same_author"))
    reasons = [str(row.get("reasoning", "")) for row in replayed]

    # Only the missing verdicts trigger new judge calls.
    for _ in range(len(replayed), trials):
        data = call_llm_json(
            model="anthropic/claude-sonnet-4.6",
            system=CHARACTER_PROMPT.format(excerpt_a=seed_excerpt[:1500], excerpt_b=article_excerpt),
            user="Return JSON.",
            temperature=0.2,
            max_tokens=300,
            timeout=90,
            fallback_models=["x-ai/grok-4.3"],
        )
        if not data:
            # No usable response: a "no" for this call, but never cached.
            reasons.append("")
            continue
        yes = bool(data.get("same_author"))
        # Coerce like the replay path so the return type really is list[str].
        reasoning = str(data.get("reasoning", ""))
        votes += int(yes)
        reasons.append(reasoning)
        cached_rows.append({"same_author": yes, "reasoning": reasoning})
        if bucket is not None:
            # Persist after each verdict so an interrupted run keeps its work.
            bucket[key] = cached_rows
            save_cache(cache)
    return votes, reasons


def raw_candidates_for(slug: str) -> list[Path]:
    """Return the sorted ``*.md`` files in ``RAW_DIR`` whose name ends with ``slug``.

    A file named exactly ``slug`` is included, since a name trivially ends
    with itself. Matching is a plain suffix test on the file name.
    """
    return sorted(
        candidate
        for candidate in RAW_DIR.glob("*.md")
        if candidate.name.endswith(slug)
    )


def score_candidate(
    seed,
    cache: dict,
    slug: str,
    source: str,
    path: Path,
    trials: int,
    probe_trials: int,
) -> CandidateScore:
    """Measure one raw rewrite against its source and decide whether to accept it.

    Gates are applied in order and the first failure becomes ``reason``:
    word count, slop rate, scaffold hits, same-author probe, same-author
    majority, then the full quality guard (meaning, facts, voice delta,
    repetitive tell).

    Args:
        seed: Object providing ``profile`` and ``representative_excerpts``.
        cache: Shared cache dict passed to the judge and the quality guard.
        slug: Source file name (accepted for the caller's convenience; not
            read in the body).
        source: Source article text the candidate is compared with.
        path: Candidate file to read and score.
        trials: Number of repeated same-author trials for the majority gate.
        probe_trials: Number of cheap probe trials that must all say "yes"
            before the repeated trials are run.

    Returns:
        A ``CandidateScore`` carrying the metrics and the verdict.
    """
    text = path.read_text()

    # Local metrics are computed for every candidate, accepted or not.
    source_words = len(_words(source))
    words = len(_words(text))
    source_slop = audit_text(source).slop_rate
    slop = audit_text(text).slop_rate
    drift = detect_drift(text, seed.profile).drift_score
    source_hits = total_hits(scaffold_hits(source))
    hits = total_hits(scaffold_hits(text))

    def decide() -> tuple[bool, str, int, dict]:
        """Return ``(accepted, reason, votes, guard)`` for this candidate."""
        # Cheap local gates first: no judge call is spent on these rejects.
        if words < source_words * 0.76:
            return False, "word count dropped too far", 0, {}
        if slop > source_slop + 0.75:
            return False, "slop rose too far", 0, {}
        if hits > source_hits:
            return False, "scaffold hits rose", 0, {}

        # The probe must be unanimous before the repeated trials are paid for.
        probe_votes, _ = same_author_votes(seed.representative_excerpts[1], text, probe_trials, cache)
        if probe_votes < max(1, probe_trials):
            return False, "same-author probe failed", probe_votes, {}

        full_votes, _ = same_author_votes(seed.representative_excerpts[1], text, trials, cache)
        if full_votes < trials // 2 + 1:
            return False, "same-author majority failed", full_votes, {}

        checks = full_quality_guard(f"portfolio-{path.name}", source, text, cache)
        if checks.get("meaning_preserved") is False:
            return False, "meaning not preserved", full_votes, checks
        if checks.get("facts_preserved") is False:
            return False, "facts not preserved", full_votes, checks
        if int(checks.get("voice_delta", 0) or 0) < -1:
            return False, "voice degraded", full_votes, checks
        if checks.get("new_repetitive_tell") is True:
            return False, "new repetitive tell", full_votes, checks
        return True, "accepted", full_votes, checks

    accepted, reason, votes, guard = decide()

    # Drift is reported but never rejects: several stable same-author
    # candidates push seed-profile drift the wrong way while still improving
    # slop/repetition and passing the LLM voice gate.
    if guard:
        voice_delta = int(guard.get("voice_delta", 0) or 0)
        quality_winner = guard.get("winner")
    else:
        voice_delta = None
        quality_winner = None

    return CandidateScore(
        path=str(path),
        same_author_votes=votes,
        same_author_trials=trials,
        word_count=words,
        word_ratio=round(words / max(source_words, 1), 3),
        slop_rate=round(slop, 4),
        drift_score=round(drift, 4),
        scaffold_hits=hits,
        voice_delta=voice_delta,
        quality_winner=quality_winner,
        accepted=accepted,
        reason=reason,
    )


def choose(scores: list[CandidateScore]) -> CandidateScore | None:
    """Pick the best accepted candidate, or ``None`` when none was accepted.

    Preference order: more same-author votes, higher voice delta (a missing
    delta counts as 0), lower slop rate, fewer scaffold hits, then the word
    ratio closest to 1.0. Ties keep the earliest candidate in ``scores``.
    """

    def rank(score: CandidateScore) -> tuple:
        # Negated fields sort descending; the rest sort ascending.
        return (
            -score.same_author_votes,
            -(score.voice_delta or 0),
            score.slop_rate,
            score.scaffold_hits,
            abs(1.0 - score.word_ratio),
        )

    pool = list(filter(lambda score: score.accepted, scores))
    if not pool:
        return None
    # Stable sort on a private copy, so the caller's list is left untouched.
    pool.sort(key=rank)
    return pool[0]


def run(limit: int | None, trials: int, probe_trials: int) -> dict:
    """Select the best candidate for every source article and write all outputs.

    For each ``*.md`` file in ``SOURCE_DIR`` (optionally only the first
    ``limit`` in sorted order) the cleaned source is judged, every raw
    candidate is scored, and either the winning candidate or the cleaned
    source is written to ``AFTER_DIR`` under the source file name. The
    aggregated result is dumped to ``OUT_PATH`` and summarised through
    ``write_report``.

    Args:
        limit: Maximum number of source articles to process. ``None`` or 0
            means no limit.
        trials: Repeated same-author trials per text.
        probe_trials: Probe trials per candidate before the repeated trials.

    Returns:
        The result dict: run parameters, pass counts and one row per article.
    """
    seed = make_seed()
    cache = load_cache()
    paths = sorted(SOURCE_DIR.glob("*.md"))
    if limit:
        paths = paths[:limit]

    # The source passes on the same majority rule that candidates must meet.
    # A fixed threshold of 2 is only correct for trials == 3; it can never be
    # reached with one trial and is a minority with five.
    majority = trials // 2 + 1

    rows = []
    AFTER_DIR.mkdir(parents=True, exist_ok=True)
    for source_path in paths:
        source = clean_article(source_path.read_text())
        source_votes, _ = same_author_votes(seed.representative_excerpts[1], source, trials, cache)
        source_pass = source_votes >= majority
        candidate_scores = [
            score_candidate(seed, cache, source_path.name, source, candidate_path, trials, probe_trials)
            for candidate_path in raw_candidates_for(source_path.name)
        ]
        winner = choose(candidate_scores)
        # Fall back to the cleaned source when no candidate was accepted.
        selected_text = Path(winner.path).read_text() if winner else source
        (AFTER_DIR / source_path.name).write_text(selected_text)
        rows.append({
            "slug": source_path.name,
            "source_votes": source_votes,
            "source_pass": source_pass,
            "selected_path": winner.path if winner else None,
            # An accepted winner always counts as a pass; otherwise the kept
            # source must have passed on its own.
            "selected_pass": bool(winner) or source_pass,
            "winner": asdict(winner) if winner else None,
            "candidates": [asdict(score) for score in candidate_scores],
        })

    selected_passes = sum(1 for row in rows if row["selected_pass"])
    source_passes = sum(1 for row in rows if row["source_pass"])
    result = {
        "limit": limit,
        "trials": trials,
        "probe_trials": probe_trials,
        "source_passes": source_passes,
        "selected_passes": selected_passes,
        "rows": rows,
    }
    OUT_PATH.write_text(json.dumps(result, indent=2))
    write_report(result)
    return result


def write_report(result: dict) -> None:
    """Render ``result`` as a Markdown summary and write it to ``REPORT_PATH``.

    The report has a bullet list of run totals followed by one table row per
    article: slug, source votes, whether the selection passed, the winning
    candidate's file name (blank when the source was kept) and the reason.
    """
    rows = result["rows"]
    article_count = len(rows)

    header = [
        "# Same-author Portfolio Selection",
        "",
        f"- Articles evaluated: {article_count}",
        f"- Probe trials per candidate: {result.get('probe_trials', 1)}",
        f"- Repeated trials per candidate: {result['trials']}",
        f"- Source passes: {result['source_passes']}/{article_count}",
        f"- Selected passes: {result['selected_passes']}/{article_count}",
        "",
        "| Slug | Source | Selected | Winner | Reason |",
        "|---|---:|---:|---|---|",
    ]

    def table_row(entry: dict) -> str:
        """Format one article entry as a Markdown table row."""
        picked = entry.get("winner")
        if picked:
            winner_name = Path(picked["path"]).name
            why = picked["reason"]
        else:
            winner_name = ""
            why = "kept source"
        return (
            f"| `{entry['slug']}` | {entry['source_votes']}/{result['trials']} | "
            f"{str(entry['selected_pass']).lower()} | `{winner_name}` | {why} |"
        )

    body = [table_row(entry) for entry in rows]
    REPORT_PATH.write_text("\n".join(header + body) + "\n")


def main() -> int:
    """Parse the command-line flags, run the selection and print a summary.

    Flags: ``--limit`` (int, optional), ``--trials`` (int, default 3) and
    ``--probe-trials`` (int, default 1).

    Returns:
        Always 0, for use as the process exit code.
    """
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument("--limit", type=int)
    cli.add_argument("--trials", type=int, default=3)
    cli.add_argument("--probe-trials", type=int, default=1)
    options = cli.parse_args()

    outcome = run(options.limit, options.trials, options.probe_trials)
    passed = outcome["selected_passes"]
    total = len(outcome["rows"])
    print(f"selected passes: {passed}/{total}")
    print(f"report: {REPORT_PATH}")
    return 0


# Script entry point: exit the process with the status returned by main().
if __name__ == "__main__":
    raise SystemExit(main())