# LLM Pipelining

Patterns for piping one LLM's streaming output into another LLM for real-time analysis, validation, or transformation.

## Use Cases

| Pattern              | Example                                           |
|----------------------|---------------------------------------------------|
| Real-time validation | Fact-checker analyzes claims as they stream       |
| Style enforcement    | Tone analyzer flags deviations mid-generation     |
| Translation          | Translator follows behind source LLM              |
| Code review          | Linter/reviewer analyzes code as it's written     |
| Summarization        | Summarizer produces running summary               |

## Streaming Strategies

### 1. Incremental Full Context

Send all accumulated output with each analysis call.

```python
full_output = ""
async for chunk in producer.stream():
    full_output += chunk
    yield chunk

    if should_analyze(full_output):
        analysis = await analyzer.analyze(full_output)
```

- **Pros**: Analyzer can revise earlier assessments; full context on every call
- **Cons**: O(n²) token cost; latency grows over time
- **Use for**: Fact-checking, coherence analysis, detecting contradictions
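
The `should_analyze` gate above is left undefined. A minimal sketch, assuming a simple interval-based trigger (the `INTERVAL` value and the module-level counter are illustrative, not part of the original):

```python
# Hypothetical should_analyze: fire once every time the accumulated
# output grows by another INTERVAL characters.
INTERVAL = 500  # chars between analyses (assumed tuning value)

_last_analyzed_len = 0

def should_analyze(full_output: str) -> bool:
    """Return True at most once per INTERVAL characters of new output."""
    global _last_analyzed_len
    if len(full_output) - _last_analyzed_len >= INTERVAL:
        _last_analyzed_len = len(full_output)
        return True
    return False
```

In production this state would live on a pipeline object rather than in a module global; sentence-boundary or token-count triggers are equally valid.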

### 2. Delta-Only Chunks

Send only new content, no history.

```python
async for chunk in producer.stream():
    yield chunk
    analysis = await analyzer.analyze(chunk)
```

- **Pros**: Minimal token cost; constant per-call latency
- **Cons**: No broader context; can't detect patterns across the output
- **Use for**: Per-sentence sentiment, toxicity detection, profanity filtering

### 3. Sliding Window

Send the most recent N characters (or tokens) plus new content.

```python
full_output = ""
WINDOW = 2000  # chars

async for chunk in producer.stream():
    full_output += chunk
    yield chunk

    if should_analyze(full_output):
        context = full_output[-WINDOW:]
        analysis = await analyzer.analyze(context)
```

- **Pros**: Bounded cost; preserves local context
- **Cons**: May miss important earlier content
- **Use for**: Style consistency, tone monitoring, local coherence

### 4. Semantic Batching

Buffer until natural boundaries (sentences, paragraphs, code blocks).

```python
buffer = ""
async for chunk in producer.stream():
    buffer += chunk
    yield chunk

    if is_boundary(buffer):  # ends with .!? or \n\n
        analysis = await analyzer.analyze(buffer)
        buffer = ""
```

- **Pros**: Natural analysis units; meaningful chunks
- **Cons**: Variable latency; boundary detection adds complexity
- **Use for**: Summarization, translation, code review
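
`is_boundary` above is left undefined. A minimal sketch matching the inline comment (sentence-final punctuation or a blank line) could be:

```python
def is_boundary(buffer: str) -> bool:
    """True when the buffer ends at a natural unit: sentence or paragraph."""
    stripped = buffer.rstrip(" \t")
    return stripped.endswith((".", "!", "?")) or buffer.endswith("\n\n")
```

Real boundary detection needs more care (abbreviations like "e.g.", decimal points, unclosed code fences), which is the complexity the cons list refers to.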

## Cancellation Pattern

Cancel in-flight analysis when new content arrives (debounce).

```python
import asyncio

class CancellingPipeline:
    def __init__(self, analyzer, debounce_ms=500):
        self.analyzer = analyzer
        self.debounce = debounce_ms / 1000
        self.pending_task: asyncio.Task | None = None
        self.full_output = ""

    async def on_chunk(self, chunk: str) -> str | None:
        """Process a chunk; return the latest completed analysis, if any."""
        self.full_output += chunk

        # Harvest a finished analysis, or cancel one still in flight
        result = None
        if self.pending_task and self.pending_task.done():
            if not self.pending_task.cancelled():
                try:
                    result = self.pending_task.result()
                except Exception:
                    result = None  # keep going if the analyzer failed
        elif self.pending_task:
            self.pending_task.cancel()

        # Schedule a new analysis after the debounce interval
        self.pending_task = asyncio.create_task(
            self._debounced_analyze()
        )
        return result

    async def _debounced_analyze(self) -> str:
        await asyncio.sleep(self.debounce)
        return await self.analyzer.analyze(self.full_output)

    async def flush(self) -> str | None:
        """Get the final analysis after the stream ends."""
        if self.pending_task and not self.pending_task.done():
            self.pending_task.cancel()
            try:
                await self.pending_task
            except asyncio.CancelledError:
                pass
        return await self.analyzer.analyze(self.full_output)
```
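
A condensed, self-contained version of the same cancel-and-debounce loop (with a stub analyzer instead of an LLM, so it runs offline) shows the key property: chunks arriving faster than the debounce interval cancel each other's analyses, and only the final flush is guaranteed to run:

```python
import asyncio

class StubAnalyzer:
    """Counts calls so the debounce effect is observable."""
    def __init__(self):
        self.calls = 0

    async def analyze(self, text: str) -> str:
        self.calls += 1
        return f"analysis#{self.calls} of {len(text)} chars"

async def run_pipeline(chunks, debounce=0.05):
    analyzer = StubAnalyzer()
    full = ""
    pending: asyncio.Task | None = None

    async def debounced():
        await asyncio.sleep(debounce)
        return await analyzer.analyze(full)

    for chunk in chunks:
        full += chunk
        if pending and not pending.done():
            pending.cancel()  # new content arrived: drop the stale analysis
        pending = asyncio.create_task(debounced())
        await asyncio.sleep(0)  # chunks arrive faster than the debounce

    if pending and not pending.done():
        pending.cancel()
    final = await analyzer.analyze(full)  # flush: always analyze full output
    return analyzer.calls, final
```

Because all three chunks arrive within the 50ms debounce window, every intermediate analysis is cancelled and the analyzer runs exactly once, at flush time.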

### With litellm

```python
import asyncio
import litellm

class StreamingAnalyzer:
    def __init__(self, model="openai/gpt-4o-mini", debounce_ms=300):
        self.model = model
        self.debounce = debounce_ms / 1000
        self.pending: asyncio.Task | None = None
        self.latest_analysis: str | None = None

    async def analyze_stream(
        self,
        producer_stream,
        system_prompt: str,
        window_size: int = 2000,
    ):
        """Yield (chunk, analysis) tuples. Analysis may be None."""
        full_output = ""

        async for chunk in producer_stream:
            full_output += chunk

            # Cancel pending analysis
            if self.pending and not self.pending.done():
                self.pending.cancel()
                try:
                    await self.pending
                except asyncio.CancelledError:
                    pass

            # Schedule new analysis
            context = full_output[-window_size:]
            self.pending = asyncio.create_task(
                self._analyze(context, system_prompt)
            )

            yield chunk, self.latest_analysis

        # Final analysis
        if self.pending:
            try:
                await self.pending
            except asyncio.CancelledError:
                pass

        final = await self._analyze(full_output[-window_size:], system_prompt)
        yield "", final

    async def _analyze(self, content: str, system_prompt: str) -> str:
        response = await litellm.acompletion(
            model=self.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": content},
            ],
            max_tokens=200,
        )
        self.latest_analysis = response.choices[0].message.content
        return self.latest_analysis
```

## Complete Example: Fact-Checking Pipeline

```python
import asyncio
import litellm

async def fact_checked_stream(
    prompt: str,
    producer_model: str = "anthropic/claude-sonnet-4-5-20250929",
    checker_model: str = "openai/gpt-4o-mini",
):
    """Stream content with real-time fact checking."""

    full_output = ""
    pending_check: asyncio.Task | None = None
    last_check_result: dict | None = None

    # Start producer stream
    response = await litellm.acompletion(
        model=producer_model,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )

    async def check_facts(content: str) -> dict:
        check_response = await litellm.acompletion(
            model=checker_model,
            messages=[
                {"role": "system", "content": (
                    "Analyze for factual claims. Return JSON: "
                    '{"claims": [...], "concerns": [...], "confidence": 0-1}'
                )},
                {"role": "user", "content": content},
            ],
            response_format={"type": "json_object"},
        )
        import json
        return json.loads(check_response.choices[0].message.content)

    async for part in response:
        chunk = part.choices[0].delta.content or ""
        full_output += chunk

        # At ~500-char intervals or sentence boundaries, restart the fact check
        if len(full_output) % 500 < len(chunk) or chunk.rstrip().endswith(('.', '!', '?')):
            if pending_check and not pending_check.done():
                pending_check.cancel()
            pending_check = asyncio.create_task(check_facts(full_output[-2000:]))

        # Yield with the latest successfully completed check result
        if pending_check and pending_check.done() and not pending_check.cancelled():
            try:
                last_check_result = pending_check.result()
            except Exception:
                pass  # keep the previous result if this check failed

        yield {
            "content": chunk,
            "fact_check": last_check_result,
        }

    # Final check
    final_check = await check_facts(full_output)
    yield {
        "content": "",
        "fact_check": final_check,
        "final": True,
    }
```

## Performance Considerations

| Factor            | Recommendation                                     |
|-------------------|----------------------------------------------------|
| Debounce interval | 200-500ms balances responsiveness vs. cost         |
| Window size       | 1500-3000 chars covers most context needs          |
| Analyzer model    | Use fast/cheap (gpt-4o-mini, haiku) for real-time  |
| Batch at          | Sentence boundaries reduce wasted partial analyses |
| Cancel policy     | Always cancel pending on new chunk arrival         |

## When NOT to Pipeline

- **Short outputs**: Just analyze after completion
- **High-stakes validation**: Run full analysis on complete output
- **Cost-sensitive**: Batch analysis is cheaper than streaming
- **Latency-insensitive**: Post-hoc analysis simpler to implement
