Chapter 5: Practical LLM Applications

Building Production Systems with RAG, Agents, and Multimodal AI

Reading Time: 40-45 minutes Difficulty: Advanced Last Updated: 2026-01
Disclaimer: This chapter covers practical LLM application patterns as of early 2026. The ecosystem evolves rapidly; always verify current best practices and API specifications with official documentation.

1. Retrieval-Augmented Generation (RAG)

RAG enhances LLMs with external knowledge, enabling accurate responses about private data, recent information, and domain-specific content. It addresses the fundamental limitation of LLMs: static training data that becomes outdated.

1.1 RAG Architecture

flowchart LR subgraph "Indexing Pipeline" D[Documents] --> C[Chunking] C --> E[Embedding] E --> V[(Vector DB)] end subgraph "Query Pipeline" Q[User Query] --> QE[Query Embedding] QE --> R[Retrieval] V --> R R --> Context[Retrieved Context] Context --> LLM[LLM Generation] Q --> LLM LLM --> A[Answer] end style V fill:#e3f2fd style LLM fill:#fff3e0

1.2 Building a RAG System

from typing import List, Optional
import numpy as np
from dataclasses import dataclass

@dataclass
class Document:
    """Represents a document chunk with metadata."""
    content: str
    metadata: dict
    embedding: Optional[np.ndarray] = None

class RAGSystem:
    """
    Production-ready RAG implementation with modern best practices.
    """

    def __init__(
        self,
        embedding_model: str = "text-embedding-3-large",
        llm_model: str = "gpt-4o",
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        top_k: int = 5
    ):
        self.embedding_model = embedding_model
        self.llm_model = llm_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.top_k = top_k
        self.documents: List[Document] = []

    def chunk_document(self, text: str, metadata: dict) -> List[Document]:
        """
        Split document into overlapping chunks.

        Uses semantic chunking boundaries when possible:
        - Paragraph breaks
        - Section headers
        - Sentence boundaries
        """
        chunks = []

        # Simple overlap-based chunking (production: use semantic chunking)
        start = 0
        while start < len(text):
            end = start + self.chunk_size

            # Find natural break point
            if end < len(text):
                # Look for paragraph break
                para_break = text.rfind('\n\n', start, end)
                if para_break > start + self.chunk_size // 2:
                    end = para_break

                # Or sentence boundary
                elif (period := text.rfind('. ', start, end)) > start + self.chunk_size // 2:
                    end = period + 1

            chunk_text = text[start:end].strip()
            if chunk_text:
                chunks.append(Document(
                    content=chunk_text,
                    metadata={**metadata, "chunk_start": start, "chunk_end": end}
                ))

            start = end - self.chunk_overlap

        return chunks

    def embed_documents(self, documents: List[Document]) -> List[Document]:
        """Generate embeddings for documents."""
        from openai import OpenAI
        client = OpenAI()

        texts = [doc.content for doc in documents]

        # Batch embedding for efficiency
        response = client.embeddings.create(
            model=self.embedding_model,
            input=texts
        )

        for doc, embedding_data in zip(documents, response.data):
            doc.embedding = np.array(embedding_data.embedding)

        return documents

    def index(self, documents: List[Document]):
        """Add documents to the index."""
        embedded_docs = self.embed_documents(documents)
        self.documents.extend(embedded_docs)

    def retrieve(
        self,
        query: str,
        top_k: Optional[int] = None
    ) -> List[Document]:
        """
        Retrieve relevant documents using cosine similarity.
        """
        from openai import OpenAI
        client = OpenAI()

        top_k = top_k or self.top_k

        # Embed query
        response = client.embeddings.create(
            model=self.embedding_model,
            input=[query]
        )
        query_embedding = np.array(response.data[0].embedding)

        # Calculate similarities
        similarities = []
        for doc in self.documents:
            if doc.embedding is not None:
                sim = np.dot(query_embedding, doc.embedding) / (
                    np.linalg.norm(query_embedding) * np.linalg.norm(doc.embedding)
                )
                similarities.append((sim, doc))

        # Sort and return top-k
        similarities.sort(key=lambda x: x[0], reverse=True)
        return [doc for _, doc in similarities[:top_k]]

    def generate(
        self,
        query: str,
        context_documents: List[Document],
        system_prompt: Optional[str] = None
    ) -> str:
        """
        Generate answer using retrieved context.
        """
        from openai import OpenAI
        client = OpenAI()

        # Build context string
        context = "\n\n---\n\n".join([
            f"[Source: {doc.metadata.get('source', 'unknown')}]\n{doc.content}"
            for doc in context_documents
        ])

        system = system_prompt or """You are a helpful assistant that answers questions based on the provided context.
If the answer cannot be found in the context, say so clearly.
Always cite the sources used in your answer."""

        messages = [
            {"role": "system", "content": system},
            {"role": "user", "content": f"""Context:
{context}

Question: {query}

Please answer based on the context provided above."""}
        ]

        response = client.chat.completions.create(
            model=self.llm_model,
            messages=messages,
            temperature=0.7
        )

        return response.choices[0].message.content

    def query(self, question: str) -> dict:
        """
        End-to-end RAG query pipeline.
        """
        # Retrieve relevant documents
        retrieved_docs = self.retrieve(question)

        # Generate answer
        answer = self.generate(question, retrieved_docs)

        return {
            "question": question,
            "answer": answer,
            "sources": [
                {"content": doc.content[:200] + "...", "metadata": doc.metadata}
                for doc in retrieved_docs
            ]
        }

