# Stale-by-Default Versioning — Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Implement automatic LLM triage of code changes so cosmetic changes are auto-blessed and semantic changes trigger Pushover notifications + `jobctl stale` visibility.

**Architecture:** At stage worker startup, compare current hash against stored snapshots. On mismatch, diff old vs new source, ask Opus "cosmetic or semantic?", and auto-bless or notify. Source snapshots stored in SQLite alongside existing version_equivalences.

**Tech Stack:** SQLite (version_snapshots table), lib.llm.json.call_llm_json (Opus triage), lib.notify.push (Pushover), click (CLI)

**Design doc:** `docs/plans/2026-03-05-stale-by-default-versioning.md`

---

### Task 1: version_snapshots table

**Files:**
- Modify: `jobs/lib/db/__init__.py` (add schema + init)
- Modify: `jobs/lib/db/results.py` (add store/load functions)
- Test: `jobs/lib/db/tests/test_snapshots.py`

**Step 1: Write failing tests**

```python
# jobs/lib/db/tests/test_snapshots.py
"""Tests for version snapshot storage."""
import sqlite3
from jobs.lib.db import open_raw_db
from jobs.lib.db.results import store_snapshot, load_snapshot


def _mem_db():
    """In-memory DB with schema."""
    from jobs.lib.db import (SCHEMA, ITEM_STAGES_SCHEMA, EXTRA_SCHEMA,
                              VERSION_EQUIVALENCES_SCHEMA, VERSION_SNAPSHOTS_SCHEMA,
                              PARENT_INDEX, _run_migrations)
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.executescript(SCHEMA)
    conn.executescript(ITEM_STAGES_SCHEMA)
    conn.executescript(EXTRA_SCHEMA)
    conn.executescript(VERSION_EQUIVALENCES_SCHEMA)
    conn.executescript(VERSION_SNAPSHOTS_SCHEMA)
    _run_migrations(conn)
    conn.executescript(PARENT_INDEX)
    return conn


def test_store_and_load_snapshot():
    conn = _mem_db()
    store_snapshot(conn, "vic_ideas", "extract", "abc123", "def foo(): pass", "import bar")
    snap = load_snapshot(conn, "vic_ideas", "extract", "abc123")
    assert snap is not None
    assert snap["source_text"] == "def foo(): pass"
    assert snap["deps_text"] == "import bar"


def test_store_snapshot_idempotent():
    conn = _mem_db()
    store_snapshot(conn, "vic_ideas", "extract", "abc123", "def foo(): pass", None)
    store_snapshot(conn, "vic_ideas", "extract", "abc123", "def foo(): pass", None)
    row = conn.execute("SELECT COUNT(*) FROM version_snapshots").fetchone()
    assert row[0] == 1


def test_load_snapshot_missing():
    conn = _mem_db()
    assert load_snapshot(conn, "vic_ideas", "extract", "missing") is None
```

**Step 2: Run tests, verify they fail**

Run: `python -m pytest jobs/lib/db/tests/test_snapshots.py -v`
Expected: ImportError — `VERSION_SNAPSHOTS_SCHEMA`, `store_snapshot`, `load_snapshot` don't exist yet.

**Step 3: Add VERSION_SNAPSHOTS_SCHEMA to `jobs/lib/db/__init__.py`**

After the existing `VERSION_EQUIVALENCES_SCHEMA` block (~line 150), add:

```python
VERSION_SNAPSHOTS_SCHEMA = """
CREATE TABLE IF NOT EXISTS version_snapshots (
    id           INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id       TEXT NOT NULL,
    stage        TEXT NOT NULL,
    version_hash TEXT NOT NULL,
    source_text  TEXT NOT NULL,
    deps_text    TEXT,
    created_at   TEXT DEFAULT (datetime('now')),
    UNIQUE(job_id, stage, version_hash)
);
"""
```

In `open_raw_db()`, add after `conn.executescript(VERSION_EQUIVALENCES_SCHEMA)`:

```python
conn.executescript(VERSION_SNAPSHOTS_SCHEMA)
```

**Step 4: Add `store_snapshot` and `load_snapshot` to `jobs/lib/db/results.py`**

At bottom of file:

