1. Retrieval-Augmented Generation (RAG)
RAG enhances LLMs with external knowledge, enabling accurate responses about private data, recent information, and domain-specific content. It addresses the fundamental limitation of LLMs: static training data that becomes outdated.
1.1 RAG Architecture
1.2 Building a RAG System
from typing import List, Optional
import numpy as np
from dataclasses import dataclass
@dataclass
class Document:
    """Represents a document chunk with metadata."""
    content: str  # the chunk text itself
    metadata: dict  # arbitrary provenance info (source, offsets, ...)
    embedding: Optional[np.ndarray] = None  # filled in by embed_documents(); None until indexed
class RAGSystem:
    """
    Production-ready RAG implementation with modern best practices.

    Pipeline: chunk -> embed -> index (in memory) -> retrieve by cosine
    similarity -> generate a grounded answer with the LLM.
    Note: keeps all vectors in a Python list; a production system would
    use a vector database and semantic chunking.
    """

    def __init__(
        self,
        embedding_model: str = "text-embedding-3-large",
        llm_model: str = "gpt-4o",
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        top_k: int = 5
    ):
        """
        Args:
            embedding_model: OpenAI embedding model name.
            llm_model: OpenAI chat model used for answer generation.
            chunk_size: Target chunk length in characters.
            chunk_overlap: Characters of overlap between adjacent chunks.
            top_k: Default number of documents returned by retrieve().
        """
        self.embedding_model = embedding_model
        self.llm_model = llm_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.top_k = top_k
        self.documents: List[Document] = []

    def chunk_document(self, text: str, metadata: dict) -> List[Document]:
        """
        Split document into overlapping chunks.

        Uses natural break points when one falls in the second half of the
        current window (so chunks stay reasonably sized):
        - Paragraph breaks ('\\n\\n')
        - Sentence boundaries ('. ')

        Args:
            text: Raw document text.
            metadata: Copied onto every chunk, plus chunk_start/chunk_end offsets.

        Returns:
            Chunks without embeddings (pass them to index()/embed_documents()).
        """
        chunks = []
        # Simple overlap-based chunking (production: use semantic chunking)
        start = 0
        while start < len(text):
            end = start + self.chunk_size
            # Find natural break point
            if end < len(text):
                # Look for paragraph break
                para_break = text.rfind('\n\n', start, end)
                if para_break > start + self.chunk_size // 2:
                    end = para_break
                # Or sentence boundary
                elif (period := text.rfind('. ', start, end)) > start + self.chunk_size // 2:
                    end = period + 1
            chunk_text = text[start:end].strip()
            if chunk_text:
                chunks.append(Document(
                    content=chunk_text,
                    metadata={**metadata, "chunk_start": start, "chunk_end": end}
                ))
            # Step back by the overlap, but guarantee forward progress:
            # when chunk_overlap >= the effective chunk length (possible if
            # chunk_overlap >= chunk_size // 2 and a break point shortened
            # the window), the old `start = end - overlap` never advanced
            # and the loop ran forever.
            next_start = end - self.chunk_overlap
            start = next_start if next_start > start else end
        return chunks

    def embed_documents(self, documents: List[Document]) -> List[Document]:
        """Generate embeddings for documents, mutating them in place."""
        from openai import OpenAI
        client = OpenAI()
        texts = [doc.content for doc in documents]
        # Batch embedding for efficiency: one request for all chunks.
        response = client.embeddings.create(
            model=self.embedding_model,
            input=texts
        )
        # The embeddings API returns vectors in input order.
        for doc, embedding_data in zip(documents, response.data):
            doc.embedding = np.array(embedding_data.embedding)
        return documents

    def index(self, documents: List[Document]):
        """Embed *documents* and add them to the in-memory index."""
        embedded_docs = self.embed_documents(documents)
        self.documents.extend(embedded_docs)

    def retrieve(
        self,
        query: str,
        top_k: Optional[int] = None
    ) -> List[Document]:
        """
        Retrieve the top-k most relevant documents using cosine similarity.

        Args:
            query: Natural-language query.
            top_k: Per-call override of the instance default.
        """
        from openai import OpenAI
        client = OpenAI()
        top_k = top_k or self.top_k
        # Embed query
        response = client.embeddings.create(
            model=self.embedding_model,
            input=[query]
        )
        query_embedding = np.array(response.data[0].embedding)
        # The query norm is loop-invariant; compute it once instead of
        # once per document as before.
        query_norm = np.linalg.norm(query_embedding)
        similarities = []
        for doc in self.documents:
            if doc.embedding is not None:
                sim = np.dot(query_embedding, doc.embedding) / (
                    query_norm * np.linalg.norm(doc.embedding)
                )
                similarities.append((sim, doc))
        # Sort descending by similarity and return top-k
        similarities.sort(key=lambda x: x[0], reverse=True)
        return [doc for _, doc in similarities[:top_k]]

    def generate(
        self,
        query: str,
        context_documents: List[Document],
        system_prompt: Optional[str] = None
    ) -> str:
        """
        Generate an answer grounded in the retrieved context.

        Args:
            query: The user question.
            context_documents: Documents to cite; their 'source' metadata
                is surfaced to the model.
            system_prompt: Optional override of the default grounding prompt.
        """
        from openai import OpenAI
        client = OpenAI()
        # Build context string with a visible source tag per chunk.
        context = "\n\n---\n\n".join([
            f"[Source: {doc.metadata.get('source', 'unknown')}]\n{doc.content}"
            for doc in context_documents
        ])
        system = system_prompt or """You are a helpful assistant that answers questions based on the provided context.
If the answer cannot be found in the context, say so clearly.
Always cite the sources used in your answer."""
        messages = [
            {"role": "system", "content": system},
            {"role": "user", "content": f"""Context:
{context}
Question: {query}
Please answer based on the context provided above."""}
        ]
        response = client.chat.completions.create(
            model=self.llm_model,
            messages=messages,
            temperature=0.7
        )
        return response.choices[0].message.content

    def query(self, question: str) -> dict:
        """
        End-to-end RAG query pipeline: retrieve then generate.

        Returns:
            Dict with "question", "answer", and "sources" (each source's
            content truncated to 200 chars for display).
        """
        # Retrieve relevant documents
        retrieved_docs = self.retrieve(question)
        # Generate answer
        answer = self.generate(question, retrieved_docs)
        return {
            "question": question,
            "answer": answer,
            "sources": [
                {"content": doc.content[:200] + "...", "metadata": doc.metadata}
                for doc in retrieved_docs
            ]
        }
