# Jobs Framework Full-System Vario Critique (maxthink, run 2)

Run ID: 359c3a44
Input: all 8 source files + SQL schema (tagged individually)
Models: opus, gpt-pro, grok, gemini

---

## GEMINI

**Module: Tracker (`tracker.py` & SQL Schema)**
*   **What it does well (keep):** Uses SQLite with WAL mode effectively for a single-node concurrent system. The schema is generally flat and easy to reason about. The idempotent migration pattern is simple and robust.
*   **What's broken or fragile (fix):** The `stages` column in `work_items` is JSON, and `tracker.py` does heavy JSON patching (`json_extract`, `json.loads/dumps`) to update stage states, timings, and errors. This is highly prone to race conditions if multiple workers update different stages of the same item simultaneously.
*   **What's in the wrong layer (move):** `_merge_item_data` contains hardcoded business logic for specific domains (e.g., merging "topics" and "queries"). This belongs in the discovery plugins, not the core database tracker.

**Module: Runner (`runner.py`)**
*   **What it does well (keep):** The asyncio architecture using `asyncio.Semaphore` for concurrency and resource limits is clean. The hot-reloading of configurations via SIGHUP and file mtime is an excellent operational feature.
*   **What's broken or fragile (fix):** Orphan reclamation and "stuck in progress" logic is heavily duplicated and runs in a background heartbeat loop. If the runner crashes hard, items get stuck. The watchdog logic for `--once` mode is also race-condition prone.
*   **What's in the wrong layer (move):** The runner directly executes massive SQL queries to reset stuck items and backfill missing stages at startup. This state-reconciliation logic belongs inside `tracker.py` (e.g., `tracker.reconcile_job_state(job_id)`).

**Module: Discovery (`discovery.py`)**
*   **What it does well (keep):** The `@register` decorator pattern provides a clean interface for pluggable discovery strategies.
*   **What's broken or fragile (fix):** Error handling inside individual strategies is inconsistent. Some swallow exceptions, some return empty lists, some log. 
*   **What's in the wrong layer (move):** Almost the entire file. `FinnhubCalendarDiscovery`, `YouTubeChannelDiscovery`, `VICAtozDiscovery`, etc., are highly domain-specific business logic. They do not belong in the core job framework.

**Module: Doctor (`doctor.py`)**
*   **What it does well (keep):** The concept of intelligent, policy-driven error classification (transient vs. systemic vs. code_bug) is brilliant and prevents infinite retry loops.
*   **What's broken or fragile (fix):** The LLM call takes 1-2 seconds per error. While the DB connection is released, the worker is still blocked. A sudden spike in errors will bottleneck the runner on LLM API limits.
*   **What's in the wrong layer (move):** The system prompt contains domain-specific knowledge ("fetched HTML doesn't look like a VIC idea page", "stockloader serve"). This should be injected via job configuration, not hardcoded in the core error handler.

**Module: Job / Config (`job.py`, `executor.py`, `pacer.py`)**
*   **What it does well (keep):** `job.py` provides strongly-typed dataclasses for YAML config, making the rest of the system safer. `executor.py` is a standard, clean dynamic importer.
*   **What's broken or fragile (fix):** `Pacer` is extremely naive. It tracks a simple `_last_call` timestamp. If multiple concurrent workers hit the same pacer, `time.monotonic()` checks will race, causing bursts above the intended rate limit.
*   **What's in the wrong layer (move):** `StorageConfig` is defined in `job.py` but barely used by the core framework (it seems handlers use it). It should either be a generic `params: dict` or fully managed by the core.

**Top 5 Highest-Leverage Changes**
1.  **Normalize the `stages` Schema:** Drop the JSON `stages` column in `work_items`. Create a relational `item_stages (id, item_id, stage_name, status, started_at, ended_at, error)` table. This eliminates JSON-patching race conditions and simplifies queries for throughput/ETA metrics.
2.  **Extract Domain Logic into Plugins:** Rip out all specific discovery strategies (Finnhub, YouTube, Serper, VIC) from `discovery.py` and move them into a `plugins/` or `projects/` directory. The core framework should only ship with `BaseDiscovery` and maybe `ManualDiscovery`.
3.  **Centralize State Reconciliation:** Move the ~100 lines of SQL in `runner.py` that fixes "stuck in_progress" items and "skipped disabled stages" into a single `tracker.reconcile_state(job_id)` function. Call this once on startup and during config reloads.
4.  **Make Doctor Pluggable/Async-Queue based:** Instead of blocking workers to call an LLM, push errors to an internal `asyncio.Queue`. Have a dedicated background "Doctor Task" process them. Allow jobs to define their own Doctor strategies (e.g., `RegexDoctor` vs `LLMDoctor`).
5.  **Fix the Pacer Concurrency:** Update `Pacer` to use `asyncio.Lock` around the token-bucket calculation, or replace it with a proper leaky-bucket algorithm to prevent API rate-limit violations when 50 workers wake up simultaneously.

**The Clean Core vs. Plugins Line**
If mass-simplifying this framework, the line should be drawn strictly at **execution and state management vs. business logic**.

*   **The Core Framework:** `tracker.py` (SQLite state machine), `runner.py` (asyncio orchestrator, semaphores, resource limits), `job.py` (config parsing), `pacer.py` (rate limiting), and the *interfaces* for `BaseDiscovery` and `BaseDoctor`. 
*   **The Plugins (User Space):** Anything that knows about a specific website, API, or data shape. All Finnhub/YouTube/VIC discovery classes, the LLM prompts referencing specific tools, and the data-merging logic in `upsert_items` should be moved to user-defined plugins that the framework simply imports and executes.

---

## GPT-PRO

## schema (SQLite tables)
### 1) What it does well (keep)
- Clear separation of concerns: `work_items` (queue), `results` (outputs), `job_state` (pause/errors), `job_runs` (run history), `job_cost_log`, `job_events`.
- WAL + foreign keys enabled (good default for concurrent readers/writers).
- `UNIQUE(job_id, item_key)` and `UNIQUE(job_id, item_key, stage)` are the right dedupe primitives.

### 2) What’s broken or fragile (fix)
- **Stage state is stored as JSON in `work_items.stages` but queried heavily** (e.g., `json_extract(stages, '$.stage')`). This becomes a performance + correctness trap (hard to index, hard to evolve, easy to get “stuck” states).
- `work_items.result_path` is marked “DEAD” but is still written by runner/tracker code. This creates ambiguous “source of truth” between `results` and `result_path`.
- No schema-level support for “leases/claims” (no `locked_at`, `locked_by`, `lease_expires_at`). The runner currently “SELECT then UPDATE” which is race-prone if multiple processes ever run.

### 3) What’s in the wrong layer (move)
- The fact that `result_path` is dead is an application-layer decision—**schema should be migrated cleanly** (drop column or stop writing it) rather than leaving historical artifacts that code continues to touch.

## tracker.py
### 1) What it does well (keep)
- Provides a single place for DB open + migrations (`open_raw_db`, `_run_migrations`).
- `upsert_items()` is thoughtfully designed: dedupe, preserve provenance, merge discovery metadata without clobbering.
- Results versioning via `handler_version` + `stage_version_hash()` + `reprocess_stale()` is a strong idea and fits staged pipelines well.
- Good “ops tables”: cost log + events, plus helper queries (`get_daily_cost`, `get_events`, `prune_events`).

