# Jobs System -- Architecture

## Runner Architecture

Each job spawns independent long-lived tasks:

```
per job:
  |- discovery_task    -- independent timer, checks max_pending guard
  |- stage_worker x N  -- one per stage, event-driven (wakes instantly on upstream completion, 30s safety-net poll)
  +- guard_checker     -- every 60s, checks cost limit + validate
```

Stage workers wait on `asyncio.Event` signals. When stage N completes an item, it fires stage N+1's event -- waking it in <1ms (versus up to 5s under the old polling loop). When idle, workers sleep on the event with a 30s safety-net timeout (zero DB queries, zero CPU). Discovery fires the first stage's event after finding new items. Workers have crash protection: unhandled exceptions are caught and the worker retries after 10s.

`--test` mode uses a watchdog that trips after 9s of no progress.
`--limit N` pauses the job after N items are processed.
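
A minimal sketch of this worker loop -- helper names like `process_batch` are illustrative, not the actual runner API:

```python
import asyncio

async def stage_worker(event, process_batch, next_event=None):
    """Event-driven stage worker sketch: sleep on the event, 30s safety net."""
    while True:
        try:
            try:
                # Zero DB queries, zero CPU while idle -- just wait for the signal.
                await asyncio.wait_for(event.wait(), timeout=30)
            except asyncio.TimeoutError:
                pass  # 30s safety-net wakeup: check for work anyway
            event.clear()  # clear before working so signals fired mid-batch aren't lost
            did_work = await process_batch()
            if did_work and next_event is not None:
                next_event.set()  # wake the downstream stage in <1ms
        except Exception:
            # Crash protection: swallow unhandled errors, retry after 10s.
            await asyncio.sleep(10)
```

The safety-net timeout makes the loop self-healing: even if a wake signal is somehow lost, the worker still checks for work every 30s.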

### Stage Concurrency

Each stage runs as an independent async worker (not sequential iteration). Items completing stage N immediately feed stage N+1's worker. Per-stage concurrency is controlled by `StageConfig.concurrency`.

```yaml
stages:
  - name: price
    concurrency: 5      # 5 items in parallel
  - name: ib
    concurrency: 1      # serial (IB pacing constraint)
    resource: ib         # future: shared cross-job semaphore
```

Backward compat: `stages: [a, b, c]` still works (concurrency=1 default).

### Parallelism & Bottleneck Analysis

For each stage, answer:
- **Are items independent?** (different symbols, different URLs, different companies -> yes)
- **What external resource does it hit?** (API, database, local CPU, network download)
- **What's the rate limit?** (requests/min, connections, bandwidth)
- **Can multiple items share the resource concurrently?** (REST APIs usually yes; single-connection protocols like IB usually no)

Document this as `concurrency` per stage and `resource` for shared external constraints:

```yaml
stages:
  - name: price
    concurrency: 5        # Finnhub: 60 req/min, 5 concurrent is safe
    resource: finnhub     # shared across jobs hitting Finnhub
  - name: ir
    concurrency: 3        # yt-dlp searches -- no hard limit, be polite
  - name: ib
    concurrency: 1        # single IB gateway, 60 req/10 min pacing
    resource: ib           # shared with any job using IB data
  - name: transcript
    concurrency: 3        # yt-dlp downloads -- bandwidth-bound
  - name: chart
    concurrency: 4        # local CPU only, no external resource
```

**`concurrency`**: max items processed in parallel for this stage (default: 1 = sequential). The runner uses `asyncio.Semaphore(concurrency)` per stage.

**`resource`**: names a shared external resource. Multiple jobs/stages with the same `resource` value will coordinate through a shared semaphore (not yet implemented). For now, just document the resource name so the dependency is explicit.

**Rule of thumb**: if a stage calls an external API, it needs both a `concurrency` limit and a `resource` name. If it's pure local compute, just `concurrency`.
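
The per-stage semaphore pattern can be sketched as follows (function and parameter names are illustrative, not the runner's actual API):

```python
import asyncio

async def run_stage(items, handler, concurrency=1):
    """Process one stage's items with bounded parallelism (sketch)."""
    sem = asyncio.Semaphore(concurrency)

    async def one(item):
        async with sem:           # at most `concurrency` handlers in flight
            return await handler(item)

    return await asyncio.gather(*(one(i) for i in items))
```

With `concurrency=1` this degrades gracefully to sequential processing, which is why it is a safe default.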

---

## Guards

Guards are optional safety limits configured per-job in `jobs.yaml`:

```yaml
guards:
  max_pending: 200        # skip discovery when queue is full (not a pause)
  daily_cost_limit: 5.00  # auto-pause when daily cost exceeds limit (USD)
  validate: true          # call handler's validate() after batches
```

- **max_pending**: Checked in `discovery_task` before discovering. Processing continues normally.
- **daily_cost_limit**: Handlers return `_cost` in the result dict. The runner logs it to the `job_cost_log` table. The guard checker (every 60s) sums today's cost and pauses the job with a reason if the limit is exceeded.
- **validate**: Handler module defines optional `validate(conn, job_id) -> str | None`. Returns None (ok) or reason string (pause).

Pause reason is stored in `job_state.pause_reason` and displayed in the dashboard.
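
A hedged sketch of the guard check described above -- table and column names (`job_cost_log`, `cost_usd`, `ts`) are illustrative:

```python
import datetime

def check_guards(conn, job_id, daily_cost_limit=None, validate=None):
    """Return a pause reason string, or None if all guards pass (sketch)."""
    if daily_cost_limit is not None:
        today = datetime.date.today().isoformat()
        (spent,) = conn.execute(
            "SELECT COALESCE(SUM(cost_usd), 0) FROM job_cost_log "
            "WHERE job_id = ? AND date(ts) = ?", (job_id, today)).fetchone()
        if spent > daily_cost_limit:
            return f"daily cost ${spent:.2f} over ${daily_cost_limit:.2f} limit"
    if validate is not None:
        return validate(conn, job_id)  # None = ok, string = pause reason
    return None
```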

---

## Output Versioning

Each stage result is stored with a `handler_version`. When handler code changes, items processed by the old version show as "stale" in the dashboard with a "Reprocess Stale" button.

**Convention** (priority order):
1. **Source hash is primary** -- `stage_version_hash()` auto-discovers `_stage_{name}` or `stage_{name}` functions and hashes their source. Prompt changes are auto-detected.
2. **`HANDLER_VERSION` is the override** -- an explicit version string bypasses source hashing. Use it to force invalidation (external dep changed, model update) or to suppress false positives (cosmetic change).
3. **LLM triage** -- when a source hash changes, `version_triage.py` uses an LLM to classify the change as cosmetic vs semantic. Cosmetic changes are auto-blessed.
4. **`jobctl bless`** -- manual override for false positives the LLM triage misses.
5. **`VERSION_DEPS` (legacy)** -- still supported for backward compat but no longer needed. Auto-discovery + LLM triage covers cross-module changes.

**Stage function naming**: `_stage_profile`, `_stage_extract`, or `stage_price`, `stage_ir`. Both prefixes auto-discovered.

```python
# Source hash covers inline logic -- no VERSION_DEPS needed
async def _stage_profile(company_name, data, job):
    prompt = f"Research {company_name}..."  # change = auto-detected
    return await _llm_json(prompt)

# HANDLER_VERSION for external dep changes (API, model updates)
HANDLER_VERSION = {
    "extract": "2026-03-09.1",  # upstream API changed response format
}
```

**How it works**:
1. `stage_version_hash(handler, stage)` checks `HANDLER_VERSION[stage]` first (override)
2. If not set, finds `_stage_{stage}` or `stage_{stage}` in handler module
3. Hashes source of stage function (+ any legacy `VERSION_DEPS[stage]` entries)
4. Hash stored in `results.handler_version` alongside each result
5. Dashboard compares current hash vs stored -> stale count + "Reprocess Stale" button
6. LLM triage auto-blesses cosmetic changes; `jobctl bless` handles the rest
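
The lookup order in steps 1-3 can be sketched as follows (the real implementation lives in `jobs/lib/db/results.py`; this is an illustrative approximation):

```python
import hashlib
import inspect

def stage_version_hash_sketch(handler_module, stage, handler_version=None):
    """Explicit HANDLER_VERSION wins; else hash the stage function's source."""
    if handler_version and stage in handler_version:
        return handler_version[stage]          # explicit override
    fn = (getattr(handler_module, f"_stage_{stage}", None)
          or getattr(handler_module, f"stage_{stage}", None))
    if fn is None:
        return None                            # no discoverable stage function
    src = inspect.getsource(fn)
    return hashlib.sha256(src.encode()).hexdigest()[:12]
```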

**When to use what**:

| Changed                              | Detected | Action                        |
|--------------------------------------|----------|-------------------------------|
| Stage function body (prompt, logic)  | Auto     | None                          |
| Imported function                    | Auto     | LLM triage classifies it      |
| Module-level constant                | Auto     | LLM triage classifies it      |
| External data format / model update  | Not auto | Bump `HANDLER_VERSION`        |
| Log/comment only                     | Auto (false positive) | LLM triage auto-blesses, or `jobctl bless JOB STAGE` |

---

## SQLite Tracker

- DB: `jobs/data/jobs.db`
- Items: `pending` -> `in_progress` -> `done`/`failed`
- Staged jobs track per-stage status + timing in `stages` JSON column
- Resumable: done items are skipped; pending items are picked up on restart
- Stuck `in_progress` items need manual reset to `pending`

### Connection Discipline

**SQLite has a single write lock for the entire database** -- no table or row-level locking. In WAL mode, readers never block writers, but only one writer can hold the lock at a time.

**Rules:**
- **Short-lived connections**: `with closing(open_raw_conn()) as conn:` -- open, work, commit, close
- **Never hold singletons**: a long-lived connection in a batch script once blocked the runner for 20 minutes
- **Commit per item**: Never batch hundreds of writes in one transaction
- **`timeout=30`**: So writes wait for the lock instead of failing immediately

**Pattern for handlers:**
```python
from contextlib import closing
from projects.vic.db import open_raw_conn

# Read
with closing(open_raw_conn()) as conn:
    row = conn.execute("SELECT ...").fetchone()

# Write (separate conn -- release lock between operations)
with closing(open_raw_conn()) as conn:
    conn.execute("UPDATE ...")
    conn.commit()
```

The tracker DB (`jobs/lib/tracker.py`) already follows this pattern with `open_db()`. Handler-specific DBs must follow it too.

### Schema

```sql
-- Main work items table
work_items (id, job_id, item_key, status, attempts, discovered_at,
            last_attempt_at, error, data, result_path, stages, priority)

-- Normalized per-stage tracking (replaces stages JSON)
item_stages (id, item_id, job_id, item_key, stage, status, started_at,
             ended_at, elapsed_s, error, attempts, updated_at)

-- Stage results (per item per stage)
results (id, job_id, item_key, stage, result, created_at, handler_version)

-- Job-level state (pause/resume)
job_state (job_id, paused, updated_at, pause_reason, retry_after,
           auto_unpause_count, last_auto_unpause_at)

-- Unified event log (runs, costs, errors, lifecycle)
events (id, job_id, event, stage, item_key, detail, data, ts)
-- Cost: event='cost', data.cost_usd; Errors: event='doctor_pause'
```

**Note**: The table is `work_items`, not `items`.
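
As one example of working against this schema, the stale count from Output Versioning reduces to a single query over `results` (sketch; columns per the schema above):

```python
def stale_count(conn, job_id, stage, current_hash):
    """Count results stored under an outdated handler_version (sketch)."""
    (n,) = conn.execute(
        "SELECT COUNT(*) FROM results "
        "WHERE job_id = ? AND stage = ? AND handler_version != ?",
        (job_id, stage, current_hash)).fetchone()
    return n
```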

---

## Stage Timing

The tracker automatically records **per-stage elapsed time** for every item:

- `mark_stage_in_progress` records `{stage}_started` timestamp
- `mark_stage_done/failed` records `{stage}_ended` and computes `{stage}_elapsed_s`
- All stored in the `stages` JSON column alongside status

**Dashboard shows timing at two levels**:
- **Job summary**: p50 time per stage (e.g., `1. fetch: 340/675 done ~12s ea`)
- **Item detail**: per-stage elapsed (e.g., `fetch: done (11.3s)`)
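
The bookkeeping above can be sketched as follows (function name and the `now` parameter are illustrative; the JSON keys follow the convention described):

```python
import json

def mark_stage_done_sketch(stages_json, stage, now):
    """Record {stage}_ended and compute {stage}_elapsed_s in the stages JSON."""
    stages = json.loads(stages_json or "{}")
    stages[f"{stage}_ended"] = now
    started = stages.get(f"{stage}_started")
    if started is not None:
        stages[f"{stage}_elapsed_s"] = round(now - started, 1)
    stages[stage] = "done"
    return json.dumps(stages)
```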

---

## Pipelines

Jobs can be part of a **pipeline** -- a multi-job workflow where one job's output feeds the next.

```yaml
pltr_discovery:
  pipeline: pltr_deep_dive
  pipeline_step: 1    # discovery finds content, scores it
  discovery:
    strategy: multi_source
    ...

pltr_content_processing:
  pipeline: pltr_deep_dive
  pipeline_step: 2    # reads discovery's done items via tracker_query
  discovery:
    strategy: tracker_query
    params:
      source_job_id: pltr_discovery
      min_score: 10
```

The dashboard shows pipeline context: `pltr_discovery (0/675) -> pltr_content_processing (0/0)` with the current job highlighted.
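
A sketch of what the `tracker_query` discovery strategy might look like -- the `score` field inside `data` is an assumption based on the config above; `work_items` columns follow the tracker schema:

```python
import json

def discover_tracker_query(conn, source_job_id, min_score=0):
    """Step-2 discovery: read step 1's done items, filter by score (sketch)."""
    rows = conn.execute(
        "SELECT item_key, data FROM work_items "
        "WHERE job_id = ? AND status = 'done'", (source_job_id,)).fetchall()
    items = []
    for key, raw in rows:
        data = json.loads(raw or "{}")
        if data.get("score", 0) >= min_score:
            items.append((key, data))
    return items
```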

---

## Wake Mechanism

- **Config hot-reload**: `jobs.yaml` mtime watched + SIGHUP handler -- no restart needed for config changes
- **Wake files**: Touch `jobs/data/.wake/JOB_ID` to interrupt sleep and trigger immediate processing (checked every 2s)
- **CLI integration**: `jobctl run` clears errors, unpauses, and creates wake file
- **Stage transitions are event-driven**: completing stage N fires stage N+1's event instantly (<1ms)
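
The wake-file check is simple enough to sketch (the path comes from the doc; the function name is illustrative):

```python
import pathlib

def check_wake(job_id, wake_dir="jobs/data/.wake"):
    """Consume the wake file if present -- the 2s poll calls this (sketch)."""
    p = pathlib.Path(wake_dir) / job_id
    if p.exists():
        p.unlink(missing_ok=True)  # consume, so one touch = one wake
        return True
    return False
```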

---

## Doctor Error Intelligence (`jobs/lib/doctor.py`)

Every error gets LLM-classified (haiku, ~$0.001/call) the moment it happens.

**How it works**: Runner calls `classify_and_act(conn, job_id, stage, item_key, error)` on every exception. Doctor:
1. Calls haiku to classify the error
2. Returns a `Verdict` with `action` (retry_later / fail_item / pause_job) and `risk_tier` (low / medium / high)
3. Logs to `doctor_actions` table in `jobs.db`
4. Sends Pushover notification for medium/high risk actions

**Classifications and actions**:

| Classification  | Examples                          | Action                             | Risk   |
|-----------------|-----------------------------------|------------------------------------|--------|
| `transient`     | timeout, 429, SSL, connection reset | retry_later                      | low    |
| `item_specific` | bad HTML for one URL, missing field | fail_item (skip, keep going)     | low    |
| `temporal`      | market closed, outside hours      | pause_job (with retry_after)       | low    |
| `systemic`      | cookie expired, DNS fail, service down | pause_job (on FIRST occurrence) | medium |
| `code_bug`      | KeyError, AttributeError          | pause_job + high-pri Pushover      | high   |

**Key design**:
- Systemic errors pause on **first** occurrence (not after 3 consecutive)
- Item-specific errors never trigger a pause
- Transient errors: 10 in a row escalate to a systemic pause
- Every error is logged with classification for post-mortem analysis
- **Fallback**: If LLM is unreachable, regex-based classification. See `_fallback_classify()`.
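
The regex fallback might look like this -- the patterns are illustrative, not the actual rules in `_fallback_classify()`:

```python
import re

# Illustrative patterns only; real rules live in jobs/lib/doctor.py.
_FALLBACK_PATTERNS = [
    ("transient", re.compile(r"timeout|429|rate limit|connection reset|ssl", re.I)),
    ("code_bug", re.compile(r"KeyError|AttributeError|TypeError")),
    ("systemic", re.compile(r"cookie|dns|service unavailable", re.I)),
]

def fallback_classify(error: str) -> str:
    for cls, pattern in _FALLBACK_PATTERNS:
        if pattern.search(error):
            return cls
    return "item_specific"  # safest default: skip the item, never pause the job
```

Defaulting to `item_specific` mirrors the design above: an unknown error skips one item rather than pausing the whole job.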

---

## Self-Healing Pipeline Pattern

Pipelines should detect their own failures, diagnose the root cause, and guide repair.

### Validator Stage

A **validator** is any stage whose role is to check upstream output. It can also enrich.

```
fetch -> extract (code) -> check_enrich (LLM validator + enricher)
              |                    |
              |   discrepancy log  |   <- "parser missed symbol"
              |   _invalidate      |   <- "wrong content, re-fetch"
              +--------------------+
```

**Diagnosis table**:

| Signal                                    | Meaning             | Action                       |
|-------------------------------------------|---------------------|------------------------------|
| `content_ok=false`, extract also empty    | Bad source data     | Nothing to fix               |
| `content_ok=true`, extract has data       | Everything works    | No action needed             |
| `content_ok=true`, extract missing fields | **Parser failure**  | Fix parser, rerun extract    |
| `symbol_matches=false`                    | Wrong content served | `_invalidate_stage: "fetch"` |

### Discrepancy Contract

Validators return:
```python
{
    "content_ok": True,
    "thesis_type": "value",
    "_discrepancies": [
        {"field": "symbol", "extract": "", "llm": "AAPL", "severity": "high"},
    ],
    "_invalidate_stage": "fetch",   # optional: signal upstream re-run
}
```

### Validation Circuit Breaker (TODO)

Counts *semantic failures* -- items where the validator says something is wrong. Trips when content_ok rate drops below threshold or discrepancy rate exceeds threshold.
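
One possible shape for this breaker -- a sketch of the idea, not the planned implementation:

```python
from collections import deque

class ValidationBreaker:
    """Trip when the content_ok rate over a sliding window drops (sketch)."""

    def __init__(self, window=50, min_ok_rate=0.8):
        self.results = deque(maxlen=window)
        self.min_ok_rate = min_ok_rate

    def record(self, content_ok):
        """Record one validator verdict; returns True when the job should pause."""
        self.results.append(bool(content_ok))
        if len(self.results) < self.results.maxlen:
            return False  # not enough samples to judge yet
        ok_rate = sum(self.results) / len(self.results)
        return ok_rate < self.min_ok_rate
```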

### Implementation Status

| Component                                | Status   | Location                              |
|------------------------------------------|----------|---------------------------------------|
| Source hash + auto-discovery             | Done     | `jobs/lib/db/results.py`              |
| LLM version triage                       | Done     | `jobs/lib/version_triage.py`          |
| Stale detection + reprocessing           | Done     | `get_stale_count`, `reprocess_stale`  |
| Doctor error intelligence                | Done     | `jobs/lib/doctor.py`                  |
| `_discrepancies` in result dict          | TODO     | Convention defined                    |
| `_invalidate_stage` runner handling      | TODO     | Convention defined                    |
| Validation circuit breaker               | TODO     | Stage role + rate tracking            |
| Cascade reprocessing                     | TODO     | Reprocess stage N -> also reset N+1   |
| Dashboard discrepancy health view        | TODO     | Per-field discrepancy rates           |

---

## Resource Contention (Not Yet Implemented)

Jobs run concurrently, but some share external resources with their own rate limits (e.g. IB has a 60-request/10-min pacing rule). Currently each job has its own `Pacer` but there's no cross-job resource coordination.

**Future design**: Jobs that share a constrained resource could acquire a shared semaphore keyed by resource name. The `pacing` config could gain a `resource: ib` field, and the runner would create one pacer per resource instead of per job.
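
The shared-semaphore registry could be as small as this (a sketch of the future design, not existing code):

```python
import asyncio

# One semaphore per resource name, shared by every job/stage that declares it.
_RESOURCE_SEMAPHORES = {}

def resource_semaphore(name, limit=1):
    """Return the process-wide semaphore for a named resource (sketch)."""
    if name not in _RESOURCE_SEMAPHORES:
        _RESOURCE_SEMAPHORES[name] = asyncio.Semaphore(limit)
    return _RESOURCE_SEMAPHORES[name]
```

A stage declaring `resource: ib` would then do `async with resource_semaphore("ib"):` around its external call, serializing access across jobs.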

---

## External Services

| Service            | Required by                              | Start command       | Location                                           |
|--------------------|------------------------------------------|---------------------|-----------------------------------------------------|
| `stockloader serve`| `earnings_backfill_largecap` (ib stage)  | `stockloader serve` | `~/all-code/finance/mktmake/replay/stockloader.py` |
| IB TWS/Gateway     | stockloader                              | TWS app or IB Gateway| Desktop app                                        |

---

## LLM-in-Code Pattern

Rivus uses LLMs as inline decision-makers in infrastructure code — not just for user-facing features.
The pattern: replace a hardcoded heuristic or config-driven rule with a fast, cheap LLM call that
brings commonsense judgment to decisions that are hard to enumerate upfront.

### Examples

**1. Error classification (jobs/lib/doctor.py)**

Every stage error is classified by haiku (~$0.001/call) into 5 error classes (transient, systemic, code_bug,
item_specific, temporal). The classification drives the circuit breaker — retry vs fail-item vs pause-job.
This replaced a regex-based classifier that couldn't handle novel error messages.

**2. Notification gate (lib/notify/pushover.py)**

When 3+ notifications have been sent in a 5-minute window, a gemini-flash call reviews recent push history
and decides whether the new notification is worth buzzing the user's phone. Fails open (sends on LLM error).
This replaces what would otherwise be an ever-growing list of suppression rules.

### Design Principles for LLM-in-Code

| Principle | Why |
|-----------|-----|
| **Fail open** | LLM failure → default to the non-LLM behavior (send, retry, etc.) |
| **Use the cheapest model that works** | haiku/gemini-flash, not opus. ~$0.001/call max |
| **Keep prompts under 500 tokens** | Fast, cheap, cacheable |
| **Log decisions** | Always log what the LLM decided and why (for tuning) |
| **Never block critical paths** | LLM gate on notifications is fine; LLM gate on error handling is not |
| **Structured output** | "SEND/SUPPRESS + reason" not free-form prose |
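
The fail-open principle in code -- a generic sketch, not the actual `pushover.py` gate:

```python
def gated_send(message, send, llm_decide):
    """Fail-open LLM gate: any LLM error defaults to sending (sketch)."""
    try:
        decision = llm_decide(message)  # structured: "SEND ..." or "SUPPRESS ..."
    except Exception:
        decision = "SEND (llm error -- fail open)"
    if decision.startswith("SEND"):
        send(message)
        return True
    return False
```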

## Vario Integration — The Execution Boundary

### The Relationship

**Vario is the Domain AND the Engine.** It defines what operations exist (produce, score, revise, reduce, run_op) and how they execute (queued pipeline, per-stage concurrency, adaptive routing, anytime access).

**Jobs is the Operational Shell.** It answers: when to run, where items come from, what to do when things break, and where to look.

```
┌──────────────────────────────────────────────┐
│  jobs.yaml  (declarative job definitions)    │
└──────────────┬───────────────────────────────┘
               │
┌──────────────▼───────────────────────────────┐
│  Runner (operational wrapper)                │
│                                              │
│  Discovery ──► items as Source               │
│  Scheduler ──► pacing, idle gates, guards    │
│  Resource  ──► cross-job semaphores          │
│       │                                      │
│       ▼                                      │
│  ┌───────────────────────────────────────┐   │
│  │ vario.run(recipe, source)             │   │ ← vario owns execution
│  │   on_op_done → DB persistence         │   │
│  │   on_item    → heap / dashboard       │   │
│  └───────────────────────────────────────┘   │
│       │                                      │
│       ▼                                      │
│  Doctor ──► error triage (retry/fail/pause)  │
│  State DB ──► items, stages, events, results │
└──────────────────────────────────────────────┘
```

### What Vario Provides (don't reimplement in jobs)

| Capability | Vario mechanism | Jobs equivalent (legacy) |
|---|---|---|
| Stage sequencing | `run()` inter-stage queues | Runner stage worker loop |
| Per-stage concurrency | `Stage.concurrency` + semaphore | `StageConfig.concurrency` + semaphore |
| Adaptive routing | `_next` prop (`"_done"`, `"stage_name"`) | Not possible |
| Item fan-out | Ops yielding multiple items | `_spawn_items` in handler result |
| Rate limiting | `limits` dict on Context | `Pacer` (token bucket) |
| Progress tracking | `Trace` + `on_op_done` callback | DB `item_stages` writes |
| Live results | `Heap` + `on_item` callback | Dashboard polling DB |

### The Bridge: `run_op()`

`vario/ops/run_op.py` wraps any jobs handler into a vario `ConfiguredOp`:

```python
from vario.ops.run_op import run_op

op = run_op(handler=process_stage, stage="fetch", job=job_obj, on_result=store_to_db)
```

This means handlers work in both worlds — the traditional jobs runner OR a vario pipeline — without modification. Handler signature is unchanged: `process_stage(*, item_key, data, job, stage)`.

### What Jobs Must Still Provide (vario cannot do these)

1. **Discovery** — finding work items (Finnhub API, yt-dlp scrapes, tracker queries, manual CLI)
2. **State persistence** — surviving restarts, tracking history across days/weeks in SQLite
3. **Error triage** — Doctor LLM classification (transient/item_specific/temporal/systemic/code_bug) driving retry/fail/pause decisions
4. **Scheduling & guards** — pacing, eligibility gates, cost limits, cross-job resource semaphores
5. **Dashboard** — operational visibility and controls (pause/resume/run N/reprocess stale)

### Migration Path

The runner currently manages its own stage workers, queues, and concurrency (~500 lines). With `run()`, this collapses to: build a `Recipe` from `jobs.yaml` stages, wrap each handler via `run_op()`, call `run(recipe, source)`, persist results via callbacks.

See `docs/superpowers/specs/2026-03-19-thin-jobs-layer.md` for the full design spec and phased migration plan.

---

## Backfill vs Monitor Jobs (Not Yet Implemented)

Backfill and monitoring are **separate jobs**, not modes of the same job. A source might have `dumb_money_backfill` (historical, finite) and `dumb_money_monitor` (ongoing). They share a handler but have different discovery strategies, pacing, and dashboard needs.

**Dashboard implications**: Backfill jobs show progress (pending/done/%, ETA). Monitor jobs show freshness (last new item, items today/week). These should be **separate dashboard tabs**.

**jobs.yaml**: Add a `kind: backfill | monitor` field. No lifecycle transition.

**Related**: `investor/sourcehub/SPEC.md` defines a unified source abstraction with `sample`, `backfill(since=...)`, and `monitor()` as separate operations.