```python
def store_snapshot(conn: sqlite3.Connection, job_id: str, stage: str,
                   version_hash: str, source_text: str, deps_text: str | None = None) -> None:
    """Store source snapshot for a version hash. Idempotent (INSERT OR IGNORE)."""
    conn.execute(
        "INSERT OR IGNORE INTO version_snapshots "
        "(job_id, stage, version_hash, source_text, deps_text) VALUES (?, ?, ?, ?, ?)",
        (job_id, stage, version_hash, source_text, deps_text),
    )
    conn.commit()


def load_snapshot(conn: sqlite3.Connection, job_id: str, stage: str,
                  version_hash: str) -> dict | None:
    """Load stored snapshot for a version hash. Returns dict with source_text, deps_text."""
    row = conn.execute(
        "SELECT source_text, deps_text FROM version_snapshots "
        "WHERE job_id = ? AND stage = ? AND version_hash = ?",
        (job_id, stage, version_hash),
    ).fetchone()
    if not row:
        return None
    return {"source_text": row["source_text"], "deps_text": row["deps_text"]}
```

**Step 5: Run tests, verify they pass**

Run: `python -m pytest jobs/lib/db/tests/test_snapshots.py -v`
Expected: 3 passed.

**Step 6: Commit**

```bash
git add jobs/lib/db/__init__.py jobs/lib/db/results.py jobs/lib/db/tests/test_snapshots.py
git commit -m "feat(jobs): add version_snapshots table for source diffing"
```

---

### Task 2: Source capture helper

**Files:**
- Modify: `jobs/lib/db/results.py` (add `capture_stage_snapshot`)
- Test: `jobs/lib/db/tests/test_snapshots.py` (add capture test)

The runner needs a function that, given a handler + stage, captures the source text and stores it as a snapshot. This wraps `inspect.getsource()` + VERSION_DEPS extraction + `store_snapshot`.

**Step 1: Write failing test**

Append to `test_snapshots.py`:

```python
def test_capture_stage_snapshot():
    """capture_stage_snapshot extracts source and stores snapshot."""
    conn = _mem_db()

    # Use a real function as the handler
    def _stage_greet(item_key, data, stage, job):
        return {"greeting": f"hello {item_key}"}

    import types
    mod = types.ModuleType("fake_handler")
    mod._stage_greet = _stage_greet
    _stage_greet.__module__ = mod.__name__
    _stage_greet.__qualname__ = "_stage_greet"
    import sys
    sys.modules["fake_handler"] = mod

    from jobs.lib.db.results import capture_stage_snapshot, stage_version_hash
    version = stage_version_hash(_stage_greet, "greet")
    capture_stage_snapshot(conn, "test_job", "greet", _stage_greet, version)

    snap = load_snapshot(conn, "test_job", "greet", version)
    assert snap is not None
    assert "_stage_greet" in snap["source_text"]

    del sys.modules["fake_handler"]
```

**Step 2: Run test, verify failure**

Run: `python -m pytest jobs/lib/db/tests/test_snapshots.py::test_capture_stage_snapshot -v`
Expected: ImportError — `capture_stage_snapshot` doesn't exist.

**Step 3: Implement `capture_stage_snapshot`**

Add to `jobs/lib/db/results.py`, after `load_snapshot`:

```python
def capture_stage_snapshot(conn: sqlite3.Connection, job_id: str, stage: str,
                           handler_func, version_hash: str) -> None:
    """Capture and store source code snapshot for current version hash.

    Extracts source via inspect.getsource() for the stage function (or handler),
    plus any VERSION_DEPS sources. Idempotent — skips if snapshot already exists.
    """
    module = inspect.getmodule(handler_func)
    stage_func = _find_stage_func(module, stage)
    target = stage_func or handler_func

    # Main source
    try:
        source_text = inspect.getsource(target)
    except (OSError, TypeError):
        source_text = f"# Could not extract source for {getattr(target, '__qualname__', str(target))}"

    # VERSION_DEPS sources
    deps_parts = []
    version_deps = getattr(module, "VERSION_DEPS", None) if module else None
    if version_deps and stage in version_deps:
        for dep in version_deps[stage]:
            if callable(dep):
                try:
                    deps_parts.append(inspect.getsource(dep))
                except (OSError, TypeError):
                    deps_parts.append(f"# {getattr(dep, '__qualname__', str(dep))}")
            elif isinstance(dep, str):
                deps_parts.append(dep)
    deps_text = "\n---dep-boundary---\n".join(deps_parts) if deps_parts else None

    store_snapshot(conn, job_id, stage, version_hash, source_text, deps_text)
```

**Step 4: Run tests, verify pass**

Run: `python -m pytest jobs/lib/db/tests/test_snapshots.py -v`
Expected: 4 passed.