# Usage example
rag = RAGSystem(
    embedding_model="text-embedding-3-large",
    llm_model="gpt-4o",
    chunk_size=512,
    top_k=5
)

# Index documents
documents = [
    Document(content="Your document text here...", metadata={"source": "doc1.pdf"}),
    # ... more documents
]
rag.index(documents)

# Query
result = rag.query("What are the key findings?")
print(result["answer"])

1.3 Advanced RAG Techniques

RAG Evolution (2024-2026)

Technique Description Use Case
Naive RAG Basic retrieve-then-generate Simple Q&A
Hybrid Search Vector + keyword (BM25) fusion Mixed queries
HyDE Hypothetical Document Embeddings Sparse data
Reranking Cross-encoder reranking of results High precision needs
Agentic RAG Multi-step retrieval with reasoning Complex queries
GraphRAG Knowledge graph + vector retrieval Relationship queries
class AdvancedRAG(RAGSystem):
    """
    Advanced RAG with hybrid search and reranking.
    """

    def __init__(self, *args, reranker_model: str = "BAAI/bge-reranker-v2-m3", **kwargs):
        super().__init__(*args, **kwargs)
        self.reranker_model = reranker_model

    def hybrid_retrieve(
        self,
        query: str,
        top_k: int = 20,
        alpha: float = 0.7
    ) -> List[Document]:
        """
        Hybrid retrieval combining dense and sparse search.

        alpha: Weight for dense search (1-alpha for BM25)
        """
        # Dense retrieval (vector similarity)
        dense_results = self.retrieve(query, top_k=top_k)

        # Sparse retrieval (BM25)
        sparse_results = self._bm25_retrieve(query, top_k=top_k)

        # Reciprocal Rank Fusion
        fused_scores = {}

        for rank, doc in enumerate(dense_results):
            doc_id = id(doc)
            fused_scores[doc_id] = fused_scores.get(doc_id, 0) + alpha / (rank + 60)

        for rank, doc in enumerate(sparse_results):
            doc_id = id(doc)
            fused_scores[doc_id] = fused_scores.get(doc_id, 0) + (1 - alpha) / (rank + 60)

        # Create doc lookup and sort
        all_docs = {id(doc): doc for doc in dense_results + sparse_results}
        sorted_ids = sorted(fused_scores.keys(), key=lambda x: fused_scores[x], reverse=True)

        return [all_docs[doc_id] for doc_id in sorted_ids[:top_k]]

    def _bm25_retrieve(self, query: str, top_k: int) -> List[Document]:
        """BM25 sparse retrieval."""
        from rank_bm25 import BM25Okapi

        # Tokenize documents
        tokenized_docs = [doc.content.lower().split() for doc in self.documents]
        bm25 = BM25Okapi(tokenized_docs)

        # Score and rank
        query_tokens = query.lower().split()
        scores = bm25.get_scores(query_tokens)

        ranked_indices = np.argsort(scores)[::-1][:top_k]
        return [self.documents[i] for i in ranked_indices]

    def rerank(
        self,
        query: str,
        documents: List[Document],
        top_k: int = 5
    ) -> List[Document]:
        """
        Rerank documents using cross-encoder.

        Cross-encoders are more accurate but slower than bi-encoders.
        Use for final reranking of retrieved candidates.
        """
        from sentence_transformers import CrossEncoder

        reranker = CrossEncoder(self.reranker_model)

        # Score each document
        pairs = [[query, doc.content] for doc in documents]
        scores = reranker.predict(pairs)

        # Sort by score
        ranked = sorted(zip(scores, documents), key=lambda x: x[0], reverse=True)
        return [doc for _, doc in ranked[:top_k]]

    def query_with_hyde(self, question: str) -> dict:
        """
        HyDE: Hypothetical Document Embeddings.

        Generate a hypothetical answer first, then use it for retrieval.
        Effective when documents don't match query style.
        """
        from openai import OpenAI
        client = OpenAI()

        # Generate hypothetical document
        hyde_prompt = f"""Write a detailed passage that would answer this question:
{question}

Write as if you are quoting from a document that contains the answer."""

        hyde_response = client.chat.completions.create(
            model=self.llm_model,
            messages=[{"role": "user", "content": hyde_prompt}],
            temperature=0.7
        )

        hypothetical_doc = hyde_response.choices[0].message.content

        # Retrieve using hypothetical document
        retrieved = self.retrieve(hypothetical_doc)

        # Rerank with original query
        reranked = self.rerank(question, retrieved)

        # Generate final answer
        answer = self.generate(question, reranked)

        return {
            "question": question,
            "hypothetical_doc": hypothetical_doc,
            "answer": answer,
            "sources": [{"content": doc.content[:200]} for doc in reranked]
        }

2. Model Context Protocol (MCP)

MCP is an open standard developed by Anthropic for connecting LLMs to external tools and data sources. It provides a unified protocol that allows AI models to access capabilities beyond their training data through a standardized interface.

2.1 MCP Architecture

