# Doctor Upgrade Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Transform doctor from a polling-only log scanner into a signal-driven fix dispatcher with safety guardrails.

**Architecture:** Two signal tiers (log baseline + push signals) feed into a unified triage pipeline. New `lib/doctor_signal.py` (stdlib-only) lets any service push signals. Doctor reader uses watchdog + queue for instant, contention-free processing. Fix lifecycle gets branch naming, attempt limits, timeouts, and stale lock recovery.

**Tech Stack:** Python stdlib (signal library), watchdog (reader), SQLite WAL (events + dedup), loguru (sink integration), Pushover (notifications)

**Design doc:** `docs/plans/2026-03-07-doctor-upgrade-design.md` (v3)

---

## Task 1: Signal Library (`lib/doctor_signal.py`)

The foundation. Zero third-party deps. Every other task depends on this.

**Files:**
- Create: `lib/doctor_signal.py`
- Create: `lib/tests/test_doctor_signal.py`

**Step 1: Write failing tests**

```python
# lib/tests/test_doctor_signal.py
"""Tests for lib/doctor_signal.py — doctor signal library."""
import json

import pytest


@pytest.fixture
def signals_dir(tmp_path):
    incoming = tmp_path / "incoming"
    incoming.mkdir()
    processed = tmp_path / "processed"
    processed.mkdir()
    return tmp_path


def test_signal_error_writes_json(signals_dir):
    from lib.doctor_signal import signal_error

    path = signal_error(
        "auth-svc", "KeyError", "KeyError: 'user_id'",
        traceback="Traceback...\nKeyError: 'user_id'",
        signals_dir=signals_dir / "incoming",
    )
    assert path is not None
    assert path.exists()
    data = json.loads(path.read_text())
    assert data["service"] == "auth-svc"
    assert data["type"] == "KeyError"
    assert data["details"]["error_msg"] == "KeyError: 'user_id'"
    assert data["details"]["traceback"] == "Traceback...\nKeyError: 'user_id'"
    assert "event_id" in data
    assert "timestamp" in data
    assert "fingerprint" in data


def test_signal_error_rate_limited(signals_dir):
    from lib.doctor_signal import signal_error

    incoming = signals_dir / "incoming"
    p1 = signal_error("svc", "KeyError", "msg", signals_dir=incoming)
    p2 = signal_error("svc", "KeyError", "msg", signals_dir=incoming)
    assert p1 is not None
    assert p2 is None  # rate-limited (same fingerprint within 10 min)


def test_signal_error_dir_cap(signals_dir):
    from lib.doctor_signal import signal_error, MAX_INCOMING_FILES

    incoming = signals_dir / "incoming"
    # Fill directory to cap
    for i in range(MAX_INCOMING_FILES):
        (incoming / f"dummy_{i}.json").write_text("{}")

    path = signal_error("svc", "Err", "msg", signals_dir=incoming)
    assert path is None  # dropped — dir at cap


def test_make_fingerprint_stable():
    from lib.doctor_signal import make_fingerprint

    fp1 = make_fingerprint("svc", "KeyError", "KeyError: 'user_id'")
    fp2 = make_fingerprint("svc", "KeyError", "KeyError: 'user_id'")
    assert fp1 == fp2
    assert len(fp1) == 16


def test_make_fingerprint_normalizes():
    from lib.doctor_signal import make_fingerprint

    fp1 = make_fingerprint("svc", "KeyError", "KeyError at line 42: 'x'")
    fp2 = make_fingerprint("svc", "KeyError", "KeyError at line 99: 'x'")
    assert fp1 == fp2  # line numbers stripped


def test_make_fingerprint_with_traceback():
    from lib.doctor_signal import make_fingerprint

    tb1 = 'File "foo.py", line 10, in bar\n  x = d["k"]\nKeyError: "k"'
    tb2 = 'File "foo.py", line 25, in bar\n  x = d["k"]\nKeyError: "k"'
    fp1 = make_fingerprint("svc", "KeyError", "msg", traceback=tb1)
    fp2 = make_fingerprint("svc", "KeyError", "msg", traceback=tb2)
    assert fp1 == fp2  # same frames, different line numbers


def test_get_git_hash():
    from lib.doctor_signal import get_git_hash

    h = get_git_hash()
    # We're in a git repo, so this should return a hash
    assert h is not None
    assert len(h) == 40


def test_doctor_loguru_sink(signals_dir):
    """Test that the loguru sink function works when called with a mock record."""
    from lib.doctor_signal import doctor_loguru_sink, configure

    configure(signals_dir=signals_dir / "incoming", service="test-svc")

    # Duck-type a loguru message object
    class FakeMessage:
        def __init__(self):
            self.record = {
                "level": type("L", (), {"name": "ERROR", "no": 40})(),
                "exception": type("E", (), {
                    "type": KeyError,
                    "value": KeyError("user_id"),
                    "traceback": None,
                })(),
                "message": "KeyError: 'user_id'",
                "module": "auth",
                "function": "get_user",
                "name": "auth.views",
            }

    doctor_loguru_sink(FakeMessage())

    files = list((signals_dir / "incoming").glob("*.json"))
    assert len(files) == 1
    data = json.loads(files[0].read_text())
    assert data["service"] == "test-svc"
    assert data["type"] == "KeyError"
```

**Step 2: Run tests to verify they fail**

Run: `python -m pytest lib/tests/test_doctor_signal.py -v`
Expected: ImportError — `lib.doctor_signal` doesn't exist yet

**Step 3: Implement `lib/doctor_signal.py`**

```python
#!/usr/bin/env python
"""Doctor signal library — zero-dependency (stdlib only).

Any service can import this to push signals to doctor.
Does NOT import doctor, loguru, or any third-party package.
"""
import hashlib
import json
import os
import re
import signal as signal_mod
import subprocess
import sys
import tempfile
import time
import traceback as tb_mod
from datetime import datetime, timezone
from pathlib import Path
from uuid import uuid4

# --- Configuration ---

MAX_INCOMING_FILES = 1000
RATE_LIMIT_SECONDS = 600  # 10 minutes per fingerprint

# Module-level state (set via configure())
_config = {
    "signals_dir": None,  # Path to incoming/ directory
    "service": None,      # Service name
}

# Rate limit tracking: {fingerprint: last_signal_timestamp}
_rate_limit: dict[str, float] = {}


def configure(*, signals_dir: Path | str | None = None, service: str | None = None):
    """Configure the signal library. Call once at service startup."""
    if signals_dir is not None:
        _config["signals_dir"] = Path(signals_dir)
    if service is not None:
        _config["service"] = service


def _default_signals_dir() -> Path | None:
    """Resolve signals/incoming/: explicit configure() value first, then monorepo default."""
    configured = _config.get("signals_dir")
    if configured is not None:
        return configured
    # Walk up from lib/ to find doctor/signals/incoming/
    here = Path(__file__).resolve().parent  # lib/
    candidate = here.parent / "doctor" / "signals" / "incoming"
    if candidate.is_dir():
        return candidate
    return None


# --- Fingerprinting ---

def _normalize_message(msg: str) -> str:
    """Strip line numbers, hex addresses, UUIDs, timestamps from error message."""
    msg = re.sub(r'\bline \d+\b', 'line N', msg)
    msg = re.sub(r'0x[0-9a-fA-F]+', '0xHEX', msg)
    msg = re.sub(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', 'UUID', msg)
    msg = re.sub(r'\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}', 'TIMESTAMP', msg)
    msg = re.sub(r'\d{10,}', 'BIGNUM', msg)  # epoch timestamps, large IDs
    return msg.strip()


def _extract_top_frames(traceback_text: str, n: int = 3) -> list[str]:
    """Extract top N stack frames as 'filename:function_name' (no line numbers)."""
    frames = []
    for match in re.finditer(r'File "([^"]+)", line \d+, in (\w+)', traceback_text):
        filename = Path(match.group(1)).name  # just filename, not full path
        func = match.group(2)
        frames.append(f"{filename}:{func}")
    return frames[-n:]  # top N (most recent)


def make_fingerprint(
    service: str,
    error_type: str,
    message: str,
    traceback: str | None = None,
) -> str:
    """Content-based fingerprint for cross-tier dedup."""
    parts = [service, error_type, _normalize_message(message)]
    if traceback:
        frames = _extract_top_frames(traceback, n=3)
        parts.extend(frames)
    return hashlib.sha256("|".join(parts).encode()).hexdigest()[:16]


# --- Git helper ---

def get_git_hash() -> str | None:
    """Best-effort git rev-parse HEAD. Returns None on failure."""
    try:
        result = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            capture_output=True, text=True, timeout=5,
        )
        if result.returncode == 0:
            return result.stdout.strip()
    except Exception:
        pass
    return None


# --- Signal writing ---

def signal_error(
    service: str | None = None,
    error_type: str = "unknown",
    message: str = "",
    *,
    traceback: str | None = None,
    exit_code: int | None = None,
    log_tail: list[str] | None = None,
    git_hash: str | None = None,
    extra: dict | None = None,
    signals_dir: Path | str | None = None,
) -> Path | None:
    """Write a signal file to doctor. Returns path or None if rate-limited/capped."""
    service = service or _config.get("service") or "unknown"
    signals_dir = Path(signals_dir) if signals_dir else _default_signals_dir()
    if signals_dir is None:
        return None  # no signals directory configured or found

    # Dir cap check; create the directory first so the very first signal succeeds
    try:
        signals_dir.mkdir(parents=True, exist_ok=True)
        if len(os.listdir(signals_dir)) >= MAX_INCOMING_FILES:
            return None
    except OSError:
        return None

    fp = make_fingerprint(service, error_type, message, traceback)

    # Rate limit check
    now = time.time()
    if fp in _rate_limit and (now - _rate_limit[fp]) < RATE_LIMIT_SECONDS:
        return None
    _rate_limit[fp] = now

    event_id = uuid4().hex[:12]
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")

    details = {"error_msg": message}
    if traceback:
        details["traceback"] = traceback
    if exit_code is not None:
        details["exit_code"] = exit_code
    if log_tail:
        details["log_tail"] = log_tail
    if git_hash:
        details["git_hash"] = git_hash
    elif git_hash is None:
        h = get_git_hash()
        if h:
            details["git_hash"] = h
    if extra:
        details.update(extra)

    payload = {
        "event_id": event_id,
        "timestamp": ts,
        "service": service,
        "type": error_type,
        "severity": "med",
        "fingerprint": fp,
        "details": details,
        "source": "doctor_signal",
    }

    # Atomic write: tempfile in same dir → rename
    try:
        ts_short = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
        filename = f"{service}_{ts_short}_{event_id}.json"
        fd, tmp_path = tempfile.mkstemp(dir=str(signals_dir), suffix=".tmp")
        try:
            with os.fdopen(fd, "w") as f:
                json.dump(payload, f, indent=2)
            final_path = signals_dir / filename
            os.rename(tmp_path, str(final_path))
            return final_path
        except Exception:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise
    except Exception:
        return None


# --- Loguru sink (duck-typed, no loguru import) ---

# Error types considered transient (not code bugs)
_TRANSIENT_PATTERNS = re.compile(
    r"(timeout|timed out|connection refused|connection reset|"
    r"broken pipe|temporary failure|rate limit|429|503|502|504)",
    re.IGNORECASE,
)

# Error types likely to be code bugs
_CODE_BUG_TYPES = {
    "KeyError", "AttributeError", "TypeError", "IndexError",
    "ValueError", "NameError", "ImportError", "ModuleNotFoundError",
    "AssertionError", "NotImplementedError", "RuntimeError",
    "ZeroDivisionError", "UnboundLocalError", "RecursionError",
}


def doctor_loguru_sink(message) -> None:
    """Loguru sink function. Duck-types message.record — no loguru import.

    Usage: logger.add(doctor_loguru_sink, level="ERROR")
    """
    record = message.record
    level = record.get("level")
    if level and hasattr(level, "no") and level.no < 40:  # below ERROR
        return

    exc = record.get("exception")
    if exc is None:
        return  # only signal exceptions

    exc_type = getattr(exc, "type", None)
    exc_value = getattr(exc, "value", None)
    exc_tb = getattr(exc, "traceback", None)

    if exc_type is None:
        return

    type_name = exc_type.__name__ if hasattr(exc_type, "__name__") else str(exc_type)
    msg = str(exc_value) if exc_value else record.get("message", "")

    # Skip transient errors
    if _TRANSIENT_PATTERNS.search(msg):
        return

    # Only signal likely code bugs
    if type_name not in _CODE_BUG_TYPES:
        return

    # Format traceback if available
    tb_text = None
    if exc_tb:
        try:
            tb_text = "".join(tb_mod.format_tb(exc_tb))
            tb_text += f"{type_name}: {msg}"
        except Exception:
            pass

    signal_error(
        error_type=type_name,
        message=f"{type_name}: {msg}",
        traceback=tb_text,
    )


# --- Exit signaling ---

def signal_on_exit(service: str | None = None) -> None:
    """Register sys.excepthook + SIGTERM handlers for crash-on-exit signaling.

    Best-effort only. See design doc §1 crash coverage table for limitations.
    Launchd wrapper is the authoritative crash reporter for hard exits.
    """
    svc = service or _config.get("service") or "unknown"

    # Unhandled exceptions: chain sys.excepthook. (atexit cannot observe the
    # exit code; sys has no "exitcode" attribute, so the excepthook is the
    # reliable in-process crash path.)
    prev_excepthook = sys.excepthook

    def _excepthook(exc_type, exc_value, exc_tb):
        signal_error(
            service=svc,
            error_type=getattr(exc_type, "__name__", str(exc_type)),
            message=f"Unhandled exception: {exc_value}",
            traceback="".join(tb_mod.format_exception(exc_type, exc_value, exc_tb)),
        )
        prev_excepthook(exc_type, exc_value, exc_tb)

    sys.excepthook = _excepthook

    # Also install SIGTERM handler (chains to previous)
    prev_handler = signal_mod.getsignal(signal_mod.SIGTERM)

    def _sigterm_handler(signum, frame):
        signal_error(
            service=svc,
            error_type="sigterm",
            message="Process received SIGTERM",
        )
        # Chain to previous handler
        if callable(prev_handler):
            prev_handler(signum, frame)
        else:
            raise SystemExit(128 + signum)

    try:
        signal_mod.signal(signal_mod.SIGTERM, _sigterm_handler)
    except (OSError, ValueError):
        pass  # can't set signal handler (not main thread, etc.)
```
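
A typical service would wire this up at startup. The sketch below is illustrative (service name and signals path are placeholders); `logger.add(doctor_loguru_sink, level="ERROR")` matches the usage shown in the sink's docstring:

```python
# Hypothetical service startup; service name and signals path are placeholders
from loguru import logger

from lib.doctor_signal import configure, doctor_loguru_sink, signal_on_exit

configure(signals_dir="doctor/signals/incoming", service="auth-svc")
logger.add(doctor_loguru_sink, level="ERROR")  # push ERROR-level exceptions to doctor
signal_on_exit()  # best-effort crash/SIGTERM signaling
```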

**Step 4: Run tests**

Run: `python -m pytest lib/tests/test_doctor_signal.py -v`
Expected: All pass

**Step 5: Commit**

```bash
git add lib/doctor_signal.py lib/tests/test_doctor_signal.py
git commit -m "feat(doctor): add signal library — zero-dep, fingerprinting, loguru sink"
```

---

## Task 2: Signals Directory + Watchdog Reader

Wire the doctor daemon to watch `doctor/signals/incoming/` via watchdog and process events through a queue.

**Files:**
- Create: `doctor/signals/.gitkeep` (and `incoming/`, `processed/` subdirs)
- Create: `doctor/signal_reader.py`
- Modify: `doctor/daemon.py` — integrate signal reader into poll loop (wired in Task 6)
- Create: `doctor/tests/test_signal_reader.py`

**Step 1: Create signals directory structure**

```bash
mkdir -p doctor/signals/incoming doctor/signals/processed
touch doctor/signals/.gitkeep doctor/signals/incoming/.gitkeep doctor/signals/processed/.gitkeep
```

**Step 2: Write failing tests for signal reader**

```python
# doctor/tests/test_signal_reader.py
"""Tests for doctor signal reader — watchdog + queue processing."""
import json
import time
from pathlib import Path

import pytest


@pytest.fixture
def signals_dir(tmp_path):
    incoming = tmp_path / "incoming"
    incoming.mkdir()
    processed = tmp_path / "processed"
    processed.mkdir()
    return tmp_path


def _write_signal(incoming: Path, service="svc", error_type="KeyError", fp="abc123"):
    """Helper to write a signal file."""
    data = {
        "event_id": f"evt-{time.time_ns()}",
        "timestamp": "2026-03-09T12:00:00Z",
        "service": service,
        "type": error_type,
        "severity": "med",
        "fingerprint": fp,
        "details": {"error_msg": f"{error_type}: test"},
        "source": "test",
    }
    path = incoming / f"{service}_{time.time_ns()}.json"
    path.write_text(json.dumps(data))
    return path


def test_process_signal_file(signals_dir):
    from doctor.signal_reader import process_signal_file

    path = _write_signal(signals_dir / "incoming")
    event = process_signal_file(path)
    assert event is not None
    assert event["service"] == "svc"
    assert event["fingerprint"] == "abc123"


def test_process_moves_to_processed(signals_dir):
    from doctor.signal_reader import process_signal_file

    incoming = signals_dir / "incoming"
    processed = signals_dir / "processed"
    path = _write_signal(incoming)
    name = path.name

    process_signal_file(path, processed_dir=processed)

    assert not path.exists()
    assert (processed / name).exists()


def test_scan_incoming_catches_all(signals_dir):
    from doctor.signal_reader import scan_incoming

    incoming = signals_dir / "incoming"
    _write_signal(incoming, fp="fp1")
    _write_signal(incoming, fp="fp2")
    _write_signal(incoming, fp="fp3")

    events = scan_incoming(incoming, signals_dir / "processed")
    assert len(events) == 3


def test_prune_processed(signals_dir):
    from doctor.signal_reader import prune_processed

    processed = signals_dir / "processed"
    # Create an "old" file (we'll mock the age check)
    old_file = processed / "old.json"
    old_file.write_text("{}")
    # Set mtime to 8 days ago
    import os
    old_mtime = time.time() - (8 * 86400)
    os.utime(old_file, (old_mtime, old_mtime))

    new_file = processed / "new.json"
    new_file.write_text("{}")

    prune_processed(processed, max_age_days=7)
    assert not old_file.exists()
    assert new_file.exists()
```

**Step 3: Run tests to verify they fail**

Run: `python -m pytest doctor/tests/test_signal_reader.py -v`
Expected: ImportError

**Step 4: Implement `doctor/signal_reader.py`**

```python
#!/usr/bin/env python
"""Doctor signal reader — watches signals/incoming/ for new signal files.

Processes via queue for contention-free SQLite writes.
"""
import json
import os
import queue
import threading
import time
from pathlib import Path

from loguru import logger

PROCESSED_MAX_AGE_DAYS = 7


def process_signal_file(
    path: Path,
    processed_dir: Path | None = None,
) -> dict | None:
    """Read and validate a signal file. Optionally move to processed/."""
    try:
        data = json.loads(path.read_text())
    except (json.JSONDecodeError, OSError) as e:
        logger.warning("invalid signal file {}: {}", path.name, e)
        if processed_dir:
            _move_to_processed(path, processed_dir)
        return None

    # Minimal validation
    required = {"event_id", "service", "type", "fingerprint"}
    if not required.issubset(data.keys()):
        logger.warning("signal file {} missing fields: {}", path.name, required - data.keys())
        if processed_dir:
            _move_to_processed(path, processed_dir)
        return None

    if processed_dir:
        _move_to_processed(path, processed_dir)

    return data


def _move_to_processed(src: Path, processed_dir: Path):
    """Move signal file to processed directory."""
    try:
        dest = processed_dir / src.name
        os.rename(str(src), str(dest))
    except OSError as e:
        logger.debug("failed to move {} to processed: {}", src.name, e)
        try:
            src.unlink(missing_ok=True)
        except OSError:
            pass


def scan_incoming(incoming_dir: Path, processed_dir: Path) -> list[dict]:
    """Scan incoming/ for all signal files. Returns list of parsed events."""
    events = []
    try:
        files = sorted(incoming_dir.glob("*.json"))
    except OSError:
        return events

    for path in files:
        event = process_signal_file(path, processed_dir=processed_dir)
        if event:
            events.append(event)

    return events


def prune_processed(processed_dir: Path, max_age_days: int = PROCESSED_MAX_AGE_DAYS):
    """Delete processed signal files older than max_age_days."""
    if not processed_dir.is_dir():
        return
    cutoff = time.time() - (max_age_days * 86400)
    count = 0
    for path in processed_dir.glob("*.json"):
        try:
            if path.stat().st_mtime < cutoff:
                path.unlink()
                count += 1
        except OSError:
            pass
    if count:
        logger.debug("pruned {} old processed signals", count)


class SignalWatcher:
    """Watches signals/incoming/ via watchdog, feeds events through a queue."""

    def __init__(self, signals_dir: Path, callback):
        """
        Args:
            signals_dir: Path to doctor/signals/ (parent of incoming/ and processed/)
            callback: Function called with each parsed event dict
        """
        self.incoming = signals_dir / "incoming"
        self.processed = signals_dir / "processed"
        self.callback = callback
        self._queue: queue.Queue = queue.Queue()
        self._stop = threading.Event()
        self._observer = None
        self._consumer = None

    def start(self):
        """Start watching. Non-blocking."""
        self.incoming.mkdir(parents=True, exist_ok=True)
        self.processed.mkdir(parents=True, exist_ok=True)

        # Start consumer thread (single writer to SQLite)
        self._consumer = threading.Thread(target=self._consume, daemon=True)
        self._consumer.start()

        # Start watchdog observer
        try:
            from watchdog.observers import Observer
            from watchdog.events import FileSystemEventHandler

            class Handler(FileSystemEventHandler):
                def __init__(self, q):
                    self._q = q

                def on_created(self, event):
                    # Direct .json creation (e.g. the launchd wrapper)
                    if not event.is_directory and event.src_path.endswith(".json"):
                        self._q.put("scan")

                def on_moved(self, event):
                    # signal_error() writes a .tmp then renames to .json;
                    # a same-dir rename fires on_moved, not on_created
                    if not event.is_directory and event.dest_path.endswith(".json"):
                        self._q.put("scan")

            self._observer = Observer()
            self._observer.schedule(Handler(self._queue), str(self.incoming), recursive=False)
            self._observer.start()
        except ImportError:
            logger.warning("watchdog not installed, signal reader using poll-only mode")

        # Catch-up scan (after watchdog is registered — handles race window)
        self._queue.put("scan")

        logger.info("signal watcher started on {}", self.incoming)

    def stop(self):
        """Stop watching."""
        self._stop.set()
        self._queue.put(None)  # unblock consumer
        if self._observer:
            self._observer.stop()
            self._observer.join(timeout=5)
        if self._consumer:
            self._consumer.join(timeout=5)

    def _consume(self):
        """Consumer thread — processes queue items sequentially."""
        while not self._stop.is_set():
            try:
                item = self._queue.get(timeout=1)
            except queue.Empty:
                continue

            if item is None:
                break

            if item == "scan":
                events = scan_incoming(self.incoming, self.processed)
                for event in events:
                    try:
                        self.callback(event)
                    except Exception as e:
                        logger.error("signal callback error: {}", e)
```
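
For a quick manual smoke test of the watcher (outside the pytest suite), something like this works; the callback just prints each parsed event:

```python
# Illustrative smoke test: watch doctor/signals/ and print parsed events
import time
from pathlib import Path

from doctor.signal_reader import SignalWatcher

watcher = SignalWatcher(Path("doctor/signals"), callback=print)
watcher.start()
try:
    time.sleep(30)  # drop .json files into doctor/signals/incoming/ meanwhile
finally:
    watcher.stop()
```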

**Step 5: Run tests**

Run: `python -m pytest doctor/tests/test_signal_reader.py -v`
Expected: All pass

**Step 6: Commit**

```bash
git add doctor/signals/ doctor/signal_reader.py doctor/tests/test_signal_reader.py
git commit -m "feat(doctor): signal reader with watchdog, queue, processed pruning"
```

---

## Task 3: Events Table

Add `doctor_events` table to the existing doctor DB schema.

**Files:**
- Modify: `doctor/doctor_db.py` — add events table + CRUD functions
- Create: `doctor/tests/test_events_db.py`

**Step 1: Write failing tests**

```python
# doctor/tests/test_events_db.py
"""Tests for doctor_events table."""
import pytest
from doctor.doctor_db import init_db, record_event, get_events


@pytest.fixture
def conn():
    c = init_db(":memory:")
    yield c
    c.close()


def test_record_event(conn):
    record_event(
        conn,
        event_type="signal_received",
        project="auth",
        summary="KeyError in auth.views",
        fingerprint="abc123",
        signal_type="code_bug",
    )
    events = get_events(conn)
    assert len(events) == 1
    assert events[0]["project"] == "auth"
    assert events[0]["fingerprint"] == "abc123"


def test_get_events_filters(conn):
    record_event(conn, event_type="fix_start", project="auth", summary="fixing")
    record_event(conn, event_type="signal_received", project="jobs", summary="error")
    record_event(conn, event_type="fix_complete", project="auth", summary="done")

    auth_events = get_events(conn, project="auth")
    assert len(auth_events) == 2

    fix_events = get_events(conn, event_type="fix_start")
    assert len(fix_events) == 1


def test_get_events_limit(conn):
    for i in range(20):
        record_event(conn, event_type="scan", project="svc", summary=f"scan {i}")

    events = get_events(conn, limit=5)
    assert len(events) == 5
```

**Step 2: Run to verify fail**

Run: `python -m pytest doctor/tests/test_events_db.py -v`
Expected: ImportError — `record_event` doesn't exist

**Step 3: Add to `doctor/doctor_db.py`**

Add the CREATE TABLE to `init_db()` and add `record_event()` + `get_events()` functions. Also add the fingerprint index.

Key additions to `init_db()`:
```python
conn.execute("""
    CREATE TABLE IF NOT EXISTS doctor_events (
        id          INTEGER PRIMARY KEY,
        ts          TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%f', 'now')),
        event_type  TEXT NOT NULL,
        project     TEXT NOT NULL,
        summary     TEXT NOT NULL,
        detail_json TEXT,
        status      TEXT DEFAULT 'info',
        fingerprint TEXT,
        branch      TEXT,
        commit_hash TEXT,
        duration_ms INTEGER,
        signal_type TEXT,
        correlation_id TEXT,
        actor       TEXT DEFAULT 'doctor'
    )
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_events_project_ts ON doctor_events(project, ts DESC)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_events_type_ts ON doctor_events(event_type, ts DESC)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_events_correlation ON doctor_events(correlation_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_events_fingerprint ON doctor_events(fingerprint)")
```

New functions:
```python
def record_event(conn, *, event_type, project, summary,
                 detail_json=None, status="info", fingerprint=None,
                 branch=None, commit_hash=None, duration_ms=None,
                 signal_type=None, correlation_id=None, actor="doctor"):
    conn.execute(
        """INSERT INTO doctor_events
           (event_type, project, summary, detail_json, status, fingerprint,
            branch, commit_hash, duration_ms, signal_type, correlation_id, actor)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (event_type, project, summary, detail_json, status, fingerprint,
         branch, commit_hash, duration_ms, signal_type, correlation_id, actor),
    )
    conn.commit()


def get_events(conn, *, project=None, event_type=None, fingerprint=None,
               since=None, limit=50):
    clauses, params = [], []
    if project:
        clauses.append("project = ?")
        params.append(project)
    if event_type:
        clauses.append("event_type = ?")
        params.append(event_type)
    if fingerprint:
        clauses.append("fingerprint = ?")
        params.append(fingerprint)
    if since:
        clauses.append("ts > ?")
        params.append(since)
    where = "WHERE " + " AND ".join(clauses) if clauses else ""
    rows = conn.execute(
        f"SELECT * FROM doctor_events {where} ORDER BY ts DESC LIMIT ?",
        (*params, limit),
    ).fetchall()
    return [dict(r) for r in rows]
```
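
The `correlation_id` column is there so the start and completion of one fix can be stitched together. A sketch of the intended call pattern (all field values illustrative):

```python
# Illustrative fix lifecycle: one correlation_id ties fix_start to fix_complete
from uuid import uuid4

cid = uuid4().hex[:12]
record_event(conn, event_type="fix_start", project="auth",
             summary="fixing KeyError in auth.views",
             fingerprint="abc123def456", correlation_id=cid)
# ... fix runs ...
record_event(conn, event_type="fix_complete", project="auth",
             summary="fix committed", status="ok",
             fingerprint="abc123def456",
             branch="dr/auth-keyerror-143022-abc123de",
             duration_ms=42000, correlation_id=cid)
```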

**Step 4: Run tests**

Run: `python -m pytest doctor/tests/test_events_db.py -v`
Expected: All pass

**Step 5: Commit**

```bash
git add doctor/doctor_db.py doctor/tests/test_events_db.py
git commit -m "feat(doctor): add doctor_events table with CRUD"
```

---

## Task 4: Branch Naming + Commit Tagging

Update `fix.py` to use the `dr/` prefix, a fingerprint suffix, and `[doctor]` commit tags.

**Files:**
- Modify: `doctor/fix.py` — branch naming, commit message format
- Modify: `doctor/daemon.py` — pass fingerprint to fix functions

**Step 1: Update branch naming in `fix.py`**

Find the branch name construction (currently `fix/{project}-{slug}-{timestamp}`). Change to:

```python
def _make_branch_name(project: str, error_slug: str, fingerprint: str = "") -> str:
    """Generate doctor fix branch name with provenance."""
    import re
    import secrets
    from datetime import datetime
    slug = re.sub(r'[^a-z0-9-]', '', error_slug.lower().replace(' ', '-'))[:30]
    ts = datetime.now().strftime("%H%M%S")
    fp8 = fingerprint[:8] if fingerprint else secrets.token_hex(4)
    return f"dr/{project}-{slug}-{ts}-{fp8}"
```

**Step 2: Update commit message format**

Find `commit_fix()` or wherever commit messages are built. Prefix with `[doctor]`:

```python
message = f"[doctor] fix: {description}"
```

**Step 3: Test manually**

Run: `python -c "from doctor.fix import _make_branch_name; print(_make_branch_name('auth', 'KeyError user_id', 'abc123def456'))"`
Expected: `dr/auth-keyerror-user-id-HHMMSS-abc123de`

**Step 4: Commit**

```bash
git add doctor/fix.py doctor/daemon.py
git commit -m "feat(doctor): dr/ branch naming with fingerprint, [doctor] commit tags"
```

---

## Task 5: Fix Attempt Limits + Timeout Safety

Add max attempts, cooldown, concurrent lock, and process group kill.

**Files:**
- Modify: `doctor/doctor_db.py` — add `fix_attempts`, `last_fix_at`, `fix_started_at` columns to issues table
- Modify: `doctor/daemon.py` — check limits before spawning fix, add timeout with killpg
- Create: `doctor/tests/test_fix_limits.py`

**Step 1: Add columns to issues table**

In `init_db()`, add migration:
```python
# Migration: add typed fix tracking columns (no-op if they already exist)
for col, decl in [
    ("fix_attempts", "INTEGER DEFAULT 0"),
    ("last_fix_at", "TEXT"),
    ("fix_started_at", "TEXT"),
]:
    try:
        conn.execute(f"ALTER TABLE issues ADD COLUMN {col} {decl}")
    except Exception:
        pass  # column already exists
```

Add helper functions:
```python
def can_attempt_fix(conn, fingerprint, max_attempts=2, cooldown_seconds=1800) -> bool:
    """Check if a fix attempt is allowed (under limits + cooldown)."""
    row = conn.execute(
        "SELECT fix_attempts, last_fix_at FROM issues WHERE fingerprint = ?",
        (fingerprint,),
    ).fetchone()
    if not row:
        return True
    if row["fix_attempts"] >= max_attempts:
        return False
    if row["last_fix_at"]:
        from datetime import datetime, timezone
        last = datetime.fromisoformat(row["last_fix_at"])
        elapsed = (datetime.now(timezone.utc) - last).total_seconds()
        if elapsed < cooldown_seconds:
            return False
    return True


def record_fix_attempt(conn, fingerprint):
    """Increment fix attempt counter and set timestamps."""
    conn.execute(
        """UPDATE issues SET
           fix_attempts = fix_attempts + 1,
           last_fix_at = strftime('%Y-%m-%dT%H:%M:%f', 'now'),
           fix_started_at = strftime('%Y-%m-%dT%H:%M:%f', 'now'),
           status = 'fixing'
           WHERE fingerprint = ?""",
        (fingerprint,),
    )
    conn.commit()


def clear_fix_lock(conn, fingerprint):
    """Clear the fixing lock (after fix completes or times out)."""
    conn.execute(
        "UPDATE issues SET fix_started_at = NULL WHERE fingerprint = ?",
        (fingerprint,),
    )
    conn.commit()


def recover_stale_locks(conn, timeout_seconds=900):
    """Reset fixing locks older than timeout. Called on startup + periodically."""
    conn.execute(
        """UPDATE issues SET status = 'new', fix_started_at = NULL
           WHERE status = 'fixing'
           AND fix_started_at < strftime('%Y-%m-%dT%H:%M:%f', 'now', ?)""",
        (f"-{timeout_seconds} seconds",),
    )
    conn.commit()
```

**Step 2: Update daemon.py fix dispatch**

In `_run_auto_fix()`, wrap the fix attempt:
```python
# Before spawning fix:
if not can_attempt_fix(conn, err["fingerprint"]):
    logger.info("fix skipped for {} (limit/cooldown)", err["fingerprint"][:12])
    return

record_fix_attempt(conn, err["fingerprint"])
try:
    # ... existing fix logic with timeout ...
    # Use os.killpg for process group kill on timeout
finally:
    clear_fix_lock(conn, err["fingerprint"])
```
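
For the timeout, the kill must cover the whole process group, since a fix may spawn children. A minimal sketch of that pattern, assuming the fix runs as a subprocess (`fix_cmd` and the timeout value are placeholders):

```python
# Sketch: process-group kill on timeout; fix_cmd and timeout are placeholders
import os
import signal
import subprocess

FIX_TIMEOUT = 900  # seconds, matching the recover_stale_locks default

proc = subprocess.Popen(fix_cmd, start_new_session=True)  # own process group
try:
    proc.wait(timeout=FIX_TIMEOUT)
except subprocess.TimeoutExpired:
    os.killpg(os.getpgid(proc.pid), signal.SIGKILL)  # kill fix + children
    proc.wait()
```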

Call `recover_stale_locks(conn)` at daemon startup and every `CLEANUP_INTERVAL`.

**Step 3: Write tests**

```python
# doctor/tests/test_fix_limits.py
def test_can_attempt_fix_under_limit(conn):
    ...

def test_can_attempt_fix_over_limit(conn):
    ...

def test_cooldown_blocks_retry(conn):
    ...

def test_recover_stale_locks(conn):
    ...
```
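
As one concrete example of the shape these take, assuming the `issues` table accepts an insert with just `fingerprint` and `status` (adjust the seed to the real schema):

```python
# Sketch of one limit test; the seed INSERT must match the real issues schema
def test_can_attempt_fix_over_limit(conn):
    from doctor.doctor_db import can_attempt_fix, record_fix_attempt

    conn.execute(
        "INSERT INTO issues (fingerprint, status) VALUES (?, ?)",
        ("fp-limit", "new"),
    )
    record_fix_attempt(conn, "fp-limit")
    record_fix_attempt(conn, "fp-limit")
    assert can_attempt_fix(conn, "fp-limit", max_attempts=2) is False
```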

**Step 4: Run tests, commit**

```bash
git add doctor/doctor_db.py doctor/daemon.py doctor/tests/test_fix_limits.py
git commit -m "feat(doctor): fix attempt limits, cooldown, stale lock recovery"
```

---

## Task 6: Integrate Signal Reader into Daemon

Wire the signal watcher into the daemon's main loop so signals flow through triage.

**Files:**
- Modify: `doctor/daemon.py` — start SignalWatcher, handle signal events

**Step 1: Add signal callback in daemon**

```python
def _handle_signal_event(event: dict):
    """Process a signal file event through the triage pipeline."""
    conn = init_db()
    fp = event["fingerprint"]
    project = event.get("service", "unknown")  # map service → project

    # Dedup: check if already seen
    existing = get_issues(conn, fingerprint=fp)
    if existing:
        # Update last_seen
        upsert_issue(conn, fp, project, "signal", event["type"],
                     event["details"].get("error_msg", ""), event.get("source", ""))
        return

    # New issue — record + triage
    upsert_issue(conn, fp, project, "signal", event["type"],
                 event["details"].get("error_msg", ""), event.get("source", ""))

    record_event(conn, event_type="signal_received", project=project,
                 summary=f"{event['type']}: {event['details'].get('error_msg', '')[:80]}",
                 fingerprint=fp, signal_type=event["type"])

    # Triage + dispatch (same as log-based errors)
    # ... reuse existing triage logic ...
```

**Step 2: Start watcher in poll_loop**

```python
async def poll_loop():
    conn = init_db()
    # ... existing setup ...

    # Start signal watcher
    signals_dir = RIVUS_ROOT / "doctor" / "signals"
    watcher = SignalWatcher(signals_dir, _handle_signal_event)
    watcher.start()

    # Recover stale locks on startup
    recover_stale_locks(conn)

    try:
        # ... existing loop (log scanning stays as baseline) ...
    finally:
        watcher.stop()
```

**Step 3: Test end-to-end**

Write a signal file manually, verify daemon picks it up:
```bash
python -c "
from lib.doctor_signal import signal_error, configure
configure(signals_dir='doctor/signals/incoming')
signal_error('test-svc', 'KeyError', 'KeyError: test', signals_dir='doctor/signals/incoming')
"
# Check doctor/signals/processed/ for the moved file
```

**Step 4: Commit**

```bash
git add doctor/daemon.py
git commit -m "feat(doctor): integrate signal reader into daemon loop"
```

---

## Task 7: Signal Writers (Jobs Framework)

Add `code_bug` → doctor signal in the jobs framework.

**Files:**
- Modify: `jobs/lib/runner.py` or `jobs/lib/diagnosis.py` — call `signal_error()` on code_bug classification

**Step 1: Find where errors are classified as code bugs**

Read `jobs/lib/diagnosis.py` and `jobs/lib/runner.py` to find where error classification happens. Add a call to `signal_error()` when the classification is `code_bug`.

```python
from lib.doctor_signal import signal_error

# After error is classified as code_bug:
if error_class == "code_bug":
    signal_error(
        service=f"jobs/{job_name}",
        error_type=error_type,
        message=error_message,
        traceback=traceback_text,
        extra={"job": job_name, "item_key": item_key},
    )
```

**Step 2: Commit**

```bash
git add jobs/lib/runner.py  # or wherever the change lands
git commit -m "feat(jobs): signal code_bug errors to doctor"
```

---

## Task 8: Signal Writers (launchd Wrapper)

Create a wrapper script for launchd-managed services that writes a signal on non-zero exit.

**Files:**
- Create: `bin/doctor-wrapper.sh`
- Modify: `infra/README.md` — document wrapper usage

**Step 1: Create wrapper**

```bash
#!/bin/bash
# bin/doctor-wrapper.sh — wraps a service for doctor crash signaling
# Usage in launchd plist: ProgramArguments: ["/path/to/doctor-wrapper.sh", "service-name", "real-binary", "args..."]

SERVICE="$1"
shift
SIGNALS_DIR="$(dirname "$0")/../doctor/signals/incoming"

"$@"
EXIT_CODE=$?

if [ $EXIT_CODE -ne 0 ]; then
    mkdir -p "$SIGNALS_DIR"
    EVENT_ID=$(python3 -c "import uuid; print(uuid.uuid4().hex[:12])" 2>/dev/null || echo "$$-$(date -u +%s)")
    TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
    FILENAME="${SERVICE}_$(date -u +%Y%m%d-%H%M%S)_${EVENT_ID}.json"

    cat > "$SIGNALS_DIR/$FILENAME" <<ENDJSON
{
  "event_id": "$EVENT_ID",
  "timestamp": "$TS",
  "service": "$SERVICE",
  "type": "crash",
  "severity": "high",
  "fingerprint": "crash-${SERVICE}-${EXIT_CODE}",
  "details": {"exit_code": $EXIT_CODE, "error_msg": "Process exited with code $EXIT_CODE"},
  "source": "launchd"
}
ENDJSON
fi

exit $EXIT_CODE
```
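
For reference, the matching launchd plist fragment would look something like this (label and paths are placeholders):

```xml
<!-- Illustrative ProgramArguments fragment; paths are placeholders -->
<key>ProgramArguments</key>
<array>
    <string>/path/to/repo/bin/doctor-wrapper.sh</string>
    <string>auth-svc</string>
    <string>/path/to/repo/.venv/bin/python</string>
    <string>-m</string>
    <string>auth_svc.main</string>
</array>
```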

**Step 2: Make executable, commit**

```bash
chmod +x bin/doctor-wrapper.sh
git add bin/doctor-wrapper.sh
git commit -m "feat(doctor): launchd crash wrapper script for signal generation"
```

---

## Tasks 9-12 (deferred — implement after signal pipeline is proven)

Implement these after tasks 1-8 are deployed and producing real signals:

- **Task 9**: Auto-merge Phase 1 (dry-run) — log what would merge
- **Task 10**: Notification deduplication + attention budget
- **Task 11**: Log staleness detection (§6 of design)
- **Task 12**: CLI integration (`ops doctor events`, `ops doctor status`)

Each follows the same TDD pattern. Defer until the signal pipeline has run for a few days and we've seen the quality of real signals.

---

## Verification Checklist

After Tasks 1-8:
- [ ] `python -m pytest lib/tests/test_doctor_signal.py -v` — all pass
- [ ] `python -m pytest doctor/tests/test_signal_reader.py -v` — all pass
- [ ] `python -m pytest doctor/tests/test_events_db.py -v` — all pass
- [ ] `python -m pytest doctor/tests/test_fix_limits.py -v` — all pass
- [ ] Manual: `signal_error()` writes to `doctor/signals/incoming/`
- [ ] Manual: daemon picks up signal, triages, records event
- [ ] Manual: fix attempt respects limits (2 max, 30 min cooldown)
- [ ] Manual: branch name starts with `dr/`, commit has `[doctor]` tag
- [ ] `doctor/signals/processed/` accumulates handled signals
