Stage-Per-Item

A pipeline framework for processing thousands of entities through multi-stage research workflows — with versioning, staleness detection, and automatic backfill.

Contents
  1. The Problem
  2. Core Idea
  3. Architecture
  4. Motivating Examples
  5. Prior Work Comparison
  6. Measuring Usefulness
  7. Walkthrough
  8. Vision & Roadmap

1. The Problem

You're building a system that monitors 18,000 investment theses scraped from a members-only club. For each thesis, you need to: fetch the HTML, extract structured fields (ticker, thesis type, quality score), run an LLM to validate the extraction, and compute returns at 5 horizons. That's 4 stages × 18,000 items = 72,000 stage-executions.

Halfway through, you improve your extraction prompt. Now 12,000 items have stale extractions. You don't want to re-fetch the HTML (it hasn't changed and the source is rate-limited). You want to re-run just the extraction and its downstream stages on the cached data.

A week later, you add a 5th stage: embed the thesis text for ML training. You need it to run on all 18,000 items — but only on items that already have a valid extraction. And you want the system to figure that out automatically.

The pain is specific: You have thousands of entities, each flowing through ordered stages, where stages evolve independently, raw data is cached, and you need to re-process surgically — not re-run everything from scratch.

This is not a scheduling problem (Airflow). It's not an asset materialization problem (Dagster). It's not a durable function problem (Temporal). It's a stage-per-item tracking problem, and no mainstream framework provides it natively.

18K+ entities processed | 6–8 stages per pipeline | 12 active pipelines | $0.001 per error classified

2. Core Idea

The Insight

Make each entity a first-class row that progresses independently through a shared stage pipeline. Track status per entity per stage. Version each stage's code. When code changes, mark affected entity-stage pairs as stale — not the whole pipeline.

Think of it like a factory assembly line, but where each product moves at its own pace, any station can be upgraded independently, and upgrading a station automatically flags products that passed through the old version for re-inspection.

The key properties that distinguish this from existing approaches:

| # | Property | What it means |
|---|----------|---------------|
| 1 | Declarative stage schema | Stages defined in YAML, not in code. Adding a stage is a config change that triggers backfill. |
| 2 | Independent entity progress | Entity A can be at stage 5 while entity B is at stage 2. No batch synchronization. |
| 3 | Per-entity-per-stage staleness | Code change to stage 3 only marks entities that already passed stage 3. Entities still at stage 1 are unaffected. |
| 4 | Dynamic entity set | New entities appear at any time and start at stage 1. No pre-enumeration required. |
| 5 | Stage dependencies with caching | Raw data cached on disk. Downstream stages re-run on cached data without re-fetching. |

3. Architecture

System Overview

```
         jobs.yaml (declarative config)
                      │
      ┌───────────────┼───────────────┐
      │               │               │
 ┌────▼────┐    ┌─────▼─────┐   ┌─────▼─────┐
 │Discovery│    │  Pacing   │   │  Guards   │
 │ plugins │    │ rate-limit│   │ cost/$day │
 └────┬────┘    │  backoff  │   │ preflight │
      │         └─────┬─────┘   └─────┬─────┘
      │               │               │
 ┌────▼───────────────▼───────────────▼────┐
 │               Runner Loop               │
 │    ┌──────┐    ┌──────┐    ┌──────┐     │
 │    │Worker│    │Worker│    │Worker│ ... │
 │    │stg 1 │ →  │stg 2 │ →  │stg 3 │     │
 │    └──┬───┘    └──┬───┘    └──┬───┘     │
 │       │ event     │ event     │         │
 └───────┼───────────┼───────────┼─────────┘
         │           │           │
    ┌────▼───────────▼───────────▼────┐
    │        SQLite (jobs.db)         │
    │   work_items   │  item_stages   │
    │   results      │  events        │
    └─────────────────────────────────┘
```

Data Model

Three tables capture all pipeline state:

| Table | Grain | Key columns | Purpose |
|-------|-------|-------------|---------|
| work_items | One row per entity | job_id, item_key, status, data (JSON) | Entity registry & queue |
| item_stages | One row per entity × stage | item_id, stage, status, elapsed_s, attempts | Per-stage progress tracking |
| results | One row per entity × stage | result (JSON), handler_version | Stage outputs + version provenance |

Plus an events table for the audit trail (circuit breaker trips, backfill pauses, error classifications).
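A minimal SQLite sketch of this data model follows; the grain and key columns match the table above, but extra columns and constraints (id, ts, UNIQUE keys) are assumptions, not the framework's actual schema.

```python
import sqlite3

# Hypothetical DDL approximating the three core tables plus events.
SCHEMA = """
CREATE TABLE work_items (
    id       INTEGER PRIMARY KEY,
    job_id   TEXT NOT NULL,
    item_key TEXT NOT NULL,
    status   TEXT NOT NULL DEFAULT 'pending',
    data     TEXT,                                -- JSON blob
    UNIQUE (job_id, item_key)
);
CREATE TABLE item_stages (
    item_id   INTEGER NOT NULL REFERENCES work_items(id),
    stage     TEXT NOT NULL,
    status    TEXT NOT NULL DEFAULT 'pending',
    elapsed_s REAL,
    attempts  INTEGER NOT NULL DEFAULT 0,
    PRIMARY KEY (item_id, stage)
);
CREATE TABLE results (
    item_id         INTEGER NOT NULL REFERENCES work_items(id),
    stage           TEXT NOT NULL,
    result          TEXT,                         -- JSON blob
    handler_version TEXT NOT NULL,
    PRIMARY KEY (item_id, stage)
);
CREATE TABLE events (
    ts     TEXT DEFAULT CURRENT_TIMESTAMP,
    kind   TEXT NOT NULL,
    detail TEXT
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
```

The composite primary key on (item_id, stage) is what makes "entity × stage" the unit of work: every status transition and every result is addressed by that pair.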

Handler Pattern

A handler is a Python module exporting one function: process_stage(). It dispatches on stage name via match/case:

```python
async def process_stage(*, item_key, data, job, stage) -> dict | None:
    match stage:
        case "fetch":     return await _fetch(item_key, data, job)
        case "extract":   return _extract(item_key, data, job)
        case "enrich":    return await _enrich(item_key, data, job)

HANDLER_VERSION = {"fetch": "1", "extract": "2026-02-27.1", "enrich": "2"}

VERSION_DEPS = {
    "enrich": [_ENRICH_PROMPT, _ENRICH_SYSTEM, str(ENRICH_LLM)],
}
```

The runner calls process_stage() once per entity per stage, stores the return dict in the results table with the current handler_version, and advances item_stages.
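That contract can be sketched as a single runner step. This is an illustrative approximation, not the framework's real API: run_stage and its signature are hypothetical, and it assumes results and item_stages tables shaped like the Data Model section describes.

```python
import json

async def run_stage(conn, handler, item_id, item_key, data, job, stage):
    """Hypothetical runner step: invoke the handler once for one entity at
    one stage, persist the output with version provenance, then advance."""
    result = await handler.process_stage(
        item_key=item_key, data=data, job=job, stage=stage)
    version = handler.HANDLER_VERSION.get(stage, "0")
    # Store the result dict tagged with the version that produced it.
    conn.execute(
        "INSERT OR REPLACE INTO results (item_id, stage, result, handler_version)"
        " VALUES (?, ?, ?, ?)",
        (item_id, stage, json.dumps(result), version))
    # Advance this entity's row for this stage.
    conn.execute(
        "UPDATE item_stages SET status = 'done' WHERE item_id = ? AND stage = ?",
        (item_id, stage))
```

Storing handler_version alongside every result is what later makes staleness detection a pure comparison, with no extra bookkeeping at write time.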

Version Tracking

Two complementary mechanisms detect when results become stale:

| Mechanism | How it works | When to use |
|-----------|--------------|-------------|
| HANDLER_VERSION | Explicit semantic version per stage. Bump manually when logic changes. | Most handlers. Simple, explicit. |
| VERSION_DEPS | Hash of inspect.getsource(func) + declared dependency strings/functions. Auto-detects changes. | LLM stages where prompt strings and model config affect output. |

The dashboard compares each result's stored handler_version against the current version. Mismatches surface as a stale count. One click resets stale items to pending for that stage — re-running on cached upstream data.
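The VERSION_DEPS mechanism can be approximated like this. compute_version is a hypothetical name, and the real hashing scheme may differ, but the idea is the same: the version string changes whenever the handler source or any declared dependency (prompt string, model config, helper function) changes.

```python
import hashlib
import inspect

def compute_version(func, deps=()):
    """Hash a handler's source plus its declared dependencies. Any edit to
    either produces a new version string, marking old results stale."""
    h = hashlib.sha256(inspect.getsource(func).encode())
    for dep in deps:
        # Dependencies may be strings (prompts, configs) or functions.
        text = inspect.getsource(dep) if callable(dep) else str(dep)
        h.update(text.encode())
    return h.hexdigest()[:12]
```

The payoff is that prompt edits need no manual version bump: change the string, and the hash (and hence the stale count) updates on the next runner cycle.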

Stage Execution: Event-Driven, Not Polling

Each stage has a dedicated async worker. When stage N finishes an item, it fires stage N+1's asyncio.Event immediately (<1ms). Workers sleep on the event when idle — zero DB queries, zero CPU. A 30-second safety poll prevents stalls.

This matters for pipelines where the first stage is fast (fetch cached HTML) but the third is slow (LLM call). Items flow through without waiting for the entire batch.
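The worker loop can be sketched as follows; every name here (stage_worker, fetch_pending, process) is illustrative rather than the framework's actual internals, but the event/timeout shape matches the description above.

```python
import asyncio

async def stage_worker(stage, wake: asyncio.Event, next_wake, fetch_pending, process):
    """Sketch of one per-stage worker: sleep on an event, drain pending
    items, and wake the next stage per completed item. The 30s timeout
    is the safety poll that prevents stalls if a wakeup is missed."""
    while True:
        try:
            await asyncio.wait_for(wake.wait(), timeout=30)
        except asyncio.TimeoutError:
            pass  # safety poll: check for work even without a wakeup
        wake.clear()
        for item in fetch_pending(stage):
            await process(stage, item)
            if next_wake is not None:
                next_wake.set()  # fire stage N+1's event immediately
```

Because next_wake.set() fires per item rather than per batch, a slow downstream stage starts on the first finished item instead of waiting for all of them.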

Error Classification

Every exception passes through an LLM classifier (~$0.001 per call) that categorizes it and selects an action:

| Classification | Examples | Auto-action |
|----------------|----------|-------------|
| transient | Timeout, 429, SSL reset | Retry with backoff |
| item_specific | Bad HTML, missing field | Fail item, continue pipeline |
| temporal | Market closed, outside hours | Pause job, retry after time |
| systemic | Cookie expired, DNS failure | Pause on first occurrence |
| code_bug | KeyError, AttributeError | Pause + Pushover alert |

The critical insight: an item-specific parse error should never pause the entire pipeline. Only systemic issues (affecting all items) warrant a pause. Traditional circuit breakers can't distinguish these — they just count consecutive failures.
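The classification-to-action mapping can be sketched as a small dispatch table. The names below are illustrative, not the framework's actual identifiers, and the heuristic pre-filter is an assumption about how obvious Python bugs could skip the LLM call entirely.

```python
# Hypothetical action table mirroring the matrix above.
ACTIONS = {
    "transient":     "retry_with_backoff",
    "item_specific": "fail_item_continue",
    "temporal":      "pause_until_window",
    "systemic":      "pause_job",
    "code_bug":      "pause_and_alert",
}

def classify_heuristic(exc: Exception) -> str:
    """Cheap pre-filter: unambiguous Python bugs never need a model call."""
    if isinstance(exc, (KeyError, AttributeError, TypeError)):
        return "code_bug"
    if isinstance(exc, TimeoutError):
        return "transient"
    return "unknown"  # defer to the LLM classifier

def act_on(classification: str) -> str:
    # Unknown categories pause conservatively rather than retry blindly.
    return ACTIONS.get(classification, "pause_job")
```

The defensive default matters: if the classifier returns an unexpected label, pausing is cheaper than an unbounded retry loop.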

4. Motivating Examples

Example 1: Prompt iteration without re-fetching

Scenario: You've processed 18,000 investment theses through fetch → extract → enrich. Your LLM enrichment prompt isn't catching catalysts well enough.

Before (typical workflow): Re-run the entire pipeline. Re-fetch all 18,000 pages from a rate-limited source. Wait 6 hours. Cost: $50 in API calls + bandwidth.

After (stage-per-item): Edit the prompt string in VERSION_DEPS. The hash changes automatically. Dashboard shows "18,000 stale" for the enrich stage. Click "Reprocess Stale." The enrich stage re-runs on cached extract results. Fetch is untouched. Time: 20 minutes. Cost: $3 in LLM calls.

Example 2: Adding a stage to an existing pipeline

Scenario: You have 18,000 theses with extractions. Now you want to add text embeddings for ML prediction.

Before: Write a one-off script. Track which items you've already embedded. Handle failures. Build your own retry logic. Maintain a separate "embedded_ids" set.

After: Add embed to the stages list in jobs.yaml. The framework auto-inserts pending rows in item_stages for all existing items that have completed the prerequisite stage. Embeddings generate using the existing runner with rate-limiting, retries, and progress tracking.
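The auto-insert step can be expressed as one set-based query. This is a sketch under assumed table shapes from the Data Model section; backfill_stage is a hypothetical name.

```python
import sqlite3

def backfill_stage(conn, job_id, new_stage, prereq_stage):
    """Insert pending item_stages rows for every item in the job that has
    completed the prerequisite stage but has no row yet for the new stage."""
    conn.execute("""
        INSERT INTO item_stages (item_id, stage, status, attempts)
        SELECT w.id, ?, 'pending', 0
        FROM work_items w
        JOIN item_stages p
          ON p.item_id = w.id AND p.stage = ? AND p.status = 'done'
        WHERE w.job_id = ?
          AND NOT EXISTS (SELECT 1 FROM item_stages s
                          WHERE s.item_id = w.id AND s.stage = ?)
    """, (new_stage, prereq_stage, job_id, new_stage))
```

The NOT EXISTS clause makes the backfill idempotent: re-running discovery never duplicates pending rows.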

Example 3: Multi-source entity with smart error handling

Scenario: An earnings backfill pipeline has 6 stages: price → IR lookup → IB tick data → transcript → diarization → chart. The IB API goes down.

Before: Circuit breaker trips after 3 failures. Entire pipeline pauses. 200 items that don't need IB data are stuck.

After: LLM error classifier identifies "IB connection refused" as systemic. IB stage pauses on the first failure (not after 3). Meanwhile, items already past the IB stage continue through transcript → diarize → chart. Items that don't need IB (already have tick data cached) proceed normally. When IB recovers, only the actually-blocked items resume.

Example 4: Cross-pipeline resource coordination

Scenario: Three different YouTube monitoring pipelines all need Groq API for transcription. Groq's rate limit is 20 RPM.

Solution: Declare a shared resource in config:

```yaml
resources:
  groq_transcribe:
    concurrency: 3    # 3 concurrent across ALL pipelines
```

Each pipeline's transcript stage acquires a semaphore from the shared pool. Three pipelines, each with 8 concurrent items, but the Groq bottleneck is shared — never more than 3 simultaneous Groq calls system-wide.

Example 5: Cost-controlled autonomous operation

Scenario: A company analysis pipeline uses GPT-4 for bear case + bull case + sentiment analysis. Runs autonomously overnight.

Guard config:

```yaml
guards:
  daily_cost_limit: 5.00    # Pause if $5 spent today
  max_pending: 200          # Don't queue more than 200
```

Each stage returns _cost: 0.15 in its result dict. The runner tracks cumulative daily cost. At $5.00, the pipeline pauses with an event log entry. Morning review shows exactly how many items completed and at what cost.
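The cost guard reduces to a running sum checked against the cap. A minimal sketch, assuming the `_cost` convention above (CostGuard and record are hypothetical names, and day rollover is omitted):

```python
class CostGuard:
    """Accumulate per-stage _cost values; signal a pause at the daily cap."""
    def __init__(self, daily_limit: float):
        self.daily_limit = daily_limit
        self.spent_today = 0.0

    def record(self, result: dict) -> bool:
        """Add this result's cost. Returns True if the pipeline should pause."""
        self.spent_today += float(result.get("_cost", 0.0))
        return self.spent_today >= self.daily_limit
```

Because every stage reports cost in its result dict, the guard needs no integration with any particular LLM client.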

5. Prior Work Comparison

Existing orchestration frameworks solve adjacent problems well. The gap is specific: per-entity-per-stage tracking with versioning and surgical reprocessing.

| Capability | Airflow | Dagster | Temporal | Prefect | Stage-Per-Item |
|------------|---------|---------|----------|---------|----------------|
| Unit of work | Scheduled run | Asset materialization | Durable function | Flow execution | Entity × stage |
| Per-entity tracking | Weak (dynamic task mapping) | Via partitions | Child workflows | Via .map() | Native — item_stages table |
| Code versioning | DAG version only | code_version per asset | Manual patching | None | Per-stage handler_version + source hash |
| Staleness detection | None | Per-asset "Unsynced" | None | None | Per-entity-per-stage, auto-detected |
| Add stage + backfill | Manual | Backfill across partitions | New workflow instances | Manual | Auto-insert pending rows |
| Cached reprocessing | No — tasks are stateless | Partial — asset-level | Via activity replay | No | Stage reads cached upstream files |
| Error classification | Retry count only | Retry policies | Retry policies | Retry policies | LLM semantic classification |
| Infrastructure | Scheduler, metadata DB, workers | Dagster daemon, DB, webserver | Temporal server cluster | Prefect server or Cloud | Single SQLite file, async Python |

Where each framework excels

Airflow is the standard for scheduled ETL — daily batch jobs with complex inter-task dependencies. If your work is "run this DAG every morning at 6am," Airflow is battle-tested.

Dagster comes closest to the versioning story. Its code_version and "Unsynced" labels solve staleness at the asset level. For teams building analytics where the output is a table or model, Dagster's asset graph is excellent. The gap: it models assets, not entities within an asset. Tracking 18,000 items at different stages requires DynamicPartitionsDefinition with custom staleness logic on top.

Temporal is the strongest per-entity model — each entity can be a child workflow with durable state. For long-running processes (order fulfillment, subscription lifecycle), Temporal is unmatched. The tradeoff: you're running a distributed systems platform (Temporal server, persistence layer, worker fleet), and the stage pipeline isn't declarative — it's imperative code.

Prefect has the best developer ergonomics — pure Python, no DAG boilerplate. Its .map() fans out per-item with independent tracking. But there's no versioning, no staleness detection, and no persistent entity registry between runs.

The gap we fill

When to use stage-per-item

Your workload is: thousands of entities, each flowing through ordered stages that evolve independently, where raw data is cached and you need to re-process surgically when code changes. You want this in a single SQLite file without standing up infrastructure.

Typical use cases: research pipelines (scrape → extract → enrich → embed), content processing (fetch → transcribe → diarize → summarize), entity enrichment (discover → profile → score → report).

6. Measuring Usefulness

Direct metrics

| Metric | Value | What it measures |
|--------|-------|------------------|
| Entities processed | 18,000+ VIC theses, 1,200+ earnings calls | Scale of the workload |
| Prompt iteration cost | $3 vs $50 (re-run enrich vs re-run all) | Savings from cached reprocessing |
| Time to add a stage | ~5 min (add to YAML + write handler) | Pipeline extensibility |
| Error resolution time | Seconds (semantic classification) | Faster than reading stack traces |
| Infrastructure cost | $0 (SQLite, no server) | Operational simplicity |

Process metrics

| Metric | Observation |
|--------|-------------|
| Stale detection accuracy | Zero false negatives — every code change surfaces in the dashboard |
| Error classification accuracy | ~95% correct category (transient vs systemic vs item-specific) |
| Backfill completeness | 100% — auto-inserts cover all existing items when stage added |
| Recovery from DB loss | Disk-based recovery script reconciles results from cached files |

Ablation: What happens without it

Before this framework existed, each pipeline was a standalone script with its own ad-hoc progress tracking (a hand-maintained set of processed IDs), its own retry logic, and no notion of versioning: a prompt or parser change meant either re-running everything or manually working out which items were affected.

Limitations

The main constraint is single-machine execution: all state lives in one SQLite file driven by one async Python process, so throughput is bounded by a single host. Distributed execution is deferred to the roadmap (Phase 3 below).

7. Walkthrough: VIC Thesis Pipeline

The VIC (Value Investors Club) pipeline processes 18,000+ investment theses through 3 stages. Here's the end-to-end flow:

Configuration

```yaml
vic_ideas:
  handler: vic_ideas
  kind: backfill
  stages:
    - name: fetch
      concurrency: 4
    - name: extract
      concurrency: 8
    - name: check_enrich
      concurrency: 4
      resource: groq_transcribe
  pacing:
    max_per_hour: 200
    retry_max_attempts: 3
  guards:
    daily_cost_limit: 10.00
  storage:
    base_dir: jobs/data/vic_ideas
    pattern: "{idea_id}"
```

Stage-by-stage flow for a single entity

1. fetch — Download HTML via proxy, compress to .xz, store on disk. Returns: {url, status_code, content_length, _files: {html: "12345/page.html.xz"}}. The raw HTML is the cached artifact that all downstream stages read from.
2. extract — Decompress cached HTML, parse with BeautifulSoup. Extract: ticker symbol, trade direction, thesis type, description text, catalysts, quality score, sector, posted date. No external calls — runs on cached data. Returns structured dict stored in results table.
3. check_enrich — LLM validates extraction accuracy and enriches with inferred fields (contrarian flag, time horizon, catalyst specificity). Reads extract result from DB, sends to LLM with system prompt from VERSION_DEPS. If the LLM finds discrepancies with the parser, flags them for review.

What happens when you change the extraction prompt

1. Edit the prompt string referenced in VERSION_DEPS["check_enrich"].
2. Next runner cycle computes a new source hash — it differs from the stored handler_version.
3. Dashboard shows: check_enrich: 17,200 stale.
4. Click "Reprocess Stale" — resets those item_stages rows to pending.
5. Runner picks them up. Each reads its cached extract result (no fetch, no re-extract). LLM calls only. Completes in ~30 min at 4 concurrency.
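The "Reprocess Stale" reset in step 4 amounts to a single UPDATE over the version mismatch. A sketch under the table shapes described in the Data Model section (the actual query is an assumption; reset_stale is a hypothetical name):

```python
import sqlite3

def reset_stale(conn, job_id, stage, current_version):
    """Flip completed item_stages rows back to pending wherever the stored
    handler_version no longer matches the current one for this stage."""
    conn.execute("""
        UPDATE item_stages SET status = 'pending', attempts = 0
        WHERE stage = ? AND status = 'done'
          AND item_id IN (
              SELECT r.item_id FROM results r
              JOIN work_items w ON w.id = r.item_id
              WHERE w.job_id = ? AND r.stage = ?
                AND r.handler_version != ?)
    """, (stage, job_id, stage, current_version))
```

Only the mismatched rows flip back; items already processed with the new version, and items still upstream of this stage, are untouched.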

Pipeline status view

```
Job: vic_ideas (backfill)        Items: 18,032        Priority: 5
─────────────────────────────────────────────────────────────────
Stage            Pending   Active      Done    Failed    Stale
fetch                 12        4    17,980        36        0
extract                0        0    17,980         0        0
check_enrich         200        4    17,700        80       52
─────────────────────────────────────────────────────────────────
Cost today: $3.42 / $10.00 limit      Pacing: 180/200 per hour
Circuit breaker: OK                   Last discovery: 2m ago
```

8. Vision & Roadmap

Where it stands

The framework runs 12 pipelines in production, processing ~20,000 entities across investment research, earnings analysis, YouTube content monitoring, and entity intelligence. It's proven the core model — stage-per-item with versioning and cached reprocessing — across diverse workloads.

Independent library roadmap

Phase 1: Extract & Package (3–6 months)

Separate the core engine from rivus-specific handlers. The framework becomes a standalone Python package with a clean public API: define stages in YAML or Python, write handler functions, run the pipeline.

Phase 2: Developer Experience (6–12 months)

Make it easy for others to adopt. A new user should go from pip install to running their first pipeline in under 10 minutes.

Phase 3: Scale & Ecosystem (1–2 years)

Address the single-machine limitation for teams that need distributed execution, while preserving the simplicity of the single-file model for solo developers.

Why it compounds

Each improvement to the framework benefits all pipelines simultaneously. Better error classification → fewer false pauses across 12 pipelines. Better version tracking → zero stale results slip through. Better caching → faster prompt iteration for every LLM stage. This is the compounding property: investing in the framework infrastructure has multiplicative returns as the number of pipelines grows.

North star

A world where building a research pipeline is as simple as writing a function per stage and declaring the pipeline in YAML. The framework handles entity tracking, versioning, staleness, retries, cost control, and observability. You focus on what each stage does — the framework handles everything else.

Built with Python 3.12, SQLite (WAL mode), asyncio. Zero external infrastructure required. Runs on a single laptop.

Last updated: March 2026.