> **Note (2026-03-20)**: API renamed — `run_pipeline()`→`run()`, `run_flow()`→`run_batch()`, `ExecuteResult`→`RunResult`, `FlowResult`→`BatchResult`, `DataflowConfig`→`BatchConfig`, `variong`→`vario`.

# Live Run Graph — Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** ngstudio shows the recipe graph (D2 diagram) with live state annotations as the CLI executor runs — stages light up as they complete, showing scores, cost, and Thing counts.

**Architecture:** The executor writes traces to `ng_runs.db` incrementally (per-stage, not batch-at-end). ngstudio polls the active run's traces and annotates the D2 diagram nodes with execution state. The viz layer gains a new `recipe_to_d2_annotated()` that colors/annotates nodes based on completed traces.

**Tech Stack:** SQLite (WAL mode), NiceGUI (polling via `ui.timer`), D2/Kroki (diagram rendering), existing `viz.py` + `ui_viewers.py`

---

## Task 1: Add `status` column and incremental DB writes

Split `save_run()` into three functions: `create_run()` (at start), `save_trace()` (per stage), `finish_run()` (at end). Add `status` column to `runs` table.

**Files:**
- Modify: `vario/db.py`
- Test: `vario/tests/test_db.py`

**Step 1: Write the failing tests**

```python
# In test_db.py — add these tests

def test_incremental_run_lifecycle(tmp_path):
    """Walk one run through create_run, a single save_trace, and finish_run."""
    from vario.db import create_run, save_trace, finish_run, get_run
    from vario.context import Trace
    from vario.thing import Thing

    db_file = tmp_path / "test.db"

    # A freshly created run is "running" and has no traces yet.
    run_id = create_run("best_of_n", "test problem", budget_max_usd=0.10, db_path=db_file)
    assert run_id

    fresh = get_run(run_id, db_path=db_file)
    assert fresh["status"] == "running"
    assert fresh["recipe"] == "best_of_n"
    assert len(fresh["traces"]) == 0

    # Appending a trace is visible immediately and does not finish the run.
    produce_trace = Trace(
        stage_id="stage_0.produce",
        block_type="produce",
        input_count=0,
        output_count=5,
        tokens=1000,
        cost_usd=0.01,
        latency_ms=500,
    )
    save_trace(run_id, produce_trace, db_path=db_file)

    mid_run = get_run(run_id, db_path=db_file)
    assert len(mid_run["traces"]) == 1
    assert mid_run["status"] == "running"

    # Finishing stores the things and flips the status to "done".
    final_things = [Thing(content="answer", props={"score": 85, "model": "sonnet"})]
    finish_run(
        run_id,
        things=final_things,
        total_cost_usd=0.01,
        outcome="1 thing, best score 85",
        db_path=db_file,
    )

    finished = get_run(run_id, db_path=db_file)
    assert finished["status"] == "done"
    assert finished["best_score"] == 85
    assert finished["thing_count"] == 1


def test_get_active_run(tmp_path):
    """get_active_run returns the most recent running run, or None when idle."""
    db_path = tmp_path / "test.db"
    from vario.db import create_run, finish_run, get_active_run

    # No active runs
    assert get_active_run(db_path=db_path) is None

    # Create one
    run_id = create_run("best_of_n", "test", budget_max_usd=0.10, db_path=db_path)
    active = get_active_run(db_path=db_path)
    # Check for None first so a failure reads as an assertion, not a TypeError.
    assert active is not None
    assert active["run_id"] == run_id

    # Finish it — no more active
    finish_run(run_id, things=[], total_cost_usd=0, outcome="done", db_path=db_path)
    assert get_active_run(db_path=db_path) is None
```

**Step 2: Run tests to verify they fail**

Run: `pytest vario/tests/test_db.py::test_incremental_run_lifecycle -v`
Expected: FAIL — `create_run`, `save_trace`, `finish_run`, `get_active_run` don't exist

**Step 3: Implement incremental DB functions**

In `vario/db.py`:

```python
# Add status column to schema (after existing columns in runs table):
#   status TEXT DEFAULT 'running'
# Add to CREATE TABLE runs: status TEXT DEFAULT 'running',

# New functions:

def create_run(
    recipe: str,
    problem: str,
    *,
    budget_max_usd: float = 0,
    db_path: Path | None = None,
) -> str:
    """Insert a new ``runs`` row with status ``running`` and return its id.

    The id is the first 12 hex characters of a random UUID; ``started_at``
    is the current UTC time in ISO format.
    """
    new_run_id = uuid.uuid4().hex[:12]
    now_iso = datetime.now(UTC).isoformat()

    db = _get_db(db_path)
    try:
        row = (new_run_id, recipe, problem, _problem_hash(problem),
               now_iso, budget_max_usd)
        db.execute(
            """INSERT INTO runs
               (run_id, recipe, problem, problem_hash, started_at,
                status, budget_max_usd)
               VALUES (?, ?, ?, ?, ?, 'running', ?)""",
            row,
        )
        db.commit()
    finally:
        # Always release the connection, even if the insert fails.
        db.close()

    return new_run_id


def save_trace(
    run_id: str,
    trace: "Trace",
    *,
    db_path: Path | None = None,
) -> None:
    """Insert one ``traces`` row for ``run_id`` and commit immediately.

    The trace's metadata dict is stored as JSON text in ``metadata_json``.
    """
    db = _get_db(db_path)
    try:
        row = (
            run_id,
            trace.stage_id,
            trace.block_type,
            trace.input_count,
            trace.output_count,
            trace.tokens,
            trace.cost_usd,
            trace.latency_ms,
            json.dumps(trace.metadata),
        )
        db.execute(
            """INSERT INTO traces
               (run_id, stage_id, block_type, input_count, output_count,
                tokens, cost_usd, latency_ms, metadata_json)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            row,
        )
        db.commit()
    finally:
        db.close()


def finish_run(
    run_id: str,
    *,
    things: list["Thing"],
    total_cost_usd: float,
    outcome: str,
    run_log_json: str | None = None,
    db_path: Path | None = None,
) -> None:
    """Mark ``run_id`` as done and persist its final things and stats.

    Updates the ``runs`` row (status, finish time, cost, outcome, run log,
    thing count, best score) and inserts one ``things`` row per thing,
    ranked by score descending. Both writes share a single commit.
    """

    def _rank_key(thing):
        # Things without a score are ordered as if they scored -1.
        return thing.score if thing.score is not None else -1

    known_scores = [t.score for t in things if t.score is not None]
    top_score = max(known_scores, default=None)
    finished_at = datetime.now(UTC).isoformat()

    conn = _get_db(db_path)
    try:
        conn.execute(
            """UPDATE runs SET
                 status = 'done', finished_at = ?, total_cost_usd = ?,
                 outcome = ?, run_log_json = ?, thing_count = ?, best_score = ?
               WHERE run_id = ?""",
            (finished_at, total_cost_usd, outcome, run_log_json,
             len(things), top_score, run_id),
        )
        ranked = sorted(things, key=_rank_key, reverse=True)
        thing_rows = [
            (run_id, item.content, item.score, item.model,
             json.dumps(item.props), position)
            for position, item in enumerate(ranked)
        ]
        conn.executemany(
                """INSERT INTO things (run_id, content, score, model, props_json, rank)
                   VALUES (?, ?, ?, ?, ?, ?)""",
            thing_rows,
        )
        conn.commit()
    finally:
        conn.close()


def get_active_run(*, db_path: Path | None = None) -> dict | None:
    """Return the newest run whose status is ``running``, or None.

    The returned dict holds the ``runs`` row's columns plus a ``traces`` key
    containing that run's trace rows (as dicts) in insertion order.
    """
    db = _get_db(db_path)
    try:
        newest = db.execute(
            "SELECT * FROM runs WHERE status = 'running' ORDER BY started_at DESC LIMIT 1"
        ).fetchone()
        if not newest:
            return None

        run = dict(newest)
        trace_rows = db.execute(
            "SELECT * FROM traces WHERE run_id = ? ORDER BY id", (run["run_id"],)
        ).fetchall()
        run["traces"] = [dict(trace_row) for trace_row in trace_rows]
        return run
    finally:
        db.close()
```

Also add WAL mode to `_get_db()`:
```python
conn.execute("PRAGMA journal_mode=WAL")
```

And add migration for existing DBs without `status` column:
```python
# In _get_db, after executescript(_SCHEMA):
try:
    conn.execute("SELECT status FROM runs LIMIT 0")
except sqlite3.OperationalError:
    conn.execute("ALTER TABLE runs ADD COLUMN status TEXT DEFAULT 'done'")
    conn.commit()
```

**Step 4: Run tests to verify they pass**

Run: `pytest vario/tests/test_db.py -v`
Expected: PASS

**Step 5: Commit**

```bash
git add vario/db.py vario/tests/test_db.py
git commit -m "feat(ng): incremental DB writes — create_run, save_trace, finish_run"
```

---

## Task 2: Wire executor to use incremental DB writes

Change `execute()` to call `create_run()` at start and `finish_run()` at end. Add a trace callback so blocks can write traces as they complete.

**Files:**
- Modify: `vario/executor.py`
- Modify: `vario/context.py` (add trace callback)
- Test: `vario/tests/test_executor.py` (existing tests should still pass)

**Step 1: Add trace callback to Context**

In `vario/context.py`, add an optional callback:

```python
@dataclass
class Context:
    """Per-run execution context: problem, budget, collected traces, and an
    optional callback that is notified of each new trace."""

    problem: str
    budget: Budget
    traces: list[Trace] = field(default_factory=list)
    stage_path: list[str] = field(default_factory=list)
    cached_content: str | None = None
    on_trace: Callable[[Trace], None] | None = None  # called when a trace is appended

    def add_trace(self, trace: Trace) -> None:
        """Append ``trace`` and notify ``on_trace`` if one is set.

        A failing callback never interrupts execution, but the failure is
        logged so a broken observer (e.g. a DB writer that raises on every
        trace) does not go unnoticed.
        """
        self.traces.append(trace)
        if self.on_trace is None:
            return
        try:
            self.on_trace(trace)
        except Exception:
            # Best-effort notification: keep running, but leave evidence.
            import logging  # local import keeps this snippet self-contained

            logging.getLogger(__name__).warning(
                "on_trace callback failed for stage %s",
                getattr(trace, "stage_id", "?"),
                exc_info=True,
            )
```

Add import: `from collections.abc import Callable`

**Step 2: Update blocks and repeat to use `ctx.add_trace()` instead of `ctx.traces.append()`**

Search all blocks for `ctx.traces.append(` — each should become `ctx.add_trace(`.

Files to check:
- `vario/blocks/produce.py`
- `vario/blocks/score.py`
- `vario/blocks/revise.py`
- `vario/blocks/reduce.py`
- `vario/executor.py` (the repeat stage trace append at line ~295)

Use grep to find all occurrences, replace each `ctx.traces.append(trace)` with `ctx.add_trace(trace)`.

**Step 3: Update `execute()` to use incremental DB**

In `vario/executor.py`, replace the batch `save_run` call (lines 445-451) with:

```python
async def execute(recipe, problem, budget, *, cached_content=None, image_urls=None):
    # At the top, create run in DB. Persistence is best-effort: if the import
    # or the insert fails, the recipe still runs, just without a DB record.
    run_id = None
    _on_trace = None
    try:
        from vario.db import create_run, save_trace, finish_run
        run_id = create_run(recipe.name, problem, budget_max_usd=budget.max_usd)
    except Exception:
        logger.opt(exception=True).warning("Failed to create ng run in DB")
    else:
        # Set up trace callback for incremental writes
        def _on_trace(trace: Trace) -> None:
            save_trace(run_id, trace)

    ctx = Context(
        problem=problem, budget=budget,
        cached_content=cached_content,
        on_trace=_on_trace,
    )

    # ... existing stage wiring ...

    # NOTE(review): if the stage wiring above raises, finish_run() is never
    # reached and the row keeps status='running', so get_active_run() will
    # keep returning it. Consider a try/finally or a failure status.

    # At the end, replace the old save_run block with:
    # (finish_run is already imported whenever run_id is set.)
    if run_id:
        try:
            run_log_json = json.dumps(asdict(run_log)) if run_log else None
            finish_run(
                run_id,
                things=results,
                total_cost_usd=budget.spent_usd,
                outcome=run_log.outcome if run_log else "",
                run_log_json=run_log_json,
            )
            logger.info(f"execute({recipe.name}): saved as run {run_id}")
        except Exception:
            logger.opt(exception=True).warning("Failed to finish ng run in DB")
```

Add needed imports at the top of executor.py:
```python
import json
from dataclasses import asdict
```

**Step 4: Run all existing tests**

Run: `pytest vario/tests/ -v`
Expected: All existing tests PASS (behavior unchanged, just writing traces incrementally)

**Step 5: Commit**

```bash
git add vario/executor.py vario/context.py vario/blocks/produce.py vario/blocks/score.py vario/blocks/revise.py vario/blocks/reduce.py
git commit -m "feat(ng): executor writes traces incrementally via on_trace callback"
```

---

## Task 3: State-annotated D2 diagram

Add `recipe_to_d2_annotated()` to `viz.py` — takes stages + list of completed traces, colors nodes by state (pending/done).

**Files:**
- Modify: `vario/viz.py`
- Test: `vario/tests/test_viz.py`

**Step 1: Write the failing test**

```python
# In test_viz.py

def test_annotated_d2_pending():
    """All stages pending when no traces."""
    from vario.executor import load_recipe
    from vario.viz import recipe_to_d2_annotated

    recipe = load_recipe("best_of_n")
    d2 = recipe_to_d2_annotated(recipe.stages, traces=[])
    # Pin the exact styles: gray (pending) must appear, green (done) must not.
    assert 'fill: "#e5e7eb"' in d2
    assert 'fill: "#dcfce7"' not in d2


def test_annotated_d2_partial():
    """First stage done, rest pending."""
    from vario.executor import load_recipe
    from vario.context import Trace
    from vario.viz import recipe_to_d2_annotated

    recipe = load_recipe("best_of_n")
    traces = [
        Trace(stage_id="stage_0.produce", block_type="produce",
              input_count=0, output_count=5, tokens=500, cost_usd=0.01,
              latency_ms=1200, metadata={"best_score": None}),
    ]
    d2 = recipe_to_d2_annotated(recipe.stages, traces=traces)
    # The completed stage is green...
    assert 'fill: "#dcfce7"' in d2
    # ...and the stages without a trace stay gray.
    assert 'fill: "#e5e7eb"' in d2


def test_annotated_d2_with_score():
    """Score shows in node label when available."""
    from vario.executor import load_recipe
    from vario.context import Trace
    from vario.viz import recipe_to_d2_annotated

    recipe = load_recipe("best_of_n")
    traces = [
        Trace(stage_id="stage_0.produce", block_type="produce",
              input_count=0, output_count=5),
        Trace(stage_id="stage_1.score", block_type="score",
              input_count=5, output_count=5,
              metadata={"best_score": 87}),
    ]
    d2 = recipe_to_d2_annotated(recipe.stages, traces=traces)
    # Match the full annotation so stray digits elsewhere cannot satisfy the test.
    assert "score: 87" in d2
```

**Step 2: Run tests to verify they fail**

Run: `pytest vario/tests/test_viz.py::test_annotated_d2_pending -v`
Expected: FAIL — `recipe_to_d2_annotated` doesn't exist

**Step 3: Implement annotated D2**

In `vario/viz.py`, add:

```python
def recipe_to_d2_annotated(
    stages: list[Stage],
    traces: list[Trace],
) -> str:
    """Like recipe_to_d2 but with execution state annotations.

    Nodes are colored by state:
    - pending (no trace): gray fill (#e5e7eb)
    - done (trace exists): green fill (#dcfce7), shows score/cost if available

    Traces are matched to top-level stages by the index embedded in their
    ``stage_id`` (``stage_<N>.<block>``), not by their position in ``traces``,
    so a stage that emits more than one trace cannot shift the annotations of
    the stages after it. When several traces name the same stage, the last
    one wins. If no trace carries a ``stage_`` id at all, traces fall back to
    positional matching.

    Args:
        stages: The recipe's top-level stages, in execution order.
        traces: Traces recorded so far for the run (may be empty).

    Returns:
        D2 source text for the annotated diagram.
    """
    import re  # local import: only needed to parse stage ids

    labeled = label_stages(stages)
    lines: list[str] = []

    # Build trace lookup by stage index, keyed on the id rather than position.
    # Ids with more than two dot-separated parts are skipped; presumably those
    # belong to sub-stages inside a repeat — TODO confirm against the executor.
    top_level_id = re.compile(r"stage_(\d+)\.[^.]+")
    trace_by_idx: dict[int, Trace] = {}
    saw_stage_id = False
    for trace in traces:
        stage_id = trace.stage_id or ""
        if stage_id.startswith("stage_"):
            saw_stage_id = True
        match = top_level_id.fullmatch(stage_id)
        if match:
            trace_by_idx[int(match.group(1))] = trace
    if not saw_stage_id:
        # No recognizable ids: keep the simple "i-th trace is i-th stage" mapping.
        trace_by_idx = dict(enumerate(traces))

    for stage_idx, ls in enumerate(labeled):
        node_id = ls.label
        box_label = _stage_box_label(ls)
        trace = trace_by_idx.get(stage_idx)

        # State annotation
        if trace is not None:
            # Done — annotate with results
            score = trace.metadata.get("best_score")
            cost = trace.cost_usd
            annotation_parts = []
            if score is not None:
                annotation_parts.append(f"score: {score:.0f}")
            if cost:
                annotation_parts.append(f"${cost:.4f}")
            if trace.output_count:
                annotation_parts.append(f"{trace.output_count} out")
            annotation = " | ".join(annotation_parts)
            if annotation:
                # Literal backslash-n: D2 turns it into a line break in the label.
                box_label += f"\\n{annotation}"
            fill = '"#dcfce7"'  # light green
            font_color = '"#166534"'
        else:
            fill = '"#e5e7eb"'  # gray
            font_color = '"#6b7280"'

        if ls.stage.type == "repeat" and ls.sub_stages:
            # Container node: sub-stages chained inside, plus a dashed
            # back-edge from the last sub-stage to the first.
            lines.append(f"{node_id}: {box_label} {{")
            lines.append("  style.border-radius: 8")
            lines.append(f"  style.fill: {fill}")
            lines.append(f"  style.font-color: {font_color}")
            for sub in ls.sub_stages:
                sub_label = _stage_box_label(sub)
                lines.append(f"  {sub.label}: {sub_label}")
            for a, b in zip(ls.sub_stages, ls.sub_stages[1:]):
                ann = _edge_annotation(a, b)
                edge = f"  {a.label} -> {b.label}"
                if ann:
                    edge += f": {ann}"
                lines.append(edge)
            if len(ls.sub_stages) >= 2:
                last = ls.sub_stages[-1].label
                first = ls.sub_stages[0].label
                lines.append(f"  {last} -> {first}: next round {{")
                lines.append("    style.stroke-dash: 5")
                lines.append("  }")
            lines.append("}")
        else:
            lines.append(f"{node_id}: {box_label} {{")
            lines.append(f"  style.fill: {fill}")
            lines.append(f"  style.font-color: {font_color}")
            lines.append("}")

    # Top-level edges
    for a, b in zip(labeled, labeled[1:]):
        ann = _edge_annotation(a, b)
        edge = f"{a.label} -> {b.label}"
        if ann:
            edge += f": {ann}"
        lines.append(edge)

    return "\n".join(lines)
```

**Step 4: Run tests to verify they pass**

Run: `pytest vario/tests/test_viz.py -v`
Expected: PASS

**Step 5: Commit**

```bash
git add vario/viz.py vario/tests/test_viz.py
git commit -m "feat(ng): state-annotated D2 diagrams for live run visualization"
```

---

## Task 4: ngstudio Run Observer page

Add a "Run" page to ngstudio that watches the active run (or a selected past run) and shows the annotated D2 graph + stage detail viewers.

**Files:**
- Create: `vario/ui_run.py`
- Modify: `vario/app.py` (register the page)

**Step 1: Create the Run Observer page**

```python
# vario/ui_run.py
"""Run observer page — live D2 graph with execution state.

Polls ng_runs.db for the active run, annotates D2 diagram nodes
with stage completion state, scores, and costs.

Click completed stages to see detail viewers (from ui_viewers.py).
"""

from __future__ import annotations

from loguru import logger
from nicegui import ui

from vario.context import Trace
from vario.db import get_active_run, get_run, list_runs
from vario.executor import load_recipe
from vario.render_d2 import render_d2_url
from vario.ui_viewers import render_stage_detail
from vario.thing import Thing
from vario.viz import recipe_to_d2_annotated


def create_run_page():
    """Create the Run Observer page UI components.

    Builds the header, the diagram image, the (not yet wired) stage detail
    container and the past-runs list, then starts a 0.5 s timer that polls
    the DB and re-renders whenever the watched run's id, trace count or
    status changes.
    """

    state: dict = {
        "run_id": None,
        "recipe_name": None,
        "last_trace_count": 0,
        # Tracked so the running -> done transition triggers a re-render even
        # when no new trace arrived with it.
        "last_status": None,
    }

    # --- Header ---
    with ui.row().classes("w-full items-center gap-4"):
        ui.label("Run Observer").classes("text-h6")
        status_label = ui.label("No active run").classes("text-caption text-grey")
        # Lambda defers the lookup: poll() is defined further down.
        ui.button(icon="refresh", on_click=lambda: poll()).props("flat dense")

    # --- D2 Diagram ---
    diagram_image = ui.image("").classes("w-full").style(
        "max-width: 700px; background: white; padding: 12px; "
        "border-radius: 8px; border: 1px solid #444"
    )
    diagram_image.set_visibility(False)

    # --- Stage detail panel ---
    # Placeholder for the stage detail viewers; nothing renders into it yet.
    detail_container = ui.column().classes("w-full")

    # --- Run history selector ---
    with ui.expansion("Past runs", icon="history").classes("w-full"):
        history_container = ui.column().classes("w-full gap-1")

    def _load_history():
        """Populate past runs list."""
        history_container.clear()
        runs = list_runs(limit=10)
        with history_container:
            for run in runs:
                score = run.get("best_score")
                # `is not None` so a legitimate score of 0 is still shown.
                score_text = f" (score: {score:.0f})" if score is not None else ""
                # A run that is still in progress has no total cost yet (NULL).
                cost_text = f"${(run.get('total_cost_usd') or 0):.4f}"
                label = f"{run['recipe']} — {cost_text}{score_text}"
                ui.button(
                    label,
                    # Default arg binds this iteration's id (avoids late binding).
                    on_click=lambda rid=run["run_id"]: _select_run(rid),
                ).props("flat dense no-caps align=left").classes("w-full")

    def _show_run(run_id: str, run: dict):
        """Record what is being displayed, then render it."""
        state["run_id"] = run_id
        state["recipe_name"] = run["recipe"]
        state["last_trace_count"] = len(run.get("traces", []))
        state["last_status"] = run.get("status") or "done"
        _render_run(run)

    def _select_run(run_id: str):
        """Load a specific run by ID."""
        run = get_run(run_id)
        if run:
            _show_run(run_id, run)

    def _render_run(run: dict):
        """Render the annotated D2 diagram for a run."""
        recipe_name = run["recipe"]
        traces_data = run.get("traces", [])
        run_status = run.get("status") or "done"

        # Reconstruct Trace objects from DB dicts
        import json
        traces = [
            Trace(
                stage_id=t.get("stage_id", ""),
                block_type=t.get("block_type", ""),
                input_count=t.get("input_count", 0),
                output_count=t.get("output_count", 0),
                tokens=t.get("tokens", 0),
                cost_usd=t.get("cost_usd", 0),
                latency_ms=t.get("latency_ms", 0),
                metadata=json.loads(t["metadata_json"]) if t.get("metadata_json") else {},
            )
            for t in traces_data
        ]

        # Load recipe for structure
        try:
            recipe = load_recipe(recipe_name)
        except FileNotFoundError:
            status_label.text = f"Recipe not found: {recipe_name}"
            return

        # Generate annotated D2
        d2_source = recipe_to_d2_annotated(recipe.stages, traces)
        url = render_d2_url(d2_source)
        diagram_image.set_source(url)
        diagram_image.set_visibility(True)

        # Status — cost and outcome are NULL until the run finishes.
        cost = run.get("total_cost_usd", 0) or 0
        outcome = run.get("outcome") or ""
        icon = "⏳" if run_status == "running" else "✅"
        status_label.text = (
            f"{icon} {recipe_name} — {len(traces)} stages — "
            f"${cost:.4f} — {outcome}"
        )

    async def poll():
        """Check for active run or updates to current run."""
        # Prefer the active run; otherwise keep following the run on screen so
        # its final state (status, cost, outcome) is picked up when it ends.
        run = get_active_run()
        if run:
            run_id = run["run_id"]
        elif state.get("run_id"):
            run_id = state["run_id"]
            run = get_run(run_id)
        if not run:
            return

        trace_count = len(run.get("traces", []))
        status = run.get("status") or "done"
        unchanged = (
            run_id == state.get("run_id")
            and trace_count == state.get("last_trace_count")
            and status == state.get("last_status")
        )
        if unchanged:
            return

        just_finished = state.get("last_status") == "running" and status != "running"
        _show_run(run_id, run)
        if just_finished:
            # The finished run now has a cost and score worth listing.
            _load_history()

    # Poll every 500ms for active run updates
    ui.timer(0.5, poll)

    # Load history on page open
    _load_history()
```

**Step 2: Register the page in app.py**

In `vario/app.py`, add:

```python
from vario.ui_run import create_run_page

PAGES = {
    "run": {"label": "Run", "icon": "play_circle", "render": create_run_page},
    "design": {"label": "Design", "icon": "architecture", "render": create_design_page},
}
```

And change the default page to "run":
```python
def index(page: str = "run"):
    if page not in PAGES:
        page = "run"
```

**Step 3: Fix the render_d2 import path**

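In `ui_run.py` the import must match the module's actual location, `vario/render_d2.py`. Since the `variong`→`vario` rename, the import shown in Step 1 is already correct, so this step is only a check — confirm the file contains: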
```python
from vario.render_d2 import render_d2_url
```

(Same fix needed in `ui_design.py` if not already correct.)

**Step 4: Smoke test**

Start ngstudio, open in browser, verify the Run page loads:
```bash
timeout 10 python vario/app.py -p 7961 2>&1
# Should print "NiceGUI ready to go on http://localhost:7961"
```

Then in another terminal, run a recipe:
```bash
vario run best_of_n "What is 2+2?" -r fast --budget 0.05
```

ngstudio should show the D2 diagram updating as stages complete.

**Step 5: Commit**

```bash
git add vario/ui_run.py vario/app.py
git commit -m "feat(ng): ngstudio Run Observer page — live annotated D2 graph"
```

---

## Task 5: README execution trace section with screenshots

After Tasks 1-4, run a recipe with ngstudio open, screenshot the annotated graph, and add the execution trace section to the README.

**Files:**
- Modify: `vario/README.md`
- Create: `vario/docs/` (screenshots)

**Step 1: Run a recipe and capture screenshots**

```bash
# Start ngstudio
python vario/app.py &

# Run a recipe
vario run best_of_n "What causes tides?" -r fast --budget 0.10

# Screenshot ngstudio
appctl screenshot "Chrome"  # or whatever browser has ngstudio open
```

**Step 2: Add execution trace section to README**

After the "Recipes" section, add:

````markdown
## How a run works

A recipe defines the pipeline. Here's `best_of_n` executing:

![best_of_n execution](docs/best_of_n_run.png)

### Thing lifecycle

A Thing starts with just `content` from `produce`, then accumulates props:

| After stage | props                                              |
|-------------|-----------------------------------------------------|
| produce     | `{model: "haiku", candidate_id: 1}`                |
| score       | `{model: "haiku", candidate_id: 1, score: 84, reason: "..."}`  |
| reduce      | (only top-k Things survive)                         |

### Recipe YAML

```yaml
name: best_of_n
stages:
  - type: produce
    params:
      n: 5
  - type: score
    params:
      feedback: false
      rubric: ["correctness", "reasoning quality", "completeness"]
      model: haiku
  - type: reduce
    params:
      method: top_k
      k: 1
params:
  models: ["sonnet"]
  temperature: 0.7
```

### Streaming behavior

| Block   | Behavior                                              |
|---------|-------------------------------------------------------|
| produce | Emits Things as LLM calls finish (`as_completed`)     |
| score   | Processes each Thing as it arrives (stream-preserving) |
| revise  | Processes each Thing as it arrives (stream-preserving) |
| reduce  | **Barrier** — waits for all upstream Things            |
| repeat  | **Barrier** — materializes stream, runs rounds         |

### RunResult

Every run returns:
```python
RunResult(
    things=[Thing(content="...", props={"score": 84, "model": "haiku"})],
    traces=[Trace(stage_id="stage_0.produce", block_type="produce", ...)],
    budget=Budget(max_usd=0.10, spent_usd=0.0342),
    run_log=RunLog(recipe_name="best_of_n", outcome="1 thing, best score 84"),
)
```
````

**Step 3: Commit**

```bash
git add vario/README.md vario/docs/
git commit -m "docs(ng): add execution trace section with annotated graph screenshots"
```

---

## Summary

| Task | What                                    | Files changed                    |
|------|-----------------------------------------|----------------------------------|
| 1    | Incremental DB writes                   | db.py, test_db.py                |
| 2    | Wire executor to use incremental writes | executor.py, context.py, blocks/ |
| 3    | State-annotated D2 diagrams             | viz.py, test_viz.py              |
| 4    | ngstudio Run Observer page              | ui_run.py, app.py                |
| 5    | README execution trace + screenshots    | README.md, docs/                 |

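Tasks 1 and 3 are independent of each other and can be implemented and tested in isolation. Task 2 depends on Task 1: it wires the incremental DB functions into the executor. Task 4 depends on 1+3 (and needs Task 2 for the graph to update live). Task 5 depends on 4.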
