# Company Data Consolidation Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Consolidate company data from 4 scattered stores into a single registry at `intel/companies/data/companies.db` with tag-based universe membership and CEO history.

**Architecture:** The existing `intel/companies/data/companies.db` (9,502 companies from supplychain migration) becomes the single source of truth. A new `company_tags` table maps companies to universes (ai_tech, crypto_treasury, darlings, supplychain, semi, ceo_quality). A new `ceo_history` table tracks CEO tenure. The `company_analysis` job switches from YAML watchlist discovery to DB-driven `company_registry` discovery. `finance/ceo_quality/dataset.py` reads from the consolidated DB instead of its own.

**Tech Stack:** SQLite, Python, Click, PyYAML, Loguru

---

### Task 1: Add `company_tags` and `ceo_history` tables to companies.db

**Files:**
- Modify: `intel/companies/common.py` (add table creation to `init_db()`)

**Step 1: Add schema to `init_db()`**

In `intel/companies/common.py`, add to the `init_db()` function after the existing CREATE TABLE statements:

```python
db.execute("""
    CREATE TABLE IF NOT EXISTS company_tags (
        company_id  INTEGER NOT NULL REFERENCES companies(id),
        tag         TEXT NOT NULL,
        added_at    TEXT DEFAULT (datetime('now')),
        metadata    TEXT,
        PRIMARY KEY (company_id, tag)
    )
""")
db.execute("""
    CREATE INDEX IF NOT EXISTS idx_company_tags_tag ON company_tags(tag)
""")
db.execute("""
    CREATE TABLE IF NOT EXISTS ceo_history (
        id          INTEGER PRIMARY KEY AUTOINCREMENT,
        company_id  INTEGER NOT NULL REFERENCES companies(id),
        ceo_name    TEXT NOT NULL,
        role_start  TEXT,
        role_end    TEXT,
        source      TEXT NOT NULL,
        confidence  TEXT,
        fetched_at  TEXT DEFAULT (datetime('now')),
        UNIQUE(company_id, ceo_name, role_start)
    )
""")
db.commit()
```

**Step 2: Run init to create tables**

Run: `python -c "import sys; sys.path.insert(0,'.'); from intel.companies.common import init_db; init_db()"`

Verify: `sqlite3 intel/companies/data/companies.db ".tables"` shows `company_tags` and `ceo_history`
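
One property of this schema is worth checking before later tasks rely on it: SQLite treats NULLs as distinct inside a UNIQUE constraint, so `UNIQUE(company_id, ceo_name, role_start)` does not deduplicate rows whose `role_start` is NULL. The sketch below demonstrates this against an in-memory database. The one-column-plus-name `companies` table and the trimmed column lists are simplifications for illustration, not the real schema.

```python
import sqlite3

# In-memory stand-in; `companies` here is a simplified placeholder table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE companies (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("""
    CREATE TABLE company_tags (
        company_id INTEGER NOT NULL REFERENCES companies(id),
        tag        TEXT NOT NULL,
        PRIMARY KEY (company_id, tag)
    )
""")
conn.execute("""
    CREATE TABLE ceo_history (
        id         INTEGER PRIMARY KEY AUTOINCREMENT,
        company_id INTEGER NOT NULL REFERENCES companies(id),
        ceo_name   TEXT NOT NULL,
        role_start TEXT,
        UNIQUE(company_id, ceo_name, role_start)
    )
""")
conn.execute("INSERT INTO companies (id, name) VALUES (1, 'Example Corp')")

# The composite primary key rejects a duplicate (company, tag) pair.
conn.execute("INSERT INTO company_tags VALUES (1, 'ai_tech')")
try:
    conn.execute("INSERT INTO company_tags VALUES (1, 'ai_tech')")
    duplicate_tag_rejected = False
except sqlite3.IntegrityError:
    duplicate_tag_rejected = True

# Two rows with a NULL role_start do NOT violate the UNIQUE constraint.
insert_ceo = "INSERT INTO ceo_history (company_id, ceo_name, role_start) VALUES (?, ?, ?)"
for _ in range(2):
    conn.execute(insert_ceo, (1, "Jane Doe", None))
null_start_rows = conn.execute(
    "SELECT COUNT(*) FROM ceo_history WHERE ceo_name = 'Jane Doe'"
).fetchone()[0]

# With a concrete start date, the second insert is rejected.
conn.execute(insert_ceo, (1, "John Roe", "2020-01-01"))
try:
    conn.execute(insert_ceo, (1, "John Roe", "2020-01-01"))
    dated_duplicate_rejected = False
except sqlite3.IntegrityError:
    dated_duplicate_rejected = True

print(duplicate_tag_rejected, null_start_rows, dated_duplicate_rejected)
```

Any writer that may pass a missing start date therefore needs its own duplicate check rather than relying on the constraint alone.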

**Step 3: Commit**

```bash
git add intel/companies/common.py
git commit -m "feat(intel/companies): add company_tags and ceo_history tables"
```

---

### Task 2: Create registry module with tag + CEO operations

**Files:**
- Create: `intel/companies/registry.py`

**Step 1: Write the registry module**

```python
#!/usr/bin/env python
"""Company registry — single source of truth for company data.

Provides tag-based universe membership and CEO history tracking.
All company data lives in intel/companies/data/companies.db.
"""

import json
import sqlite3
from pathlib import Path

from intel.companies.common import DB_PATH  # init_db is not used in this module

def open_registry(db_path: Path | None = None) -> sqlite3.Connection:
    """Open registry DB with row_factory."""
    db_path = db_path or DB_PATH
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    return conn


def ensure_company(conn: sqlite3.Connection, *, name: str, ticker: str | None = None,
                   sector: str | None = None, industry: str | None = None,
                   market_cap_m: float | None = None, country: str | None = None,
                   description: str | None = None) -> int:
    """Insert or update company, return company_id.

    Matches by ticker first (if provided), then by name.
    """
    row = None
    if ticker:
        row = conn.execute(
            "SELECT id FROM companies WHERE ticker = ? OR symbol = ?",
            (ticker, ticker),
        ).fetchone()
    if not row:
        row = conn.execute(
            "SELECT id FROM companies WHERE name = ? OR canonical_name = ?",
            (name, name),
        ).fetchone()

    if row:
        company_id = row["id"]
        updates = []
        params = []
        if ticker:
            updates.append("ticker = COALESCE(?, ticker)")
            params.append(ticker)
            updates.append("symbol = COALESCE(?, symbol)")
            params.append(ticker)
        if sector:
            updates.append("sector = ?")
            params.append(sector)
        if industry:
            updates.append("industry = ?")
            params.append(industry)
        if market_cap_m is not None:
            updates.append("market_cap_m = ?")
            params.append(market_cap_m)
        if description:
            updates.append("description = ?")
            params.append(description)
        if updates:
            updates.append("updated_at = datetime('now')")
            params.append(company_id)
            conn.execute(
                f"UPDATE companies SET {', '.join(updates)} WHERE id = ?",
                params,
            )
        return company_id

    conn.execute(
        """INSERT INTO companies (name, ticker, symbol, sector, industry,
           market_cap_m, country, description, is_public)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1)""",
        (name, ticker, ticker, sector, industry, market_cap_m, country, description),
    )
    return conn.execute("SELECT last_insert_rowid()").fetchone()[0]


def tag_company(conn: sqlite3.Connection, company_id: int, tag: str,
                metadata: dict | None = None) -> None:
    """Add a tag to a company (idempotent)."""
    conn.execute(
        """INSERT INTO company_tags (company_id, tag, metadata)
           VALUES (?, ?, ?)
           ON CONFLICT(company_id, tag) DO UPDATE SET
             metadata = COALESCE(excluded.metadata, company_tags.metadata)""",
        (company_id, tag, json.dumps(metadata) if metadata else None),
    )


def get_companies_by_tag(conn: sqlite3.Connection, tag: str) -> list[dict]:
    """Get all companies with a given tag."""
    rows = conn.execute(
        """SELECT c.*, ct.tag, ct.metadata as tag_metadata
           FROM companies c
           JOIN company_tags ct ON c.id = ct.company_id
           WHERE ct.tag = ?
           ORDER BY c.name""",
        (tag,),
    ).fetchall()
    return [dict(r) for r in rows]


def get_tags(conn: sqlite3.Connection, company_id: int) -> list[str]:
    """Get all tags for a company."""
    rows = conn.execute(
        "SELECT tag FROM company_tags WHERE company_id = ?",
        (company_id,),
    ).fetchall()
    return [r["tag"] for r in rows]


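def record_ceo(conn: sqlite3.Connection, company_id: int, ceo_name: str,
               role_start: str | None = None, role_end: str | None = None,
               source: str = "unknown", confidence: str | None = None) -> None:
    """Record CEO tenure (idempotent upsert)."""
    if role_start is None:
        # SQLite treats NULLs as distinct in UNIQUE constraints, so ON CONFLICT
        # never fires for a NULL role_start. Update an existing row by hand
        # to keep repeated calls idempotent.
        cur = conn.execute(
            """UPDATE ceo_history
               SET role_end = COALESCE(?, role_end), source = ?, confidence = ?
               WHERE company_id = ? AND ceo_name = ? AND role_start IS NULL""",
            (role_end, source, confidence, company_id, ceo_name),
        )
        if cur.rowcount:
            return
    conn.execute(
        """INSERT INTO ceo_history (company_id, ceo_name, role_start, role_end, source, confidence)
           VALUES (?, ?, ?, ?, ?, ?)
           ON CONFLICT(company_id, ceo_name, role_start) DO UPDATE SET
             role_end = COALESCE(excluded.role_end, ceo_history.role_end),
             source = excluded.source, confidence = excluded.confidence""",
        (company_id, ceo_name, role_start, role_end, source, confidence),
    )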


def get_current_ceo(conn: sqlite3.Connection, company_id: int) -> dict | None:
    """Get most recent CEO: an open tenure (role_end IS NULL) if any, else the latest role_end."""
    row = conn.execute(
        """SELECT * FROM ceo_history WHERE company_id = ?
           ORDER BY (role_end IS NULL) DESC, role_end DESC, fetched_at DESC
           LIMIT 1""",
        (company_id,),
    ).fetchone()
    return dict(row) if row else None
```

**Step 2: Verify it loads**

Run: `python -c "import sys; sys.path.insert(0,'.'); from intel.companies.registry import open_registry; print('ok')"`
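
Beyond the import check, a quick behavioural smoke test helps confirm the tag upsert semantics. The following is a minimal sketch against an in-memory database, using a standalone copy of `tag_company` and a simplified three-column `companies` table (both are assumptions made so the snippet runs outside the repo). It checks that re-tagging is idempotent and that a later call without metadata keeps the metadata already stored.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
# Simplified stand-in for the real companies table.
conn.execute("CREATE TABLE companies (id INTEGER PRIMARY KEY, name TEXT, ticker TEXT)")
conn.execute("""
    CREATE TABLE company_tags (
        company_id INTEGER NOT NULL REFERENCES companies(id),
        tag        TEXT NOT NULL,
        metadata   TEXT,
        PRIMARY KEY (company_id, tag)
    )
""")
conn.execute("INSERT INTO companies VALUES (1, 'Example Corp', 'EXM')")


def tag_company(conn, company_id, tag, metadata=None):
    """Standalone copy of the registry upsert, for illustration only."""
    conn.execute(
        """INSERT INTO company_tags (company_id, tag, metadata)
           VALUES (?, ?, ?)
           ON CONFLICT(company_id, tag) DO UPDATE SET
             metadata = COALESCE(excluded.metadata, company_tags.metadata)""",
        (company_id, tag, json.dumps(metadata) if metadata else None),
    )


tag_company(conn, 1, "ai_tech", {"ai_involvement": "GPU compute"})
tag_company(conn, 1, "ai_tech")        # repeat call: no new row, metadata kept
tag_company(conn, 1, "ceo_quality")    # second universe for the same company

rows = conn.execute(
    "SELECT tag, metadata FROM company_tags WHERE company_id = 1 ORDER BY tag"
).fetchall()
tags = [r["tag"] for r in rows]
ai_metadata = json.loads(rows[0]["metadata"])
print(tags, ai_metadata)
```

The `ON CONFLICT ... DO UPDATE` form requires SQLite 3.24 or newer, which is worth confirming with `sqlite3.sqlite_version` on older hosts.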

**Step 3: Commit**

```bash
git add intel/companies/registry.py
git commit -m "feat(intel/companies): registry module with tag + CEO operations"
```

---

### Task 3: Import watchlist YAML into registry with tags

**Files:**
- Create: `intel/companies/import_watchlist.py`

**Step 1: Write the import script**

```python
#!/usr/bin/env python
"""Import companies from watchlist YAML into the unified company registry.

Usage:
    python -m intel.companies.import_watchlist [--watchlist PATH] [--dry-run]
"""

import sys
from pathlib import Path

import click
import yaml
from loguru import logger

_RIVUS = Path(__file__).resolve().parent.parent.parent
if str(_RIVUS) not in sys.path:
    sys.path.insert(0, str(_RIVUS))

from intel.companies.common import init_db
from intel.companies.registry import open_registry, ensure_company, tag_company

DEFAULT_WATCHLIST = _RIVUS / "jobs" / "data" / "companies" / "watchlist.yaml"

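# Map YAML sections to tags based on comment markers
# The watchlist has: AI autonomous systems, crypto treasury, darlings, special additions
# NOTE: nothing in this script reads SECTION_TAG_MAP yet; detect_tag() below
# does the actual tagging. It is kept as a reference for section-based tagging.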
SECTION_TAG_MAP = {
    "ai": "ai_tech",
    "autonomous": "ai_tech",
    "cloud": "ai_tech",
    "semiconductor": "ai_tech",
    "data": "ai_tech",
    "enterprise": "ai_tech",
    "generative": "ai_tech",
    "health": "ai_tech",
    "robotics": "ai_tech",
    "security": "ai_tech",
    "crypto": "crypto_treasury",
    "bitcoin": "crypto_treasury",
    "darling": "darlings",
    "special": "darlings",
}


def detect_tag(ai_involvement: str, name: str) -> str:
    """Infer tag from ai_involvement field or company name."""
    # Search both fields: keywords such as "booking" and "netflix" are company names.
    text = f"{ai_involvement or ''} {name or ''}".lower()
    if "bitcoin" in text or "crypto" in text or "treasury" in text:
        return "crypto_treasury"
    if any(k in text for k in ["darling", "booking", "netflix"]):
        return "darlings"
    return "ai_tech"


@click.command()
@click.option("--watchlist", type=click.Path(exists=True), default=str(DEFAULT_WATCHLIST))
@click.option("--dry-run", is_flag=True)
def main(watchlist: str, dry_run: bool):
    """Import watchlist companies into unified registry."""
    init_db()

    with open(watchlist) as f:
        raw = f.read()
        data = yaml.safe_load(raw) or {}

    entries = data.get("companies", [])
    logger.info(f"Loading {len(entries)} companies from {watchlist}")

    if dry_run:
        for e in entries:
            tag = detect_tag(e.get("ai_involvement", ""), e.get("name", ""))
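            # `or '?'` also covers keys that are present but null in the YAML;
            # formatting None with a width spec would raise TypeError.
            logger.info(f"  {e.get('ticker') or '?':>6} {e.get('name') or '?':<40} → {tag}")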
        return

    conn = open_registry()
    added = 0
    for entry in entries:
        name = entry.get("name", "")
        ticker = entry.get("ticker")
        ai_involvement = entry.get("ai_involvement", "")
        if not name:
            continue

        company_id = ensure_company(conn, name=name, ticker=ticker)
        tag = detect_tag(ai_involvement, name)
        tag_company(conn, company_id, tag, metadata={"ai_involvement": ai_involvement})
        tag_company(conn, company_id, "ceo_quality")  # all watchlist companies are ceo_quality candidates
        added += 1
        logger.debug(f"  {ticker or '?':>6} {name:<40} → {tag} (id={company_id})")

    conn.commit()
    conn.close()
    logger.info(f"Imported {added} companies into registry")


if __name__ == "__main__":
    main()
```

**Step 2: Run the import (dry-run first, then real)**

Run: `python -m intel.companies.import_watchlist --dry-run`
Then: `python -m intel.companies.import_watchlist`

Verify: `sqlite3 intel/companies/data/companies.db "SELECT tag, COUNT(*) FROM company_tags GROUP BY tag"`
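
Because tagging is driven by substring matches, the dry-run output deserves a skim for false positives before the real import. The sketch below applies the same keyword checks used by `detect_tag` to a single text string; the function name `detect_tag_from_text` and the sample descriptions are made up for illustration and do not come from the watchlist.

```python
def detect_tag_from_text(text: str) -> str:
    """Apply the watchlist keyword checks to one lowercase-normalised string."""
    text = (text or "").lower()
    if "bitcoin" in text or "crypto" in text or "treasury" in text:
        return "crypto_treasury"
    if any(k in text for k in ["darling", "booking", "netflix"]):
        return "darlings"
    return "ai_tech"


# Hypothetical descriptions, chosen to show one hit, one false positive,
# and the default fallthrough.
samples = [
    "Holds bitcoin as a reserve asset",
    "Sells AI forecasting tools to corporate treasury teams",
    "GPU compute for model training",
    "",
]
results = {text: detect_tag_from_text(text) for text in samples}
for text, tag in results.items():
    print(f"{tag:<16} {text!r}")
```

The second sample lands in `crypto_treasury` purely because it contains the word "treasury", and an empty description silently defaults to `ai_tech`; both cases are easy to spot in the dry-run listing.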

**Step 3: Commit**

```bash
git add intel/companies/import_watchlist.py
git commit -m "feat(intel/companies): import watchlist into registry with tags"
```

---

### Task 4: Add `company_registry` discovery strategy

**Files:**
- Modify: `jobs/lib/discovery.py` (add new strategy class)
- Modify: `jobs/jobs.yaml` (switch company_analysis to new strategy)

**Step 1: Add strategy to discovery.py**

After the `CompanyWatchlistDiscovery` class (~line 1073), add:

```python
@register("company_registry")
class CompanyRegistryDiscovery(BaseDiscovery):
    """Discover companies from the unified company registry by tag.

    Params:
        tag: which tag to discover (e.g. 'ceo_quality', 'ai_tech')
        db_path: optional override path to companies.db
    """

    async def fetch_items(self) -> list[dict]:
        import re
        import sqlite3
        from pathlib import Path

        tag = self.params.get("tag", "ceo_quality")
        default_db = Path(__file__).parent.parent.parent / "intel" / "companies" / "data" / "companies.db"
        # expanduser() so a "~/..." db_path override in jobs.yaml resolves correctly
        db_path = Path(self.params.get("db_path") or default_db).expanduser()

        if not db_path.exists():
            logger.error(f"company_registry: DB not found: {db_path}")
            return []

        conn = sqlite3.connect(str(db_path))
        conn.row_factory = sqlite3.Row
        try:
            rows = conn.execute(
                """SELECT c.id, c.name, c.ticker, c.symbol, ct.metadata
                   FROM companies c
                   JOIN company_tags ct ON c.id = ct.company_id
                   WHERE ct.tag = ?
                   ORDER BY c.name""",
                (tag,),
            ).fetchall()
        finally:
            conn.close()  # release the handle even if the query raises

        items = []
        for r in rows:
            name = r["name"]
            slug = re.sub(r"[^a-z0-9]+", "_", name.lower()).strip("_")
            items.append({
                "key": slug,
                "data": {
                    "company_name": name,
                    "company_slug": slug,
                    "ticker": r["ticker"] or r["symbol"] or "",
                    "registry_id": r["id"],
                    "priority": 10,
                },
            })
        return items
```

**Step 2: Update jobs.yaml — switch company_analysis discovery**

Change the `company_analysis` discovery section from:

```yaml
    discovery:
      strategy: company_watchlist
      params:
        watchlist: ~/all-code/rivus/jobs/data/companies/watchlist.yaml
```

To:

```yaml
    discovery:
      strategy: company_registry
      params:
        tag: ceo_quality
```

**Step 3: Verify discovery finds items**

Run: `python -c "
import sys, asyncio; sys.path.insert(0,'.')
from jobs.lib.discovery import get_strategy
s = get_strategy({'strategy': 'company_registry', 'params': {'tag': 'ceo_quality'}})
items = asyncio.run(s.fetch_items())
print(f'{len(items)} items')
for i in items[:3]: print(f'  {i[\"key\"]}: {i[\"data\"][\"ticker\"]}')
"`

**Step 4: Commit**

```bash
git add jobs/lib/discovery.py jobs/jobs.yaml
git commit -m "feat(jobs): company_registry discovery strategy reads from unified DB"
```
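
The item key in `CompanyRegistryDiscovery` is a slug derived from the company name, so two registry rows whose names differ only in punctuation would map to the same key. With 9,502 companies in the registry, a collision check before switching the job over may be worthwhile. This is a minimal sketch using the same regular expression; `slugify`, `find_slug_collisions`, and the sample names are illustrative and not part of the codebase.

```python
import re
from collections import defaultdict


def slugify(name: str) -> str:
    """Same normalisation as the discovery strategy: lowercase, non-alphanumerics to '_'."""
    return re.sub(r"[^a-z0-9]+", "_", name.lower()).strip("_")


def find_slug_collisions(names: list[str]) -> dict[str, list[str]]:
    """Group names by slug and return only the slugs shared by more than one name."""
    by_slug: dict[str, list[str]] = defaultdict(list)
    for name in names:
        by_slug[slugify(name)].append(name)
    return {slug: group for slug, group in by_slug.items() if len(group) > 1}


# Hypothetical names; in practice these would come from the tagged registry rows.
names = ["Alphabet Inc.", "Alphabet, Inc", "AT&T", "NVIDIA"]
collisions = find_slug_collisions(names)
print(collisions)
```

If collisions do turn up, one option is to suffix the slug with the ticker or `registry_id`; whether that is acceptable depends on how existing job results are keyed.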

---

### Task 5: Update `finance/ceo_quality/dataset.py` to read from registry

**Files:**
- Modify: `finance/ceo_quality/dataset.py`

The goal is for `dataset.py` to read company master data from the unified registry instead of maintaining its own separate copy. `dataset.db` still exists for assessment-specific data (scores, features, returns), but company rows are imported from the registry. This task covers the company rows only; CEO tenure reaches the registry through the backfill in Task 6.

**Step 1: Add `--from-registry` option**

Add a new function and CLI option:

```python
def build_from_registry(tag: str = "ceo_quality", db_path: Path | None = None):
    """Import companies from the unified registry into the CEO quality dataset."""
    from intel.companies.registry import open_registry, get_companies_by_tag

    db_path = db_path or DB_PATH
    init_db(db_path)

    conn = open_registry()
    companies = get_companies_by_tag(conn, tag)
    conn.close()

    dataset_conn = sqlite3.connect(str(db_path))
    imported = 0
    for c in companies:
        symbol = c.get("ticker") or c.get("symbol")
        if not symbol:
            continue
        dataset_conn.execute(
            """INSERT INTO companies (symbol, name, sector, industry, market_cap, source)
               VALUES (?, ?, ?, ?, ?, ?)
               ON CONFLICT(symbol) DO UPDATE SET
                 name = COALESCE(excluded.name, companies.name),
                 sector = COALESCE(excluded.sector, companies.sector),
                 industry = COALESCE(excluded.industry, companies.industry),
                 market_cap = COALESCE(excluded.market_cap, companies.market_cap),
                 source = excluded.source, updated_at = datetime('now')""",
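            # NOTE: the registry stores market cap in millions (market_cap_m);
            # confirm the dataset's market_cap column uses the same unit.
            (symbol, c["name"], c.get("sector"), c.get("industry"),
             c.get("market_cap_m"), f"registry:{tag}"),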
        )
        imported += 1
    dataset_conn.commit()
    dataset_conn.close()
    logger.info(f"Imported {imported} companies from registry tag '{tag}'")
```

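Add the CLI option to `main()`. Click passes it as the `from_registry` keyword argument, so add that parameter to the `main()` signature as well: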
```python
@click.option("--from-registry", is_flag=True, help="Import from unified company registry")
```

And in the body:
```python
if from_registry:
    build_from_registry()
```

**Step 2: Verify**

Run: `python -m finance.ceo_quality.dataset --from-registry --list`

**Step 3: Commit**

```bash
git add finance/ceo_quality/dataset.py
git commit -m "feat(ceo_quality): --from-registry reads from unified company DB"
```

---

### Task 6: Backfill CEO data from job results into registry

**Files:**
- Modify: `intel/companies/registry.py` (add backfill function)

**Step 1: Add backfill function to registry.py**

```python
def backfill_ceos_from_jobs(db_path: Path | None = None) -> int:
    """Pull CEO names from company_analysis job results into registry.

    Reads jobs.db results table for company_analysis profile stage,
    extracts CEO + ceo_since, and writes to ceo_history.
    Returns count of records written.
    """
    import json

    jobs_db = Path(__file__).parent.parent.parent / "jobs" / "data" / "jobs.db"
    if not jobs_db.exists():
        return 0

    jobs_conn = sqlite3.connect(str(jobs_db))
    jobs_conn.row_factory = sqlite3.Row
    rows = jobs_conn.execute(
        """SELECT item_key, result FROM results
           WHERE job_id = 'company_analysis' AND stage = 'profile'"""
    ).fetchall()
    jobs_conn.close()

    conn = open_registry(db_path)
    updated = 0
    for row in rows:
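        try:
            result = json.loads(row["result"])
        except (json.JSONDecodeError, TypeError):
            continue
        if not isinstance(result, dict):
            continue  # valid JSON but not an object (e.g. a list); nothing to extract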

        ceo = result.get("ceo")
        ticker = result.get("ticker")
        if not ceo or not ticker:
            continue

        # Find company by ticker
        company = conn.execute(
            "SELECT id FROM companies WHERE ticker = ? OR symbol = ?",
            (ticker, ticker),
        ).fetchone()
        if not company:
            continue

        record_ceo(conn, company["id"], ceo,
                   role_start=result.get("ceo_since"),
                   source="company_analysis_job",
                   confidence="web_search_grounded")
        updated += 1

    conn.commit()
    conn.close()
    return updated
```

**Step 2: Verify**

Run: `python -c "import sys; sys.path.insert(0,'.'); from intel.companies.registry import backfill_ceos_from_jobs; print(f'Updated {backfill_ceos_from_jobs()} CEOs')"`
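
The parsing step of the backfill can be exercised without either database. The sketch below isolates it into a pure function so malformed rows are easy to test; `extract_ceo_record` is a hypothetical helper name, and the assumption that the result payload is a JSON object with `ceo`, `ticker`, and `ceo_since` keys is taken from the function above.

```python
import json


def extract_ceo_record(raw_result):
    """Return {'ticker', 'ceo_name', 'role_start'} or None if the payload is unusable."""
    try:
        result = json.loads(raw_result)
    except (json.JSONDecodeError, TypeError):
        return None  # not JSON at all, or a NULL column value
    if not isinstance(result, dict):
        return None  # valid JSON, but not an object
    ceo = result.get("ceo")
    ticker = result.get("ticker")
    if not ceo or not ticker:
        return None
    return {"ticker": ticker, "ceo_name": ceo, "role_start": result.get("ceo_since")}


# Hypothetical payloads covering the cases the backfill should tolerate.
good = extract_ceo_record('{"ticker": "EXM", "ceo": "Jane Doe", "ceo_since": "2019"}')
no_start = extract_ceo_record('{"ticker": "EXM", "ceo": "Jane Doe"}')
skipped = [
    extract_ceo_record(None),
    extract_ceo_record("not json"),
    extract_ceo_record("[1, 2, 3]"),
    extract_ceo_record('{"ticker": "EXM"}'),
]
print(good, no_start, skipped)
```

Records without `ceo_since` come back with `role_start` set to `None`, which is the case where repeated backfill runs can write duplicate `ceo_history` rows unless the writer guards against it.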

**Step 3: Commit**

```bash
git add intel/companies/registry.py
git commit -m "feat(intel/companies): backfill CEO data from job results into registry"
```

---

### Task 7: Update documentation

**Files:**
- Modify: `intel/companies/CLAUDE.md` — update architecture to reflect consolidation, remove "Future consolidation" section
- Modify: `CLAUDE.md` — add `intel/companies/registry.py` to lib/packages or note in intel section

**Step 1: Update intel/companies/CLAUDE.md**

Rewrite the "Relationship to jobs/ Company Systems" section to state that the registry is now the single source of truth. Remove the "Future consolidation options" section, since the consolidation is complete.

**Step 2: Update top-level CLAUDE.md**

In the `intel/` description row, note that `intel/companies/data/companies.db` is the unified company registry with tag-based universes.

**Step 3: Commit**

```bash
git add intel/companies/CLAUDE.md CLAUDE.md
git commit -m "docs: update company registry as single source of truth"
```

---

## Execution Summary

| Task | What | Files |
|------|------|-------|
| 1 | Schema: company_tags + ceo_history tables | `intel/companies/common.py` |
| 2 | Registry module: ensure_company, tag, CEO ops | `intel/companies/registry.py` (new) |
| 3 | Import watchlist YAML → registry with tags | `intel/companies/import_watchlist.py` (new) |
| 4 | company_registry discovery strategy for jobs | `jobs/lib/discovery.py`, `jobs/jobs.yaml` |
| 5 | CEO quality dataset reads from registry | `finance/ceo_quality/dataset.py` |
| 6 | Backfill CEO data from job results | `intel/companies/registry.py` |
| 7 | Documentation updates | `intel/companies/CLAUDE.md`, `CLAUDE.md` |

Total: 7 tasks, 2 new files, 6 modified files. Each task is independently committable.