# Usage example: build a small index and ask one question.
rag = RAGSystem(
    embedding_model="text-embedding-3-large",
    llm_model="gpt-4o",
    chunk_size=512,
    top_k=5
)
# Index documents (this calls the embeddings API once per batch)
documents = [
    Document(content="Your document text here...", metadata={"source": "doc1.pdf"}),
    # ... more documents
]
rag.index(documents)
# Query: retrieves top-k chunks, then generates a grounded answer
result = rag.query("What are the key findings?")
print(result["answer"])
1.3 Advanced RAG Techniques
RAG Evolution (2024-2026)
| Technique | Description | Use Case |
|---|---|---|
| Naive RAG | Basic retrieve-then-generate | Simple Q&A |
| Hybrid Search | Vector + keyword (BM25) fusion | Mixed queries |
| HyDE | Hypothetical Document Embeddings | Sparse data |
| Reranking | Cross-encoder reranking of results | High precision needs |
| Agentic RAG | Multi-step retrieval with reasoning | Complex queries |
| GraphRAG | Knowledge graph + vector retrieval | Relationship queries |
class AdvancedRAG(RAGSystem):
    """
    Advanced RAG with hybrid search and reranking.

    Adds BM25 sparse retrieval fused with dense retrieval (Reciprocal
    Rank Fusion), cross-encoder reranking, and HyDE query expansion.
    """

    def __init__(self, *args, reranker_model: str = "BAAI/bge-reranker-v2-m3", **kwargs):
        """
        Args:
            reranker_model: Cross-encoder checkpoint used by rerank().
            *args, **kwargs: Forwarded to RAGSystem.__init__.
        """
        super().__init__(*args, **kwargs)
        self.reranker_model = reranker_model
        # Lazily-built CrossEncoder. Loading the model is expensive, so it
        # is cached here instead of being reloaded on every rerank() call.
        self._reranker = None

    def hybrid_retrieve(
        self,
        query: str,
        top_k: int = 20,
        alpha: float = 0.7
    ) -> List[Document]:
        """
        Hybrid retrieval combining dense and sparse search.

        Args:
            query: Search query.
            top_k: Candidates taken from each retriever and returned overall.
            alpha: Weight for dense search (1-alpha for BM25).
        """
        # Dense retrieval (vector similarity)
        dense_results = self.retrieve(query, top_k=top_k)
        # Sparse retrieval (BM25)
        sparse_results = self._bm25_retrieve(query, top_k=top_k)
        # Reciprocal Rank Fusion (constant 60 per the standard RRF formula):
        # fuse by rank, not raw score, so the two retrievers' incomparable
        # score scales do not matter.
        fused_scores = {}
        for rank, doc in enumerate(dense_results):
            doc_id = id(doc)
            fused_scores[doc_id] = fused_scores.get(doc_id, 0) + alpha / (rank + 60)
        for rank, doc in enumerate(sparse_results):
            doc_id = id(doc)
            fused_scores[doc_id] = fused_scores.get(doc_id, 0) + (1 - alpha) / (rank + 60)
        # id() keys are stable here because both retrievers return the same
        # Document objects held in self.documents.
        all_docs = {id(doc): doc for doc in dense_results + sparse_results}
        sorted_ids = sorted(fused_scores.keys(), key=lambda x: fused_scores[x], reverse=True)
        return [all_docs[doc_id] for doc_id in sorted_ids[:top_k]]

    def _bm25_retrieve(self, query: str, top_k: int) -> List[Document]:
        """BM25 sparse retrieval over the in-memory corpus."""
        # BM25Okapi cannot be built over an empty corpus; short-circuit.
        if not self.documents:
            return []
        from rank_bm25 import BM25Okapi
        # NOTE: rebuilding the BM25 index on every query is O(corpus);
        # cache the index if the corpus is large or queries are frequent.
        tokenized_docs = [doc.content.lower().split() for doc in self.documents]
        bm25 = BM25Okapi(tokenized_docs)
        # Score and rank
        query_tokens = query.lower().split()
        scores = bm25.get_scores(query_tokens)
        ranked_indices = np.argsort(scores)[::-1][:top_k]
        return [self.documents[i] for i in ranked_indices]

    def rerank(
        self,
        query: str,
        documents: List[Document],
        top_k: int = 5
    ) -> List[Document]:
        """
        Rerank documents using a cross-encoder.

        Cross-encoders are more accurate but slower than bi-encoders.
        Use for final reranking of retrieved candidates.
        """
        # Build the cross-encoder once and reuse it; the old code loaded
        # the model from disk on every call.
        if self._reranker is None:
            from sentence_transformers import CrossEncoder
            self._reranker = CrossEncoder(self.reranker_model)
        # Score each (query, document) pair jointly.
        pairs = [[query, doc.content] for doc in documents]
        scores = self._reranker.predict(pairs)
        # Sort by score; the key means Document objects are never compared.
        ranked = sorted(zip(scores, documents), key=lambda x: x[0], reverse=True)
        return [doc for _, doc in ranked[:top_k]]

    def query_with_hyde(self, question: str) -> dict:
        """
        HyDE: Hypothetical Document Embeddings.

        Generate a hypothetical answer first, then use it for retrieval.
        Effective when documents don't match query style.
        """
        from openai import OpenAI
        client = OpenAI()
        # Generate hypothetical document
        hyde_prompt = f"""Write a detailed passage that would answer this question:
{question}
Write as if you are quoting from a document that contains the answer."""
        hyde_response = client.chat.completions.create(
            model=self.llm_model,
            messages=[{"role": "user", "content": hyde_prompt}],
            temperature=0.7
        )
        hypothetical_doc = hyde_response.choices[0].message.content
        # Retrieve using the hypothetical document (it resembles the corpus
        # more than the raw question does).
        retrieved = self.retrieve(hypothetical_doc)
        # Rerank with the ORIGINAL query so relevance is judged against
        # what the user actually asked.
        reranked = self.rerank(question, retrieved)
        # Generate final answer
        answer = self.generate(question, reranked)
        return {
            "question": question,
            "hypothetical_doc": hypothetical_doc,
            "answer": answer,
            "sources": [{"content": doc.content[:200]} for doc in reranked]
        }
