# Replace Qdrant with sqlite-vec — Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Replace all Qdrant (local-mode) usage with sqlite-vec across rivus, eliminating the exclusive file lock problem and removing the qdrant-client dependency.

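**Architecture:** Rewrite `lib/vectors/__init__.py` to use sqlite-vec's `vec0` virtual table instead of `qdrant-client`. Keep the same `VectorStore` public API so all consumers (watch, learning, semanticnet) need no changes to their `VectorStore` call sites; only the filter construction in semanticnet changes. Filtered search (used by semanticnet) is handled by overfetching candidates and post-filtering on the JSON payload in Python. sqlite-vec metadata columns in WHERE clauses are a possible later optimization, but they would require schema changes.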

**Tech Stack:** sqlite-vec (v0.1.6+), Python sqlite3 stdlib, struct (for vector serialization)
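
As a small illustration of the `struct`-based serialization listed above, the sketch below packs a vector into float32 bytes and reads it back. It makes the byte order explicit with `<` (little-endian), which is an assumption; on a little-endian host this produces the same bytes as the native-order `struct.pack(f"{n}f", ...)` used later in this plan. The helper names are illustrative.

```python
import struct


def serialize_f32(vector: list[float]) -> bytes:
    """Pack floats as consecutive little-endian float32 values (4 bytes each)."""
    return struct.pack(f"<{len(vector)}f", *vector)


def deserialize_f32(blob: bytes) -> list[float]:
    """Inverse of serialize_f32: unpack a float32 blob back into Python floats."""
    count = len(blob) // 4  # each float32 occupies 4 bytes
    return list(struct.unpack(f"<{count}f", blob))


# Values chosen to be exactly representable in float32, so the round trip is lossless.
blob = serialize_f32([0.5, -1.0, 2.25])
roundtrip = deserialize_f32(blob)
```

In general the round trip is only exact up to float32 precision, so a 1536-dimensional embedding stored this way takes 6144 bytes and may differ slightly from the original Python floats.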

---

## Consumer Inventory

| Consumer | DB Path | Collections | Uses Filter? | Notes |
|----------|---------|-------------|-------------|-------|
| `helm/api.py` | `~/.coord/watch_vectors/` | `sessions` (1536d) | No | Session topic embeddings for `/jump` |
| `learning/cli.py` | `~/.coord/learning_vectors/` | `principles`, `learnings` (1536d) | No (post-filter in Python) | Knowledge DB search for `learn find -s` |
| `lib/semnet/store.py` | per-project `data/vectors/` | `chunks`, `claims`, `doc_summaries` (1536d) | Yes — `category`, `direction` on claims | YouTube portal semantic search |
| `projects/vic/run_embed.py` | `projects/vic/data/vectors/` | via SemanticStore | No | VIC ideas embedding |
| `lib/semnet/presenter/data.py` | per-project | via SemanticStore | No | Uses `_to_uuid()` directly |

**Key insight:** Almost all Qdrant usage is confined to `lib/vectors/__init__.py`, and all consumers go through `VectorStore`. The one exception is `lib/semnet/store.py`, which imports `FieldCondition`/`Filter`/`MatchValue` from `qdrant_client.models` for filtered search; that import needs to change (Task 3).

---

### Task 1: Install sqlite-vec + verify extension loading

**Files:**
- Modify: `requirements.txt` or equivalent

**Step 1: Install sqlite-vec**

Run: `pip install sqlite-vec`

**Step 2: Verify extension loads in our Python**

```bash
python -c "
import sqlite3, sqlite_vec
db = sqlite3.connect(':memory:')
db.enable_load_extension(True)
sqlite_vec.load(db)
print('vec_version:', db.execute('select vec_version()').fetchone()[0])
print('sqlite_version:', sqlite3.sqlite_version)
"
```

Expected: version string printed, no errors. If macOS system Python blocks extension loading, conda Python (our `python`) should work.
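
If the one-liner fails, one quick way to tell whether the interpreter itself is the problem is to check for `enable_load_extension` on a connection: CPython builds compiled without loadable-extension support omit that method. A minimal sketch (the function name is illustrative, not part of the codebase):

```python
import sqlite3


def sqlite_supports_extensions() -> bool:
    """Return True if this Python's sqlite3 module was built with extension loading."""
    conn = sqlite3.connect(":memory:")
    try:
        # The method is absent entirely when CPython was built without
        # --enable-loadable-sqlite-extensions.
        return hasattr(conn, "enable_load_extension")
    finally:
        conn.close()


print("loadable extensions supported:", sqlite_supports_extensions())
```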

**Step 3: Commit**

```bash
git add -A && git commit -m "chore: install sqlite-vec dependency"
```

---

### Task 2: Rewrite lib/vectors to use sqlite-vec (tests first)

**Files:**
- Modify: `lib/vectors/__init__.py`
- Modify: `lib/vectors/tests/test_vectors.py` (adapt imports only)

**Step 1: Update test imports**

The existing tests import `from lib.vectors import ModelMismatchError, VectorStore` and `Filter`. Remove the `Filter` import from tests (it's not used there). Tests should pass as-is against the new implementation since the API is the same.

**Step 2: Run existing tests to verify they fail (no implementation yet)**

Run: `python -m pytest lib/vectors/tests/test_vectors.py -v`
Expected: Import errors or failures (since qdrant_client will be replaced)

**Step 3: Rewrite `lib/vectors/__init__.py`**

Replace the entire implementation. New design:

```python
"""lib.vectors — sqlite-vec backed vector store for local embedding storage.

Usage:
    from lib.vectors import VectorStore

    vs = VectorStore("/path/to/db.sqlite")
    vs.ensure_collection("sessions", dim=1536, model="text-embedding-3-small")
    vs.upsert("sessions", id="abc", vector=[...], payload={"title": "..."})
    results = vs.search("sessions", query_vector=[...], limit=5)
    vs.close()

Each consumer keeps its own DB file. This library wraps connection + CRUD.
Replaces qdrant-client local mode — no exclusive file lock, WAL mode for
concurrent readers.
"""

import json
import struct
import sqlite3
import uuid
from pathlib import Path

import sqlite_vec
from loguru import logger


class ModelMismatchError(ValueError):
    """Raised when a vector operation uses a different model than the collection was created with."""


_ID_NAMESPACE = uuid.NAMESPACE_URL


def _to_uuid(id_: str) -> str:
    """Convert an arbitrary string ID to a deterministic UUID v5."""
    return str(uuid.uuid5(_ID_NAMESPACE, id_))


def _serialize_f32(vector: list[float]) -> bytes:
    """Serialize float list to compact binary for sqlite-vec."""
    return struct.pack(f"{len(vector)}f", *vector)


class VectorFilter:
    """Replacement for qdrant_client.models.Filter.

    Usage:
        filter_ = VectorFilter(must=[
            FieldCondition(key="category", value="bullish"),
            FieldCondition(key="direction", value="up"),
        ])
    """
    def __init__(self, must: list["FieldCondition"] | None = None):
        self.must = must or []


class FieldCondition:
    """Replacement for qdrant_client.models.FieldCondition."""
    def __init__(self, key: str, value: str):
        self.key = key
        self.value = value


# Back-compat alias
Filter = VectorFilter


class VectorStore:
    """sqlite-vec backed vector store.

    Each collection becomes a vec0 virtual table plus a payload table.
    Schema per collection:
    - vec_{name}: vec0 virtual table (rowid, embedding float[dim] distance_metric=cosine)
    - payload_{name}: (rowid INTEGER PRIMARY KEY, orig_id TEXT UNIQUE, payload_json TEXT)

    We use integer rowid as the bridge between vec0 and payload tables.
    orig_id is the caller's string ID, mapped to rowid via the payload table.
    """

    _ORIG_ID_KEY = "_orig_id"

    def __init__(self, path: str | Path):
        self._path = Path(path).expanduser()
        # For sqlite-vec, path is a single .sqlite file, not a directory.
        # If the caller passes a directory path (old Qdrant convention), use vectors.sqlite inside it.
        if self._path.suffix != ".sqlite":
            self._path.mkdir(parents=True, exist_ok=True)
            db_file = self._path / "vectors.sqlite"
        else:
            self._path.parent.mkdir(parents=True, exist_ok=True)
            db_file = self._path
        self._db_file = db_file
        self._conn = sqlite3.connect(str(db_file))
        self._conn.execute("PRAGMA journal_mode=WAL")
        self._conn.execute("PRAGMA busy_timeout=5000")
        self._conn.enable_load_extension(True)
        sqlite_vec.load(self._conn)
        self._conn.enable_load_extension(False)

        # Collection metadata (model tracking)
        self._conn.execute("""
            CREATE TABLE IF NOT EXISTS _collection_meta (
                name TEXT PRIMARY KEY,
                dim INTEGER NOT NULL,
                model TEXT
            )
        """)
        self._conn.commit()
        self._meta: dict[str, dict] = {}
        for row in self._conn.execute("SELECT name, dim, model FROM _collection_meta"):
            self._meta[row[0]] = {"dim": row[1], "model": row[2]}

    def _check_model(self, collection: str, model: str | None) -> None:
        if model is None:
            return
        recorded = self._meta.get(collection, {}).get("model")
        if recorded and recorded != model:
            raise ModelMismatchError(
                f"Collection '{collection}' uses model '{recorded}', "
                f"but caller passed model='{model}'"
            )

    def ensure_collection(self, name: str, dim: int, *, model: str | None = None) -> None:
        """Create collection tables if they don't exist."""
        # Check model consistency
        if model:
            existing = self._meta.get(name, {}).get("model")
            if existing and existing != model:
                raise ModelMismatchError(
                    f"Collection '{name}' was created with model '{existing}', "
                    f"cannot change to '{model}'"
                )

        # Create vec0 virtual table
        self._conn.execute(f"""
            CREATE VIRTUAL TABLE IF NOT EXISTS vec_{name} USING vec0(
                embedding float[{dim}] distance_metric=cosine
            )
        """)
        # Create payload table
        self._conn.execute(f"""
            CREATE TABLE IF NOT EXISTS payload_{name} (
                rowid INTEGER PRIMARY KEY,
                orig_id TEXT UNIQUE NOT NULL,
                payload_json TEXT DEFAULT '{{}}'
            )
        """)
        # Record metadata
        self._conn.execute("""
            INSERT OR REPLACE INTO _collection_meta (name, dim, model)
            VALUES (?, ?, ?)
        """, (name, dim, model or self._meta.get(name, {}).get("model")))
        self._conn.commit()

        self._meta[name] = {"dim": dim, "model": model or self._meta.get(name, {}).get("model")}

    def _get_or_create_rowid(self, collection: str, orig_id: str) -> int:
        """Get existing rowid for an orig_id, or allocate a new one."""
        row = self._conn.execute(
            f"SELECT rowid FROM payload_{collection} WHERE orig_id = ?",
            (orig_id,),
        ).fetchone()
        if row:
            return row[0]
        cursor = self._conn.execute(
            f"INSERT INTO payload_{collection} (orig_id) VALUES (?)",
            (orig_id,),
        )
        return cursor.lastrowid

    def upsert(self, collection: str, *, id: str, vector: list[float],
               payload: dict | None = None, model: str | None = None) -> None:
        self._check_model(collection, model)
        payload_json = json.dumps(payload) if payload else "{}"
        blob = _serialize_f32(vector)

        # Check if exists
        row = self._conn.execute(
            f"SELECT rowid FROM payload_{collection} WHERE orig_id = ?",
            (id,),
        ).fetchone()

        if row:
            rid = row[0]
            self._conn.execute(
                f"UPDATE vec_{collection} SET embedding = ? WHERE rowid = ?",
                (blob, rid),
            )
            self._conn.execute(
                f"UPDATE payload_{collection} SET payload_json = ? WHERE rowid = ?",
                (payload_json, rid),
            )
        else:
            # Insert into payload table first to get rowid
            cursor = self._conn.execute(
                f"INSERT INTO payload_{collection} (orig_id, payload_json) VALUES (?, ?)",
                (id, payload_json),
            )
            rid = cursor.lastrowid
            self._conn.execute(
                f"INSERT INTO vec_{collection} (rowid, embedding) VALUES (?, ?)",
                (rid, blob),
            )
        self._conn.commit()

    def upsert_batch(self, collection: str, points: list[dict],
                     *, model: str | None = None) -> None:
        self._check_model(collection, model)
        for p in points:
            orig_id = p["id"]
            payload_json = json.dumps(p.get("payload") or {})
            blob = _serialize_f32(p["vector"])

            row = self._conn.execute(
                f"SELECT rowid FROM payload_{collection} WHERE orig_id = ?",
                (orig_id,),
            ).fetchone()

            if row:
                rid = row[0]
                self._conn.execute(
                    f"UPDATE vec_{collection} SET embedding = ? WHERE rowid = ?",
                    (blob, rid),
                )
                self._conn.execute(
                    f"UPDATE payload_{collection} SET payload_json = ? WHERE rowid = ?",
                    (payload_json, rid),
                )
            else:
                cursor = self._conn.execute(
                    f"INSERT INTO payload_{collection} (orig_id, payload_json) VALUES (?, ?)",
                    (orig_id, payload_json),
                )
                rid = cursor.lastrowid
                self._conn.execute(
                    f"INSERT INTO vec_{collection} (rowid, embedding) VALUES (?, ?)",
                    (rid, blob),
                )
        self._conn.commit()

    def search(self, collection: str, query_vector: list[float], *,
               limit: int = 5, filter_: "VectorFilter | None" = None,
               model: str | None = None) -> list[dict]:
        self._check_model(collection, model)
        blob = _serialize_f32(query_vector)

        if filter_ and filter_.must:
            # Post-filter: fetch more candidates, then filter
            # sqlite-vec metadata columns would be better but require schema changes
            # For now, overfetch and filter in Python (matches existing Qdrant behavior)
            overfetch = limit * 10
            rows = self._conn.execute(f"""
                SELECT v.rowid, v.distance, p.orig_id, p.payload_json
                FROM vec_{collection} v
                JOIN payload_{collection} p ON p.rowid = v.rowid
                WHERE v.embedding MATCH ?
                  AND k = ?
                ORDER BY v.distance
            """, (blob, overfetch)).fetchall()

            results = []
            for rid, distance, orig_id, pj in rows:
                payload = json.loads(pj) if pj else {}
                # Check filter conditions
                match = True
                for cond in filter_.must:
                    if payload.get(cond.key) != cond.value:
                        match = False
                        break
                if match:
                    results.append({
                        "id": orig_id,
                        "score": 1.0 - distance,  # cosine distance -> similarity
                        "payload": payload,
                    })
                if len(results) >= limit:
                    break
            return results
        else:
            rows = self._conn.execute(f"""
                SELECT v.rowid, v.distance, p.orig_id, p.payload_json
                FROM vec_{collection} v
                JOIN payload_{collection} p ON p.rowid = v.rowid
                WHERE v.embedding MATCH ?
                  AND k = ?
                ORDER BY v.distance
            """, (blob, limit)).fetchall()

            return [
                {
                    "id": orig_id,
                    "score": 1.0 - distance,  # cosine distance -> similarity
                    "payload": json.loads(pj) if pj else {},
                }
                for _rid, distance, orig_id, pj in rows
            ]

    def delete(self, collection: str, ids: list[str]) -> None:
        for orig_id in ids:
            row = self._conn.execute(
                f"SELECT rowid FROM payload_{collection} WHERE orig_id = ?",
                (orig_id,),
            ).fetchone()
            if row:
                rid = row[0]
                self._conn.execute(f"DELETE FROM vec_{collection} WHERE rowid = ?", (rid,))
                self._conn.execute(f"DELETE FROM payload_{collection} WHERE rowid = ?", (rid,))
        self._conn.commit()

    def count(self, collection: str) -> int:
        try:
            row = self._conn.execute(f"SELECT COUNT(*) FROM payload_{collection}").fetchone()
            return row[0] if row else 0
        except sqlite3.OperationalError:
            return 0

    def close(self) -> None:
        self._conn.close()

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()


__all__ = ["VectorStore", "VectorFilter", "FieldCondition", "Filter", "ModelMismatchError", "_to_uuid"]
```

**Step 4: Run tests**

Run: `python -m pytest lib/vectors/tests/test_vectors.py -v`
Expected: All 17 tests pass
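
When checking expected scores in tests, it can help to have a pure-Python reference for the `score` convention used by `search` (`1 - cosine distance`). The sketch below is illustrative only and is not part of `lib.vectors`; the function names are assumptions, and it assumes non-zero vectors of equal length.

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    """Cosine distance: 1 minus the cosine of the angle between a and b."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def similarity_score(a: list[float], b: list[float]) -> float:
    """Convert cosine distance back to similarity, as the search results do."""
    return 1.0 - cosine_distance(a, b)
```

Under this convention identical directions score close to 1, orthogonal vectors close to 0, and opposite vectors close to -1.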

**Step 5: Commit**

```bash
git add lib/vectors/__init__.py lib/vectors/tests/test_vectors.py
git commit -m "feat: replace Qdrant with sqlite-vec in lib/vectors"
```

---

### Task 3: Update semanticnet filtered search

**Files:**
- Modify: `lib/semnet/store.py` (lines 152-170)

**Step 1: Replace qdrant imports in search_claims**

Change:
```python
from qdrant_client.models import FieldCondition, Filter, MatchValue

conditions = []
if category:
    conditions.append(FieldCondition(key="category", match=MatchValue(value=category)))
if direction:
    conditions.append(FieldCondition(key="direction", match=MatchValue(value=direction)))

filter_ = Filter(must=conditions) if conditions else None
return self._vs.search(COL_CLAIMS, query_vector, limit=limit, filter_=filter_, model=EMBED_MODEL)
```

To:
```python
from lib.vectors import FieldCondition, VectorFilter

conditions = []
if category:
    conditions.append(FieldCondition(key="category", value=category))
if direction:
    conditions.append(FieldCondition(key="direction", value=direction))

filter_ = VectorFilter(must=conditions) if conditions else None
return self._vs.search(COL_CLAIMS, query_vector, limit=limit, filter_=filter_, model=EMBED_MODEL)
```
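
For reference, the matching semantics the new filter classes are expected to have (every `must` condition compared for equality against the JSON payload) can be sketched in isolation. The stand-in classes below mirror the ones in the Task 2 listing but are redefined here so the snippet runs on its own; `matches` and `post_filter` are hypothetical helper names.

```python
from dataclasses import dataclass, field


@dataclass
class FieldCondition:
    key: str
    value: str


@dataclass
class VectorFilter:
    must: list[FieldCondition] = field(default_factory=list)


def matches(payload: dict, filter_: VectorFilter) -> bool:
    """True when every condition's key is present in the payload with an equal value."""
    return all(payload.get(cond.key) == cond.value for cond in filter_.must)


def post_filter(candidates: list[dict], filter_: VectorFilter, limit: int) -> list[dict]:
    """Keep candidates (already ordered by distance) that match, up to `limit`."""
    kept = [c for c in candidates if matches(c["payload"], filter_)]
    return kept[:limit]


candidates = [
    {"id": "a", "payload": {"category": "bullish", "direction": "up"}},
    {"id": "b", "payload": {"category": "bearish", "direction": "up"}},
    {"id": "c", "payload": {"category": "bullish"}},
]
only_bullish_up = VectorFilter(must=[
    FieldCondition(key="category", value="bullish"),
    FieldCondition(key="direction", value="up"),
])
filtered_ids = [c["id"] for c in post_filter(candidates, only_bullish_up, limit=5)]
```

Note that a payload missing a filtered key never matches, since `payload.get` returns `None` for it.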

**Step 2: Run semanticnet tests**

Run: `python -m pytest lib/semnet/tests/ -v`
Expected: All pass

**Step 3: Commit**

```bash
git add lib/semnet/store.py
git commit -m "refactor: use lib.vectors filter classes instead of qdrant_client imports"
```

---

### Task 4: Update learning/cli.py — rename Qdrant references

**Files:**
- Modify: `learning/cli.py`

**Step 1: Rename variable and comments**

All the Qdrant-specific naming (`_qdrant_store`, `_qdrant_upsert_one`, `_semantic_search_qdrant`, `QDRANT_PATH`, `qdrant_vs`, `qdrant_meta`, etc.) should be renamed to generic vector store names. The actual `VectorStore` import and API calls don't change — they're already using the `lib.vectors` wrapper.

Key renames:
- `QDRANT_PATH` → `VECTOR_DB_PATH`
- `_qdrant_store()` → `_vector_store()`
- `_qdrant_upsert_one()` → `_vector_upsert_one()`
- `_semantic_search_qdrant()` → `_semantic_search_vectors()`
- `qdrant_vs` → `vec_store`
- `qdrant_meta` → `vec_meta`
- `qdrant_total` → `vec_total`
- `qdrant_points_p` → `vec_points_p`
- `qdrant_points_l` → `vec_points_l`
- All comments referencing "Qdrant" → "vector store" or "sqlite-vec"

The `from lib.vectors import VectorStore` import stays the same. No API changes needed.
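
The renames are mechanical, so a small script can apply them. The sketch below is one possible approach, assuming whole-identifier matches are what is wanted. The mapping is taken from the list above, while `apply_renames` is an illustrative name. Comments that mention "Qdrant" in prose still need a manual pass.

```python
import re

RENAMES = {
    "QDRANT_PATH": "VECTOR_DB_PATH",
    "_qdrant_store": "_vector_store",
    "_qdrant_upsert_one": "_vector_upsert_one",
    "_semantic_search_qdrant": "_semantic_search_vectors",
    "qdrant_vs": "vec_store",
    "qdrant_meta": "vec_meta",
    "qdrant_total": "vec_total",
    "qdrant_points_p": "vec_points_p",
    "qdrant_points_l": "vec_points_l",
}

# Longest names first so a longer identifier is never shadowed by a shorter one;
# \b ensures only whole identifiers are replaced.
_PATTERN = re.compile(
    r"\b(" + "|".join(re.escape(k) for k in sorted(RENAMES, key=len, reverse=True)) + r")\b"
)


def apply_renames(source: str) -> str:
    """Return source with every whole-identifier occurrence renamed per RENAMES."""
    return _PATTERN.sub(lambda m: RENAMES[m.group(1)], source)
```

Running it over the text of `learning/cli.py` and reviewing the diff before committing keeps the change easy to audit.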

**Step 2: Run learning tests**

Run: `python -m pytest learning/ -v --ignore=learning/gyms --ignore=learning/session_review -x`
Expected: Pass (or skip if no relevant tests)

**Step 3: Commit**

```bash
git add learning/cli.py
git commit -m "refactor: rename Qdrant references to generic vector store names in learning CLI"
```

---

### Task 5: Update helm/api.py — remove Qdrant references

**Files:**
- Modify: `helm/api.py`

**Step 1: Update comments and variable names**

The Helm API already uses `VectorStore` from `lib.vectors`. Just update:
- Comments saying "Qdrant" → "sqlite-vec" or "vector store"
- The `_upsert_session_embedding` docstring references "Qdrant vector store"
- The search endpoint comments

No API or import changes needed — it's all going through `lib.vectors.VectorStore`.

**Step 2: Commit**

```bash
git add helm/api.py
git commit -m "docs: update Helm API comments from Qdrant to sqlite-vec"
```

---

### Task 6: Update projects/vic/run_embed.py comments

**Files:**
- Modify: `projects/vic/run_embed.py` (comments only)

**Step 1: Update Qdrant references in comments**

Change "Qdrant scroll" and similar references to "vector store".

**Step 2: Commit**

```bash
git add projects/vic/run_embed.py
git commit -m "docs: update vic embed comments from Qdrant to sqlite-vec"
```

---

### Task 7: Remove qdrant-client dependency

**Files:**
- Check: `requirements.txt`, `pyproject.toml`, or equivalent

**Step 1: Find dependency declaration**

```bash
grep -r "qdrant" requirements*.txt pyproject.toml setup.py setup.cfg 2>/dev/null
```

**Step 2: Remove qdrant-client**

Remove `qdrant-client` from the dependency list. Add `sqlite-vec` if not already listed.

**Step 3: Verify no remaining qdrant imports**

```bash
grep -r "from qdrant" --include="*.py" .
grep -r "import qdrant" --include="*.py" .
```

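Expected: no matches. Because of `--include="*.py"`, `docs/plans` is not scanned; any remaining hits would be Python comments or strings that happen to contain these phrases.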
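
Since `grep` also matches comments and strings, an AST-based check can confirm that no real imports remain. This is an optional sketch using only the standard library; `find_qdrant_imports` is an illustrative name.

```python
import ast


def find_qdrant_imports(source: str) -> list[int]:
    """Return line numbers of import statements that reference qdrant_client."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            # Handles `import qdrant_client` and `import qdrant_client.models`.
            if any(alias.name.split(".")[0] == "qdrant_client" for alias in node.names):
                hits.append(node.lineno)
        elif isinstance(node, ast.ImportFrom):
            # Handles `from qdrant_client... import ...`; module is None for `from . import x`.
            if (node.module or "").split(".")[0] == "qdrant_client":
                hits.append(node.lineno)
    return sorted(hits)
```

To use it across the repo, one could read each file found by `pathlib.Path(".").rglob("*.py")` and report any file for which the function returns a non-empty list.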

**Step 4: Commit**

```bash
git add -A && git commit -m "chore: remove qdrant-client dependency, add sqlite-vec"
```

---

### Task 8: Update documentation

**Files:**
- Modify: `infra/README.md` (Qdrant section → sqlite-vec)
- Modify: `lib/CLAUDE.md` (if it references Qdrant)
- Modify: `docs/plans/2026-02-22-session-overview-jump-design.md` (references Qdrant)

**Step 1: Update infra/README.md**

Replace the "Qdrant (Local Vector Search)" section with:

````markdown
## sqlite-vec (Local Vector Search)

Vector storage for semantic search, using `sqlite-vec` extension for SQLite.
No server process, no exclusive file locks; WAL mode allows concurrent readers.

**Library**: `lib/vectors/`, a thin wrapper around sqlite-vec `vec0` virtual tables
**Package**: `sqlite-vec` (v0.1.6+), installed via pip
**Storage**: SQLite WAL-mode database, one DB file per consumer

| Path | Consumer | Collections | Purpose |
|------|----------|-------------|---------|
| `~/.coord/watch_vectors/vectors.sqlite` | `helm/` | `sessions` | Session topic trees + badges (for `/jump`) |
| `~/.coord/learning_vectors/vectors.sqlite` | `learning/` | `learnings`, `principles` | Knowledge DB items (for `learn find -s`) |

**Usage** (same API as before):
```python
from lib.vectors import VectorStore

with VectorStore("~/.coord/watch_vectors") as vs:
    vs.ensure_collection("sessions", dim=1536)
    vs.upsert("sessions", id="abc", vector=[...], payload={...})
    results = vs.search("sessions", query_vector=[...], limit=5)
```
````

**Step 2: Update other docs**

Update any remaining references to Qdrant in CLAUDE.md files and design docs.

**Step 3: Commit**

```bash
git add -A && git commit -m "docs: update all references from Qdrant to sqlite-vec"
```

---

### Task 9: Data migration — re-embed existing data

**Files:** None (operational task)

Existing Qdrant data directories (`~/.coord/watch_vectors/`, `~/.coord/learning_vectors/`) hold data in qdrant-client's local-mode on-disk format, which the new store does not read. The new sqlite-vec store will create `vectors.sqlite` inside the same directory paths, so existing data is re-embedded rather than converted.

**Step 1: Verify old Qdrant data doesn't conflict**

The old Qdrant data is in subdirectories like `collection/`, `meta.json`, `storage/`. The new sqlite-vec file is `vectors.sqlite`. They don't conflict — both can coexist.

**Step 2: Re-populate watch vectors**

Watch vectors are re-generated on each prompt (the watch API embeds session trees on every `POST /api/prompt`). Send a few prompts in active sessions and they'll auto-populate.

**Step 3: Re-populate learning vectors**

```bash
cd ~/all-code/rivus && python -m learning.cli embed --all
```

This will re-embed all principles and learning instances into the new sqlite-vec store.

**Step 4: Re-populate semanticnet vectors (if needed)**

```bash
cd ~/all-code/rivus && python projects/vic/run_embed.py
# Or for other channels:
python -m lib.semnet.pipeline embed --channel healthygamer
```

**Step 5: Verify**

```bash
python -c "
from lib.vectors import VectorStore
for path, name in [
    ('~/.coord/watch_vectors', 'sessions'),
    ('~/.coord/learning_vectors', 'principles'),
    ('~/.coord/learning_vectors', 'learnings'),
]:
    try:
        vs = VectorStore(path)
        print(f'{path}/{name}: {vs.count(name)} vectors')
        vs.close()
    except Exception as e:
        print(f'{path}/{name}: {e}')
"
```

**Step 6: Clean up old Qdrant data (optional, after verification)**

```bash
# Only after confirming new data is populated:
# trash ~/.coord/watch_vectors/collection ~/.coord/watch_vectors/meta.json
# trash ~/.coord/learning_vectors/collection ~/.coord/learning_vectors/meta.json
```

---

### Task 10: Run full test suite

**Step 1: Run all vector-related tests**

```bash
python -m pytest lib/vectors/tests/ lib/semnet/tests/ -v
```

Expected: All pass

**Step 2: Run broader tests**

```bash
python -m pytest lib/ -v --ignore=lib/semnet/presenter -x
```

**Step 3: Verify watch API starts**

```bash
ops restart watch
curl -s http://localhost:8130/health | python -m json.tool
```

**Step 4: Verify /jump works**

Run in an active session (this is a slash command, not a shell command):

```text
/jump test query
```

**Step 5: Verify learning search**

```bash
learn find -s "vector store" --limit 3
```

**Step 6: Final commit if any fixes needed**

```bash
git add -A && git commit -m "fix: address test failures from sqlite-vec migration"
```

---

## Summary of Changes

| File | Change |
|------|--------|
| `lib/vectors/__init__.py` | Full rewrite: qdrant-client → sqlite-vec |
| `lib/vectors/tests/test_vectors.py` | Minimal import cleanup |
| `lib/semnet/store.py` | Replace qdrant_client.models imports with lib.vectors filter classes |
| `learning/cli.py` | Rename `_qdrant_*` variables/functions to generic names |
| `helm/api.py` | Update comments only |
| `projects/vic/run_embed.py` | Update comments only |
| `infra/README.md` | Replace Qdrant section with sqlite-vec |
| Dependencies | Remove `qdrant-client`, add `sqlite-vec` |

## Why This Works

- **No exclusive lock**: the store opens SQLite in WAL mode, which allows one writer alongside any number of concurrent readers
- **Same API**: `VectorStore` keeps identical method signatures; only filter construction changes (Task 3)
- **Simpler deps**: sqlite-vec is a single pip package, and qdrant-client with its transitive dependencies (such as portalocker) goes away
- **Fast enough**: vec0 does brute-force search, which is fine at our scale (hundreds to low thousands of vectors per collection)
- **Filtered search**: overfetch `limit * 10` candidates and post-filter in Python; a highly selective filter may return fewer than `limit` results
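
The concurrency claim can be checked with the standard library alone, without sqlite-vec. The sketch below is a demonstration under stated assumptions rather than part of the plan: it uses a throwaway database in a temporary directory, and the function name is illustrative. It shows a reader querying successfully while a writer holds an open write transaction.

```python
import os
import sqlite3
import tempfile


def wal_snapshot_demo() -> tuple[str, int, int]:
    """Return (journal mode, rows seen during the open write, rows seen after commit)."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.sqlite")
        # isolation_level=None puts the connection in autocommit mode, so the
        # transaction boundaries below are explicit.
        writer = sqlite3.connect(path, isolation_level=None)
        reader = None
        try:
            mode = writer.execute("PRAGMA journal_mode=WAL").fetchone()[0]
            writer.execute("CREATE TABLE t (x INTEGER)")
            writer.execute("INSERT INTO t VALUES (1)")

            # Open a write transaction and leave it uncommitted.
            writer.execute("BEGIN IMMEDIATE")
            writer.execute("INSERT INTO t VALUES (2)")

            # A second connection can still read; it sees the last committed state.
            reader = sqlite3.connect(path)
            during = reader.execute("SELECT COUNT(*) FROM t").fetchall()[0][0]

            writer.execute("COMMIT")
            after = reader.execute("SELECT COUNT(*) FROM t").fetchall()[0][0]
        finally:
            if reader is not None:
                reader.close()
            writer.close()
    return mode, during, after


mode, during, after = wal_snapshot_demo()
print(mode, during, after)
```

The reader is never blocked by the in-progress write; it simply does not see the uncommitted row until the writer commits.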
