Should Rivus Jobs Use Prefect 3?

Feb 27, 2026 · Research + hands-on spike · Consulted: Codex 3, Gemini 3.1 Pro, Grok Code

TL;DR: Prefect 3 looks great on paper — native async, global concurrency limits, built-in DAG pipelines, free observability UI. But the hands-on spike revealed that it doesn't replace our tracker (we still need SQLite for work item state), concurrency limits need a running server, and we'd trade 6,700 LOC of custom code for ~73MB of dependencies plus a server process plus custom glue code that reimplements many of the same concepts. The strongest alternative is refactoring the custom system with clean module boundaries — same features, zero new deps, 40–50% smaller codebase.

1. The Problem

The rivus jobs system is a custom async Python pipeline engine: ~15 jobs discover items from external sources (YouTube, Finnhub, web scraping), process them through multi-stage pipelines, and store structured results. The core is ~6,700 LOC across 6 files, with ~14,000 LOC total including handlers and UI.

It works well, but the codebase has grown complex.

The question: would adopting a standard framework like Prefect 3 reduce complexity, or just shift it?

2. What Prefect 3 Actually Is

Prefect is a Python workflow orchestration framework. You decorate functions with @task and @flow, and Prefect handles scheduling, retries, state tracking, and observability.

Architecture

Your Python Process               Prefect Server (optional)
┌──────────────────────┐          ┌──────────────────────┐
│ @flow decorated func │───API───▶│ Uvicorn + FastAPI    │
│ @task decorated func │          │ SQLite or Postgres   │
│ Retries, concurrency │          │ React dashboard UI   │
│ Error hooks          │          │ Scheduler service    │
└──────────────────────┘          └──────────────────────┘
                                  Port 4200 (default)
                                  ~512MB–1GB RAM

Key Facts (verified Feb 2026)

| Aspect | Detail |
|---|---|
| Latest version | 3.6.20 (Feb 27, 2026) |
| Python support | 3.10 – 3.14 |
| License | Apache 2.0 |
| Default backend | SQLite at ~/.prefect/prefect.db |
| Install size | ~73MB, 55 direct dependencies, 100+ transitive |
| Server RAM | 512MB–1GB baseline, 2GB+ recommended for prod |
| Async support | First-class async def for tasks and flows |
| No-server mode | Yes — flows run in-process with ephemeral state (loses UI, scheduling, global concurrency) |
| Telemetry | Phones home by default. Disable: PREFECT_SERVER_ANALYTICS_ENABLED=false |
Prefect 2 → 3 was a breaking rewrite (mid-2024): Pydantic 2 migration, Agents replaced by Workers, a WebSocket event system. The 3.x line has been stable since 3.4, but the history signals a willingness to make breaking changes.

3. How It Maps to Our Jobs System

| Our Concept | Prefect Equivalent | Fit |
|---|---|---|
| Job (e.g. dumb_money_live) | @flow — a Python function decorated as a flow | Good |
| Stage (fetch, extract, score) | @task — individual step with retry/timeout | Good |
| Multi-stage pipeline | Data dependencies between tasks (implicit DAG) | Good |
| Shared semaphores (youtube=4) | Global Concurrency Limits: server-side rate limits that coordinate across all workers; created via CLI or API; tasks acquire slots via context manager | Good (needs server) |
| Discovery strategies | Scheduled flows or periodic tasks | Partial |
| LLM Doctor | on_failure hooks (observational only, can't reschedule) | Partial |
| Work item tracker (pending/done/failed) | No equivalent — Prefect tracks task runs, not work items | Gap |
| VERSION_DEPS / stale detection | No equivalent | Gap |
| YAML hot-reload | No equivalent — flows are Python code | Gap |
| Custom dashboard (domain columns) | Prefect UI shows flow/task runs, not domain-level data | Gap |
| Test mode (coexists with prod) | Queue namespace separation (manual setup) | Partial |
| Token bucket rate limiting | Global Concurrency Limits with slot_decay_per_second | Good |
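The last row deserves a concrete picture: Prefect's slot_decay_per_second is a server-side emulation of a refilling token bucket, which our system runs in-process. A minimal sketch of that pattern (illustrative, not the actual rivus code):

```python
import asyncio
import time

class TokenBucket:
    """Refill `rate` tokens/second up to `capacity`; acquire() waits for a token."""
    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last = time.monotonic()

    async def acquire(self) -> None:
        while True:
            now = time.monotonic()
            # Refill based on elapsed time since the last check
            self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
            self.last = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            # Sleep just long enough for one token to accumulate
            await asyncio.sleep((1 - self.tokens) / self.rate)

async def demo() -> float:
    bucket = TokenBucket(rate=50.0, capacity=2)  # 50 tokens/sec, burst of 2
    start = time.monotonic()
    for _ in range(4):
        await bucket.acquire()
    return time.monotonic() - start

elapsed = asyncio.run(demo())
# First 2 acquisitions are instant (burst); the next 2 each wait ~0.02s
```

The key design point either way: a burst allowance plus a steady refill rate, rather than a hard concurrent-slot cap.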

4. How Stages Would Work

Each stage becomes a @task. Data flows implicitly — a task that consumes another task's return value automatically waits for it:

from prefect import flow, task
from prefect.concurrency.asyncio import concurrency

@task(retries=3, retry_delay_seconds=[2, 5, 10], tags=["youtube"])
async def fetch_transcript(video: dict) -> dict:
    async with concurrency("youtube", occupy=1):
        vtt = await download_captions(video["video_id"])
    return {**video, "transcript_path": vtt}

@task(retries=1)
async def extract_metadata(video: dict) -> dict:
    # Runs AFTER fetch_transcript because it consumes its output
    return await parse_vtt(video["transcript_path"])

@flow
async def dumb_money_pipeline(videos: list[dict]):
    for video in videos:
        fetched = await fetch_transcript(video)      # stage 1
        extracted = await extract_metadata(fetched)  # stage 2 (waits for 1)

Fan-out with .map()

Process multiple items in parallel across a stage:

@flow
async def pipeline(videos: list[dict]):
    # Fan out: one task per video, all run concurrently
    fetched_futures = fetch_transcript.map(videos)
    # Chain: each extract gets the corresponding fetch result
    extracted_futures = extract_metadata.map(fetched_futures)
    return [f.result() for f in extracted_futures]

How concurrency limits apply

                             Prefect Server
                             ┌──────────────────┐
Task A: fetch(vid1) ────────▶│ youtube: 4 slots │◀──── Task D: fetch(vid4)
Task B: fetch(vid2) ────────▶│ slot 1: A        │◀──── Task E: fetch(vid5)
Task C: fetch(vid3) ────────▶│ slot 2: B        │      (E waits — all 4 taken)
                             │ slot 3: C        │
                             │ slot 4: D        │
                             └──────────────────┘
When A finishes → slot 1 freed → E acquires → runs
Server required. In ephemeral mode (no server), concurrency("youtube") logs a warning and is a no-op. You must run prefect server start for global limits to be enforced.

Stage dependencies

Our jobs system supports stage_deps — stage C requires stages A and B to both complete. In Prefect, this is implicit via data flow: if a task's arguments come from two other tasks, it waits for both:

@flow
async def pipeline(item):
    a_fut = stage_a.submit(item)            # starts immediately
    b_fut = stage_b.submit(item)            # runs concurrently with A
    c_fut = stage_c.submit(a_fut, b_fut)    # waits for BOTH futures
    return c_fut.result()
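For reference, the custom runner expresses the same stage_deps fan-in explicitly with asyncio.gather — a simplified sketch (not the actual rivus code):

```python
import asyncio

async def stage_a(item: dict) -> str:
    await asyncio.sleep(0.01)
    return f"a({item['id']})"

async def stage_b(item: dict) -> str:
    await asyncio.sleep(0.01)
    return f"b({item['id']})"

async def stage_c(a: str, b: str) -> str:
    return f"c({a}, {b})"

async def pipeline(item: dict) -> str:
    # stage_deps: C requires A and B — run A and B concurrently, then fan in
    a_result, b_result = await asyncio.gather(stage_a(item), stage_b(item))
    return await stage_c(a_result, b_result)

result = asyncio.run(pipeline({"id": "xyz"}))
# result == "c(a(xyz), b(xyz))"
```

The dependency is explicit in the gather call rather than implicit in data flow, but the semantics are the same.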

5. How Discovery Would Work

Discovery is about finding new items to process. In our system, each job has a discovery strategy (e.g. youtube_channel, finnhub_calendar) that runs periodically.

Option A: Scheduled discovery flow

@flow
async def discover_dml_videos():
    """Runs every 60 minutes, finds new videos, triggers pipeline."""
    videos = await youtube_channel_strategy("@DumbMoneyLive")
    new_videos = filter_already_processed(videos)  # check tracker DB
    if new_videos:
        await dumb_money_pipeline(new_videos)

# Schedule it
discover_dml_videos.serve(name="dml-discovery", interval=3600)

Option B: Separate discovery + processing

# Discovery writes to tracker DB
@flow(name="discover")
async def discover():
    items = await strategy.discover()
    tracker.upsert_items(items)  # still our SQLite

# Processing reads from tracker DB (separate scheduled flow)
@flow(name="process")
async def process_pending():
    pending = tracker.get_pending("dumb_money_live", limit=10)
    for item in pending:
        tracker.mark_in_progress(item)
        await pipeline(item)
        tracker.mark_done(item)
Key insight from the spike: Prefect does NOT track work items. It tracks task runs (ephemeral executions). "Has video XYZ been processed?" is not a question Prefect answers. We still need our SQLite tracker for item dedup, stage progress, and idempotency. This is the biggest gap.
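To make the gap concrete, here is a minimal sketch of the kind of work-item state the tracker keeps — an illustrative schema and helper names, not the real rivus tracker:

```python
import sqlite3

def open_tracker(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS items (
            job    TEXT NOT NULL,
            key    TEXT NOT NULL,                      -- e.g. the video id
            status TEXT NOT NULL DEFAULT 'pending',    -- pending/in_progress/done/failed
            PRIMARY KEY (job, key)                     -- dedup: one row per work item
        )""")
    return conn

def upsert_item(conn: sqlite3.Connection, job: str, key: str) -> None:
    # INSERT OR IGNORE makes re-discovering a known item a no-op (idempotency)
    conn.execute("INSERT OR IGNORE INTO items (job, key) VALUES (?, ?)", (job, key))

def get_pending(conn: sqlite3.Connection, job: str, limit: int = 10) -> list[str]:
    rows = conn.execute(
        "SELECT key FROM items WHERE job = ? AND status = 'pending' LIMIT ?",
        (job, limit))
    return [r[0] for r in rows]

def mark(conn: sqlite3.Connection, job: str, key: str, status: str) -> None:
    conn.execute("UPDATE items SET status = ? WHERE job = ? AND key = ?",
                 (status, job, key))

conn = open_tracker()
upsert_item(conn, "dumb_money_live", "abc123")
upsert_item(conn, "dumb_money_live", "abc123")   # duplicate discovery: ignored
pending = get_pending(conn, "dumb_money_live")
mark(conn, "dumb_money_live", "abc123", "done")
```

Nothing in Prefect stores this "has item X been processed?" row; the flow/task run tables answer a different question ("did run N succeed?").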

6. Pushing Jobs & Items In

Three ways to inject work into a Prefect-based system:

A. API trigger (push a flow run)

# From CLI or external script
$ prefect deployment run 'dml-pipeline/production' \
    --params '{"video_id": "abc123", "url": "https://..."}'

# From Python
from prefect.deployments import run_deployment
run_deployment("dml-pipeline/production", parameters={"video_id": "abc123"})

B. Direct function call

# Just call the flow function — works without server
await dumb_money_pipeline([{"video_id": "abc123", ...}])

C. Via our existing jobctl / tracker

# Write to tracker DB, let scheduled flow pick it up
jobctl inject dml --stage fetch --key "abc123" --data '{"url":"..."}'
# The process_pending flow picks it up on next cycle

Option C is what we'd likely use — it preserves our existing jobctl workflow and doesn't require the Prefect server for injection.

7. Multi-Machine Scaling

This was asked specifically: does Prefect scale to multiple machines?

Yes. Prefect's architecture separates the server (state, scheduling, UI) from workers (execution). Workers connect to the server API over HTTP, so they can run on any machine.

How it works

Machine A (server)                  Machine B (worker)
┌────────────────────┐              ┌────────────────────┐
│ prefect server     │◀───HTTP────▶│ prefect worker     │
│ - API on :4200     │              │ - polls for runs   │
│ - Postgres DB      │              │ - executes flows   │
│ - Scheduler        │              │ - reports state    │
│ - React UI         │              └────────────────────┘
└────────────────────┘
          ▲                         Machine C (worker)
          │                         ┌────────────────────┐
          └─────────HTTP───────────▶│ prefect worker     │
                                    │ - same code,       │
                                    │   diff machine     │
                                    └────────────────────┘

Setup

# Machine A: start server with Postgres (SQLite won't work multi-machine)
export PREFECT_API_DATABASE_CONNECTION_URL=postgresql+asyncpg://...
prefect server start --host 0.0.0.0

# Machine B: point worker at server
export PREFECT_API_URL=http://machine-a:4200/api
prefect worker start --pool my-pool

# Machine C: same thing
export PREFECT_API_URL=http://machine-a:4200/api
prefect worker start --pool my-pool

What scales and what doesn't

| Feature | Multi-machine? | Notes |
|---|---|---|
| Task execution | Yes | Workers pick up flow runs from any machine |
| Global concurrency limits | Yes | Server-side, all workers coordinate |
| Scheduling | Yes | Server creates runs, workers execute them |
| Our SQLite tracker | No | SQLite is single-machine. Would need Postgres or Redis. |
| Local file storage | No | Handlers write to local disk. Would need S3/NFS. |
| Discovery strategies | Partial | Strategies that scrape APIs work anywhere; strategies reading local files don't. |

Bottom line: Prefect itself scales to multiple machines easily. But our handlers assume local disk and SQLite, so real multi-machine would require more work than just adding workers.

8. What the Spike Revealed

We built a proof-of-concept migrating the dumb_money_live pipeline to Prefect 3. See jobs/spike_prefect.py for the full code.

What worked well

What didn't work or felt awkward

1. No work item tracker. Prefect tracks flow runs and task runs — ephemeral executions. It does NOT answer "has video abc123 been processed?" or "what stage is item XYZ at?" We still need our SQLite tracker for item dedup, stage progress, and idempotency. This is the single biggest gap. Prefect replaces the runner, not the tracker.
2. Concurrency limits are server-side only. In ephemeral mode (no server), concurrency("youtube", occupy=1) logs a warning and becomes a no-op. Our current system uses in-process asyncio.Semaphore which works without any server. Adopting Prefect means running prefect server start permanently.
3. Heavy startup. An empty flow takes 2.8 seconds to run (ephemeral server spin-up). Import alone is 0.84s. Our runner starts in <0.1s. For the long-running daemon this is one-time, but for inv jobs.test (which we use constantly), the 3s tax per invocation is noticeable.
4. Error hooks can't reschedule. on_failure is observational — it fires after all retries are exhausted, but cannot change the task state. Our doctor's "retry in 1 hour" or "pause the whole job" actions would need workarounds (e.g. suspend_flow_run() inside the task itself, not from a hook).
5. More operational concerns
  • 73MB install, 55 direct deps, 100+ transitive — FastAPI, Uvicorn, SQLAlchemy, Alembic, Pydantic, Docker SDK, OpenTelemetry, Amplitude analytics (!), Prometheus client. Heavy for what amounts to a job runner.
  • Amplitude telemetry is a required dependency — phones home by default. Disable with env var.
  • No VERSION_DEPS equivalent — "this handler's code changed, reprocess old results" is a concept Prefect has no awareness of.
  • No YAML hot-reload — flows are Python code. Our "edit jobs.yaml, runner hot-reloads" has no equivalent.
  • Eventual consistency — task states are batched to the server, so the UI may lag behind actual state by seconds.
  • WebSocket requirement — Prefect 3 uses WebSockets for events. Our Caddy reverse proxy would need explicit WebSocket upgrade config.
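Point 4 is worth illustrating: the doctor's value is that classification decides the action, not just observes the failure. A simplified sketch of the pattern (hypothetical rules and names, not the real doctor):

```python
from dataclasses import dataclass

@dataclass
class DoctorAction:
    kind: str                 # "retry", "retry_later", "pause_job", or "give_up"
    delay_seconds: float = 0.0

def classify(error_message: str) -> DoctorAction:
    """Map an error to an action. Prefect's on_failure hook fires after retries
    are exhausted and cannot change task state, so it can log this decision
    but not act on it."""
    msg = error_message.lower()
    if "quota" in msg or "rate limit" in msg:
        return DoctorAction("retry_later", delay_seconds=3600)  # retry in 1 hour
    if "timeout" in msg or "connection reset" in msg:
        return DoctorAction("retry", delay_seconds=5)           # transient, quick retry
    if "invalid api key" in msg:
        return DoctorAction("pause_job")                        # retrying is pointless
    return DoctorAction("give_up")

action = classify("YouTube API rate limit exceeded")
```

In the custom runner this return value feeds directly back into scheduling; under Prefect the equivalent would have to run inside the task itself (e.g. before raising) to have any effect.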

9. Side-by-Side Comparison

| Dimension | Current Custom | With Prefect 3 |
|---|---|---|
| Core LOC | ~6,700 (runner + tracker + discovery + job + doctor + pacer) | ~400–900 glue + Prefect (73MB) + still need tracker (~1,000 LOC) |
| Dependencies | asyncio, sqlite3, loguru, yaml, click | All of the above + prefect (100+ transitive packages) |
| Processes | 1 (runner) | 2+ (prefect server + worker process) |
| RAM overhead | ~100MB (runner + SQLite) | ~600MB+ (server) + runner |
| Concurrency limits | asyncio.Semaphore (in-process, zero overhead) | Server-side Global Concurrency Limits (needs server, HTTP coordination) |
| Retries | Doctor classifies each error, decides action | Built-in exponential backoff + on_failure hook (can't reschedule from hook) |
| Scheduling | asyncio.sleep loops + YAML interval | Built-in cron/interval (requires server or .serve() process) |
| UI / Dashboard | Custom Gradio with domain-level columns | Prefect UI (flow/task runs, no domain columns) + still need custom dashboard |
| Config | jobs.yaml with hot-reload | Python decorators (no hot-reload) |
| Test mode | --test coexists with production | Queue namespacing (manual setup) |
| Multi-machine | Single machine only | Yes — workers on any machine |
| Startup time | <0.1s | ~2.8s (ephemeral server spin-up) |

10. The Alternatives (With Redis)

We consulted three frontier models (Codex 3, Gemini 3.1 Pro, Grok Code) with Redis on the table. Here's how the options ranked:

| Framework | Async | Semaphores | Pipelines | Glue LOC | Verdict |
|---|---|---|---|---|---|
| Prefect 3 | Native | Built-in | Native DAGs | 400–900 | Best on paper |
| arq | Native | Custom Redis | Manual chain | 900–1,800 | Strong |
| Dramatiq | No | Native rate limiter | Sequential | 700–1,200 | Good |
| taskiq | Native | Custom Redis | Manual chain | 900–1,800 | Fair |
| Temporal | Native | Queue routing | Native | 1,200–2,500 | Overkill |
| Celery | No | Custom Redis | Canvas | 1,000–2,000 | Poor fit |

Why the spike changed the picture

The multi-model consensus said Prefect 3 was the clear winner. But the hands-on spike revealed that the theoretical gains evaporate when you account for the tracker gap:

| What Prefect replaces | LOC saved |
|---|---|
| Runner orchestration loop | ~800 |
| Retry/backoff logic | ~100 |
| Semaphore management | ~80 |
| Total saved | ~980 |

| What we still need (custom) | LOC |
|---|---|
| SQLite tracker (item state, stage progress, dedup) | ~1,000 |
| Discovery strategies | ~1,000 |
| Doctor error intelligence | ~500 |
| Prefect glue (flow defs, hooks, config translation) | ~500 |
| Custom dashboard (Prefect UI doesn't show domain data) | ~600 |
| Total custom remains | ~3,600 |

Net: we'd save ~980 LOC of runner code and gain 73MB of dependencies + a server process. The tracker, discovery, doctor, and dashboard all stay custom.

The refactor alternative

All three models also recommended a refactor of the custom system (approach B) that achieves similar or better results with zero new dependencies:

| Refactor action | LOC impact |
|---|---|
| Split tracker.py (69 fn → 5 modules) | Reorganized, not reduced |
| _sync_item_status() → SQL view | −20 LOC + eliminates race conditions |
| Merge simple_worker + stage_worker | −200 LOC |
| run_job() 13 params → RunContext dataclass | −100 LOC, cleaner test mode |
| Discovery protocol + shared base | 1,800 → ~600 LOC |
| Delete dead code (deprecated hashes, dual cost tables) | −200 LOC |
| Net reduction | ~1,700 LOC, zero new deps |
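The RunContext row is the clearest win. A sketch of what that refactor could look like — the field names are guesses at what the 13 loose parameters cover, not the actual run_job() signature:

```python
import asyncio
from dataclasses import dataclass, field, replace

@dataclass(frozen=True)
class RunContext:
    """Bundles what run_job() currently takes as ~13 loose parameters."""
    job_name: str
    test_mode: bool = False
    max_items: int = 10
    stages: tuple[str, ...] = ()
    semaphore_limits: dict = field(default_factory=dict)

    def with_test_mode(self) -> "RunContext":
        # Cleaner test mode: derive a new context instead of threading a flag
        # through every call site
        return replace(self, test_mode=True, max_items=2)

async def run_job(ctx: RunContext) -> str:
    mode = "test" if ctx.test_mode else "prod"
    return f"{ctx.job_name}:{mode}:{ctx.max_items}"

prod = RunContext(job_name="dumb_money_live", stages=("fetch", "extract"))
test = prod.with_test_mode()
result = asyncio.run(run_job(test))
```

Freezing the dataclass makes contexts safe to share across concurrent stages, and derived contexts (test mode, per-job overrides) become one-liners.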

11. Recommendation

Prefect 3: Conditional

Adopt if multi-machine scaling is a near-term goal or if you want the Prefect UI for ops visibility. Accept that you keep the tracker, doctor, and custom dashboard.

Skip if this remains a single-machine system. The dependency weight and server overhead aren't justified for what amounts to replacing the runner loop.

Refactor Custom: Recommended

Better ROI. Split tracker.py, deduplicate discovery.py, merge workers, kill dead code. Gets to ~5,000 LOC total with clean module boundaries. Zero new deps. Keeps every feature including hot-reload and VERSION_DEPS.

Timeline: 4–6 focused sessions. Lower risk than a framework migration.

What would change my recommendation

What wouldn't change it

12. Glossary

Global Concurrency Limits
Prefect's server-side rate limiting. Created via prefect gcl create youtube --limit 4. Tasks acquire slots via async with concurrency("youtube"). Coordinates across all workers connected to the same server.
Flow
A Python function decorated with @flow. The top-level unit of work in Prefect. Contains tasks and orchestration logic. Equivalent to our "job".
Task
A Python function decorated with @task. An individual step within a flow. Has its own retry policy, timeout, and failure hooks. Equivalent to our "stage handler".
WAL (Write-Ahead Logging)
SQLite journaling mode that allows concurrent readers and a single writer. Our tracker uses WAL mode for safe concurrent access from the runner and dashboard.
Ephemeral mode
Running Prefect without a server. Flows execute in-process with temporary state. Loses the UI, scheduling, and global concurrency limits. Good for development, not production.
Worker
A Prefect process that polls the server for scheduled flow runs and executes them. Can run on any machine that can reach the server API.
Deployment
A named, configured instance of a flow. Specifies schedule, parameters, and work pool. Created via prefect.yaml or flow.serve().