flowchart TB subgraph "MCP Host (Client)" App[Application] Client[MCP Client] App --> Client end subgraph "MCP Server" Server[MCP Server] subgraph "Capabilities" T[Tools] R[Resources] P[Prompts] end Server --> T Server --> R Server --> P end Client <-->|JSON-RPC| Server subgraph "External Systems" DB[(Database)] API[APIs] FS[File System] end T --> DB T --> API R --> FS

MCP Core Concepts

2.2 Building an MCP Server

"""
MCP Server Implementation Example

This server provides tools for file operations and web search,
demonstrating the MCP protocol patterns.
"""

from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
from mcp.types import (
    Tool, TextContent, ImageContent,
    Resource, ResourceTemplate,
    Prompt, PromptMessage, PromptArgument
)
import mcp.server.stdio
import asyncio
import httpx
from pathlib import Path

# Create server instance
server = Server("example-mcp-server")

# ========== TOOLS ==========

@server.list_tools()
async def list_tools() -> list[Tool]:
    """Define available tools."""
    return [
        Tool(
            name="read_file",
            description="Read contents of a file from the filesystem",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "Path to the file to read"
                    }
                },
                "required": ["path"]
            }
        ),
        Tool(
            name="write_file",
            description="Write content to a file",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "File path"},
                    "content": {"type": "string", "description": "Content to write"}
                },
                "required": ["path", "content"]
            }
        ),
        Tool(
            name="web_search",
            description="Search the web for information",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Search query"
                    },
                    "num_results": {
                        "type": "integer",
                        "description": "Number of results (default: 5)",
                        "default": 5
                    }
                },
                "required": ["query"]
            }
        ),
        Tool(
            name="execute_sql",
            description="Execute a SQL query against the database",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "SQL query"},
                    "params": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Query parameters"
                    }
                },
                "required": ["query"]
            }
        )
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent | ImageContent]:
    """Handle tool execution."""

    if name == "read_file":
        path = Path(arguments["path"])
        if not path.exists():
            return [TextContent(type="text", text=f"Error: File not found: {path}")]
        if not path.is_file():
            return [TextContent(type="text", text=f"Error: Not a file: {path}")]

        content = path.read_text(encoding="utf-8")
        return [TextContent(type="text", text=content)]

    elif name == "write_file":
        path = Path(arguments["path"])
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(arguments["content"], encoding="utf-8")
        return [TextContent(type="text", text=f"Successfully wrote to {path}")]

    elif name == "web_search":
        # Example using a search API
        query = arguments["query"]
        num_results = arguments.get("num_results", 5)

        async with httpx.AsyncClient() as client:
            # Replace with actual search API
            response = await client.get(
                "https://api.search.example.com/search",
                params={"q": query, "limit": num_results}
            )
            results = response.json()

        formatted = "\n\n".join([
            f"**{r['title']}**\n{r['snippet']}\nURL: {r['url']}"
            for r in results.get("results", [])
        ])
        return [TextContent(type="text", text=formatted)]

    elif name == "execute_sql":
        # Example SQL execution (use with caution!)
        import sqlite3
        conn = sqlite3.connect("database.db")
        cursor = conn.cursor()

        try:
            params = arguments.get("params", [])
            cursor.execute(arguments["query"], params)

            if arguments["query"].strip().upper().startswith("SELECT"):
                rows = cursor.fetchall()
                columns = [desc[0] for desc in cursor.description]
                result = [dict(zip(columns, row)) for row in rows]
                return [TextContent(type="text", text=str(result))]
            else:
                conn.commit()
                return [TextContent(type="text", text=f"Query executed. Rows affected: {cursor.rowcount}")]
        finally:
            conn.close()

    return [TextContent(type="text", text=f"Unknown tool: {name}")]

# ========== RESOURCES ==========

@server.list_resources()
async def list_resources() -> list[Resource]:
    """List available resources."""
    return [
        Resource(
            uri="file:///config/settings.json",
            name="Application Settings",
            description="Current application configuration",
            mimeType="application/json"
        ),
        Resource(
            uri="db://users/schema",
            name="Users Table Schema",
            description="Schema definition for the users table",
            mimeType="text/plain"
        )
    ]

@server.read_resource()
async def read_resource(uri: str) -> str:
    """Read a specific resource."""
    if uri == "file:///config/settings.json":
        return Path("config/settings.json").read_text()
    elif uri == "db://users/schema":
        return """
        CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            username VARCHAR(100) NOT NULL,
            email VARCHAR(255) UNIQUE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """
    raise ValueError(f"Unknown resource: {uri}")

# ========== PROMPTS ==========

@server.list_prompts()
async def list_prompts() -> list[Prompt]:
    """List available prompt templates."""
    return [
        Prompt(
            name="code_review",
            description="Review code for issues and improvements",
            arguments=[
                PromptArgument(
                    name="code",
                    description="The code to review",
                    required=True
                ),
                PromptArgument(
                    name="language",
                    description="Programming language",
                    required=False
                )
            ]
        ),
        Prompt(
            name="explain_error",
            description="Explain an error message and suggest fixes",
            arguments=[
                PromptArgument(
                    name="error",
                    description="The error message",
                    required=True
                )
            ]
        )
    ]