2. Model Context Protocol (MCP)
MCP is an open standard developed by Anthropic for connecting LLMs to external tools and data sources. It provides a unified protocol that allows AI models to access capabilities beyond their training data through a standardized interface.
2.1 MCP Architecture
MCP Core Concepts
- Tools: Functions the LLM can call (e.g., search, calculate, query database)
- Resources: Data the LLM can read (e.g., files, database records)
- Prompts: Reusable prompt templates with arguments
- Sampling: LLM completion requests from server to client
2.2 Building an MCP Server
"""
MCP Server Implementation Example
This server provides tools for file operations and web search,
demonstrating the MCP protocol patterns.
"""
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
from mcp.types import (
Tool, TextContent, ImageContent,
Resource, ResourceTemplate,
Prompt, PromptMessage, PromptArgument
)
import mcp.server.stdio
import asyncio
import httpx
from pathlib import Path
# Create the server instance; the name identifies this server to MCP clients.
server = Server("example-mcp-server")
# ========== TOOLS ==========
@server.list_tools()
async def list_tools() -> list[Tool]:
    """Advertise the tools this server exposes to MCP clients."""
    # JSON Schemas are named locally so each Tool() call below stays short.
    read_file_schema = {
        "type": "object",
        "properties": {
            "path": {
                "type": "string",
                "description": "Path to the file to read"
            }
        },
        "required": ["path"]
    }
    write_file_schema = {
        "type": "object",
        "properties": {
            "path": {"type": "string", "description": "File path"},
            "content": {"type": "string", "description": "Content to write"}
        },
        "required": ["path", "content"]
    }
    web_search_schema = {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Search query"
            },
            "num_results": {
                "type": "integer",
                "description": "Number of results (default: 5)",
                "default": 5
            }
        },
        "required": ["query"]
    }
    execute_sql_schema = {
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "SQL query"},
            "params": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Query parameters"
            }
        },
        "required": ["query"]
    }
    return [
        Tool(
            name="read_file",
            description="Read contents of a file from the filesystem",
            inputSchema=read_file_schema
        ),
        Tool(
            name="write_file",
            description="Write content to a file",
            inputSchema=write_file_schema
        ),
        Tool(
            name="web_search",
            description="Search the web for information",
            inputSchema=web_search_schema
        ),
        Tool(
            name="execute_sql",
            description="Execute a SQL query against the database",
            inputSchema=execute_sql_schema
        ),
    ]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent | ImageContent]:
    """Dispatch a tool invocation to its implementation.

    Expected failures (missing file, unknown tool) are returned as text
    content rather than raised, so the calling model can observe and
    recover from them.
    """
    if name == "read_file":
        # NOTE(security): the path is client-controlled; restrict it to an
        # allowed root directory before exposing this server to untrusted
        # callers.
        path = Path(arguments["path"])
        if not path.exists():
            return [TextContent(type="text", text=f"Error: File not found: {path}")]
        if not path.is_file():
            return [TextContent(type="text", text=f"Error: Not a file: {path}")]
        content = path.read_text(encoding="utf-8")
        return [TextContent(type="text", text=content)]

    elif name == "write_file":
        # NOTE(security): same path-traversal caveat as read_file.
        path = Path(arguments["path"])
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(arguments["content"], encoding="utf-8")
        return [TextContent(type="text", text=f"Successfully wrote to {path}")]

    elif name == "web_search":
        # Example using a search API
        query = arguments["query"]
        num_results = arguments.get("num_results", 5)
        async with httpx.AsyncClient() as client:
            # Replace with actual search API
            response = await client.get(
                "https://api.search.example.com/search",
                params={"q": query, "limit": num_results}
            )
            # Fail loudly on HTTP errors instead of trying to parse an
            # error body as search results.
            response.raise_for_status()
            results = response.json()
        formatted = "\n\n".join([
            f"**{r['title']}**\n{r['snippet']}\nURL: {r['url']}"
            for r in results.get("results", [])
        ])
        return [TextContent(type="text", text=formatted)]

    elif name == "execute_sql":
        # The query is parameterized, but the SQL TEXT itself is
        # client-controlled -- only enable this against expendable
        # databases (use with caution!).
        import sqlite3
        conn = sqlite3.connect("database.db")
        cursor = conn.cursor()
        try:
            params = arguments.get("params", [])
            cursor.execute(arguments["query"], params)
            # cursor.description is non-None whenever the statement produced
            # a result set; this also covers "WITH ... SELECT" and PRAGMA,
            # which the previous startswith("SELECT") check missed.
            if cursor.description is not None:
                rows = cursor.fetchall()
                columns = [desc[0] for desc in cursor.description]
                result = [dict(zip(columns, row)) for row in rows]
                return [TextContent(type="text", text=str(result))]
            conn.commit()
            return [TextContent(type="text", text=f"Query executed. Rows affected: {cursor.rowcount}")]
        finally:
            conn.close()

    return [TextContent(type="text", text=f"Unknown tool: {name}")]
# ========== RESOURCES ==========
@server.list_resources()
async def list_resources() -> list[Resource]:
    """Advertise the readable resources this server exposes."""
    settings = Resource(
        uri="file:///config/settings.json",
        name="Application Settings",
        description="Current application configuration",
        mimeType="application/json"
    )
    users_schema = Resource(
        uri="db://users/schema",
        name="Users Table Schema",
        description="Schema definition for the users table",
        mimeType="text/plain"
    )
    return [settings, users_schema]
@server.read_resource()
async def read_resource(uri: str) -> str:
    """Return the contents of the resource identified by *uri*.

    Raises:
        ValueError: if the URI does not match any advertised resource.
    """
    if uri == "file:///config/settings.json":
        return Path("config/settings.json").read_text()
    if uri == "db://users/schema":
        return """
CREATE TABLE users (
    id INTEGER PRIMARY KEY,
    username VARCHAR(100) NOT NULL,
    email VARCHAR(255) UNIQUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
    raise ValueError(f"Unknown resource: {uri}")
# ========== PROMPTS ==========
@server.list_prompts()
async def list_prompts() -> list[Prompt]:
    """Advertise the reusable prompt templates this server provides."""
    code_review = Prompt(
        name="code_review",
        description="Review code for issues and improvements",
        arguments=[
            PromptArgument(
                name="code",
                description="The code to review",
                required=True
            ),
            PromptArgument(
                name="language",
                description="Programming language",
                required=False
            )
        ]
    )
    explain_error = Prompt(
        name="explain_error",
        description="Explain an error message and suggest fixes",
        arguments=[
            PromptArgument(
                name="error",
                description="The error message",
                required=True
            )
        ]
    )
    return [code_review, explain_error]
@server.get_prompt()
async def get_prompt(name: str, arguments: dict) -> list[PromptMessage]:
    """Render a named prompt template with its arguments filled in.

    Raises:
        ValueError: if *name* is not a known prompt.
    """
    if name == "code_review":
        language = arguments.get("language", "unknown")
        text = f"""Please review this {language} code:
