# Jobs Framework Action Plan

> Synthesized from: 2-agent deep review + 4-model vario maxthink critique (opus, gpt-pro, grok, gemini)

## Priority tiers

Changes ranked by: operational risk eliminated × confidence (how many of the 4 models agree) × ease (inverse of effort).

---

### Tier 1 — Fix before anything else (correctness bugs)

**1. Atomic claiming** — prevents double-processing

Currently `mark_in_progress` issues an UPDATE with no status guard. Two workers calling `get_pending` → `mark_in_progress` on the same item both succeed silently.

```python
# Fix: one-line change in tracker.py
cursor.execute("""
    UPDATE work_items SET status = 'in_progress', last_attempt_at = ?
    WHERE id = ? AND status = 'pending'
""", (now, item_id))
if cursor.rowcount == 0:
    return False  # someone else claimed it
```

Same pattern needed in item_stages after normalization: `WHERE status IN ('pending', 'retry_later')`.

| Models agreeing | Effort | Risk if skipped |
|-----------------|--------|-----------------|
| 4/4 (opus, gpt-pro, grok, gemini) | 30 min | Items processed twice, duplicate results, wasted API spend |
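
A minimal sketch of the guarded claim against an in-memory SQLite database. The `claim` helper name and the trimmed `work_items` schema are assumptions for illustration, not the actual `tracker.py` API.

```python
import sqlite3
from datetime import datetime, timezone

def claim(conn: sqlite3.Connection, item_id: int) -> bool:
    """Return True only for the caller whose UPDATE flipped pending -> in_progress."""
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    cur = conn.execute(
        "UPDATE work_items SET status = 'in_progress', last_attempt_at = ? "
        "WHERE id = ? AND status = 'pending'",
        (now, item_id),
    )
    conn.commit()
    return cur.rowcount == 1  # 0 rows changed means another worker got there first

# Hypothetical minimal schema, just enough to exercise the claim.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE work_items (id INTEGER PRIMARY KEY, status TEXT, last_attempt_at TEXT)"
)
conn.execute("INSERT INTO work_items (id, status) VALUES (1, 'pending')")

first = claim(conn, 1)
second = claim(conn, 1)
print(first, second)  # True False
```

The second call changes no rows, so the caller can skip the item instead of processing it again.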

**2. Dead item recovery** — prevents items stuck forever as in_progress

If a worker crashes mid-stage, the item stays `in_progress` forever. No reaper exists.

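```sql
-- Periodic sweep (add to runner main loop or as a cron-like task).
-- Assumes last_attempt_at is stored as UTC 'YYYY-MM-DD HH:MM:SS', the format
-- datetime('now', ...) returns; ISO strings with a 'T' separator compare incorrectly.
UPDATE work_items SET status = 'pending', attempts = attempts + 1
WHERE status = 'in_progress'
  AND last_attempt_at < datetime('now', '-30 minutes');
```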

| Models agreeing | Effort | Risk if skipped |
|-----------------|--------|-----------------|
| 3/4 (opus, gpt-pro, gemini) | 1 hr | Items silently abandoned after crashes |
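
One way the sweep could be wrapped for the runner loop, as a sketch. The `reap_stale` name, the 30-minute default, and the trimmed schema are assumptions; the timestamp format caveat is the same as for any comparison against `datetime('now')`.

```python
import sqlite3

STALE_AFTER_MINUTES = 30  # assumed threshold; tune per job

def reap_stale(conn: sqlite3.Connection, minutes: int = STALE_AFTER_MINUTES) -> int:
    """Reset in_progress items whose last attempt is older than `minutes`; return the count."""
    cur = conn.execute(
        "UPDATE work_items SET status = 'pending', attempts = attempts + 1 "
        "WHERE status = 'in_progress' AND last_attempt_at < datetime('now', ?)",
        (f"-{minutes} minutes",),
    )
    conn.commit()
    return cur.rowcount

# Hypothetical minimal schema with one stale and one fresh in_progress item.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE work_items (id INTEGER PRIMARY KEY, status TEXT, "
    "attempts INTEGER DEFAULT 0, last_attempt_at TEXT)"
)
conn.executemany(
    "INSERT INTO work_items (id, status, last_attempt_at) "
    "VALUES (?, 'in_progress', datetime('now', ?))",
    [(1, "-2 hours"), (2, "-1 minutes")],
)
reaped = reap_stale(conn)
```

Incrementing `attempts` in the sweep means a stage that repeatedly crashes its worker still counts toward the attempt limit.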

**3. Max attempts enforcement** — prevents infinite retry of poison items

`attempts` is tracked but never checked. An item with a bad URL retries forever.

```sql
-- In get_items_needing_stage / get_pending:
WHERE attempts < ?  -- max_attempts from PacingConfig (default 5)
```

Items that reach `max_attempts` move to a terminal status such as `'poison'` or `'exhausted'`, so they are no longer selected.

| Models agreeing | Effort | Risk if skipped |
|-----------------|--------|-----------------|
| 3/4 (opus, gpt-pro, grok) | 30 min | Poison items churn forever, waste resources |
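
A sketch of how the limit and the terminal status might fit together. `MAX_ATTEMPTS` stands in for the `PacingConfig` value, the choice of `'exhausted'` over `'poison'` is arbitrary here, and the schema is a trimmed assumption.

```python
import sqlite3

MAX_ATTEMPTS = 5  # stands in for PacingConfig.max_attempts

def get_pending(conn: sqlite3.Connection, max_attempts: int = MAX_ATTEMPTS) -> list:
    """Retire items that used up their attempts, then return the ids still claimable."""
    conn.execute(
        "UPDATE work_items SET status = 'exhausted' "
        "WHERE status = 'pending' AND attempts >= ?",
        (max_attempts,),
    )
    rows = conn.execute(
        "SELECT id FROM work_items WHERE status = 'pending' AND attempts < ? ORDER BY id",
        (max_attempts,),
    ).fetchall()
    return [r[0] for r in rows]

# Hypothetical data: item 2 has already used all five attempts.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE work_items (id INTEGER PRIMARY KEY, status TEXT, attempts INTEGER)")
conn.executemany("INSERT INTO work_items VALUES (?, 'pending', ?)", [(1, 0), (2, 5), (3, 4)])
pending = get_pending(conn)
print(pending)  # [1, 3]
```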

---

### Tier 2 — Normalize stages JSON → item_stages table (biggest structural win)

**All 4 models flag this as the #1 structural issue.** The stages JSON column is a concurrency trap, performance bottleneck, and maintenance burden.

**Problem**: `_update_stage` does read-modify-write on JSON blob. Two workers updating different stages of the same item race and clobber each other. Every query requires `json_extract` — no index use, full table scans.

**Solution**: New `item_stages` table.

```sql
CREATE TABLE item_stages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id TEXT NOT NULL,
    item_key TEXT NOT NULL,
    stage TEXT NOT NULL,
    status TEXT DEFAULT 'pending',
    started_at TEXT,
    ended_at TEXT,
    elapsed_s REAL,
    error TEXT,
    attempts INTEGER DEFAULT 0,
    updated_at TEXT DEFAULT (datetime('now')),
    UNIQUE(job_id, item_key, stage)
);
CREATE INDEX idx_is_job_stage_status ON item_stages(job_id, stage, status);
```

**Functions that change**:

| Function | Before | After |
|----------|--------|-------|
| `_update_stage` | Read JSON → modify → write JSON | `UPDATE item_stages SET status=?, started_at=? WHERE ...` |
| `_get_stages` | Parse JSON, filter suffixes | `SELECT * FROM item_stages WHERE job_id=? AND item_key=?` |
| `get_items_needing_stage` | `json_extract(stages, ?) IN (...)` | `JOIN item_stages ON ... WHERE stage=? AND status IN (...)` |
| `_sync_item_status` | 20 lines of Python conditionals | SQL view or derived query (see below) |
| `stage_stats` | N_stages × N_status queries with json_extract | `SELECT stage, status, COUNT(*) GROUP BY stage, status` |
| `stage_timing_stats` | Parse JSON suffixes | `SELECT stage, AVG(elapsed_s), MAX(elapsed_s) FROM item_stages` |
| `retry_failed` | JSON manipulation | `UPDATE item_stages SET status='pending' WHERE status='failed'` |
| `reprocess_stale` | JSON manipulation | `UPDATE item_stages SET status='pending' WHERE stage=?` |
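
To make two rows of the table concrete, here is a sketch of `_update_stage` and `stage_stats` rewritten against `item_stages`. The function signatures and the reduced column set are assumptions; only the table name, the unique key, and the index come from the schema above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE item_stages (
    job_id TEXT NOT NULL, item_key TEXT NOT NULL, stage TEXT NOT NULL,
    status TEXT DEFAULT 'pending',
    UNIQUE(job_id, item_key, stage)
);
CREATE INDEX idx_is_job_stage_status ON item_stages(job_id, stage, status);
""")

def update_stage(conn, job_id, item_key, stage, status):
    """One row, one statement: no read-modify-write on a shared JSON blob."""
    conn.execute(
        "UPDATE item_stages SET status = ? WHERE job_id = ? AND item_key = ? AND stage = ?",
        (status, job_id, item_key, stage),
    )

def stage_stats(conn, job_id):
    """Counts per (stage, status) from a single grouped query."""
    rows = conn.execute(
        "SELECT stage, status, COUNT(*) FROM item_stages "
        "WHERE job_id = ? GROUP BY stage, status",
        (job_id,),
    )
    return {(stage, status): n for stage, status, n in rows}

conn.executemany(
    "INSERT INTO item_stages (job_id, item_key, stage) VALUES ('j', ?, ?)",
    [("a", "extract"), ("a", "analyze"), ("b", "extract")],
)
update_stage(conn, "j", "a", "extract", "done")
stats = stage_stats(conn, "j")
```

Because each stage is its own row, workers updating different stages of the same item touch different rows and cannot overwrite each other's changes.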

**Derive item status from stages** (replaces `_sync_item_status`):

```sql
-- Option A: View (read-time derivation, always consistent)
CREATE VIEW item_status_v AS
SELECT w.id, w.job_id, w.item_key,
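  CASE
    -- Items with no stage rows yet stay 'pending'; without this guard the NOT EXISTS below reports them as 'done'.
    WHEN NOT EXISTS (SELECT 1 FROM item_stages s WHERE s.job_id=w.job_id AND s.item_key=w.item_key)
      THEN 'pending'
    WHEN NOT EXISTS (SELECT 1 FROM item_stages s WHERE s.job_id=w.job_id AND s.item_key=w.item_key AND s.status NOT IN ('done','skipped'))
      THEN 'done'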
    WHEN EXISTS (SELECT 1 FROM item_stages s WHERE s.job_id=w.job_id AND s.item_key=w.item_key AND s.status = 'in_progress')
      THEN 'in_progress'
    WHEN EXISTS (SELECT 1 FROM item_stages s WHERE s.job_id=w.job_id AND s.item_key=w.item_key AND s.status = 'failed')
      AND NOT EXISTS (SELECT 1 FROM item_stages s WHERE s.job_id=w.job_id AND s.item_key=w.item_key AND s.status IN ('pending','in_progress','retry_later'))
      THEN 'failed'
    ELSE 'pending'
  END as derived_status
FROM work_items w;

-- Option B: Trigger (write-time, faster reads but more moving parts)
-- Grok and Gemini prefer triggers; Opus prefers view. Start with view, optimize if needed.
```
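
For comparison, Option B could look roughly like the sketch below. The trigger name and the trimmed schemas are assumptions, and the trigger only fires on status updates; inserts and deletes of stage rows would need their own triggers, which is part of the "more moving parts" cost.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE work_items (job_id TEXT, item_key TEXT, status TEXT DEFAULT 'pending');
CREATE TABLE item_stages (job_id TEXT, item_key TEXT, stage TEXT, status TEXT DEFAULT 'pending');

-- Hypothetical trigger: recompute the parent item status after each stage status change.
CREATE TRIGGER item_stages_sync_status AFTER UPDATE OF status ON item_stages
BEGIN
  UPDATE work_items SET status = (
    SELECT CASE
      WHEN SUM(s.status NOT IN ('done', 'skipped')) = 0 THEN 'done'
      WHEN SUM(s.status = 'in_progress') > 0 THEN 'in_progress'
      WHEN SUM(s.status = 'failed') > 0
           AND SUM(s.status IN ('pending', 'retry_later')) = 0 THEN 'failed'
      ELSE 'pending'
    END
    FROM item_stages s
    WHERE s.job_id = NEW.job_id AND s.item_key = NEW.item_key
  )
  WHERE job_id = NEW.job_id AND item_key = NEW.item_key;
END;

INSERT INTO work_items (job_id, item_key) VALUES ('j', 'a');
INSERT INTO item_stages (job_id, item_key, stage)
VALUES ('j', 'a', 'extract'), ('j', 'a', 'analyze');
""")

def item_status(conn):
    return conn.execute("SELECT status FROM work_items").fetchone()[0]

conn.execute("UPDATE item_stages SET status = 'in_progress' WHERE stage = 'extract'")
s1 = item_status(conn)  # one stage running
conn.execute("UPDATE item_stages SET status = 'done' WHERE stage = 'extract'")
s2 = item_status(conn)  # analyze still pending
conn.execute("UPDATE item_stages SET status = 'done' WHERE stage = 'analyze'")
s3 = item_status(conn)  # every stage done
print(s1, s2, s3)  # in_progress pending done
```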

**Migration**: Unpack existing JSON into rows. One-time script:

```python
import json

# Assumes `conn` is an open sqlite3 connection with row_factory = sqlite3.Row.
META_SUFFIXES = ("_started", "_ended", "_elapsed_s", "_error")

# fetchall() first, so no SELECT cursor is still open while we insert.
rows = conn.execute(
    "SELECT job_id, item_key, stages FROM work_items WHERE stages IS NOT NULL"
).fetchall()
for row in rows:
    stages = json.loads(row["stages"])
    for stage, status in stages.items():
        if stage.endswith(META_SUFFIXES):
            continue  # timing/error metadata, attached to its stage row below
        conn.execute(
            """INSERT OR IGNORE INTO item_stages
               (job_id, item_key, stage, status, started_at, ended_at, elapsed_s, error)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
            (row["job_id"], row["item_key"], stage, status,
             stages.get(f"{stage}_started"), stages.get(f"{stage}_ended"),
             stages.get(f"{stage}_elapsed_s"), stages.get(f"{stage}_error")),
        )
conn.commit()  # single commit, so the migration runs as one transaction
```

| Models agreeing | Effort | Impact |
|-----------------|--------|--------|
| 4/4 | 1-2 days | Fixes concurrency bug, eliminates ~200 LOC of suffix parsing, enables indexed queries, reduces `stage_stats` from N_stages × N_status `json_extract` queries to one grouped query |

---

### Tier 3 — Drop dead tables, unify events

**4. Drop job_runs** — write-only table nobody reads

`start_run` and `end_run` exist, but no dashboard, alert, or query reads from this table. Replace with events: `log_event(conn, job_id, "run_start")` / `log_event(conn, job_id, "run_end", data={"counts": ...})`.

| Models agreeing | Effort |
|-----------------|--------|
| 4/4 | 30 min (stop writing, don't bother dropping the table) |

**5. Absorb job_cost_log into events**

```python
# Before
log_cost(conn, job_id, stage, cost_usd, item_key)

# After
log_event(conn, job_id, "cost", stage=stage, item_key=item_key,
          data={"cost_usd": cost_usd})
```

`get_daily_cost` becomes: `SELECT SUM(json_extract(data,'$.cost_usd')) FROM events WHERE event='cost' AND ...`

| Models agreeing | Effort |
|-----------------|--------|
| 4/4 | 1 hr |
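
A sketch of both sides of this change, assuming a minimal `events` schema (the column set and the `idx_events_event_ts` index name are hypothetical) and a SQLite build with the JSON functions available.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical minimal events schema; the real table may carry more columns.
conn.execute("""CREATE TABLE events (
    job_id TEXT, event TEXT, stage TEXT, item_key TEXT, data TEXT,
    ts TEXT DEFAULT (datetime('now')))""")
conn.execute("CREATE INDEX idx_events_event_ts ON events(event, ts)")

def log_event(conn, job_id, event, stage=None, item_key=None, data=None):
    """Append one event row; the payload is stored as a JSON string."""
    conn.execute(
        "INSERT INTO events (job_id, event, stage, item_key, data) VALUES (?, ?, ?, ?, ?)",
        (job_id, event, stage, item_key, json.dumps(data or {})),
    )

def get_daily_cost(conn, job_id):
    """Sum today's (UTC) cost events for one job; 0.0 when there are none."""
    row = conn.execute(
        "SELECT COALESCE(SUM(json_extract(data, '$.cost_usd')), 0.0) FROM events "
        "WHERE event = 'cost' AND ts >= date('now') AND job_id = ?",
        (job_id,),
    ).fetchone()
    return row[0]

log_event(conn, "j", "cost", stage="extract", item_key="a", data={"cost_usd": 0.25})
log_event(conn, "j", "cost", stage="analyze", item_key="a", data={"cost_usd": 0.5})
log_event(conn, "j", "run_start")
total = get_daily_cost(conn, "j")
print(total)  # 0.75
```

Filtering on `event` and a `ts` range lets the query use an `(event, ts)` index before `json_extract` runs on the remaining rows.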

**6. Simplify job_state** (from 8 cols to 5)

Drop: `auto_unpause_count`, `last_auto_unpause_at`, `last_error`. Auto-unpause counter → in-memory in runner. Errors → events table.

Keep: `job_id`, `paused`, `pause_reason`, `retry_after`, `updated_at`.

| Models agreeing | Effort |
|-----------------|--------|
| 3/4 | 30 min (stop using the columns; `ALTER TABLE ... DROP COLUMN` requires SQLite 3.35+) |

**7. Rename job_events → events**

Unified event log absorbs cost_log + run tracking. Add index: `(event, ts)`.

| Models agreeing | Effort |
|-----------------|--------|
| 4/4 | 15 min (ALTER TABLE RENAME) |

---

### Tier 4 — Code quality (extract, simplify, fix)

**8. Extract upsert merge logic from tracker**

`_merge_item_data` (100+ LOC) hardcodes knowledge of `topics`/`queries`/`discovery_sources`. DB layer shouldn't know domain semantics.

Fix: `upsert_items` accepts a `merge_fn: Callable[[dict, dict], dict]` parameter. Default = simple overwrite. Rivus discovery passes the provenance-aware merger. Extract `_merge_item_data` to standalone tested function.

Also: switch from `INSERT → IntegrityError → SELECT → UPDATE` to `INSERT OR IGNORE` + batch update for better bulk performance.

| Models agreeing | Effort |
|-----------------|--------|
| 4/4 | 2-3 hrs |
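
A sketch of the `merge_fn` seam. The `overwrite` and `union_lists` mergers, the `data` JSON column, and the per-row update are assumptions made to keep the example short; a real version would batch the updates as described above.

```python
import json
import sqlite3
from typing import Callable, Iterable, Tuple

MergeFn = Callable[[dict, dict], dict]

def overwrite(old: dict, new: dict) -> dict:
    """Default merge: incoming data replaces stored data."""
    return new

def union_lists(old: dict, new: dict) -> dict:
    """Hypothetical domain merger: shared list fields are unioned, other fields overwritten."""
    merged = {**old, **new}
    for key in old.keys() & new.keys():
        if isinstance(old[key], list) and isinstance(new[key], list):
            merged[key] = sorted(set(old[key]) | set(new[key]))
    return merged

def upsert_items(conn, job_id: str, items: Iterable[Tuple[str, dict]],
                 merge_fn: MergeFn = overwrite) -> None:
    for item_key, data in items:
        cur = conn.execute(
            "INSERT OR IGNORE INTO work_items (job_id, item_key, data) VALUES (?, ?, ?)",
            (job_id, item_key, json.dumps(data)),
        )
        if cur.rowcount == 0:  # row already existed: merge and write back
            (stored,) = conn.execute(
                "SELECT data FROM work_items WHERE job_id = ? AND item_key = ?",
                (job_id, item_key),
            ).fetchone()
            conn.execute(
                "UPDATE work_items SET data = ? WHERE job_id = ? AND item_key = ?",
                (json.dumps(merge_fn(json.loads(stored), data)), job_id, item_key),
            )

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE work_items (job_id TEXT, item_key TEXT, data TEXT, UNIQUE(job_id, item_key))"
)
upsert_items(conn, "j", [("a", {"topics": ["x"], "title": "old"})])
upsert_items(conn, "j", [("a", {"topics": ["y"], "title": "new"})], merge_fn=union_lists)
merged = json.loads(conn.execute("SELECT data FROM work_items").fetchone()[0])
```

The tracker only knows that some function combines two dicts; which fields carry provenance stays in the caller.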

**9. Replace version hashing with explicit version strings**

Delete `stage_version_hash()`, `VERSION_DEPS`. Each handler declares `HANDLER_VERSION = {"extract": "2026-02-26.1", "analyze": "v3"}`. `reprocess_stale` compares stored `results.handler_version` against current handler version.

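Why: a hash of `inspect.getsource()` output changes on formatting-only edits and comment changes, is sensitive to decorators, and cannot be computed when source files are unavailable (for example, compiled-only deployments). `VERSION_DEPS` requires manual maintenance (always drifts). Explicit versioning makes reprocessing an intentional human decision.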

Opus alternative (worth considering): version *outputs* not code — hash result JSON, compare against stored. Catches model version changes, prompt changes, API behavior changes. Slower but more correct.

| Models agreeing | Effort |
|-----------------|--------|
| 4/4 | 2-3 hrs (update every handler module) |
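
The comparison itself can stay very small. In this sketch the `stale_stages` helper and the shape of the stored versions (a dict of stage name to version string) are assumptions.

```python
# Declared once per handler module; bumped by hand when output semantics change.
HANDLER_VERSION = {"extract": "2026-02-26.1", "analyze": "v3"}

def stale_stages(stored_versions: dict, current: dict = HANDLER_VERSION) -> list:
    """Stages whose stored handler version is missing or differs from the current one."""
    return sorted(
        stage for stage, version in current.items()
        if stored_versions.get(stage) != version
    )

stale = stale_stages({"extract": "2026-02-26.1", "analyze": "v2"})
print(stale)  # ['analyze']
```

Version strings are compared only for equality, so any format works as long as a bump produces a new string.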

**10. Upgrade pacer to token bucket**

Current: fixed-spacing (`interval = 3600 / max_per_hour`). No bursting. Per-instance, not per-resource.

Fix (Opus approach — no new dependency): add `tokens`/`max_tokens` fields, refill on `wait()`, share via `ResourceRegistry`. ~15 extra lines.

Alternative (Gemini/Grok): replace with `aiolimiter.AsyncLimiter` — 3 lines.

Either way: add `asyncio.Lock` for concurrency safety.

| Models agreeing | Effort |
|-----------------|--------|
| 4/4 (disagree on approach, agree on problem) | 1 hr |
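
A rough sketch of the in-place token bucket variant. The class name, the `max_tokens` default, and the omission of `ResourceRegistry` sharing are simplifications made for illustration.

```python
import asyncio
import time

class TokenBucketPacer:
    """Allows bursts of up to max_tokens, then paces at max_per_hour."""

    def __init__(self, max_per_hour: float, max_tokens: int = 5):
        self.rate = max_per_hour / 3600.0      # tokens refilled per second
        self.max_tokens = max_tokens
        self.tokens = float(max_tokens)        # start full so the first calls can burst
        self._last = time.monotonic()
        # Create the pacer inside the running event loop on older Python versions.
        self._lock = asyncio.Lock()

    def _refill(self) -> None:
        now = time.monotonic()
        self.tokens = min(self.max_tokens, self.tokens + (now - self._last) * self.rate)
        self._last = now

    async def wait(self) -> None:
        async with self._lock:                 # serialize callers sharing this pacer
            self._refill()
            if self.tokens < 1:
                await asyncio.sleep((1 - self.tokens) / self.rate)
                self._refill()
            self.tokens -= 1

async def burst_demo() -> float:
    pacer = TokenBucketPacer(max_per_hour=3600, max_tokens=3)
    for _ in range(3):                         # three calls fit in the initial burst
        await pacer.wait()
    return pacer.tokens                        # close to 0: a fourth call would wait

remaining = asyncio.run(burst_demo())
```

Holding the lock while sleeping keeps waiters in arrival order, at the cost of blocking every caller behind the one that is sleeping.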

**11. Remove hardcoded `jobs.` prefix in executor.py**

One line: `full_module = f"jobs.{module_path}"` → `full_module = module_path`. Let YAML specify full dotted path. Also add `@lru_cache` (Opus suggestion).

| Models agreeing | Effort |
|-----------------|--------|
| 3/4 | 15 min |
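
A sketch of the resulting loader. `load_handler_module` is a hypothetical name, and the standard-library `json` module stands in for a real handler path.

```python
import importlib
from functools import lru_cache
from types import ModuleType

@lru_cache(maxsize=None)
def load_handler_module(module_path: str) -> ModuleType:
    """Import a handler by its full dotted path, exactly as written in the YAML."""
    return importlib.import_module(module_path)

# Any importable dotted path works; a stdlib module is used here for the demo.
mod = load_handler_module("json")
same = load_handler_module("json") is mod
```

If handlers are ever hot-reloaded, the cached entry would need `load_handler_module.cache_clear()` before the next lookup.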

**12. Add next_attempt_at for retry scheduling**

`retry_later` items are currently re-selected immediately. Add a `next_attempt_at TEXT` column. `mark_stage_retry_later` sets `next_attempt_at = now + backoff`. `get_items_needing_stage` adds `AND (next_attempt_at IS NULL OR next_attempt_at <= datetime('now'))`; the parentheses matter because the clause is combined with the existing stage and status filters.

GPT-Pro's "one concrete simplification" — small change, big stability improvement.

| Models agreeing | Effort |
|-----------------|--------|
| 3/4 | 1 hr |
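
A sketch of the backoff calculation and the selection filter together. The exponential schedule, the `base_s`/`cap_s` defaults, and the trimmed `item_stages` columns are assumptions; the timestamp is written in the same text format `datetime('now')` produces so that the string comparison is valid.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

SQLITE_TS = "%Y-%m-%d %H:%M:%S"  # same text format datetime('now') produces (UTC)

def next_attempt_at(attempts: int, base_s: int = 60, cap_s: int = 3600, now=None) -> str:
    """Exponential backoff timestamp; base_s and cap_s are assumed defaults."""
    now = now or datetime.now(timezone.utc)
    delay_s = min(cap_s, base_s * 2 ** attempts)
    return (now + timedelta(seconds=delay_s)).strftime(SQLITE_TS)

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE item_stages (item_key TEXT, stage TEXT, status TEXT, next_attempt_at TEXT)"
)
conn.executemany(
    "INSERT INTO item_stages VALUES (?, 'extract', 'retry_later', ?)",
    [
        ("never_delayed", None),
        ("due", "2000-01-01 00:00:00"),
        ("backing_off", next_attempt_at(3)),
    ],
)
ready = [r[0] for r in conn.execute(
    "SELECT item_key FROM item_stages "
    "WHERE stage = 'extract' AND status IN ('pending', 'retry_later') "
    "AND (next_attempt_at IS NULL OR next_attempt_at <= datetime('now')) "
    "ORDER BY item_key"
)]
print(ready)  # ['due', 'never_delayed']
```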

---

### Tier 5 — Structural improvements (lower urgency)

**13. Split Job dataclass** (Opus)

`Job` is a 30-field god object. Split into `JobSpec` (immutable: identity + handler + stages) and `JobTuning` (hot-reloadable: pacing + guards + limits). Then `update_from` becomes `self.tuning = other.tuning`.

Also: delete dead `StageConfig.outputs` field, fix `PacingConfig.circuit_breaker` default inconsistency (10 in dataclass, 3 in YAML loader).

| Effort | Urgency |
|--------|---------|
| 2 hrs | Low — current structure works, just messy |
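
The split might look like the sketch below. All field names are illustrative placeholders rather than the real 30 fields; the point is only that the immutable part is frozen and hot reload replaces a single attribute.

```python
from dataclasses import dataclass, field

@dataclass(frozen=True)
class JobSpec:
    """Identity, handler, and stages: fixed for the life of the process."""
    job_id: str
    handler: str
    stages: tuple

@dataclass
class JobTuning:
    """Pacing, guards, and limits: safe to swap on config reload."""
    max_per_hour: int = 60
    max_attempts: int = 5
    circuit_breaker: int = 3

@dataclass
class Job:
    spec: JobSpec
    tuning: JobTuning = field(default_factory=JobTuning)

    def update_from(self, other: "Job") -> None:
        # Hot reload touches tuning only; the spec is never replaced.
        self.tuning = other.tuning

spec = JobSpec("videos", "handlers.videos", ("extract", "analyze"))
job = Job(spec)
job.update_from(Job(spec, JobTuning(max_per_hour=120)))
```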

**14. Discovery.py dedup** (deep review finding, not vario)

1750 lines, 17 strategies with copy-paste. Extract: yt-dlp wrapper, URL hash util (`md5[:10]` repeated 5x), async subprocess helper. Merge: `company_sources` + `company_watchlist` + `static_ein_list`.

| Effort | Urgency |
|--------|---------|
| 4-6 hrs | Low — works fine, just long |
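
The URL hash is the smallest of these extractions. A sketch, assuming the repeated snippet is an MD5 hex digest of the UTF-8 URL truncated to 10 characters (the `url_hash` name is hypothetical):

```python
import hashlib

def url_hash(url: str, length: int = 10) -> str:
    """Short, stable identifier for a URL; used for dedup keys, not for security."""
    return hashlib.md5(url.encode("utf-8")).hexdigest()[:length]

key = url_hash("https://example.com/watch?v=abc")
```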

**15. ctl.py concern separation** (deep review finding)

608 lines mixing CLI dispatch with business logic. Extract `add_urls`, `reprocess_fast` to lib modules. CLI becomes ~200 lines of pure dispatch.

| Effort | Urgency |
|--------|---------|
| 2-3 hrs | Low |

---

## Execution sequence

```
Week 1: Correctness (Tier 1 + Tier 2)
  1. Atomic claiming (30 min)
  2. Dead item recovery (1 hr)
  3. Max attempts enforcement (30 min)
  4. item_stages table + migration + rewrite stage functions (1-2 days)
  5. Derive item status from item_stages (1 hr)

Week 2: Schema cleanup (Tier 3 + top of Tier 4)
  6. Drop job_runs usage (30 min)
  7. Absorb cost_log into events (1 hr)
  8. Simplify job_state (30 min)
  9. Rename job_events → events (15 min)
  10. Add next_attempt_at (1 hr)
  11. Remove jobs. prefix in executor (15 min)

Week 3: Code quality (rest of Tier 4)
  12. Extract upsert merge logic (2-3 hrs)
  13. Replace version hashing (2-3 hrs)
  14. Upgrade pacer (1 hr)

Tier 5: As time permits / during other refactors
```

## Model disagreements (for reference)

| Topic | Divergence |
|-------|-----------|
| **Pacer fix** | Opus: fix in-place (token bucket, 15 lines). Grok/Gemini: replace with aiolimiter. GPT-Pro: keep, just add Lock + burst. |
| **executor.py** | Grok: inline it. Others: keep as plugin seam. |
| **job.py dataclasses** | Grok: merge to single class + dicts. Opus: split into JobSpec + JobTuning. Others: keep as-is with minor fixes. |
| **Version hashing replacement** | Gemini/GPT-Pro: explicit semantic version string. Opus: hash result outputs (version the data, not the code). Grok: git hash of handler files. |
| **Status derivation** | Opus: SQL view (read-time). Grok/Gemini: SQL trigger (write-time). GPT-Pro: either, but normalize first. |
| **Events as plugin** | Opus: events should be plugin (not every system needs them). Others: keep in core. |