### 2) What’s broken or fragile (fix)
- **Stage failure can deadlock an item** due to `_sync_item_status()` semantics:
  - If any stage is `"failed"` but other stages remain `"pending"`, `_sync_item_status()` sets item-level `status = 'pending'`.
  - `get_items_needing_stage()` only pulls stages that are `pending|retry_later|NULL`, not `failed`.
  - Net: an item can become **permanently “pending” but un-runnable** (failed stage blocks deps; failed stage never re-queued).
  - Fix: treat “any failed stage” as item failed/blocked, or allow “failed” stages to be selectable under certain conditions (retry policy), or introduce `blocked` status.
- **SQLite JSON as a primary state machine** (`stages` dict holding status + timing + errors) is brittle:
  - keys like `"{stage}_started"`/`_ended`/`_elapsed_s`/`_error` are easy to drift,
  - querying/aggregating is slow and un-indexed,
  - migrations are awkward (you can’t enforce constraints).
- Tracker API overlaps/conflicts with runner lifecycle logic:
  - Runner implements “reset stuck in_progress”, “skip disabled stages”, “reopen done items with new stages”, and heartbeat orphan reclaim directly in `runner.py` instead of calling tracker functions.
- `retry_failed()` only resets items with item-level `status='failed'`, but stage-level failures can leave item-level status as pending (see deadlock above), so “retry failed” may not find what humans think is “failed”.

### 3) What’s in the wrong layer (move)
- Runner-specific lifecycle operations should live in tracker:
  - “reclaim orphans”, “reset stuck”, “apply stage enable/disable policy”, “reopen items for new stages”.
  - Right now you have *two* sources of truth for lifecycle rules: `tracker._sync_item_status()` and runner’s startup/heartbeat mutations.

## runner.py
### 1) What it does well (keep)
- Clear orchestration model: discovery loop + stage workers + guard checker, with hot-reload via mtime/SIGHUP.
- Resource coordination via `ResourceRegistry` is a solid seam for cross-job throttling.
- Operational guards (daily cost limit, validate hook) are pragmatic and implemented end-to-end (pause + retry_after + events).
- Attempts to heal common operational failures:
  - reset stuck `in_progress` on startup,
  - heartbeat reclaim.

### 2) What’s broken or fragile (fix)
- **The biggest correctness issue: shared SQLite connection used concurrently across async tasks.**
  - In `stage_worker()`: `with open_db() as conn:` then `items = get_items_needing_stage(...)` then `asyncio.gather(*[process_one(i) ...])`.
  - Each `process_one()` calls `mark_stage_in_progress(conn, ...)`, `store_result(conn, ...)`, `mark_stage_done(conn, ...)`, etc. **on the same `conn` concurrently**.
  - `sqlite3.Connection` is not safe for concurrent use by overlapping coroutines; you will eventually see “database is locked”, partial writes, or interleaved transactions that break invariants.
  - Same pattern exists in `simple_worker()` (less concurrency, but still long-lived `conn` while doing slow work).
- **Doctor integration violates Doctor’s own contract**:
  - `doctor.classify_and_act()` explicitly says “don’t hold the runner’s connection during the LLM call”.
  - Runner calls `await classify_and_act(...)` while still inside `with open_db() as conn:` and inside the `async with sem:` critical section.
- Race-prone “claiming”:
  - `get_items_needing_stage()` selects rows; later `mark_stage_in_progress()` updates them.
  - Without an atomic “claim” (UPDATE…RETURNING), two workers/processes can select the same item.
- Disabled stage skipping + “reopen done items with new stages” is duplicated lifecycle logic (belongs in tracker), and it’s easy for it to diverge from tracker’s stage status model.

### 3) What’s in the wrong layer (move)
- All DB mutation policies for work items (stuck reset, disabled stage skip, reopen) should be tracker functions.
- Hot reload concerns (resources/stage concurrency updates) are runner concerns, but the implications on “how many items to claim” should be enforced by tracker-level claim primitives.

## discovery.py
### 1) What it does well (keep)
- Pluggable strategy registry (`register`, `get_strategy`) is the right plugin seam.
- Strategies return a consistent-ish shape: `{key, data, priority}`.
- Good coverage of real-world sources: RSS, Serper, Finnhub, YouTube, filesystem, and “chained” discovery (`tracker_query`, `multi_source`).

### 2) What’s broken or fragile (fix)
- **Mixed HTTP client patterns**: some strategies use `httpx.AsyncClient()` directly, others use `lib.http_client.client`. You lose consistent timeouts, retries, proxy policy, headers, and observability.
- Priority semantics are inconsistent across strategies and also collide with tracker’s “don’t override priority if (0.0, 50.0)” magic:
  - e.g., Finnhub sets unknown cap to `50.0`, and tracker treats `50.0` as “default-ish” and overwritable.
  - This is an implicit contract spanning discovery + tracker that isn’t codified anywhere.
- Some strategies embed “runner-ish” behavior:
  - backfill loops and rate-limits are partially handled in discovery (sleep in FinnhubCompanyNewsDiscovery) and partially in runner (pacer).

### 3) What’s in the wrong layer (move)
- “Priority policy” should be a framework-level concept:
  - discovery should emit *signals* (e.g., `source_priority`, `freshness_ts`, `market_cap`) and a core policy should compute final scheduling priority (or at least validate the range/meaning).
- HTTP and throttling policy should be centralized (framework-provided client + retry/backoff hooks), not re-implemented per strategy.

## doctor.py
### 1) What it does well (keep)
- Replaces brittle “consecutive failure” heuristics with classification + action mapping.
- Writes structured `doctor_actions` records for observability and dashboarding.
- Circuit-breaker buffering (`_apply_circuit_breaker`) is a good safeguard against LLM misclassification.

### 2) What’s broken or fragile (fix)
- The contract mismatch with runner is real (runner keeps DB connection open during LLM call).
- Schema management is ad-hoc: Doctor creates `doctor_actions` table lazily, outside the central tracker migrations. This makes DB state harder to reason about and reproduce.
- Risk tiers mention notifications and business hours, but only partially implemented (no “wait 10 min then act” mechanism; it always returns a verdict immediately and runner executes it).
- `reset_error_counts()` is invoked from runner on success, but runner calls it at different times for staged vs non-staged paths; easy to regress.

### 3) What’s in the wrong layer (move)
- Doctor’s table creation should be managed by the same migration system as everything else (tracker), not by a module-level `_ensure_schema()`.
- “When/how to pause/unpause” belongs to core lifecycle policy (tracker/runner). Doctor should recommend; core should enact consistently (and log events).

## job.py
### 1) What it does well (keep)
- Strong typed-ish config objects (`Job`, `StageConfig`, `GuardsConfig`, `ResourceConfig`).
- `_infer_stage_deps()` by ordering is a good default UX.
- `update_from()` supports hot reload without restarting runner.

### 2) What’s broken or fragile (fix)
- Hot reload is incomplete:
  - `update_from()` updates stage concurrency but not `enabled`, `resource`, `outputs`, `stage_deps`, stage additions/removals, `detail_columns`, etc.
  - Runner then tries to compensate by doing “skip disabled stages” at job start, but it won’t dynamically stop/start workers if `enabled` changes.