**Step 5: Commit**

```bash
git add jobs/lib/db/results.py jobs/lib/db/tests/test_snapshots.py
git commit -m "feat(jobs): capture_stage_snapshot extracts and stores source"
```

---

### Task 3: LLM triage function

**Files:**
- Create: `jobs/lib/version_triage.py`
- Test: `jobs/lib/tests/test_version_triage.py`

**Step 1: Write failing test**

```python
# jobs/lib/tests/test_version_triage.py
"""Tests for LLM version triage."""
import pytest
from unittest.mock import AsyncMock, patch


@pytest.mark.asyncio
async def test_triage_cosmetic():
    """Cosmetic change returns cosmetic verdict."""
    from jobs.lib.version_triage import triage_version_change

    mock_response = {"verdict": "cosmetic", "reason": "only added debug logging"}
    with patch("jobs.lib.version_triage.call_llm_json", new_callable=AsyncMock, return_value=mock_response):
        result = await triage_version_change(
            job_id="vic_ideas", stage="extract",
            old_source="def foo(): pass",
            new_source="def foo():\n    log.debug('hi')\n    pass",
        )
    assert result.verdict == "cosmetic"
    assert "logging" in result.reason


@pytest.mark.asyncio
async def test_triage_semantic():
    """Semantic change returns semantic verdict."""
    from jobs.lib.version_triage import triage_version_change

    mock_response = {"verdict": "semantic", "reason": "changed scoring threshold from 0.5 to 0.7"}
    with patch("jobs.lib.version_triage.call_llm_json", new_callable=AsyncMock, return_value=mock_response):
        result = await triage_version_change(
            job_id="vic_ideas", stage="score",
            old_source="def score(): return x > 0.5",
            new_source="def score(): return x > 0.7",
        )
    assert result.verdict == "semantic"
    assert "threshold" in result.reason


@pytest.mark.asyncio
async def test_triage_llm_error_defaults_semantic():
    """If LLM call fails, default to semantic (safe side)."""
    from jobs.lib.version_triage import triage_version_change

    with patch("jobs.lib.version_triage.call_llm_json", new_callable=AsyncMock, side_effect=Exception("API down")):
        result = await triage_version_change(
            job_id="vic_ideas", stage="extract",
            old_source="def foo(): pass",
            new_source="def foo(): return 1",
        )
    assert result.verdict == "semantic"
    assert "error" in result.reason.lower() or "failed" in result.reason.lower()
```

**Step 2: Run tests, verify failure**

Run: `python -m pytest jobs/lib/tests/test_version_triage.py -v`
Expected: ModuleNotFoundError — `jobs.lib.version_triage` doesn't exist.

**Step 3: Implement `jobs/lib/version_triage.py`**

```python
"""LLM triage of version changes — cosmetic vs semantic."""

from __future__ import annotations

from dataclasses import dataclass

from loguru import logger

from lib.llm.json import call_llm_json

TRIAGE_PROMPT = """\
You are reviewing a code change to a data processing stage.
The stage "{stage}" in job "{job_id}" has changed.

OLD SOURCE:
{old_source}

NEW SOURCE:
{new_source}

Would this change affect the output data produced by this function?

- "cosmetic" = only logging, comments, formatting, variable renames, type hints, \
error messages, or other changes that don't affect the returned data
- "semantic" = prompt changes, logic changes, model changes, new/removed fields, \
different API calls, changed thresholds — anything that could produce different output

Answer with a JSON object:
{{"verdict": "cosmetic" | "semantic", "reason": "one sentence explanation"}}"""


@dataclass
class TriageResult:
    verdict: str  # "cosmetic" | "semantic"
    reason: str


async def triage_version_change(
    *,
    job_id: str,
    stage: str,
    old_source: str,
    new_source: str,
    old_deps: str | None = None,
    new_deps: str | None = None,
) -> TriageResult:
    """Ask Opus to classify a code change as cosmetic or semantic.

    On LLM failure, defaults to "semantic" (safe side — user must decide).
    """
    # Include deps diff if present
    full_old = old_source
    full_new = new_source
    if old_deps or new_deps:
        full_old += f"\n\n# VERSION_DEPS:\n{old_deps or '(none)'}"
        full_new += f"\n\n# VERSION_DEPS:\n{new_deps or '(none)'}"

    prompt = TRIAGE_PROMPT.format(
        job_id=job_id, stage=stage,
        old_source=full_old, new_source=full_new,
    )

    try:
        result = await call_llm_json(
            "opus", prompt,
            temperature=0,
            reasoning_effort="high",
        )
        verdict = result.get("verdict", "semantic")
        if verdict not in ("cosmetic", "semantic"):
            verdict = "semantic"
        return TriageResult(verdict=verdict, reason=result.get("reason", "no reason given"))
    except Exception as e:
        logger.warning(f"Version triage failed for {job_id}/{stage}: {e}")
        return TriageResult(verdict="semantic", reason=f"Triage failed: {e}")
```

