Report index / reports-and-code
portfolio.py
Source: /Users/borker/dev/hybrid-blog-writer-26-voice-pipeline/experiments/same_author_lift/portfolio.py
"""Select the best same-author candidate from existing raw rewrites.
This is a research harness. It does not generate new text. It scans candidates
already written by the transformation experiments, repeats the production
same-author judge to reduce fluke passes, and accepts only candidates that keep
meaning/facts while not increasing obvious batch-level tells.
"""
from __future__ import annotations
import json
import sys
from dataclasses import asdict, dataclass
from pathlib import Path
ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(ROOT))
from simple_writer.pipeline import CHARACTER_PROMPT
from voice_pipeline.drift import detect_drift
from voice_pipeline.llm import call_llm_json
from voice_pipeline.metrics import _words
from voice_pipeline.slop import audit_text
from experiments.same_author_lift.run import (
AFTER_DIR,
EXP_DIR,
SOURCE_DIR,
clean_article,
full_quality_guard,
load_cache,
make_seed,
save_cache,
scaffold_hits,
sha,
total_hits,
)
OUT_PATH = EXP_DIR / "portfolio_results.json"
REPORT_PATH = EXP_DIR / "PORTFOLIO_REPORT.md"
RAW_DIR = EXP_DIR / "raw_rewrites"
@dataclass
class CandidateScore:
path: str
same_author_votes: int
same_author_trials: int
word_count: int
word_ratio: float
slop_rate: float
drift_score: float
scaffold_hits: int
voice_delta: int | None
quality_winner: str | None
accepted: bool
reason: str
def same_author_votes(
seed_excerpt: str,
text: str,
trials: int,
cache: dict | None = None,
) -> tuple[int, list[str]]:
article_excerpt = " ".join(text.split()[:300])
bucket: dict[str, list[dict]] | None = None
key_suffix = f":{sha(seed_excerpt)}:{sha(article_excerpt)}"
key = f"portfolio:same-author{key_suffix}"
if cache is not None:
bucket = cache.setdefault("portfolio_same_author", {})
if key not in bucket:
legacy_key = next(
(cached_key for cached_key in bucket if cached_key.endswith(key_suffix)),
None,
)
if legacy_key:
bucket[key] = bucket[legacy_key]
cached = bucket.get(key, [])
if len(cached) >= trials:
votes = sum(1 for row in cached[:trials] if row.get("same_author"))
reasons = [str(row.get("reasoning", "")) for row in cached[:trials]]
return votes, reasons
cached_rows = list(bucket.get(key, [])) if bucket is not None else []
votes = 0
reasons: list[str] = []
for row in cached_rows[:trials]:
yes = bool(row.get("same_author"))
votes += int(yes)
reasons.append(str(row.get("reasoning", "")))
for _ in range(len(cached_rows), trials):
data = call_llm_json(
model="anthropic/claude-sonnet-4.6",
system=CHARACTER_PROMPT.format(excerpt_a=seed_excerpt[:1500], excerpt_b=article_excerpt),
user="Return JSON.",
temperature=0.2,
max_tokens=300,
timeout=90,
fallback_models=["x-ai/grok-4.3"],
)
yes = bool(data.get("same_author")) if data else False
votes += int(yes)
reasons.append((data or {}).get("reasoning", ""))
cached_rows.append({
"same_author": yes,
"reasoning": (data or {}).get("reasoning", ""),
})
if bucket is not None:
bucket[key] = cached_rows
save_cache(cache)
return votes, reasons
def raw_candidates_for(slug: str) -> list[Path]:
candidates = []
for path in RAW_DIR.glob("*.md"):
if path.name == slug or path.name.endswith(slug):
candidates.append(path)
return sorted(candidates)
def score_candidate(
seed,
cache: dict,
slug: str,
source: str,
path: Path,
trials: int,
probe_trials: int,
) -> CandidateScore:
text = path.read_text()
source_words = len(_words(source))
words = len(_words(text))
source_slop = audit_text(source).slop_rate
slop = audit_text(text).slop_rate
drift = detect_drift(text, seed.profile).drift_score
source_hits = total_hits(scaffold_hits(source))
hits = total_hits(scaffold_hits(text))
guard: dict = {}
votes = 0
reason = ""
accepted = False
if words < source_words * 0.76:
reason = "word count dropped too far"
elif slop > source_slop + 0.75:
reason = "slop rose too far"
elif hits > source_hits:
reason = "scaffold hits rose"
else:
probe_votes, _ = same_author_votes(seed.representative_excerpts[1], text, probe_trials, cache)
votes = probe_votes
if probe_votes < max(1, probe_trials):
reason = "same-author probe failed"
else:
votes, _ = same_author_votes(seed.representative_excerpts[1], text, trials, cache)
majority = trials // 2 + 1
if votes < majority:
reason = "same-author majority failed"
else:
guard = full_quality_guard(f"portfolio-{path.name}", source, text, cache)
if guard.get("meaning_preserved") is False:
reason = "meaning not preserved"
elif guard.get("facts_preserved") is False:
reason = "facts not preserved"
elif int(guard.get("voice_delta", 0) or 0) < -1:
reason = "voice degraded"
elif guard.get("new_repetitive_tell") is True:
reason = "new repetitive tell"
else:
accepted = True
reason = "accepted"
# Drift is a ranking signal, not a hard reject, because several stable
# same-author candidates move seed profile drift in the wrong direction
# while improving slop/repetition and passing the LLM voice gate.
return CandidateScore(
path=str(path),
same_author_votes=votes,
same_author_trials=trials,
word_count=words,
word_ratio=round(words / max(source_words, 1), 3),
slop_rate=round(slop, 4),
drift_score=round(drift, 4),
scaffold_hits=hits,
voice_delta=int(guard.get("voice_delta", 0) or 0) if guard else None,
quality_winner=guard.get("winner") if guard else None,
accepted=accepted,
reason=reason,
)
def choose(scores: list[CandidateScore]) -> CandidateScore | None:
accepted = [score for score in scores if score.accepted]
if not accepted:
return None
return sorted(
accepted,
key=lambda s: (
-s.same_author_votes,
-(s.voice_delta or 0),
s.slop_rate,
s.scaffold_hits,
abs(1.0 - s.word_ratio),
),
)[0]
def run(limit: int | None, trials: int, probe_trials: int) -> dict:
seed = make_seed()
cache = load_cache()
paths = sorted(SOURCE_DIR.glob("*.md"))
if limit:
paths = paths[:limit]
rows = []
AFTER_DIR.mkdir(parents=True, exist_ok=True)
for source_path in paths:
source = clean_article(source_path.read_text())
source_votes, _ = same_author_votes(seed.representative_excerpts[1], source, trials, cache)
candidate_scores = [
score_candidate(seed, cache, source_path.name, source, candidate_path, trials, probe_trials)
for candidate_path in raw_candidates_for(source_path.name)
]
winner = choose(candidate_scores)
if winner:
selected_text = Path(winner.path).read_text()
(AFTER_DIR / source_path.name).write_text(selected_text)
else:
selected_text = source
(AFTER_DIR / source_path.name).write_text(source)
rows.append({
"slug": source_path.name,
"source_votes": source_votes,
"source_pass": source_votes >= 2,
"selected_path": winner.path if winner else None,
"selected_pass": bool(winner) or source_votes >= 2,
"winner": asdict(winner) if winner else None,
"candidates": [asdict(score) for score in candidate_scores],
})
selected_passes = sum(1 for row in rows if row["selected_pass"])
source_passes = sum(1 for row in rows if row["source_pass"])
result = {
"limit": limit,
"trials": trials,
"probe_trials": probe_trials,
"source_passes": source_passes,
"selected_passes": selected_passes,
"rows": rows,
}
OUT_PATH.write_text(json.dumps(result, indent=2))
write_report(result)
return result
def write_report(result: dict) -> None:
lines = [
"# Same-author Portfolio Selection",
"",
f"- Articles evaluated: {len(result['rows'])}",
f"- Probe trials per candidate: {result.get('probe_trials', 1)}",
f"- Repeated trials per candidate: {result['trials']}",
f"- Source passes: {result['source_passes']}/{len(result['rows'])}",
f"- Selected passes: {result['selected_passes']}/{len(result['rows'])}",
"",
"| Slug | Source | Selected | Winner | Reason |",
"|---|---:|---:|---|---|",
]
for row in result["rows"]:
winner = row.get("winner")
winner_name = Path(winner["path"]).name if winner else ""
reason = winner["reason"] if winner else "kept source"
lines.append(
f"| `{row['slug']}` | {row['source_votes']}/{result['trials']} | "
f"{str(row['selected_pass']).lower()} | `{winner_name}` | {reason} |"
)
REPORT_PATH.write_text("\n".join(lines) + "\n")
def main() -> int:
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--limit", type=int)
parser.add_argument("--trials", type=int, default=3)
parser.add_argument("--probe-trials", type=int, default=1)
args = parser.parse_args()
result = run(args.limit, args.trials, args.probe_trials)
print(f"selected passes: {result['selected_passes']}/{len(result['rows'])}")
print(f"report: {REPORT_PATH}")
return 0
if __name__ == "__main__":
raise SystemExit(main())