# Supply Chain Pipeline: Add validate/enrich/score stages, merge relationship stages

Date: 2026-03-06

## Problem

The supplychain job pipeline has 5 stages (profile, suppliers, customers, competitors, expand) but several important operations run as standalone scripts outside the pipeline:

- `cleanup_garbage.py` -- delete LLM error responses, orphans, bad roles
- `finnhub_screener.py` -- enrich companies with Finnhub ground-truth data
- `quality_score.py` -- score companies on data completeness
- `dedup.py` -- merge duplicate companies

These must be run manually after the pipeline finishes a batch. Garbage entries pile up, companies lack Finnhub ground-truth data, and quality scores are stale.

Additionally, the suppliers/customers/competitors stages are three separate LLM calls that could be one.

## Design

### Pipeline: before and after

**Before** (5 stages, 3 LLM calls for relationships):
```
profile -> suppliers -> customers -> competitors -> expand
```

**After** (6 stages, 1 LLM call for relationships):
```
profile -> relationships -> expand -> validate -> enrich -> score
```

### Stage definitions

| Stage           | Call           | Concurrency | What it does                                         |
|-----------------|----------------|-------------|------------------------------------------------------|
| `profile`       | 1 LLM + web    | 3           | Company description, ticker, roles, segments         |
| `relationships` | 1 LLM + web    | 3           | All suppliers, customers, competitors -- exhaustive  |
| `expand`        | none (DB only) | 1           | Report stats, identify next-wave candidates          |
| `validate`      | none (code)    | 5           | Garbage names, bad roles, dedup. Reject or clean.    |
| `enrich`        | 1 Finnhub API  | 5           | Ground-truth ticker/market_cap/exchange from Finnhub |
| `score`         | none (SQL)     | 5           | Quality score 0-100 on data completeness             |

### Stage details

#### `relationships` (merges suppliers + customers + competitors)

One LLM call returns all three relationship types:

```json
{
  "suppliers": [
    {"name": "...", "ticker": "...", "exchange": "...", "country": "...",
     "is_public": true, "market_cap_m": 1234, "supplies": "...",
     "category": "equipment|materials|EDA|...", "importance": "critical|major|minor",
     "confidence": "high|medium|low", "source_url": "..."}
  ],
  "customers": [
    {"name": "...", "ticker": "...", "exchange": "...", "country": "...",
     "is_public": true, "market_cap_m": 1234, "buys": "...",
     "revenue_significance": "major|moderate|minor",
     "confidence": "...", "source_url": "..."}
  ],
  "competitors": [
    {"name": "...", "ticker": "...", "exchange": "...", "country": "...",
     "is_public": true, "market_cap_m": 1234, "competes_in": "...",
     "relative_position": "leader|challenger|niche|emerging",
     "confidence": "...", "source_url": "..."}
  ]
}
```

No caps on list sizes -- exhaustive coverage is the point. The model lists everything it knows. grok-4-1-fast handles large JSON responses fine.

The prompt combines the existing SUPPLIER_PROMPT, CUSTOMER_PROMPT, and COMPETITOR_PROMPT into one, keeping the same thoroughness instructions.

DB writes use the same `_upsert_company` / `_add_relationship` / `_assign_roles` helpers as today.
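As a sketch, the write path just iterates the three keys of the single response. The handler shape below is an assumption; the real `_upsert_company` / `_add_relationship` helpers are passed in as callables so the flow is shown in isolation:

```python
# Sketch of the merged-relationships write path. The helper signatures are
# assumptions -- in the real handler, _upsert_company and _add_relationship
# do the DB work; here they are injected so the flow is self-contained.

REL_TYPES = ("suppliers", "customers", "competitors")

def handle_relationships(company_id, response, upsert_company, add_relationship):
    """Write all three relationship types from one LLM response."""
    counts = {rel_type: 0 for rel_type in REL_TYPES}
    for rel_type in REL_TYPES:
        for entry in response.get(rel_type, []):
            other_id = upsert_company(entry)            # create-or-get counterparty
            add_relationship(company_id, other_id, rel_type, entry)
            counts[rel_type] += 1
    return counts
```

One loop, one version of the dedup/upsert logic, and per-type counts fall out for free for the stage's return value.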

#### `validate` (new -- absorbs cleanup_garbage.py + dedup.py logic)

Runs per-company, no external calls. Three checks in order:

1. **Garbage name check** -- pattern match against `GARBAGE_PREFIXES`, `GARBAGE_SUBSTRINGS`, `GARBAGE_EXACT` from cleanup_garbage.py. If matched: delete company from DB, fail the item.

2. **Duplicate check** -- `normalize_name()` the company, compare against all existing companies with same normalized name or same ticker+exchange. If duplicate found: merge into existing company (remap relationships), fail the item with `{"merged_into": existing_id}`.

3. **Role cleanup** -- delete product-type roles (GPU, CPU, FPGA, etc.) from company_roles. This is a fix, not a rejection -- item continues.

Return values:
- `{"status": "pass"}` -- company is clean
- `{"status": "cleaned", "roles_removed": [...]}` -- fixed roles, continue
- `{"status": "rejected", "reason": "garbage_name"}` -- item fails (raise error so runner marks it failed)
- `{"status": "rejected", "reason": "duplicate", "merged_into": id}` -- item fails

The hardcoded `WAVE99_DELETE_NAMES` list stays as a standalone script -- it's a one-time curated blocklist, not a per-item check.

#### `enrich` (new -- absorbs finnhub_screener.py enrich logic)

Per-company Finnhub API lookup:

1. If the company has no ticker, skip: `return {"skipped": "no_ticker"}`
2. Call `GET /stock/profile2?symbol={ticker}` with Finnhub API key
3. If Finnhub returns data, update DB:
   - `market_cap_m` (convert to USD using existing `convert_to_usd`)
   - `exchange` (canonical)
   - `country`
   - `website`
   - `description` (Finnhub's `finnhubIndustry`)
4. If Finnhub returns no data, flag: `return {"skipped": "finnhub_no_data", "ticker": ticker}`

Rate limiting is handled by the job framework's pacing plus concurrency=5 (well under Finnhub's 60 requests/min free tier).
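The steps above can be sketched as follows. The `fetch_profile` and `update_db` callables are assumptions standing in for the HTTP call and the DB layer; the field names `marketCapitalization`, `currency`, `exchange`, `country`, `weburl`, and `finnhubIndustry` come from Finnhub's `/stock/profile2` response, which returns an empty object for unknown symbols:

```python
# Sketch of the enrich stage. fetch_profile(ticker, api_key) wraps
# GET /stock/profile2 and returns the parsed JSON ({} when Finnhub has no
# data); update_db and convert_to_usd stand in for the existing helpers.

def enrich(company, api_key, fetch_profile, update_db, convert_to_usd):
    if not company.get("ticker"):
        return {"skipped": "no_ticker"}
    data = fetch_profile(company["ticker"], api_key)
    if not data:
        return {"skipped": "finnhub_no_data", "ticker": company["ticker"]}
    update_db(company["id"], {
        "market_cap_m": convert_to_usd(data.get("marketCapitalization"),
                                       data.get("currency")),
        "exchange": data.get("exchange"),
        "country": data.get("country"),
        "website": data.get("weburl"),
        "description": data.get("finnhubIndustry"),
    })
    return {"enriched": True}
```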

#### `score` (new -- absorbs quality_score.py logic)

Per-company SQL scoring. Uses the existing `SCORING_SQL` from quality_score.py:

- Has ticker: +20
- Has market cap: +15
- Has country: +5
- Has description: +5
- Has roles: +10
- Has supplier relationships: +15
- Has competitor relationships: +10
- Wave 0 (anchor): +10
- Wave 1: +5
- Georgetown ChipExplorer match: +5

Writes score to `companies.quality_score` column. Returns `{"quality_score": N}`.
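For illustration, the additive rubric can be restated as a pure function (the real stage runs the SQL; the field names here are assumptions about the row shape):

```python
# Pure-Python restatement of the scoring rubric above; the production
# stage runs the equivalent SCORING_SQL. Field names are assumed.

def quality_score(c):
    score = 0
    if c.get("ticker"):                  score += 20
    if c.get("market_cap_m"):            score += 15
    if c.get("country"):                 score += 5
    if c.get("description"):             score += 5
    if c.get("roles"):                   score += 10
    if c.get("supplier_count", 0) > 0:   score += 15
    if c.get("competitor_count", 0) > 0: score += 10
    if c.get("wave") == 0:               score += 10  # anchor company
    elif c.get("wave") == 1:             score += 5
    if c.get("chipexplorer_match"):      score += 5   # Georgetown ChipExplorer
    return score
```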

### jobs.yaml changes

Both `supplychain_anchors` and `supplychain_expand` get the same updated stage list:

```yaml
stages:
  - name: profile
    concurrency: 3
  - name: relationships
    concurrency: 3
  - name: expand
    concurrency: 1
  - name: validate
    concurrency: 5
  - name: enrich
    concurrency: 5
  - name: score
    concurrency: 5
```

### Handler version bump

All stages get new version strings since the handler is being restructured. Existing items that completed under the old stages will show as stale in the dashboard (expected -- they were never validated/enriched/scored).

### What stays as standalone scripts

These don't fit as per-item stages:

- `cleanup_garbage.py` step 2 (`WAVE99_DELETE_NAMES`) -- curated one-time blocklist
- `dedup.py` full run -- bulk historical dedup across all 9,490 companies
- `finnhub_screener.py` search/screen commands -- discovery tool, not per-item enrichment
- `export_universe.py` -- export/ranking tool, runs when needed

### Migration

Existing items in the pipeline that completed `suppliers`/`customers`/`competitors` but haven't hit the new stages: the runner will see them as needing `validate` → `enrich` → `score` and process them through. No manual reset needed -- the new stages are appended after `expand`.

Items that completed `suppliers` but not `customers`/`competitors` under the old schema: these need a one-time reset. Use `jobctl reset supplychain_anchors relationships` to mark them for reprocessing with the merged stage.