- `PacingConfig.circuit_breaker` exists but is effectively obsolete after Doctor; this is confusing and risks “two circuit breakers” evolving independently.

### 3) What’s in the wrong layer (move)
- “Stage outputs schema” (`StageConfig.outputs`) is a framework/schema concern if you want consistent results validation and dashboard display. Right now it’s config-only and not enforced anywhere.

## executor.py
### 1) What it does well (keep)
- Minimal and correct: resolves handler callables by dotted path; clear errors.

### 2) What’s broken or fragile (fix)
- No versioned interface checking: runner assumes handler supports staged calling (`handler(item_key, data, job, stage)`), but executor doesn’t validate signature or expose capability.
- No structured handler metadata (supported stages, dependencies, output schema) beyond ad-hoc conventions.

### 3) What’s in the wrong layer (move)
- Signature/introspection rules should be defined by the core framework (runner/tracker), not scattered as “try coroutine else to_thread” patterns inside runner.

## pacer.py
### 1) What it does well (keep)
- Simple token-bucket-ish pacing that is easy to reason about.
- Hot-update rate (`update_rate`) integrates well with config reload.

### 2) What’s broken or fragile (fix)
- It’s applied only to the “first stage” in `stage_worker()`:
  - that’s a reasonable heuristic, but it’s implicit policy and may not match real bottlenecks (e.g., transcript stage might be the expensive one).
- It doesn’t integrate with shared `ResourceRegistry` limits; you can end up with pacing allowing work that then blocks on resource semaphore, wasting claim slots.

### 3) What’s in the wrong layer (move)
- Pacing policy (which stages to pace, per-resource vs per-job) should be defined declaratively in job config (or derived), not hard-coded to “first stage only”.

## Top 5 highest-leverage changes (ordered by impact)
1) **Fix SQLite concurrency + atomic claiming (runner ⇄ tracker)**
   - Change `stage_worker()` and `simple_worker()` to stop sharing one `sqlite3.Connection` across concurrent coroutines.
   - Introduce tracker functions like:
     - `claim_items_for_stage(conn, job_id, stage, n, deps, worker_id) -> list[rows]`
     - implemented as an **atomic UPDATE…RETURNING** that sets stage to `in_progress` (and increments attempts / sets timestamps) in one statement.
   - Concrete hotspots:
     - `runner.stage_worker`: `with open_db() as conn: ... asyncio.gather(process_one(...))` (shared `conn`) → replace with per-task connections or a DB-op queue/lock.
     - `tracker.get_items_needing_stage` + `mark_stage_in_progress` split → replace with claim primitive.

2) **Fix the stage failure deadlock (tracker._sync_item_status + selection rules)**
   - Current behavior can strand items as item-level `pending` with a stage-level `failed`.
   - Fix options (pick one and enforce everywhere):
     - (A) If any stage is `failed`, set item `status='failed'` immediately (recommended simplest).
     - (B) Add `status='blocked'` and make dashboard/actions treat it distinctly.
     - (C) Allow `get_items_needing_stage()` to optionally include `failed` based on retry policy.
   - Concrete code to change:
     - `tracker._sync_item_status()` logic around `elif "failed" in statuses ...` and the else branch that sets `pending`.

3) **Normalize stage state out of JSON (schema + tracker + runner)**
   - Replace `work_items.stages JSON` with a table, e.g. `work_item_stages(job_id, item_key, stage, status, started_at, ended_at, elapsed_s, error)`.
   - Benefits:
     - Proper indexes (`(job_id, stage, status, priority)`),
     - simpler queries for `stage_stats`, `get_items_needing_stage`, throughput/ETA,
     - removes stringly-typed `"{stage}_ended"` keys and JSON_extract fragility.
   - This change collapses complexity across tracker (`stage_*` methods), runner (stage queries), and schema.

4) **Make “results” the single source of truth; delete `result_path` writes (schema ⇄ tracker ⇄ runner)**
   - Stop writing `work_items.result_path` in:
     - `tracker.mark_done()`, `tracker.mark_stage_done()`,
     - runner’s result-path handling branches.
   - Enforce handler return contract:
     - stage handlers should return `dict` (stored in `results`) + optional `_priority`, `_cost`, `_fail`.
     - If you need blob/files, store a structured pointer in `results.result` (e.g., `{artifact_path, artifact_type}`).

5) **Move lifecycle healing logic into tracker utilities (runner ⇄ tracker duplication)**
   - Create tracker APIs for:
     - `reset_stuck_in_progress(job_id)` (covers both item status and stage status),
     - `apply_disabled_stages(job_id, disabled_stage_names)`,
     - `reopen_items_for_new_stages(job_id, enabled_stage_names)`,
     - `reclaim_orphans(active_item_ids)` (heartbeat behavior).
   - Then runner becomes orchestration-only; tracker becomes lifecycle-policy + DB.
   - Concrete duplicated logic today:
     - runner: “Reset stuck in_progress items”, “Skip disabled stages”, “Reopen done items missing new stages”, heartbeat orphan reclaim.
     - tracker already has stage/state primitives but not these orchestrated policies.

## If mass-simplifying into a clean core framework + “rivus plugins”: where to draw the line
**Core framework boundary (keep in-core):**
- **DB schema + migrations + all state transitions**:
  - Work queue, claiming/leasing, stage state machine, results storage, events/cost logs, pause/unpause semantics.
  - Everything in `tracker.py` should become a well-defined API with *no runner-side SQL mutations*.
- **Runner orchestration**:
  - Scheduling loops, concurrency, resource semaphores, hot reload, guard loop wiring.
  - But runner should only call tracker APIs (“claim”, “complete”, “fail”, “pause”), never hand-edit `stages`/`status`.
- **Plugin interfaces + contracts**:
  - `DiscoveryStrategy.discover() -> list[WorkItemCandidate]`
  - `StageHandler.run(item, context) -> StageResult`
  - Explicit schemas for `StageResult` (`_fail`, `_cost`, `_priority`, `artifacts`).

**Plugins (“rivus”) boundary (out-of-core):**
- All of `discovery.py` strategies (Serper, Finnhub, RSS, YouTube, filesystem, etc.).
- All job-specific handlers resolved by `executor.py`.
- Optional add-ons:
  - Doctor classifier can be a plugin that produces `Verdict`s, but the **actuation** (pause/retry/fail + logging) should be core and consistent.

**The exact line:**
- Core owns: “What is the state? How is it stored? How does it transition safely under concurrency?”
- Plugins own: “How do I find items?” and “How do I process an item at stage X?” and “How do I classify an error?”

---

## GROK

**Module: Schema/Tracker (document1 + document2)**

1. **Does well (keep)**: Excellent separation of concerns as the single source of truth for queue state, results, costs, events. Stage-aware JSON tracking is flexible without schema explosion. Idempotent migrations via _run_migrations(). Comprehensive queries like get_items_needing_stage() with deps. upsert_items() smartly merges provenance data without clobbering.

2. **Broken/fragile (fix)**: Migrations are ad-hoc list in code; _run_migrations() parses SQL heuristically (risky if ALTER format changes). result_path column is "DEAD: replaced by results table" but still used in legacy mark_done(). Stages JSON parsing everywhere is error-prone (json.loads() on NULL/empty). No transactions in some multi-statement funcs (e.g., retry_failed()). get_pending() ignores stages for simple jobs — inconsistent with stage_workers.