@server.get_prompt()
async def get_prompt(name: str, arguments: dict) -> list[PromptMessage]:
    """Get a specific prompt with arguments filled in."""

    if name == "code_review":
        language = arguments.get("language", "unknown")
        return [
            PromptMessage(
                role="user",
                content=TextContent(
                    type="text",
                    text=f"""Please review this {language} code:

```{language}
{arguments['code']}
```

Analyze for:
1. Bugs and potential issues
2. Performance concerns
3. Security vulnerabilities
4. Code style and readability
5. Suggested improvements"""
                )
            )
        ]

    elif name == "explain_error":
        return [
            PromptMessage(
                role="user",
                content=TextContent(
                    type="text",
                    text=f"""I encountered this error:

{arguments['error']}

Please:
1. Explain what this error means
2. Identify the likely cause
3. Suggest how to fix it
4. Provide example code if helpful"""
                )
            )
        ]

    raise ValueError(f"Unknown prompt: {name}")

# ========== SERVER STARTUP ==========

async def main():
    """Run the MCP server."""
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="example-mcp-server",
                server_version="1.0.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={}
                )
            )
        )

if __name__ == "__main__":
    asyncio.run(main())

2.3 MCP Client Integration

"""
Using MCP with Claude and other LLM clients.
"""

import anthropic
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def use_mcp_with_claude():
    """
    Example: Using MCP tools with Claude API.
    """
    # Connect to MCP server
    server_params = StdioServerParameters(
        command="python",
        args=["mcp_server.py"]
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize session
            await session.initialize()

            # Get available tools
            tools_response = await session.list_tools()
            tools = tools_response.tools

            # Convert MCP tools to Claude format
            claude_tools = [
                {
                    "name": tool.name,
                    "description": tool.description,
                    "input_schema": tool.inputSchema
                }
                for tool in tools
            ]

            # Use with Claude
            client = anthropic.Anthropic()

            messages = [
                {"role": "user", "content": "Search the web for recent news about AI agents"}
            ]

            response = client.messages.create(
                model="claude-sonnet-4-20250514",
                max_tokens=4096,
                tools=claude_tools,
                messages=messages
            )

            # Handle tool use
            while response.stop_reason == "tool_use":
                tool_use_block = next(
                    b for b in response.content if b.type == "tool_use"
                )

                # Execute tool via MCP
                tool_result = await session.call_tool(
                    tool_use_block.name,
                    tool_use_block.input
                )

                # Continue conversation
                messages.append({"role": "assistant", "content": response.content})
                messages.append({
                    "role": "user",
                    "content": [{
                        "type": "tool_result",
                        "tool_use_id": tool_use_block.id,
                        "content": tool_result.content[0].text
                    }]
                })

                response = client.messages.create(
                    model="claude-sonnet-4-20250514",
                    max_tokens=4096,
                    tools=claude_tools,
                    messages=messages
                )

            # Final response
            return response.content[0].text

3. AI Agents

AI Agents are LLM-powered systems that can reason, plan, and take actions to accomplish goals. They combine the reasoning capabilities of LLMs with tool use, memory, and multi-step execution.

3.1 Agent Architecture

flowchart TB subgraph "Agent Core" LLM[LLM Brain] P[Planner] E[Executor] R[Reasoner] end subgraph "Memory" WM[Working Memory] LTM[Long-term Memory] EM[Episodic Memory] end subgraph "Tools" T1[Web Search] T2[Code Execution] T3[File Operations] T4[API Calls] end User[User Goal] --> P P --> LLM LLM --> R R --> E E --> T1 & T2 & T3 & T4 E --> WM WM --> LTM LTM --> LLM EM --> LLM

3.2 ReAct Agent Pattern

"""
ReAct Agent: Reasoning + Acting

The ReAct pattern interleaves reasoning traces with actions,
allowing the model to plan, act, and observe iteratively.
"""

from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
import json

@dataclass
class AgentState:
    """Agent execution state."""
    goal: str
    history: List[Dict[str, Any]] = field(default_factory=list)
    observations: List[str] = field(default_factory=list)
    final_answer: Optional[str] = None
    iterations: int = 0
    max_iterations: int = 10

class ReActAgent:
    """
    ReAct Agent implementation.

    Pattern: Thought -> Action -> Observation -> Repeat
    """

    def __init__(
        self,
        llm_client,
        tools: Dict[str, callable],
        model: str = "claude-sonnet-4-20250514",
        verbose: bool = True
    ):
        self.llm = llm_client
        self.tools = tools
        self.model = model
        self.verbose = verbose

        self.system_prompt = """You are a helpful AI agent that solves problems step by step.

For each step, you must output in this exact format:
Thought: 
Action: 
Action Input: 

OR if you have the final answer:
Thought: 
Final Answer: 

Available tools:
{tool_descriptions}

Rules:
1. Always think before acting
2. Use observations to inform next steps
3. If a tool fails, try a different approach
4. Provide a Final Answer when you have enough information"""

    def _format_tool_descriptions(self) -> str:
        """Format tool descriptions for the prompt."""
        descriptions = []
        for name, tool in self.tools.items():
            desc = getattr(tool, '__doc__', 'No description')
            descriptions.append(f"- {name}: {desc}")
        return "\n".join(descriptions)

    def _parse_response(self, response: str) -> Dict[str, Any]:
        """Parse agent response into structured format."""
        lines = response.strip().split('\n')
        result = {}

        current_key = None
        current_value = []

        for line in lines:
            if line.startswith('Thought:'):
                if current_key:
                    result[current_key] = '\n'.join(current_value).strip()
                current_key = 'thought'
                current_value = [line[8:].strip()]
            elif line.startswith('Action:'):
                if current_key:
                    result[current_key] = '\n'.join(current_value).strip()
                current_key = 'action'
                current_value = [line[7:].strip()]
            elif line.startswith('Action Input:'):
                if current_key:
                    result[current_key] = '\n'.join(current_value).strip()
                current_key = 'action_input'
                current_value = [line[13:].strip()]
            elif line.startswith('Final Answer:'):
                if current_key:
                    result[current_key] = '\n'.join(current_value).strip()
                current_key = 'final_answer'
                current_value = [line[13:].strip()]
            else:
                current_value.append(line)

        if current_key:
            result[current_key] = '\n'.join(current_value).strip()

        return result

    def _execute_tool(self, tool_name: str, tool_input: str) -> str:
        """Execute a tool and return observation."""
        if tool_name not in self.tools:
            return f"Error: Unknown tool '{tool_name}'"

        try:
            # Parse JSON input
            if tool_input.strip():
                input_dict = json.loads(tool_input)
            else:
                input_dict = {}

            # Execute tool
            result = self.tools[tool_name](**input_dict)
            return str(result)

        except json.JSONDecodeError as e:
            return f"Error: Invalid JSON input - {e}"
        except Exception as e:
            return f"Error executing {tool_name}: {e}"

    async def run(self, goal: str) -> str:
        """
        Run the agent to accomplish a goal.
        """
        state = AgentState(goal=goal)

        # Format system prompt with tools
        system = self.system_prompt.format(
            tool_descriptions=self._format_tool_descriptions()
        )

        messages = [
            {"role": "user", "content": f"Goal: {goal}"}
        ]

        while state.iterations < state.max_iterations:
            state.iterations += 1

            if self.verbose:
                print(f"\n--- Iteration {state.iterations} ---")

            # Get agent response
            response = self.llm.messages.create(
                model=self.model,
                system=system,
                max_tokens=2048,
                messages=messages
            )

            agent_response = response.content[0].text
            parsed = self._parse_response(agent_response)

            if self.verbose:
                print(f"Thought: {parsed.get('thought', 'N/A')}")

            # Check for final answer
            if 'final_answer' in parsed:
                state.final_answer = parsed['final_answer']
                if self.verbose:
                    print(f"Final Answer: {state.final_answer}")
                return state.final_answer

            # Execute action
            if 'action' in parsed:
                action = parsed['action']
                action_input = parsed.get('action_input', '{}')

                if self.verbose:
                    print(f"Action: {action}")
                    print(f"Action Input: {action_input}")

                observation = self._execute_tool(action, action_input)
                state.observations.append(observation)

                if self.verbose:
                    print(f"Observation: {observation[:500]}...")

                # Add to conversation
                messages.append({"role": "assistant", "content": agent_response})
                messages.append({"role": "user", "content": f"Observation: {observation}"})

            state.history.append({
                "iteration": state.iterations,
                "response": agent_response,
                "parsed": parsed,
                "observation": observation if 'action' in parsed else None
            })

        return "Agent reached maximum iterations without finding an answer."

# Tool definitions
def web_search(query: str, num_results: int = 5) -> str:
    """Search the web for information."""
    # Implementation
    return f"Search results for '{query}'..."

def calculate(expression: str) -> str:
    """Evaluate a mathematical expression."""
    try:
        result = eval(expression)  # Use safer evaluation in production
        return str(result)
    except Exception as e:
        return f"Error: {e}"

def get_weather(city: str) -> str:
    """Get current weather for a city."""
    # Implementation
    return f"Weather in {city}: 72°F, Sunny"

# Usage
import anthropic

agent = ReActAgent(
    llm_client=anthropic.Anthropic(),
    tools={
        "web_search": web_search,
        "calculate": calculate,
        "get_weather": get_weather
    },
    verbose=True
)

# Run agent
result = await agent.run("What is the weather in Tokyo and calculate 15% tip on $85?")

3.3 Multi-Agent Systems

"""
Multi-Agent System with Specialized Roles

Agents collaborate by passing messages and delegating tasks
based on their specializations.
"""

from typing import Dict, List, Optional
from enum import Enum
import asyncio

class AgentRole(Enum):
    COORDINATOR = "coordinator"
    RESEARCHER = "researcher"
    CODER = "coder"
    REVIEWER = "reviewer"

class SpecializedAgent:
    """An agent with a specific role and capabilities."""

    def __init__(
        self,
        role: AgentRole,
        llm_client,
        tools: Dict[str, callable] = None
    ):
        self.role = role
        self.llm = llm_client
        self.tools = tools or {}

        self.system_prompts = {
            AgentRole.COORDINATOR: """You are a project coordinator agent.
Your job is to:
1. Understand the overall goal
2. Break it into subtasks
3. Assign tasks to specialized agents
4. Synthesize results into a final deliverable

Respond with a JSON plan: {"tasks": [{"agent": "researcher|coder|reviewer", "task": "description"}]}""",

            AgentRole.RESEARCHER: """You are a research specialist agent.
Your job is to:
1. Search for relevant information
2. Analyze and summarize findings
3. Provide factual, well-sourced answers

Use web_search tool when needed.""",

            AgentRole.CODER: """You are a coding specialist agent.
Your job is to:
1. Write clean, efficient code
2. Follow best practices
3. Include error handling
4. Add comments for complex logic

Output code in markdown code blocks.""",

            AgentRole.REVIEWER: """You are a code review specialist agent.
Your job is to:
1. Check for bugs and errors
2. Suggest improvements
3. Verify correctness
4. Ensure code quality

Provide structured feedback with severity levels."""
        }

    async def process(self, task: str, context: str = "") -> str:
        """Process a task according to agent's role."""
        messages = [
            {"role": "user", "content": f"Context:\n{context}\n\nTask: {task}"}
        ]

        response = self.llm.messages.create(
            model="claude-sonnet-4-20250514",
            system=self.system_prompts[self.role],
            max_tokens=4096,
            messages=messages
        )

        return response.content[0].text

class MultiAgentSystem:
    """
    Orchestrates multiple specialized agents to complete complex tasks.
    """

    def __init__(self, llm_client):
        self.llm = llm_client
        self.agents = {
            AgentRole.COORDINATOR: SpecializedAgent(AgentRole.COORDINATOR, llm_client),
            AgentRole.RESEARCHER: SpecializedAgent(AgentRole.RESEARCHER, llm_client),
            AgentRole.CODER: SpecializedAgent(AgentRole.CODER, llm_client),
            AgentRole.REVIEWER: SpecializedAgent(AgentRole.REVIEWER, llm_client)
        }

    async def execute(self, goal: str) -> Dict[str, any]:
        """
        Execute a multi-agent workflow.
        """
        results = {"goal": goal, "steps": []}

        # Step 1: Coordinator creates plan
        print("Coordinator: Creating plan...")
        plan_response = await self.agents[AgentRole.COORDINATOR].process(goal)
        results["plan"] = plan_response

        # Parse plan (simplified)
        import json
        try:
            plan = json.loads(plan_response)
            tasks = plan.get("tasks", [])
        except:
            tasks = [{"agent": "coder", "task": goal}]

        # Step 2: Execute tasks
        context = f"Goal: {goal}\n"

        for task_info in tasks:
            agent_type = task_info["agent"]
            task = task_info["task"]

            role = {
                "researcher": AgentRole.RESEARCHER,
                "coder": AgentRole.CODER,
                "reviewer": AgentRole.REVIEWER
            }.get(agent_type, AgentRole.CODER)

            print(f"{role.value.title()}: Working on '{task}'...")
            result = await self.agents[role].process(task, context)

            results["steps"].append({
                "agent": role.value,
                "task": task,
                "result": result
            })

            context += f"\n{role.value} completed: {result[:500]}..."

        # Step 3: Coordinator synthesizes
        print("Coordinator: Synthesizing results...")
        final_context = json.dumps(results["steps"], indent=2)
        synthesis = await self.agents[AgentRole.COORDINATOR].process(
            "Synthesize all results into a final deliverable",
            final_context
        )
        results["final"] = synthesis

        return results

# Usage
import anthropic

mas = MultiAgentSystem(anthropic.Anthropic())
result = await mas.execute(
    "Create a Python function to parse CSV files with error handling and write unit tests for it"
)

4. Multimodal LLMs

Multimodal LLMs process and generate multiple types of content: text, images, audio, and video. This enables powerful applications like visual question answering, image generation from text, and document understanding.

4.1 Vision-Language Models

Model Capabilities Context Provider
GPT-4o Text, Image, Audio input/output 128K OpenAI
Claude 3.5 Sonnet Text, Image input 200K Anthropic
Gemini 2.0 Flash Text, Image, Audio, Video 1M Google
Llama 3.2 Vision Text, Image input 128K Meta (Open)

4.2 Working with Images

"""
Multimodal LLM usage examples with images.
"""

import anthropic
import base64
from pathlib import Path

def encode_image(image_path: str) -> tuple[str, str]:
    """Encode image to base64 with media type detection."""
    path = Path(image_path)
    suffix = path.suffix.lower()

    media_types = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.webp': 'image/webp'
    }

    media_type = media_types.get(suffix, 'image/png')

    with open(path, 'rb') as f:
        data = base64.standard_b64encode(f.read()).decode('utf-8')

    return data, media_type

