# Unified Work System V3: Jobs Cleanup + Autodo

## Context

The V2 task system design was reviewed by four frontier thinking models (opus, gpt-pro, grok, gemini). Key consensus: don't build a parallel system; extend the jobs framework. A deep review of the full jobs codebase (6 files, ~3600 LOC) plus the 4-model vario critique identified specific cleanup targets.

**This plan has four parts:**
1. Jobs tracker cleanup (schema + code, informed by 4-model consensus)
2. Core vs plugin separation (generic framework extraction)
3. Autodo extensions (`helm autodo` CLI for internal TODO management)
4. Gym jobs (quality feedback loops on top of existing jobs)

**Related docs:**
- `2026-02-26-jobs-review-findings.md` — deep agent review of all jobs modules
- `2026-02-26-jobs-vario-critique.md` — full 4-model vario critique (run ID f85a6336)

---

## Part 1: Jobs Tracker Cleanup

### 4-Model Consensus (what all models agree on)

| Issue | Verdict | Impact |
|-------|---------|--------|
| **Stages JSON** | Normalize to `item_stages` table | Fixes concurrency race (read-modify-write), enables indexed queries, eliminates suffix-convention parsing |
| **job_runs table** | Drop — dead weight | Nobody reads it; events absorb run tracking |
| **job_cost_log** | Absorb into events | Cost is just an event type with `data.cost_usd` |
| **job_state** | Simplify to 3-5 cols | Auto-unpause logic → runner (in-memory), errors → events |
| **Version hashing** | Replace `inspect.getsource()` with explicit semantic versioning | Source hashing breaks on formatting, comments, deployment; too sensitive |
| **upsert_items merge** | Extract domain logic out of tracker | DB layer shouldn't know about topics/queries/discovery_sources |
| **Pacer** | Fix or replace | Current impl is fixed-spacing not token-bucket; no burst; no shared state |
| **executor.py** | Keep, remove `jobs.` prefix | Useful error messages; just decouple the path prefix |
| **Atomic claiming** | Add `WHERE status = 'pending'` to mark_in_progress | Two workers can claim same item without this |

### Proposed Schema: 5 tables (+ 1 for autodo deps)

```sql
PRAGMA journal_mode = WAL;
PRAGMA foreign_keys = ON;

-- ─── Work Items ──────────────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS work_items (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id TEXT NOT NULL,
    item_key TEXT NOT NULL,
    status TEXT DEFAULT 'pending',
    priority REAL DEFAULT 0,           -- lower = higher priority
    data JSON,                         -- discovery metadata
    error TEXT,                        -- last error message
    attempts INTEGER DEFAULT 0,
    next_attempt_at TEXT,              -- NEW: retry scheduling (NULL = immediate)
    discovered_at TEXT DEFAULT (datetime('now')),
    last_attempt_at TEXT,

    -- Autodo extensions (NULL for regular jobs)
    parent_id INTEGER REFERENCES work_items(id),
    source_file TEXT,                  -- e.g. "learning/TODO.md"
    heading TEXT,                      -- heading context (H1 > H2 > H3)
    anchor TEXT,                       -- stable inline ID <!-- TODO:WI-042 -->
    notes TEXT,                        -- human-authored context
    origin TEXT,                       -- scan | check | manual | external

    UNIQUE(job_id, item_key)
);

CREATE INDEX IF NOT EXISTS idx_wi_job_status ON work_items(job_id, status);
CREATE INDEX IF NOT EXISTS idx_wi_job_key ON work_items(job_id, item_key);
CREATE INDEX IF NOT EXISTS idx_wi_parent ON work_items(parent_id);

-- ─── Item Stages (normalized from stages JSON) ──────────────────────
-- Replaces the stages JSON column. Each row = one stage for one item.
-- Enables indexed queries, eliminates read-modify-write race condition.
CREATE TABLE IF NOT EXISTS item_stages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id TEXT NOT NULL,
    item_key TEXT NOT NULL,
    stage TEXT NOT NULL,
    status TEXT DEFAULT 'pending',     -- pending | in_progress | done | failed | retry_later | skipped
    started_at TEXT,
    ended_at TEXT,
    elapsed_s REAL,
    error TEXT,
    attempts INTEGER DEFAULT 0,
    updated_at TEXT DEFAULT (datetime('now')),
    UNIQUE(job_id, item_key, stage)
);

CREATE INDEX IF NOT EXISTS idx_is_job_stage_status ON item_stages(job_id, stage, status);
CREATE INDEX IF NOT EXISTS idx_is_job_item ON item_stages(job_id, item_key);

-- ─── Results (per-item per-stage output) ─────────────────────────────
CREATE TABLE IF NOT EXISTS results (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id TEXT NOT NULL,
    item_key TEXT NOT NULL,
    stage TEXT NOT NULL,
    result JSON NOT NULL,
    handler_version TEXT,              -- explicit semantic version (e.g. "2026-02-26.1")
    created_at TEXT DEFAULT (datetime('now')),
    UNIQUE(job_id, item_key, stage)
);

CREATE INDEX IF NOT EXISTS idx_results_job_item ON results(job_id, item_key);
CREATE INDEX IF NOT EXISTS idx_results_job_stage ON results(job_id, stage);

-- ─── Job State (pause/resume) ────────────────────────────────────────
CREATE TABLE IF NOT EXISTS job_state (
    job_id TEXT PRIMARY KEY,
    paused INTEGER DEFAULT 0,
    pause_reason TEXT,
    retry_after TEXT,                  -- auto-unpause time
    updated_at TEXT DEFAULT (datetime('now'))
);

-- ─── Events (unified: cost, runs, diagnostics, lifecycle) ────────────
CREATE TABLE IF NOT EXISTS events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id TEXT NOT NULL,
    event TEXT NOT NULL,               -- discovery, cost, run_start, run_end, pause, error, ...
    stage TEXT,
    item_key TEXT,
    detail TEXT,
    data JSON,                         -- structured payload (cost_usd for cost events, etc.)
    ts TEXT DEFAULT (datetime('now'))
);

CREATE INDEX IF NOT EXISTS idx_events_job_ts ON events(job_id, ts);
CREATE INDEX IF NOT EXISTS idx_events_type ON events(event, ts);

-- ─── Autodo: Cross-item dependencies ─────────────────────────────────
CREATE TABLE IF NOT EXISTS work_item_deps (
    item_id INTEGER NOT NULL REFERENCES work_items(id) ON DELETE CASCADE,
    depends_on_id INTEGER NOT NULL REFERENCES work_items(id) ON DELETE CASCADE,
    reason TEXT,
    PRIMARY KEY (item_id, depends_on_id)
);
```
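Given `item_stages`, an item's overall status can be computed at read time instead of being synced on every write. The sketch below shows one way to do that with a view. The view name `work_item_status`, the helper `derived_statuses`, and the precedence rules (any failed stage wins, then any in-progress stage, then "all stages done or skipped") are assumptions for illustration, not the tracker's current behavior.

```python
import sqlite3

# Hypothetical view; the precedence order is an assumption.
# SQLite evaluates comparisons to 0/1, so SUM(...) counts matching stages.
DERIVED_STATUS_VIEW = """
CREATE VIEW IF NOT EXISTS work_item_status AS
SELECT
    job_id,
    item_key,
    CASE
        WHEN SUM(status = 'failed') > 0 THEN 'failed'
        WHEN SUM(status = 'in_progress') > 0 THEN 'in_progress'
        WHEN SUM(status IN ('done', 'skipped')) = COUNT(*) THEN 'done'
        ELSE 'pending'  -- includes retry_later in this sketch
    END AS status
FROM item_stages
GROUP BY job_id, item_key
"""


def derived_statuses(conn: sqlite3.Connection, job_id: str) -> dict[str, str]:
    """Return {item_key: derived status} for one job, computed at read time."""
    conn.execute(DERIVED_STATUS_VIEW)
    rows = conn.execute(
        "SELECT item_key, status FROM work_item_status WHERE job_id = ?",
        (job_id,),
    )
    return dict(rows.fetchall())
```

Items with no `item_stages` rows do not appear in the view, so callers would need a default for freshly discovered items.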

### What changed vs V2 plan

**New in V3 (from vario consensus):**
- `item_stages` table replaces `stages` JSON column — fixes the concurrency trap all 4 models flagged
- `next_attempt_at` on work_items — retry scheduling instead of immediate re-selection of retry_later items
- `idx_results_job_stage` index — enables fast stale count queries
- `handler_version` now explicitly documented as semantic version string (not inspect.getsource hash)
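To illustrate how `handler_version` and `idx_results_job_stage` work together, here is a minimal sketch of a stale-count query. The function name `count_stale` and the `current_versions` mapping are hypothetical; only the `results` columns come from the schema above.

```python
import sqlite3


def count_stale(
    conn: sqlite3.Connection, job_id: str, current_versions: dict[str, str]
) -> dict[str, int]:
    """Count stored results per stage whose handler_version differs from the
    handler's current version. `current_versions` maps stage -> version string.
    """
    stale: dict[str, int] = {}
    for stage, version in current_versions.items():
        # IS NOT (rather than !=) also counts rows where handler_version is NULL.
        (n,) = conn.execute(
            "SELECT COUNT(*) FROM results "
            "WHERE job_id = ? AND stage = ? AND handler_version IS NOT ?",
            (job_id, stage, version),
        ).fetchone()
        stale[stage] = n
    return stale
```

The `job_id = ? AND stage = ?` prefix matches the `(job_id, stage)` index, so the count should not need to touch every result row for the job.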

**Dropped (same as V2):**
- `job_runs` table → events (`run_start`/`run_end`)
- `job_cost_log` table → events (`cost` type with `data.cost_usd`)
- `result_path` column on work_items (dead)
- `auto_unpause_count`, `last_auto_unpause_at`, `last_error` from job_state
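As a rough illustration of absorbing `job_cost_log` into `events`, the sketch below writes cost as a `cost` event and sums `data.cost_usd` for one day. The function signatures are assumptions (the existing `log_cost` / `get_daily_cost` signatures are not shown in this plan), and it relies on SQLite's JSON functions being available in the local build.

```python
import json
import sqlite3
from typing import Optional


def log_cost(
    conn: sqlite3.Connection,
    job_id: str,
    cost_usd: float,
    stage: Optional[str] = None,
    item_key: Optional[str] = None,
) -> None:
    """Record a cost as an event row with a JSON payload (assumed signature)."""
    conn.execute(
        "INSERT INTO events (job_id, event, stage, item_key, data) "
        "VALUES (?, 'cost', ?, ?, ?)",
        (job_id, stage, item_key, json.dumps({"cost_usd": cost_usd})),
    )
    conn.commit()


def get_daily_cost(
    conn: sqlite3.Connection, job_id: str, day: Optional[str] = None
) -> float:
    """Sum cost events for one UTC day ('YYYY-MM-DD'); defaults to today."""
    (total,) = conn.execute(
        "SELECT COALESCE(SUM(json_extract(data, '$.cost_usd')), 0.0) "
        "FROM events "
        "WHERE job_id = ? AND event = 'cost' "
        "AND date(ts) = COALESCE(?, date('now'))",
        (job_id, day),
    ).fetchone()
    return total
```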

**Added (same as V2, for autodo):**
- `parent_id`, `source_file`, `heading`, `anchor`, `notes`, `origin` on work_items
- `work_item_deps` table

### Code changes in tracker.py

Beyond schema, the tracker needs these code-level fixes:

1. **Atomic claiming**: `mark_stage_in_progress()` must use `UPDATE ... WHERE status = 'pending'` and check `cursor.rowcount == 1`. Currently two workers can claim the same item.

2. **Dead item recovery**: Periodic sweep of items `in_progress` for >30 min → reset to `pending`. The `last_attempt_at` column is already there.

3. **Max attempts enforcement**: `get_items_needing_stage()` must include `WHERE attempts < max_attempts`. Currently `attempts` is tracked but never checked.

4. **Extract merge logic**: Move `_merge_item_data()` out of tracker into a `MergeStrategy` callback. The tracker should accept a generic merge function, not hardcode knowledge of `topics`/`queries`/`discovery_sources`.

5. **Derive item status from stages**: Replace `_sync_item_status()` with a query that derives `work_items.status` from `item_stages` rows. Can be a view or a trigger. Eliminates write-time derivation bugs.

6. **Version hashing → explicit versioning**: Delete `stage_version_hash()` and `VERSION_DEPS`. Each handler exposes a string constant such as `HANDLER_VERSION = "2026-02-26.1"`. `reprocess_stale` compares the `handler_version` stored in the results table against the handler's current version.
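Items 1 and 2 in the list above can be sketched against the proposed `item_stages` table as follows. The names `claim_stage` and `reap_stuck` are hypothetical, and using `item_stages.started_at` as the staleness timestamp (rather than `work_items.last_attempt_at`) is an assumption of this sketch.

```python
import sqlite3


def claim_stage(conn: sqlite3.Connection, job_id: str, item_key: str, stage: str) -> bool:
    """Atomically move one stage row from pending to in_progress.

    The status check lives in the WHERE clause, so only one worker's UPDATE
    can match the row; the loser sees rowcount == 0 and must skip the item.
    """
    cur = conn.execute(
        """UPDATE item_stages
           SET status = 'in_progress',
               started_at = datetime('now'),
               attempts = attempts + 1,
               updated_at = datetime('now')
           WHERE job_id = ? AND item_key = ? AND stage = ? AND status = 'pending'""",
        (job_id, item_key, stage),
    )
    conn.commit()
    return cur.rowcount == 1


def reap_stuck(conn: sqlite3.Connection, max_age_minutes: int = 30) -> int:
    """Reset stage rows stuck in_progress longer than max_age_minutes.

    Returns the number of rows put back to pending.
    """
    cur = conn.execute(
        """UPDATE item_stages
           SET status = 'pending', updated_at = datetime('now')
           WHERE status = 'in_progress'
             AND started_at < datetime('now', ?)""",
        (f"-{max_age_minutes} minutes",),
    )
    conn.commit()
    return cur.rowcount
```

A runner would call `claim_stage` for each candidate and process only those where it returns `True`; `reap_stuck` would run on a timer so that items orphaned by a crashed worker become claimable again.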

### Migration strategy

The existing `jobs.db` has data. Non-destructive migration:

1. Create `item_stages` table
2. Migrate data: `INSERT INTO item_stages SELECT ... FROM work_items` (unpack stages JSON)
3. Add new columns via ALTER TABLE (autodo extensions, `next_attempt_at`)
4. Create `work_item_deps` table
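5. Rename `job_events` → `events`
6. Migrate `job_cost_log` → `events` (INSERT INTO events SELECT...), which needs the renamed table from step 5
7. Leave dead columns/tables in place and stop using them (`ALTER TABLE ... DROP COLUMN` requires SQLite 3.35+ and cannot drop indexed or constrained columns)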
8. After migration verified, drop stages JSON column usage from all code
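Step 2 could look roughly like the sketch below. The layout of the legacy `stages` JSON is not specified in this plan, so the code assumes a flat `{"stage_name": "status"}` object purely for illustration; the real column uses a suffix convention that would need its own parsing. The function name `migrate_stages` is hypothetical.

```python
import json
import sqlite3


def migrate_stages(conn: sqlite3.Connection) -> int:
    """Copy legacy work_items.stages JSON into item_stages rows.

    ASSUMPTION: stages looks like {"fetch": "done", "extract": "pending"}.
    INSERT OR IGNORE makes the migration safe to re-run: rows that already
    exist (same job_id, item_key, stage) are left untouched.
    """
    rows = conn.execute(
        "SELECT job_id, item_key, stages FROM work_items WHERE stages IS NOT NULL"
    ).fetchall()
    inserted = 0
    for job_id, item_key, raw in rows:
        for stage, status in json.loads(raw).items():
            cur = conn.execute(
                "INSERT OR IGNORE INTO item_stages (job_id, item_key, stage, status) "
                "VALUES (?, ?, ?, ?)",
                (job_id, item_key, stage, status),
            )
            inserted += cur.rowcount
    conn.commit()
    return inserted
```

Because the legacy column is left in place, the copy can be verified by comparing row counts before any code stops reading `stages`.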

---

## Part 2: Core vs Plugin Architecture

### Litmus test (from Opus)

> If deleting a piece stops the stage pipeline from running items from pending→done, it's core. Everything else is a plugin.

### Core (generic work item engine)

| Module | What stays |
|--------|-----------|
| `tracker.py` | Schema (work_items + item_stages + results + job_state + events), lifecycle CRUD (upsert, claim, mark_done/failed, get_pending), stage state machine |
| `job.py` | `Job`, `StageConfig`, `PacingConfig`, `GuardsConfig` dataclasses, `load_jobs` YAML parser |
| `executor.py` | Handler resolution via importlib (without `jobs.` prefix) |
| `pacer.py` | Rate limiter (upgraded to token bucket with asyncio.Lock) |
| Runner loop | Stage worker poll → claim → process → backoff cycle |
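For the pacer row above, a token bucket guarded by an `asyncio.Lock` might look like the following minimal sketch. The class name `TokenBucket` and its parameters are assumptions, not the existing `pacer.py` API. Unlike fixed spacing, it allows a burst of up to `capacity` calls before throttling to `rate` per second.

```python
import asyncio
import time


class TokenBucket:
    """Token-bucket rate limiter shared by all workers in one event loop."""

    def __init__(self, rate: float, capacity: float) -> None:
        if rate <= 0 or capacity <= 0:
            raise ValueError("rate and capacity must be positive")
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum burst size
        self._tokens = capacity   # start full so the first burst is free
        self._last = time.monotonic()
        self._lock = asyncio.Lock()

    def _refill(self) -> None:
        now = time.monotonic()
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now

    async def acquire(self, tokens: float = 1.0) -> None:
        """Wait until `tokens` are available, then consume them."""
        if tokens > self.capacity:
            raise ValueError("cannot acquire more tokens than capacity")
        # Holding the lock while sleeping serializes waiters in arrival order.
        async with self._lock:
            self._refill()
            while self._tokens < tokens:
                await asyncio.sleep((tokens - self._tokens) / self.rate)
                self._refill()
            self._tokens -= tokens
```

Creating one instance inside the running event loop and passing it to every stage worker gives the shared state that the current fixed-spacing pacer lacks; sharing across processes would need a different mechanism.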

### Plugin (rivus-specific, stays in rivus codebase)

| Module | What moves to plugin layer |
|--------|---------------------------|
| `_merge_item_data` | Domain-specific merge logic → `MergeStrategy` interface per job |
| `VERSION_DEPS` / `stage_version_hash` | → `Versioning` plugin (most users won't need source hashing) |
| `log_cost` / `get_daily_cost` | → cost tracking as event type (not every system tracks dollar costs) |
| Discovery strategies | All 17 strategies are rivus-specific |
| Handlers | All handler modules |
| Doctor | LLM error classification is elegant but rivus-specific |
| Dashboard | Gradio UI |
| `ctl.py` business logic | add-url routing, newsflow |
| Job config fields | `emoji`, `detail_columns`, `tags`, `url`, `pipeline`, `pipeline_step`, `checks` |
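The `MergeStrategy` row can be made concrete with a small sketch. Everything here is illustrative: the type alias, `replace_merge`, `union_lists_merge`, and `merge_item_data` are hypothetical names showing how the tracker could accept a generic callback while a rivus plugin supplies the list-union behavior for keys such as `topics` and `queries`.

```python
from typing import Any, Callable, Iterable, Optional

# A merge strategy takes (existing data, incoming data) and returns merged data.
MergeStrategy = Callable[[dict[str, Any], dict[str, Any]], dict[str, Any]]


def replace_merge(old: dict[str, Any], new: dict[str, Any]) -> dict[str, Any]:
    """Generic default for core: incoming keys overwrite existing ones."""
    return {**old, **new}


def union_lists_merge(list_keys: Iterable[str]) -> MergeStrategy:
    """Build a strategy that unions the named list-valued keys.

    Order is preserved and duplicates dropped; list elements must be hashable.
    """
    keys = tuple(list_keys)

    def merge(old: dict[str, Any], new: dict[str, Any]) -> dict[str, Any]:
        merged = {**old, **new}
        for key in keys:
            if key in old or key in new:
                combined = [*old.get(key, []), *new.get(key, [])]
                merged[key] = list(dict.fromkeys(combined))
        return merged

    return merge


def merge_item_data(
    old: Optional[dict[str, Any]],
    new: Optional[dict[str, Any]],
    strategy: MergeStrategy = replace_merge,
) -> dict[str, Any]:
    """What the tracker would call: no domain knowledge, just the callback."""
    return strategy(old or {}, new or {})
```

With this shape, the plugin layer would register something like `union_lists_merge(["topics", "queries", "discovery_sources"])` per job, and the tracker would never name those keys itself.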

### Not extracting now

The core/plugin split is a design boundary for the cleanup — we're NOT creating a separate package. The goal is clean separation within the rivus codebase so core functions don't import rivus-specific logic. Actual extraction to a standalone package is a future phase if needed.

---

## Part 3: Autodo — `helm autodo` CLI

### File layout

```
helm/autodo/
├── __init__.py
├── scanner.py      # TODO.md scanning → upsert into work_items
├── scope.py        # JIT LLM scoping (single item, on demand)
├── cli.py          # helm autodo scan/list/pick/done/scope/note/split/tree
└── tests/
    ├── test_scanner.py
    └── test_cli.py
```

### Scanner (`scanner.py`)

Port + improve from `helm/autodo/planner.py`:
- Extract: `- [ ] text`, `- **text**`, `TODO:`, `FIXME:`, `HACK:`
- Heading context: full path (H1 > H2 > H3)
- Identity: anchor > (source_file + heading + text_hash) > fuzzy
- Calls `upsert_items()` with `job_id="autodo"`, populating `source_file`, `heading`, `anchor`, `origin="scan"`
- Stale: items not seen in 3 scans → status='stale'
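The extraction and heading-context rules above can be sketched as a single pass over a markdown file. This is not the `planner.py` logic (which is not shown here); the regexes, the `FoundTodo` dataclass, and `scan_markdown` are assumptions, and the sketch ignores fenced code blocks, anchors, and fuzzy identity.

```python
import re
from dataclasses import dataclass

HEADING_RE = re.compile(r"^(#{1,6})\s+(.*\S)\s*$")
CHECKBOX_RE = re.compile(r"^\s*[-*]\s+\[ \]\s+(.*\S)")      # - [ ] text
BOLD_RE = re.compile(r"^\s*[-*]\s+\*\*(.+?)\*\*")           # - **text**
MARKER_RE = re.compile(r"\b(?:TODO|FIXME|HACK):\s*(.*\S)")  # TODO: text


@dataclass
class FoundTodo:
    text: str
    heading: str  # full path, e.g. "H1 > H2 > H3"
    line_no: int


def scan_markdown(source: str) -> list[FoundTodo]:
    """Extract open TODO-like lines together with their heading path."""
    path: list[tuple[int, str]] = []  # (level, title) for each open heading
    found: list[FoundTodo] = []
    for line_no, line in enumerate(source.splitlines(), start=1):
        heading = HEADING_RE.match(line)
        if heading:
            level = len(heading.group(1))
            # Drop headings at the same or deeper level, then push this one.
            path = [(lv, t) for lv, t in path if lv < level]
            path.append((level, heading.group(2)))
            continue
        match = CHECKBOX_RE.match(line) or BOLD_RE.match(line) or MARKER_RE.search(line)
        if match:
            context = " > ".join(title for _, title in path)
            found.append(FoundTodo(match.group(1), context, line_no))
    return found
```

Each `FoundTodo` would then map onto an `upsert_items()` row, with `heading` stored as-is and the text hashed for the identity key.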

### CLI (`cli.py`)

```
helm autodo scan                              # run scanner
helm autodo list [--status X] [--file X]      # filtered list
helm autodo pick [N]                          # top N candidates
helm autodo show ID                           # full detail
helm autodo done ID / wontfix ID              # status changes
helm autodo note ID "text"                    # append to notes
helm autodo prio ID P1                        # set priority (P0-P3 → mapped to float)
helm autodo scope ID                          # JIT LLM scope → print + store in data JSON
helm autodo split ID                          # decompose into children (parent_id)
helm autodo dep ID OTHER_ID "reason"          # add dependency
helm autodo tree [ID]                         # show hierarchy + deps
```
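The `prio` command maps P0-P3 labels onto the `priority REAL` column, where lower means higher priority. The exact float values are not specified in this plan; the mapping below is one possible choice, shown only to make the direction of the ordering concrete.

```python
# Hypothetical mapping: the plan only says "P0-P3 -> mapped to float".
# Lower float = higher priority, matching the work_items.priority column.
PRIORITY_LEVELS = {"P0": 0.0, "P1": 1.0, "P2": 2.0, "P3": 3.0}


def parse_priority(label: str) -> float:
    """Convert a CLI label such as 'P1' (case-insensitive) to a priority float."""
    key = label.strip().upper()
    if key not in PRIORITY_LEVELS:
        valid = ", ".join(PRIORITY_LEVELS)
        raise ValueError(f"unknown priority {label!r}; expected one of {valid}")
    return PRIORITY_LEVELS[key]
```

Leaving gaps between the values keeps room for the scanner or `pick` to place items between two levels without changing the labels.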

### JIT Scope (`scope.py`)

Single-item, on demand:
1. Read item from DB
2. Ripgrep for symbols mentioned in TODO text → candidate files
3. Read nearby code (source_file ± context)
4. Call LLM (haiku): "what files need changes, what effort, what's the first step?"
5. Print to stdout + store in `data` JSON field

### Integration with jobs runner

Autodo items live in `work_items` with `job_id="autodo"`. The jobs runner CAN process them if we define stages — but initially, autodo is CLI-driven (human picks and does work). Runner integration is Phase 2.

---

## Implementation Order

1. **item_stages normalization** — create table, migrate stages JSON data, rewrite all stage functions
2. **Schema cleanup** — drop job_runs/cost_log usage, simplify job_state, add events table, add next_attempt_at
3. **Atomic claiming + dead item recovery** — fix mark_in_progress, add reaper sweep
4. **Extract merge logic** — move _merge_item_data to callback pattern
5. **Version hashing → explicit versioning** — delete stage_version_hash, add HANDLER_VERSION to handlers
6. **Absorb cost_log into events** — update log_cost/get_daily_cost
7. **Add autodo columns + deps table** — ALTER TABLE for parent_id, source_file, etc.
8. **Add autodo CRUD methods** — list_todos(), pick_next(), add_note(), add_child(), add_dep(), get_tree()
9. **Scanner** (`helm/autodo/scanner.py`) — port from planner.py
10. **CLI** (`helm/autodo/cli.py`) — helm autodo commands
11. **JIT Scope** (`helm/autodo/scope.py`)
12. **Migration** — import queue.yaml items, run full scan
13. **Tests** — tracker tests, scanner tests, CLI tests
14. **Cleanup** — deprecate helm/autodo/planner.py, helm/todos/queue.yaml

## Verification

1. Existing jobs still work (no regression from schema changes)
2. Stage queries use indexes (EXPLAIN QUERY PLAN shows no full scans)
3. Concurrent workers can't double-claim items
4. `helm autodo scan` imports TODOs
5. `helm autodo list/pick/scope/split/tree` all work
6. Tests pass
7. `helm/todos/queue.yaml` items migrated
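Check 2 can be automated with a small helper around `EXPLAIN QUERY PLAN`. The helper names below are hypothetical. The sketch relies on the plan's detail text starting with `SCAN` for a table or index scan and `SEARCH` for an indexed lookup, which is how current SQLite versions phrase it; the wording is not a stable API, so treat this as a test-suite aid only.

```python
import sqlite3
from typing import Sequence


def query_plan(conn: sqlite3.Connection, sql: str, params: Sequence = ()) -> list[str]:
    """Return the human-readable detail column of EXPLAIN QUERY PLAN."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql, params).fetchall()
    return [row[3] for row in rows]


def has_full_scan(conn: sqlite3.Connection, sql: str, params: Sequence = ()) -> bool:
    """True if any plan step scans a whole table or index instead of searching."""
    return any(step.startswith("SCAN") for step in query_plan(conn, sql, params))
```

A tracker test could then assert `not has_full_scan(...)` for each stage query the runner issues, so a dropped or mismatched index fails CI instead of showing up as a slow poll loop.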

## Part 4: Gym Jobs — Quality Feedback Loops

A **gym** is a steering module on top of a job. It doesn't produce deliverables — it produces quality signals about another job's output, closing the gen-eval-learn loop.

### `kind: gym` in jobs.yaml

| Field | Purpose |
|-------|---------|
| `kind: gym` | Distinguishes from `monitor`, `backfill`, `internal` |
| `parent_job` | Which job's output this gym evaluates |
| `tags: [gym, ...]` | Enables filtering in dashboard, CLI, budget |

### How gym jobs differ

| Dimension | Regular job | Gym job |
|-----------|------------|---------|
| Discovery | External sources (APIs, feeds) | Samples from parent job's output |
| Items | Entities (companies, videos, URLs) | (entity, extraction) pairs to evaluate |
| Stages | fetch → extract → score | score → review → learn |
| Output | Data, analysis, deliverables | Quality scores, misidentifications |
| Cadence | Own schedule or continuous | Triggered after parent produces output |
| Budget | Production LLM spend | Eval LLM spend (cheaper models, or opus for QC) |

### Quality loop

```
Parent job produces output
  → Gym job samples + scores (gemini-lite)
  → Low scores surface as review items
  → Human reviews / opus QC with screenshots confirms
  → Corrections grow the gym corpus
  → Gym re-evaluates → parameters adjust
  → Parent job gets better
```

### First gym: extraction quality

`learning/gyms/extraction/` — evaluates HTML extraction + LLM cleaning.
Currently a standalone CLI; will become a `kind: gym` job tracking extraction quality
across all URLs processed by any job that uses `load_url()`.

Corpus: `learning/gyms/extraction/corpus/sites.yaml` (seeds, grows from production).
Scoring: intrusion removal, anchor preservation, must-not-contain checks.

## Key Files

| Action | File |
|--------|------|
| Modify | `jobs/lib/tracker.py` — schema + migration + item_stages + autodo methods |
| Modify | `jobs/runner.py` — use item_stages, atomic claiming, dead item recovery |
| Modify | `jobs/lib/pacer.py` — upgrade to token bucket |
| Modify | `jobs/lib/executor.py` — remove `jobs.` prefix |
| Modify | `jobs/handlers/*.py` — add HANDLER_VERSION, remove VERSION_DEPS |
| Create | `helm/autodo/__init__.py` |
| Create | `helm/autodo/scanner.py` |
| Create | `helm/autodo/cli.py` |
| Create | `helm/autodo/scope.py` |
| Create | `helm/autodo/tests/test_scanner.py` |
| Create | `helm/autodo/tests/test_cli.py` |
| Modify | `helm/cli.py` — wire in autodo subcommand |
| Port from | `helm/autodo/planner.py` (scanner logic) |
| Update | `CLAUDE.md` (CLI tools table) |
| Deprecate | `helm/autodo/planner.py`, `helm/todos/queue.yaml` |