3. **Wrong layer (move)**: Cost logging/events are DB ops but runner/doctor log them — fine, but stage_version_hash() and reprocess_stale() are runner-specific versioning logic (move to runner or new versioning module). _merge_item_data() discovery merging is discovery-specific (move to discovery lib).

**Module: Runner (document3)**

1. **Does well (keep)**: Persistent asyncio workers per stage with semaphores/resources. Excellent hot-reload (SIGHUP/mtime watch) updating Jobs in-place. Heartbeat/orphan reclaim prevents stuck queues. Wake files for interruptible sleep. Doctor integration decouples error handling. Once mode + limit pausing for testing.

2. **Broken/fragile (fix)**: Massive 1000+ line file — god object. Duplicate discovery logic (run_discovery() here vs. discovery_task()). Stuck item reset at startup duplicates heartbeat reclaim. simple_worker() for non-staged jobs ignores stages entirely (legacy). CLI singleton via heartbeat PID check but no graceful shutdown. _interruptible_sleep() checks files every 2s — CPU waste.

3. **Wrong layer (move)**: Discovery entirely belongs in separate module (discovery.py already exists but unused here). Guard_checker() + validate() loading is runner-specific but could be pluggable. Orphan reclaim/heartbeat to separate monitor task/module. Job startup logic (reset stuck, skip disabled, reopen done) too DB-heavy — hoist to tracker.init_job().

**Module: Discovery (document4)**

1. **Does well (keep)**: Pluggable @register() strategies — clean, extensible (serper, finnhub, youtube, etc.). Params dict per-strategy. Dedup/priority in upsert_items() handles overlaps well. MultiSource/TrackerQuery chaining composable.

2. **Broken/fragile (fix)**: Many strategies hardcode paths/DBs (e.g., supplychain_seed hardcoded db_path). SerperYouTube keys by video_id but SerperSearch by url_hash — inconsistent across strategies. No built-in freshness (e.g., filter discovered_at > X). yt-dlp cmds duplicated across youtube_channel/llm_search.

3. **Wrong layer (move)**: get_strategy()/register() fine here, but run_discovery() is in runner — consolidate. _merge_item_data() provenance logic duplicated from tracker. Some strategies (vic_atoz, moltbook) fetch/parse inline — should be thin wrappers calling handlers early?

**Module: Doctor (document5)**

1. **Does well (keep)**: LLM classification smartly replaces dumb counters. Circuit breaker per-class with thresholds. Own DB conn avoids runner blocking. Logs to doctor_actions for dashboard. Notify integration.