def analyze_image(image_path: str, question: str) -> str:
    """
    Analyze an image with Claude Vision.
    """
    client = anthropic.Anthropic()

    image_data, media_type = encode_image(image_path)

    message = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": media_type,
                            "data": image_data
                        }
                    },
                    {
                        "type": "text",
                        "text": question
                    }
                ]
            }
        ]
    )

    return message.content[0].text

def compare_images(image_paths: list[str], comparison_prompt: str) -> str:
    """
    Compare multiple images.
    """
    client = anthropic.Anthropic()

    content = []
    for i, path in enumerate(image_paths):
        image_data, media_type = encode_image(path)
        content.append({
            "type": "image",
            "source": {
                "type": "base64",
                "media_type": media_type,
                "data": image_data
            }
        })

    content.append({
        "type": "text",
        "text": comparison_prompt
    })

    message = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        messages=[{"role": "user", "content": content}]
    )

    return message.content[0].text

def extract_text_from_document(image_path: str) -> str:
    """
    OCR and document understanding.
    """
    return analyze_image(
        image_path,
        """Extract all text from this document image.
Preserve the structure and formatting as much as possible.
If there are tables, format them properly.
If there are forms, identify fields and values."""
    )

def analyze_chart(image_path: str) -> dict:
    """
    Analyze a chart or graph.
    """
    import json

    response = analyze_image(
        image_path,
        """Analyze this chart/graph and provide:
1. Chart type (bar, line, pie, etc.)
2. Title and axis labels
3. Key data points
4. Main trends or insights
5. Any anomalies or notable patterns

Respond in JSON format."""
    )

    try:
        return json.loads(response)
    except:
        return {"raw_analysis": response}