```{language}
{arguments['code']}
```
Analyze for:
1. Bugs and potential issues
2. Performance concerns
3. Security vulnerabilities
4. Code style and readability
5. Suggested improvements"""
    elif name == "explain_error":
        text = f"""I encountered this error:
{arguments['error']}
Please:
1. Explain what this error means
2. Identify the likely cause
3. Suggest how to fix it
4. Provide example code if helpful"""
    else:
        raise ValueError(f"Unknown prompt: {name}")
    # All templates produce a single user-role message.
    return [
        PromptMessage(
            role="user",
            content=TextContent(type="text", text=text)
        )
    ]
# ========== SERVER STARTUP ==========
async def main():
"""Run the MCP server."""
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="example-mcp-server",
server_version="1.0.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={}
)
)
)
if __name__ == "__main__":
asyncio.run(main())
2.3 MCP Client Integration
"""
Using MCP with Claude and other LLM clients.
"""
import anthropic
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def use_mcp_with_claude():
    """
    Example: Using MCP tools with Claude API.

    Connects to a local MCP server over stdio, exposes its tools to
    Claude, and loops executing tool calls until Claude produces a
    final (non-tool-use) response.

    Returns:
        The text of Claude's final response.
    """
    # Connect to MCP server (spawned as a subprocess speaking stdio)
    server_params = StdioServerParameters(
        command="python",
        args=["mcp_server.py"]
    )
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize session
            await session.initialize()
            # Get available tools
            tools_response = await session.list_tools()
            tools = tools_response.tools
            # Convert MCP tool declarations to Claude's tools format
            claude_tools = [
                {
                    "name": tool.name,
                    "description": tool.description,
                    "input_schema": tool.inputSchema
                }
                for tool in tools
            ]
            # Use with Claude
            client = anthropic.Anthropic()
            messages = [
                {"role": "user", "content": "Search the web for recent news about AI agents"}
            ]
            response = client.messages.create(
                model="claude-sonnet-4-20250514",
                max_tokens=4096,
                tools=claude_tools,
                messages=messages
            )
            # Tool-use loop. A single assistant turn may contain SEVERAL
            # tool_use blocks; every one needs a matching tool_result or
            # the follow-up request is rejected. (The previous version
            # only answered the first tool_use block.)
            while response.stop_reason == "tool_use":
                tool_results = []
                for block in response.content:
                    if block.type != "tool_use":
                        continue
                    # Execute the tool via the MCP session
                    tool_result = await session.call_tool(
                        block.name,
                        block.input
                    )
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": tool_result.content[0].text
                    })
                # Continue the conversation with all results attached
                messages.append({"role": "assistant", "content": response.content})
                messages.append({"role": "user", "content": tool_results})
                response = client.messages.create(
                    model="claude-sonnet-4-20250514",
                    max_tokens=4096,
                    tools=claude_tools,
                    messages=messages
                )
            # Final response
            return response.content[0].text
3. AI Agents
AI Agents are LLM-powered systems that can reason, plan, and take actions to accomplish goals. They combine the reasoning capabilities of LLMs with tool use, memory, and multi-step execution.
3.1 Agent Architecture
3.2 ReAct Agent Pattern
"""
ReAct Agent: Reasoning + Acting
The ReAct pattern interleaves reasoning traces with actions,
allowing the model to plan, act, and observe iteratively.
"""
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
import json
@dataclass
class AgentState:
    """Agent execution state."""
    goal: str  # the user-supplied objective
    history: List[Dict[str, Any]] = field(default_factory=list)  # one record per iteration
    observations: List[str] = field(default_factory=list)  # raw tool outputs, in order
    final_answer: Optional[str] = None  # set once the agent emits "Final Answer:"
    iterations: int = 0  # loop counter
    max_iterations: int = 10  # hard stop to prevent runaway loops
class ReActAgent:
    """
    ReAct Agent implementation.
    Pattern: Thought -> Action -> Observation -> Repeat
    """

    # Maps a line prefix in the model's output to the key it fills in the
    # parsed result. The prefixes are mutually exclusive.
    _SECTION_PREFIXES = {
        'Thought:': 'thought',
        'Action:': 'action',
        'Action Input:': 'action_input',
        'Final Answer:': 'final_answer',
    }

    def __init__(
        self,
        llm_client,
        tools: Dict[str, callable],
        model: str = "claude-sonnet-4-20250514",
        verbose: bool = True
    ):
        """
        Args:
            llm_client: Anthropic-style client exposing messages.create().
            tools: Mapping of tool name -> callable taking keyword args.
            model: Model identifier passed to the client.
            verbose: Print the reasoning trace to stdout.
        """
        self.llm = llm_client
        self.tools = tools
        self.model = model
        self.verbose = verbose
        self.system_prompt = """You are a helpful AI agent that solves problems step by step.
