# Jobs Refactor: Clean Module Boundaries

**Date**: 2026-02-28
**Status**: Approved
**Context**: Multi-model evaluation (Codex 3, Gemini 3.1 Pro, Grok Code) + hands-on Prefect 3 spike concluded that refactoring the custom system beats adopting off-the-shelf. See `jobs/docs/prefect-evaluation.html` for the full analysis.

## Goal

Reduce accidental complexity in the jobs core. Same features, zero new dependencies, clear module boundaries. Each file should have a single responsibility and no function should exceed ~50 LOC of actual logic.

## Current State

| File | LOC | Functions | Problem |
|------|-----|-----------|---------|
| `runner.py` | 1,162 | 18 | `run_job()` has 13 params, test-mode flags leak everywhere |
| `lib/tracker.py` | 1,682 | 66 | God object: schema + CRUD + queries + events + control + versioning |
| `lib/discovery.py` | 1,805 | 17 strategies | Heavy boilerplate duplication, `hash_url()` × 5 |
| `lib/job.py` | 281 | — | Fine as-is |
| `lib/doctor.py` | 493 | — | Fine as-is |
| `lib/pacer.py` | 60 | — | Fine as-is |
| **Total** | **5,483** | | |

## Changes

### 1. Split `tracker.py` → `jobs/lib/db/` package

**Priority**: Highest — this is the god object causing most pain.

**Prerequisite**: Write integration tests covering the core state machine (pending → in_progress → done/failed, stage transitions, claiming, reconciliation) BEFORE splitting. Refactoring 1,682 LOC of stateful code without regression coverage is how you introduce subtle bugs.

Proposed structure:

```
jobs/lib/db/
├── __init__.py      # open_db(), open_raw_db() — WAL mode, timeout=30
├── schema.py        # ensure_schema(), migrations, DDL
├── items.py         # upsert_items(), get_pending(), mark_in_progress/done/failed,
│                    # claim logic, _sync_item_status(), stage lifecycle
├── jobs.py          # pause_job(), unpause_job(), wake_job(), set_job_paused(),
│                    # get_job_stats(), stage_stats(), stage_timing_stats()
├── events.py        # log_event(), get_events(), log_cost(), get_daily_cost()
└── results.py       # store_result(), get_result(), get_results(),
                     # get_handler_version(), get_stale_count(), reprocess_stale()
```

**Critical rule**: All modules import `open_db` / `open_raw_db` from `db/__init__.py`. No module creates its own connections. This prevents transaction isolation bugs when splitting a file that manages a SQLite WAL connection.
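
For reference, the shared connection factory could be as small as the following sketch. WAL mode and `timeout=30` come from the tree above; the default path, `row_factory`, and the `foreign_keys` pragma are assumptions to confirm against the current `tracker.py`:

```python
import sqlite3

def open_db(path: str = "jobs/jobs.db") -> sqlite3.Connection:
    """Single place that configures connections; no other db/ module calls connect()."""
    conn = sqlite3.connect(path, timeout=30)       # timeout=30 per the plan above
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")         # assumption: enforce FKs going forward
    conn.row_factory = sqlite3.Row                 # assumption: name-based column access
    return conn
```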

**Migration**: `from jobs.lib.tracker import X` → `from jobs.lib.db.items import X` (and so on). Keep a compat shim `tracker.py` that re-exports the new modules initially; remove it once all consumers are updated.
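
The shim is a thin re-export module so old imports keep working during the transition. A sketch (module names from the proposed tree above; not runnable until `jobs/lib/db/` exists):

```python
# jobs/lib/tracker.py — temporary compat shim; delete once all consumers migrate
from jobs.lib.db import open_db, open_raw_db    # noqa: F401
from jobs.lib.db.schema import ensure_schema    # noqa: F401
from jobs.lib.db.items import *                 # noqa: F401,F403
from jobs.lib.db.jobs import *                  # noqa: F401,F403
from jobs.lib.db.events import *                # noqa: F401,F403
from jobs.lib.db.results import *               # noqa: F401,F403
```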

### 2. `_sync_item_status()` → SQL view

Replace the Python read-modify-write pattern with a SQL view:

```sql
CREATE VIEW item_derived_status AS
SELECT
    wi.id AS item_id,
    wi.job_id,
    CASE
        WHEN EXISTS(SELECT 1 FROM item_stages s WHERE s.item_id = wi.id AND s.status = 'failed')
            THEN 'failed'
        WHEN NOT EXISTS(SELECT 1 FROM item_stages s WHERE s.item_id = wi.id AND s.status NOT IN ('done', 'skipped'))
            THEN 'done'
        WHEN EXISTS(SELECT 1 FROM item_stages s WHERE s.item_id = wi.id AND s.status = 'in_progress')
            THEN 'in_progress'
        ELSE 'pending'
    END AS derived_status
FROM work_items wi;
```

Stop writing to `work_items.status` directly. Dashboard and queries join against the view. Eliminates a class of race conditions and ~20 LOC of fragile Python logic.

**Performance**: For our scale (~10k items total), the correlated subqueries are fine. Add covering index `CREATE INDEX idx_item_stages_status ON item_stages(item_id, status)` if needed.
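
The CASE logic can be sanity-checked against an in-memory database before migrating. The table shapes below are minimal assumptions inferred from the column references in the view; the real schema has more columns:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE work_items (id INTEGER PRIMARY KEY, job_id INTEGER);
CREATE TABLE item_stages (item_id INTEGER, status TEXT);
CREATE VIEW item_derived_status AS
SELECT wi.id AS item_id, wi.job_id,
    CASE
        WHEN EXISTS(SELECT 1 FROM item_stages s WHERE s.item_id = wi.id AND s.status = 'failed')
            THEN 'failed'
        WHEN NOT EXISTS(SELECT 1 FROM item_stages s WHERE s.item_id = wi.id AND s.status NOT IN ('done', 'skipped'))
            THEN 'done'
        WHEN EXISTS(SELECT 1 FROM item_stages s WHERE s.item_id = wi.id AND s.status = 'in_progress')
            THEN 'in_progress'
        ELSE 'pending'
    END AS derived_status
FROM work_items wi;
""")
conn.executemany("INSERT INTO work_items VALUES (?, ?)", [(1, 1), (2, 1), (3, 1)])
conn.executemany("INSERT INTO item_stages VALUES (?, ?)",
                 [(1, 'done'), (1, 'skipped'),       # item 1: all stages finished
                  (2, 'done'), (2, 'failed'),        # item 2: one failure wins
                  (3, 'done'), (3, 'in_progress')])  # item 3: still running
statuses = dict(conn.execute("SELECT item_id, derived_status FROM item_derived_status"))
# statuses → {1: 'done', 2: 'failed', 3: 'in_progress'}
```

One edge case worth an explicit test: an item with zero `item_stages` rows derives `'done'` via the vacuous `NOT EXISTS`. If discovery ever inserts items before their stage rows, the view needs a guard for that.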

### 3. Merge `simple_worker()` into `stage_worker()`

A "simple" job (no explicit stages) is just a pipeline with one implicit stage named after the handler. `stage_worker()` handles both — delete `simple_worker()` entirely.

~200 LOC reduction + eliminates divergent behavior between the two worker paths.
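
The normalization step can be sketched as below. `Stage` and `Job` here are hypothetical minimal shapes (the real `Job` lives in `jobs/lib/job.py`); only `effective_stages()` illustrates the idea:

```python
from dataclasses import dataclass, field
from typing import Callable

@dataclass
class Stage:                      # hypothetical minimal shape
    name: str
    handler: Callable

@dataclass
class Job:                        # hypothetical minimal shape
    handler: Callable
    stages: list[Stage] = field(default_factory=list)

def effective_stages(job: Job) -> list[Stage]:
    """A simple job is a pipeline with one implicit stage named after its handler."""
    return job.stages or [Stage(name=job.handler.__name__, handler=job.handler)]

def fetch(item):                  # stand-in handler
    return item

# A stage-less job flows through stage_worker() as a one-stage pipeline:
assert [s.name for s in effective_stages(Job(handler=fetch))] == ["fetch"]
```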

### 4. `run_job()` → `RunContext` dataclass

Current 13-parameter signature:
```python
async def run_job(job, runner_cfg, pacer, resources,
                  single_pass, discover_only, limit,
                  active_items, test_mode, no_reset,
                  test_stage, test_item, dry_run)
```

Proposed:
```python
from dataclasses import dataclass
from typing import Literal

@dataclass
class RunContext:
    mode: Literal["production", "test", "dry_run"] = "production"
    limit: int | None = None
    test_stage: str | None = None
    test_item: str | None = None
    no_reset: bool = False
    discover_only: bool = False
    single_pass: bool = False
    active_items: set[int] | None = None

async def run_job(job: Job, runner_cfg: RunnerConfig, pacer: Pacer,
                  resources: ResourceRegistry, ctx: RunContext) -> None:
```

5 parameters. Production code checks `ctx.mode` instead of threading test flags through the call stack.

### 5. Discovery protocol + shared base

Create a `DiscoveryProtocol` with shared helpers:

```python
class BaseDiscovery:
    """Shared infrastructure: HTTP fetch, retry, normalization, key hashing."""

    async def discover(self, job: Job) -> list[dict]:
        raw = await self.fetch_items(job)
        return [self.normalize(item) for item in raw]

    async def fetch_items(self, job: Job) -> list[dict]:
        raise NotImplementedError

    def normalize(self, item: dict) -> dict:
        """Ensure item has 'key' and 'data' fields."""
        ...

    def hash_url(self, url: str) -> str:
        """MD5 truncation — single implementation replacing 5+ copies."""
        ...
```

Each strategy implements only `fetch_items()`. Target: 1,805 → ~700 LOC.
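
The consolidated `hash_url()` could be as small as the sketch below. The 16-character truncation length is an assumption; match whatever the existing five copies actually use before consolidating:

```python
import hashlib

def hash_url(url: str) -> str:
    """Stable short key for a URL; one implementation replacing the 5+ copies."""
    return hashlib.md5(url.encode("utf-8")).hexdigest()[:16]  # length: assumption
```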

### 6. Delete dead code

| Dead code | Location | Why |
|-----------|----------|-----|
| `stage_version_hash()` | tracker.py | Deprecated, replaced by `HANDLER_VERSION`. Still used as fallback for ~12 handlers — those handlers should declare `HANDLER_VERSION` first. |
| Dual cost table query | `get_daily_cost()` | Legacy compat querying both `events` and `job_cost_log`. Migration is done, remove the shim. |
| `get_stages_dict()` | tracker.py | Reconstructs old suffix-key format from normalized `item_stages`. Dashboard should read from `item_stages` directly. |
| `outputs:` field in YAML | jobs.yaml + StageConfig | Decorative documentation, no code reads it. Remove from YAML, add as comments if needed. |
| `auto_unpause_count` / `last_auto_unpause_at` | job_state table | V3 plan marked for removal. `set_job_paused()` still writes them. Drop columns. |
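
Dropping the two dead columns is a one-off migration: update `set_job_paused()` first so nothing writes them, then drop. SQLite supports `ALTER TABLE ... DROP COLUMN` since 3.35; older versions need the copy-table dance. A sketch, assuming the column names from the table above:

```python
import sqlite3

def drop_dead_columns(conn: sqlite3.Connection) -> None:
    """Drop V3-deprecated job_state columns (requires SQLite >= 3.35)."""
    existing = {row[1] for row in conn.execute("PRAGMA table_info(job_state)")}
    for col in ("auto_unpause_count", "last_auto_unpause_at"):
        if col in existing:  # idempotent on re-run
            conn.execute(f"ALTER TABLE job_state DROP COLUMN {col}")
    conn.commit()
```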

## Order of Operations

1. **Write tracker integration tests** (prerequisite for everything)
2. **Delete dead code** (#6) — free complexity reduction, no risk
3. **Split tracker.py** (#1) — biggest structural improvement
4. **SQL view for item status** (#2) — can do during or after tracker split
5. **RunContext dataclass** (#4) — quick win, touch runner.py only
6. **Merge workers** (#3) — requires understanding both paths well
7. **Discovery protocol** (#5) — largest LOC reduction, can be incremental (strategy by strategy)

## Success Criteria

- Each file in `jobs/lib/db/` has <400 LOC and a single responsibility
- `run_job()` has ≤5 parameters
- No deprecated code paths remain
- All existing tests pass
- `inv jobs.test -j dumb_money_live` works identically before and after
- Dashboard shows same data

## Non-Goals

- No new features (self-healing pipelines, validation circuit breaker — separate effort)
- No framework adoption (Prefect, Celery, etc.)
- No schema changes beyond dropping dead columns
- No handler modifications (they stay as-is)