# Usage examples
# Single image analysis
result = analyze_image(
    "screenshot.png",
    "What UI issues do you see in this screenshot?"
)

# Document OCR
text = extract_text_from_document("receipt.jpg")

# Chart analysis
chart_data = analyze_chart("sales_graph.png")

5. Production Deployment

5.1 Cost Optimization

LLM API Pricing Comparison (2026)

Model Input ($/1M tokens) Output ($/1M tokens) Best For
GPT-4o $2.50 $10.00 Complex reasoning
GPT-4o-mini $0.15 $0.60 General tasks
Claude Sonnet $3.00 $15.00 Code, analysis
Claude Haiku $0.25 $1.25 Simple tasks
Gemini 2.0 Flash $0.075 $0.30 High volume

*Prices approximate; check provider documentation for current rates

"""
Cost optimization strategies for LLM applications.
"""

from typing import Optional
from dataclasses import dataclass
import tiktoken

@dataclass
class CostEstimate:
    """Cost estimation for an LLM request."""
    input_tokens: int
    output_tokens: int
    input_cost: float
    output_cost: float
    total_cost: float

class CostOptimizer:
    """
    Utilities for LLM cost optimization.
    """

    # Pricing per million tokens (example rates)
    PRICING = {
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00},
        "claude-3-5-haiku-20241022": {"input": 0.25, "output": 1.25},
        "gemini-2.0-flash": {"input": 0.075, "output": 0.30}
    }

    def __init__(self, default_model: str = "gpt-4o-mini"):
        self.default_model = default_model
        self.encoder = tiktoken.get_encoding("cl100k_base")

    def count_tokens(self, text: str) -> int:
        """Count tokens in text."""
        return len(self.encoder.encode(text))

    def estimate_cost(
        self,
        prompt: str,
        expected_output_tokens: int = 500,
        model: Optional[str] = None
    ) -> CostEstimate:
        """
        Estimate cost for a request.
        """
        model = model or self.default_model
        pricing = self.PRICING.get(model, self.PRICING["gpt-4o-mini"])

        input_tokens = self.count_tokens(prompt)
        input_cost = (input_tokens / 1_000_000) * pricing["input"]
        output_cost = (expected_output_tokens / 1_000_000) * pricing["output"]

        return CostEstimate(
            input_tokens=input_tokens,
            output_tokens=expected_output_tokens,
            input_cost=input_cost,
            output_cost=output_cost,
            total_cost=input_cost + output_cost
        )

    def select_model(
        self,
        task_complexity: str,
        budget_per_request: float = 0.01
    ) -> str:
        """
        Select appropriate model based on task and budget.

        task_complexity: "simple", "medium", "complex"
        """
        model_tiers = {
            "simple": ["gemini-2.0-flash", "claude-3-5-haiku-20241022", "gpt-4o-mini"],
            "medium": ["gpt-4o-mini", "claude-sonnet-4-20250514"],
            "complex": ["claude-sonnet-4-20250514", "gpt-4o"]
        }

        candidates = model_tiers.get(task_complexity, model_tiers["medium"])

        # Select cheapest that fits budget
        for model in candidates:
            # Rough estimate for 1000 input + 500 output tokens
            cost = self.estimate_cost("x" * 4000, 500, model).total_cost
            if cost <= budget_per_request:
                return model

        return candidates[-1]  # Return most capable if budget allows

    @staticmethod
    def optimize_prompt(prompt: str, max_tokens: int = 4000) -> str:
        """
        Compress prompt to reduce costs.

        Strategies:
        1. Remove unnecessary whitespace
        2. Use abbreviations
        3. Remove redundant instructions
        """
        # Remove extra whitespace
        import re
        prompt = re.sub(r'\n\s*\n', '\n\n', prompt)
        prompt = re.sub(r' +', ' ', prompt)

        # Truncate if too long
        encoder = tiktoken.get_encoding("cl100k_base")
        tokens = encoder.encode(prompt)

        if len(tokens) > max_tokens:
            # Keep beginning and end, truncate middle
            keep = max_tokens // 2
            tokens = tokens[:keep] + tokens[-keep:]
            prompt = encoder.decode(tokens)

        return prompt.strip()

