## Status: COMPLETE

All 14 tasks done as of 2026-03-10. Tracker split into `jobs/lib/db/` package, `RunContext` dataclass replaces 11 params, SQL view for derived status, discovery base class with template method pattern. 13 strategies use `fetch_items()`, 9 complex ones retain custom `discover()`.

---

# Jobs Refactor Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Split the tracker god object, clean up runner params, refactor discovery boilerplate. Same features, zero new deps.

**Architecture:** Extract `jobs/lib/tracker.py` (1,682 LOC, 66 functions) into a `jobs/lib/db/` package with focused modules. Replace Python-side item status derivation with a SQL view. Collapse `run_job()`'s 11 parameters into a `RunContext` dataclass. Refactor 22 discovery strategies to share a common base.

**Tech Stack:** Python 3.13, sqlite3, asyncio, pytest

**Key corrections from exploration:**
- No `simple_worker()` — already merged. Skip.
- `auto_unpause_count` is LIVE (wired in `helm/periodic.py`). Keep.
- `get_stages_dict()` is LIVE (used by dashboard `queries.py`). Keep.
- `stage_version_hash()` can't be deleted — most handlers lack `HANDLER_VERSION`. Keep for now.
- 22 strategies, not 17.
- 6 existing test files with good coverage of core state machine.

---

### Task 1: Verify existing tests pass

**Files:**
- Read: `jobs/tests/test_tracker.py`, `jobs/tests/test_stages.py`

**Step 1: Run all existing jobs tests**

Run: `python -m pytest jobs/tests/ -v --tb=short 2>&1 | tail -30`
Expected: All tests pass. Record the count.

**Step 2: Commit baseline (if needed)**

No code changes in this task, so there is normally nothing to commit. Just verify the suite is green.

---

### Task 2: Add missing tracker tests for split safety

**Files:**
- Modify: `jobs/tests/test_tracker.py`

We need tests for functions that will move to different modules during the split. The existing tests cover upsert, workflow, events, stats, merge, and retry. Missing: pause/unpause and results/versioning, covered by the steps below.

**Step 1: Add pause/unpause tests**

```python
class TestJobPause:
    def test_pause_and_check(self, conn):
        from jobs.lib.tracker import set_job_paused, is_job_paused, get_pause_reason
        set_job_paused(conn, "j1", True, reason="cost limit")
        assert is_job_paused(conn, "j1")
        assert get_pause_reason(conn, "j1") == "cost limit"

    def test_unpause(self, conn):
        from jobs.lib.tracker import set_job_paused, is_job_paused
        set_job_paused(conn, "j1", True, reason="test")
        set_job_paused(conn, "j1", False)
        assert not is_job_paused(conn, "j1")

    def test_get_all_paused(self, conn):
        from jobs.lib.tracker import set_job_paused, get_all_job_paused
        set_job_paused(conn, "j1", True, reason="r1")
        set_job_paused(conn, "j2", True, reason="r2")
        paused = get_all_job_paused(conn)
        assert "j1" in paused
        assert "j2" in paused
```

**Step 2: Add results/version tests**

```python
class TestResults:
    def test_store_and_get_result(self, conn):
        from jobs.lib.tracker import store_result, get_result
        upsert_items(conn, "j1", [{"key": "k1"}])
        store_result(conn, "j1", "k1", "fetch", {"url": "http://example.com"}, handler_version="1.0")
        r = get_result(conn, "j1", "k1", "fetch")
        assert r["url"] == "http://example.com"

    def test_get_stale_count(self, conn):
        from jobs.lib.tracker import store_result, get_stale_count
        upsert_items(conn, "j1", [{"key": "k1"}, {"key": "k2"}])
        store_result(conn, "j1", "k1", "fetch", {}, handler_version="1.0")
        store_result(conn, "j1", "k2", "fetch", {}, handler_version="1.0")
        assert get_stale_count(conn, "j1", "fetch", "2.0") == 2
        assert get_stale_count(conn, "j1", "fetch", "1.0") == 0
```

**Step 3: Run tests**

Run: `python -m pytest jobs/tests/test_tracker.py -v --tb=short`
Expected: All pass including new tests.

**Step 4: Commit**

```bash
git add jobs/tests/test_tracker.py
git commit -m "test(jobs): add pause/unpause and results tests before tracker split"
```

---

### Task 3: Delete verified dead code

**Files:**
- Modify: `jobs/lib/tracker.py`

**Step 1: Remove dual cost table query in `get_daily_cost()`**

Find the `get_daily_cost()` function. It queries both `events` table and legacy `job_cost_log` table. Remove the `job_cost_log` branch — verify the table doesn't exist in production DB first:

Run: `python -c "from jobs.lib.tracker import open_raw_db; c = open_raw_db(); print([t['name'] for t in c.execute(\"SELECT name FROM sqlite_master WHERE type='table'\").fetchall()]); c.close()"`

If `job_cost_log` is not in the list, remove the branch. If it IS in the list, check if it has any rows: `SELECT COUNT(*) FROM job_cost_log`.
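
The table check in this step can also be done in one call. The following is a minimal sketch, assuming only the standard `sqlite3` module; `table_row_count` is a hypothetical helper, not something in `tracker.py`, and the demo runs against a throwaway in-memory database rather than the production DB.

```python
from __future__ import annotations

import sqlite3


def table_row_count(conn: sqlite3.Connection, table: str) -> int | None:
    """Return the row count of `table`, or None if the table does not exist."""
    exists = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
        (table,),
    ).fetchone()
    if exists is None:
        return None
    # The name was just confirmed against sqlite_master, so quoting it
    # into the statement is acceptable here.
    quoted = table.replace('"', '""')
    return conn.execute(f'SELECT COUNT(*) FROM "{quoted}"').fetchone()[0]


# Demo on an in-memory database (hypothetical schema, not the real one).
demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE events (id INTEGER PRIMARY KEY)")
absent = table_row_count(demo, "job_cost_log")  # table missing
present = table_row_count(demo, "events")       # table exists but is empty
demo.close()
```

A `None` result corresponds to "not in the list"; any integer means the table exists and tells you whether it still holds rows.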

**Step 2: Remove `outputs:` field from StageConfig if present**

Check `jobs/lib/job.py` — if `StageConfig` has an `outputs` field, remove it. Check `jobs/jobs.yaml` — the `outputs:` YAML keys are documentation-only. Leave them as YAML comments or remove entirely.

**Step 3: Run tests**

Run: `python -m pytest jobs/tests/ -v --tb=short`
Expected: All pass.

**Step 4: Commit**

```bash
git add jobs/lib/tracker.py jobs/lib/job.py jobs/jobs.yaml
git commit -m "chore(jobs): remove dead cost table query and unused outputs field"
```

---

### Task 4: Create `jobs/lib/db/` package — connection module

**Files:**
- Create: `jobs/lib/db/__init__.py`
- Modify: `jobs/lib/tracker.py` (extract `open_db`, `open_raw_db`, `DB_PATH`, `WAKE_DIR`, `ensure_schema`)

**Step 1: Create `jobs/lib/db/__init__.py`**

Move connection factory and schema setup from `tracker.py`. The `__init__.py` should export:
- `open_db(db_path=None)` — WAL mode, row_factory, timeout=30, calls ensure_schema
- `open_raw_db(db_path=None)` — same but returns raw connection
- `DB_PATH` — default path constant
- `WAKE_DIR` — wake directory constant
- `ensure_schema(conn)` — CREATE TABLE IF NOT EXISTS for all tables + indexes
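
As a rough illustration of the exports listed above, here is a minimal sketch of a connection factory with WAL mode, `sqlite3.Row` rows, and a 30-second timeout. The `DB_PATH` value and the single placeholder table are assumptions made for the example; the real schema and path come from `tracker.py`.

```python
from __future__ import annotations

import sqlite3
import tempfile
from pathlib import Path

# Hypothetical default; the real DB_PATH is defined by the project.
DB_PATH = Path(tempfile.gettempdir()) / "jobs_sketch.db"


def ensure_schema(conn: sqlite3.Connection) -> None:
    """Create tables idempotently. The single table here is a placeholder."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS work_items ("
        "id INTEGER PRIMARY KEY, job_id TEXT NOT NULL, key TEXT NOT NULL)"
    )
    conn.commit()


def open_db(db_path: Path | str | None = None) -> sqlite3.Connection:
    """Open a connection with WAL journaling, Row access, and a 30 s timeout."""
    conn = sqlite3.connect(str(db_path or DB_PATH), timeout=30)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    ensure_schema(conn)
    return conn


# Demo on a temporary file, since WAL is not available for in-memory databases.
with tempfile.TemporaryDirectory() as tmp:
    conn = open_db(Path(tmp) / "demo.db")
    journal_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
    tables = [row["name"] for row in conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'")]
    conn.close()
```

Keeping `ensure_schema()` idempotent (`IF NOT EXISTS` everywhere) is what makes it safe to call on every `open_db()`.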

**Step 2: Update `tracker.py` to import from `db`**

Replace local definitions with:
```python
from jobs.lib.db import open_db, open_raw_db, DB_PATH, WAKE_DIR, ensure_schema
```

All existing `from jobs.lib.tracker import open_raw_db` imports continue to work because tracker re-exports.

**Step 3: Run tests**

Run: `python -m pytest jobs/tests/ -v --tb=short`
Expected: All pass.

**Step 4: Commit**

```bash
git add jobs/lib/db/__init__.py jobs/lib/tracker.py
git commit -m "refactor(jobs): extract db connection factory to jobs/lib/db/"
```

---

### Task 5: Extract `db/items.py` — work item CRUD

**Files:**
- Create: `jobs/lib/db/items.py`
- Modify: `jobs/lib/tracker.py`

**Step 1: Move work item functions to `db/items.py`**

Move these functions (keep signatures identical):
- `upsert_items()`
- `merge_discovery_data()`
- `get_pending()`, `get_failed()`
- `mark_in_progress()`, `mark_done()`, `mark_failed()`, `mark_skipped()`
- `retry_failed()`
- `inject_at_stage()`
- `update_item_priority()`
- `get_pending_count()`

Stage lifecycle functions:
- `mark_stage_in_progress()`, `mark_stage_done()`, `mark_stage_failed()`, `mark_stage_retry_later()`
- `get_items_needing_stage()`, `get_stage_status()`, `get_stages_dict()`
- `_sync_item_status()` (internal)

Recovery functions:
- `reset_in_progress_stages()`, `reclaim_stuck_items()`, `reclaim_stuck_stages()`
- `reopen_items_for_new_stages()`, `skip_disabled_stages()`
- `reset_stage()`, `reprocess_fast_async()`

**Step 2: Add re-exports to `tracker.py`**

```python
from jobs.lib.db.items import (
    upsert_items, merge_discovery_data, get_pending, get_failed,
    mark_in_progress, mark_done, mark_failed, mark_skipped,
    # ... all moved functions
)
```

**Step 3: Run tests**

Run: `python -m pytest jobs/tests/ -v --tb=short`
Expected: All pass — imports haven't changed for consumers.

**Step 4: Commit**

```bash
git add jobs/lib/db/items.py jobs/lib/tracker.py
git commit -m "refactor(jobs): extract work item CRUD to db/items.py"
```

---

### Task 6: Extract `db/jobs.py` — job state + stats

**Files:**
- Create: `jobs/lib/db/jobs.py`
- Modify: `jobs/lib/tracker.py`

**Step 1: Move job-level functions**

- `set_job_paused()`, `is_job_paused()`, `get_pause_reason()`, `get_retry_info()`
- `get_all_job_paused()`, `get_jobs_due_unpause()`, `increment_auto_unpause()`
- `get_job_stats()`, `all_job_stats()`, `stage_stats()`, `stage_timing_stats()`
- `get_stage_pipeline_metrics()`
- `get_job_error()`
- `show_status()` (CLI display helper)
- `wake_job()`, `pause_job()`, `unpause_job()`, `clear_errors()`

**Step 2: Re-export from `tracker.py`**

**Step 3: Run tests**

Run: `python -m pytest jobs/tests/ -v --tb=short`

**Step 4: Commit**

```bash
git add jobs/lib/db/jobs.py jobs/lib/tracker.py
git commit -m "refactor(jobs): extract job state and stats to db/jobs.py"
```

---

### Task 7: Extract `db/events.py` — event log + cost

**Files:**
- Create: `jobs/lib/db/events.py`
- Modify: `jobs/lib/tracker.py`

**Step 1: Move event functions**

- `log_event()`, `get_events()`
- `log_cost()`, `get_daily_cost()`

**Step 2: Re-export from `tracker.py`**

**Step 3: Run tests**

Run: `python -m pytest jobs/tests/ -v --tb=short`

**Step 4: Commit**

```bash
git add jobs/lib/db/events.py jobs/lib/tracker.py
git commit -m "refactor(jobs): extract events and cost tracking to db/events.py"
```

---

### Task 8: Extract `db/results.py` — results + versioning

**Files:**
- Create: `jobs/lib/db/results.py`
- Modify: `jobs/lib/tracker.py`

**Step 1: Move results functions**

- `store_result()`, `get_result()`, `get_results()`
- `get_handler_version()`, `stage_version_hash()` (deprecated but still used)
- `get_stale_count()`, `reprocess_stale()`

**Step 2: Re-export from `tracker.py`**

**Step 3: Run tests**

Run: `python -m pytest jobs/tests/ -v --tb=short`

**Step 4: Commit**

```bash
git add jobs/lib/db/results.py jobs/lib/tracker.py
git commit -m "refactor(jobs): extract results and versioning to db/results.py"
```

---

### Task 9: Verify `tracker.py` is now a thin re-export shim

**Files:**
- Read: `jobs/lib/tracker.py`

**Step 1: Verify tracker.py is small**

After tasks 4-8, `tracker.py` should be mostly import re-exports. Verify:

Run: `wc -l jobs/lib/tracker.py`
Expected: <100 LOC (just imports + re-exports + maybe `_parse_retry_in` and `RetryLaterError`)

**Step 2: Run ALL tests across the project that touch tracker**

Run: `python -m pytest jobs/tests/ -v --tb=short`

Also confirm the shim still re-exports the core functions (this is an import-only check; it does not parse `jobs.yaml` or start the runner):
Run: `python -c "from jobs.lib.tracker import open_raw_db, upsert_items, get_pending, mark_done; print('all imports OK')"`
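
The one-liner above only covers four names. A slightly more general check might look like the sketch below; `missing_exports` is a hypothetical helper, and a stand-in module is used so the example runs outside the repo.

```python
from __future__ import annotations

import types


def missing_exports(module: types.ModuleType, names: list[str]) -> list[str]:
    """Return the names from `names` that `module` does not expose."""
    return [name for name in names if not hasattr(module, name)]


# Stand-in for the shim so the sketch runs anywhere. In the repo you would
# pass the real module, e.g. importlib.import_module("jobs.lib.tracker").
fake_tracker = types.ModuleType("fake_tracker")
fake_tracker.open_raw_db = lambda: None
fake_tracker.upsert_items = lambda *args, **kwargs: None

gaps = missing_exports(fake_tracker, ["open_raw_db", "upsert_items", "mark_done"])
```

Feeding it the full list of functions moved in Tasks 4-8 would flag any re-export that was forgotten.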

**Step 3: Verify the dashboard imports work**

Run: `python -c "from jobs.ui.queries import get_overview_data; print('dashboard imports OK')"`

**Step 4: Verify runner imports work**

Run: `python -c "from jobs.runner import cli; print('runner imports OK')"`

**Step 5: Verify helm imports work**

Run: `python -c "from helm.hooks.handler import handle_hook; print('helm imports OK')" 2>&1 || true`

---

### Task 10: SQL view for derived item status

**Files:**
- Modify: `jobs/lib/db/__init__.py` (add view to schema)
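- Modify: `jobs/tests/test_tracker.py` (add `TestDerivedStatus`)
- Modify: `jobs/lib/db/items.py` (simplify `_sync_item_status`; deferred, see the note at the end of this task)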

**Step 1: Write test for view-based status**

Add to `jobs/tests/test_tracker.py`:

```python
class TestDerivedStatus:
    def test_all_stages_done_means_item_done(self, conn):
        from jobs.lib.tracker import mark_stage_in_progress, mark_stage_done
        upsert_items(conn, "j1", [{"key": "k1"}])
        item_id = get_pending(conn, "j1")[0]["id"]
        stages = ["fetch", "extract"]
        mark_stage_in_progress(conn, item_id, "fetch", stages)
        mark_stage_done(conn, item_id, "fetch", stages)
        mark_stage_in_progress(conn, item_id, "extract", stages)
        mark_stage_done(conn, item_id, "extract", stages)

        row = conn.execute(
            "SELECT derived_status FROM item_derived_status WHERE item_id = ?",
            (item_id,)
        ).fetchone()
        assert row["derived_status"] == "done"

    def test_any_failed_means_item_failed(self, conn):
        from jobs.lib.tracker import mark_stage_in_progress, mark_stage_done, mark_stage_failed
        upsert_items(conn, "j1", [{"key": "k1"}])
        item_id = get_pending(conn, "j1")[0]["id"]
        stages = ["fetch", "extract"]
        mark_stage_in_progress(conn, item_id, "fetch", stages)
        mark_stage_done(conn, item_id, "fetch", stages)
        mark_stage_in_progress(conn, item_id, "extract", stages)
        mark_stage_failed(conn, item_id, "extract", stages, "timeout")

        row = conn.execute(
            "SELECT derived_status FROM item_derived_status WHERE item_id = ?",
            (item_id,)
        ).fetchone()
        assert row["derived_status"] == "failed"

    def test_in_progress_stage_means_in_progress(self, conn):
        from jobs.lib.tracker import mark_stage_in_progress
        upsert_items(conn, "j1", [{"key": "k1"}])
        item_id = get_pending(conn, "j1")[0]["id"]
        mark_stage_in_progress(conn, item_id, "fetch", ["fetch", "extract"])

        row = conn.execute(
            "SELECT derived_status FROM item_derived_status WHERE item_id = ?",
            (item_id,)
        ).fetchone()
        assert row["derived_status"] == "in_progress"
```

**Step 2: Run tests — expect FAIL (view doesn't exist yet)**

Run: `python -m pytest jobs/tests/test_tracker.py::TestDerivedStatus -v`

**Step 3: Add the view to `ensure_schema()`**

In `jobs/lib/db/__init__.py`, add to `ensure_schema()`:

```sql
CREATE VIEW IF NOT EXISTS item_derived_status AS
SELECT
    wi.id AS item_id,
    wi.job_id,
    CASE
        WHEN EXISTS(SELECT 1 FROM item_stages s WHERE s.item_id = wi.id AND s.status = 'failed')
            THEN 'failed'
        WHEN NOT EXISTS(SELECT 1 FROM item_stages s WHERE s.item_id = wi.id AND s.status NOT IN ('done', 'skipped'))
            THEN 'done'
        WHEN EXISTS(SELECT 1 FROM item_stages s WHERE s.item_id = wi.id AND s.status = 'in_progress')
            THEN 'in_progress'
        ELSE 'pending'
    END AS derived_status
FROM work_items wi;
```

Also add covering index if not present:
```sql
CREATE INDEX IF NOT EXISTS idx_item_stages_item_status ON item_stages(item_id, status);
```
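
To see how the `CASE` branches resolve, the view can be exercised against an in-memory database. This is a sketch under an assumed minimal schema (only the columns the view touches); the real `work_items` and `item_stages` tables have more columns.

```python
import sqlite3

VIEW_SQL = """
CREATE VIEW IF NOT EXISTS item_derived_status AS
SELECT
    wi.id AS item_id,
    wi.job_id,
    CASE
        WHEN EXISTS(SELECT 1 FROM item_stages s WHERE s.item_id = wi.id AND s.status = 'failed')
            THEN 'failed'
        WHEN NOT EXISTS(SELECT 1 FROM item_stages s WHERE s.item_id = wi.id AND s.status NOT IN ('done', 'skipped'))
            THEN 'done'
        WHEN EXISTS(SELECT 1 FROM item_stages s WHERE s.item_id = wi.id AND s.status = 'in_progress')
            THEN 'in_progress'
        ELSE 'pending'
    END AS derived_status
FROM work_items wi;
"""

conn = sqlite3.connect(":memory:")
# Assumed minimal schema: just enough for the view to compile.
conn.executescript("""
CREATE TABLE work_items (id INTEGER PRIMARY KEY, job_id TEXT);
CREATE TABLE item_stages (item_id INTEGER, stage TEXT, status TEXT);
""")
conn.executescript(VIEW_SQL)

conn.executemany(
    "INSERT INTO work_items (id, job_id) VALUES (?, 'j1')",
    [(1,), (2,), (3,), (4,), (5,)],
)
conn.executemany("INSERT INTO item_stages VALUES (?, ?, ?)", [
    (1, "fetch", "done"), (1, "extract", "skipped"),         # everything finished
    (2, "fetch", "done"), (2, "extract", "failed"),          # one failure
    (3, "fetch", "in_progress"), (3, "extract", "pending"),  # work under way
    (4, "fetch", "pending"),                                 # not started
    # item 5 deliberately has no stage rows at all
])
status = dict(conn.execute("SELECT item_id, derived_status FROM item_derived_status"))
conn.close()
```

Items 1-4 come out as `done`, `failed`, `in_progress`, and `pending`. Item 5 comes out as `done`, because `NOT EXISTS` is true over an empty set. Whether that matters depends on when stage rows are created, which this plan does not state, so it is worth checking before consumers switch to the view.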

**Step 4: Run tests — expect PASS**

Run: `python -m pytest jobs/tests/test_tracker.py::TestDerivedStatus -v`

**Step 5: Commit**

```bash
git add jobs/lib/db/__init__.py jobs/tests/test_tracker.py
git commit -m "feat(jobs): add SQL view for derived item status"
```

Note: `_sync_item_status()` stays for now — dashboard and runner read `work_items.status` in many places. Switching consumers to the view is a separate follow-up.

---

### Task 11: `RunContext` dataclass

**Files:**
- Create: `jobs/lib/context.py`
- Modify: `jobs/runner.py`

**Step 1: Create `RunContext` dataclass**

```python
# jobs/lib/context.py
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Literal

@dataclass
class RunContext:
    """Runtime configuration — separates test/prod concerns from core orchestration."""
    mode: Literal["production", "test", "dry_run"] = "production"
    limit: int | None = None
    test_stage: str | None = None
    test_item: str | None = None
    no_reset: bool = False
    discover_only: bool = False
    single_pass: bool = False
    active_items: set[int] = field(default_factory=set)

    @property
    def is_test(self) -> bool:
        return self.mode == "test"

    @property
    def is_dry_run(self) -> bool:
        return self.mode == "dry_run"
```

**Step 2: Update `run_job()` signature in `runner.py`**

Replace the 11 parameters with `ctx: RunContext`. Update all internal references:
- `test_mode` → `ctx.is_test`
- `dry_run` → `ctx.is_dry_run`
- `limit` → `ctx.limit`
- `single_pass` → `ctx.single_pass`
- `discover_only` → `ctx.discover_only`
- `test_stage` → `ctx.test_stage`
- `test_item` → `ctx.test_item`
- `no_reset` → `ctx.no_reset`
- `active_items` → `ctx.active_items`
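
One way to keep callers working during the transition is a small adapter that folds the old booleans into `mode`. The sketch below copies the dataclass from Step 1 so it runs on its own; `context_from_flags` is a hypothetical helper, and the precedence rule (dry run wins when both flags are set) is an assumption, since the plan does not specify it.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Literal


@dataclass
class RunContext:
    """Copy of the dataclass from Step 1 so the sketch is self-contained."""
    mode: Literal["production", "test", "dry_run"] = "production"
    limit: int | None = None
    test_stage: str | None = None
    test_item: str | None = None
    no_reset: bool = False
    discover_only: bool = False
    single_pass: bool = False
    active_items: set[int] = field(default_factory=set)

    @property
    def is_test(self) -> bool:
        return self.mode == "test"

    @property
    def is_dry_run(self) -> bool:
        return self.mode == "dry_run"


def context_from_flags(test_mode: bool = False, dry_run: bool = False, **rest) -> RunContext:
    """Hypothetical adapter: fold the two old booleans into `mode`.

    Assumption: dry_run takes precedence if both flags are set.
    """
    mode = "dry_run" if dry_run else "test" if test_mode else "production"
    return RunContext(mode=mode, **rest)


ctx = context_from_flags(test_mode=True, limit=5, single_pass=True)
both = context_from_flags(test_mode=True, dry_run=True)
```

Because `mode` holds a single value, a run cannot be both a test run and a dry run at once. If the old signature allowed that combination, some precedence decision like the one above is needed.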

**Step 3: Update callers**

`run_job()` is called from:
- `main_loop()` in runner.py — construct `RunContext(mode="production", ...)`
- CLI `cli()` at bottom of runner.py — construct from Click args

**Step 4: Run tests**

Run: `python -m pytest jobs/tests/ -v --tb=short`

Also verify runner starts:
Run: `python -m jobs.runner --help`

**Step 5: Commit**

```bash
git add jobs/lib/context.py jobs/runner.py
git commit -m "refactor(jobs): collapse run_job params into RunContext dataclass"
```

---

### Task 12: Discovery base class enhancement

**Files:**
- Modify: `jobs/lib/discovery.py`

This is the largest change. Do it incrementally — enhance the base class first, then migrate strategies one group at a time.

**Step 1: Enhance `BaseDiscovery` with shared infrastructure**

```python
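import hashlib
import logging

logger = logging.getLogger(__name__)  # skip if discovery.py already defines these


class BaseDiscovery: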
    def __init__(self, params: dict):
        self.params = params

    async def discover(self) -> list[dict]:
        """Template method: fetch → deduplicate → log."""
        raw = await self.fetch_items()
        items = self._deduplicate(raw)
        logger.info(f"{self.__class__.__name__}: {len(items)} items")
        return items

    async def fetch_items(self) -> list[dict]:
        """Override this. Return list of {"key": str, "data": dict}."""
        raise NotImplementedError

    def _deduplicate(self, items: list[dict]) -> list[dict]:
        """Keep the first item per key; drop duplicates and items with no key."""
        seen: set[str] = set()
        unique = []
        for item in items:
            key = item.get("key", "")
            if key and key not in seen:
                seen.add(key)
                unique.append(item)
        return unique

    @staticmethod
    def hash_url(url: str, length: int = 10) -> str:
        return hashlib.md5(url.encode()).hexdigest()[:length]

    def require_param(self, name: str, default=None):
        """Get a required param, log warning if missing."""
        val = self.params.get(name, default)
        if not val:
            logger.warning(f"{self.__class__.__name__}: missing required param '{name}'")
        return val
```

**Step 2: Extract shared constants**

```python
TBS_MAP = {"daily": "qdr:d2", "weekly": "qdr:w"}

def extract_video_id(url_or_id: str) -> str | None:
    """Extract YouTube video ID — shared across serper_youtube and youtube_video_list."""
    # ... move from SerperYouTubeDiscovery._extract_video_id
```

**Step 3: Migrate 3-5 simple strategies first**

Convert `tracker_query`, `manual`, `newsflow_sources`, `company_watchlist`, `rss_feeds` to use the new base. Each should override only `fetch_items()`.
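
A migrated strategy might then look like the sketch below. The base class is a trimmed copy of the one in Step 1 so the example runs standalone, and `StaticListDiscovery` with its `urls` param is hypothetical, not one of the 22 real strategies.

```python
from __future__ import annotations

import asyncio
import logging

logger = logging.getLogger(__name__)


class BaseDiscovery:
    """Trimmed copy of the base class from Step 1."""

    def __init__(self, params: dict):
        self.params = params

    async def discover(self) -> list[dict]:
        # Template method: fetch, deduplicate, log.
        raw = await self.fetch_items()
        items = self._deduplicate(raw)
        logger.info("%s: %d items", self.__class__.__name__, len(items))
        return items

    async def fetch_items(self) -> list[dict]:
        raise NotImplementedError

    def _deduplicate(self, items: list[dict]) -> list[dict]:
        seen: set[str] = set()
        unique = []
        for item in items:
            key = item.get("key", "")
            if key and key not in seen:
                seen.add(key)
                unique.append(item)
        return unique


class StaticListDiscovery(BaseDiscovery):
    """Hypothetical strategy: turns a `urls` param into work items."""

    async def fetch_items(self) -> list[dict]:
        return [{"key": url, "data": {"url": url}} for url in self.params.get("urls", [])]


strategy = StaticListDiscovery(
    {"urls": ["https://a.example", "https://b.example", "https://a.example"]}
)
items = asyncio.run(strategy.discover())
keys = [item["key"] for item in items]
```

The subclass contains only the fetch logic; deduplication and logging come from the inherited `discover()`.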

**Step 4: Run tests**

Run: `python -m pytest jobs/tests/test_discovery.py -v --tb=short`

**Step 5: Commit**

```bash
git add jobs/lib/discovery.py
git commit -m "refactor(jobs): enhance discovery base class, migrate simple strategies"
```

**Step 6: Migrate remaining strategies in batches**

Do 5-6 strategies per commit. Run tests between each batch. Complex strategies (`serper_youtube`, `vic_atoz`, `vic_wayback_cdx`) may need to override `discover()` entirely — that's fine, the base class is a convenience not a requirement.

---

### Task 13: Update consumers to import from `db/` directly

**Files:**
- Modify: All files that import from `jobs.lib.tracker`

**Step 1: Update imports across the codebase**

Use grep to find all `from jobs.lib.tracker import` lines and update them to import from the specific `db/` module. This is optional — the re-export shim works — but reduces coupling.

Priority consumers (high-traffic imports):
- `jobs/runner.py`
- `jobs/ctl.py`
- `jobs/ui/queries.py`
- `helm/hooks/handler.py`

Lower priority (can wait):
- `jobs/handlers/*.py`
- `jobs/diagnose.py`, `jobs/failures.py`
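
If grep is not convenient, the import search from Step 1 could be scripted roughly as follows. `find_tracker_imports` is a hypothetical helper; it matches only `from jobs.lib.tracker import ...` lines, so a plain `import jobs.lib.tracker` would be missed, and a parenthesized multi-line import is reported at its first line only.

```python
from __future__ import annotations

import re
import tempfile
from pathlib import Path

IMPORT_RE = re.compile(r"^\s*from\s+jobs\.lib\.tracker\s+import\b")


def find_tracker_imports(root: Path) -> list[tuple[str, int]]:
    """Return (relative path, line number) for each tracker import under `root`."""
    hits = []
    for path in sorted(root.rglob("*.py")):
        lines = path.read_text(encoding="utf-8").splitlines()
        for lineno, line in enumerate(lines, start=1):
            if IMPORT_RE.match(line):
                hits.append((path.relative_to(root).as_posix(), lineno))
    return hits


# Demo on a throwaway tree; in the repo you would pass the project root.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "a.py").write_text(
        "import os\nfrom jobs.lib.tracker import open_raw_db\n", encoding="utf-8"
    )
    (root / "b.py").write_text("from jobs.lib.db import open_db\n", encoding="utf-8")
    hits = find_tracker_imports(root)
```

Sorting the hits by file makes it easy to work through the priority consumers first and leave the rest on the shim.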

**Step 2: Keep `tracker.py` shim for now**

Don't delete — external consumers may exist. Mark with a deprecation comment.

**Step 3: Run tests**

Run: `python -m pytest jobs/tests/ -v --tb=short`

**Step 4: Commit**

```bash
git add -A
git commit -m "refactor(jobs): update consumers to import from db/ modules"
```

---

### Task 14: Final verification

**Step 1: Run full test suite**

Run: `python -m pytest jobs/tests/ -v`

**Step 2: Verify runner starts and can process**

Run: `inv jobs.test -j dumb_money_live --dry-run 2>&1 | head -20`

**Step 3: Verify dashboard imports**

Run: `python -c "from jobs.app import demo; print('dashboard OK')"`

**Step 4: Check LOC**

Run: `wc -l jobs/lib/db/*.py jobs/lib/tracker.py jobs/runner.py jobs/lib/discovery.py jobs/lib/context.py`

Expected: db/ modules total ~1,700 LOC, tracker.py ~80 LOC (shim), runner.py ~1,100 LOC, discovery.py ~1,200 LOC.

**Step 5: Final commit if needed**

```bash
git add -A
git commit -m "refactor(jobs): complete tracker split and discovery cleanup"
```