2. **Broken/fragile (fix)**: LLM latency (1-2s) per error slows queue (every exception → classify_and_act()). _llm_classify() strips ``` fences heuristically — fragile. CLASSIFY_SYSTEM prompt huge/static. No caching (e.g., same error 10x → same class). Thresholds hardcoded dict.

3. **Wrong layer (move)**: doctor_actions table/schema should be in tracker migrations. classify_and_act() called from runner/workers — but Verdict dataclass/verdict building here. Could be tracker-integrated (e.g., tracker.classify_error()).

**Module: Job/Config (document6)**

1. **Does well (keep)**: YAML-driven, hot-reloadable Jobs with update_from(). StageConfig/resource separation scales concurrency. Guards/pacing/limits flexible. Infer deps from order — pragmatic.

2. **Broken/fragile (fix)**: _parse_stages() supports str/list but inconsistent (stages: ["price", "ir"]). No validation (e.g., handler exists?). Priority inferred nowhere. limit=0 pauses after N done — but counter shared poorly via job._item_counter hack.

3. **Wrong layer (move)**: RunnerConfig/batch_size unused (pacing.batch_size shadows). ResourceConfig loading duplicated in runner. Guards.validate() loads handler's validate() — handler coupling.

**Module: Executor (document7) + Pacer (document8)**

1. **Does well (keep)**: resolve_handler() simple/dynamic. Pacer token-bucket hot-updatable.

2. **Broken/fragile (fix)**: None major — tiny utils.

3. **Wrong layer (move)**: Pacer could be runner-internal. stage_version_hash() in tracker but uses inspect.getsource() — handler coupling, belongs in executor.

**Top 5 highest-leverage changes (ordered by impact)**

1. **Extract dedicated DiscoveryManager (runner ↔ discovery interaction)**: Runner's discovery_task()/run_discovery() duplicate strategy loading — create DiscoveryManager with its own loop (like stage_worker). Use get_strategy(job.discovery.strategy).discover(job). Call from runner as single task. Fixes duplication, enables runner --discover-only to just spawn manager. Consolidate _merge_item_data() here. Impact: Shrinks runner 200+ lines, pluggable discovery scales independently.

2. **Centralize schema/migrations in tracker (schema fragility)**: Move MIGRATIONS/EXTRA_SCHEMA to tracker.py as VERSIONED_SCHEMA_STMTs array with sqlite_master checks. Run on every open_raw_db() idempotently. Add doctor_actions to EXTRA_SCHEMA. Drop dead result_path. Add txn decorators (@tracker.txn) for multi-stmt funcs like retry_failed(), _update_stage(). Impact: Schema robust, no parse hacks, supports doctor/runner DB sharing.

3. **Hoist job init/reset logic to tracker.init_job(job_id, stages: list[str]) (runner startup bloat)**: Move reset_stuck(), skip_disabled(), reopen_done() from runner.run_job() startup to single tracker func. Call once per job on runner start. Heartbeat orphan_reclaim() → tracker.reclaim_orphans(active_items=set[int]). Impact: Runner slims to orchestration only; tracker owns state consistency.

4. **Cache Doctor classifications + faster fallback (latency bottleneck)**: Add tracker.get_cached_class(error_hash: str) → Verdict|None (doctor_actions.error_text sha1[:12]). Cache LLM results 1h. Fallback to rule-based (e.g., if "timeout|429|reset" in error_str → transient). Thresholds/PAUSE_THRESHOLD to job.guards.doctor_thresholds: dict. Impact: Drops avg classify_and_act() from 1.5s→0.1s, sustains high-throughput error rates.

5. **Unify simple/stage workers + drop legacy (inconsistent queueing)**: simple_worker() ignores stages — make all jobs staged (stages: ["process"] default). get_pending() → get_items_needing_stage("process"). Drop mark_done/in_progress etc. Impact: Single worker path (stage_worker), tracker APIs consistent, no legacy result_path.

**Mass-simplification to core framework + rivus plugins**:

**Core framework (tracker.py + minimal_runner.py ~500 LOC)**: Tracker DB (work_items minimal: id/job_id/item_key/status/priority/stages/data) + upsert/get_needing/mark_stage_* APIs. Basic Runner: YAML→Jobs, asyncio.gather([discovery_task(job), [stage_worker(sc) for sc in stages]]). Heartbeat/orphans. No doctor/hot-reload/wakes (add later).

**Rivus plugins (separate packages)**: 
- discovery/: strategies → DiscoveryManager plugin.
- doctor/: classify_and_act() → optional error handler plugin.
- handlers/: per-job (executor.resolve_handler()).
- monitors/: guards/heartbeat/ctl.py dashboard.
- config/: Job YAML parsing + hot-reload.

Line: Core owns queue + worker loop (universal). Plugins own verticals (discovery/error/job-specific). Runner imports plugin managers dynamically (e.g., if job.doctor_enabled: tasks.append(doctor_task())).

---

## OPUS

## Module-by-Module Review

### 1. Schema (`schema.sql` / embedded in `tracker.py`)

**What it does well:**
- WAL mode + foreign keys pragmas are correct for concurrent read/write from async workers.
- The `work_items` + `results` separation is sound — discovery metadata vs. stage outputs live in different tables.
- `UNIQUE(job_id, item_key, stage)` on results is the right constraint for idempotent stage output.
- The `job_events` table is a proper append-only audit log with good indexes.

**What's broken or fragile:**
- **Schema is defined in THREE places** — the `SCHEMA` string, `EXTRA_SCHEMA` string, and `MIGRATIONS` list in `tracker.py`, plus the extracted `schema.sql` document. There's no single source of truth. The `SCHEMA` string creates tables without the columns that `MIGRATIONS` adds, meaning the "current" schema is only knowable by mentally replaying migrations.
- **`result_path` is dead but still written to.** `mark_stage_done()` at line ~accepts `result_path` and writes it; `mark_done()` writes it; the schema comment says "DEAD: replaced by results table." Yet `stage_worker` in the runner still passes `result_path=str(result)` for non-dict results. This is ghost state that will confuse anyone reading the DB.
- **`stages` JSON column is a schema-within-a-schema.** It stores `{stage_name: status, stage_name_started: iso, stage_name_ended: iso, stage_name_elapsed_s: float, stage_name_error: str}` — a flattened key-value store with naming conventions instead of structure. Every function that touches it has to manually filter `_started`, `_ended`, `_elapsed_s`, `_error` suffixes. This is the single biggest source of bugs in the codebase.
- **No `doctor_actions` table in the schema.** Doctor creates its own table lazily via `_ensure_schema()` with a module-global `_schema_applied` flag. This means the schema document is incomplete, and there's a race condition if two connections try to create it simultaneously (SQLite handles this, but the `_schema_applied` flag is per-process, not per-connection).
- **`job_cost_log` has no index on `item_key`.** If you ever want per-item cost lookups, it's a full scan.

**What's in the wrong layer:**
- The `stages` JSON manipulation logic (suffix filtering, timing computation, status derivation) should be a `StageTracker` class or at minimum a set of pure functions operating on a typed dict, not scattered across `_get_stages`, `_update_stage`, `_sync_item_status`, `retry_failed`, and the runner's stuck-item reset code.

---

### 2. Tracker (`tracker.py`)

**What it does well:**
- `upsert_items()` with `_merge_item_data()` is thoughtfully designed — the provenance merging (topics, queries, discovery_sources) handles the real-world case of the same item discovered by multiple strategies without clobbering metadata.
- `stage_version_hash()` with `VERSION_DEPS` is clever — it catches changes in imported helpers, not just the stage function itself.
- `get_items_needing_stage()` with dependency checking via `json_extract` is the right query pattern for a stage DAG.
- `reprocess_stale()` is a clean implementation of version-aware reprocessing.

**What's broken or fragile:**
- **`_sync_item_status` has a logic gap.** The condition for setting `failed` is `"failed" in statuses and "pending" not in statuses and "in_progress" not in statuses and "retry_later" not in statuses`. But if stages are `{a: "done", b: "failed", c: "skipped"}`, it correctly marks failed. However, if stages are `{a: "done", b: "failed", c: "done"}`, it ALSO marks the item failed — meaning one failed stage poisons the whole item even if all other stages succeeded. This might be intentional, but it means there's no concept of "partially failed" items.
- **`get_items_needing_stage()` returns items with `stages IS NULL`**, which means uninitialized items. But the query also checks `status != 'done'`. If an item has `status='pending'` and `stages IS NULL`, the dependency check `json_extract(stages, ?) IN ('done', 'skipped')` will return NULL (not in the set), so items with deps will never be returned if stages is NULL. This is a silent bug — the first stage works (no deps), initializes stages, and then subsequent stages work. But if `_get_stages` is never called for the first stage (e.g., the first stage is disabled), the item is stuck forever.
- **`retry_failed()` sets `priority = -1` unconditionally.** This means retried items jump ahead of everything, including items that were deliberately prioritized by discovery (e.g., by market cap). After one retry cycle, all priority information is destroyed.
- **`stage_stats()` is O(stages × statuses) queries** — 4 stages × 6 statuses = 24 separate COUNT queries. This should be a single query with `GROUP BY json_extract(stages, ?)` or a single scan with Python-side aggregation.
- **Connection management is inconsistent.** Some functions call `conn.commit()` internally (e.g., `mark_done`, `store_result`, `log_event`), while others expect the caller to commit. `_update_stage` commits, but `_sync_item_status` (called within it) does not — it relies on the caller's commit. This works because `_update_stage` commits after calling `_sync_item_status`, but it's fragile.
- **`get_job_stats` and `job_stats` are duplicate functions** doing the same thing with slightly different implementations (one uses `dict(row)` pattern, the other uses `row[0]/row[1]`).

**What's in the wrong layer:**
- `STAGES = ["price", "ir", "ib", "transcript"]` is a hardcoded domain constant that doesn't belong in the generic tracker. It's never actually used by any function — everything takes `job_stages` as a parameter. Dead code.
- `stage_timing_stats()` and `get_stage_pipeline_metrics()` are dashboard/reporting concerns, not tracking concerns. They do complex aggregation that should live in a reporting module.
- The `_merge_item_data` function with its `scalar_to_list` mapping and provenance logic is discovery-layer business logic embedded in the tracker.

---

### 3. Runner (`runner.py`)

**What it does well:**
- The architecture of independent `discovery_task` + `stage_worker` + `guard_checker` per job, all coordinated via `asyncio.Event` (tripped), is clean and correct.
- `ResourceRegistry` for cross-job semaphore sharing is well-designed — stages in different jobs that hit the same external service (e.g., YouTube) share a concurrency limit.
- The heartbeat loop with orphan reclamation is a real-world necessity done correctly — it checks `active_items` set membership before resetting, avoiding false reclamation of genuinely in-progress items.
- Hot-reload via SIGHUP + mtime detection with `job.update_from()` is production-grade.
- The singleton lock via heartbeat file with PID liveness check is correct (handles stale heartbeats from crashed processes).

**What's broken or fragile:**
- **`stage_worker` holds a DB connection for the entire batch processing loop.** The `with open_db() as conn:` block at line ~wraps the item fetch AND all `process_one` coroutines (which run concurrently via `asyncio.gather`). If a stage handler takes 30 seconds (e.g., downloading a video), the connection is held open the entire time. With multiple stage workers per job and multiple jobs, this can exhaust SQLite's practical connection limit. The doctor module correctly avoids this pattern ("Does NOT take a DB connection"), but the runner itself doesn't follow this principle.
- **`process_one` is a closure over `conn`** — all concurrent `process_one` invocations for a batch share the same `sqlite3.Connection`. SQLite connections are not thread-safe, and while `asyncio.gather` is single-threaded, `asyncio.to_thread(handler, ...)` pushes handler execution to a thread pool. If two handlers in the same batch both finish at the same time and their `mark_stage_done` calls interleave on the shared connection, you get `sqlite3.ProgrammingError: Recursive use of cursors not allowed`. This is a **concurrency bug** that would manifest under load.
- **The stuck-item reset at job startup duplicates logic from the heartbeat loop.** Both reset `in_progress → pending` with stage JSON manipulation. The startup version also handles `stages LIKE '%"in_progress"%'` (string matching on JSON), while the heartbeat version uses Python-side JSON parsing. Different approaches to the same problem.
- **`_interruptible_sleep` polls every 2 seconds by checking file existence.** This works but is wasteful — an `asyncio.Event` per job would be cleaner and instant.
- **The `once_watchdog` uses a 9-second idle timeout** (3 ticks × 3 seconds) to decide "no more work." But if a stage handler takes >9 seconds and nothing else is in the queue, the watchdog will trip `tripped` while a handler is still running. The `item_counter["active"]` check mitigates this, but there's a TOCTOU race: the watchdog reads `active == 0` just before `process_one` increments it.
- **`retry_later_counts` is a module-global dict** that persists across hot-reloads and is never cleaned up for jobs that are removed from config. Memory leak in long-running processes.
- **Error handling in `process_one` calls `classify_and_act` (which takes 1-2s for LLM) while holding the batch's DB connection.** The doctor module's docstring explicitly says "Does NOT take a DB connection" and "the runner should release its connection before calling this," but the runner doesn't follow this advice — `classify_and_act` is called inside the `with open_db() as conn:` block.

**What's in the wrong layer:**
- The stuck-item reset, disabled-stage skipping, and done-item reopening logic at the top of `run_job()` are **schema migration/repair operations**, not runtime concerns. They should run once at startup in a dedicated `repair_job_state()` function in the tracker, not in the runner's hot path.
- `_retry_after_2h()` and `_tomorrow_midnight_pt()` are scheduling policy that should live in the job config or a scheduling module, not hardcoded in the runner.
- The `BackoffTimer` class is generic infrastructure that belongs in a utils module.

---

### 4. Discovery (`discovery.py`)

**What it does well:**
- The registry pattern (`@register("name")` + `get_strategy()`) is clean and extensible.
- Each strategy is self-contained with clear `discover() → list[dict]` contracts.
- `SerperYouTubeDiscovery` correctly deduplicates by video_id across topics/queries and merges provenance metadata.
- `VICWaybackDiscovery` with CDX cache + paywall cutoff detection is sophisticated and correct.
- `VICAtozDiscovery` with disk-cached HTML listings avoids redundant fetches.

**What's broken or fragile:**
- **`FinnhubCalendarDiscovery` makes N sequential HTTP calls** for company profiles (one per symbol) with no concurrency. For 200 earnings events with 150 unique symbols, this is 150 sequential HTTP calls with no rate limiting beyond the 10-second timeout. Should use `asyncio.gather` with a semaphore.
- **`YouTubeChannelDiscovery` and `LLMSearchDiscovery` shell out to `yt-dlp`** with `asyncio.create_subprocess_exec`. The `|||` separator in `--print` format strings will break if a video title contains `|||`. Unlikely but not impossible — should use a separator that can't appear in YouTube metadata (e.g., `\x00` or a UUID).
- **`SerperSearchDiscovery` and `SerperYouTubeDiscovery` have overlapping functionality.** Both search Serper, both handle backfill with monthly date ranges, both deduplicate. The difference is keying strategy (url_hash vs video_id). This should be one strategy with a `key_by` parameter.
- **`VICWaybackDiscovery.discover()` imports from `projects.vic.db`** — a project-specific module. This means the generic discovery framework has a hard dependency on a specific project. If `projects.vic.db` isn't installed, importing `discovery.py` works (lazy import), but running this strategy fails with an unhelpful ImportError.
- **`CompanySourcesDiscovery` uses `json.loads(article.read_text())` without size limits.** A malformed or huge JSON file could OOM the discovery process.
- **`StaticEINListDiscovery` has no retry logic** for the ProPublica API calls. A single timeout kills discovery for that org.
- **`MultiSourceDiscovery` catches all exceptions from sub-strategies** and continues, but logs at `warning` level. If 4 out of 5 sources fail, discovery "succeeds" with 20% of expected items and no alarm is raised.

**What's in the wrong layer:**
- `VICAtozDiscovery`, `VICWaybackDiscovery`, `CompanySourcesDiscovery`, `StaticEINListDiscovery`, `VCDataSourcesDiscovery`, `MoltbookFeedDiscovery`, and `SemisupplySeedDiscovery` are **project-specific strategies** registered in the framework's core discovery module. The framework should only contain `BaseDiscovery`, the registry, and maybe `ManualDiscovery`. Everything else should be in project-specific modules that register themselves on import.
- The `http_client` import and `auth_args` import at module level mean the framework depends on project-specific HTTP and YouTube auth infrastructure.

---

### 5. Doctor (`doctor.py`)

**What it does well:**
- The LLM classification approach is genuinely clever — instead of pattern-matching on error strings, it uses an LLM to understand context (job, stage, item, error message) and classify. The system prompt is well-crafted with clear rules and examples.
- The circuit breaker with per-class thresholds is a good design — transient errors get 10 chances, systemic gets 3, code bugs get 2. This prevents a single misclassification from pausing a job.
- The `_CLASS_TO_ACTION` mapping is clean and extensible.
- Graceful degradation when LLM is unavailable (falls back to `fail_item`).

**What's broken or fragile:**
- **Every single error triggers an LLM call.** At $0.001-0.01 per call, processing 10,000 items with a 5% error rate = 500 LLM calls just for error classification. There's no caching of classifications for identical error patterns. If 200 items fail with `ConnectionTimeout to api.example.com`, that's 200 identical LLM calls.
- **`_error_counts` is a module-global dict with no TTL.** If a job has 2 systemic errors, then runs fine for a week, then has 1 more systemic error, the counter is at 3 and triggers a pause. The counts should decay over time or reset on successful runs (which `reset_error_counts` does, but only for the specific job+stage combo that succeeded).
- **The `_schema_applied` flag is process-global** but connections can be to different databases (the `db_path` parameter in `open_db`). If you open a connection to a test DB, the schema won't be applied because `_schema_applied` is already `True` from the production DB.
- **`_llm_classify` truncates error to 800 chars** but `log_action` truncates to 500 chars. If the classification depends on content between chars 500-800, the log won't contain the information needed to understand the classification.
- **The `notify` action exists in the `Action` enum but is never used.** `_CLASS_TO_ACTION` maps everything to either `retry_later`, `fail_item`, or `pause_job`. The notification is a side effect of `pause_job` for medium/high risk, not a standalone action.

**What's in the wrong layer:**
- The `_CLASSIFY_SYSTEM` prompt contains hardcoded fix commands (`ctl.py clear-errors JOB`, `stockloader serve`). These are deployment-specific and should be configurable per-job or per-project.
- The `CLASSIFY_MODEL = "xai/grok-4-1-fast-non-reasoning"` is hardcoded. Should be configurable.
- The Pushover notification in `_notify_if_needed` imports `lib.notify` — a project-specific notification module. The framework should have a pluggable notification interface.

---

### 6. Job (`job.py`)

**What it does well:**
- Clean dataclass hierarchy with sensible defaults.
- `_infer_stage_deps` from order is a smart convention — linear pipelines don't need explicit dep declaration.
- `update_from()` for hot-reload returns change descriptions, enabling the runner to log exactly what changed.
- `_parse_stages` supporting both `list[str]` and `list[dict]` is good backward compatibility.

**What's broken or fragile:**
- **`update_from()` doesn't update `stage_configs` for new/removed stages.** It only updates `concurrency` for existing stages. If you add a new stage via YAML reload, it won't appear until restart. The runner's "reopen done items missing new stages" logic at startup handles this, but only on restart — not on hot-reload.
- **`update_from()` doesn't update `stage_deps`.** If you change the stage order or add explicit deps, they won't take effect until restart.
- **`update_from()` silently updates `pacing.circuit_breaker` and `pacing.batch_size`** as side effects of the `max_per_hour` check (they're updated unconditionally inside the `if` block). If only `batch_size` changes but `max_per_hour` doesn't, the change is missed.
- **No validation on load.** A job with `handler: ""` and stages defined will fail at runtime when `resolve_handler("")` is called. A job with `discovery.strategy: "nonexistent"` will fail on first discovery. These should fail at load time.

**What's in the wrong layer:**
- `StorageConfig` with `resolved_dir` is never used by the runner or tracker — it's presumably used by handlers directly. If so, it's handler configuration that happens to be loaded with the job.

---

### 7. Executor (`executor.py`)

**What it does well:**
- Simple, correct, good error messages.

**What's broken or fragile:**
- **No caching.** Every call to `resolve_handler()` does `importlib.import_module()`. In the runner, this is called once per job at startup, so it's fine. But if it were called per-item, it would be slow.
- **No validation that the handler signature matches expectations.** The runner expects `handler(item_key=, data=, job=, stage=)` for staged jobs and `handler(item_key=, data=, job=)` for simple jobs. A handler with the wrong signature will fail at runtime with a confusing `TypeError`.

**What's in the wrong layer:**
- This is fine where it is. It's a thin utility.

---

### 8. Pacer (`pacer.py`)

**What it does well:**
- Simple token-bucket implementation that works.
- `update_rate()` for hot-reload is correct.

**What's broken or fragile:**
- **Not actually a token bucket.** It's a minimum-interval enforcer. A true token bucket allows bursts up to bucket size. This enforces uniform spacing — if `max_per_hour=60`, every call waits exactly 60 seconds, even if the system was idle for an hour. For bursty workloads (discovery finds 50 items, you want to process them quickly then idle), this is suboptimal.
- **`_last_call` is per-Pacer, but multiple stage workers share the same Pacer** (via `pacers[job.id]`). However, only the first stage's worker calls `pacer.wait()` (`is_first_stage` flag). This means pacing only applies to the first stage — subsequent stages run as fast as their semaphore allows. This is probably intentional (pace discovery/first-stage, let pipeline flow freely), but it's not documented.
- **Race condition under concurrent access.** If two coroutines call `wait()` simultaneously, both read `_last_call`, both compute the same delay, both sleep, both update `_last_call`. Result: two calls happen at the same time instead of being spaced. In practice, the `is_first_stage` guard means only one worker calls `wait()`, but the class itself isn't safe for concurrent use.

**What's in the wrong layer:**
- Fine where it is.

## Top 5 Highest-Leverage Changes

### 1. Fix the shared-connection concurrency bug in `stage_worker` (Critical — data corruption risk)

**The problem:** `stage_worker` opens one `sqlite3.Connection` via `with open_db() as conn:`, then launches N concurrent `process_one` closures via `asyncio.gather`. Each closure calls `mark_stage_in_progress`, `mark_stage_done`/`mark_stage_failed`, `store_result`, `update_item_priority`, and `log_cost` — all on the shared `conn`. When handlers use `asyncio.to_thread()`, the thread-pool callback and the main loop can interleave writes on the same connection.

**The fix:** Each `process_one` should open its own connection:

```python
async def process_one(item):
    # ... handler call ...
    with open_db() as item_conn:
        mark_stage_done(item_conn, item_id, stage, job.stages)
        store_result(item_conn, job.id, item_key, stage, result, handler_version=version)