# Caching for cost reduction
from functools import lru_cache
import hashlib

class CachedLLM:
    """
    LLM client with response caching.

    Caching identical requests can reduce costs by 50-90%
    for applications with repeated queries.
    """

    def __init__(self, client, cache_size: int = 1000):
        self.client = client
        self.cache = {}
        self.cache_size = cache_size
        self.cache_hits = 0
        self.total_requests = 0

    def _cache_key(self, model: str, messages: list, **kwargs) -> str:
        """Generate cache key from request parameters."""
        key_data = {
            "model": model,
            "messages": messages,
            **kwargs
        }
        key_str = str(sorted(key_data.items()))
        return hashlib.md5(key_str.encode()).hexdigest()

    def create(self, model: str, messages: list, **kwargs):
        """
        Create completion with caching.
        """
        self.total_requests += 1

        # Check cache
        cache_key = self._cache_key(model, messages, **kwargs)
        if cache_key in self.cache:
            self.cache_hits += 1
            return self.cache[cache_key]

        # Make request
        response = self.client.messages.create(
            model=model,
            messages=messages,
            **kwargs
        )

        # Cache response
        if len(self.cache) >= self.cache_size:
            # Simple LRU: remove oldest
            oldest = next(iter(self.cache))
            del self.cache[oldest]

        self.cache[cache_key] = response
        return response

    @property
    def cache_hit_rate(self) -> float:
        """Return cache hit rate."""
        if self.total_requests == 0:
            return 0.0
        return self.cache_hits / self.total_requests