**Step 4: Run tests, verify pass**

Run: `python -m pytest jobs/lib/tests/test_version_triage.py -v`
Expected: 3 passed.

**Step 5: Commit**

```bash
git add jobs/lib/version_triage.py jobs/lib/tests/test_version_triage.py
git commit -m "feat(jobs): LLM triage for version changes (cosmetic vs semantic)"
```

---

### Task 4: Runner integration — triage at stage worker startup

**Files:**
- Modify: `jobs/runner.py` (~line 467, after version hash computation)

**Step 1: Implement triage integration**

In `jobs/runner.py`, after line 467 (`version = stage_version_hash(handler, stage)`), add the triage block. This runs once at stage worker startup, not per item.

```python
    # --- Version triage: snapshot + auto-bless cosmetic changes ---
    from jobs.lib.db.results import capture_stage_snapshot, load_snapshot, get_stale_count
    from jobs.lib.db import open_db as _open_db

    with _open_db() as _conn:
        # Store snapshot for current version
        capture_stage_snapshot(_conn, job.id, stage, handler, version)

        # Check if there are stale items with a different hash
        stale = get_stale_count(_conn, job.id, stage, version)
        if stale > 0:
            # Find the most common old hash to diff against
            old_row = _conn.execute(
                "SELECT handler_version, COUNT(*) as cnt FROM results "
                "WHERE job_id = ? AND stage = ? "
                "AND handler_version IS NOT NULL AND handler_version != ? "
                "AND handler_version NOT IN ("
                "  SELECT old_hash FROM version_equivalences "
                "  WHERE job_id = ? AND stage = ? AND new_hash = ?"
                ") GROUP BY handler_version ORDER BY cnt DESC LIMIT 1",
                (job.id, stage, version, job.id, stage, version),
            ).fetchone()

            if old_row:
                old_hash = old_row["handler_version"]
                old_snap = load_snapshot(_conn, job.id, stage, old_hash)
                new_snap = load_snapshot(_conn, job.id, stage, version)

                if old_snap and new_snap:
                    from jobs.lib.version_triage import triage_version_change

                    logger.info(f"Triaging version change: {old_hash[:8]} -> {version[:8]} ({stale} stale items)")
                    triage = await triage_version_change(
                        job_id=job.id, stage=stage,
                        old_source=old_snap["source_text"],
                        new_source=new_snap["source_text"],
                        old_deps=old_snap["deps_text"],
                        new_deps=new_snap["deps_text"],
                    )

                    if triage.verdict == "cosmetic":
                        # Auto-bless: record equivalence for all stale hashes
                        stale_hashes = _conn.execute(
                            "SELECT DISTINCT handler_version FROM results "
                            "WHERE job_id = ? AND stage = ? "
                            "AND handler_version IS NOT NULL AND handler_version != ? "
                            "AND handler_version NOT IN ("
                            "  SELECT old_hash FROM version_equivalences "
                            "  WHERE job_id = ? AND stage = ? AND new_hash = ?"
                            ")",
                            (job.id, stage, version, job.id, stage, version),
                        ).fetchall()
                        for row in stale_hashes:
                            _conn.execute(
                                "INSERT OR IGNORE INTO version_equivalences "
                                "(job_id, stage, old_hash, new_hash, reason) VALUES (?, ?, ?, ?, ?)",
                                (job.id, stage, row["handler_version"], version,
                                 f"llm_auto: {triage.reason}"),
                            )
                        _conn.commit()
                        logger.info(f"Auto-blessed {stage}: {triage.reason}")
                    else:
                        # Semantic change — notify user
                        logger.warning(f"Semantic change in {job.id}/{stage}: {triage.reason} ({stale} stale items)")
                        try:
                            from lib.notify import push
                            push(
                                title=f"jobs: {job.id}/{stage} changed (semantic)",
                                body=f"{stale} stale items. {triage.reason}",
                            )
                        except Exception as e:
                            logger.warning(f"Pushover notification failed: {e}")
                else:
                    if not old_snap:
                        logger.debug(f"No old snapshot for {old_hash[:8]} — cannot triage")
    # --- End version triage ---
```