```

Additionally, move the `classify_and_act` call OUTSIDE the connection scope, as its own docstring demands. The current code violates the doctor module's contract.

**Impact:** Prevents `sqlite3.ProgrammingError` under concurrent load and connection exhaustion during slow handlers.

---

### 2. Extract the `stages` JSON into a proper `stage_status` table (High — eliminates the largest bug surface)

**The problem:** The `stages` JSON column in `work_items` is a denormalized key-value store with naming conventions (`{stage}_started`, `{stage}_ended`, `{stage}_elapsed_s`, `{stage}_error`). Every function that touches it must manually filter suffixes. The runner's stuck-item reset, `retry_failed`, `_sync_item_status`, `_update_stage`, `_get_stages`, `reprocess_stale`, and the heartbeat orphan reclaimer ALL contain hand-rolled JSON manipulation with the same suffix-filtering logic. This is the #1 source of subtle bugs.

**The fix:** Create a `stage_status` table:

```sql
CREATE TABLE stage_status (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    work_item_id INTEGER NOT NULL REFERENCES work_items(id),
    job_id TEXT NOT NULL,
    item_key TEXT NOT NULL,
    stage TEXT NOT NULL,
    status TEXT DEFAULT 'pending',
    started_at TEXT,
    ended_at TEXT,
    elapsed_s REAL,
    error TEXT,
    UNIQUE(work_item_id, stage)
);
CREATE INDEX idx_stage_status_job_stage ON stage_status(job_id, stage, status);
```

`get_items_needing_stage` becomes a JOIN instead of `json_extract`. `_sync_item_status` becomes `SELECT status FROM stage_status WHERE work_item_id = ? GROUP BY status`. Stuck-item reset becomes `UPDATE stage_status SET status='pending' WHERE status='in_progress'`.

**Impact:** Eliminates ~200 lines of JSON manipulation, makes queries indexable, and removes the entire class of suffix-filtering bugs.

---

### 3. Add error-pattern caching to Doctor (High — cost and latency reduction)

**The problem:** Every error triggers an LLM call (~1-2s, ~$0.001-0.01). Identical errors (same exception type + message pattern) get reclassified every time. 200 items failing with `ConnectionTimeout` = 200 identical LLM calls.

**The fix:** Cache classifications by `(job_id, stage, error_signature)` where `error_signature = f"{type(error).__name__}:{str(error)[:200]}"`. Use an LRU dict with TTL (e.g., 30 minutes). On cache hit, return the cached verdict directly. On cache miss, call LLM and cache the result.

```python
from functools import lru_cache
import hashlib

