# Session Extraction Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Extract learnings directly from raw session transcript chunks, bypassing condensation, and store them as learning.db instances.

**Architecture:** SessionAdapter implements SemanticNet's DomainAdapter. Parses JSONL → turns, chunks into episodes via LLM topic detection, triages episodes (routine vs interesting), extracts learnings from interesting episodes in parallel, stores to learning.db via LearningStore.add_instance().

**Tech Stack:** lib/sessions (JSONL reader), lib/semnet (DomainAdapter, chunking pattern), lib/llm (async LLM calls), learning/schema (LearningStore, LearningInstance), click (CLI)

**Design doc:** `docs/plans/2026-02-23-session-extraction-design.md`

---

### Task 1: Turn parser — JSONL → structured turns

**Files:**
- Create: `learning/session_extract/__init__.py`
- Create: `learning/session_extract/turns.py`
- Create: `learning/session_extract/tests/test_turns.py`

**Context:**
- `lib/sessions/reader.py:read_session_full()` returns raw JSONL dicts
- `supervisor/sidekick/hooks/learning_worker.py:parse_transcript()` (lines 200-324) already does similar work but produces one big text blob. We need structured turns instead.
- `supervisor/sidekick/hooks/handler.py:_is_low_info_prompt()` filters "yes", "ok", "cnp" etc.
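
The record shapes this parser assumes can be sanity-checked up front. A minimal sketch (shapes mirror the `_make_record` test fixture; real `lib/sessions` records may carry extra fields such as timestamps or uuids):

```python
import json

# Minimal record shapes, mirroring the _make_record test fixture.
# Real lib/sessions records may include extra fields.
raw_jsonl = "\n".join(json.dumps(r) for r in [
    {"type": "user", "message": {"content": "fix the auth bug"}},
    {"type": "assistant", "message": {"content": [{"type": "text", "text": "Looking now."}]}},
    {"type": "tool_result", "tool_name": "Bash",
     "content": "ImportError: no module named foo", "is_error": True},
])

records = [json.loads(line) for line in raw_jsonl.splitlines()]
print([r["type"] for r in records])  # → ['user', 'assistant', 'tool_result']
```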

**Step 1: Write the failing test**

```python
# learning/session_extract/tests/test_turns.py
import json
from pathlib import Path

from learning.session_extract.turns import parse_turns, Turn


def _make_record(role: str, text: str, **kwargs) -> dict:
    """Build a minimal JSONL record."""
    if role == "user":
        return {"type": "user", "message": {"content": text}, **kwargs}
    elif role == "assistant":
        return {"type": "assistant", "message": {"content": [{"type": "text", "text": text}]}, **kwargs}
    elif role == "tool_result":
        return {"type": "tool_result", "tool_name": kwargs.get("tool_name", "Bash"),
                "content": text, "is_error": kwargs.get("is_error", False)}
    return {"type": role, **kwargs}


def test_parse_basic_turns():
    records = [
        _make_record("user", "fix the auth bug"),
        _make_record("assistant", "I'll look at the auth module."),
        _make_record("tool_result", "def login():", tool_name="Read"),
        _make_record("assistant", "Found the issue, fixing now."),
        _make_record("user", "looks good, commit"),
    ]
    turns = parse_turns(records)
    assert len(turns) >= 2
    assert turns[0].role == "user"
    assert "auth bug" in turns[0].text


def test_skips_low_info():
    records = [
        _make_record("user", "fix the auth bug"),
        _make_record("assistant", "Done."),
        _make_record("user", "yes"),
        _make_record("user", "ok"),
    ]
    turns = parse_turns(records)
    user_turns = [t for t in turns if t.role == "user"]
    # "yes" and "ok" should be excluded or marked low-info
    assert len(user_turns) == 1


def test_captures_errors():
    records = [
        _make_record("user", "run the tests"),
        _make_record("tool_result", "ImportError: no module named foo",
                     tool_name="Bash", is_error=True),
        _make_record("assistant", "Let me fix that import."),
    ]
    turns = parse_turns(records)
    error_turns = [t for t in turns if t.has_error]
    assert len(error_turns) >= 1
    assert "ImportError" in error_turns[0].text


def test_turn_dataclass():
    t = Turn(role="user", text="hello", turn_idx=0, tools_used=[], has_error=False)
    assert t.role == "user"
    assert t.turn_idx == 0
```

**Step 2: Run test to verify it fails**

Run: `python -m pytest learning/session_extract/tests/test_turns.py -v`
Expected: FAIL with "ModuleNotFoundError" or "ImportError"

**Step 3: Write minimal implementation**

```python
# learning/session_extract/__init__.py
"""Session extraction — chunk raw transcripts, extract learnings."""

# learning/session_extract/turns.py
"""Parse JSONL records into structured turns."""

from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Turn:
    """A single conversational turn."""
    role: str                    # "user" | "assistant" (tool results are merged into assistant turns)
    text: str                    # content (full for user/errors, summarized for tool output)
    turn_idx: int
    tools_used: list[str] = field(default_factory=list)
    has_error: bool = False
    tool_targets: list[str] = field(default_factory=list)  # file paths, commands


# Low-info patterns — skip these user messages
_LOW_INFO = {
    "yes", "y", "ok", "no", "n", "sure", "thanks", "thank you",
    "cnp", "looks good", "lgtm", "go ahead", "proceed", "continue",
}


def _is_low_info(text: str) -> bool:
    stripped = text.strip().lower().rstrip("!.,")
    return stripped in _LOW_INFO or len(stripped) < 3


def _summarize_tool(tool_name: str, tool_input: dict) -> str:
    """Brief summary of a tool call (name + target)."""
    if "file_path" in tool_input:
        from pathlib import Path
        return f"{tool_name}({Path(tool_input['file_path']).name})"
    if "command" in tool_input:
        cmd = tool_input["command"]
        return f"{tool_name}({cmd[:60]})"
    if "pattern" in tool_input:
        return f"{tool_name}(pattern={tool_input['pattern']})"
    return tool_name


def parse_turns(records: list[dict]) -> list[Turn]:
    """Parse JSONL records into structured turns.

    Merges consecutive assistant + tool_result records into single assistant turns.
    Drops low-info user messages. Captures errors with full text.
    """
    turns: list[Turn] = []
    idx = 0

    # Pending assistant turn accumulator
    asst_text_parts: list[str] = []
    asst_tools: list[str] = []
    asst_targets: list[str] = []
    asst_has_error = False

    def _flush_assistant():
        nonlocal asst_text_parts, asst_tools, asst_targets, asst_has_error, idx
        if asst_text_parts or asst_tools:
            turns.append(Turn(
                role="assistant",
                text="\n".join(asst_text_parts),
                turn_idx=idx,
                tools_used=asst_tools[:],
                has_error=asst_has_error,
                tool_targets=asst_targets[:],
            ))
            idx += 1
            asst_text_parts = []
            asst_tools = []
            asst_targets = []
            asst_has_error = False

    for rec in records:
        rec_type = rec.get("type", "")

        if rec_type == "user":
            _flush_assistant()
            msg = rec.get("message", {})
            content = msg.get("content", "") if isinstance(msg, dict) else ""
            if not isinstance(content, str):
                continue
            if _is_low_info(content):
                continue
            # Skip command triggers
            stripped = content.strip()
            if stripped.startswith("<command-name>") or stripped.startswith("<local-command"):
                continue

            turns.append(Turn(role="user", text=content.strip(), turn_idx=idx))
            idx += 1

        elif rec_type == "assistant":
            msg = rec.get("message", {})
            content = msg.get("content", [])
            if isinstance(content, str):
                asst_text_parts.append(content)
            elif isinstance(content, list):
                for block in content:
                    if isinstance(block, dict):
                        if block.get("type") == "text":
                            asst_text_parts.append(block.get("text", ""))
                        elif block.get("type") == "tool_use":
                            tool_name = block.get("name", "")
                            tool_input = block.get("input", {})
                            asst_tools.append(tool_name)
                            asst_targets.append(_summarize_tool(tool_name, tool_input))

        elif rec_type == "tool_result":
            is_error = rec.get("is_error", False)
            content = rec.get("content", "")
            if isinstance(content, list):
                content = " ".join(
                    b.get("text", "") for b in content if isinstance(b, dict)
                )
            if is_error and content:
                asst_text_parts.append(f"ERROR: {content[:500]}")
                asst_has_error = True

    _flush_assistant()
    return turns
```

**Step 4: Run test to verify it passes**

Run: `python -m pytest learning/session_extract/tests/test_turns.py -v`
Expected: PASS (4 tests)

**Step 5: Commit**

```bash
git add learning/session_extract/__init__.py learning/session_extract/turns.py \
       learning/session_extract/tests/test_turns.py
git commit -m "feat(session_extract): turn parser — JSONL records to structured turns"
```

---

### Task 2: Episode chunker — turns → episodes

**Files:**
- Create: `learning/session_extract/episodes.py`
- Create: `learning/session_extract/tests/test_episodes.py`

**Context:**
- SemanticNet's `lib/semnet/chunk.py` has `chunk_by_topic_change()` and `chunk_fixed_window()` — same two-mode pattern but adapted from time-based to turn-based.
- Episodes are groups of turns about one topic/task.

**Step 1: Write the failing test**

```python
# learning/session_extract/tests/test_episodes.py
from learning.session_extract.turns import Turn
from learning.session_extract.episodes import chunk_fixed_window, Episode


def _turns(topics: list[str]) -> list[Turn]:
    """Make N turns with distinct text."""
    return [Turn(role="user", text=t, turn_idx=i, tools_used=[], has_error=False)
            for i, t in enumerate(topics)]


def test_fixed_window_basic():
    turns = _turns(["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"])
    episodes = chunk_fixed_window(turns, window_size=3)
    assert len(episodes) == 4  # 3+3+3+1
    assert episodes[0].start_turn == 0
    assert episodes[0].end_turn == 2
    assert episodes[-1].end_turn == 9


def test_fixed_window_small_input():
    turns = _turns(["a", "b"])
    episodes = chunk_fixed_window(turns, window_size=5)
    assert len(episodes) == 1


def test_episode_text():
    turns = _turns(["fix auth bug", "add tests"])
    episodes = chunk_fixed_window(turns, window_size=5)
    assert "fix auth bug" in episodes[0].text
    assert "add tests" in episodes[0].text
```

**Step 2: Run test to verify it fails**

Run: `python -m pytest learning/session_extract/tests/test_episodes.py -v`
Expected: FAIL

**Step 3: Write minimal implementation**

```python
# learning/session_extract/episodes.py
"""Chunk turns into episodes (conversation segments about one topic)."""

from __future__ import annotations

from dataclasses import dataclass, field

from learning.session_extract.turns import Turn


@dataclass
class Episode:
    """A segment of conversation about one topic/task."""
    start_turn: int
    end_turn: int
    text: str
    topic: str = ""
    turns: list[Turn] = field(default_factory=list)
    triage_tag: str = ""  # routine | error_recovery | pivot | user_correction | design_decision

    @property
    def turn_count(self) -> int:
        return self.end_turn - self.start_turn + 1

    @property
    def has_errors(self) -> bool:
        return any(t.has_error for t in self.turns)


def chunk_fixed_window(turns: list[Turn], *, window_size: int = 5) -> list[Episode]:
    """Split turns into fixed-size episode windows (fallback, no LLM)."""
    if not turns:
        return []

    episodes = []
    for i in range(0, len(turns), window_size):
        chunk = turns[i:i + window_size]
        text = "\n\n".join(f"[{t.role}] {t.text}" for t in chunk)
        episodes.append(Episode(
            start_turn=chunk[0].turn_idx,
            end_turn=chunk[-1].turn_idx,
            text=text,
            turns=chunk,
        ))
    return episodes


async def chunk_by_topic(
    turns: list[Turn],
    *,
    model: str = "flash",
    min_turns: int = 3,
    max_turns: int = 15,
) -> list[Episode]:
    """Chunk turns at topic boundaries detected by LLM.

    Falls back to fixed windows on failure.
    """
    if not turns or len(turns) <= max_turns:
        return chunk_fixed_window(turns, window_size=max(len(turns), 1))

    from lib.llm import call_llm
    import json
    import re

    # Build turn summary for LLM
    summary_lines = []
    for t in turns:
        prefix = "USER" if t.role == "user" else "ASST"
        err = " [ERROR]" if t.has_error else ""
        tools = f" [{', '.join(t.tools_used)}]" if t.tools_used else ""
        summary_lines.append(f"Turn {t.turn_idx}: {prefix}{err}{tools} {t.text[:150]}")

    prompt = f"""\
Analyze this session transcript and identify where the topic/task changes.
Return ONLY a JSON array of objects:
- "turn_idx": turn index where a new topic starts
- "topic": 3-5 word label for the topic starting here

Constraints:
- Minimum {min_turns} turns per topic
- Maximum {max_turns} turns per topic (force split if longer)
- First entry must be turn {turns[0].turn_idx}

Transcript:
{chr(10).join(summary_lines)}"""

    try:
        resp = await call_llm(model=model, prompt=prompt, temperature=0.0, stream=False)
        cleaned = resp.strip()
        if cleaned.startswith("```"):
            cleaned = re.sub(r'^```(?:json)?\s*', '', cleaned)
            cleaned = re.sub(r'\s*```$', '', cleaned)
        splits = json.loads(cleaned)
    except Exception:
        return chunk_fixed_window(turns, window_size=max_turns)

    # Build episodes from splits. Guard against malformed LLM output, and pin
    # the first split to the first turn so leading turns are never dropped.
    if not (isinstance(splits, list) and splits
            and all(isinstance(s, dict) and "turn_idx" in s for s in splits)):
        return chunk_fixed_window(turns, window_size=max_turns)
    splits[0]["turn_idx"] = turns[0].turn_idx
    episodes = []
    for i, split in enumerate(splits):
        s_idx = int(split["turn_idx"])
        e_idx = int(splits[i + 1]["turn_idx"]) - 1 if i + 1 < len(splits) else turns[-1].turn_idx
        chunk_turns = [t for t in turns if s_idx <= t.turn_idx <= e_idx]
        if chunk_turns:
            text = "\n\n".join(f"[{t.role}] {t.text}" for t in chunk_turns)
            episodes.append(Episode(
                start_turn=s_idx,
                end_turn=e_idx,
                text=text,
                topic=split.get("topic", ""),
                turns=chunk_turns,
            ))

    return episodes if episodes else chunk_fixed_window(turns, window_size=max_turns)
```

**Step 4: Run test to verify it passes**

Run: `python -m pytest learning/session_extract/tests/test_episodes.py -v`
Expected: PASS (3 tests)

**Step 5: Commit**

```bash
git add learning/session_extract/episodes.py learning/session_extract/tests/test_episodes.py
git commit -m "feat(session_extract): episode chunker — turns to topic episodes"
```

---

### Task 3: Episode triage — classify episodes

**Files:**
- Create: `learning/session_extract/triage.py`
- Create: `learning/session_extract/tests/test_triage.py`

**Context:**
- Before extraction, classify each episode as routine (skip) or interesting (extract from).
- Two modes: heuristic (fast, no LLM) and LLM-based (better quality).
- Heuristic: episodes with errors, user corrections ("no", "instead", "actually"), or approach changes are interesting.
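
The heuristic can be exercised standalone before wiring it into the module. A sketch inlining a subset of the same correction/pivot regexes used in the implementation below:

```python
import re

# Subset of the triage.py patterns, inlined for a standalone check.
CORRECTION = re.compile(
    r"\b(no[,.]?\s+(don't|do not|instead|actually|wait)|that's wrong|not what i)\b",
    re.IGNORECASE)
PIVOT = re.compile(r"\b(abandon|scrap|different approach|start over)\b", re.IGNORECASE)

def quick_triage(text: str) -> str:
    """Order matters: corrections outrank pivots, everything else is routine."""
    if CORRECTION.search(text):
        return "user_correction"
    if PIVOT.search(text):
        return "pivot"
    return "routine"

print(quick_triage("no, don't do that"))               # → user_correction
print(quick_triage("let's try a different approach"))  # → pivot
print(quick_triage("read the file and edit line 5"))   # → routine
```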

**Step 1: Write the failing test**

```python
# learning/session_extract/tests/test_triage.py
from learning.session_extract.turns import Turn
from learning.session_extract.episodes import Episode
from learning.session_extract.triage import triage_heuristic


def _episode(text: str, has_error: bool = False) -> Episode:
    turns = [Turn(role="user", text=text, turn_idx=0, tools_used=[], has_error=has_error)]
    return Episode(start_turn=0, end_turn=0, text=text, turns=turns)


def test_routine_episode():
    ep = _episode("read the file and edit line 5")
    tag = triage_heuristic(ep)
    assert tag == "routine"


def test_error_episode():
    ep = _episode("ImportError: no module named foo", has_error=True)
    tag = triage_heuristic(ep)
    assert tag == "error_recovery"


def test_correction_episode():
    ep = _episode("no, don't do that. instead use the other approach")
    tag = triage_heuristic(ep)
    assert tag == "user_correction"
```

**Step 2: Run test to verify it fails**

Run: `python -m pytest learning/session_extract/tests/test_triage.py -v`
Expected: FAIL

**Step 3: Write minimal implementation**

```python
# learning/session_extract/triage.py
"""Episode triage — classify episodes as routine or interesting."""

from __future__ import annotations

import re

from learning.session_extract.episodes import Episode


# Patterns suggesting user correction
_CORRECTION_PATTERNS = re.compile(
    r"\b(no[,.]?\s+(don't|do not|instead|actually|wait)|"
    r"that's wrong|not what i|try .* instead|"
    r"actually[,.]?\s+(let's|use|do)|"
    r"stop[,.]?\s|go back)\b",
    re.IGNORECASE,
)

# Patterns suggesting design decisions
_DESIGN_PATTERNS = re.compile(
    r"\b(should we|which approach|trade.?off|architecture|"
    r"let's go with|i prefer|design)\b",
    re.IGNORECASE,
)

# Patterns suggesting pivots
_PIVOT_PATTERNS = re.compile(
    r"\b(abandon|scrap|different approach|start over|"
    r"that .* (isn't|won't) work|let's try something else)\b",
    re.IGNORECASE,
)


def triage_heuristic(episode: Episode) -> str:
    """Classify episode using heuristics (no LLM). Fast fallback.

    Returns one of: routine, error_recovery, pivot, user_correction, design_decision
    """
    text = episode.text

    if episode.has_errors:
        return "error_recovery"

    if _CORRECTION_PATTERNS.search(text):
        return "user_correction"

    if _PIVOT_PATTERNS.search(text):
        return "pivot"

    if _DESIGN_PATTERNS.search(text):
        return "design_decision"

    return "routine"


async def triage_llm(episodes: list[Episode], *, model: str = "flash") -> list[Episode]:
    """Classify episodes using LLM. Better quality, costs one LLM call.

    Mutates episode.triage_tag in place and returns the same list.
    """
    from lib.llm import call_llm
    import json

    summaries = []
    for i, ep in enumerate(episodes):
        err = " [HAS ERRORS]" if ep.has_errors else ""
        summaries.append(f"Episode {i}: {ep.topic or ep.text[:100]}{err}")

    prompt = f"""\
Classify each episode of this coding session. Return JSON array of objects:
- "index": episode index
- "tag": one of: routine, error_recovery, pivot, user_correction, design_decision

Tags:
- routine: standard read/edit/commit, no learning signal
- error_recovery: something broke, debugging happened
- pivot: approach abandoned, new direction taken
- user_correction: user redirected the AI
- design_decision: architectural choice with tradeoffs

Episodes:
{chr(10).join(summaries)}"""

    try:
        resp = await call_llm(model=model, prompt=prompt, temperature=0.0, stream=False)
        cleaned = resp.strip()
        if cleaned.startswith("```"):
            cleaned = re.sub(r'^```(?:json)?\s*', '', cleaned)
            cleaned = re.sub(r'\s*```$', '', cleaned)
        tags = json.loads(cleaned)
        for item in tags:
            idx = int(item.get("index", -1))  # coerce: models sometimes return "0"
            if 0 <= idx < len(episodes):
                episodes[idx].triage_tag = item.get("tag", "routine")
    except Exception:
        # Fallback to heuristic
        for ep in episodes:
            ep.triage_tag = triage_heuristic(ep)

    return episodes
```

**Step 4: Run test to verify it passes**

Run: `python -m pytest learning/session_extract/tests/test_triage.py -v`
Expected: PASS (3 tests)

**Step 5: Commit**

```bash
git add learning/session_extract/triage.py learning/session_extract/tests/test_triage.py
git commit -m "feat(session_extract): episode triage — heuristic + LLM classification"
```

---

### Task 4: Extraction — episodes → learning instances

**Files:**
- Create: `learning/session_extract/extract.py`
- Create: `learning/session_extract/tests/test_extract.py`

**Context:**
- `learning/schema/learning_store.py`: `LearningStore.add_instance()` takes a `LearningInstance` (line 160)
- `LearningInstance` fields (line 61-79): content, source_type, project, domain_tags, learning_type, metadata
- `SourceType` enum likely includes values like `session_review` — check `learning/schema/learning_store.py`
- Extraction prompt runs on each non-routine episode, extracts 0-2 learnings
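
One caveat before writing the prompt template: it is rendered with `str.format`, so any literal JSON braces in the template must be doubled (`{{`/`}}`) or the format call raises `KeyError` before the LLM is even called. A quick sketch:

```python
# str.format treats every bare "{" as a placeholder, so literal JSON braces
# in a prompt template must be doubled ("{{" / "}}").
template = 'Return JSON: {{"learnings": []}}  Topic: {topic}'
print(template.format(topic="auth bug"))
# → Return JSON: {"learnings": []}  Topic: auth bug

# Undoubled braces blow up at format time:
bad = 'Return JSON: {"learnings": []}  Topic: {topic}'
try:
    bad.format(topic="auth bug")
except KeyError as exc:
    print("undoubled braces fail:", exc)
```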

**Step 1: Write the failing test**

```python
# learning/session_extract/tests/test_extract.py
import json
from unittest.mock import AsyncMock, patch

import pytest

from learning.session_extract.episodes import Episode
from learning.session_extract.turns import Turn
from learning.session_extract.extract import extract_from_episode, EXTRACT_PROMPT


def _episode(text: str, topic: str = "test") -> Episode:
    turns = [Turn(role="user", text=text, turn_idx=0, tools_used=[], has_error=True)]
    return Episode(start_turn=0, end_turn=0, text=text, topic=topic, turns=turns)


def test_extract_prompt_exists():
    assert "observation" in EXTRACT_PROMPT
    assert "generalization" in EXTRACT_PROMPT


@pytest.mark.asyncio
async def test_extract_from_episode():
    mock_response = json.dumps({"learnings": [{
        "observation": "Import failed because module moved",
        "generalization": "Check imports after refactoring",
        "project": "rivus",
        "type": "pattern",
    }]})

    with patch("learning.session_extract.extract.call_llm", new_callable=AsyncMock,
               return_value=mock_response):
        results = await extract_from_episode(
            _episode("ImportError: no module found\nFixed by updating import path"),
            model="flash",
        )
    assert len(results) == 1
    assert "Import failed" in results[0]["observation"]


@pytest.mark.asyncio
async def test_extract_returns_empty_for_routine():
    mock_response = json.dumps({"learnings": []})
    with patch("learning.session_extract.extract.call_llm", new_callable=AsyncMock,
               return_value=mock_response):
        results = await extract_from_episode(
            _episode("read file, edit line 5, commit"),
            model="flash",
        )
    assert results == []
```

**Step 2: Run test to verify it fails**

Run: `python -m pytest learning/session_extract/tests/test_extract.py -v`
Expected: FAIL

**Step 3: Write minimal implementation**

```python
# learning/session_extract/extract.py
"""Extract learnings from episodes."""

from __future__ import annotations

import asyncio
import json
import re

from loguru import logger

from lib.llm import call_llm
from learning.session_extract.episodes import Episode


EXTRACT_PROMPT = """\
Extract 0-2 learnings from this coding session episode.

A learning is worth extracting when:
- A bug took >2 attempts to diagnose (non-obvious root cause)
- An approach was abandoned and replaced (wrong first instinct)
- A tool/API/library behaved unexpectedly (gotcha worth recording)
- A pattern emerged that applies to future sessions
- The user corrected the AI's approach (reveals preference or convention)
- A debugging technique was particularly effective

Do NOT extract:
- Routine operations (read, edit, run tests)
- Well-known best practices
- Session-specific context without generalizable insight

Return JSON:
{{
  "learnings": [
    {{
      "observation": "Concrete description of what happened",
      "generalization": "Broader principle or pattern",
      "project": "project name or 'general'",
      "type": "principle|convention|pattern|howto|observation"
    }}
  ]
}}

If nothing worth extracting: {{"learnings": []}}

Episode topic: {topic}
---
{text}
"""


async def extract_from_episode(
    episode: Episode,
    *,
    model: str = "flash",
    project: str = "rivus",
) -> list[dict]:
    """Extract learnings from a single episode.

    Returns list of dicts with: observation, generalization, project, type.
    """
    prompt = EXTRACT_PROMPT.format(
        topic=episode.topic or "unknown",
        text=episode.text[:15000],  # safety truncation
    )

    try:
        resp = await call_llm(model=model, prompt=prompt, temperature=0.0, stream=False)
        cleaned = resp.strip()
        if cleaned.startswith("```"):
            cleaned = re.sub(r'^```(?:json)?\s*', '', cleaned)
            cleaned = re.sub(r'\s*```$', '', cleaned)
        data = json.loads(cleaned)
        # Drop malformed entries so downstream code can rely on "observation"
        learnings = [l for l in data.get("learnings", [])
                     if isinstance(l, dict) and l.get("observation")]

        # Attach episode metadata
        for l in learnings:
            l.setdefault("project", project)
            l["_episode_topic"] = episode.topic
            l["_turn_range"] = f"{episode.start_turn}-{episode.end_turn}"

        return learnings

    except Exception as e:
        logger.warning("Extraction failed for episode {}: {}", episode.topic, e)
        return []


async def extract_from_episodes(
    episodes: list[Episode],
    *,
    model: str = "flash",
    project: str = "rivus",
    concurrency: int = 5,
) -> list[dict]:
    """Extract learnings from multiple episodes concurrently.

    Skips episodes tagged as 'routine'.
    """
    sem = asyncio.Semaphore(concurrency)
    interesting = [ep for ep in episodes if ep.triage_tag != "routine"]

    logger.info("Extracting from {}/{} non-routine episodes", len(interesting), len(episodes))

    async def _one(ep: Episode) -> list[dict]:
        async with sem:
            return await extract_from_episode(ep, model=model, project=project)

    results = await asyncio.gather(*[_one(ep) for ep in interesting])
    return [l for batch in results for l in batch]
```

**Step 4: Run test to verify it passes**

Run: `python -m pytest learning/session_extract/tests/test_extract.py -v`
Expected: PASS (3 tests)

**Step 5: Commit**

```bash
git add learning/session_extract/extract.py learning/session_extract/tests/test_extract.py
git commit -m "feat(session_extract): per-episode learning extraction"
```

---

### Task 5: Pipeline + CLI — end-to-end extraction with storage

**Files:**
- Create: `learning/session_extract/pipeline.py`
- Create: `learning/session_extract/__main__.py`
- Modify: `learning/tasks.py` (add `inv learning.extract` task)

**Context:**
- `lib/sessions` provides `discover_sessions()`, `read_session_full()`, `CLAUDE_PROJECTS_DIR`
- `learning/schema/learning_store.py`: `LearningStore`, `LearningInstance`, `SourceType`
- Pipeline: load JSONL → parse turns → chunk episodes → triage → extract → store to learning.db

**Step 1: Write the pipeline**

```python
# learning/session_extract/pipeline.py
"""End-to-end session extraction pipeline."""

from __future__ import annotations

import asyncio
from pathlib import Path

from loguru import logger

from lib.sessions import read_session_full, discover_sessions, CLAUDE_PROJECTS_DIR
from learning.session_extract.turns import parse_turns
from learning.session_extract.episodes import chunk_fixed_window, chunk_by_topic
from learning.session_extract.triage import triage_heuristic, triage_llm
from learning.session_extract.extract import extract_from_episodes


async def extract_session(
    jsonl_path: Path,
    *,
    model: str = "flash",
    use_llm_chunking: bool = True,
    use_llm_triage: bool = False,
    project: str = "rivus",
    dry_run: bool = False,
) -> list[dict]:
    """Full extraction pipeline for one session.

    Returns list of extracted learning dicts.
    """
    session_id = jsonl_path.stem

    # 1. Load JSONL
    records = read_session_full(jsonl_path)
    if not records:
        logger.info("Empty session: {}", session_id[:8])
        return []

    # 2. Parse turns
    turns = parse_turns(records)
    if len(turns) < 3:
        logger.info("Too few turns ({}): {}", len(turns), session_id[:8])
        return []

    logger.info("Session {}: {} turns from {} records", session_id[:8], len(turns), len(records))

    # 3. Chunk into episodes
    if use_llm_chunking and len(turns) > 10:
        episodes = await chunk_by_topic(turns, model=model)
    else:
        episodes = chunk_fixed_window(turns, window_size=8)

    logger.info("Chunked into {} episodes", len(episodes))

    # 4. Triage
    if use_llm_triage:
        episodes = await triage_llm(episodes, model=model)
    else:
        for ep in episodes:
            ep.triage_tag = triage_heuristic(ep)

    tags = {}
    for ep in episodes:
        tags[ep.triage_tag] = tags.get(ep.triage_tag, 0) + 1
    logger.info("Triage: {}", tags)

    # 5. Extract
    learnings = await extract_from_episodes(episodes, model=model, project=project)

    # Attach session metadata
    for l in learnings:
        l["_session_id"] = session_id

    logger.info("Extracted {} learnings from session {}", len(learnings), session_id[:8])

    # 6. Store (unless dry run)
    if not dry_run and learnings:
        _store_learnings(learnings, session_id, model)

    return learnings


def _store_learnings(learnings: list[dict], session_id: str, model: str):
    """Store extracted learnings to learning.db."""
    import hashlib

    from learning.schema.learning_store import LearningStore, LearningInstance, SourceType, LearningType

    store = LearningStore()
    for l in learnings:
        lt = None
        type_str = l.get("type", "observation")
        for member in LearningType:
            if member.value == type_str:
                lt = member
                break

        instance = LearningInstance(
            # hashlib, not hash(): built-in str hashing is salted per process,
            # so hash()-based IDs would differ between runs and defeat dedup.
            id=f"se-{session_id[:8]}-{hashlib.sha1(l['observation'].encode()).hexdigest()[:8]}",
            content=l["observation"],
            context_snippet=l.get("generalization", ""),
            source_type=SourceType.SESSION_REVIEW,
            source_id=session_id,
            project=l.get("project", "rivus"),
            domain_tags=[
                f"episode:{l.get('_episode_topic', '')}",
                f"turns:{l.get('_turn_range', '')}",
            ],
            learning_type=lt,
            extracted_by="session_extract",
            extraction_model=model,
            metadata={"generalization": l.get("generalization", "")},
        )
        store.add_instance(instance)
        logger.debug("Stored: {}", instance.id)


async def extract_batch(
    *,
    count: int = 10,
    model: str = "flash",
    dry_run: bool = False,
) -> list[dict]:
    """Extract from recent sessions."""
    sessions = discover_sessions(days=30, limit=count * 3)

    # Filter to substantial sessions
    candidates = []
    for s in sessions:
        if s.jsonl_path and s.message_count >= 5:
            candidates.append(s)
        if len(candidates) >= count:
            break

    logger.info("Processing {} sessions", len(candidates))

    all_learnings = []
    for s in candidates:
        learnings = await extract_session(
            s.jsonl_path,
            model=model,
            dry_run=dry_run,
            project=s.project or "rivus",
        )
        all_learnings.extend(learnings)

    return all_learnings
```

**Step 2: Write the CLI**

```python
# learning/session_extract/__main__.py
"""CLI for session extraction."""

import asyncio
from pathlib import Path

import click
from loguru import logger

from learning.session_extract.pipeline import extract_session, extract_batch


@click.group()
def cli():
    """Session extraction — learnings from raw transcripts."""
    pass


@cli.command()
@click.argument("session_id_or_path")
@click.option("--model", default="flash", help="LLM model for extraction")
@click.option("--dry-run", is_flag=True, help="Don't store to learning.db")
def one(session_id_or_path: str, model: str, dry_run: bool):
    """Extract from a single session."""
    path = Path(session_id_or_path)
    if not path.exists():
        from lib.sessions import find_jsonl
        path = find_jsonl(session_id_or_path)
        if not path:
            click.echo(f"Session not found: {session_id_or_path}")
            return

    learnings = asyncio.run(extract_session(path, model=model, dry_run=dry_run))

    for l in learnings:
        click.echo(f"\n  [{l.get('type', '?')}] {l['observation']}")
        if l.get('generalization'):
            click.echo(f"  → {l['generalization']}")

    click.echo(f"\n{len(learnings)} learnings extracted{'  (dry run)' if dry_run else ''}")


@cli.command()
@click.option("--count", default=10, help="Number of sessions")
@click.option("--model", default="flash", help="LLM model")
@click.option("--dry-run", is_flag=True, help="Don't store to learning.db")
def batch(count: int, model: str, dry_run: bool):
    """Extract from recent sessions."""
    learnings = asyncio.run(extract_batch(count=count, model=model, dry_run=dry_run))

    click.echo(f"\n{len(learnings)} learnings from {count} sessions{'  (dry run)' if dry_run else ''}")
    for l in learnings:
        click.echo(f"  [{l.get('type', '?')}] {l['observation'][:80]}")


if __name__ == "__main__":
    cli()
```

**Step 3: Add inv task to `learning/tasks.py`**

Add after the existing `gym` task:

```python
@task
def extract(c, session=None, count=10, model="flash", dry_run=False):
    """Extract learnings from raw session transcripts.

    Examples:
        inv learning.extract                           # batch: 10 recent sessions
        inv learning.extract --session abc123          # single session
        inv learning.extract --count 20 --model opus   # batch with opus
        inv learning.extract --dry-run                 # preview, don't store
    """
    set_iterm_badge("🔍 extract")
    base = "python -m learning.session_extract"
    if session:
        cmd = f"{base} one {session} --model {model}"
    else:
        cmd = f"{base} batch --count {count} --model {model}"
    if dry_run:
        cmd += " --dry-run"
    c.run(cmd, pty=True)
```

**Step 4: Run smoke test**

Run: `python -m learning.session_extract one --dry-run $(ls -t ~/.claude/projects/-Users-tchklovski-all-code-rivus/*.jsonl | head -1)`
Expected: Extracted learnings printed, no storage

**Step 5: Commit**

```bash
git add learning/session_extract/pipeline.py learning/session_extract/__main__.py learning/tasks.py
git commit -m "feat(session_extract): end-to-end pipeline + CLI + inv task"
```

---

### Task 6: Smoke test on real sessions

**Files:** None (testing only)

**Step 1: Dry run on 3 sessions**

Run: `python -m learning.session_extract batch --count 3 --dry-run`
Expected: Learnings extracted and printed, no storage

**Step 2: Verify triage distribution**

Check that triage isn't classifying everything as routine or everything as interesting. Expect ~40-60% routine.
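
A quick way to eyeball the distribution from a dry run (the tag list here is hypothetical example data; the real tags come from the `Triage: {...}` log line the pipeline emits):

```python
from collections import Counter

# Hypothetical tags from a 3-session dry run; substitute the real ones from
# the pipeline's "Triage: {...}" log output.
tags = ["routine", "routine", "error_recovery", "routine", "user_correction",
        "routine", "pivot", "routine", "routine", "design_decision"]

dist = Counter(tags)
routine_frac = dist["routine"] / len(tags)
print(dict(dist), f"routine={routine_frac:.0%}")  # routine=60%
assert 0.4 <= routine_frac <= 0.6, "triage may be miscalibrated"
```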

**Step 3: Single session deep check**

Run: `python -m learning.session_extract one <session_id_with_known_errors> --dry-run`
Expected: Finds the known error/learning from that session

**Step 4: Store for real**

Run: `python -m learning.session_extract batch --count 3`
Expected: Learnings stored, verify with `learn list --limit 5`

**Step 5: Commit test artifacts cleanup**

No files to commit — just verification.

---

Plan complete and saved to `docs/plans/2026-02-23-session-extraction.md`. Two execution options:

**1. Subagent-Driven (this session)** - I dispatch fresh subagent per task, review between tasks, fast iteration

**2. Parallel Session (separate)** - Open new session with executing-plans, batch execution with checkpoints

Which approach?