For each step, you must output in this exact format:
Thought:
Action:
Action Input:
OR if you have the final answer:
Thought:
Final Answer:
Available tools:
{tool_descriptions}
Rules:
1. Always think before acting
2. Use observations to inform next steps
3. If a tool fails, try a different approach
4. Provide a Final Answer when you have enough information"""

    def _format_tool_descriptions(self) -> str:
        """Format one '- name: docstring' line per registered tool."""
        descriptions = []
        for name, tool in self.tools.items():
            # Functions always HAVE __doc__ (it may be None), so the old
            # getattr(..., 'No description') fallback never triggered.
            desc = tool.__doc__ or 'No description'
            descriptions.append(f"- {name}: {desc}")
        return "\n".join(descriptions)

    def _parse_response(self, response: str) -> Dict[str, Any]:
        """Parse the model's ReAct-formatted text into a dict.

        Returns a dict containing any of the keys 'thought', 'action',
        'action_input', 'final_answer'. Lines that do not start a new
        section are appended to the current section's value.
        """
        result: Dict[str, Any] = {}
        current_key = None
        current_value: List[str] = []
        for line in response.strip().split('\n'):
            for prefix, key in self._SECTION_PREFIXES.items():
                if line.startswith(prefix):
                    # Flush the previous section before starting this one.
                    if current_key:
                        result[current_key] = '\n'.join(current_value).strip()
                    current_key = key
                    current_value = [line[len(prefix):].strip()]
                    break
            else:
                current_value.append(line)
        if current_key:
            result[current_key] = '\n'.join(current_value).strip()
        return result

    def _execute_tool(self, tool_name: str, tool_input: str) -> str:
        """Execute a tool with JSON-encoded kwargs; never raises.

        Errors are returned as strings so the model can observe them and
        try a different approach.
        """
        if tool_name not in self.tools:
            return f"Error: Unknown tool '{tool_name}'"
        try:
            # Parse JSON input; empty input means no arguments.
            input_dict = json.loads(tool_input) if tool_input.strip() else {}
            # Execute tool
            result = self.tools[tool_name](**input_dict)
            return str(result)
        except json.JSONDecodeError as e:
            return f"Error: Invalid JSON input - {e}"
        except Exception as e:
            return f"Error executing {tool_name}: {e}"

    async def run(self, goal: str) -> str:
        """
        Run the Thought -> Action -> Observation loop until the agent
        produces a Final Answer or hits max_iterations.
        """
        state = AgentState(goal=goal)
        # Format system prompt with the available tools
        system = self.system_prompt.format(
            tool_descriptions=self._format_tool_descriptions()
        )
        messages = [
            {"role": "user", "content": f"Goal: {goal}"}
        ]
        while state.iterations < state.max_iterations:
            state.iterations += 1
            if self.verbose:
                print(f"\n--- Iteration {state.iterations} ---")
            # Get agent response
            response = self.llm.messages.create(
                model=self.model,
                system=system,
                max_tokens=2048,
                messages=messages
            )
            agent_response = response.content[0].text
            parsed = self._parse_response(agent_response)
            if self.verbose:
                print(f"Thought: {parsed.get('thought', 'N/A')}")
            # Check for final answer
            if 'final_answer' in parsed:
                state.final_answer = parsed['final_answer']
                if self.verbose:
                    print(f"Final Answer: {state.final_answer}")
                return state.final_answer
            # BUG FIX: `observation` used to be read unconditionally below,
            # so a response with no Action raised NameError on the first
            # iteration and reused a STALE observation on later ones.
            observation = None
            if 'action' in parsed:
                action = parsed['action']
                action_input = parsed.get('action_input', '{}')
                if self.verbose:
                    print(f"Action: {action}")
                    print(f"Action Input: {action_input}")
                observation = self._execute_tool(action, action_input)
                state.observations.append(observation)
                if self.verbose:
                    print(f"Observation: {observation[:500]}...")
                feedback = f"Observation: {observation}"
            else:
                # Nudge the model back into the required format.
                feedback = ("Observation: Your response contained neither an "
                            "Action nor a Final Answer. Please follow the "
                            "required format.")
            # Add to conversation
            messages.append({"role": "assistant", "content": agent_response})
            messages.append({"role": "user", "content": feedback})
            state.history.append({
                "iteration": state.iterations,
                "response": agent_response,
                "parsed": parsed,
                "observation": observation
            })
        return "Agent reached maximum iterations without finding an answer."
# Tool definitions
def web_search(query: str, num_results: int = 5) -> str:
    """Search the web for information."""
    # Stub: wire up a real search API here; num_results is unused so far.
    return f"Search results for '{query}'..."
def calculate(expression: str) -> str:
    """Evaluate a mathematical expression and return the result as a string.

    The expression is evaluated with builtins disabled, so it cannot call
    functions such as open() or __import__(); plain arithmetic still works.
    For fully untrusted input, replace eval() with a real expression parser
    (e.g. an ast-based evaluator).

    Returns:
        str(result) on success, or an "Error: ..." string on failure.
    """
    try:
        # Empty globals with an explicitly empty __builtins__ blocks access
        # to the builtin namespace (the old bare eval exposed it all).
        result = eval(expression, {"__builtins__": {}}, {})
        return str(result)
    except Exception as e:
        return f"Error: {e}"
def get_weather(city: str) -> str:
    """Get current weather for a city."""
    # Stub: call a real weather API in production.
    return f"Weather in {city}: 72°F, Sunny"
# Usage
import anthropic
import asyncio

agent = ReActAgent(
    llm_client=anthropic.Anthropic(),
    tools={
        "web_search": web_search,
        "calculate": calculate,
        "get_weather": get_weather
    },
    verbose=True
)

# Run the agent. `await` is only valid inside an async function (the
# original top-level `await` is a SyntaxError in a script), so drive the
# coroutine with asyncio.run().
result = asyncio.run(
    agent.run("What is the weather in Tokyo and calculate 15% tip on $85?")
)
3.3 Multi-Agent Systems
"""
Multi-Agent System with Specialized Roles
Agents collaborate by passing messages and delegating tasks
based on their specializations.
"""
from typing import Dict, List, Optional
from enum import Enum
import asyncio
class AgentRole(Enum):
    """Specializations available in the multi-agent system."""
    COORDINATOR = "coordinator"  # plans subtasks and synthesizes results
    RESEARCHER = "researcher"  # gathers and summarizes information
    CODER = "coder"  # writes code
    REVIEWER = "reviewer"  # reviews code for quality and correctness
class SpecializedAgent:
    """An agent with a specific role and capabilities."""

    # Role prompts are shared and immutable, so build them once at class
    # level instead of once per instance as before.
    SYSTEM_PROMPTS = {
        AgentRole.COORDINATOR: """You are a project coordinator agent.
Your job is to:
1. Understand the overall goal
2. Break it into subtasks
3. Assign tasks to specialized agents
4. Synthesize results into a final deliverable
Respond with a JSON plan: {"tasks": [{"agent": "researcher|coder|reviewer", "task": "description"}]}""",
        AgentRole.RESEARCHER: """You are a research specialist agent.
Your job is to:
1. Search for relevant information
2. Analyze and summarize findings
3. Provide factual, well-sourced answers
Use web_search tool when needed.""",
        AgentRole.CODER: """You are a coding specialist agent.
Your job is to:
1. Write clean, efficient code
2. Follow best practices
3. Include error handling
4. Add comments for complex logic
Output code in markdown code blocks.""",
        AgentRole.REVIEWER: """You are a code review specialist agent.