_classification_cache: dict[str, tuple[Verdict, float]] = {}  # key → (verdict, timestamp)
CACHE_TTL = 1800  # 30 minutes

def _cache_key(job_id: str, stage: str, error: Exception) -> str:
    sig = f"{type(error).__name__}:{str(error)[:200]}"
    return hashlib.md5(f"{job_id}:{stage}:{sig}".encode()).hexdigest()
```

**Impact:** 10-100x reduction in doctor LLM costs under error storms. Faster error handling (cache hit = 0ms vs 1-2s LLM call).

---

### 4. Move project-specific discovery strategies out of the framework (Medium — maintainability)

**The problem:** `discovery.py` contains 18 strategy classes. Of these, only `BaseDiscovery`, `ManualDiscovery`, and arguably `MultiSourceDiscovery` are framework-generic. The rest (`VICAtozDiscovery`, `FinnhubCalendarDiscovery`, `MoltbookFeedDiscovery`, `SemisupplySeedDiscovery`, etc.) are project-specific. They import project modules (`projects.vic.db`), reference project-specific file paths, and contain domain logic. This makes the framework impossible to reuse without carrying all projects.

**The fix:** Use entry-point-style auto-registration. Each project defines its strategies in its own module:

```
jobs/lib/discovery.py          → BaseDiscovery, registry, ManualDiscovery, MultiSourceDiscovery
jobs/strategies/youtube.py     → YouTubeChannelDiscovery, YouTubeVideoListDiscovery
jobs/strategies/serper.py      → SerperSearchDiscovery, SerperYouTubeDiscovery
projects/vic/discovery.py      → VICAtozDiscovery, VICWaybackDiscovery
projects/earnings/discovery.py → FinnhubCalendarDiscovery
```

The job YAML's `handler:` field already uses dotted paths. Discovery strategies should too: `discovery.strategy: "projects.vic.discovery.vic_atoz"` instead of `discovery.strategy: "vic_atoz"`.

**Impact:** Framework becomes reusable. Adding a new project doesn't touch framework code. Removes ~800 lines from `discovery.py`.

---

### 5. Consolidate the three places that reset stuck/stale/disabled items (Medium — correctness)

**The problem:** Item state repair happens in three places with different logic:
1. **`run_job()` startup** (lines ~reset stuck in_progress, skip disabled stages, reopen done items missing new stages) — uses `stages LIKE '%"in_progress"%'` string matching.
2. **`heartbeat()` loop** — uses Python-side JSON parsing of stages, checks `active_items` set.
3. **`retry_failed()`** — resets failed stages to pending with priority=-1.

Each reimplements the same stages-JSON manipulation with slightly different approaches and different edge case handling.

**The fix:** Create a single `repair_item_states(conn, job_id, job_stages, active_item_ids=None)` function in the tracker that handles all three cases:

```python
def repair_item_states(conn, job_id: str, job: Job, active_ids: set[int] | None = None) -> dict[str, int]:
    """Repair all item state inconsistencies. Returns counts of each repair type."""
    counts = {"stuck_reset": 0, "disabled_skipped": 0, "reopened": 0}
    # 1. Reset stuck in_progress (not in active_ids)
    # 2. Skip disabled stages
    # 3. Reopen done items missing new stages
    # All in one pass over work_items
    ...
