# Jobs Framework Vario Critique (maxthink)

Models: opus, gpt-pro, grok, gemini


## GEMINI

**1. Schema decomposition: 6 tables**
The decomposition correctly separates queue state (`work_items`) from payload storage (`results`), which is excellent for performance—you don't want to drag large JSON payloads into memory when polling for pending work. 
However, I would make two changes:
*   **Merge `job_state` and `job_runs`**: `job_state` (pause status, errors) and `job_runs` (start/end times, counts) should just be a single `jobs` table. Right now, `job_state` acts as a pseudo-jobs table but lacks the core job definition. 
*   **Extract `item_stages`**: Storing stage states in `work_items.stages` (JSON) is a concurrency nightmare (see point 2). I'd drop the JSON column and add an `item_stages (job_id, item_key, stage_name, status, started_at, ended_at)` table.

**2. The `stages` JSON column**
It is a **maintenance and concurrency trap**. 
In `db.py`, `_update_stage` reads the JSON string, deserializes it, updates a key, serializes it, and writes it back (`UPDATE work_items SET stages = ?`). If you have two workers executing different stages (e.g., "price" and "ir") for the *same* item concurrently, the classic read-modify-write race condition will cause one worker to overwrite the other's status update. SQLite does support atomic JSON updates via `json_set()`, but the current Python-side manipulation completely bypasses that. You must normalize this to an `item_stages` table or rewrite the SQL to use SQLite's native `json_patch()`/`json_set()`.
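
A minimal sketch of the `json_set()` route, assuming a `work_items` table with a JSON `stages` column keyed by stage name. The table layout and the `set_stage_status` helper are illustrative, not taken from `db.py`. The change happens in a single SQL statement, so there is no Python-side read-modify-write window.

```python
import json
import sqlite3


def set_stage_status(conn: sqlite3.Connection, item_id: int, stage: str, status: str) -> None:
    """Set one stage key inside the JSON blob in a single UPDATE statement."""
    # Assumption: stage names are simple identifiers (no dots or quotes),
    # because the name is spliced into a JSON path.
    conn.execute(
        "UPDATE work_items SET stages = json_set(stages, '$.' || ?, ?) WHERE id = ?",
        (stage, status, item_id),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE work_items (id INTEGER PRIMARY KEY, stages TEXT NOT NULL)")
conn.execute(
    "INSERT INTO work_items (id, stages) VALUES (1, ?)",
    (json.dumps({"price": "pending", "ir": "pending"}),),
)

# Two independent updates, e.g. issued by two workers; neither clobbers the other.
set_stage_status(conn, 1, "price", "done")
set_stage_status(conn, 1, "ir", "in_progress")

stages = json.loads(conn.execute("SELECT stages FROM work_items WHERE id = 1").fetchone()[0])
```

This only removes the lost-update race on the blob; it does not make stage status any easier to query, which is the argument for the normalized table.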

**3. The `Pacer` class (35 lines)**
It is too naive and should be replaced with an off-the-shelf library like `aiolimiter`.
The current implementation enforces a strict, rigid sleep between every single call (`interval = 3600.0 / max_per_hour`). If `max_per_hour` is 60, it forces exactly a 60-second wait between executions. It does not allow for bursts (e.g., doing 5 fast API calls and then waiting). A true token-bucket algorithm is much more efficient for network I/O, and maintaining a custom async rate limiter is usually not worth the edge cases.

**4. `executor.py` (31 lines)**
Keep it, but **remove the hardcoded coupling**.
Right now, it forces `full_module = f"jobs.{module_path}"`. This tightly couples your generic execution framework to a specific folder structure (`jobs/`). If you drop that prefix and just let it resolve absolute dotted paths (e.g., `my_app.handlers.backfill`), it becomes a perfectly fine, reusable dynamic importer with much better error messages than raw `importlib`.

**5. `job.py` dataclasses**
This is the **right amount of structure**. 
Using 8 distinct dataclasses (`RunnerConfig`, `StageConfig`, `Job`, etc.) provides excellent IDE autocomplete, type safety, and a clear schema for your YAML files. The helper methods like `_parse_stages` and `_infer_stage_deps` provide great ergonomics, allowing users to write simple YAML while the framework gets strictly typed objects. The `update_from` method is slightly verbose but avoids the pitfalls of deep-merging dictionaries.

**6. Version hashing (`stage_version_hash` + `VERSION_DEPS`)**
It is **clever, but extremely fragile**. 
Using `inspect.getsource()` to hash function code means that fixing a typo in a comment, changing whitespace, or renaming a local variable will alter the hash. If this triggers `reprocess_stale`, a trivial cosmetic commit could accidentally kick off a massive, expensive backfill of millions of items. Furthermore, `getsource()` fails if the code is compiled (Cython/PyInstaller) or sometimes when decorators are used. 
**Fix:** Drop this entirely and use explicit semantic versioning (e.g., `VERSION = "1.2"` on the handler). Explicit is better than implicit for batch job invalidation.

**7. The `upsert_items` merge logic**
The complexity is justified for the business requirement, but **it is in the wrong layer**.
`db.py` contains 100 lines of hardcoded domain logic about `topics`, `queries`, and `discovery_sources`. A database adapter should not know about the business domain's specific metadata fields. This makes the DB layer completely non-reusable for other projects.
**Fix:** `upsert_items` should just accept a generic JSON patch or a merge callback function. The specific `scalar_to_list` domain logic should live in the discovery worker or a domain-specific wrapper.

**8. What's missing for a production framework**
*   **Worker Heartbeats / Zombie Recovery:** If a worker crashes mid-stage, the stage stays `in_progress` forever. You need a heartbeat mechanism and a sweeper that resets `in_progress` items to `pending` if no heartbeat is seen for X minutes.
*   **Dead Letter Queue (DLQ):** Items that fail repeatedly (e.g., 5 times) should be moved to a terminal `poison_pill` status so they stop clogging the queue.
*   **Distributed Locking:** SQLite WAL is great for a single node, but if this scales to multiple pods/containers, you'll get `database is locked` errors without a dedicated queueing layer (like Redis/RabbitMQ) or a strict primary-writer setup.
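
The heartbeat sweeper could look roughly like the sketch below. It assumes a hypothetical `heartbeat_at` column on `work_items` that workers refresh while they hold an item, stored as ISO-8601 UTC text in one consistent format; neither the column nor the `reap_stale_items` name comes from the reviewed code.

```python
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Optional


def reap_stale_items(
    conn: sqlite3.Connection,
    max_age_minutes: int = 30,
    now: Optional[datetime] = None,
) -> int:
    """Reset in_progress items whose heartbeat is older than the cutoff.

    Returns the number of items put back to 'pending'.
    """
    now = now or datetime.now(timezone.utc)
    # Same text format the workers are assumed to write, so that string
    # comparison in SQL matches chronological order.
    cutoff = (now - timedelta(minutes=max_age_minutes)).isoformat(timespec="seconds")
    cur = conn.execute(
        "UPDATE work_items SET status = 'pending' "
        "WHERE status = 'in_progress' AND heartbeat_at < ?",
        (cutoff,),
    )
    conn.commit()
    return cur.rowcount
```

Running this on a timer (or at the start of each polling loop) is enough for a single-node setup; the cutoff should comfortably exceed the longest legitimate stage runtime.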

**9. Extracting a generic reusable core**
If I were open-sourcing this:
*   **Stays in core:** `db.py`'s schema (minus the hardcoded `STAGES = ["price", "ir", "ib", "transcript"]`), the queueing logic, `job.py`'s DAG/dependency resolution, and the dataclass YAML parsing.
*   **Becomes a plugin/app-code:** The hardcoded `STAGES` list, the `upsert_items` domain-specific JSON merge logic, the `jobs.` prefix in `executor.py`, and `job_cost_log` (cost tracking is highly specific to LLM/API workflows and should be a generic `metrics` plugin instead).

**10. One concrete simplification today**
**Delete `stage_version_hash` and `VERSION_DEPS`.** 
Replace it with a simple requirement that handlers define a `STAGE_VERSIONS = {"extract": "v1"}` dictionary. It removes ~40 lines of brittle, magic introspection code, prevents accidental massive re-runs caused by code formatting (like running `black` or `ruff` on your codebase), and makes version bumping an intentional human decision rather than an unpredictable side effect.


## GPT-PRO

1) Schema decomposition (6 tables): mostly right; a few tweaks

What’s good:
- `work_items` is the correct “unit of work” table: `job_id + item_key` uniqueness, `status`, `attempts`, timestamps, `error`, `priority`.
- `results` separated from `work_items` is good: results tend to be large, optional, and versioned.
- `job_state` (pause/error) is a pragmatic control-plane table.
- `job_events` and `job_cost_log` are useful observability add-ons.

What I’d change:
- Merge/drop:
  - Consider dropping `job_runs` unless you actually have a “run-based” operational model. In a continuously running discovery/worker system, “runs” are often an artificial boundary. If it’s used for dashboards, you can derive similar metrics from `job_events` (e.g., `run_started` / `run_ended`) without a dedicated table.
  - Alternatively: keep `job_runs` but make it first-class: add `run_id` FK to `work_items` attempts/events to tie work to a run. Right now `job_runs` is weakly connected (no FK usage).
- Add:
  - A normalized `item_attempts` table (or `stage_attempts`) if you want real production-grade debugging:
    - `(job_id, item_key, stage, attempt_no, started_at, ended_at, status, error, worker_id, trace_id, metrics_json)`
    - This replaces the need to overload `work_items.stages` with `_started/_ended/_elapsed_s` keys and gives you history not just last state.
  - Add `created_at` / `updated_at` on `work_items` (you have `discovered_at` and `last_attempt_at`, but not a general update marker).
  - Add `next_retry_at` at item or stage level. You have `retry_later` status but no schedule; workers will keep selecting them unless the selection query avoids them (your `get_items_needing_stage()` includes `'retry_later'` as eligible).
- Constraints/indexing:
  - SQLite JSON queries can be slow. If you keep JSON for stages, consider generated columns or helper columns (e.g., `stage_price_status`), or at least additional indexes beyond `idx_work_items_job_status`.
  - `results`: index on `(job_id, stage)` if you frequently query stale counts by stage (`get_stale_count()` does).

2) `work_items.stages` JSON: elegant for prototypes; becomes a trap at scale

Why it’s elegant:
- You can add stages without schema migrations.
- Stage timing keys (`{stage}_started`, `{stage}_ended`, `{stage}_elapsed_s`) are convenient, and `_sync_item_status()` can derive item status.

Why it’s a maintenance trap:
- Queryability: `stage_stats()` loops through every stage and every `STAGE_STATUSES` and runs a query per pair (N_stages * N_status queries). That will hurt once you have many jobs/items.
- Correctness drift: JSON structure is informal. You already have to defensively backfill in `_get_stages()` because you can’t rely on shape consistency.
- No history: it stores only latest state; debugging “why did it fail twice and then succeed?” is painful.
- Concurrency: multiple workers updating different stages of the same item risks write contention and last-write-wins on the JSON blob. `_update_stage()` reads JSON → modifies → writes JSON; two workers can race and clobber each other’s updates.

What I’d do:
- If you expect >~10k items/job or real concurrency, normalize:
  - Create `work_item_stages(job_id, item_key, stage, status, started_at, ended_at, elapsed_s, error, attempts, updated_at, UNIQUE(job_id,item_key,stage))`
  - Keep `work_items.status` as the coarse aggregate, but compute it from normalized stage rows (or update it transactionally).
- If you want to keep JSON short-term:
  - Stop encoding timing as ad-hoc keys. Put timing under a nested object per stage, e.g. `{ "price": {"status":"done","started":...,"ended":...} }` to reduce key-munging bugs.
  - Ensure selection does not keep picking `retry_later` immediately; add `retry_after` per item/stage.

3) `Pacer` (≈35 LOC): keep the idea, but upgrade semantics

The current `Pacer` is fine as a minimal fixed-interval limiter (it spaces calls evenly; it is not a token bucket). Reasons to keep:
- Tiny, readable, no dependencies.
- Your requirements are simple: “max_per_hour” spacing, not distributed, not multi-key.

But it’s missing a few things that production systems usually need:
- Jitter/backoff integration (to avoid thundering herd across jobs).
- Burst allowance (token bucket) vs fixed spacing. Right now it enforces near-uniform spacing.
- Concurrency safety if multiple tasks share a `Pacer` (you probably do): it’s not protected by a lock; two coroutines can pass `wait()` simultaneously and violate the rate.

Recommendation:
- Keep it, but add an `asyncio.Lock()` around the critical section, and optionally support burst capacity.
- Only use off-the-shelf if you need distributed rate limiting (Redis) or multi-tenant keys. For local-only, this is acceptable.
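
A sketch of the lock recommendation, assuming the existing `Pacer` has roughly this shape (an `interval` derived from `max_per_hour` and an async `wait()`); the class name `LockedPacer` and its internals are illustrative. The lock is held across the sleep, so coroutines sharing one pacer are released one interval apart instead of all passing the check at once.

```python
import asyncio
import time


class LockedPacer:
    """Fixed-interval pacer that is safe to share between coroutines."""

    def __init__(self, max_per_hour: float) -> None:
        self.interval = 3600.0 / max_per_hour
        self._last = float("-inf")  # first call never waits
        self._lock = asyncio.Lock()

    async def wait(self) -> None:
        # Holding the lock while sleeping serializes callers: the next
        # coroutine only computes its delay after this one has been released.
        async with self._lock:
            delay = self._last + self.interval - time.monotonic()
            if delay > 0:
                await asyncio.sleep(delay)
            self._last = time.monotonic()
```

This still limits only within one process and one event loop; burst capacity would need the token-bucket variant.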

4) `executor.py` / `resolve_handler` (≈31 LOC): keep as a seam

This is a good abstraction point:
- It gives a clear error message boundary for configuration errors (bad dotted path, import failure, missing attribute, non-callable).
- It’s also where you can later add:
  - entry point discovery (setuptools entrypoints),
  - handler capability checks (does it expose `discover()` / `stage_*()`),
  - versioning metadata.

I would keep it as-is rather than inline it everywhere. It’s small but valuable as the plugin seam.

5) `job.py` dataclasses (≈8 dataclasses): reasonable structure; a couple of improvements

This is not over-typed; it’s about the right level for YAML → validated config. Positives:
- `StageConfig` and `ResourceConfig` are genuinely useful.
- `Job.stages` property provides backward compatibility for code expecting `list[str]`.
- `Job.update_from()` hints at hot-reload support.

What I’d refine:
- Validation: You’re not validating stage names, uniqueness, legal concurrency values, etc. Consider a `validate()` method on `Job` and/or use Pydantic for runtime validation if configs are user-edited.
- Defaults: `PacingConfig.circuit_breaker` default is 10 in the dataclass but YAML load uses default 3 (`circuit_breaker=pace.get("circuit_breaker", 3)`)—that inconsistency will confuse operators.
- The `_infer_stage_deps()` “depends on previous stage” is fine as a default, but you’ll want to represent fan-in/fan-out later; current structure supports it (`dict[str, list[str]]`), good.

So: keep the dataclasses; add validation and fix default inconsistencies.

6) Version hashing (`stage_version_hash` + `VERSION_DEPS`): clever but fragile in predictable ways

What’s clever:
- Hashing `inspect.getsource()` of `stage_{name}` plus declared deps is a nice “poor man’s build graph”.
- It integrates with `results.handler_version` and `reprocess_stale()` nicely.

What breaks / fragility:
- `inspect.getsource()` is not stable in many deployment modes:
  - zipped apps, compiled bytecode-only, editable installs vs packaged wheels, some serverless deploys.
- It won’t capture non-source runtime changes:
  - environment variables, prompt files loaded from disk, external templates, model versions, network behavior.
- It relies on humans to maintain `VERSION_DEPS` completeness. If a stage uses helper `foo()` but you forgot to list it, the hash won’t change.
- It can be too sensitive:
  - whitespace/comment changes change the hash; you might cause mass “stale” resets on harmless edits.

What I’d do:
- Keep the concept, but shift to explicit semantic versioning:
  - Each handler module exposes `HANDLER_VERSION = "2026-02-26.1"` and optionally per-stage versions.
  - Optionally support an “auto hash” fallback for dev mode only.
- If you keep hashing, make it more robust:
  - Hash normalized AST (strip comments/whitespace) instead of raw source.
  - Include package version pins (e.g., `importlib.metadata.version("yourpkg")`) in the hash.
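
A minimal sketch of the normalized-AST idea, taking source text as input so it is independent of how the handler source is obtained. The function name `normalized_source_hash` is hypothetical. Comments and whitespace never reach the AST, and `ast.dump` omits line and column attributes by default, so purely cosmetic edits leave the hash unchanged.

```python
import ast
import hashlib
import textwrap


def normalized_source_hash(source: str) -> str:
    """Hash the AST of `source`, ignoring comments, whitespace and layout."""
    # dedent so that source extracted from an indented method still parses
    tree = ast.parse(textwrap.dedent(source))
    canonical = ast.dump(tree)  # no lineno/col_offset unless include_attributes=True
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]


original = "def stage_price(x):\n    return x * 2\n"
reformatted = "def stage_price(x):\n    # explain the doubling\n    return   x*2\n\n"
changed = "def stage_price(x):\n    return x * 3\n"

same_after_reformat = normalized_source_hash(original) == normalized_source_hash(reformatted)
differs_after_change = normalized_source_hash(original) != normalized_source_hash(changed)
```

Two caveats: docstrings are part of the AST, so editing one still changes the hash, and the `ast.dump` format can differ between Python versions, so hashes are only comparable within one interpreter version.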

7) `upsert_items` merge logic (≈100 LOC): justified if provenance matters; otherwise too complex

What it’s doing well:
- `_merge_item_data()` preserves discovery provenance across rediscovery:
  - accumulates `topics/queries/discovery_sources`,
  - preserves initial scalar `topic/query/discovery_source`,
  - doesn’t clobber existing non-empty fields.
- That’s valuable if you use discovery metadata for auditing/traceability.

Concerns:
- This is mixing concerns inside the DB layer:
  - `upsert_items()` is now an opinionated “discovery record merger”, not just persistence.
- JSON in `work_items.data` becomes a semi-schema with ad-hoc rules—hard to evolve.
- It has subtle edge cases:
  - Treating `0` and `False` as “empty” (`if merged.get(k) in (None, "", 0, False)`) can incorrectly overwrite legitimate values.
- Performance: on conflict it does a `SELECT data` then parse JSON then update; for large upserts this will be slower.
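
The `0`/`False` edge case is easy to reproduce, because `in` compares with `==` and `False == 0` in Python. A small sketch of a stricter check follows; `is_missing` and `fill_missing` are illustrative names, not functions from the reviewed code.

```python
def is_missing(value) -> bool:
    """True only for values that carry no information: None or an empty string."""
    return value is None or value == ""


def fill_missing(existing: dict, incoming: dict) -> dict:
    """Fill-not-overwrite: copy incoming values only into missing fields."""
    merged = dict(existing)
    for key, value in incoming.items():
        if key not in merged or is_missing(merged[key]):
            merged[key] = value
    return merged


# Every one of these legitimate values is treated as "empty" by the tuple test.
loose_check_hits = [v for v in (0, False, 0.0) if v in (None, "", 0, False)]

merged = fill_missing(
    {"retries": 0, "active": False, "title": ""},
    {"retries": 5, "active": True, "title": "Q3 report"},
)
```

With the stricter check, `retries` and `active` keep their stored values and only the empty `title` is filled in.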

What I’d do:
- If provenance is required: move this logic to the discovery layer and persist already-merged “canonical discovery record” (DB layer should stay thin).
- If provenance isn’t critical: drastically simplify:
  - store only first-seen data (immutable) + a separate `work_item_discovery_events` table for additional hits.
  - or store `data` as last-seen and put provenance lists in their own columns.

So: complexity is justified only if you truly need merged provenance for downstream decisions.

8) What’s missing for production

A non-exhaustive list of gaps relative to a production job framework:

Correctness & concurrency
- Atomic claiming: `get_items_needing_stage()` selects candidates, then separate `mark_stage_in_progress()` updates. Two workers can select the same item. You need an atomic “claim” update:
  - `UPDATE ... SET ... WHERE id=? AND stage_status IN ('pending','retry_later')` with rowcount check.
- Worker identity/leases: no `worker_id`, no “lease timeout” to recover stuck `in_progress`.
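
A sketch of the atomic claim, assuming a coarse `status` column on `work_items` and a hypothetical `worker_id` column (the review suggests adding one; it is not in the current schema). The guard lives in the `WHERE` clause, and the affected row count tells the caller whether it won the claim.

```python
import sqlite3


def claim_item(conn: sqlite3.Connection, item_id: int, worker_id: str) -> bool:
    """Try to claim one item; returns True only for the worker that wins."""
    cur = conn.execute(
        "UPDATE work_items "
        "SET status = 'in_progress', worker_id = ?, attempts = attempts + 1 "
        "WHERE id = ? AND status IN ('pending', 'retry_later')",
        (worker_id, item_id),
    )
    conn.commit()
    # rowcount is 0 when another worker changed the status first.
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE work_items ("
    "id INTEGER PRIMARY KEY, status TEXT NOT NULL, "
    "worker_id TEXT, attempts INTEGER NOT NULL DEFAULT 0)"
)
conn.execute("INSERT INTO work_items (id, status) VALUES (1, 'pending')")

first = claim_item(conn, 1, "worker-a")
second = claim_item(conn, 1, "worker-b")
```

Here the first call claims the item and the second is rejected, so only one worker proceeds. A lease timeout would still be needed to recover items whose owner crashed.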

Retry semantics
- `retry_later` has no `retry_after` at item/stage level; it will be reselected immediately (your query includes retry_later).
- Backoff policy exists in config (`retry_backoff_base`) but no persistence of next retry time.

Observability
- Structured tracing: correlate events, stage updates, and results via `trace_id` / `span_id`.
- Metrics export (Prometheus): queue depth, throughput, error rates, cost, per-stage latency histograms.

Data lifecycle
- Pruning/archival for `results` and `work_items` (only `job_events` has pruning).
- Storage integrity: `result_path` points to filesystem; no checksums, no GC.

Security/robustness
- SQLite WAL is fine, but you’ll hit write contention under concurrency. Need clear concurrency expectations and possibly a path to Postgres.
- Input validation for YAML config and handler resolution.
- Handling schema migration failures: migrations are simple `ALTER TABLE`, but you don’t record migration state/version.

Operational controls
- Per-stage pause/disable at runtime (you have job pause only).
- Priority aging/starvation control.
- Idempotency guarantees per stage.

9) Extracting a generic reusable core: what stays vs becomes plugin

Core (generic):
- Persistence primitives:
  - `open_db/open_raw_db`, schema/migrations (but make schema adaptable to multiple backends).
  - `work_items` lifecycle functions: `upsert_items` (simplified), claim/lease, mark done/failed, retry scheduling.
  - `results` store/fetch with versioning.
  - `job_state` pause/resume + retry scheduling (but separate from any vendor-specific reasons).
  - `job_events` as an append-only log.
- Execution framework seams:
  - `resolve_handler` plugin loading.
  - Stage runner orchestration interfaces (not shown here, but implied).
  - Rate limiting interface (keep `Pacer` or a `RateLimiter` protocol).

Plugin / application-specific:
- Discovery provenance merge rules in `upsert_items` (topics/queries/discovery_sources are app semantics).
- Cost logging semantics (`log_cost`): keep the capability, but the meaning of “cost_usd” and what constitutes cost is app/model-specific.
- Version hashing rules: keep “handler_versioning” as an interface; implementation can vary (explicit version string vs hashing).
- `Job` config fields like `emoji`, `detail_columns`, `pipeline/pipeline_step`, `checks`, `tags`, `url`—these are dashboard/business concerns.

In short: core should define minimal job+item+stage lifecycle; anything about provenance, UI, pipelines, and audit checks is plugin/app-layer.

10) One concrete simplification I’d make today

Stop treating `retry_later` as immediately eligible work.

Specifically:
- Add `next_attempt_at` (TEXT timestamp) to `work_items` (or per-stage if you keep JSON), and change `get_items_needing_stage()` to only return items whose `next_attempt_at IS NULL OR next_attempt_at <= now`.
- Update `mark_stage_retry_later()` to set `next_attempt_at = now + backoff` (and record reason).
- Remove `'retry_later'` from the eligible set in `get_items_needing_stage()` unless `next_attempt_at` is due.

This is small, immediately improves system stability, and reduces pointless churn when an external dependency is down (the motivating example in `RetryLaterError` docstring).
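
A rough sketch of that change, assuming `next_attempt_at` is stored as ISO-8601 UTC text and that backoff doubles per attempt up to a cap. The function names, the base of 60 seconds and the one-hour cap are illustrative choices, not values from the reviewed config.

```python
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import List, Optional


def _iso(dt: datetime) -> str:
    return dt.isoformat(timespec="seconds")


def backoff_seconds(attempts: int, base: float = 60.0, cap: float = 3600.0) -> float:
    """Exponential backoff: base, 2*base, 4*base, ... limited to cap."""
    return min(cap, base * 2 ** max(0, attempts - 1))


def mark_retry_later(conn: sqlite3.Connection, item_id: int,
                     now: Optional[datetime] = None) -> None:
    """Record a failed attempt and schedule the next one."""
    now = now or datetime.now(timezone.utc)
    row = conn.execute("SELECT attempts FROM work_items WHERE id = ?", (item_id,)).fetchone()
    attempts = (row[0] or 0) + 1
    due = now + timedelta(seconds=backoff_seconds(attempts))
    conn.execute(
        "UPDATE work_items SET status = 'retry_later', attempts = ?, next_attempt_at = ? "
        "WHERE id = ?",
        (attempts, _iso(due), item_id),
    )
    conn.commit()


def get_due_items(conn: sqlite3.Connection, now: Optional[datetime] = None,
                  limit: int = 10) -> List[int]:
    """Return ids that are eligible now; scheduled retries are skipped until due."""
    now = now or datetime.now(timezone.utc)
    rows = conn.execute(
        "SELECT id FROM work_items "
        "WHERE status IN ('pending', 'retry_later') "
        "AND (next_attempt_at IS NULL OR next_attempt_at <= ?) "
        "ORDER BY id LIMIT ?",
        (_iso(now), limit),
    )
    return [r[0] for r in rows]
```

Storing the due time rather than recomputing it keeps the selection query a plain comparison, which also makes it indexable.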


## GROK

**1. Schema**: The 6-table decomposition is solid and query-optimized (e.g., idx_work_items_job_status, idx_results_job_item enable fast dashboard queries). work_items is the central hub; results separates outputs cleanly; job_state/job_runs for orchestration; job_cost_log/job_events for auditing. No major merges needed—job_runs could be dropped if unused (no callers in code), but it's lightweight. Add: a `job_config` table to version job YAML snapshots (avoids drift in long-running jobs). Keep UNIQUE(job_id, item_key) as natural key.

**2. Stages JSON**: Maintenance trap. JSON in work_items.stages (used in _get_stages, get_items_needing_stage, stage_stats) kills query performance: no fast aggregates on "count pending price stage". Normalize to work_item_stages(id, work_item_id, stage, status, started_at, ended_at, elapsed_s, error), 8 cols, with UNIQUE(work_item_id, stage) plus an index on (stage, status) so that stage-first filters can use it. Fixes get_stage_pipeline_metrics (currently scans all rows), enables "WHERE stage='price' AND status='pending' ORDER BY priority" (priority stays on work_items, so that query needs a JOIN). _sync_item_status derives item status via triggers or view.

**3. Pacer (35 lines)**: Not worth it: reinvented wheel. Replace with `aiolimiter.AsyncLimiter(max_per_hour, time_period=3600)` in ~3 lines (pip install aiolimiter). Handles bursts/backpressure better; Pacer's monotonic clock + sleep is racy with multiple async workers and has no shared state across processes (aiolimiter is also in-process only, so cross-process limits still need an external store). Call `await limiter.acquire()` (or use `async with limiter:`) before mark_in_progress.

**4. executor.py (31 lines)**: Inline into resolve_handler callers (e.g., job runner init). It's pure importlib with validation—bloats as separate file for no gain. Reduces indirection; error messages can be copy-pasted.

**5. job.py dataclasses**: Over-typed: 8 classes (Job + 7 configs) for YAML that's mostly flat dicts. Merge into single Job dataclass with nested dicts for pacing/storage/stages (declare each as `field(default_factory=dict)` inside one `@dataclass`). load_jobs parses once; validation via pydantic if needed. Cuts ~200 LOC; stages as list[dict] loses no type-safety (get_stage_config iterates anyway).

**6. Version hashing**: Clever but fragile. stage_version_hash relies on inspect.getsource, which needs the source text at runtime (fails for C extensions and bytecode-only/frozen deployments, unreliable for lambdas, blind to __init__.py changes); VERSION_DEPS assumes module-level dict (misses dynamic imports). Breaks: anything that rewrites source text without changing behavior (formatters, minifiers). Fix: Git hash of handler file(s) + deps globs (stable, SCM-integrated). Or drop it and use a manual "bump version" toggle.

**7. Upsert_items merge (100 lines)**: Complexity unjustified. _merge_item_data preserves "provenance" (topics/queries/sources) across discoveries, but dashboard likely shows latest. Simplify: always overwrite data JSON on upsert (lose multi-source lists); or store provenance in separate jsonb-like array col. 80% logic is dedup/_as_list—use setdefault + list.extend. Justified only if multi-discovery provenance is UI-critical (not shown).

**8. Missing for production**:
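- **Distributed coordination**: SQLite WAL ok for single-node, but multiple writer processes need serialized write transactions (e.g., `BEGIN IMMEDIATE` plus `PRAGMA busy_timeout`; note that `PRAGMA locking_mode=EXCLUSIVE` would lock other processes out entirely), or Redis for job_state.paused/retry_after across nodes.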
- **Idempotency**: No dedup on stage runs (e.g., ib stage retries clobber results).
- **Metrics/observability**: No Prometheus export (stage_timing_stats could feed); integrate Prometheus client.
- **Secrets**: Handler params/discovery.params hold API keys—externalize to env/vault.
- **Webhooks/Slack**: log_event lacks external emit (e.g., on pause/error).
- **Cleanup**: No auto-prune old jobs/results (prune_events exists, extend).
- **Validation**: GuardsConfig.validate unused—add schema checks on upsert_items.

**9. Generic core vs plugins**:
**Core stays** (~70%): DB schema/functions (open_db, upsert_items, get_pending, mark_*, stage_*—rename to pipeline_*); Job dataclass (minimal); Pacer (or aiolimiter).
**Plugins**:
- Handlers/resolvers (executor.py → per-job).
- Discovery strategies (DiscoveryConfig.strategy → pluggable classes).
- Stages/UI logic (STAGES, VERSION_DEPS, stage_version_hash → handler modules).
- YAML loaders (load_jobs → config plugin).
Extract: jobs_core.py (tracker + basic runner loop); jobs.runners.{backfill,monitor}; jobs.handlers.*.

**10. One concrete simplification**: Normalize stages JSON to work_item_stages table (as in #2). Rewrite _get_stages → get_stages(item_id); get_items_needing_stage uses JOINs (faster); drop _update_stage/_sync_item_status (use triggers: AFTER UPDATE ON work_item_stages → derive work_items.status via CASE on subquery). Saves 200+ LOC, fixes query perf (stage_stats scans 100x less), enables "SELECT AVG(elapsed_s) FROM work_item_stages WHERE stage='ib'".


## OPUS

**The 6 tables are mostly right, but `job_runs` is dead weight and `job_state` is overloaded.**

`job_runs` tracks start/end/counts, but nothing in the codebase reads it meaningfully — `start_run` and `end_run` exist but there's no dashboard query, no "show me the last 10 runs" function, no alerting off it. It's write-only telemetry. I'd drop it and let `job_events` absorb that signal (you already log events with `log_event`).

`job_state` is doing too many jobs. It's the pause flag, the error store, the retry scheduler, and the auto-unpause counter — all on a single row per job. That's fine *today*, but the `retry_after` / `auto_unpause_count` / `last_auto_unpause_at` / `pause_reason` cluster is really a "pause event" that should be a row in `job_events` with structured `data` JSON. You already have the events table; use it.

What I'd **add**: a `job_locks` or `job_leases` table. There's nothing here preventing two runner processes from processing the same item simultaneously. `mark_in_progress` does a bare UPDATE with no optimistic locking: there is no `WHERE status = 'pending'` guard and no check of the affected row count. Two workers calling `get_pending` → `mark_in_progress` on the same item will both succeed. This is the most operationally dangerous gap.

**Verdict**: Drop `job_runs`, slim `job_state` to just `(job_id, paused, updated_at)`, add a lease/lock mechanism.

**It's a maintenance trap that's already sprung.**

The `stages` column stores `{"price": "done", "price_started": "2024-...", "price_elapsed_s": 3.2, "price_error": "...", "ir": "pending", ...}` — mixing status, timing, and error data in a flat namespace distinguished only by suffix conventions. Every function that touches it has to do the same suffix filtering:

```python
stage_statuses = {k: v for k, v in stages.items()
                  if not k.endswith(("_started", "_ended", "_elapsed_s", "_error"))}
```

This pattern appears in `_sync_item_status`, `stage_timing_stats`, `retry_failed`, and `reprocess_stale`. It's a parsing convention masquerading as a data model.

The real cost: **every query against stage status requires `json_extract`**, which means SQLite can't use indexes effectively. `get_items_needing_stage` does `json_extract(stages, ?) IN ('pending', 'retry_later')` with additional `json_extract` per dependency — that's a full table scan per stage per query. At 10K items × 4 stages × polling every few seconds, this becomes the bottleneck.

**What I'd do**: Normalize to an `item_stages` table: `(job_id, item_id, stage, status, started_at, ended_at, elapsed_s, error)` with a composite index on `(job_id, stage, status)`. The queries become trivial JOINs, the suffix convention disappears, and `get_items_needing_stage` becomes a fast indexed lookup. The `_sync_item_status` derivation becomes a simple aggregate query. Migration is straightforward since you already have the data; it's just unpacking the JSON.
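
A sketch of what the normalized table and the "items needing a stage" lookup might look like. It assumes the table carries a `job_id` column so the `(job_id, stage, status)` index applies, and it handles a single upstream dependency for brevity; the helper name and signature are illustrative.

```python
import sqlite3
from typing import List, Optional

SCHEMA = """
CREATE TABLE item_stages (
    job_id     TEXT    NOT NULL,
    item_id    INTEGER NOT NULL,
    stage      TEXT    NOT NULL,
    status     TEXT    NOT NULL DEFAULT 'pending',
    started_at TEXT,
    ended_at   TEXT,
    elapsed_s  REAL,
    error      TEXT,
    PRIMARY KEY (item_id, stage)
);
CREATE INDEX idx_item_stages_lookup ON item_stages (job_id, stage, status);
"""


def items_needing_stage(conn: sqlite3.Connection, job_id: str, stage: str,
                        depends_on: Optional[str] = None, limit: int = 10) -> List[int]:
    """Items whose `stage` is runnable and whose dependency (if any) is done."""
    sql = (
        "SELECT s.item_id FROM item_stages s "
        "WHERE s.job_id = ? AND s.stage = ? AND s.status IN ('pending', 'retry_later')"
    )
    params: list = [job_id, stage]
    if depends_on is not None:
        # The primary key (item_id, stage) serves this correlated lookup.
        sql += (
            " AND EXISTS (SELECT 1 FROM item_stages d "
            "WHERE d.item_id = s.item_id AND d.stage = ? AND d.status = 'done')"
        )
        params.append(depends_on)
    sql += " ORDER BY s.item_id LIMIT ?"
    params.append(limit)
    return [row[0] for row in conn.execute(sql, params)]
```

Per-stage aggregates then become plain `GROUP BY stage, status` queries with no `json_extract` involved.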

The JSON approach would be defensible if stages were truly dynamic and unpredictable. But they're declared in YAML config and known at job load time. You're paying the cost of schemaless storage for data that has a schema.

**Keep it, but it's wrong as written.**

35 lines is fine. The problem isn't that it exists — it's that it's a single-token pacer, not a token bucket. It enforces minimum inter-request spacing (`interval = 3600 / max_per_hour`), which means if you set `max_per_hour=3600`, you get exactly 1 request/second with no bursting. A real token bucket allows `batch_size` requests immediately then refills. For a system that processes items in batches (`get_pending(..., limit=10)`), you want to fire 10 requests quickly then wait, not space them 1 second apart.

More critically: it's per-Pacer-instance, not per-resource. If two stage workers share a rate-limited API, they each get their own `Pacer` and collectively exceed the limit. The `ResourceConfig` / semaphore system handles concurrency but not rate — those are different constraints.

I wouldn't reach for `aiolimiter` or similar. I'd fix this to be an actual token bucket (add `tokens` and `max_tokens` fields, refill on `wait()`) and make it shared via the same `ResourceRegistry` pattern you use for semaphores. That's maybe 15 more lines.
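
A sketch of that token bucket, under the assumption that `wait()` is the only entry point and that one bucket is shared per rate-limited resource. The `clock` and `sleep` parameters and the `get_bucket` registry are hypothetical additions, included so the behavior can be exercised without real waiting.

```python
import asyncio
import time
from typing import Dict


class TokenBucket:
    """Allows bursts of up to max_tokens, then refills at max_per_hour."""

    def __init__(self, max_per_hour: float, max_tokens: int,
                 clock=time.monotonic, sleep=asyncio.sleep) -> None:
        self.rate = max_per_hour / 3600.0  # tokens added per second
        self.max_tokens = float(max_tokens)
        self.tokens = float(max_tokens)    # start full so the first batch bursts
        self._clock = clock
        self._sleep = sleep
        self._updated = clock()
        self._lock = asyncio.Lock()

    def _refill(self) -> None:
        now = self._clock()
        self.tokens = min(self.max_tokens, self.tokens + (now - self._updated) * self.rate)
        self._updated = now

    async def wait(self) -> None:
        async with self._lock:
            self._refill()
            if self.tokens < 1.0:
                # Sleep just long enough for one token to accumulate.
                await self._sleep((1.0 - self.tokens) / self.rate)
                self._refill()
            self.tokens -= 1.0


_buckets: Dict[str, TokenBucket] = {}


def get_bucket(resource: str, max_per_hour: float, max_tokens: int) -> TokenBucket:
    """Return the shared bucket for a resource, creating it on first use."""
    if resource not in _buckets:
        _buckets[resource] = TokenBucket(max_per_hour, max_tokens)
    return _buckets[resource]
```

With `max_tokens` set to the batch size, a batch fires immediately and the following call waits for the refill; stage workers that hit the same API would call `get_bucket` with the same resource name.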

**Keep it. Rename it to `loader.py` and stop second-guessing it.**

31 lines with clear error messages for the three failure modes of dynamic import (bad path format, missing module, missing attribute). Every time someone typos a handler path in YAML, this gives them an actionable error instead of a raw `AttributeError`. The alternative — inlining `importlib.import_module` at the call site — means reproducing those error messages everywhere or getting worse diagnostics.

The one thing I'd change: cache the resolved callable. Right now every call to `resolve_handler` repeats the lookup and validation (the module itself is only imported once, since `importlib.import_module` returns the cached entry from `sys.modules`). Add an `@lru_cache` or a module-level dict. Handler resolution happens at job load time so it's not hot-path, but caching makes the intent clear: this is resolution, not reloading.
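
A sketch of a cached resolver with one distinct error per failure mode. It assumes plain dotted paths of the form `package.module.function`; the exact signature and messages of the real `resolve_handler` are not shown in this review, so treat these as illustrative.

```python
import importlib
from functools import lru_cache
from typing import Callable


@lru_cache(maxsize=None)
def resolve_handler(dotted_path: str) -> Callable:
    """Resolve 'package.module.function' to a callable, with actionable errors."""
    module_path, sep, attr = dotted_path.rpartition(".")
    if not sep or not module_path or not attr:
        raise ValueError(
            f"Handler path {dotted_path!r} must look like 'package.module.function'"
        )
    try:
        module = importlib.import_module(module_path)
    except ImportError as exc:
        raise ImportError(
            f"Cannot import module {module_path!r} for handler {dotted_path!r}: {exc}"
        ) from exc
    try:
        handler = getattr(module, attr)
    except AttributeError:
        raise AttributeError(
            f"Module {module_path!r} has no attribute {attr!r} (handler {dotted_path!r})"
        ) from None
    if not callable(handler):
        raise TypeError(f"Handler {dotted_path!r} resolved to a non-callable object")
    return handler
```

`lru_cache` stores only successful results, so a path that failed once is retried on the next call, for example after the YAML typo has been fixed.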

**Right amount of structure, wrong organization.**

8 dataclasses is fine for the config surface area. The issue is that `Job` is a 30-field god object that mixes identity (`id`, `emoji`), behavior (`handler`, `stages`), operational tuning (`pacing`, `guards`), and display (`detail_columns`, `tags`, `url`). The `update_from` method shows the pain — it manually copies 10+ fields with change tracking because there's no separation between "immutable definition" and "tunable parameters."

I'd split into:
- `JobSpec`: identity + handler + stages + discovery (immutable after load)
- `JobTuning`: pacing + guards + limits + enabled (hot-reloadable)
- `Job` composes both

Then `update_from` becomes `self.tuning = other.tuning` with a diff on the two frozen dataclasses.
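
A compact sketch of that split. The field sets are trimmed to a handful of illustrative attributes (the real `Job` has many more), and `update_from` returning a dict of changes is an assumption about what the change tracking needs.

```python
from dataclasses import dataclass, fields
from typing import Dict, Tuple


@dataclass(frozen=True)
class JobSpec:
    """Immutable after load: identity, handler, stages."""
    id: str
    handler: str
    stages: Tuple[str, ...]


@dataclass(frozen=True)
class JobTuning:
    """Hot-reloadable knobs."""
    max_per_hour: int = 60
    retry_max_attempts: int = 3
    enabled: bool = True


@dataclass
class Job:
    spec: JobSpec
    tuning: JobTuning

    def update_from(self, other: "Job") -> Dict[str, tuple]:
        """Swap in the new tuning and report {field: (old, new)} for each change."""
        changes = {
            f.name: (getattr(self.tuning, f.name), getattr(other.tuning, f.name))
            for f in fields(JobTuning)
            if getattr(self.tuning, f.name) != getattr(other.tuning, f.name)
        }
        self.tuning = other.tuning
        return changes


spec = JobSpec(id="backfill", handler="handlers.backfill", stages=("price", "ir"))
job = Job(spec, JobTuning())
reloaded = Job(spec, JobTuning(max_per_hour=120))
changes = job.update_from(reloaded)
```

Because both halves are frozen, a reload cannot partially mutate a job; it either replaces the whole tuning object or leaves it alone.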

The `StageConfig.outputs` field (`dict[str, str]` for `field_name → description`) is declared but never read anywhere in this codebase. Either it's for a future schema-validation feature (in which case, document that) or it's dead code.

`_infer_stage_deps` deriving linear dependencies from list order is pragmatic and correct for most pipelines. Good call keeping `stage_deps` overridable in YAML for diamond dependencies.

**Clever, fragile, and it will break in exactly two ways.**

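First: `inspect.getsource` returns source as formatted, including comments and whitespace. Add a comment to a handler function? Every item shows as stale. Reformat with black? Mass reprocessing. The hash should be over the AST or the compiled code object, not source text. If you hash bytecode, hash `handler_func.__code__.co_code` together with `co_consts` and `co_names`: literal constants and referenced names generally live outside `co_code`, so hashing `co_code` alone can miss a changed constant. Bytecode is stable across formatting changes and comment edits, but not across Python versions, so an interpreter upgrade will also change the hash.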

Second, and worse: the `VERSION_DEPS` mechanism requires developers to manually declare dependencies. When someone adds a call to a new helper in `stage_extract` but forgets to add it to `VERSION_DEPS`, the version hash doesn't change, stale items don't get flagged, and you silently serve results from the old logic. This is a **correctness** bug that's invisible until someone notices wrong outputs. The manual dependency list is the kind of thing that's correct on day one and wrong by month three.

What I'd do instead: version the *outputs*, not the code. Hash the result JSON stored in `results`. When you want to reprocess, run the handler on a sample item and compare the output hash to what's stored. If they differ, flag the stage as stale. This is slower but actually correct — it catches dependency changes, prompt changes, model version changes, API behavior changes, everything.

If you keep code-based hashing, at minimum switch to a bytecode-based hash (`co_code` plus `co_consts`) and drop `VERSION_DEPS` in favor of module-level hashing (hash the entire handler module file). Coarser but honest about what it can track.

**The complexity is justified. The implementation is not.**

The *problem* is real: items get discovered multiple times from different sources (serper, manual add, different queries), and you need to preserve provenance without clobbering. The `_merge_item_data` function handles scalar-to-list promotion (`topic` → `topics`), deduplication, and fill-not-overwrite semantics. This is genuinely tricky merge logic.

But 100+ lines with three nested helper functions (`_as_list`, `_dedup_preserve_order`, `_merge_item_data`) inside `upsert_items` is doing too much in one place. The merge logic is also tightly coupled to a specific metadata schema (`topics`, `queries`, `discovery_sources` and their scalar counterparts) — that's domain knowledge baked into what should be a generic tracker function.

I'd extract `_merge_item_data` to a standalone tested function (it has no DB dependency — pure dict→dict), and make the scalar↔list field mapping configurable per job rather than hardcoded. The `scalar_to_list` dict (`{"topic": "topics", "query": "queries", "discovery_source": "discovery_sources"}`) should live in job config or handler code, not in the tracker.
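Something along these lines, as a sketch only: the merge semantics below are my reading of "scalar-to-list promotion, deduplication, fill-not-overwrite" and may differ from what `_merge_item_data` actually does. It assumes provenance values are hashable (strings), and the public name `merge_item_data` plus the `scalar_to_list` parameter are illustrative:

```python
from typing import Any, Mapping


def _as_list(value: Any) -> list:
    if value is None:
        return []
    return list(value) if isinstance(value, (list, tuple)) else [value]


def _dedup_preserve_order(values: list) -> list:
    seen, out = set(), []
    for v in values:
        if v not in seen:
            seen.add(v)
            out.append(v)
    return out


def merge_item_data(existing: dict, incoming: dict,
                    scalar_to_list: Mapping[str, str]) -> dict:
    """Pure dict -> dict merge; the field mapping is supplied by the job."""
    merged = dict(existing)
    list_fields = set(scalar_to_list.values())
    # Promote scalar provenance fields into their list counterparts.
    for scalar, plural in scalar_to_list.items():
        values = (_as_list(existing.get(plural)) + _as_list(existing.get(scalar))
                  + _as_list(incoming.get(plural)) + _as_list(incoming.get(scalar)))
        if values:
            merged[plural] = _dedup_preserve_order(values)
    # Fill-not-overwrite for every other field.
    for key, value in incoming.items():
        if key not in list_fields and merged.get(key) is None:
            merged[key] = value
    return merged


MAPPING = {"topic": "topics", "query": "queries",
           "discovery_source": "discovery_sources"}
existing = {"url": "https://example.com/a", "topic": "ai", "title": None}
incoming = {"url": "https://example.com/a-dup", "topic": "ml",
            "title": "A", "query": "ai news"}
merged = merge_item_data(existing, incoming, MAPPING)
```

With no database in the signature, this can be unit-tested with plain dicts, and a job with different provenance fields passes a different mapping.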

The other issue: the `except sqlite3.IntegrityError` pattern means every duplicate does an INSERT (fails) then a SELECT + merge + UPDATE. For bulk rediscovery of 1000 already-known items, that's 3000 queries. An `INSERT OR IGNORE` followed by a batch UPDATE for existing items would be significantly faster.
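One way the batch path could look, as a sketch. It assumes `work_items` has a unique constraint on `(job_id, item_key)` and stores item metadata as JSON text in a column I'm calling `data`; the function name `bulk_upsert` and the `merge` callback are hypothetical:

```python
import json
import sqlite3


def bulk_upsert(conn, job_id, items, merge):
    """items: {item_key: data dict}; merge: (existing, incoming) -> merged dict."""
    keys = list(items)
    existing = {}
    # One SELECT per chunk instead of one failed INSERT per duplicate.
    # Chunks of 500 stay under the bound-variable limit of older SQLite builds.
    for i in range(0, len(keys), 500):
        chunk = keys[i:i + 500]
        marks = ",".join("?" * len(chunk))
        rows = conn.execute(
            "SELECT item_key, data FROM work_items "
            f"WHERE job_id = ? AND item_key IN ({marks})",
            [job_id, *chunk],
        )
        existing.update({key: json.loads(data) for key, data in rows})

    new_rows = [(job_id, k, json.dumps(v))
                for k, v in items.items() if k not in existing]
    updates = [(json.dumps(merge(existing[k], v)), job_id, k)
               for k, v in items.items() if k in existing]
    with conn:  # one transaction for both batches
        # OR IGNORE covers a row inserted by another writer after the SELECT.
        conn.executemany(
            "INSERT OR IGNORE INTO work_items (job_id, item_key, data) "
            "VALUES (?, ?, ?)", new_rows)
        conn.executemany(
            "UPDATE work_items SET data = ? WHERE job_id = ? AND item_key = ?",
            updates)
    return len(new_rows), len(updates)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE work_items (job_id TEXT, item_key TEXT, data TEXT, "
             "status TEXT DEFAULT 'pending', UNIQUE (job_id, item_key))")


def fill(old, new):
    return {**new, **old}  # fill-not-overwrite: existing values win


first = bulk_upsert(conn, "j1", {"a": {"n": 1}, "b": {"n": 2}}, fill)
second = bulk_upsert(conn, "j1", {"b": {"n": 9, "extra": True}, "c": {"n": 3}}, fill)
```

For 1000 already-known items this is two `SELECT`s plus one batched `UPDATE`, rather than a failed `INSERT` per row.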

**Five things, in order of operational pain they'll cause:**

1. **Atomic claim / optimistic locking.** As noted above, `mark_in_progress` doesn't verify the item is still pending. The fix is one line: `UPDATE work_items SET status = 'in_progress' ... WHERE id = ? AND status = 'pending'` and check `cursor.rowcount == 1`. Without this, concurrent workers will double-process items.

2. **Dead item recovery.** If a worker crashes after `mark_in_progress` but before `mark_done`/`mark_failed`, the item is stuck as `in_progress` forever. There's no reaper. You need a timeout — items `in_progress` for longer than N minutes get reset to `pending`. The `last_attempt_at` column is already there; you just need a periodic `UPDATE ... WHERE status = 'in_progress' AND last_attempt_at < datetime('now', '-30 minutes')`.

3. **Backpressure / max-attempts.** `retry_failed` resets *all* failed items with no cap. An item that fails deterministically (bad URL, deleted page) will retry forever. The `attempts` column is tracked but never checked: there's no `WHERE attempts < max_attempts` guard. `PacingConfig` already has `retry_max_attempts`, but the tracker never enforces it; pass that value into `retry_failed` and apply it in the `WHERE` clause.

4. **Graceful shutdown.** The `Pacer.wait()` does `asyncio.sleep` but there's no cancellation token or shutdown signal. When the process gets SIGTERM, in-flight items will be abandoned as `in_progress` (see point 2). You need a shutdown flag that breaks the processing loop and lets current items finish.

5. **Observability hook.** `log_event` is fire-and-forget to SQLite. For production you want a way to emit metrics (items/sec, queue depth, error rate) to an external system. A simple callback/hook pattern in the runner would suffice — `on_stage_complete(job_id, stage, elapsed_s, success)`.
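Points 1 to 3 each come down to a single guarded `UPDATE`. A minimal sketch, assuming `work_items` has the `id`, `job_id`, `status`, `attempts`, and `last_attempt_at` columns referenced above, and that `last_attempt_at` is stored as UTC text in SQLite's default `datetime()` format. `claim_item` and `reap_stuck` are hypothetical names, and this `retry_failed` signature is a stand-in, not the existing one:

```python
import sqlite3


def claim_item(conn, item_id):
    """Atomically move one item pending -> in_progress; True if this caller won."""
    with conn:
        cur = conn.execute(
            "UPDATE work_items SET status = 'in_progress', "
            "attempts = attempts + 1, last_attempt_at = datetime('now') "
            "WHERE id = ? AND status = 'pending'",
            (item_id,),
        )
    return cur.rowcount == 1


def reap_stuck(conn, timeout_minutes=30):
    """Reset items left in_progress by a crashed worker; returns how many."""
    with conn:
        cur = conn.execute(
            "UPDATE work_items SET status = 'pending' "
            "WHERE status = 'in_progress' AND last_attempt_at < datetime('now', ?)",
            (f"-{int(timeout_minutes)} minutes",),
        )
    return cur.rowcount


def retry_failed(conn, job_id, max_attempts):
    """Requeue failed items that still have attempts left; returns how many."""
    with conn:
        cur = conn.execute(
            "UPDATE work_items SET status = 'pending' "
            "WHERE job_id = ? AND status = 'failed' AND attempts < ?",
            (job_id, max_attempts),
        )
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE work_items (id INTEGER PRIMARY KEY, job_id TEXT, "
             "status TEXT DEFAULT 'pending', attempts INTEGER DEFAULT 0, "
             "last_attempt_at TEXT)")
conn.execute("INSERT INTO work_items (id, job_id) VALUES (1, 'j1')")
first_claim = claim_item(conn, 1)   # this worker wins
second_claim = claim_item(conn, 1)  # a second worker finds it already taken
```

A worker that gets `False` back from `claim_item` just moves on to the next candidate. The reaper can run on a timer or at the start of each polling loop.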

**The generic reusable core is smaller than you think.**

**Core (framework):**
- `tracker.py`: schema, `work_items` + `results` tables, `upsert_items` (without the merge logic), `mark_*` functions, `get_pending`, stage state machine (`_update_stage`, `_sync_item_status`, `get_items_needing_stage`). This is your job engine.
- `job.py`: `Job`, `StageConfig`, `PacingConfig`, `GuardsConfig`, `load_jobs`. The dataclass skeleton.
- `executor.py`: handler resolution.
- `pacer.py`: rate limiting (fixed to be a real token bucket).

**Plugin / application layer (rivus-specific):**
- `_merge_item_data` and the entire scalar↔list provenance logic → moves to a `MergeStrategy` interface that handlers provide.
- `stage_version_hash` / `VERSION_DEPS` / `reprocess_stale` → moves to a `Versioning` plugin. Most users won't want source-hash-based staleness.
- `job_cost_log` / `log_cost` / `get_daily_cost` → `CostTracker` plugin. Not every job system tracks dollar costs.
- `job_events` / `log_event` / `get_events` → `EventLog` plugin. Core emits hooks; plugin decides storage.
- `DiscoveryConfig.strategy` + `params` → `DiscoveryPlugin` interface. Discovery is inherently domain-specific.
- `detail_columns`, `tags`, `url`, `emoji`, `pipeline`, `pipeline_step`, `checks` → all application-layer metadata. Core `Job` shouldn't know about these.

The litmus test: if you deleted a piece and the stage pipeline still runs items from pending→done, it's core. Everything else is a plugin. By that test, `job_runs`, `job_cost_log`, `job_events`, version hashing, and the merge logic are all plugins.
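To make the core/plugin split concrete, here is a small sketch of the "core emits hooks; plugin decides storage" idea. Everything in it is hypothetical: the `Hooks` dispatcher, the `stage_complete` event name, the `cost_usd` payload field, and the in-memory `CostTracker` are illustrations, not existing code:

```python
from collections import defaultdict
from typing import Callable


class Hooks:
    """Tiny event dispatcher: the core emits, plugins subscribe."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Callable[..., None]]] = defaultdict(list)

    def on(self, event: str, callback: Callable[..., None]) -> None:
        self._subscribers[event].append(callback)

    def emit(self, event: str, **payload) -> None:
        for callback in self._subscribers[event]:
            callback(**payload)


class CostTracker:
    """Example plugin: accumulates per-job cost outside the core."""

    def __init__(self, hooks: Hooks) -> None:
        self.totals = defaultdict(float)
        hooks.on("stage_complete", self._record)

    def _record(self, job_id, cost_usd=0.0, **_ignored) -> None:
        self.totals[job_id] += cost_usd


hooks = Hooks()
costs = CostTracker(hooks)
events = []  # stand-in for an EventLog plugin
hooks.on("stage_complete", lambda **payload: events.append(payload))

# What the core runner would call after each stage finishes.
hooks.emit("stage_complete", job_id="j1", stage="extract",
           elapsed_s=0.4, success=True, cost_usd=0.25)
hooks.emit("stage_complete", job_id="j1", stage="price",
           elapsed_s=0.1, success=True, cost_usd=0.5)
```

The core only knows that a stage finished. Whether that becomes a row in `job_cost_log`, a row in `job_events`, or a metric in an external system is the subscriber's business.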

**Delete `_sync_item_status` and derive item status at read time.**

`_sync_item_status` is called after every stage update to derive the item-level `status` from the stages dict. It's 20 lines of conditional logic (`if statuses <= {"done", "skipped"}` ... `elif "failed" in statuses and "pending" not in ...`), it runs on every write, and it's the source of subtle bugs — the `elif` chain has an implicit priority order that's easy to get wrong when adding new statuses.

Replace it with a computed property or a view:

```sql
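-- Assumes a normalized item_stages table with an item_id column that
-- references work_items.id (one row per item and stage).
-- WHEN branches are evaluated top to bottom, so their order is the priority order.
CREATE VIEW item_status_v AS
SELECT w.id, w.job_id, w.item_key,
  CASE
    -- every stage settled (an item with no stage rows also lands here)
    WHEN NOT EXISTS (SELECT 1 FROM item_stages WHERE item_id = w.id AND status NOT IN ('done','skipped'))
      THEN 'done'
    -- at least one stage is currently running
    WHEN EXISTS (SELECT 1 FROM item_stages WHERE item_id = w.id AND status = 'in_progress')
      THEN 'in_progress'
    -- something failed and no stage is left that could still make progress
    WHEN EXISTS (SELECT 1 FROM item_stages WHERE item_id = w.id AND status = 'failed')
      AND NOT EXISTS (SELECT 1 FROM item_stages WHERE item_id = w.id AND status IN ('pending','in_progress','retry_later'))
      THEN 'failed'
    ELSE 'pending'
  END AS status
FROM work_items w;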
```

This eliminates the write-time derivation, removes the possibility of item status and stage status getting out of sync (which currently happens if `_sync_item_status` throws or if someone updates stages directly), and makes the status logic declarative and queryable. It pairs naturally with the `item_stages` normalization from point 2 — do both together.

If you're not ready to normalize stages yet, the minimal version: remove `_sync_item_status`, keep the `status` column, but compute it lazily in `get_pending` / `get_job_stats` queries rather than eagerly on every write. The write path gets simpler (just update stages JSON + commit), the read path adds one derivation step, and you eliminate the entire class of "status out of sync with stages" bugs.
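For that minimal version, the read-time derivation can be one pure function. A sketch, assuming the `stages` JSON maps stage name to a status string (if each stage is a nested object, pull the status out first); `derive_status` is a hypothetical name and the priority order mirrors the view above:

```python
import json

_SETTLED = {"done", "skipped"}
_OPEN = {"pending", "in_progress", "retry_later"}


def derive_status(stages: dict) -> str:
    """Derive the item-level status from a {stage_name: status} mapping."""
    statuses = set(stages.values())
    # An item with no stages counts as done, as with the subset check quoted above.
    if statuses <= _SETTLED:
        return "done"
    if "in_progress" in statuses:
        return "in_progress"
    if "failed" in statuses and not (statuses & _OPEN):
        return "failed"
    return "pending"


# Rows as they might come back from `SELECT item_key, stages FROM work_items`.
rows = [
    ("a", '{"fetch": "done", "extract": "skipped"}'),
    ("b", '{"fetch": "done", "extract": "in_progress"}'),
    ("c", '{"fetch": "failed", "extract": "done"}'),
    ("d", '{"fetch": "failed", "extract": "pending"}'),
]
derived = {key: derive_status(json.loads(stages)) for key, stages in rows}
```

Because the function is pure, the priority order can be pinned down in a table-driven test, and adding a new status means touching one place.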