5.2 Monitoring and Observability

"""
LLM application monitoring and logging.
"""

import time
import logging
from dataclasses import dataclass, field
from typing import List, Dict, Any
from datetime import datetime
import json

@dataclass
class LLMRequestLog:
    """Log entry for an LLM request."""
    timestamp: datetime
    model: str
    input_tokens: int
    output_tokens: int
    latency_ms: float
    status: str
    error: str = None
    metadata: Dict[str, Any] = field(default_factory=dict)

class LLMMonitor:
    """
    Monitor LLM requests for cost, latency, and errors.
    """

    def __init__(self):
        self.logs: List[LLMRequestLog] = []
        self.logger = logging.getLogger("llm_monitor")

    def log_request(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        latency_ms: float,
        status: str = "success",
        error: str = None,
        **metadata
    ):
        """Log an LLM request."""
        log = LLMRequestLog(
            timestamp=datetime.now(),
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            latency_ms=latency_ms,
            status=status,
            error=error,
            metadata=metadata
        )
        self.logs.append(log)

        # Also log to standard logger
        self.logger.info(json.dumps({
            "timestamp": log.timestamp.isoformat(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "latency_ms": latency_ms,
            "status": status,
            "error": error
        }))

    def get_metrics(self, window_hours: int = 24) -> Dict[str, Any]:
        """Get aggregated metrics."""
        from datetime import timedelta

        cutoff = datetime.now() - timedelta(hours=window_hours)
        recent_logs = [l for l in self.logs if l.timestamp > cutoff]

        if not recent_logs:
            return {"message": "No logs in window"}

        total_input = sum(l.input_tokens for l in recent_logs)
        total_output = sum(l.output_tokens for l in recent_logs)
        avg_latency = sum(l.latency_ms for l in recent_logs) / len(recent_logs)
        error_rate = sum(1 for l in recent_logs if l.status == "error") / len(recent_logs)

        # Per-model breakdown
        models = {}
        for log in recent_logs:
            if log.model not in models:
                models[log.model] = {"requests": 0, "input_tokens": 0, "output_tokens": 0}
            models[log.model]["requests"] += 1
            models[log.model]["input_tokens"] += log.input_tokens
            models[log.model]["output_tokens"] += log.output_tokens

        return {
            "window_hours": window_hours,
            "total_requests": len(recent_logs),
            "total_input_tokens": total_input,
            "total_output_tokens": total_output,
            "avg_latency_ms": round(avg_latency, 2),
            "error_rate": round(error_rate, 4),
            "per_model": models
        }

# Wrapper for automatic monitoring
class MonitoredClient:
    """
    LLM client wrapper with automatic monitoring.
    """

    def __init__(self, client, monitor: LLMMonitor):
        self.client = client
        self.monitor = monitor

    def create(self, model: str, messages: list, **kwargs):
        """Create completion with monitoring."""
        start_time = time.time()
        status = "success"
        error = None

        try:
            response = self.client.messages.create(
                model=model,
                messages=messages,
                **kwargs
            )
            return response

        except Exception as e:
            status = "error"
            error = str(e)
            raise

        finally:
            latency_ms = (time.time() - start_time) * 1000

            # Extract token counts (varies by provider)
            input_tokens = getattr(response, 'usage', {}).get('input_tokens', 0) if 'response' in dir() else 0
            output_tokens = getattr(response, 'usage', {}).get('output_tokens', 0) if 'response' in dir() else 0

            self.monitor.log_request(
                model=model,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                latency_ms=latency_ms,
                status=status,
                error=error
            )

5.3 Production Checklist

LLM Application Production Checklist

Security:

Reliability:

Performance:

Monitoring:

Summary

Chapter 5 Key Takeaways

Previous: LLM Inference and Optimization Back to Series Index
日本語