Feb 27, 2026 · Research + hands-on spike · Consulted: Codex 3, Gemini 3.1 Pro, Grok Code
The rivus jobs system is a custom async Python pipeline engine: ~15 jobs discover items from external sources (YouTube, Finnhub, web scraping), process them through multi-stage pipelines, and store structured results. The core is ~6,700 LOC across 6 files, with ~14,000 LOC total including handlers and UI.
It works well, but the codebase has grown complex:
- `tracker.py` — 1,682 LOC, 69 functions (god object)
- `discovery.py` — 1,805 LOC, 17 strategies with heavy boilerplate
- `runner.py` — 1,284 LOC, 13-parameter `run_job()`

The question: would adopting a standard framework like Prefect 3 reduce complexity, or just shift it?
Prefect is a Python workflow orchestration framework. You decorate functions with @task and @flow, and Prefect handles scheduling, retries, state tracking, and observability.
| Aspect | Detail |
|---|---|
| Latest version | 3.6.20 (Feb 27, 2026) |
| Python support | 3.10 – 3.14 |
| License | Apache 2.0 |
| Default backend | SQLite at ~/.prefect/prefect.db |
| Install size | ~73MB, 55 direct dependencies, 100+ transitive |
| Server RAM | 512MB–1GB baseline, 2GB+ recommended for prod |
| Async support | First-class async def for tasks and flows |
| No-server mode | Yes — flows run in-process with ephemeral state (loses UI, scheduling, global concurrency) |
| Telemetry | Phones home by default. Disable: PREFECT_SERVER_ANALYTICS_ENABLED=false |
| Our Concept | Prefect Equivalent | Fit |
|---|---|---|
| Job (e.g. dumb_money_live) | @flow — a Python function decorated as a flow | Good |
| Stage (fetch, extract, score) | @task — individual step with retry/timeout | Good |
| Multi-stage pipeline | Data dependencies between tasks (implicit DAG) | Good |
| Shared semaphores (youtube=4) | Global Concurrency Limits: server-side rate limits that coordinate across all workers. Created via CLI or API; tasks acquire slots via context manager. | Good (needs server) |
| Discovery strategies | Scheduled flows or periodic tasks | Partial |
| LLM Doctor | on_failure hooks (observational only, can't reschedule) | Partial |
| Work item tracker (pending/done/failed) | No equivalent — Prefect tracks task runs, not work items | Gap |
| VERSION_DEPS / stale detection | No equivalent | Gap |
| YAML hot-reload | No equivalent — flows are Python code | Gap |
| Custom dashboard (domain columns) | Prefect UI shows flow/task runs, not domain-level data | Gap |
| Test mode (coexists with prod) | Queue namespace separation (manual setup) | Partial |
| Token bucket rate limiting | Global Concurrency Limits with slot_decay_per_second | Good |
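For reference, the token-bucket behavior in the last row is what Prefect models with `slot_decay_per_second`. A minimal plain-Python token bucket, in the in-process style our current system uses (names are illustrative, not the actual pacer API):

```python
import time

class TokenBucket:
    """Allow `rate` operations per second with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def try_acquire(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(rate=2.0, capacity=4)
burst = [bucket.try_acquire() for _ in range(6)]
print(burst)  # the first 4 calls succeed, then the bucket is empty
```

Prefect's server-side version does the same accounting centrally so all workers share one bucket; the in-process version above only limits a single process.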
Each stage becomes a @task. Data flows implicitly — a task that consumes another task's return value automatically waits for it:
```python
@task(retries=3, retry_delay_seconds=[2, 5, 10], tags=["youtube"])
async def fetch_transcript(video: dict) -> dict:
    async with concurrency("youtube", occupy=1):
        vtt = await download_captions(video["video_id"])
        return {**video, "transcript_path": vtt}

@task(retries=1)
async def extract_metadata(video: dict) -> dict:
    # Runs AFTER fetch_transcript because it consumes its output
    return await parse_vtt(video["transcript_path"])

@flow
async def dumb_money_pipeline(videos: list[dict]):
    for video in videos:
        fetched = await fetch_transcript(video)      # stage 1
        extracted = await extract_metadata(fetched)  # stage 2 (waits for 1)
```
`.map()` processes multiple items in parallel across a stage:
```python
@flow
async def pipeline(videos: list[dict]):
    # Fan out: one task per video, all run concurrently
    fetched_futures = fetch_transcript.map(videos)
    # Chain: each extract gets the corresponding fetch result
    extracted_futures = extract_metadata.map(fetched_futures)
    return [f.result() for f in extracted_futures]
```
Caveat: without a running server, `concurrency("youtube")` logs a warning and is a no-op. You must run `prefect server start` for global limits to be enforced.
Our jobs system supports stage_deps — stage C requires stages A and B to both complete. In Prefect, this is implicit via data flow: if a task's arguments come from two other tasks, it waits for both:
```python
@flow
async def pipeline(item):
    a_result, b_result = await asyncio.gather(
        stage_a(item),   # A and B run concurrently
        stage_b(item),
    )
    c_result = await stage_c(a_result, b_result)  # waits for BOTH
```
Discovery is about finding new items to process. In our system, each job has a discovery strategy (e.g. youtube_channel, finnhub_calendar) that runs periodically.
```python
@flow
async def discover_dml_videos():
    """Runs every 60 minutes, finds new videos, triggers pipeline."""
    videos = await youtube_channel_strategy("@DumbMoneyLive")
    new_videos = filter_already_processed(videos)  # check tracker DB
    if new_videos:
        await dumb_money_pipeline(new_videos)

# Schedule it
discover_dml_videos.serve(name="dml-discovery", interval=3600)
```
```python
# Discovery writes to tracker DB
@flow(name="discover")
async def discover():
    items = await strategy.discover()
    tracker.upsert_items(items)  # still our SQLite

# Processing reads from tracker DB (separate scheduled flow)
@flow(name="process")
async def process_pending():
    pending = tracker.get_pending("dumb_money_live", limit=10)
    for item in pending:
        tracker.mark_in_progress(item)
        await pipeline(item)
        tracker.mark_done(item)
```
Three ways to inject work into a Prefect-based system:

```shell
# Option A: trigger a deployment run from the CLI or an external script
$ prefect deployment run 'dml-pipeline/production' \
    --param '{"video_id": "abc123", "url": "https://..."}'
```

```python
# Option A, from Python
from prefect.deployments import run_deployment

run_deployment("dml-pipeline/production", parameters={"video_id": "abc123"})
```

```python
# Option B: just call the flow function — works without server
await dumb_money_pipeline([{"video_id": "abc123", ...}])
```

```shell
# Option C: write to tracker DB, let scheduled flow pick it up
jobctl inject dml --stage fetch --key "abc123" --data '{"url":"..."}'
# The process_pending flow picks it up on next cycle
```
Option C is what we'd likely use — it preserves our existing jobctl workflow and doesn't require the Prefect server for injection.
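What Option C amounts to can be sketched with plain `sqlite3`. The schema and function names here are illustrative, not the real tracker's:

```python
import json
import sqlite3

# Illustrative tracker table; the real rivus schema differs
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE items (job TEXT, key TEXT, stage TEXT, "
    "status TEXT, data TEXT, PRIMARY KEY (job, key))"
)

def inject(job: str, key: str, stage: str, data: dict) -> None:
    # Idempotent: re-injecting an already-tracked key is a no-op
    conn.execute(
        "INSERT OR IGNORE INTO items VALUES (?, ?, ?, 'pending', ?)",
        (job, key, stage, json.dumps(data)),
    )

def get_pending(job: str, limit: int) -> list[tuple]:
    return conn.execute(
        "SELECT key, stage, data FROM items "
        "WHERE job = ? AND status = 'pending' LIMIT ?",
        (job, limit),
    ).fetchall()

inject("dumb_money_live", "abc123", "fetch", {"url": "https://example.com"})
pending = get_pending("dumb_money_live", limit=10)
print(pending[0][0])  # "abc123"
```

The key property is that injection touches only the tracker DB, so it works whether or not a Prefect server is running.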
This was asked specifically: does Prefect scale to multiple machines?
```shell
# Machine A: start server with Postgres (SQLite won't work multi-machine)
export PREFECT_API_DATABASE_CONNECTION_URL=postgresql+asyncpg://...
prefect server start --host 0.0.0.0

# Machine B: point worker at server
export PREFECT_API_URL=http://machine-a:4200/api
prefect worker start --pool my-pool

# Machine C: same thing
export PREFECT_API_URL=http://machine-a:4200/api
prefect worker start --pool my-pool
```
| Feature | Multi-machine? | Notes |
|---|---|---|
| Task execution | Yes | Workers pick up flow runs from any machine |
| Global concurrency limits | Yes | Server-side, all workers coordinate |
| Scheduling | Yes | Server creates runs, workers execute them |
| Our SQLite tracker | No | SQLite is single-machine. Would need Postgres or Redis. |
| Local file storage | No | Handlers write to local disk. Would need S3/NFS. |
| Discovery strategies | Partial | Strategies that scrape APIs work anywhere. Strategies reading local files don't. |
Bottom line: Prefect itself scales to multiple machines easily. But our handlers assume local disk and SQLite, so real multi-machine would require more work than just adding workers.
We built a proof-of-concept migrating the dumb_money_live pipeline to Prefect 3. See jobs/spike_prefect.py for the full code.
What worked:
- `@task(retries=3, retry_delay_seconds=[2, 5, 10])` replaces our custom retry logic.
- `on_failure` hooks integrate with our doctor pattern.
- Handlers stay `async def`.
- `retry_delay_seconds=[2, 5, 10]` gives progressive backoff without our custom pacer.

What didn't: without a server, `concurrency("youtube", occupy=1)` logs a warning and becomes a no-op. Our current system uses an in-process `asyncio.Semaphore`, which works without any server; adopting Prefect means running `prefect server start` permanently. And every serverless invocation spins up an ephemeral server (~3s). For `inv jobs.test` (which we use constantly), that 3s tax per invocation is noticeable.
on_failure is observational — it fires after all retries are exhausted, but cannot change the task state. Our doctor's "retry in 1 hour" or "pause the whole job" actions would need workarounds (e.g. suspend_flow_run() inside the task itself, not from a hook).
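What the doctor does that an `on_failure` hook can't express is a classification step that changes scheduling. A minimal sketch with illustrative rules (the real doctor uses an LLM to classify errors, and its API differs):

```python
from dataclasses import dataclass

@dataclass
class DoctorAction:
    kind: str                 # "retry", "pause_job", or "fail"
    delay_seconds: int = 0

def classify(exc: Exception) -> DoctorAction:
    # Illustrative heuristics standing in for the real classifier
    msg = str(exc).lower()
    if "rate limit" in msg or "429" in msg:
        return DoctorAction("retry", delay_seconds=3600)  # retry in 1 hour
    if "quota" in msg:
        return DoctorAction("pause_job")  # stop the whole job
    return DoctorAction("fail")

action = classify(RuntimeError("HTTP 429: rate limit exceeded"))
print(action.kind, action.delay_seconds)  # retry 3600
```

The crux: a `DoctorAction` must feed back into the scheduler, which Prefect's post-hoc hooks cannot do on their own.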
| Dimension | Current Custom | With Prefect 3 |
|---|---|---|
| Core LOC | ~6,700 (runner + tracker + discovery + job + doctor + pacer) | ~400-900 glue + Prefect (73MB) + still need tracker (~1,000 LOC) |
| Dependencies | asyncio, sqlite3, loguru, yaml, click | All of the above + prefect (100+ transitive packages) |
| Processes | 1 (runner) | 2+ (prefect server + worker process) |
| RAM overhead | ~100MB (runner + SQLite) | ~600MB+ (server) + runner |
| Concurrency limits | asyncio.Semaphore (in-process, zero overhead) | Server-side Global Concurrency Limits (needs server, HTTP coordination) |
| Retries | Doctor classifies each error, decides action | Built-in exponential backoff + on_failure hook (can't reschedule from hook) |
| Scheduling | asyncio.sleep loops + YAML interval | Built-in cron/interval (requires server or .serve() process) |
| UI / Dashboard | Custom Gradio with domain-level columns | Prefect UI (flow/task runs, no domain columns) + still need custom dashboard |
| Config | jobs.yaml with hot-reload | Python decorators (no hot-reload) |
| Test mode | --test coexists with production | Queue namespacing (manual setup) |
| Multi-machine | Single machine only | Yes — workers on any machine |
| Startup time | <0.1s | ~2.8s (ephemeral server spin-up) |
We consulted three frontier models (Codex 3, Gemini 3.1 Pro, Grok Code) with Redis on the table. Here's how the options ranked:
| Framework | Async | Semaphores | Pipelines | Glue LOC | Verdict |
|---|---|---|---|---|---|
| Prefect 3 | Native | Built-in | Native DAGs | 400–900 | Best on paper |
| arq | Native | Custom Redis | Manual chain | 900–1,800 | Strong |
| Dramatiq | No | Native rate limiter | Sequential | 700–1,200 | Good |
| taskiq | Native | Custom Redis | Manual chain | 900–1,800 | Fair |
| Temporal | Native | Queue routing | Native | 1,200–2,500 | Overkill |
| Celery | No | Custom Redis | Canvas | 1,000–2,000 | Poor fit |
The multi-model consensus said Prefect 3 was the clear winner. But the hands-on spike revealed that the theoretical gains evaporate when you account for the tracker gap:
| What Prefect replaces | LOC saved |
|---|---|
| Runner orchestration loop | ~800 |
| Retry/backoff logic | ~100 |
| Semaphore management | ~80 |
| Total saved | ~980 |
| What we still need (custom) | LOC |
|---|---|
| SQLite tracker (item state, stage progress, dedup) | ~1,000 |
| Discovery strategies | ~1,000 |
| Doctor error intelligence | ~500 |
| Prefect glue (flow defs, hooks, config translation) | ~500 |
| Custom dashboard (Prefect UI doesn't show domain data) | ~600 |
| Total custom remains | ~3,600 |
Net: we'd save ~980 LOC of runner code and gain 73MB of dependencies + a server process. The tracker, discovery, doctor, and dashboard all stay custom.
All three models also recommended a refactor of the custom system (approach B) that achieves similar or better results with zero new dependencies:
| Refactor action | LOC impact |
|---|---|
| Split tracker.py (69 fn → 5 modules) | Reorganized, not reduced |
| _sync_item_status() → SQL view | −20 LOC + eliminates race conditions |
| Merge simple_worker + stage_worker | −200 LOC |
| run_job() 13 params → RunContext dataclass | −100 LOC, cleaner test mode |
| Discovery protocol + shared base | 1,800 → ~600 LOC |
| Delete dead code (deprecated hashes, dual cost tables) | −200 LOC |
| Net reduction | ~1,700 LOC, zero new deps |
Adopt if multi-machine scaling is a near-term goal or if you want the Prefect UI for ops visibility. Accept that you keep the tracker, doctor, and custom dashboard.
Skip if this remains a single-machine system. The dependency weight and server overhead aren't justified for what amounts to replacing the runner loop.
Better ROI. Split tracker.py, deduplicate discovery.py, merge workers, kill dead code. Gets to ~5,000 LOC total with clean module boundaries. Zero new deps. Keeps every feature including hot-reload and VERSION_DEPS.
Timeline: 4–6 focused sessions. Lower risk than a framework migration.
- Global Concurrency Limit: created via `prefect gcl create youtube --limit 4`. Tasks acquire slots via `async with concurrency("youtube")`. Coordinates across all workers connected to the same server.
- Flow: `@flow`. The top-level unit of work in Prefect. Contains tasks and orchestration logic. Equivalent to our "job".
- Task: `@task`. An individual step within a flow. Has its own retry policy, timeout, and failure hooks. Equivalent to our "stage handler".
- Deployment: a flow registered with the server for scheduled or triggered runs, defined in `prefect.yaml` or via `flow.serve()`.