> **Note (2026-03-24):** intel/learning/ideas/semnet UIs consolidated into `kb.localhost` (port 7840). Old standalone URLs (intel.localhost, learning.localhost, ideas.localhost, semnet.localhost) are retired.

# Learning Eval System — Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Add data integrity (changelog, no-delete, YAML export), eval annotations table, Tier 1 eval pipeline, and a question-driven eval dashboard to the learning system.

**Architecture:** Two new tables in learning.db (principle_changelog, eval_annotations) populated by triggers and the eval pipeline. Evolve retroactive_study.py to write structured annotations. Add eval dashboard tab to existing Gradio app.

**Tech Stack:** SQLite, Python, Gradio 6, click CLI, loguru, lib.llm

**Design doc:** `docs/plans/2026-02-24-learning-eval-design.md`

---

### Task 1: Schema — changelog table and triggers

**Files:**
- Modify: `learning/schema/init.sql` (append after line 288)
- Test: `learning/schema/tests/test_changelog.py` (create)

**Step 1: Write the failing test**

Create `learning/schema/tests/__init__.py` (empty) and `learning/schema/tests/test_changelog.py`:

```python
"""Tests for principle_changelog triggers."""

import sqlite3
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent))

from learning.schema.learning_store import (
    LearningStore,
    LearningType,
    Principle,
    PrincipleStatus,
    new_id,
)


def _mem_store() -> LearningStore:
    """Create an in-memory learning store for testing."""
    store = LearningStore(":memory:")
    return store


def test_changelog_created_on_insert():
    """INSERT into principles should log a 'created' changelog entry."""
    store = _mem_store()
    pid = new_id()
    store.add_principle(Principle(
        id=pid, name="Test Principle", text="Do the thing.",
        learning_type=LearningType.PRINCIPLE,
    ))
    with store._conn() as conn:
        rows = conn.execute(
            "SELECT * FROM principle_changelog WHERE principle_id = ?", (pid,)
        ).fetchall()
    assert len(rows) == 1
    assert rows[0]["action"] == "created"
    assert rows[0]["new_value"] == "Do the thing."


def test_changelog_tracks_text_update():
    """UPDATE principles.text should log an 'updated' entry with old and new values."""
    store = _mem_store()
    pid = new_id()
    store.add_principle(Principle(
        id=pid, name="Test", text="Version 1",
        learning_type=LearningType.PRINCIPLE,
    ))
    # Update via upsert
    store.upsert_principle(Principle(
        id=pid, name="Test", text="Version 2",
        learning_type=LearningType.PRINCIPLE,
    ))
    with store._conn() as conn:
        rows = conn.execute(
            "SELECT * FROM principle_changelog WHERE principle_id = ? ORDER BY id",
            (pid,),
        ).fetchall()
    # Should have: created + updated(text)
    assert len(rows) >= 2
    updates = [r for r in rows if r["action"] == "updated"]
    assert any(r["field_changed"] == "text" for r in updates)
    text_update = next(r for r in updates if r["field_changed"] == "text")
    assert text_update["old_value"] == "Version 1"
    assert text_update["new_value"] == "Version 2"


def test_changelog_tracks_status_change():
    """UPDATE principles.status should log an 'updated' entry."""
    store = _mem_store()
    pid = new_id()
    store.add_principle(Principle(
        id=pid, name="Test", text="Do it.",
        learning_type=LearningType.PRINCIPLE, status=PrincipleStatus.PROPOSED,
    ))
    with store._conn() as conn:
        conn.execute(
            "UPDATE principles SET status = 'active', updated_at = CURRENT_TIMESTAMP WHERE id = ?",
            (pid,),
        )
    with store._conn() as conn:
        rows = conn.execute(
            "SELECT * FROM principle_changelog WHERE principle_id = ? AND field_changed = 'status'",
            (pid,),
        ).fetchall()
    assert len(rows) == 1
    assert rows[0]["old_value"] == "proposed"
    assert rows[0]["new_value"] == "active"


def test_changelog_no_entry_on_noop_update():
    """UPDATE that doesn't change a tracked field should not log."""
    store = _mem_store()
    pid = new_id()
    store.add_principle(Principle(
        id=pid, name="Test", text="Same",
        learning_type=LearningType.PRINCIPLE,
    ))
    # Update with same text
    store.upsert_principle(Principle(
        id=pid, name="Test", text="Same",
        learning_type=LearningType.PRINCIPLE,
    ))
    with store._conn() as conn:
        rows = conn.execute(
            "SELECT * FROM principle_changelog WHERE principle_id = ? AND action = 'updated'",
            (pid,),
        ).fetchall()
    assert len(rows) == 0
```

**Step 2: Run test to verify it fails**

Run: `python -m pytest learning/schema/tests/test_changelog.py -v`
Expected: FAIL — `principle_changelog` table does not exist

**Step 3: Add changelog table and triggers to init.sql**

Append to `learning/schema/init.sql` after the `unlinked_instances` view (after line 288):

```sql
-- ─────────────────────────────────────────────────────────────────────────────
-- Principle changelog (append-only audit trail)
-- ─────────────────────────────────────────────────────────────────────────────

CREATE TABLE IF NOT EXISTS principle_changelog (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    principle_id TEXT NOT NULL,
    changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    action TEXT NOT NULL,      -- created | updated | deprecated | rejected | restored
    field_changed TEXT,        -- null for 'created'; specific field for 'updated'
    old_value TEXT,
    new_value TEXT,
    reason TEXT,
    changed_by TEXT            -- 'claude-code' | 'manual' | session_id
);

CREATE INDEX IF NOT EXISTS idx_changelog_principle ON principle_changelog(principle_id);
CREATE INDEX IF NOT EXISTS idx_changelog_action ON principle_changelog(action);

-- Log 'created' on INSERT
CREATE TRIGGER IF NOT EXISTS changelog_insert AFTER INSERT ON principles
BEGIN
    INSERT INTO principle_changelog (principle_id, action, new_value, changed_by)
    VALUES (new.id, 'created', new.text, new.proposed_by);
END;

-- Log per-field diffs on UPDATE (only when value actually changes)
CREATE TRIGGER IF NOT EXISTS changelog_update_text AFTER UPDATE OF text ON principles
WHEN old.text IS NOT new.text
BEGIN
    INSERT INTO principle_changelog (principle_id, action, field_changed, old_value, new_value)
    VALUES (new.id, 'updated', 'text', old.text, new.text);
END;

CREATE TRIGGER IF NOT EXISTS changelog_update_name AFTER UPDATE OF name ON principles
WHEN old.name IS NOT new.name
BEGIN
    INSERT INTO principle_changelog (principle_id, action, field_changed, old_value, new_value)
    VALUES (new.id, 'updated', 'name', old.name, new.name);
END;

CREATE TRIGGER IF NOT EXISTS changelog_update_status AFTER UPDATE OF status ON principles
WHEN old.status IS NOT new.status
BEGIN
    INSERT INTO principle_changelog (principle_id, action, field_changed, old_value, new_value)
    VALUES (new.id, 'updated', 'status', old.status, new.status);
END;

CREATE TRIGGER IF NOT EXISTS changelog_update_rationale AFTER UPDATE OF rationale ON principles
WHEN old.rationale IS NOT new.rationale
BEGIN
    INSERT INTO principle_changelog (principle_id, action, field_changed, old_value, new_value)
    VALUES (new.id, 'updated', 'rationale', old.rationale, new.rationale);
END;

CREATE TRIGGER IF NOT EXISTS changelog_update_anti_pattern AFTER UPDATE OF anti_pattern ON principles
WHEN old.anti_pattern IS NOT new.anti_pattern
BEGIN
    INSERT INTO principle_changelog (principle_id, action, field_changed, old_value, new_value)
    VALUES (new.id, 'updated', 'anti_pattern', old.anti_pattern, new.anti_pattern);
END;

CREATE TRIGGER IF NOT EXISTS changelog_update_full_text AFTER UPDATE OF full_text ON principles
WHEN old.full_text IS NOT new.full_text
BEGIN
    INSERT INTO principle_changelog (principle_id, action, field_changed, old_value, new_value)
    VALUES (new.id, 'updated', 'full_text', old.full_text, new.full_text);
END;
```
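
A detail worth noting: the `WHEN old.X IS NOT new.X` guards use `IS NOT` rather than `!=` because `!=` propagates NULL — a field changing from NULL to a value (or back) would compare as NULL, which is not true, and the trigger would silently skip the change. A standalone snippet (plain `sqlite3`, no project code) shows the difference:

```python
import sqlite3

# `!=` propagates NULL: "old.rationale != new.rationale" evaluates to
# NULL (not true) when one side is NULL, so an AFTER UPDATE trigger
# guarded with `!=` would miss NULL→value transitions.
# `IS NOT` is NULL-safe and treats NULL as a distinct comparable value.
conn = sqlite3.connect(":memory:")
row = conn.execute("SELECT NULL != 'x', NULL IS NOT 'x'").fetchone()
print(row)  # (None, 1) — `!=` yields NULL, `IS NOT` yields true
```

This matters for the nullable columns (`rationale`, `anti_pattern`, `full_text`); `text`, `name`, and `status` are NOT NULL, but using `IS NOT` uniformly keeps the triggers consistent.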

**Step 4: Run tests to verify they pass**

Run: `python -m pytest learning/schema/tests/test_changelog.py -v`
Expected: All 4 tests PASS

**Step 5: Commit**

```bash
git add learning/schema/init.sql learning/schema/tests/
git commit -m "feat(learning): principle changelog table with per-field triggers"
```

---

### Task 2: Schema — eval_annotations table

**Files:**
- Modify: `learning/schema/init.sql` (append after changelog section)
- Test: `learning/schema/tests/test_changelog.py` (add annotation test)

**Step 1: Write the failing test**

Add to `learning/schema/tests/test_changelog.py`:

```python
def test_eval_annotations_table_exists():
    """eval_annotations table should exist after init."""
    store = _mem_store()
    with store._conn() as conn:
        conn.execute("""
            INSERT INTO eval_annotations
                (principle_id, session_id, episode_index, annotation, annotator_model)
            VALUES ('test-principle', 'sess-123', 0, 'followed', 'flash')
        """)
        rows = conn.execute("SELECT * FROM eval_annotations").fetchall()
    assert len(rows) == 1
    assert rows[0]["annotation"] == "followed"
```

**Step 2: Run test to verify it fails**

Run: `python -m pytest learning/schema/tests/test_changelog.py::test_eval_annotations_table_exists -v`
Expected: FAIL — table does not exist

**Step 3: Add eval_annotations table to init.sql**

Append to `learning/schema/init.sql`:

```sql
-- ─────────────────────────────────────────────────────────────────────────────
-- Eval annotations (Tier 1: retroactive annotation results)
-- ─────────────────────────────────────────────────────────────────────────────

CREATE TABLE IF NOT EXISTS eval_annotations (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    principle_id TEXT NOT NULL REFERENCES principles(id),
    session_id TEXT,
    episode_index INTEGER,
    episode_text TEXT,
    annotation TEXT NOT NULL,   -- followed | violated | not_applicable
    confidence REAL,
    annotator_model TEXT,
    annotated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    metadata TEXT               -- JSON: refinement suggestions, new_patterns
);

CREATE INDEX IF NOT EXISTS idx_eval_principle ON eval_annotations(principle_id);
CREATE INDEX IF NOT EXISTS idx_eval_session ON eval_annotations(session_id);
CREATE INDEX IF NOT EXISTS idx_eval_annotation ON eval_annotations(annotation);
```

**Step 4: Run test to verify it passes**

Run: `python -m pytest learning/schema/tests/test_changelog.py -v`
Expected: All 5 tests PASS

**Step 5: Commit**

```bash
git add learning/schema/init.sql learning/schema/tests/test_changelog.py
git commit -m "feat(learning): eval_annotations table for Tier 1 retroactive results"
```

---

### Task 3: LearningStore — deprecate_principle and add_eval_annotation

**Files:**
- Modify: `learning/schema/learning_store.py` (add methods after line ~570)
- Test: `learning/schema/tests/test_store_methods.py` (create)

**Step 1: Write the failing tests**

Create `learning/schema/tests/test_store_methods.py`:

```python
"""Tests for new LearningStore methods: deprecate_principle, add_eval_annotation."""

import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent))

from learning.schema.learning_store import (
    LearningStore,
    LearningType,
    Principle,
    PrincipleStatus,
    new_id,
)


def _mem_store() -> LearningStore:
    return LearningStore(":memory:")


# ── deprecate_principle ──────────────────────────────────────

def test_deprecate_sets_status():
    store = _mem_store()
    pid = new_id()
    store.add_principle(Principle(
        id=pid, name="Old Way", text="Do it old.",
        learning_type=LearningType.PRINCIPLE, status=PrincipleStatus.ACTIVE,
    ))
    store.deprecate_principle(pid, reason="superseded by new approach")
    p = store.get_principle(pid)
    assert p.status == PrincipleStatus.DEPRECATED


def test_deprecate_records_reason():
    store = _mem_store()
    pid = new_id()
    store.add_principle(Principle(
        id=pid, name="Old Way", text="Do it old.",
        learning_type=LearningType.PRINCIPLE, status=PrincipleStatus.ACTIVE,
    ))
    store.deprecate_principle(pid, reason="too vague")
    with store._conn() as conn:
        row = conn.execute("SELECT deprecated_reason FROM principles WHERE id = ?", (pid,)).fetchone()
    assert row["deprecated_reason"] == "too vague"


def test_deprecate_sets_superseded_by():
    store = _mem_store()
    old_id = new_id()
    new_pid = new_id()
    store.add_principle(Principle(
        id=old_id, name="Old", text="Old way.",
        learning_type=LearningType.PRINCIPLE, status=PrincipleStatus.ACTIVE,
    ))
    store.add_principle(Principle(
        id=new_pid, name="New", text="New way.",
        learning_type=LearningType.PRINCIPLE, status=PrincipleStatus.ACTIVE,
    ))
    store.deprecate_principle(old_id, reason="replaced", superseded_by=new_pid)
    with store._conn() as conn:
        row = conn.execute("SELECT superseded_by FROM principles WHERE id = ?", (old_id,)).fetchone()
    assert row["superseded_by"] == new_pid


def test_deprecate_logs_changelog():
    store = _mem_store()
    pid = new_id()
    store.add_principle(Principle(
        id=pid, name="Test", text="Do it.",
        learning_type=LearningType.PRINCIPLE, status=PrincipleStatus.ACTIVE,
    ))
    store.deprecate_principle(pid, reason="harmful")
    with store._conn() as conn:
        rows = conn.execute(
            "SELECT * FROM principle_changelog WHERE principle_id = ? AND field_changed = 'status'",
            (pid,),
        ).fetchall()
    assert len(rows) == 1
    assert rows[0]["old_value"] == "active"
    assert rows[0]["new_value"] == "deprecated"


def test_deprecate_nonexistent_raises():
    store = _mem_store()
    try:
        store.deprecate_principle("nonexistent", reason="test")
        assert False, "Should have raised"
    except ValueError:
        pass


# ── add_eval_annotation ──────────────────────────────────────

def test_add_eval_annotation():
    store = _mem_store()
    pid = new_id()
    store.add_principle(Principle(
        id=pid, name="Test", text="Do it.",
        learning_type=LearningType.PRINCIPLE,
    ))
    ann_id = store.add_eval_annotation(
        principle_id=pid,
        session_id="sess-abc",
        episode_index=3,
        episode_text="User asked to read a file...",
        annotation="followed",
        confidence=0.9,
        annotator_model="flash",
    )
    assert ann_id > 0
    with store._conn() as conn:
        row = conn.execute("SELECT * FROM eval_annotations WHERE id = ?", (ann_id,)).fetchone()
    assert row["annotation"] == "followed"
    assert row["confidence"] == 0.9


def test_list_eval_annotations_for_principle():
    store = _mem_store()
    pid = new_id()
    store.add_principle(Principle(
        id=pid, name="Test", text="Do it.",
        learning_type=LearningType.PRINCIPLE,
    ))
    store.add_eval_annotation(principle_id=pid, annotation="followed", annotator_model="flash")
    store.add_eval_annotation(principle_id=pid, annotation="violated", annotator_model="flash")
    store.add_eval_annotation(principle_id=pid, annotation="not_applicable", annotator_model="flash")
    anns = store.list_eval_annotations(principle_id=pid)
    assert len(anns) == 3
    assert {a["annotation"] for a in anns} == {"followed", "violated", "not_applicable"}
```

**Step 2: Run tests to verify they fail**

Run: `python -m pytest learning/schema/tests/test_store_methods.py -v`
Expected: FAIL — `deprecate_principle` and `add_eval_annotation` not found

**Step 3: Add methods to learning_store.py**

Add after the `record_application` / `_row_to_application` section (~line 603):

```python
    # ─────────────────────────────────────────────────────────────────────────
    # Deprecation (never delete)
    # ─────────────────────────────────────────────────────────────────────────

    def deprecate_principle(
        self,
        principle_id: str,
        reason: str,
        superseded_by: str | None = None,
    ) -> None:
        """Deprecate a principle. Never delete — changelog trigger fires automatically."""
        p = self.get_principle(principle_id)
        if not p:
            raise ValueError(f"Principle not found: {principle_id}")
        with self._conn() as conn:
            conn.execute(
                """UPDATE principles
                   SET status = 'deprecated',
                       deprecated_reason = ?,
                       superseded_by = ?,
                       updated_at = CURRENT_TIMESTAMP
                   WHERE id = ?""",
                (reason, superseded_by, principle_id),
            )
        logger.info(f"Deprecated principle {principle_id}: {reason}")

    # ─────────────────────────────────────────────────────────────────────────
    # Eval Annotations (Tier 1)
    # ─────────────────────────────────────────────────────────────────────────

    def add_eval_annotation(
        self,
        principle_id: str,
        annotation: str,
        annotator_model: str,
        session_id: str | None = None,
        episode_index: int | None = None,
        episode_text: str | None = None,
        confidence: float | None = None,
        metadata: dict | None = None,
    ) -> int:
        """Record a Tier 1 eval annotation for a principle."""
        with self._conn() as conn:
            cursor = conn.execute(
                """INSERT INTO eval_annotations
                   (principle_id, session_id, episode_index, episode_text,
                    annotation, confidence, annotator_model, metadata)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    principle_id, session_id, episode_index, episode_text,
                    annotation, confidence, annotator_model,
                    json.dumps(metadata) if metadata else None,
                ),
            )
            return cursor.lastrowid

    def list_eval_annotations(
        self,
        principle_id: str | None = None,
        session_id: str | None = None,
        annotation: str | None = None,
        limit: int = 100,
    ) -> list[dict]:
        """List eval annotations with optional filters."""
        query = "SELECT * FROM eval_annotations WHERE 1=1"
        params: list = []
        if principle_id:
            query += " AND principle_id = ?"
            params.append(principle_id)
        if session_id:
            query += " AND session_id = ?"
            params.append(session_id)
        if annotation:
            query += " AND annotation = ?"
            params.append(annotation)
        query += " ORDER BY annotated_at DESC LIMIT ?"
        params.append(limit)
        with self._conn() as conn:
            rows = conn.execute(query, params).fetchall()
            return [dict(r) for r in rows]
```

**Step 4: Run tests to verify they pass**

Run: `python -m pytest learning/schema/tests/test_store_methods.py -v`
Expected: All 8 tests PASS

**Step 5: Commit**

```bash
git add learning/schema/learning_store.py learning/schema/tests/test_store_methods.py
git commit -m "feat(learning): deprecate_principle and add_eval_annotation store methods"
```

---

### Task 4: Static check — no hard deletes

**Files:**
- Test: `learning/schema/tests/test_no_deletes.py` (create)

**Step 1: Write the test**

```python
"""Ensure no code path deletes from the principles table."""

import subprocess
from pathlib import Path


def test_no_delete_from_principles():
    """Grep the codebase for hard deletes against principles — expect zero matches."""
    result = subprocess.run(
        [
            "rg", "-i", r"DELETE\s+FROM\s+principles",
            "learning/", "--type", "py", "-c",
            # Exclude this test file: its own assertion message would otherwise match.
            "-g", "!**/test_no_deletes.py",
        ],
        capture_output=True, text=True,
        cwd=str(Path(__file__).parent.parent.parent.parent),
    )
    # rg -c prints a per-file match count; empty output means no matches
    assert result.stdout.strip() == "", (
        f"Found hard deletes against principles in:\n{result.stdout}"
    )
```

**Step 2: Run test**

Run: `python -m pytest learning/schema/tests/test_no_deletes.py -v`
Expected: PASS (we verified no matches exist)

**Step 3: Commit**

```bash
git add learning/schema/tests/test_no_deletes.py
git commit -m "test(learning): static check — no DELETE FROM principles in codebase"
```

---

### Task 5: CLI — learn export (YAML export)

**Files:**
- Modify: `learning/cli.py` (add `export` command)
- Test: `learning/schema/tests/test_yaml_export.py` (create)

**Step 1: Write the failing test**

Create `learning/schema/tests/test_yaml_export.py`:

```python
"""Tests for YAML export and roundtrip."""

import sys
import tempfile
from pathlib import Path

import yaml

sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent))

from learning.schema.learning_store import (
    LearningInstance,
    LearningStore,
    LearningType,
    LinkType,
    Principle,
    PrincipleStatus,
    SourceType,
    new_id,
)


def _mem_store() -> LearningStore:
    return LearningStore(":memory:")


def _populate_store(store: LearningStore) -> tuple[str, str]:
    """Add a principle with a linked instance. Returns (principle_id, instance_id)."""
    pid = "dev/test-principle"
    iid = new_id()
    store.add_principle(Principle(
        id=pid, name="Test Principle", text="Always test.",
        learning_type=LearningType.PRINCIPLE, status=PrincipleStatus.ACTIVE,
        rationale="Tests prevent regressions.", anti_pattern="Shipping without tests.",
    ))
    store.add_instance(LearningInstance(
        id=iid, content="Found a bug because of missing test.",
        source_type=SourceType.SESSION_REFLECTION,
    ))
    store.link_instance_to_principle(iid, pid, LinkType.SUPPORTS, strength=0.8)
    return pid, iid


def test_export_produces_valid_yaml():
    """Export should produce parseable YAML with principles."""
    store = _mem_store()
    pid, iid = _populate_store(store)
    from learning.export import export_principles_yaml
    data = export_principles_yaml(store)
    assert data["schema_version"] == 1
    assert len(data["principles"]) == 1
    p = data["principles"][0]
    assert p["id"] == pid
    assert p["name"] == "Test Principle"
    assert len(p["links"]) == 1
    assert p["links"][0]["instance_id"] == iid


def test_export_roundtrip_to_file():
    """Export to file and re-read — should be identical."""
    store = _mem_store()
    _populate_store(store)
    from learning.export import export_principles_yaml
    data = export_principles_yaml(store)
    with tempfile.NamedTemporaryFile(suffix=".yaml", mode="w", delete=False) as f:
        yaml.dump(data, f, default_flow_style=False, allow_unicode=True)
        path = f.name
    with open(path) as f:
        loaded = yaml.safe_load(f)
    assert loaded["principles"][0]["id"] == data["principles"][0]["id"]
    assert loaded["principles"][0]["text"] == data["principles"][0]["text"]
```

**Step 2: Run test to verify it fails**

Run: `python -m pytest learning/schema/tests/test_yaml_export.py -v`
Expected: FAIL — `learning.export` module does not exist

**Step 3: Create learning/export.py**

Create `learning/export.py`:

```python
"""YAML export/import for learning.db principles — disaster recovery layer."""

from datetime import datetime, timezone

from learning.schema.learning_store import LearningStore, PrincipleStatus


def export_principles_yaml(store: LearningStore) -> dict:
    """Export all principles with links to a dict suitable for YAML serialization."""
    principles = store.list_principles(limit=10000)
    result = []
    for p in principles:
        links = []
        with store._conn() as conn:
            rows = conn.execute(
                """SELECT instance_id, link_type, strength
                   FROM instance_principle_links WHERE principle_id = ?""",
                (p.id,),
            ).fetchall()
            for r in rows:
                links.append({
                    "instance_id": r["instance_id"],
                    "link_type": r["link_type"],
                    "strength": r["strength"],
                })
        result.append({
            "id": p.id,
            "name": p.name,
            "text": p.text,
            "full_text": p.full_text,
            "rationale": p.rationale,
            "anti_pattern": p.anti_pattern,
            "learning_type": p.learning_type.value,
            "status": p.status.value,
            "abstraction_level": p.abstraction_level,
            "domain_scope": p.domain_scope,
            "parent_id": p.parent_id,
            "instance_count": p.instance_count,
            "application_count": p.application_count,
            "success_rate": p.success_rate,
            "links": links,
        })
    return {
        "exported_at": datetime.now(timezone.utc).isoformat(),
        "schema_version": 1,
        "principle_count": len(result),
        "principles": result,
    }
```

**Step 4: Run tests to verify they pass**

Run: `python -m pytest learning/schema/tests/test_yaml_export.py -v`
Expected: All 2 tests PASS

**Step 5: Add `export` command to CLI**

Add to `learning/cli.py` (after the `embed` command, around line 1519):

```python
@cli.command("export")
@click.option("--output", "-o", default=None, help="Output path (default: learning/data/principles_export.yaml)")
@click.pass_context
def export_cmd(ctx, output):
    """Export principles + links to git-tracked YAML."""
    import yaml
    from learning.export import export_principles_yaml

    store = ctx.obj["store"]
    data = export_principles_yaml(store)

    if output is None:
        output = str(Path(__file__).parent / "data" / "principles_export.yaml")

    with open(output, "w") as f:
        yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

    click.echo(f"Exported {data['principle_count']} principles to {output}")
```
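
For reference, the resulting `principles_export.yaml` should look roughly like this (illustrative values only; the enum strings such as `principle` and `supports` assume the stores' enum `.value` spellings, and some fields are omitted for brevity):

```yaml
exported_at: "2026-02-24T12:00:00+00:00"
schema_version: 1
principle_count: 1
principles:
- id: dev/test-principle
  name: Test Principle
  text: Always test.
  rationale: Tests prevent regressions.
  learning_type: principle
  status: active
  links:
  - instance_id: <opaque new_id() value>
    link_type: supports
    strength: 0.8
```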

**Step 6: Run export manually to verify**

Run: `python -m learning.cli export`
Expected: prints "Exported 239 principles to learning/data/principles_export.yaml"

**Step 7: Commit**

```bash
git add learning/export.py learning/cli.py learning/schema/tests/test_yaml_export.py
git commit -m "feat(learning): YAML export for principles — disaster recovery layer"
```

---

### Task 6: Eval pipeline — evolve retroactive_study to write eval_annotations

**Files:**
- Modify: `learning/session_review/retroactive_study.py`
- Create: `learning/eval.py` (thin orchestrator)
- Test: `learning/tests/test_eval_pipeline.py` (create)

**Step 1: Write the failing test**

Create `learning/tests/test_eval_pipeline.py`:

```python
"""Tests for the Tier 1 eval pipeline."""

import json
import sys
from pathlib import Path
from unittest.mock import AsyncMock, patch

sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from learning.schema.learning_store import (
    LearningStore,
    LearningType,
    Principle,
    PrincipleStatus,
    new_id,
)


def _mem_store() -> LearningStore:
    store = LearningStore(":memory:")
    # Add a test principle
    store.add_principle(Principle(
        id="dev/read-before-edit",
        name="Read Before Edit",
        text="Always read a file before editing it.",
        learning_type=LearningType.PRINCIPLE,
        status=PrincipleStatus.ACTIVE,
    ))
    return store


MOCK_JUDGE_RESPONSE = json.dumps({
    "annotations": [
        {
            "principle_id": "dev/read-before-edit",
            "annotation": "followed",
            "confidence": 0.85,
            "evidence": "Agent read the file before editing.",
        }
    ],
    "new_patterns": [],
    "refinements": [],
})


def test_annotate_episode_writes_to_db():
    """annotate_episode should write eval_annotations to the store."""
    import asyncio
    from learning.eval import annotate_episode

    store = _mem_store()
    episode_text = "[user] Edit config.py\n[assistant] Let me read config.py first..."

    with patch("learning.eval.call_llm", new_callable=AsyncMock, return_value=MOCK_JUDGE_RESPONSE):
        result = asyncio.run(annotate_episode(
            store=store,
            episode_text=episode_text,
            session_id="test-session",
            episode_index=0,
            principles_catalog="<p id=\"dev/read-before-edit\">Read Before Edit</p>",
            model="flash",
        ))

    assert result["annotations_written"] >= 1
    anns = store.list_eval_annotations(principle_id="dev/read-before-edit")
    assert len(anns) >= 1
    assert anns[0]["annotation"] == "followed"
```

**Step 2: Run test to verify it fails**

Run: `python -m pytest learning/tests/test_eval_pipeline.py -v`
Expected: FAIL — `learning.eval` does not exist

**Step 3: Create learning/eval.py**

```python
"""Tier 1 eval pipeline — retroactive annotation of session episodes against principles.

Thin orchestrator that:
1. Loads active principles from learning.db
2. Breaks sessions into episodes (via session_extract)
3. Annotates each (episode, principle) with LLM judge
4. Writes eval_annotations to learning.db

Usage:
    python -m learning.eval --days 30
    python -m learning.eval --days 7 --summary
"""

import asyncio
import json
import sys
from pathlib import Path

import click
from loguru import logger

sys.path.insert(0, str(Path(__file__).parent.parent))

from learning.schema.learning_store import LearningStore, PrincipleStatus
from lib.llm import call_llm

JUDGE_MODEL = "flash"

SYSTEM_PROMPT = """You are evaluating whether coding principles were followed or violated in a session episode.

Given a list of principles and a session episode (conversation between user and AI assistant),
identify which principles are relevant and whether they were followed or violated.

Respond with JSON:
{
  "annotations": [
    {
      "principle_id": "exact/principle-id",
      "annotation": "followed" | "violated" | "not_applicable",
      "confidence": 0.0-1.0,
      "evidence": "brief explanation"
    }
  ],
  "new_patterns": ["any new patterns not covered by existing principles"],
  "refinements": [{"principle_id": "...", "suggestion": "..."}]
}

Only include principles that are clearly relevant. Skip principles with no connection to the episode.
Focus on the 3-5 most relevant principles per episode."""


async def annotate_episode(
    *,
    store: LearningStore,
    episode_text: str,
    session_id: str,
    episode_index: int,
    principles_catalog: str,
    model: str = JUDGE_MODEL,
) -> dict:
    """Annotate a single episode against all principles. Returns summary dict."""
    prompt = f"""## Principles Catalog
{principles_catalog}

## Session Episode
{episode_text[:8000]}"""

    response = await call_llm(
        prompt=prompt,
        system=SYSTEM_PROMPT,
        model=model,
        temperature=0.0,
        response_format="json",
    )

    try:
        data = json.loads(response)
    except json.JSONDecodeError:
        logger.warning(f"Failed to parse judge response for episode {episode_index}")
        return {"annotations_written": 0, "error": "json_parse_failed"}

    written = 0
    for ann in data.get("annotations", []):
        pid = ann.get("principle_id", "")
        annotation = ann.get("annotation", "")
        if annotation not in ("followed", "violated", "not_applicable"):
            continue
        store.add_eval_annotation(
            principle_id=pid,
            session_id=session_id,
            episode_index=episode_index,
            episode_text=episode_text[:2000],
            annotation=annotation,
            confidence=ann.get("confidence"),
            annotator_model=model,
            metadata={"evidence": ann.get("evidence", "")},
        )
        written += 1

    return {
        "annotations_written": written,
        "new_patterns": data.get("new_patterns", []),
        "refinements": data.get("refinements", []),
    }
```
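The validation in the annotation loop (checking `principle_id` and the allowed annotation values) can be factored into a small predicate so the parsing is unit-testable without a `LearningStore` or an LLM call. A sketch; `valid_annotations` is a hypothetical helper name, not part of the plan's API:

```python
# Allowed annotation values, mirroring the judge's JSON schema.
VALID_ANNOTATIONS = {"followed", "violated", "not_applicable"}


def valid_annotations(data: dict) -> list[dict]:
    """Filter a parsed judge response down to well-formed annotation entries."""
    out = []
    for ann in data.get("annotations", []):
        if not isinstance(ann, dict):
            continue  # guard against malformed list items
        if ann.get("annotation") not in VALID_ANNOTATIONS:
            continue  # unknown verdicts are dropped, as in annotate_episode
        if not ann.get("principle_id"):
            continue  # annotations must reference a principle
        out.append(ann)
    return out
```

This keeps `annotate_episode` focused on I/O while the filtering rules get their own fast tests.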

**Step 4: Run test to verify it passes**

Run: `python -m pytest learning/tests/test_eval_pipeline.py -v`
Expected: PASS

**Step 5: Commit**

```bash
git add learning/eval.py learning/tests/test_eval_pipeline.py
git commit -m "feat(learning): Tier 1 eval pipeline — annotate episodes against principles"
```

---

### Task 7: CLI — learn eval command

**Files:**
- Modify: `learning/cli.py` (add `eval` command)

**Step 1: Add eval command to CLI**

Add to `learning/cli.py`:

```python
@cli.command("eval")
@click.option("--days", default=30, help="Look back N days for sessions")
@click.option("--sessions", default=30, help="Max sessions to process")
@click.option("--model", default="flash", help="Judge model")
@click.option("--summary", is_flag=True, help="Print summary stats only")
@click.option("--dry-run", is_flag=True, help="Show what would be processed without running")
@click.pass_context
def eval_cmd(ctx, days, sessions, model, summary, dry_run):
    """Run Tier 1 eval — retroactive annotation on recent sessions."""
    import asyncio
    from learning.eval import annotate_episode
    from learning.session_extract.turns import parse_turns
    from learning.session_extract.episodes import chunk_fixed_window
    from learning.session_review.retroactive_study import load_principles_catalog
    from learning.session_review.failure_mining import find_transcripts

    store = ctx.obj["store"]

    # Load principles
    principles_list, catalog_text = load_principles_catalog(store)
    click.echo(f"Loaded {len(principles_list)} active principles")

    # Find sessions
    transcripts = find_transcripts(days=days)[:sessions]
    click.echo(f"Found {len(transcripts)} sessions from last {days} days")

    if dry_run:
        for t in transcripts:
            click.echo(f"  {t.name}")
        return

    # Process
    total_annotations = 0
    total_episodes = 0

    async def process_all():
        nonlocal total_annotations, total_episodes
        for transcript_path in transcripts:
            try:
                turns = parse_turns(transcript_path)
                episodes = chunk_fixed_window(turns, window_size=8)
                session_id = transcript_path.stem
                for i, ep in enumerate(episodes):
                    if ep.triage_tag == "routine":
                        continue
                    result = await annotate_episode(
                        store=store,
                        episode_text=ep.text,
                        session_id=session_id,
                        episode_index=i,
                        principles_catalog=catalog_text,
                        model=model,
                    )
                    total_annotations += result["annotations_written"]
                    total_episodes += 1
            except Exception as e:
                logger.warning(f"Failed processing {transcript_path.name}: {e}")

    asyncio.run(process_all())
    click.echo(f"\nDone: {total_episodes} episodes, {total_annotations} annotations written")

    if summary:
        with store._conn() as conn:
            counts = dict(conn.execute(
                "SELECT annotation, COUNT(*) FROM eval_annotations GROUP BY annotation"
            ).fetchall())
        click.echo(f"  followed: {counts.get('followed', 0)}")
        click.echo(f"  violated: {counts.get('violated', 0)}")
        click.echo(f"  not_applicable: {counts.get('not_applicable', 0)}")
```

**Step 2: Test manually**

Run: `python -m learning.cli eval --dry-run --days 7`
Expected: Lists recent session files

**Step 3: Commit**

```bash
git add learning/cli.py
git commit -m "feat(learning): learn eval CLI — run Tier 1 retroactive annotations"
```

---

### Task 8: UI — eval dashboard tab

**Files:**
- Modify: `learning/schema/app.py` (add Eval tab)

This is the largest task. The tab answers six questions from the design doc. Read the existing app.py structure (4 tabs: Principles, Triage, Failures, Activity) and add a 5th "Eval" tab.

**Step 1: Read the existing app.py tab structure**

Check how tabs are created (look for `gr.Tab` or `gr.Tabs` patterns). The new tab goes after the existing 4.

**Step 2: Add data query functions**

Add these query functions to app.py (near the top, with other query functions):

```python
def _eval_health_stats() -> dict:
    """Landing stats: learning health overview."""
    with _db() as conn:
        total_principles = conn.execute("SELECT COUNT(*) FROM principles WHERE status = 'active'").fetchone()[0]
        total_annotations = conn.execute("SELECT COUNT(*) FROM eval_annotations").fetchone()[0]
        followed = conn.execute("SELECT COUNT(*) FROM eval_annotations WHERE annotation = 'followed'").fetchone()[0]
        violated = conn.execute("SELECT COUNT(*) FROM eval_annotations WHERE annotation = 'violated'").fetchone()[0]
        zero_evidence = conn.execute("""
            SELECT COUNT(*) FROM principles
            WHERE status = 'active' AND application_count = 0
        """).fetchone()[0]
    return {
        "total_principles": total_principles,
        "total_annotations": total_annotations,
        "followed": followed,
        "violated": violated,
        "zero_evidence": zero_evidence,
    }


def _principle_effectiveness_df() -> "pd.DataFrame":
    """Principles ranked by effectiveness (application_count × success_rate)."""
    with _db() as conn:
        rows = conn.execute("""
            SELECT p.id, p.name, p.application_count, p.success_rate,
                   p.instance_count, p.status,
                   COUNT(CASE WHEN ea.annotation = 'followed' THEN 1 END) as followed_count,
                   COUNT(CASE WHEN ea.annotation = 'violated' THEN 1 END) as violated_count
            FROM principles p
            LEFT JOIN eval_annotations ea ON p.id = ea.principle_id
            WHERE p.status = 'active'
            GROUP BY p.id
            ORDER BY p.application_count DESC, p.success_rate DESC
        """).fetchall()
    return pd.DataFrame([dict(r) for r in rows])


def _merge_candidates_df() -> "pd.DataFrame":
    """Principles with high semantic similarity — merge candidates."""
    # Uses embeddings if available, falls back to name similarity
    with _db() as conn:
        try:
            rows = conn.execute("""
                SELECT id, name, text, embedding FROM principles
                WHERE status = 'active' AND embedding IS NOT NULL
            """).fetchall()
        except Exception:
            return pd.DataFrame()

    if len(rows) < 2:
        return pd.DataFrame()

    import numpy as np
    pairs = []
    items = [(r["id"], r["name"], r["text"], json.loads(r["embedding"])) for r in rows]
    for i in range(len(items)):
        for j in range(i + 1, len(items)):
            a, b = np.array(items[i][3]), np.array(items[j][3])
            sim = float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
            if sim > 0.85:
                pairs.append({
                    "principle_a": items[i][1],
                    "principle_b": items[j][1],
                    "similarity": round(sim, 3),
                    "id_a": items[i][0],
                    "id_b": items[j][0],
                })
    return pd.DataFrame(pairs).sort_values("similarity", ascending=False) if pairs else pd.DataFrame()


def _harmful_principles_df() -> "pd.DataFrame":
    """Principles with high application count but low success rate — potentially harmful."""
    with _db() as conn:
        rows = conn.execute("""
            SELECT id, name, application_count, success_rate,
                   instance_count
            FROM principles
            WHERE status = 'active'
              AND application_count >= 2
              AND (success_rate IS NOT NULL AND success_rate < 0.5)
            ORDER BY success_rate ASC, application_count DESC
        """).fetchall()
    return pd.DataFrame([dict(r) for r in rows])
```
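The merge-candidate query reduces to pairwise cosine similarity over embeddings with a 0.85 cutoff. A minimal standalone version of that core logic, handy for sanity-checking the threshold before wiring it to the DB (`similar_pairs` is an illustrative helper, not part of app.py):

```python
import numpy as np


def similar_pairs(
    embeddings: dict[str, list[float]], threshold: float = 0.85
) -> list[tuple[str, str, float]]:
    """Return (name_a, name_b, similarity) for all pairs above the threshold."""
    items = list(embeddings.items())
    pairs = []
    for i in range(len(items)):
        for j in range(i + 1, len(items)):
            a, b = np.array(items[i][1]), np.array(items[j][1])
            # Cosine similarity: dot product over the product of norms.
            sim = float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
            if sim > threshold:
                pairs.append((items[i][0], items[j][0], round(sim, 3)))
    return sorted(pairs, key=lambda p: -p[2])  # most similar first
```

The O(n²) loop is fine at the expected scale (dozens of principles); switch to a vectorized similarity matrix if the catalog grows into the thousands.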

**Step 3: Add the Eval tab to the Gradio layout**

Add inside the `gr.Tabs()` block, after the last existing tab:

```python
with gr.Tab("Eval"):
    # Q1 + Q2: Health overview
    eval_health_md = gr.Markdown("Loading...")

    with gr.Row():
        eval_refresh_btn = gr.Button("Refresh", scale=0)

    # Q3: Ranked principles
    gr.Markdown("### Most → Least Useful Principles")
    effectiveness_table = gr.Dataframe(
        headers=["name", "application_count", "success_rate", "followed_count", "violated_count", "instance_count"],
        label="Principle Effectiveness",
        interactive=False,
    )

    # Q4: Merge candidates
    gr.Markdown("### Merge Candidates (similarity > 0.85)")
    merge_table = gr.Dataframe(
        headers=["principle_a", "principle_b", "similarity"],
        label="Similar Principles",
        interactive=False,
    )

    # Q6: Harmful
    gr.Markdown("### Overgeneralizing? (applied but failing)")
    harmful_table = gr.Dataframe(
        headers=["name", "application_count", "success_rate"],
        label="Potentially Harmful Principles",
        interactive=False,
    )

    def refresh_eval():
        stats = _eval_health_stats()
        health_md = f"""### Learning Health
- **{stats['total_principles']}** active principles
- **{stats['total_annotations']}** eval annotations ({stats['followed']} followed, {stats['violated']} violated)
- **{stats['zero_evidence']}** principles with zero evidence

**Prevention signal**: {stats['violated']} violated annotations, marking episodes where a principle would have helped
"""
        eff_df = _principle_effectiveness_df()
        merge_df = _merge_candidates_df()
        harm_df = _harmful_principles_df()
        return health_md, eff_df, merge_df, harm_df

    eval_refresh_btn.click(
        refresh_eval,
        outputs=[eval_health_md, effectiveness_table, merge_table, harmful_table],
    )
    # Populate on initial page load (demo.load fires when the app loads, not on tab select)
    demo.load(refresh_eval, outputs=[eval_health_md, effectiveness_table, merge_table, harmful_table])
```

**Step 4: Verify the app loads**

Run: `GRADIO_SERVER_PORT=7850 python learning/schema/app.py`
Check: Navigate to localhost:7850, click the Eval tab, and verify it loads without errors.

**Step 5: Commit**

```bash
git add learning/schema/app.py
git commit -m "feat(learning): eval dashboard tab — question-driven principle health UI"
```

---

### Task 9: Docker component tests

**Files:**
- Create: `learning/session_review/tests/test_docker_sandbox.py`

These tests verify the Docker infrastructure works. They are tagged `@pytest.mark.docker` so routine runs can deselect them with `-m "not docker"`; run them explicitly with `-m docker`.

**Step 1: Write the tests**

```python
"""Component tests for Docker sandbox infrastructure.

Run explicitly: python -m pytest learning/session_review/tests/test_docker_sandbox.py -v -m docker
"""

import subprocess
import sys
from pathlib import Path

import pytest

sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent))

RIVUS_ROOT = Path(__file__).parent.parent.parent.parent
DOCKERFILE = RIVUS_ROOT / "learning" / "session_review" / "Dockerfile.replay"


@pytest.mark.docker
def test_dockerfile_exists():
    """Dockerfile.replay must exist."""
    assert DOCKERFILE.exists(), f"Missing {DOCKERFILE}"


@pytest.mark.docker
def test_docker_image_builds():
    """Docker image claude-replay should build successfully."""
    result = subprocess.run(
        ["docker", "build", "-f", str(DOCKERFILE), "-t", "claude-replay-test", "."],
        capture_output=True, text=True, cwd=str(RIVUS_ROOT),
        timeout=300,
    )
    assert result.returncode == 0, f"Build failed:\n{result.stderr[-2000:]}"


@pytest.mark.docker
def test_docker_container_runs():
    """Container should start and run a basic command."""
    result = subprocess.run(
        ["docker", "run", "--rm", "claude-replay-test", "echo", "hello"],
        capture_output=True, text=True, timeout=30,
    )
    assert result.returncode == 0
    assert "hello" in result.stdout


@pytest.mark.docker
def test_docker_repo_mount():
    """Container should be able to clone the mounted repo."""
    result = subprocess.run(
        [
            "docker", "run", "--rm",
            "-v", f"{RIVUS_ROOT}:/repo.git:ro",
            "claude-replay-test",
            "bash", "-c", "git clone /repo.git /workspace/test && ls /workspace/test/CLAUDE.md",
        ],
        capture_output=True, text=True, timeout=60,
    )
    assert result.returncode == 0, f"Clone failed:\n{result.stderr[-1000:]}"
    assert "CLAUDE.md" in result.stdout
```

**Step 2: Register the docker marker**

Add to `learning/session_review/tests/conftest.py` (create if needed):

```python
import pytest

def pytest_configure(config):
    config.addinivalue_line("markers", "docker: tests requiring Docker daemon")
```

**Step 3: Run tests (requires Docker)**

Run: `python -m pytest learning/session_review/tests/test_docker_sandbox.py -v -m docker`
Expected: All 4 PASS (if Docker is running)

**Step 4: Commit**

```bash
git add learning/session_review/tests/test_docker_sandbox.py learning/session_review/tests/conftest.py
git commit -m "test(learning): Docker sandbox component tests"
```

---

### Task 10: Final integration check and commit

**Step 1: Run all new tests together**

```bash
python -m pytest learning/schema/tests/ learning/tests/test_eval_pipeline.py -v
```

Expected: All tests PASS (excluding docker-marked tests)

**Step 2: Run existing tests to verify no regressions**

```bash
python -m pytest learning/ -v --ignore=learning/session_review/tests -m "not docker"
```

**Step 3: Verify learn export works on real DB**

```bash
python -m learning.cli export
python -m learning.cli eval --dry-run --days 3
```

**Step 4: Final commit if any fixups needed**

```bash
git add -A && git commit -m "fix: address test/integration issues from learning eval implementation"
```

---

## Summary of deliverables

| Task | What | Test count |
|------|------|------------|
| 1 | Changelog table + triggers | 4 |
| 2 | Eval annotations table | 1 |
| 3 | deprecate_principle + add_eval_annotation | 8 |
| 4 | No hard deletes static check | 1 |
| 5 | YAML export (learn export) | 2 |
| 6 | Eval pipeline (learning/eval.py) | 1 |
| 7 | CLI: learn eval | manual |
| 8 | UI: eval dashboard tab | manual |
| 9 | Docker component tests | 4 |
| 10 | Integration check | existing |
| **Total** | | **21 tests** |