**Step 2: Test manually**

Run: `inv jobs.test -j vic_ideas --stage extract --dry-run`

Check logs for either:
- "Auto-blessed extract: ..." (cosmetic)
- "Semantic change in vic_ideas/extract: ..." (semantic)
- "No old snapshot for ..." (first run — no prior snapshot to diff against)

On first run with no prior snapshots, nothing happens (snapshot stored for future use). After a code change, the next startup will triage.

**Step 3: Commit**

```bash
git add jobs/runner.py
git commit -m "feat(jobs): triage version changes at stage worker startup"
```

---

### Task 5: `jobctl stale` CLI command

**Files:**
- Modify: `jobs/ctl.py` (add `stale` subcommand)

**Step 1: Implement `jobctl stale`**

Add before the `bless` command in `jobs/ctl.py`:

```python
@cli.command()
def stale():
    """Show stages with unresolved version changes.

    Lists stages where the handler code changed and items may need
    reprocessing. Auto-blessed cosmetic changes shown separately.

    \b
    Examples:
        jobctl stale
    """
    from jobs.lib.db.results import stage_version_hash, get_stale_count
    from jobs.lib.executor import resolve_handler

    jobs_map = load_jobs(Path(__file__).parent / "jobs.yaml")

    pending_rows = []
    blessed_rows = []

    with closing(open_raw_db()) as conn:
        for job_id, job in sorted(jobs_map.items()):
            handler_func = resolve_handler(job.handler)
            for stage_name in job.stages:
                current_hash = stage_version_hash(handler_func, stage_name)
                stale_count = get_stale_count(conn, job_id, stage_name, current_hash)

                if stale_count > 0:
                    pending_rows.append((job_id, stage_name, stale_count, current_hash))

                # Check recent auto-blessings (last 24h)
                blessed = conn.execute(
                    "SELECT reason, created_at FROM version_equivalences "
                    "WHERE job_id = ? AND stage = ? AND new_hash = ? "
                    "AND reason LIKE 'llm_auto:%' "
                    "AND created_at > datetime('now', '-1 day') "
                    "ORDER BY created_at DESC LIMIT 1",
                    (job_id, stage_name, current_hash),
                ).fetchone()
                if blessed:
                    blessed_rows.append((job_id, stage_name, blessed["reason"], blessed["created_at"]))

    if pending_rows:
        click.echo("Pending decisions:\n")
        click.echo(f"  {'Job':<30} {'Stage':<20} {'Stale Items':>12}  Hash")
        click.echo(f"  {'---':<30} {'-----':<20} {'-----------':>12}  ----")
        for job_id, stage_name, count, h in pending_rows:
            click.echo(f"  {job_id:<30} {stage_name:<20} {count:>12}  {h[:8]}")
        click.echo(f"\nUse 'jobctl bless JOB STAGE' to mark as equivalent")
        click.echo(f"Use 'jobctl reset JOB STAGE' to reprocess stale items")
    else:
        click.echo("No pending version changes.")

    if blessed_rows:
        click.echo(f"\nAuto-blessed (last 24h):\n")
        click.echo(f"  {'Job':<30} {'Stage':<20} Reason")
        click.echo(f"  {'---':<30} {'-----':<20} ------")
        for job_id, stage_name, reason, ts in blessed_rows:
            # Strip "llm_auto: " prefix for display
            display_reason = reason.replace("llm_auto: ", "", 1) if reason else ""
            click.echo(f"  {job_id:<30} {stage_name:<20} {display_reason}")
```

**Step 2: Test manually**

Run: `jobctl stale`
Expected: Either "No pending version changes." or a table of stale stages.

**Step 3: Commit**

```bash
git add jobs/ctl.py
git commit -m "feat(jobs): jobctl stale shows unresolved version changes"
```

---

### Task 6: Update design doc status

**Files:**
- Modify: `docs/plans/2026-03-05-stale-by-default-versioning.md` (change Status from Design to Implemented)

**Step 1: Update status**

Change line 4 from `Status: Design` to `Status: Implemented`

**Step 2: Commit**

```bash
git add docs/plans/2026-03-05-stale-by-default-versioning.md
git commit -m "docs: mark stale-by-default versioning as implemented"
```
