# CEO Quality Backtest — Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Build a point-in-time CEO quality scoring pipeline that backtests leadership traits against forward stock returns across hundreds of companies.

**Architecture:** `finance/ceo_quality/` with 6 modules: dataset (company universe), assess (LLM scoring with Wayback), features (numeric extraction), predict (walk-forward CV), backtest (binary outcome), iterate (parameter search). SQLite DB for persistence, Finnhub for prices, Wayback Machine for point-in-time web content.

**Tech Stack:** Python, SQLite, Finnhub API (`lib/finnhub`), Wayback Machine (`lib/ingest/archive_fetch`), `lib/llm` (grok-fast default), scikit-learn, scipy, click CLI.

**Design doc:** `docs/plans/2026-03-03-ceo-quality-backtest-design.md`

---

### Task 1: Project scaffolding — CLAUDE.md, __init__.py, data markers

**Files:**
- Create: `finance/ceo_quality/__init__.py`
- Create: `finance/ceo_quality/CLAUDE.md`
- Create: `finance/ceo_quality/LOGBOOK.md`
- Create: `finance/ceo_quality/data/.share`
- Create: `finance/ceo_quality/reports/.share`

**Step 1: Create package init**

```python
"""CEO Quality Backtest — point-in-time CEO scoring vs forward returns."""
```

**Step 2: Create CLAUDE.md**

````markdown
# CEO Quality Backtest — Project Instructions

emoji: 👔

## Purpose

Test whether CEO/leadership quality predicts stock returns by scoring CEOs
point-in-time (using only information available as of a historical date)
and measuring forward returns.

## Architecture

| File         | Purpose                                              |
|-------------|------------------------------------------------------|
| dataset.py  | Build (company, as_of_date, ceo_name) tuples         |
| assess.py   | Point-in-time CEO scoring (LLM + Wayback + date)     |
| features.py | Extract numeric features from assessment JSON         |
| predict.py  | Walk-forward CV: CEO features → returns               |
| backtest.py | Binary outcome analysis (winners vs losers)           |
| iterate.py  | Vary rubric weights, prompt variations, feature combos|

## Data

- `data/dataset.db` — SQLite: companies, assessments, returns, features
- `data/assessments/` — Per-company point-in-time assessment JSONs
- `reports/` — Generated HTML reports

## Key Dependencies

- `lib/finnhub` — Stock prices + company profiles
- `lib/ingest/archive_fetch` — Wayback Machine snapshots
- `lib/llm` — CEO assessment LLM calls (default: grok-fast)
- `finance/vic_analysis/prices/` — Daily candles, trading calendar

## Running

```bash
# Build dataset (50 S&P 500 companies)
python -m finance.ceo_quality.dataset --sp500 --limit 50

# Assess CEOs point-in-time
python -m finance.ceo_quality.assess --as-of 2024-01-15 --limit 50

# Extract features + run backtest
python -m finance.ceo_quality.features
python -m finance.ceo_quality.backtest --report

# Walk-forward CV
python -m finance.ceo_quality.predict --report

# Iterate
python -m finance.ceo_quality.iterate --report
```
````

**Step 3: Create LOGBOOK.md, .share files**

LOGBOOK.md:
```markdown
# CEO Quality Backtest — Logbook

## 2026-03-03 — Project Setup

Initial build from design doc `docs/plans/2026-03-03-ceo-quality-backtest-design.md`.
```

data/.share: `CEO quality backtest data`
reports/.share: `CEO quality backtest reports`

**Step 4: Commit**

```bash
git add finance/ceo_quality/
git commit -m "feat(ceo_quality): scaffold project structure"
```

---

### Task 2: dataset.py — Build company universe with CEO names

**Files:**
- Create: `finance/ceo_quality/dataset.py`
- Create: `finance/ceo_quality/tests/test_dataset.py`

**Step 1: Write the failing test**

```python
"""Tests for dataset module."""
import sqlite3
from unittest.mock import patch

from finance.ceo_quality.dataset import (
    init_db,
    add_company,
    get_companies,
    fetch_sp500_constituents,
)


def test_init_db_creates_tables(tmp_path):
    db = tmp_path / "test.db"
    init_db(db)
    conn = sqlite3.connect(str(db))
    tables = [r[0] for r in conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table'"
    ).fetchall()]
    conn.close()
    assert "companies" in tables


def test_add_and_get_company(tmp_path):
    db = tmp_path / "test.db"
    init_db(db)
    add_company("AAPL", "Apple Inc.", "Tim Cook", "Technology", db_path=db)
    companies = get_companies(db_path=db)
    assert len(companies) == 1
    assert companies[0]["symbol"] == "AAPL"
    assert companies[0]["ceo_name"] == "Tim Cook"


def test_add_company_upsert(tmp_path):
    db = tmp_path / "test.db"
    init_db(db)
    add_company("AAPL", "Apple Inc.", "Tim Cook", "Technology", db_path=db)
    add_company("AAPL", "Apple Inc.", "New CEO", "Technology", db_path=db)
    companies = get_companies(db_path=db)
    assert len(companies) == 1
    assert companies[0]["ceo_name"] == "New CEO"


def test_fetch_sp500_returns_list():
    """fetch_sp500_constituents with the Finnhub call mocked — no API key needed."""
    with patch("finance.ceo_quality.dataset.finnhub_api") as mock_api:
        mock_api.return_value = {
            "constituents": ["AAPL", "MSFT", "GOOGL"],
            "symbol": "^GSPC",
        }
        result = fetch_sp500_constituents()
        assert isinstance(result, list)
        assert len(result) == 3
        assert "AAPL" in result
```

**Step 2: Run test to verify it fails**

Run: `cd /Users/tchklovski/all-code/rivus && python -m pytest finance/ceo_quality/tests/test_dataset.py -v`
Expected: FAIL (module not found)

**Step 3: Write dataset.py**

```python
#!/usr/bin/env python
"""Dataset builder — compile (company, ceo_name, sector) tuples.

Sources:
  1. S&P 500 constituents from Finnhub index API
  2. CEO names from Finnhub company profiles
  3. Existing semi universe (intel/companies/semi/)

Usage:
    python -m finance.ceo_quality.dataset --sp500 --limit 50
    python -m finance.ceo_quality.dataset --semi
    python -m finance.ceo_quality.dataset --tickers AAPL,MSFT,GOOGL
"""

import json
import sqlite3
import sys
import time
from pathlib import Path

import click
from loguru import logger

_RIVUS = Path(__file__).resolve().parent.parent.parent
if str(_RIVUS) not in sys.path:
    sys.path.insert(0, str(_RIVUS))

from lib.finnhub import api as finnhub_api, FinnhubError

DATA_DIR = Path(__file__).parent / "data"
DB_PATH = DATA_DIR / "dataset.db"

SCHEMA = """
CREATE TABLE IF NOT EXISTS companies (
    symbol        TEXT PRIMARY KEY,
    name          TEXT,
    ceo_name      TEXT,
    sector        TEXT,
    industry      TEXT,
    market_cap    REAL,
    ipo_date      TEXT,
    source        TEXT,
    updated_at    TEXT DEFAULT (datetime('now'))
);
"""


def init_db(db_path: Path | None = None):
    db_path = db_path or DB_PATH
    db_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(db_path))
    conn.executescript(SCHEMA)
    conn.close()


def add_company(symbol: str, name: str, ceo_name: str | None,
                sector: str | None, industry: str | None = None,
                market_cap: float | None = None, ipo_date: str | None = None,
                source: str = "finnhub", db_path: Path | None = None):
    db_path = db_path or DB_PATH
    conn = sqlite3.connect(str(db_path))
    conn.execute(
        """INSERT INTO companies (symbol, name, ceo_name, sector, industry,
           market_cap, ipo_date, source)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?)
           ON CONFLICT(symbol) DO UPDATE SET
             name=excluded.name, ceo_name=excluded.ceo_name,
             sector=excluded.sector, industry=excluded.industry,
             market_cap=excluded.market_cap, ipo_date=excluded.ipo_date,
             source=excluded.source, updated_at=datetime('now')""",
        (symbol, name, ceo_name, sector, industry, market_cap, ipo_date, source),
    )
    conn.commit()
    conn.close()


def get_companies(db_path: Path | None = None) -> list[dict]:
    db_path = db_path or DB_PATH
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    rows = conn.execute("SELECT * FROM companies ORDER BY symbol").fetchall()
    conn.close()
    return [dict(r) for r in rows]


def fetch_sp500_constituents() -> list[str]:
    """Get S&P 500 constituent symbols from Finnhub."""
    data = finnhub_api("index/constituents", symbol="^GSPC")
    return sorted(data.get("constituents", []))


def fetch_company_profile(symbol: str) -> dict | None:
    """Fetch company profile from Finnhub. Returns dict or None."""
    try:
        profile = finnhub_api("stock/profile2", symbol=symbol)
        if not profile or not profile.get("name"):
            return None
        return profile
    except FinnhubError as e:
        logger.warning(f"Finnhub error for {symbol}: {e}")
        return None


def build_from_finnhub(symbols: list[str], db_path: Path | None = None):
    """Fetch profiles for a list of symbols and store in DB."""
    db_path = db_path or DB_PATH
    init_db(db_path)

    for i, sym in enumerate(symbols):
        profile = fetch_company_profile(sym)
        if not profile:
            logger.warning(f"[{i+1}/{len(symbols)}] {sym}: no profile")
            continue

        # Finnhub profile doesn't have CEO name directly — we get it from
        # company executives endpoint or use webUrl for Wayback later
        ceo_name = None  # Will be populated by assess.py or manual lookup

        add_company(
            symbol=sym,
            name=profile.get("name", ""),
            ceo_name=ceo_name,
            sector=profile.get("finnhubIndustry", ""),
            industry=profile.get("finnhubIndustry", ""),
            market_cap=profile.get("marketCapitalization"),
            ipo_date=profile.get("ipo", ""),
            source="finnhub",
            db_path=db_path,
        )
        logger.info(f"[{i+1}/{len(symbols)}] {sym}: {profile.get('name', '?')} "
                     f"({profile.get('finnhubIndustry', '?')})")

        # Rate limit courtesy
        if (i + 1) % 10 == 0:
            time.sleep(0.5)


def build_from_semi(db_path: Path | None = None):
    """Import existing semi universe companies."""
    db_path = db_path or DB_PATH
    init_db(db_path)

    semi_data = _RIVUS / "intel" / "companies" / "semi" / "data"
    if not semi_data.exists():
        logger.warning(f"Semi data not found at {semi_data}")
        return

    for json_file in sorted(semi_data.glob("*/assessment.json")):
        try:
            data = json.loads(json_file.read_text())
        except (json.JSONDecodeError, FileNotFoundError):
            continue

        company = json_file.parent.name
        symbol = data.get("symbol", "")
        name = data.get("company_name", company)

        # Extract CEO from founder_deep_dive if available
        ceo_name = None
        founder = data.get("founder_deep_dive", {})
        if isinstance(founder, dict):
            ceo_name = founder.get("name")

        if symbol:
            add_company(
                symbol=symbol, name=name, ceo_name=ceo_name,
                sector="Semiconductors", source="semi_universe",
                db_path=db_path,
            )
            logger.info(f"Semi: {symbol} ({name}) — CEO: {ceo_name or 'unknown'}")


@click.command()
@click.option("--sp500", is_flag=True, help="Load S&P 500 constituents")
@click.option("--semi", is_flag=True, help="Import semi universe")
@click.option("--tickers", help="Comma-separated ticker list")
@click.option("--limit", type=int, help="Limit number of companies")
@click.option("--list", "list_companies", is_flag=True, help="List current companies")
def main(sp500: bool, semi: bool, tickers: str | None, limit: int | None,
         list_companies: bool):
    """Build company dataset for CEO quality backtest."""
    init_db()

    if list_companies:
        companies = get_companies()
        print(f"{'Symbol':<8} {'Name':<30} {'CEO':<25} {'Sector':<20} {'Source'}")
        print("-" * 100)
        for c in companies:
            print(f"{c['symbol']:<8} {(c['name'] or '')[:29]:<30} "
                  f"{(c['ceo_name'] or '?')[:24]:<25} "
                  f"{(c['sector'] or '')[:19]:<20} {c['source']}")
        print(f"\nTotal: {len(companies)}")
        return

    if semi:
        build_from_semi()

    if sp500:
        symbols = fetch_sp500_constituents()
        if limit:
            symbols = symbols[:limit]
        logger.info(f"Building dataset from {len(symbols)} S&P 500 constituents")
        build_from_finnhub(symbols)

    if tickers:
        symbols = [t.strip() for t in tickers.split(",")]
        build_from_finnhub(symbols)

    companies = get_companies()
    logger.info(f"Dataset: {len(companies)} companies total")


if __name__ == "__main__":
    main()
```

**Step 4: Create tests/__init__.py**

Empty file.

**Step 5: Run tests**

Run: `cd /Users/tchklovski/all-code/rivus && python -m pytest finance/ceo_quality/tests/test_dataset.py -v`
Expected: PASS

**Step 6: Run integration test**

Run: `cd /Users/tchklovski/all-code/rivus && python -m finance.ceo_quality.dataset --sp500 --limit 5 && python -m finance.ceo_quality.dataset --list`
Expected: 5 companies listed with Finnhub profiles

**Step 7: Commit**

```bash
git add finance/ceo_quality/dataset.py finance/ceo_quality/tests/
git commit -m "feat(ceo_quality): dataset builder — S&P 500 + semi universe"
```

---

### Task 3: assess.py — Point-in-time CEO scoring with LLM + Wayback

**Files:**
- Create: `finance/ceo_quality/assess.py`
- Create: `finance/ceo_quality/tests/test_assess.py`

**Step 1: Write the failing test**

```python
"""Tests for CEO assessment module."""
import json
from pathlib import Path
from unittest.mock import AsyncMock, patch

import pytest

from finance.ceo_quality.assess import (
    SCORING_DIMENSIONS,
    build_assessment_prompt,
    parse_assessment_response,
    assess_ceo,
)


def test_scoring_dimensions_defined():
    assert len(SCORING_DIMENSIONS) == 6
    assert "track_record" in SCORING_DIMENSIONS
    assert "communication" in SCORING_DIMENSIONS


def test_build_assessment_prompt():
    prompt = build_assessment_prompt(
        company="Apple Inc.",
        ceo_name="Tim Cook",
        as_of_date="2024-01-15",
        wayback_context="Apple reported record Q4 revenue...",
    )
    assert "Tim Cook" in prompt
    assert "2024-01-15" in prompt
    assert "Apple" in prompt
    assert "record Q4" in prompt


def test_parse_assessment_response_valid():
    response = json.dumps({
        "track_record": 8, "decision_quality": 7,
        "technical_depth": 6, "team_building": 8,
        "drive_intensity": 9, "communication": 8,
        "overall_score": 7.7,
        "rationale": "Strong operator...",
    })
    result = parse_assessment_response(response)
    assert result["track_record"] == 8
    assert result["overall_score"] == 7.7
    assert "rationale" in result


def test_parse_assessment_response_extracts_json():
    response = "Here is my assessment:\n```json\n" + json.dumps({
        "track_record": 5, "decision_quality": 5,
        "technical_depth": 5, "team_building": 5,
        "drive_intensity": 5, "communication": 5,
        "overall_score": 5.0, "rationale": "Average",
    }) + "\n```\nThat's my analysis."
    result = parse_assessment_response(response)
    assert result["track_record"] == 5


@pytest.mark.asyncio
async def test_assess_ceo_returns_scores(tmp_path, monkeypatch):
    mock_response = json.dumps({
        "track_record": 8, "decision_quality": 7,
        "technical_depth": 6, "team_building": 8,
        "drive_intensity": 9, "communication": 8,
        "overall_score": 7.7, "rationale": "Strong...",
    })
    # Redirect file writes into tmp_path and stub the DB write so the test
    # never touches the real data/ directory
    monkeypatch.setattr("finance.ceo_quality.assess.ASSESSMENTS_DIR",
                        tmp_path / "assessments")
    with patch("finance.ceo_quality.assess.call_llm", new_callable=AsyncMock,
               return_value=mock_response):
        with patch("finance.ceo_quality.assess.fetch_wayback_context",
                   return_value="Some context"):
            with patch("finance.ceo_quality.assess._save_assessment_to_db"):
                result = await assess_ceo("AAPL", "Apple Inc.", "Tim Cook", "2024-01-15")
                assert result["track_record"] == 8
                assert result["symbol"] == "AAPL"
                assert result["as_of_date"] == "2024-01-15"

**Step 2: Run test to verify it fails**

Run: `cd /Users/tchklovski/all-code/rivus && python -m pytest finance/ceo_quality/tests/test_assess.py -v`
Expected: FAIL (module not found)

**Step 3: Write assess.py**

```python
#!/usr/bin/env python
"""Point-in-time CEO assessment — score leadership quality as of a historical date.

Uses Wayback Machine snapshots + date-constrained LLM to ensure only information
available before the as-of date is used. This prevents look-ahead bias.

Usage:
    python -m finance.ceo_quality.assess --as-of 2024-01-15 --limit 10
    python -m finance.ceo_quality.assess --symbol AAPL --as-of 2024-01-15
"""

import asyncio
import json
import re
import sqlite3
import sys
from datetime import datetime
from pathlib import Path

import click
from loguru import logger

_RIVUS = Path(__file__).resolve().parent.parent.parent
if str(_RIVUS) not in sys.path:
    sys.path.insert(0, str(_RIVUS))

from lib.llm import call_llm
from lib.ingest.archive_fetch import find_snapshots, fetch_snapshot
from lib.ingest.html_utils import extract_text_from_html

DATA_DIR = Path(__file__).parent / "data"
ASSESSMENTS_DIR = DATA_DIR / "assessments"
DB_PATH = DATA_DIR / "dataset.db"

SCORING_DIMENSIONS = {
    "track_record": "Prior exits, revenue milestones, career trajectory",
    "decision_quality": "Strategic pivots, M&A outcomes, capital allocation",
    "technical_depth": "Domain expertise, patents, engineering background",
    "team_building": "Executive retention, key hires, org development",
    "drive_intensity": "Ambition, work ethic, boldness, risk appetite",
    "communication": "Earnings call clarity, honesty, vision articulation",
}

DEFAULT_MODEL = "xai/grok-4-1-fast"


def build_assessment_prompt(company: str, ceo_name: str, as_of_date: str,
                            wayback_context: str | None = None,
                            profile_context: str | None = None) -> str:
    """Build the CEO assessment prompt with point-in-time constraints."""
    dim_text = "\n".join(f"- **{k}** ({v}): score 0-10"
                         for k, v in SCORING_DIMENSIONS.items())

    context_parts = []
    if wayback_context:
        context_parts.append(f"### Company web content (archived as of ~{as_of_date}):\n{wayback_context[:3000]}")
    if profile_context:
        context_parts.append(f"### Company profile:\n{profile_context}")
    context_block = "\n\n".join(context_parts) if context_parts else "(No archived content available)"

    return f"""You are assessing the CEO of {company} as of {as_of_date}.

CRITICAL: Only use information that would have been publicly available
before {as_of_date}. Do not reference events after this date.
You are scoring leadership quality to predict future stock performance.

## CEO: {ceo_name}

## Available Context

{context_block}

## Scoring Dimensions

Rate each dimension 0-10:

{dim_text}

## Instructions

1. Assess each dimension based on publicly available information as of {as_of_date}
2. Be calibrated: 5 = average public company CEO, 7 = clearly above average, 9 = exceptional
3. If you lack information for a dimension, score 5 (neutral) and note it in rationale
4. Compute overall_score as the simple average of all dimension scores

Respond with ONLY a JSON object:
```json
{{
    "track_record": <0-10>,
    "decision_quality": <0-10>,
    "technical_depth": <0-10>,
    "team_building": <0-10>,
    "drive_intensity": <0-10>,
    "communication": <0-10>,
    "overall_score": <float>,
    "rationale": "<2-3 sentence summary of key strengths and weaknesses>"
}}
```"""


def parse_assessment_response(response: str) -> dict:
    """Extract JSON assessment from LLM response."""
    # Try direct JSON parse
    try:
        return json.loads(response)
    except json.JSONDecodeError:
        pass

    # Extract from markdown code block
    match = re.search(r"```(?:json)?\s*\n?(.*?)\n?```", response, re.DOTALL)
    if match:
        try:
            return json.loads(match.group(1))
        except json.JSONDecodeError:
            pass

    # Last resort: find first { ... }
    match = re.search(r"\{.*\}", response, re.DOTALL)
    if match:
        try:
            return json.loads(match.group(0))
        except json.JSONDecodeError:
            pass

    raise ValueError(f"Could not parse assessment JSON from response: {response[:200]}")


def fetch_wayback_context(company_name: str, as_of_date: str) -> str | None:
    """Fetch company web content from Wayback Machine near the as-of date."""
    # Common IR page patterns
    slug = company_name.lower().replace(" ", "").replace(",", "").replace(".", "")
    urls_to_try = [
        f"https://www.{slug}.com/",
        f"https://ir.{slug}.com/",
    ]

    as_of_ts = as_of_date.replace("-", "")  # YYYYMMDD format

    for url in urls_to_try:
        try:
            snapshots = find_snapshots(url, limit=5)
            if not snapshots:
                continue

            # Find closest snapshot before as_of_date
            best = None
            for snap in snapshots:
                if snap["timestamp"][:8] <= as_of_ts:
                    best = snap
                    break
            if not best:
                continue  # every snapshot post-dates as_of — skip to avoid look-ahead

            html, info = fetch_snapshot(url, timestamp=best["timestamp"])
            text = extract_text_from_html(html)
            if text and len(text) > 100:
                logger.info(f"Wayback: got {len(text)} chars from {url} "
                           f"(snapshot {best['timestamp']})")
                return text[:5000]
        except Exception as e:
            logger.debug(f"Wayback failed for {url}: {e}")
            continue

    return None


async def assess_ceo(symbol: str, company: str, ceo_name: str,
                     as_of_date: str, model: str = DEFAULT_MODEL) -> dict:
    """Run point-in-time CEO assessment."""
    # Gather context
    wayback_ctx = fetch_wayback_context(company, as_of_date)

    # Build and call LLM
    prompt = build_assessment_prompt(
        company=company,
        ceo_name=ceo_name,
        as_of_date=as_of_date,
        wayback_context=wayback_ctx,
    )

    response = await call_llm(model=model, prompt=prompt)
    result = parse_assessment_response(str(response))

    # Add metadata
    result["symbol"] = symbol
    result["company"] = company
    result["ceo_name"] = ceo_name
    result["as_of_date"] = as_of_date
    result["model"] = model
    result["has_wayback"] = wayback_ctx is not None
    result["assessed_at"] = datetime.now().isoformat()

    # Save to file
    ASSESSMENTS_DIR.mkdir(parents=True, exist_ok=True)
    out_file = ASSESSMENTS_DIR / f"{symbol}_{as_of_date}.json"
    out_file.write_text(json.dumps(result, indent=2))
    logger.info(f"Saved assessment: {out_file.name} (overall: {result.get('overall_score', '?')})")

    # Save to DB
    _save_assessment_to_db(result)

    return result


def _save_assessment_to_db(result: dict):
    """Store assessment in dataset.db."""
    conn = sqlite3.connect(str(DB_PATH))
    conn.execute("""
        CREATE TABLE IF NOT EXISTS assessments (
            symbol      TEXT,
            as_of_date  TEXT,
            ceo_name    TEXT,
            track_record     REAL,
            decision_quality REAL,
            technical_depth  REAL,
            team_building    REAL,
            drive_intensity  REAL,
            communication    REAL,
            overall_score    REAL,
            rationale   TEXT,
            model       TEXT,
            has_wayback INTEGER,
            assessed_at TEXT,
            PRIMARY KEY (symbol, as_of_date)
        )
    """)
    conn.execute(
        """INSERT OR REPLACE INTO assessments
           (symbol, as_of_date, ceo_name, track_record, decision_quality,
            technical_depth, team_building, drive_intensity, communication,
            overall_score, rationale, model, has_wayback, assessed_at)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (result["symbol"], result["as_of_date"], result.get("ceo_name"),
         result.get("track_record"), result.get("decision_quality"),
         result.get("technical_depth"), result.get("team_building"),
         result.get("drive_intensity"), result.get("communication"),
         result.get("overall_score"), result.get("rationale"),
         result.get("model"), int(result.get("has_wayback", False)),
         result.get("assessed_at")),
    )
    conn.commit()
    conn.close()


async def batch_assess(as_of_date: str, limit: int | None = None,
                       model: str = DEFAULT_MODEL):
    """Assess all companies in dataset that haven't been assessed yet."""
    conn = sqlite3.connect(str(DB_PATH))
    conn.row_factory = sqlite3.Row

    # Get companies not yet assessed for this as_of_date
    query = """
        SELECT c.* FROM companies c
        LEFT JOIN assessments a ON c.symbol = a.symbol AND a.as_of_date = ?
        WHERE a.symbol IS NULL AND c.ceo_name IS NOT NULL
        ORDER BY c.symbol
    """
    rows = conn.execute(query, (as_of_date,)).fetchall()
    conn.close()

    companies = [dict(r) for r in rows]
    if limit:
        companies = companies[:limit]

    logger.info(f"Assessing {len(companies)} companies as of {as_of_date}")

    results = []
    for i, comp in enumerate(companies):
        logger.info(f"[{i+1}/{len(companies)}] {comp['symbol']}: {comp['name']} "
                     f"(CEO: {comp['ceo_name']})")
        try:
            result = await assess_ceo(
                symbol=comp["symbol"],
                company=comp["name"],
                ceo_name=comp["ceo_name"],
                as_of_date=as_of_date,
                model=model,
            )
            results.append(result)
        except Exception as e:
            logger.error(f"Failed to assess {comp['symbol']}: {e}")

    logger.info(f"Assessed {len(results)}/{len(companies)} companies")
    return results


@click.command()
@click.option("--as-of", required=True, help="Assessment date (YYYY-MM-DD)")
@click.option("--symbol", help="Assess single symbol")
@click.option("--limit", type=int, help="Max companies to assess")
@click.option("--model", default=DEFAULT_MODEL, help="LLM model for assessment")
def main(as_of: str, symbol: str | None, limit: int | None, model: str):
    """Run point-in-time CEO assessments."""
    if symbol:
        conn = sqlite3.connect(str(DB_PATH))
        conn.row_factory = sqlite3.Row
        row = conn.execute("SELECT * FROM companies WHERE symbol = ?", (symbol,)).fetchone()
        conn.close()
        if not row:
            logger.error(f"Symbol {symbol} not in dataset")
            return
        comp = dict(row)
        result = asyncio.run(assess_ceo(
            symbol=comp["symbol"], company=comp["name"],
            ceo_name=comp["ceo_name"] or "Unknown CEO",
            as_of_date=as_of, model=model,
        ))
        print(json.dumps(result, indent=2))
    else:
        asyncio.run(batch_assess(as_of, limit=limit, model=model))


if __name__ == "__main__":
    main()
```

**Step 4: Run tests**

Run: `cd /Users/tchklovski/all-code/rivus && python -m pytest finance/ceo_quality/tests/test_assess.py -v`
Expected: PASS

**Step 5: Commit**

```bash
git add finance/ceo_quality/assess.py finance/ceo_quality/tests/test_assess.py
git commit -m "feat(ceo_quality): point-in-time CEO assessment with Wayback + LLM"
```

---

### Task 4: features.py — Extract numeric features from assessments

**Files:**
- Create: `finance/ceo_quality/features.py`
- Create: `finance/ceo_quality/tests/test_features.py`

**Step 1: Write the failing test**

```python
"""Tests for feature extraction."""
import numpy as np
import pandas as pd

from finance.ceo_quality.features import (
    load_assessments_df,
    compute_feature_matrix,
    add_returns,
)


def test_compute_feature_matrix():
    df = pd.DataFrame([
        {"symbol": "AAPL", "track_record": 8, "decision_quality": 7,
         "technical_depth": 6, "team_building": 8,
         "drive_intensity": 9, "communication": 8, "overall_score": 7.7},
        {"symbol": "MSFT", "track_record": 7, "decision_quality": 8,
         "technical_depth": 7, "team_building": 7,
         "drive_intensity": 7, "communication": 7, "overall_score": 7.2},
    ])
    X, feature_names = compute_feature_matrix(df)
    assert X.shape == (2, 7)  # 6 dimensions + overall
    assert "track_record" in feature_names
    assert "overall_score" in feature_names
    assert not np.isnan(X).any()
```

**Step 2: Write features.py**

```python
#!/usr/bin/env python
"""Feature extraction — convert CEO assessments into numeric feature matrix.

Loads assessments from dataset.db, produces feature arrays suitable for
sklearn models in predict.py and backtest.py.

Usage:
    python -m finance.ceo_quality.features
    python -m finance.ceo_quality.features --as-of 2024-01-15
"""

import sqlite3
import sys
from pathlib import Path

import click
import numpy as np
import pandas as pd
from loguru import logger

_RIVUS = Path(__file__).resolve().parent.parent.parent
if str(_RIVUS) not in sys.path:
    sys.path.insert(0, str(_RIVUS))

from finance.vic_analysis.prices.daily import fetch_daily_candles
from finance.vic_analysis.prices.calendar import next_trading_day, trading_day_offset

DATA_DIR = Path(__file__).parent / "data"
DB_PATH = DATA_DIR / "dataset.db"

SCORE_COLS = [
    "track_record", "decision_quality", "technical_depth",
    "team_building", "drive_intensity", "communication",
]


def load_assessments_df(as_of_date: str | None = None,
                        db_path: Path | None = None) -> pd.DataFrame:
    """Load assessments from DB into DataFrame."""
    db_path = db_path or DB_PATH
    conn = sqlite3.connect(str(db_path))
    query = "SELECT * FROM assessments"
    params = []
    if as_of_date:
        query += " WHERE as_of_date = ?"
        params.append(as_of_date)
    df = pd.read_sql(query, conn, params=params)
    conn.close()
    return df


def compute_feature_matrix(df: pd.DataFrame) -> tuple[np.ndarray, list[str]]:
    """Extract numeric feature matrix from assessment DataFrame."""
    feature_cols = SCORE_COLS + ["overall_score"]
    X = df[feature_cols].fillna(5.0).values.astype(np.float64)
    return X, feature_cols


def add_returns(df: pd.DataFrame, horizons: list[int] | None = None,
                benchmark: str = "SPY") -> pd.DataFrame:
    """Add forward return columns for each horizon (days) from as_of_date."""
    if horizons is None:
        horizons = [90, 180, 365]

    import asyncio

    async def _fetch_returns_for_row(symbol: str, as_of: str, horizon: int):
        """Compute excess return for one (symbol, as_of, horizon) triple."""
        as_of_dt = pd.Timestamp(as_of)
        start_dt = next_trading_day(as_of_dt)
        end_dt = trading_day_offset(start_dt, horizon)

        try:
            candles = await fetch_daily_candles(symbol, start_dt.strftime("%Y-%m-%d"),
                                                end_dt.strftime("%Y-%m-%d"))
            if not candles or len(candles) < 2:
                return None
            stock_ret = (candles[-1]["close"] / candles[0]["open"] - 1) * 100

            bench = await fetch_daily_candles(benchmark, start_dt.strftime("%Y-%m-%d"),
                                              end_dt.strftime("%Y-%m-%d"))
            if not bench or len(bench) < 2:
                return stock_ret
            bench_ret = (bench[-1]["close"] / bench[0]["open"] - 1) * 100

            return stock_ret - bench_ret
        except Exception as e:
            logger.debug(f"Returns failed for {symbol} {horizon}d: {e}")
            return None

    async def _batch():
        for h in horizons:
            col = f"ret_{h}d_excess"
            returns = []
            for _, row in df.iterrows():
                ret = await _fetch_returns_for_row(row["symbol"], row["as_of_date"], h)
                returns.append(ret)
            df[col] = returns
            n_valid = sum(1 for r in returns if r is not None)
            logger.info(f"Returns {h}d: {n_valid}/{len(df)} valid")

    asyncio.run(_batch())
    return df


@click.command()
@click.option("--as-of", help="Filter to specific as-of date")
@click.option("--horizons", default="90,180,365", help="Comma-separated horizons in days")
def main(as_of: str | None, horizons: str):
    """Extract features and fetch returns for assessments."""
    df = load_assessments_df(as_of)
    if df.empty:
        logger.warning("No assessments found")
        return

    logger.info(f"Loaded {len(df)} assessments")

    X, names = compute_feature_matrix(df)
    logger.info(f"Feature matrix: {X.shape} ({', '.join(names)})")

    horizon_list = [int(h) for h in horizons.split(",")]
    df = add_returns(df, horizons=horizon_list)

    # Show summary
    for h in horizon_list:
        col = f"ret_{h}d_excess"
        valid = df[col].dropna()
        if len(valid) > 0:
            print(f"\n{h}d excess returns: n={len(valid)}, "
                  f"mean={valid.mean():+.1f}%, median={valid.median():+.1f}%")


if __name__ == "__main__":
    main()
```

**Step 3: Run tests and commit**

Run: `cd /Users/tchklovski/all-code/rivus && python -m pytest finance/ceo_quality/tests/test_features.py -v`

```bash
git add finance/ceo_quality/features.py finance/ceo_quality/tests/test_features.py
git commit -m "feat(ceo_quality): feature extraction + forward returns"
```

---

### Task 5: backtest.py — Binary outcome analysis (winners vs losers)

**Files:**
- Create: `finance/ceo_quality/backtest.py`
- Create: `finance/ceo_quality/tests/test_backtest.py`

**Step 1: Write the failing test**

```python
"""Tests for binary outcome backtest."""
import numpy as np
import pandas as pd

from finance.ceo_quality.backtest import (
    label_binary_outcomes,
    compute_feature_importance,
    run_binary_analysis,
)


def test_label_binary_outcomes():
    returns = pd.Series([50, 30, 10, 5, -5, -10, -30, -50])
    labels = label_binary_outcomes(returns, top_pct=0.25, bottom_pct=0.25)
    assert labels.sum() == 2  # top 25% = 2 items labeled 1
    assert (labels == 0).sum() == 2  # bottom 25% labeled 0
    assert labels.isna().sum() == 4  # middle 50% dropped


def test_run_binary_analysis():
    np.random.seed(42)
    n = 40
    df = pd.DataFrame({
        "track_record": np.random.randint(3, 10, n),
        "decision_quality": np.random.randint(3, 10, n),
        "technical_depth": np.random.randint(3, 10, n),
        "team_building": np.random.randint(3, 10, n),
        "drive_intensity": np.random.randint(3, 10, n),
        "communication": np.random.randint(3, 10, n),
        "overall_score": np.random.uniform(4, 9, n),
        "ret_365d_excess": np.random.normal(0, 30, n),
    })
    result = run_binary_analysis(df, horizon=365)
    assert "auc" in result
    assert "feature_importance" in result
    assert 0 <= result["auc"] <= 1
```

**Step 2: Write backtest.py**

```python
#!/usr/bin/env python
"""Binary outcome backtest — classify CEOs as winners or losers.

Top quartile excess return → 1 (winner), bottom quartile → 0 (loser).
Middle 50% dropped as ambiguous. Train classifiers on CEO features.

Usage:
    python -m finance.ceo_quality.backtest --horizon 365
    python -m finance.ceo_quality.backtest --report
"""

import sys
from pathlib import Path

import click
import numpy as np
import pandas as pd
from loguru import logger
from scipy.stats import spearmanr

_RIVUS = Path(__file__).resolve().parent.parent.parent
if str(_RIVUS) not in sys.path:
    sys.path.insert(0, str(_RIVUS))

from finance.ceo_quality.features import (
    SCORE_COLS, load_assessments_df, compute_feature_matrix,
)

DATA_DIR = Path(__file__).parent / "data"
REPORTS_DIR = Path(__file__).parent / "reports"


def label_binary_outcomes(returns: pd.Series, top_pct: float = 0.25,
                          bottom_pct: float = 0.25) -> pd.Series:
    """Label returns as 1 (top quartile) or 0 (bottom quartile), NaN for middle."""
    top_thresh = returns.quantile(1 - top_pct)
    bottom_thresh = returns.quantile(bottom_pct)

    labels = pd.Series(np.nan, index=returns.index)
    labels[returns >= top_thresh] = 1
    labels[returns <= bottom_thresh] = 0
    return labels


def compute_feature_importance(X: np.ndarray, y: np.ndarray,
                               feature_names: list[str]) -> dict:
    """Compute feature importance via logistic regression coefficients."""
    from sklearn.linear_model import LogisticRegression
    from sklearn.preprocessing import StandardScaler

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    model = LogisticRegression(max_iter=1000, random_state=42)
    model.fit(X_scaled, y)

    importance = dict(zip(feature_names, model.coef_[0]))
    return dict(sorted(importance.items(), key=lambda x: abs(x[1]), reverse=True))


def run_binary_analysis(df: pd.DataFrame, horizon: int = 365,
                        top_pct: float = 0.25, bottom_pct: float = 0.25) -> dict:
    """Run binary outcome analysis on assessment data."""
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import cross_val_score
    from sklearn.preprocessing import StandardScaler
    from sklearn.metrics import roc_auc_score

    ret_col = f"ret_{horizon}d_excess"
    if ret_col not in df.columns:
        raise ValueError(f"Missing {ret_col} — run features.py first")

    # Drop rows without returns
    valid = df.dropna(subset=[ret_col])
    if len(valid) < 10:
        logger.warning(f"Only {len(valid)} companies with {horizon}d returns")
        return {"auc": 0.5, "auc_std": 0.0, "cv_scores": [],
                "feature_importance": {}, "correlations": {},
                "n_samples": len(valid), "n_winners": 0, "n_losers": 0,
                "horizon": horizon}

    # Label binary outcomes
    labels = label_binary_outcomes(valid[ret_col], top_pct, bottom_pct)
    mask = labels.notna()
    valid = valid[mask].reset_index(drop=True)
    y = labels[mask].values.astype(int)

    X, feature_names = compute_feature_matrix(valid)

    if len(y) < 10 or y.sum() < 2 or (1 - y).sum() < 2:
        logger.warning(f"Too few samples for classification: {len(y)} "
                       f"(winners={y.sum()}, losers={(1-y).sum()})")
        return {"auc": 0.5, "auc_std": 0.0, "cv_scores": [],
                "feature_importance": {}, "correlations": {},
                "n_samples": len(y), "n_winners": int(y.sum()),
                "n_losers": int((1 - y).sum()), "horizon": horizon}

    # Scale features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Logistic regression with CV
    model = LogisticRegression(max_iter=1000, random_state=42)
    n_splits = min(5, min(y.sum(), (1 - y).sum()))
    n_splits = max(2, n_splits)

    cv_scores = cross_val_score(model, X_scaled, y, cv=n_splits, scoring="roc_auc")
    auc = cv_scores.mean()

    # Feature importance (compute_feature_importance fits its own scaled model)
    importance = compute_feature_importance(X, y, feature_names)

    # Spearman correlations per dimension
    correlations = {}
    for col in SCORE_COLS + ["overall_score"]:
        if col in valid.columns:
            rho, p = spearmanr(valid[col], valid[ret_col])
            correlations[col] = {"rho": float(rho), "p": float(p)}

    result = {
        "auc": float(auc),
        "auc_std": float(cv_scores.std()),
        "cv_scores": cv_scores.tolist(),
        "feature_importance": importance,
        "correlations": correlations,
        "n_samples": len(y),
        "n_winners": int(y.sum()),
        "n_losers": int((1 - y).sum()),
        "horizon": horizon,
    }

    return result


def generate_report(results: dict, output: Path | None = None) -> str:
    """Generate HTML report for binary analysis."""
    REPORTS_DIR.mkdir(parents=True, exist_ok=True)
    output = output or REPORTS_DIR / "backtest_report.html"

    importance_rows = ""
    for feat, coef in results.get("feature_importance", {}).items():
        bar_width = min(abs(coef) * 50, 200)
        color = "#4CAF50" if coef > 0 else "#f44336"
        importance_rows += f"""
        <tr>
            <td>{feat}</td>
            <td style="text-align:right">{coef:+.3f}</td>
            <td><div style="width:{bar_width}px;height:16px;background:{color};
                 border-radius:3px"></div></td>
        </tr>"""

    corr_rows = ""
    for feat, vals in results.get("correlations", {}).items():
        sig = "*" if vals["p"] < 0.10 else ""
        corr_rows += f"""
        <tr>
            <td>{feat}</td>
            <td style="text-align:right">{vals['rho']:+.3f}{sig}</td>
            <td style="text-align:right">{vals['p']:.3f}</td>
        </tr>"""

    html = f"""<!DOCTYPE html>
<html><head><meta charset="utf-8">
<title>CEO Quality Backtest</title>
<style>
body {{ background:#1a1a2e; color:#e0e0e0; font-family:system-ui; padding:2rem; }}
table {{ border-collapse:collapse; margin:1rem 0; }}
th,td {{ padding:6px 12px; border:1px solid #333; }}
th {{ background:#16213e; }}
h1,h2 {{ color:#e94560; }}
.metric {{ font-size:2rem; color:#0f3460; }}
</style></head><body>
<h1>CEO Quality Backtest — Binary Outcome Analysis</h1>
<p>Horizon: {results['horizon']}d | Samples: {results['n_samples']}
   (Winners: {results['n_winners']}, Losers: {results['n_losers']})</p>

<h2>AUC: {results['auc']:.3f} +/- {results.get('auc_std', 0):.3f}</h2>
<p>{'Signal detected!' if results['auc'] > 0.55 else 'Weak/no signal'}</p>

<h2>Feature Importance (Logistic Regression)</h2>
<table><tr><th>Feature</th><th>Coefficient</th><th>Magnitude</th></tr>
{importance_rows}</table>

<h2>Spearman Correlations (scores vs excess returns)</h2>
<table><tr><th>Dimension</th><th>rho</th><th>p-value</th></tr>
{corr_rows}</table>
<p>* p &lt; 0.10</p>
</body></html>"""

    output.write_text(html)
    logger.info(f"Report saved: {output}")
    return html


@click.command()
@click.option("--horizon", default=365, type=int, help="Forward return horizon (days)")
@click.option("--as-of", help="Filter assessments by as-of date")
@click.option("--report", is_flag=True, help="Generate HTML report")
def main(horizon: int, as_of: str | None, report: bool):
    """Run binary outcome backtest."""
    df = load_assessments_df(as_of)
    if df.empty:
        logger.error("No assessments found — run assess.py first")
        return

    result = run_binary_analysis(df, horizon=horizon)

    print(f"\nBinary Backtest Results ({horizon}d horizon)")
    print(f"  Samples: {result['n_samples']} (W:{result['n_winners']} L:{result['n_losers']})")
    print(f"  AUC: {result['auc']:.3f} ± {result.get('auc_std', 0):.3f}")
    print(f"\n  Feature importance:")
    for feat, coef in result.get("feature_importance", {}).items():
        print(f"    {feat:<20s} {coef:+.3f}")
    print(f"\n  Correlations:")
    for feat, vals in result.get("correlations", {}).items():
        sig = " *" if vals["p"] < 0.10 else ""
        print(f"    {feat:<20s} ρ={vals['rho']:+.3f} (p={vals['p']:.3f}){sig}")

    if report:
        generate_report(result)


if __name__ == "__main__":
    main()
```

**Step 3: Run tests and commit**

```bash
cd /Users/tchklovski/all-code/rivus && python -m pytest finance/ceo_quality/tests/test_backtest.py -v
git add finance/ceo_quality/backtest.py finance/ceo_quality/tests/test_backtest.py
git commit -m "feat(ceo_quality): binary outcome backtest with feature importance"
```

---

### Task 6: predict.py — Walk-forward CV

**Files:**
- Create: `finance/ceo_quality/predict.py`

**Key pattern:** Reuse walk-forward expanding-window from `finance/vic_analysis/predict_robust.py` but simplified — no PCA (only 7 features), LogisticRegression instead of RidgeCV, binary target.

```python
#!/usr/bin/env python
"""Walk-forward cross-validation — CEO features → binary outcomes.

Expanding window with embargo gap, same pattern as predict_robust.py.

Usage:
    python -m finance.ceo_quality.predict --report
    python -m finance.ceo_quality.predict --horizon 180
"""
# Implementation: load assessments across multiple as_of dates,
# split by as_of_date year, train on earlier years, test on later.
# Embargo: training as_of + horizon < first test as_of date.
# Metrics: AUC, Spearman, feature importance per fold.
```

Full implementation follows the `walk_forward_cv` pattern from predict_robust.py, but with a binary LogisticRegression target. Folds are split by `as_of_date` year (train on earlier years, test on later); the embargo gap equals the horizon in days, so every training row's return window closes before the first test as-of date.
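
A minimal sketch of that loop (illustrative, not the final module), assuming assessments span several `as_of_date` values and `features.py` has already attached the `ret_{horizon}d_excess` column; it reuses only the documented `compute_feature_matrix` and `label_binary_outcomes` helpers:

```python
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from sklearn.preprocessing import StandardScaler

from finance.ceo_quality.backtest import label_binary_outcomes
from finance.ceo_quality.features import compute_feature_matrix


def walk_forward_cv(df: pd.DataFrame, horizon: int = 365) -> list[dict]:
    """Expanding window: train on earlier as_of years, test on later, with embargo."""
    ret_col = f"ret_{horizon}d_excess"
    df = df.dropna(subset=[ret_col]).copy()
    df["as_of_dt"] = pd.to_datetime(df["as_of_date"])
    df["year"] = df["as_of_dt"].dt.year
    # NOTE: labels use full-sample quantiles for brevity; the real module may
    # prefer per-fold quantiles so the threshold isn't leaked into training.
    df["label"] = label_binary_outcomes(df[ret_col])
    df = df.dropna(subset=["label"])

    folds = []
    for test_year in sorted(df["year"].unique())[1:]:
        test = df[df["year"] == test_year]
        # Embargo: a training row's return window must close before the first
        # test as-of date, i.e. train as_of + horizon < min(test as_of).
        cutoff = test["as_of_dt"].min() - pd.Timedelta(days=horizon)
        train = df[(df["year"] < test_year) & (df["as_of_dt"] < cutoff)]
        if (len(train) < 10 or train["label"].nunique() < 2
                or test["label"].nunique() < 2):
            continue  # too little data, or a single class, in this fold

        X_train, _ = compute_feature_matrix(train)
        X_test, _ = compute_feature_matrix(test)
        scaler = StandardScaler().fit(X_train)
        model = LogisticRegression(max_iter=1000, random_state=42)
        model.fit(scaler.transform(X_train), train["label"].astype(int))
        probs = model.predict_proba(scaler.transform(X_test))[:, 1]
        folds.append({
            "test_year": int(test_year),
            "n_train": len(train), "n_test": len(test),
            "auc": float(roc_auc_score(test["label"].astype(int), probs)),
        })
    return folds
```

Reporting per-fold AUC (rather than one pooled number) keeps regime sensitivity across test years visible.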

**Commit:**
```bash
git add finance/ceo_quality/predict.py
git commit -m "feat(ceo_quality): walk-forward CV for CEO quality prediction"
```

---

### Task 7: iterate.py — Parameter search

**Files:**
- Create: `finance/ceo_quality/iterate.py`

**Purpose:** Vary rubric weights, assessment prompt variants, feature combos, universe filters, outcome horizons, and model choice. Rank variants by AUC on a held-out test set (a sketch of the search loop follows the list below).

**Key variants to try:**
1. Dimension weights: equal, track_record-heavy, communication-heavy, drive-heavy
2. Horizons: 90d, 180d, 365d
3. Universe filters: all, tech-only, large-cap-only
4. Feature combos: all dims, top-3 only, overall_score only
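
A minimal sketch of the search loop, assuming returns were attached by `features.py`; the weight presets mirror variant 1 above, and re-deriving `overall_score` as a weighted average is an assumption of this sketch (only documented helpers are reused):

```python
import pandas as pd

from finance.ceo_quality.backtest import run_binary_analysis
from finance.ceo_quality.features import SCORE_COLS

WEIGHT_PRESETS = {  # assumption: presets mirror variant 1 above
    "equal": {c: 1.0 for c in SCORE_COLS},
    "track_record_heavy": {**{c: 1.0 for c in SCORE_COLS}, "track_record": 3.0},
    "communication_heavy": {**{c: 1.0 for c in SCORE_COLS}, "communication": 3.0},
    "drive_heavy": {**{c: 1.0 for c in SCORE_COLS}, "drive_intensity": 3.0},
}


def search_variants(df: pd.DataFrame, horizons=(90, 180, 365)) -> pd.DataFrame:
    """Grid over weight presets x horizons; rank variants by AUC."""
    rows = []
    for name, weights in WEIGHT_PRESETS.items():
        variant = df.copy()
        total = sum(weights.values())
        # Re-derive overall_score as the weighted dimension average
        variant["overall_score"] = sum(
            variant[c].fillna(5.0) * w for c, w in weights.items()
        ) / total
        for h in horizons:
            if f"ret_{h}d_excess" not in variant.columns:
                continue  # returns for this horizon not fetched yet
            res = run_binary_analysis(variant, horizon=h)
            rows.append({"weights": name, "horizon": h,
                         "auc": res["auc"], "n": res["n_samples"]})
    if not rows:
        return pd.DataFrame()
    return pd.DataFrame(rows).sort_values("auc", ascending=False)
```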

**Commit:**
```bash
git add finance/ceo_quality/iterate.py
git commit -m "feat(ceo_quality): parameter search iteration loop"
```

---

### Task 8: Phase 1 execution — 50 companies, 1 as-of date

**Steps:**

1. Build dataset: `python -m finance.ceo_quality.dataset --sp500 --limit 50`
2. Manually populate CEO names for companies missing them — `batch_assess` only selects rows where `ceo_name` is not NULL, so names must be filled first (by hand for now; an LLM web-search step could automate this later — see the backfill sketch after this list)
3. Run assessments: `python -m finance.ceo_quality.assess --as-of 2024-01-15 --limit 50`
4. Fetch returns: `python -m finance.ceo_quality.features --as-of 2024-01-15`
5. Run backtest: `python -m finance.ceo_quality.backtest --as-of 2024-01-15 --report`
6. Update LOGBOOK.md with findings
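
A hypothetical backfill snippet for step 2, reusing the documented `add_company` upsert; the `FIXES` mapping is illustrative and should be extended with verified names:

```python
# Hypothetical backfill — fill in missing CEO names via the add_company upsert.
from finance.ceo_quality.dataset import add_company, get_companies

FIXES = {"AAPL": "Tim Cook", "MSFT": "Satya Nadella"}  # extend with verified names

for c in get_companies():
    if not c["ceo_name"] and c["symbol"] in FIXES:
        add_company(c["symbol"], c["name"], FIXES[c["symbol"]], c["sector"],
                    industry=c["industry"], market_cap=c["market_cap"],
                    ipo_date=c["ipo_date"], source=c["source"])
```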

Expected: ~$2 LLM cost, ~50 Finnhub calls. Report shows which CEO dimensions predict 1-year forward returns.

**Commit:**
```bash
git add finance/ceo_quality/LOGBOOK.md finance/ceo_quality/data/ finance/ceo_quality/reports/
git commit -m "data(ceo_quality): Phase 1 results — 50 S&P companies"
```
