A pipeline framework for processing thousands of entities through multi-stage research workflows — with versioning, staleness detection, and automatic backfill.
You're building a system that monitors 18,000 investment theses scraped from a members-only club. For each thesis, you need to: fetch the HTML, extract structured fields (ticker, thesis type, quality score), run an LLM to validate the extraction, compute returns at 5 horizons, and generate embeddings for ML prediction. That's 6 stages × 18,000 items = 108,000 stage-executions.
Halfway through, you improve your extraction prompt. Now 12,000 items have stale extractions. You don't want to re-fetch the HTML (it hasn't changed and the source is rate-limited). You want to re-run just the extraction and its downstream stages on the cached data.
A week later, you add a 7th stage: embed the thesis text for ML training. You need it to run on all 18,000 items — but only on items that already have a valid extraction. And you want the system to figure that out automatically.
This is not a scheduling problem (Airflow). It's not an asset materialization problem (Dagster). It's not a durable function problem (Temporal). It's a stage-per-item tracking problem, and no mainstream framework provides it natively.
Make each entity a first-class row that progresses independently through a shared stage pipeline. Track status per entity per stage. Version each stage's code. When code changes, mark affected entity-stage pairs as stale — not the whole pipeline.
Think of it like a factory assembly line, but where each product moves at its own pace, any station can be upgraded independently, and upgrading a station automatically flags products that passed through the old version for re-inspection.
The key properties that distinguish this from existing approaches:
| # | Property | What it means |
|---|---|---|
| 1 | Declarative stage schema | Stages defined in YAML, not in code. Adding a stage is a config change that triggers backfill. |
| 2 | Independent entity progress | Entity A can be at stage 5 while entity B is at stage 2. No batch synchronization. |
| 3 | Per-entity-per-stage staleness | Code change to stage 3 only marks entities that already passed stage 3. Entities still at stage 1 are unaffected. |
| 4 | Dynamic entity set | New entities appear at any time and start at stage 1. No pre-enumeration required. |
| 5 | Stage dependencies with caching | Raw data cached on disk. Downstream stages re-run on cached data without re-fetching. |
Three tables capture all pipeline state:
| Table | Grain | Key columns | Purpose |
|---|---|---|---|
| `work_items` | One row per entity | `job_id`, `item_key`, `status`, `data` (JSON) | Entity registry & queue |
| `item_stages` | One row per entity × stage | `item_id`, `stage`, `status`, `elapsed_s`, `attempts` | Per-stage progress tracking |
| `results` | One row per entity × stage | `result` (JSON), `handler_version` | Stage outputs + version provenance |
Plus an events table for the audit trail (circuit breaker trips, backfill pauses, error classifications).
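The three tables plus the audit trail can be sketched as a SQLite schema. This is a minimal illustration, not the framework's actual DDL: only the columns named above come from the text, and constraints like the `UNIQUE` index are assumptions.

```python
import sqlite3

# Sketch of the pipeline schema. Columns beyond those named in the
# tables above (ids, timestamps, constraints) are illustrative guesses.
SCHEMA = """
CREATE TABLE work_items (
    id       INTEGER PRIMARY KEY,
    job_id   TEXT NOT NULL,
    item_key TEXT NOT NULL,
    status   TEXT NOT NULL DEFAULT 'pending',
    data     TEXT,                         -- JSON payload
    UNIQUE (job_id, item_key)
);
CREATE TABLE item_stages (
    item_id   INTEGER REFERENCES work_items(id),
    stage     TEXT NOT NULL,
    status    TEXT NOT NULL DEFAULT 'pending',
    elapsed_s REAL,
    attempts  INTEGER DEFAULT 0,
    PRIMARY KEY (item_id, stage)           -- grain: entity x stage
);
CREATE TABLE results (
    item_id         INTEGER REFERENCES work_items(id),
    stage           TEXT NOT NULL,
    result          TEXT,                  -- JSON stage output
    handler_version TEXT NOT NULL,         -- version provenance
    PRIMARY KEY (item_id, stage)
);
CREATE TABLE events (
    ts     TEXT DEFAULT CURRENT_TIMESTAMP,
    kind   TEXT NOT NULL,                  -- e.g. circuit_break, backfill_pause
    detail TEXT
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
```

The entity-×-stage grain is what makes per-item staleness queries cheap: every question the dashboard asks is an indexed lookup on `(item_id, stage)`.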
A handler is a Python module exporting one function: `process_stage()`. It dispatches on stage name via `match`/`case`:

```python
async def process_stage(*, item_key, data, job, stage) -> dict | None:
    match stage:
        case "fetch":   return await _fetch(item_key, data, job)
        case "extract": return _extract(item_key, data, job)
        case "enrich":  return await _enrich(item_key, data, job)

HANDLER_VERSION = {"fetch": "1", "extract": "2026-02-27.1", "enrich": "2"}

VERSION_DEPS = {
    "enrich": [_ENRICH_PROMPT, _ENRICH_SYSTEM, str(ENRICH_LLM)],
}
```
The runner calls process_stage() once per entity per stage, stores the return dict in the results table with the current handler_version, and advances item_stages.
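That per-item step can be sketched as follows. `run_stage` and the `db` wiring are hypothetical; only the handler contract (`process_stage`, `HANDLER_VERSION`) comes from the text above.

```python
import json

# Hypothetical sketch of the runner's per-item step: call the handler,
# store the result tagged with the stage's current version, mark the
# stage done. `db` is assumed to be a sqlite3-style connection.
async def run_stage(db, handler, item: dict, stage: str) -> None:
    result = await handler.process_stage(
        item_key=item["item_key"], data=item["data"],
        job=item["job_id"], stage=stage,
    )
    db.execute(
        "INSERT OR REPLACE INTO results (item_id, stage, result, handler_version)"
        " VALUES (?, ?, ?, ?)",
        (item["id"], stage, json.dumps(result), handler.HANDLER_VERSION[stage]),
    )
    db.execute(
        "UPDATE item_stages SET status = 'done' WHERE item_id = ? AND stage = ?",
        (item["id"], stage),
    )
    db.commit()
```

Storing the version alongside the result is what makes staleness detection a pure data comparison later: no re-execution is needed to know whether an output is current.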
Two complementary mechanisms detect when results become stale:
| Mechanism | How it works | When to use |
|---|---|---|
| HANDLER_VERSION | Explicit semantic version per stage. Bump manually when logic changes. | Most handlers. Simple, explicit. |
| VERSION_DEPS | Hash of inspect.getsource(func) + declared dependency strings/functions. Auto-detects changes. | LLM stages where prompt strings and model config affect output. |
The dashboard compares each result's stored handler_version against the current version. Mismatches surface as a stale count. One click resets stale items to pending for that stage — re-running on cached upstream data.
Each stage has a dedicated async worker. When stage N finishes an item, it fires stage N+1's asyncio.Event immediately (<1ms). Workers sleep on the event when idle — zero DB queries, zero CPU. A 30-second safety poll prevents stalls.
This matters for pipelines where the first stage is fast (fetch cached HTML) but the third is slow (LLM call). Items flow through without waiting for the entire batch.
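The wake-up chain can be sketched with `asyncio.Event`. The helpers `get_pending` and `process` are hypothetical stand-ins for the runner's DB queries and stage execution; the event-plus-safety-poll shape follows the description above.

```python
import asyncio

# Sketch of one per-stage worker: drain pending items, then sleep on
# this stage's Event; a finished item wakes the next stage's worker.
async def stage_worker(stage, wake, next_wake, get_pending, process,
                       poll_s=30.0):
    while True:
        item = get_pending(stage)          # assumed DB lookup
        if item is None:
            wake.clear()
            try:
                # Safety poll: re-check after poll_s even without a
                # wake-up, so a missed event can never stall the stage.
                await asyncio.wait_for(wake.wait(), timeout=poll_s)
            except asyncio.TimeoutError:
                pass
            continue
        await process(stage, item)         # assumed stage execution
        if next_wake is not None:
            next_wake.set()                # hand off to stage N+1 at once
```

Because idle workers block on `wake.wait()`, an idle pipeline issues no DB queries and burns no CPU between the safety polls.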
Every exception passes through an LLM classifier (~$0.001 per call) that categorizes it and selects an action:
| Classification | Examples | Auto-action |
|---|---|---|
| `transient` | Timeout, 429, SSL reset | Retry with backoff |
| `item_specific` | Bad HTML, missing field | Fail item, continue pipeline |
| `temporal` | Market closed, outside hours | Pause job, retry when the window reopens |
| `systemic` | Cookie expired, DNS failure | Pause on first occurrence |
| `code_bug` | KeyError, AttributeError | Pause + Pushover alert |
The critical insight: an item-specific parse error should never pause the entire pipeline. Only systemic issues (affecting all items) warrant a pause. Traditional circuit breakers can't distinguish these — they just count consecutive failures.
Scenario: You've processed 18,000 investment theses through fetch → extract → enrich. Your LLM enrichment prompt isn't catching catalysts well enough.
Before (typical workflow): Re-run the entire pipeline. Re-fetch all 18,000 pages from a rate-limited source. Wait 6 hours. Cost: $50 in API calls + bandwidth.
After (stage-per-item): Edit the prompt string in VERSION_DEPS. The hash changes automatically. Dashboard shows "18,000 stale" for the enrich stage. Click "Reprocess Stale." The enrich stage re-runs on cached extract results. Fetch is untouched. Time: 20 minutes. Cost: $3 in LLM calls.
Scenario: You have 18,000 theses with extractions. Now you want to add text embeddings for ML prediction.
Before: Write a one-off script. Track which items you've already embedded. Handle failures. Build your own retry logic. Maintain a separate "embedded_ids" set.
After: Add embed to the stages list in jobs.yaml. The framework auto-inserts pending rows in item_stages for all existing items that have completed the prerequisite stage. Embeddings generate using the existing runner with rate-limiting, retries, and progress tracking.
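The auto-insert is a single set-based query. A sketch, assuming the `item_stages` schema described earlier (the function name and exact SQL are illustrative):

```python
import sqlite3

# Sketch: when a new stage is added to the YAML, insert a pending
# item_stages row for every item that has completed the prerequisite
# stage and does not yet have the new stage. Idempotent by construction.
def backfill_stage(db: sqlite3.Connection, new_stage: str, prereq: str) -> int:
    cur = db.execute(
        """
        INSERT INTO item_stages (item_id, stage, status)
        SELECT s.item_id, ?, 'pending'
        FROM item_stages s
        WHERE s.stage = ? AND s.status = 'done'
          AND NOT EXISTS (
              SELECT 1 FROM item_stages t
              WHERE t.item_id = s.item_id AND t.stage = ?
          )
        """,
        (new_stage, prereq, new_stage),
    )
    db.commit()
    return cur.rowcount    # how many items were queued for backfill
```

Running it twice is safe: the `NOT EXISTS` guard makes the second call a no-op, which is what lets the framework re-apply the YAML on every start.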
Scenario: An earnings backfill pipeline has 6 stages: price → IR lookup → IB tick data → transcript → diarization → chart. The IB API goes down.
Before: Circuit breaker trips after 3 failures. Entire pipeline pauses. 200 items that don't need IB data are stuck.
After: LLM error classifier identifies "IB connection refused" as systemic. IB stage pauses on the first failure (not after 3). Meanwhile, items already past the IB stage continue through transcript → diarize → chart. Items that don't need IB (already have tick data cached) proceed normally. When IB recovers, only the actually-blocked items resume.
Scenario: Three different YouTube monitoring pipelines all need Groq API for transcription. Groq's rate limit is 20 RPM.
Solution: Declare a shared resource in config:
```yaml
resources:
  groq_transcribe:
    concurrency: 3   # 3 concurrent across ALL pipelines
```
Each pipeline's transcript stage acquires a semaphore from the shared pool. Three pipelines, each with 8 concurrent items, but the Groq bottleneck is shared — never more than 3 simultaneous Groq calls system-wide.
Scenario: A company analysis pipeline uses GPT-4 for bear case + bull case + sentiment analysis. Runs autonomously overnight.
Guard config:
```yaml
guards:
  daily_cost_limit: 5.00   # Pause if $5 spent today
  max_pending: 200         # Don't queue more than 200
```
Each stage returns _cost: 0.15 in its result dict. The runner tracks cumulative daily cost. At $5.00, the pipeline pauses with an event log entry. Morning review shows exactly how many items completed and at what cost.
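The accounting is simple enough to sketch in a few lines. Only the `_cost` result key and the `daily_cost_limit` guard come from the text; the class and method names are assumptions:

```python
# Sketch of the daily cost guard: accumulate each stage result's _cost
# and signal a pause once the configured daily limit is reached.
class CostGuard:
    def __init__(self, daily_limit: float):
        self.daily_limit = daily_limit
        self.spent_today = 0.0

    def record(self, result: dict) -> bool:
        """Add a result's _cost; return False when the job must pause."""
        self.spent_today += result.get("_cost", 0.0)
        return self.spent_today < self.daily_limit
```

Counting cost where results are recorded (rather than inside handlers) means every stage, including future ones, is covered automatically.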
Existing orchestration frameworks solve adjacent problems well. The gap is specific: per-entity-per-stage tracking with versioning and surgical reprocessing.
| Capability | Airflow | Dagster | Temporal | Prefect | Stage-Per-Item |
|---|---|---|---|---|---|
| Unit of work | Scheduled run | Asset materialization | Durable function | Flow execution | Entity × stage |
| Per-entity tracking | Weak (dynamic task mapping) | Via partitions | Child workflows | Via `.map()` | Native (`item_stages` table) |
| Code versioning | DAG version only | `code_version` per asset | Manual patching | None | Per-stage `handler_version` + source hash |
| Staleness detection | None | Per-asset "Unsynced" | None | None | Per-entity-per-stage, auto-detected |
| Add stage + backfill | Manual | Backfill across partitions | New workflow instances | Manual | Auto-insert pending rows |
| Cached reprocessing | No — tasks are stateless | Partial — asset-level | Via activity replay | No | Stage reads cached upstream files |
| Error classification | Retry count only | Retry policies | Retry policies | Retry policies | LLM semantic classification |
| Infrastructure | Scheduler, metadata DB, workers | Dagster daemon, DB, webserver | Temporal server cluster | Prefect server or Cloud | Single SQLite file, async Python |
Airflow is the standard for scheduled ETL — daily batch jobs with complex inter-task dependencies. If your work is "run this DAG every morning at 6am," Airflow is battle-tested.
Dagster comes closest to the versioning story. Its code_version and "Unsynced" labels solve staleness at the asset level. For teams building analytics where the output is a table or model, Dagster's asset graph is excellent. The gap: it models assets, not entities within an asset. Tracking 18,000 items at different stages requires DynamicPartitionsDefinition with custom staleness logic on top.
Temporal is the strongest per-entity model — each entity can be a child workflow with durable state. For long-running processes (order fulfillment, subscription lifecycle), Temporal is unmatched. The tradeoff: you're running a distributed systems platform (Temporal server, persistence layer, worker fleet), and the stage pipeline isn't declarative — it's imperative code.
Prefect has the best developer ergonomics — pure Python, no DAG boilerplate. Its .map() fans out per-item with independent tracking. But there's no versioning, no staleness detection, and no persistent entity registry between runs.
Your workload is: thousands of entities, each flowing through ordered stages that evolve independently, where raw data is cached and you need to re-process surgically when code changes. You want this in a single SQLite file without standing up infrastructure.
Typical use cases: research pipelines (scrape → extract → enrich → embed), content processing (fetch → transcribe → diarize → summarize), entity enrichment (discover → profile → score → report).
| Metric | Value | What it measures |
|---|---|---|
| Entities processed | 18,000+ VIC theses, 1,200+ earnings calls | Scale of the workload |
| Prompt iteration cost | $3 vs $50 (re-run enrich vs re-run all) | Savings from cached reprocessing |
| Time to add a stage | ~5 min (add to YAML + write handler) | Pipeline extensibility |
| Error resolution time | Seconds (semantic classification) | Faster than reading stack traces |
| Infrastructure cost | $0 (SQLite, no server) | Operational simplicity |
| Metric | Observation |
|---|---|
| Stale detection accuracy | Zero false negatives — every code change surfaces in the dashboard |
| Error classification accuracy | ~95% correct category (transient vs systemic vs item-specific) |
| Backfill completeness | 100% — auto-inserts cover all existing items when stage added |
| Recovery from DB loss | Disk-based recovery script reconciles results from cached files |
Before this framework existed, each pipeline was a standalone script with its own ad-hoc progress tracking, retry logic, and error handling.
The VIC (Value Investors Club) pipeline processes 18,000+ investment theses through 3 stages. Here's the end-to-end flow:
```yaml
vic_ideas:
  handler: vic_ideas
  kind: backfill
  stages:
    - name: fetch
      concurrency: 4
    - name: extract
      concurrency: 8
    - name: check_enrich
      concurrency: 4
      resource: groq_transcribe
  pacing:
    max_per_hour: 200
    retry_max_attempts: 3
  guards:
    daily_cost_limit: 10.00
  storage:
    base_dir: jobs/data/vic_ideas
    pattern: "{idea_id}"
```
- **fetch**: Downloads the thesis page, compresses it as `.xz`, and stores it on disk. Returns `{url, status_code, content_length, _files: {html: "12345/page.html.xz"}}`. The raw HTML is the cached artifact that all downstream stages read from.
- **extract**: Parses the cached HTML into structured fields and stores them in the `results` table.
- **check_enrich**: An LLM validates the extraction, with its prompt and model config declared in `VERSION_DEPS`. If the LLM finds discrepancies with the parser, it flags them for review.

When the check_enrich prompt changes, the hash over `VERSION_DEPS["check_enrich"]` produces a new `handler_version`. The dashboard shows `check_enrich: 17,200 stale`, and one click resets those `item_stages` rows to pending.

The framework runs 12 pipelines in production, processing ~20,000 entities across investment research, earnings analysis, YouTube content monitoring, and entity intelligence. It has proven the core model (stage-per-item with versioning and cached reprocessing) across diverse workloads.
Separate the core engine from rivus-specific handlers. The framework becomes a standalone Python package with a clean public API: define stages in YAML or Python, write handler functions, run the pipeline.
- Public API: `Pipeline`, `Stage`, `Handler`, `Result`
- CLI: `pipeline run`, `pipeline status`, `pipeline reprocess --stage X`

Make it easy for others to adopt. A new user should go from `pip install` to running their first pipeline in under 10 minutes.
- Migration of `jobs.db` across versions

Address the single-machine limitation for teams that need distributed execution, while preserving the simplicity of the single-file model for solo developers.
A world where building a research pipeline is as simple as writing a function per stage and declaring the pipeline in YAML. The framework handles entity tracking, versioning, staleness, retries, cost control, and observability. You focus on what each stage does — the framework handles everything else.
Built with Python 3.12, SQLite (WAL mode), asyncio. Zero external infrastructure required. Runs on a single laptop.
Last updated: March 2026.