Your job is to:
1. Check for bugs and errors
2. Suggest improvements
3. Verify correctness
4. Ensure code quality
Provide structured feedback with severity levels."""
    }

    def __init__(
        self,
        role: AgentRole,
        llm_client,
        tools: Dict[str, callable] = None
    ):
        """
        Args:
            role: This agent's specialization (selects its system prompt).
            llm_client: Anthropic-style client exposing messages.create().
            tools: Optional tool mapping (not used by process() itself).
        """
        self.role = role
        self.llm = llm_client
        self.tools = tools or {}
        # Kept as an instance attribute for backward compatibility with
        # any caller that reads agent.system_prompts directly.
        self.system_prompts = self.SYSTEM_PROMPTS

    async def process(
        self,
        task: str,
        context: str = "",
        model: str = "claude-sonnet-4-20250514"
    ) -> str:
        """Process a task according to the agent's role.

        Args:
            task: The task description.
            context: Prior results/context prepended to the prompt.
            model: LLM identifier (previously hard-coded; default unchanged).
        """
        messages = [
            {"role": "user", "content": f"Context:\n{context}\n\nTask: {task}"}
        ]
        response = self.llm.messages.create(
            model=model,
            system=self.system_prompts[self.role],
            max_tokens=4096,
            messages=messages
        )
        return response.content[0].text
class MultiAgentSystem:
    """
    Orchestrates multiple specialized agents to complete complex tasks.

    Workflow: the coordinator plans, specialist agents execute each
    subtask with a running context, and the coordinator synthesizes the
    results into a final deliverable.
    """

    def __init__(self, llm_client):
        self.llm = llm_client
        # One pre-built agent per role.
        self.agents = {
            AgentRole.COORDINATOR: SpecializedAgent(AgentRole.COORDINATOR, llm_client),
            AgentRole.RESEARCHER: SpecializedAgent(AgentRole.RESEARCHER, llm_client),
            AgentRole.CODER: SpecializedAgent(AgentRole.CODER, llm_client),
            AgentRole.REVIEWER: SpecializedAgent(AgentRole.REVIEWER, llm_client)
        }

    async def execute(self, goal: str) -> dict:
        """
        Execute a multi-agent workflow.

        Returns:
            Dict with "goal", "plan" (raw coordinator output), "steps"
            (per-task results) and "final" (synthesized deliverable).
        """
        import json
        results = {"goal": goal, "steps": []}
        # Step 1: Coordinator creates plan
        print("Coordinator: Creating plan...")
        plan_response = await self.agents[AgentRole.COORDINATOR].process(goal)
        results["plan"] = plan_response
        # Parse plan (simplified -- assumes raw JSON, not fenced markdown)
        try:
            plan = json.loads(plan_response)
            tasks = plan.get("tasks", [])
        except (json.JSONDecodeError, AttributeError):
            # Invalid JSON, or valid JSON that isn't an object (.get would
            # raise AttributeError). The previous bare `except:` also
            # swallowed KeyboardInterrupt/SystemExit. Fall back to a single
            # coding task covering the whole goal.
            tasks = [{"agent": "coder", "task": goal}]
        # Step 2: Execute tasks, threading accumulated context through
        context = f"Goal: {goal}\n"
        for task_info in tasks:
            agent_type = task_info["agent"]
            task = task_info["task"]
            # Unknown agent names default to the coder.
            role = {
                "researcher": AgentRole.RESEARCHER,
                "coder": AgentRole.CODER,
                "reviewer": AgentRole.REVIEWER
            }.get(agent_type, AgentRole.CODER)
            print(f"{role.value.title()}: Working on '{task}'...")
            result = await self.agents[role].process(task, context)
            results["steps"].append({
                "agent": role.value,
                "task": task,
                "result": result
            })
            # Truncate each result to keep the running context bounded.
            context += f"\n{role.value} completed: {result[:500]}..."
        # Step 3: Coordinator synthesizes
        print("Coordinator: Synthesizing results...")
        final_context = json.dumps(results["steps"], indent=2)
        synthesis = await self.agents[AgentRole.COORDINATOR].process(
            "Synthesize all results into a final deliverable",
            final_context
        )
        results["final"] = synthesis
        return results
# Usage
import anthropic
import asyncio

mas = MultiAgentSystem(anthropic.Anthropic())
# `await` at module level is a SyntaxError in a script; drive the
# coroutine with asyncio.run() instead.
result = asyncio.run(mas.execute(
    "Create a Python function to parse CSV files with error handling and write unit tests for it"
))
4. Multimodal LLMs
Multimodal LLMs process and generate multiple types of content: text, images, audio, and video. This enables powerful applications like visual question answering, image generation from text, and document understanding.
4.1 Vision-Language Models
| Model | Capabilities | Context | Provider |
|---|---|---|---|
| GPT-4o | Text, Image, Audio input/output | 128K | OpenAI |
| Claude 3.5 Sonnet | Text, Image input | 200K | Anthropic |
| Gemini 2.0 Flash | Text, Image, Audio, Video | 1M | Google |
| Llama 3.2 Vision | Text, Image input | 128K | Meta (Open) |
4.2 Working with Images
"""
Multimodal LLM usage examples with images.
"""
import anthropic
import base64
from pathlib import Path
def encode_image(image_path: str) -> tuple[str, str]:
    """Encode image to base64 with media type detection."""
    media_types = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.webp': 'image/webp'
    }
    path = Path(image_path)
    # Unrecognized extensions fall back to PNG.
    media_type = media_types.get(path.suffix.lower(), 'image/png')
    data = base64.standard_b64encode(path.read_bytes()).decode('utf-8')
    return data, media_type
def analyze_image(image_path: str, question: str) -> str:
    """Ask Claude Vision *question* about the image at *image_path*."""
    client = anthropic.Anthropic()
    image_data, media_type = encode_image(image_path)
    # One user message: the image block first, then the question.
    image_block = {
        "type": "image",
        "source": {
            "type": "base64",
            "media_type": media_type,
            "data": image_data
        }
    }
    text_block = {
        "type": "text",
        "text": question
    }
    message = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": [image_block, text_block]}]
    )
    return message.content[0].text
def compare_images(image_paths: list[str], comparison_prompt: str) -> str:
    """
    Compare multiple images in a single request.

    Args:
        image_paths: Paths of the images to compare (sent in order).
        comparison_prompt: Instruction telling the model what to compare.

    Returns:
        The model's text response.
    """
    client = anthropic.Anthropic()
    content = []
    # The previous enumerate() index was never used; iterate directly.
    for path in image_paths:
        image_data, media_type = encode_image(path)
        content.append({
            "type": "image",
            "source": {
                "type": "base64",
                "media_type": media_type,
                "data": image_data
            }
        })
    # The prompt goes last, after all image blocks.
    content.append({
        "type": "text",
        "text": comparison_prompt
    })
    message = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        messages=[{"role": "user", "content": content}]
    )
    return message.content[0].text
def extract_text_from_document(image_path: str) -> str:
    """
    OCR and document understanding via the vision model.
    """
    ocr_prompt = """Extract all text from this document image.
Preserve the structure and formatting as much as possible.
If there are tables, format them properly.
If there are forms, identify fields and values."""
    return analyze_image(image_path, ocr_prompt)
def analyze_chart(image_path: str) -> dict:
    """
    Analyze a chart or graph image.

    Returns:
        The model's analysis parsed as JSON when possible; otherwise a
        dict with the raw text under "raw_analysis".
    """
    import json
    response = analyze_image(
        image_path,
        """Analyze this chart/graph and provide:
1. Chart type (bar, line, pie, etc.)
2. Title and axis labels
3. Key data points
4. Main trends or insights
5. Any anomalies or notable patterns
Respond in JSON format."""
    )
    try:
        return json.loads(response)
    except json.JSONDecodeError:
        # Model did not return valid JSON; the previous bare `except:`
        # also hid unrelated errors such as KeyboardInterrupt.
        return {"raw_analysis": response}
# Usage examples
# Single image analysis (one image + one question)
result = analyze_image(
    "screenshot.png",
    "What UI issues do you see in this screenshot?"
)
# Document OCR: returns extracted text preserving structure
text = extract_text_from_document("receipt.jpg")
# Chart analysis: returns parsed JSON, or {"raw_analysis": ...} as fallback
chart_data = analyze_chart("sales_graph.png")
5. Production Deployment
5.1 Cost Optimization
LLM API Pricing Comparison (2026)
| Model | Input ($/1M tokens) | Output ($/1M tokens) | Best For |
|---|---|---|---|
| GPT-4o | $2.50 | $10.00 | Complex reasoning |
| GPT-4o-mini | $0.15 | $0.60 | General tasks |
| Claude Sonnet | $3.00 | $15.00 | Code, analysis |
| Claude Haiku | $0.25 | $1.25 | Simple tasks |
| Gemini 2.0 Flash | $0.075 | $0.30 | High volume |
*Prices approximate; check provider documentation for current rates
"""
Cost optimization strategies for LLM applications.
"""
from typing import Optional
from dataclasses import dataclass
import tiktoken
@dataclass
class CostEstimate:
    """Cost estimation for an LLM request."""
    input_tokens: int  # tokens counted in the prompt
    output_tokens: int  # expected tokens in the completion
    input_cost: float  # USD for the input side
    output_cost: float  # USD for the output side
    total_cost: float  # input_cost + output_cost
class CostOptimizer:
    """
    Utilities for LLM cost optimization: token counting, per-request cost
    estimation, budget-aware model selection, and prompt compression.
    """

    # Pricing per million tokens (example rates; verify against provider docs)
    PRICING = {
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00},
        "claude-3-5-haiku-20241022": {"input": 0.25, "output": 1.25},
        "gemini-2.0-flash": {"input": 0.075, "output": 0.30}
    }

    def __init__(self, default_model: str = "gpt-4o-mini"):
        self.default_model = default_model
        # cl100k_base is OpenAI's tokenizer; counts for non-OpenAI models
        # are approximations.
        self.encoder = tiktoken.get_encoding("cl100k_base")

    def count_tokens(self, text: str) -> int:
        """Count tokens in *text* using the cl100k_base encoding."""
        return len(self.encoder.encode(text))

    def estimate_cost(
        self,
        prompt: str,
        expected_output_tokens: int = 500,
        model: Optional[str] = None
    ) -> CostEstimate:
        """
        Estimate the USD cost of one request.

        Args:
            prompt: The full prompt text to be tokenized.
            expected_output_tokens: Assumed completion length.
            model: Pricing table key; NOTE that unknown models silently
                fall back to gpt-4o-mini pricing.
        """
        model = model or self.default_model
        pricing = self.PRICING.get(model, self.PRICING["gpt-4o-mini"])
        input_tokens = self.count_tokens(prompt)
        input_cost = (input_tokens / 1_000_000) * pricing["input"]
        output_cost = (expected_output_tokens / 1_000_000) * pricing["output"]
        return CostEstimate(
            input_tokens=input_tokens,
            output_tokens=expected_output_tokens,
            input_cost=input_cost,
            output_cost=output_cost,
            total_cost=input_cost + output_cost
        )

    def select_model(
        self,
        task_complexity: str,
        budget_per_request: float = 0.01
    ) -> str:
        """
        Select the cheapest suitable model for a task and budget.

        Args:
            task_complexity: "simple", "medium", or "complex" (anything
                else falls back to "medium").
            budget_per_request: Target USD spend per request.
        """
        model_tiers = {
            "simple": ["gemini-2.0-flash", "claude-3-5-haiku-20241022", "gpt-4o-mini"],
            "medium": ["gpt-4o-mini", "claude-sonnet-4-20250514"],
            "complex": ["claude-sonnet-4-20250514", "gpt-4o"]
        }
        candidates = model_tiers.get(task_complexity, model_tiers["medium"])
        # Select the cheapest candidate that fits the budget.
        for model in candidates:
            # Rough proxy request: 4000 chars of input plus 500 output tokens.
            cost = self.estimate_cost("x" * 4000, 500, model).total_cost
            if cost <= budget_per_request:
                return model
        # Nothing fits the budget: return the most capable candidate anyway
        # and let the caller decide whether to proceed over budget. (The old
        # comment claimed this only happened "if budget allows", which was
        # the opposite of the actual behavior.)
        return candidates[-1]

    @staticmethod
    def optimize_prompt(prompt: str, max_tokens: int = 4000) -> str:
        """
        Compress a prompt to reduce costs.

        Strategies:
        1. Collapse runs of blank lines and repeated spaces
        2. Truncate the middle when over *max_tokens* (the beginning and
           end of a prompt usually carry the instructions)
        """
        # Remove extra whitespace
        import re
        prompt = re.sub(r'\n\s*\n', '\n\n', prompt)
        prompt = re.sub(r' +', ' ', prompt)
        # Truncate if too long
        encoder = tiktoken.get_encoding("cl100k_base")
        tokens = encoder.encode(prompt)
        if len(tokens) > max_tokens:
            # Keep beginning and end, drop the middle.
            keep = max_tokens // 2
            tokens = tokens[:keep] + tokens[-keep:]
            prompt = encoder.decode(tokens)
        return prompt.strip()
# Caching for cost reduction
from functools import lru_cache
import hashlib
class CachedLLM:
    """
    LLM client wrapper with response caching.

    Caching identical requests can reduce costs by 50-90%
    for applications with repeated queries.

    The cache is a true LRU: hits refresh an entry's recency, so hot
    entries survive eviction (the previous version evicted in pure
    insertion/FIFO order despite claiming LRU).
    """

    def __init__(self, client, cache_size: int = 1000):
        self.client = client
        # Insertion-ordered dict used as an LRU: least recently used
        # entry is always the first key.
        self.cache = {}
        self.cache_size = cache_size
        self.cache_hits = 0
        self.total_requests = 0

    def _cache_key(self, model: str, messages: list, **kwargs) -> str:
        """Generate a deterministic cache key from request parameters."""
        key_data = {
            "model": model,
            "messages": messages,
            **kwargs
        }
        # Sort by parameter name so equivalent requests hash identically
        # regardless of kwarg order.
        key_str = str(sorted(key_data.items()))
        return hashlib.md5(key_str.encode()).hexdigest()

    def create(self, model: str, messages: list, **kwargs):
        """
        Create completion with caching.

        Returns the cached response for an identical prior request;
        otherwise calls the underlying client and caches the result.
        """
        self.total_requests += 1
        # Check cache
        cache_key = self._cache_key(model, messages, **kwargs)
        if cache_key in self.cache:
            self.cache_hits += 1
            # Fix: move the hit to the most-recent position so frequently
            # used entries are not the first to be evicted.
            response = self.cache.pop(cache_key)
            self.cache[cache_key] = response
            return response
        # Make request
        response = self.client.messages.create(
            model=model,
            messages=messages,
            **kwargs
        )
        # Cache response, evicting the least recently used entry if full.
        if len(self.cache) >= self.cache_size:
            oldest = next(iter(self.cache))
            del self.cache[oldest]
        self.cache[cache_key] = response
        return response

    @property
    def cache_hit_rate(self) -> float:
        """Fraction of requests served from cache (0.0 before any request)."""
        if self.total_requests == 0:
            return 0.0
        return self.cache_hits / self.total_requests
5.2 Monitoring and Observability
"""
LLM application monitoring and logging.
"""
import time
import logging
from dataclasses import dataclass, field
from typing import List, Dict, Any
from datetime import datetime
import json
@dataclass
class LLMRequestLog:
    """Log entry for an LLM request.

    Captures token usage, latency, and outcome of one LLM call so a
    monitor can aggregate cost/latency/error metrics over a window.
    """
    timestamp: datetime    # when the request was logged
    model: str             # provider model identifier
    input_tokens: int      # prompt tokens consumed
    output_tokens: int     # completion tokens produced
    latency_ms: float      # end-to-end latency in milliseconds
    status: str            # "success" or "error"
    # Fix: was annotated `str` with a None default; None is a valid value.
    error: Optional[str] = None
    # Arbitrary extra context; default_factory avoids a shared mutable default.
    metadata: Dict[str, Any] = field(default_factory=dict)
class LLMMonitor:
    """
    Monitor LLM requests for cost, latency, and errors.

    Keeps an in-memory list of `LLMRequestLog` entries and mirrors each
    entry as a JSON line to the standard `logging` module.
    """

    def __init__(self):
        self.logs: List[LLMRequestLog] = []
        self.logger = logging.getLogger("llm_monitor")

    def log_request(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        latency_ms: float,
        status: str = "success",
        error: Optional[str] = None,  # fix: was annotated `str` with None default
        **metadata
    ):
        """Log an LLM request.

        Parameters mirror `LLMRequestLog`; any extra keyword arguments
        are stored as the entry's metadata.
        """
        entry = LLMRequestLog(
            timestamp=datetime.now(),
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            latency_ms=latency_ms,
            status=status,
            error=error,
            metadata=metadata
        )
        self.logs.append(entry)
        # Mirror to the standard logger so external log aggregation
        # (files, collectors) sees the same data.
        self.logger.info(json.dumps({
            "timestamp": entry.timestamp.isoformat(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "latency_ms": latency_ms,
            "status": status,
            "error": error
        }))

    def get_metrics(self, window_hours: int = 24) -> Dict[str, Any]:
        """Return aggregated metrics for the trailing window.

        Returns totals, average latency, error rate, and a per-model
        breakdown; or {"message": ...} when no logs fall in the window.
        """
        from datetime import timedelta
        cutoff = datetime.now() - timedelta(hours=window_hours)
        recent = [entry for entry in self.logs if entry.timestamp > cutoff]
        if not recent:
            return {"message": "No logs in window"}
        total_input = sum(entry.input_tokens for entry in recent)
        total_output = sum(entry.output_tokens for entry in recent)
        avg_latency = sum(entry.latency_ms for entry in recent) / len(recent)
        error_rate = sum(1 for entry in recent if entry.status == "error") / len(recent)
        # Per-model breakdown
        models: Dict[str, Dict[str, int]] = {}
        for entry in recent:
            stats = models.setdefault(
                entry.model,
                {"requests": 0, "input_tokens": 0, "output_tokens": 0}
            )
            stats["requests"] += 1
            stats["input_tokens"] += entry.input_tokens
            stats["output_tokens"] += entry.output_tokens
        return {
            "window_hours": window_hours,
            "total_requests": len(recent),
            "total_input_tokens": total_input,
            "total_output_tokens": total_output,
            "avg_latency_ms": round(avg_latency, 2),
            "error_rate": round(error_rate, 4),
            "per_model": models
        }
# Wrapper for automatic monitoring
class MonitoredClient:
    """
    LLM client wrapper with automatic monitoring.

    Delegates to `client.messages.create` and logs token usage, latency,
    and status on every call, including failures (the exception is
    re-raised after logging).
    """

    def __init__(self, client, monitor: "LLMMonitor"):
        self.client = client
        self.monitor = monitor

    @staticmethod
    def _usage_tokens(response):
        """Best-effort extraction of (input_tokens, output_tokens).

        Providers differ: `usage` may be an object with attributes (e.g.
        Anthropic) or a plain dict. Missing data yields (0, 0) instead of
        raising — the original dict-style `.get` on an attribute object
        raised AttributeError inside the `finally` block.
        """
        usage = getattr(response, "usage", None)
        if usage is None:
            return 0, 0
        if isinstance(usage, dict):
            return usage.get("input_tokens", 0), usage.get("output_tokens", 0)
        return getattr(usage, "input_tokens", 0), getattr(usage, "output_tokens", 0)

    def create(self, model: str, messages: list, **kwargs):
        """Create completion with monitoring."""
        start_time = time.time()
        status = "success"
        error = None
        # Fix: bind up front so the finally block never sees an unbound
        # name on the exception path (original probed `'response' in dir()`).
        response = None
        try:
            response = self.client.messages.create(
                model=model,
                messages=messages,
                **kwargs
            )
            return response
        except Exception as e:
            status = "error"
            error = str(e)
            raise
        finally:
            latency_ms = (time.time() - start_time) * 1000
            input_tokens, output_tokens = self._usage_tokens(response)
            self.monitor.log_request(
                model=model,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                latency_ms=latency_ms,
                status=status,
                error=error
            )
5.3 Production Checklist
LLM Application Production Checklist
Security:
- API keys stored in secure vault (not in code)
- Input sanitization to prevent prompt injection
- Output filtering for sensitive data
- Rate limiting per user/API key
- Audit logging for compliance
Reliability:
- Retry logic with exponential backoff
- Circuit breaker for provider outages
- Fallback models (e.g., GPT-4o → GPT-4o-mini)
- Timeout configuration
- Graceful degradation
Performance:
- Response caching where appropriate
- Prompt optimization for token efficiency
- Batch processing for bulk operations
- Async processing for non-blocking operations
- Connection pooling
Monitoring:
- Request/response logging
- Cost tracking per user/feature
- Latency monitoring with alerts
- Error rate dashboards
- Model performance tracking
Summary
Chapter 5 Key Takeaways
- RAG: Enhances LLMs with external knowledge; hybrid search + reranking for best results
- MCP: Standardized protocol for tool integration; enables portable, composable AI applications
- Agents: LLMs + reasoning + tools + memory; ReAct pattern for step-by-step problem solving
- Multimodal: Vision-language models enable image understanding, document processing, visual Q&A
- Production: Focus on cost optimization, monitoring, security, and reliability for real-world deployment