# Handler Development Guide

## Handler Signature

All handlers must use keyword-only args:

```python
async def process_stage(*, item_key: str, data: dict, stage: str, job) -> dict | None:
```

## LLM Calls

- **Avoid `max_tokens`** -- Truncated responses break JSON parsing. Let the model finish naturally. If cost is a concern, use a cheaper model, not a token ceiling.
- **Use `tools=["web_search"]`** for web-grounded calls, not `native_web_search=True`. Tool-based search (Serper) is consistent across providers.
- **Use `temperature=0`** for structured/JSON extraction. No creativity needed for factual extraction.
- **Put documents in the system prompt with XML tags** -- For prompt caching efficiency, place the document in the system prompt as a static prefix: `system="You analyze documents.\n<document>\n{text}\n</document>"`. The user prompt then contains only the extraction instruction (which varies).
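As a sketch, the caching-friendly prompt layout can be factored into a small helper (the helper name and return shape are assumptions, not an actual API in this codebase):

```python
def build_prompts(document_text: str, instruction: str) -> dict:
    """Hypothetical helper: document in the system prompt as a static prefix.

    Providers can cache the (large, unchanging) system prompt; only the
    user prompt varies between extraction calls.
    """
    system = f"You analyze documents.\n<document>\n{document_text}\n</document>"
    return {"system": system, "user": instruction, "temperature": 0}
```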

## Raw Data First

**Cache raw artifacts so re-extraction is cheap.** The fetch stage should save original content (HTML, JSON, audio) to disk. The extract stage parses from cached files, not live fetches. This enables:

- **Schema evolution**: Add new fields by re-running extract on cached data
- **Bug fixes**: Fix parsing bugs without re-fetching (which may rate-limit or fail)
- **Experimentation**: Try different extraction approaches on the same data

```
fetch stage  -> saves raw HTML/JSON/audio to disk
extract stage -> reads from disk, parses, stores structured data
```

If a field might be useful later, capture it. Storage is cheap; re-fetching months later may be impossible.

## Raw Content Storage for Large Jobs

For jobs with many items (1000+), store raw content as **compressed files on local disk**, gitignored, backed up to external storage. DB holds only structured/extracted data.

**Convention**:
```
jobs/data/{job_name}/html/{id}.html.xz   # compressed raw content, gitignored
jobs/data/{job_name}/{job_name}.db       # structured data only (gitignored)
```

**Why not SQLite-in-LFS**: LFS adds complexity. Raw files compress well individually (`.xz`), are easy to back up (`rsync`), and don't bloat the repo.

**Backup**: `jobs/data/` is gitignored and backed up to external SanDisk 4TB.

## Logging

**Structured logging via `get_handler_logger`**: Every handler uses a bound logger with a `handler=name` field. Pass domain data as kwargs.

```python
from jobs.lib.logging import get_handler_logger
log = get_handler_logger("my_handler")

# Good: structured kwargs (queryable in JSONL)
log.info("scored", score=42, source="youtube", title=title)

# Avoid: f-string interpolation (not machine-parseable)
log.info(f"[score] {item_key}: {score} | {source} | {title}")
```

No need to include `item_key` in kwargs -- the runner sets it via `logger.contextualize(item=item_key)`.

**Query structured logs**:
```bash
jq 'select(.extra.handler=="pltr_discovery")' jobs/data/runner.log.jsonl
jq 'select(.extra.score > 30)' jobs/data/runner.log.jsonl
```

**Log levels**:
- **`info`**: Domain-relevant attributes -- title, date, duration, view count
- **`debug`**: Implementation details -- cue counts, byte ranges, VTT timestamps

## Sanity Checking

Every handler should validate ingested data. At 40 items/hour, checks are effectively free.

**What to check per item**:
- **Completeness**: Required fields present and non-empty
- **Plausibility**: Duration >0 and <24h, date not in the future
- **Consistency**: `live_status=was_live` but duration <60s? Suspicious

**How to flag**:
- `logger.warning(...)` -- suspicious but continue
- `logger.error(...)` + skip -- clearly invalid
- Store `quality_flags` list in metadata.json for downstream filtering
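The per-item checks above could be sketched as a single flagging function (field names like `upload_date` are illustrative; the date-format assumption is `YYYYMMDD`):

```python
from datetime import datetime, timezone

def sanity_check(item: dict) -> list[str]:
    """Return quality flags for one ingested item; empty list means clean."""
    flags = []
    # Completeness: required fields present and non-empty
    for field in ("title", "upload_date", "duration"):
        if not item.get(field):
            flags.append(f"missing:{field}")
    # Plausibility: duration in (0, 24h); date not in the future
    duration = item.get("duration") or 0
    if not 0 < duration < 24 * 3600:
        flags.append("implausible:duration")
    date = item.get("upload_date")
    if date and date > datetime.now(timezone.utc).strftime("%Y%m%d"):
        flags.append("implausible:future_date")
    # Consistency: a "was_live" stream shorter than a minute is suspicious
    if item.get("live_status") == "was_live" and duration < 60:
        flags.append("suspicious:short_live")
    return flags
```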

## Return Values & Results Table

Handlers return `dict | None`. The runner stores returned dicts in the `results` SQLite table (`UNIQUE(job_id, item_key, stage)`).

**Return dict conventions**:
- Stage-specific structured data (scores, metadata fields, counts)
- `_files`: `{"transcript": "video_id/video_id.en.vtt"}` -- external file references relative to `job.storage.resolved_dir`
- `_priority`: numeric value to update item priority (lower = processed first)
- `_cost`: USD cost of LLM calls for guard checker
- `_version`: schema version for evolving formats

**Reading previous stage results**: Use `get_result(conn, job.id, item_key, "stage_name")` from `jobs.lib.tracker`.

**External files**: Large artifacts (VTT, parquet, audio) stay on disk. Reference them via `_files`. Lightweight data (scores, metadata) goes only in the results table.
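Putting the conventions together, a hypothetical extract-stage return might be assembled like this (function and field names are illustrative):

```python
def extract_result(video_id: str, score: float, cost: float) -> dict:
    """Assemble a result dict following the return-dict conventions."""
    return {
        "score": score,                                             # stage-specific data
        "_files": {"transcript": f"{video_id}/{video_id}.en.vtt"},  # on-disk artifact reference
        "_priority": -score,                                        # lower = processed first
        "_cost": cost,                                              # USD spent on LLM calls
        "_version": 2,                                              # schema version
    }
```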

## Dashboard Columns

Handlers can define `dashboard_columns()` to customize how their items appear in the detail tables:

```python
def dashboard_columns() -> dict:
    return {
        "detail_fields": [
            {"field": "title", "label": "Title"},
            {"field": "duration", "label": "Dur", "format": "duration"},
            {"field": "metadata_score", "label": "Score"},
        ]
    }
```

Falls back to `detail_columns` from YAML if the handler doesn't define this.

### Progressive Detail Display

`detail_columns` can list fields from **both** discovery data and stage results. The dashboard merges result fields into the display data, so as items progress through stages, richer information appears automatically.

**Convention**: List all desired fields in `detail_columns`. Missing fields are skipped gracefully.

```yaml
# Fields from discovery + extraction -- missing fields skipped
detail_columns: [trade_dir, company, yr_mo, symbol, author, winner]
```

**Display rules**:
- `None`, `""`, `False`, `0` -> hidden (field skipped entirely)
- `True` -> shows field name in bold (e.g. `winner` -> **winner**)
- `trade_dir` -> color-coded: LONG / SHORT

## Rerun-Friendly Stage Design

Every stage should be independently rerunnable without re-running earlier stages:

**Checklist for each stage**:
- **Reads from cache, not live**: Does this stage read from DB/disk artifacts cached by earlier stages?
- **Idempotent writes**: Can this stage run twice safely? (UPDATE not INSERT, upsert patterns)
- **No side effects on earlier stages**: Running stage N doesn't corrupt data from stage N-1
- **Test rerun independently**: Verify `--reprocess-stage {name}` works
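For the idempotent-writes item, a SQLite upsert against the `results` table's `UNIQUE(job_id, item_key, stage)` constraint might look like the following (a sketch, assuming a `payload` column):

```python
import sqlite3

def upsert_result(conn: sqlite3.Connection, job_id: int, item_key: str,
                  stage: str, payload: str) -> None:
    """Idempotent write: rerunning a stage overwrites its row, never duplicates it."""
    conn.execute(
        """
        INSERT INTO results (job_id, item_key, stage, payload)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(job_id, item_key, stage) DO UPDATE SET payload = excluded.payload
        """,
        (job_id, item_key, stage, payload),
    )
```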

## Speaker Diarization (`jobs/lib/diarize.py`)

Uses **pyannote.audio** (v3.4) for speaker diarization.

**Setup requirements**:
1. `pip install pyannote.audio` (pulls torch, lightning, etc.)
2. `HF_TOKEN` env var with HuggingFace token
3. Accept gated model licenses (all 4 required):
   - https://huggingface.co/pyannote/speaker-diarization-3.1
   - https://huggingface.co/pyannote/segmentation-3.0
   - https://huggingface.co/pyannote/wespeaker-voxceleb-resnet34-LM
   - https://huggingface.co/pyannote/speaker-diarization-community-1

**Usage**:
```bash
python jobs/lib/diarize.py audio.mp3
python jobs/lib/diarize.py audio.mp3 --max-speakers 3 --json-out
```

**Notes**:
- Uses MPS (Apple Silicon) acceleration automatically when available
- pyannote 3.x + torch >=2.6: torch >=2.6 defaults `torch.load` to `weights_only=True`, which breaks loading pyannote checkpoints, so the script monkey-patches `torch.load` to pass `weights_only=False`
- Model downloads ~1GB on first run (cached in `~/.cache/huggingface/`)