```

Call it at startup AND in the heartbeat loop. This eliminates the three separate implementations and ensures consistent behavior.

**Impact:** Eliminates duplicated repair logic, prevents the class of bugs where one repair path handles an edge case and another doesn't.

## Framework vs. Plugin Boundary

If I were mass-simplifying this into a **clean core framework** + **project plugins**, here's exactly where I'd draw the line:

### Core Framework (`jobs/`)

**Keep in framework — these are generic batch-processing primitives:**

```
jobs/
├── lib/
│   ├── tracker.py        # SQLite state management (BUT with stage_status table, not JSON)
│   ├── runner.py          # Async worker orchestration (stage_worker, simple_worker, discovery_task)
│   ├── job.py             # Job/Stage/Pacing/Guards dataclasses + YAML loading
│   ├── executor.py        # Handler resolution by dotted path
│   ├── pacer.py           # Rate limiting
│   ├── discovery.py       # BaseDiscovery + registry + ManualDiscovery + MultiSourceDiscovery ONLY
│   └── doctor.py          # Error classification (BUT with pluggable classifier interface)
├── schema.sql             # Single source of truth for DB schema
└── runner.py              # CLI entry point
```

**Framework contracts (interfaces that plugins implement):**

```python
# Discovery plugin interface
class BaseDiscovery:
    async def discover(self) -> list[dict[str, Any]]:  # {"key": str, "data": dict, "priority": float}
        ...

# Handler interface  
async def handler(*, item_key: str, data: str, job: Job, stage: str) -> dict | None:
    ...
    # Return dict with results; special keys: _fail, _priority, _cost
    # Raise RetryLaterError for temporal unavailability

# Validator interface (optional, for guards)
def validate(conn: sqlite3.Connection, job_id: str) -> str | None:
    ...  # Return error string to pause, None to continue

# Doctor classifier interface (pluggable, defaults to LLM)
class ErrorClassifier:
    async def classify(self, job_id: str, stage: str, item_key: str, error: str) -> dict:
        ...
```

### Plugin Boundary — Everything Below This Line Moves Out

**Move to `projects/` or `plugins/`:**

```
projects/
├── vic/
│   ├── discovery.py       # VICAtozDiscovery, VICWaybackDiscovery (register on import)
│   ├── handler.py         # stage_extract, stage_check_enrich, etc.
│   └── db.py              # VIC-specific DB
├── earnings/
│   ├── discovery.py       # FinnhubCalendarDiscovery, FinnhubCompanyNewsDiscovery
│   └── handler.py         # price, ir, ib, transcript stages
├── youtube/
│   ├── discovery.py       # YouTubeChannelDiscovery, SerperYouTubeDiscovery, YouTubeVideoListDiscovery
│   └── handler.py         # download, transcribe, summarize stages
├── supplychain/
│   ├── discovery.py       # SemisupplySeedDiscovery, SemisupplyExpandDiscovery
│   └── handler.py
└── moltbook/
    ├── discovery.py       # MoltbookFeedDiscovery
    └── handler.py
```

**The registration mechanism:** Each project's `discovery.py` uses `@register("vic_atoz")` at import time. The `jobs.yaml` references strategies by name. The runner imports project modules based on the `handler:` dotted path (which already works via `executor.py`). Discovery strategies auto-register when their module is imported. Add a `plugins:` key to `jobs.yaml`:

```yaml
plugins:
  - projects.vic.discovery
  - projects.earnings.discovery
  - projects.youtube.discovery
```

The runner imports these at startup, triggering registration.

**What specifically gets cut from the framework:**

| Current location | Lines | Destination |
|---|---|---|
| `discovery.py` strategies (15 of 18) | ~900 | `projects/*/discovery.py` |
| `tracker.py` `STAGES` constant | 1 | Delete (unused) |
| `tracker.py` `stage_timing_stats`, `get_stage_pipeline_metrics` | ~80 | `jobs/lib/reporting.py` |
| `tracker.py` `_merge_item_data` | ~50 | Keep but simplify (it's generic enough) |
| `doctor.py` `_CLASSIFY_SYSTEM` fix commands | 5 | Configurable per-deployment |
| `doctor.py` `CLASSIFY_MODEL` | 1 | `jobs.yaml` `doctor:` config section |
| `doctor.py` Pushover notification | ~15 | Pluggable notifier interface |
| `runner.py` `_tomorrow_midnight_pt` | 5 | `jobs.yaml` guard config |
| `runner.py` stuck-item reset (startup) | ~50 | `tracker.py` `repair_item_states()` |
| `runner.py` disabled-stage skip (startup) | ~30 | `tracker.py` `repair_item_states()` |
| `runner.py` done-item reopen (startup) | ~25 | `tracker.py` `repair_item_states()` |

**Net result:** The framework shrinks from ~3,500 lines to ~1,800 lines. Each project is self-contained. Adding a new project means: write a discovery strategy, write a handler, add a YAML entry. Zero framework changes.

