🌐 EN | 🇯🇵 JP | Last sync: 2026-01-12

Chapter 3: Multi-Agent Systems

Solving Complex Tasks through Collaboration of Multiple Agents

📖 Reading Time: 30-35 minutes 📊 Difficulty: Advanced 💻 Code Examples: 5

This chapter covers Multi. You will learn essential concepts and techniques.

What are Multi-Agent Systems?

Why Multiple Agents are Needed

Complex tasks that are difficult to solve with a single agent can be processed more efficiently and with higher quality when multiple specialized agents collaborate.

Advantages of Multi-Agent Systems:

Types of Multi-Agent Architectures

Architecture Characteristics Application Scenarios
Parallel Agents execute independently in parallel Data collection, multi-perspective analysis
Sequential Agents hand over processing in sequence Pipeline processing, incremental refinement
Hierarchical Manager controls subordinate workers Separation of complex planning and execution
Interactive Agents discuss and negotiate among themselves Decision making, consensus building

Multi-Agent Design

Agent Role Distribution

graph TD M[Manager Agent
Task decomposition and coordination] --> R[Researcher Agent
Information gathering] M --> W[Writer Agent
Document creation] M --> C[Critic Agent
Review and evaluation] R --> M W --> M C --> M style M fill:#e3f2fd style R fill:#fff3e0 style W fill:#f3e5f5 style C fill:#e8f5e9

Example of Role-Based Design

from typing import List, Dict, Any
from dataclasses import dataclass

@dataclass
class AgentRole:
    """Agent role definition"""
    name: str
    description: str
    capabilities: List[str]
    system_prompt: str

# Agent role definitions
RESEARCHER_ROLE = AgentRole(
    name="Researcher",
    description="Expert in information gathering and analysis",
    capabilities=["web_search", "database_query", "data_analysis"],
    system_prompt="""You are an excellent researcher.

Role:
- Gather related information from web searches and databases
- Evaluate the credibility of collected information
- Summarize key points and report to the team

Important points:
- Clearly cite information sources
- Cross-check multiple information sources
- Explicitly indicate uncertain information"""
)

WRITER_ROLE = AgentRole(
    name="Writer",
    description="Expert in document creation",
    capabilities=["content_generation", "formatting", "editing"],
    system_prompt="""You are an excellent writer.

Role:
- Create high-quality documents based on researcher's information
- Readable and logical structure
- Writing style and tone appropriate for the target audience

Important points:
- Clear and concise expression
- Appropriate heading and paragraph structure
- Proper use of citations and references"""
)

CRITIC_ROLE = AgentRole(
    name="Critic",
    description="Expert in quality review",
    capabilities=["quality_check", "fact_verification", "feedback"],
    system_prompt="""You are a reviewer with critical thinking skills.

Role:
- Critically review created documents
- Verify factual accuracy
- Provide specific improvement points

Important points:
- Constructive feedback
- Specific improvement suggestions
- Clearly point out serious issues"""
)

Communication Protocols

Message Passing

Communication between agents is conducted through structured messages.

from dataclasses import dataclass
from typing import Optional, Dict, Any
from datetime import datetime
from enum import Enum

class MessageType(Enum):
    """Message types"""
    TASK = "task"              # Task instruction
    RESULT = "result"          # Task result
    QUERY = "query"            # Information request
    RESPONSE = "response"      # Information response
    ERROR = "error"            # Error notification
    STATUS = "status"          # Status update

@dataclass
class Message:
    """Inter-agent message"""
    type: MessageType
    sender: str
    receiver: str
    content: Dict[str, Any]
    timestamp: datetime
    message_id: str
    reply_to: Optional[str] = None

class MessageBus:
    """Message bus (inter-agent communication)"""

    def __init__(self):
        self.messages: List[Message] = []
        self.subscribers: Dict[str, List[callable]] = {}

    def subscribe(self, agent_name: str, callback: callable):
        """Register agent for message reception"""
        if agent_name not in self.subscribers:
            self.subscribers[agent_name] = []
        self.subscribers[agent_name].append(callback)

    def publish(self, message: Message):
        """Deliver message"""
        self.messages.append(message)

        # Deliver message to receiver
        if message.receiver in self.subscribers:
            for callback in self.subscribers[message.receiver]:
                callback(message)

    def broadcast(self, message: Message):
        """Broadcast to all agents"""
        self.messages.append(message)

        for agent_name, callbacks in self.subscribers.items():
            if agent_name != message.sender:
                for callback in callbacks:
                    callback(message)

# Usage example
import uuid

bus = MessageBus()

def researcher_receive(message: Message):
    print(f"Researcher received: {message.type.value} from {message.sender}")

def writer_receive(message: Message):
    print(f"Writer received: {message.type.value} from {message.sender}")

# Register agents
bus.subscribe("researcher", researcher_receive)
bus.subscribe("writer", writer_receive)

# Send message
task_message = Message(
    type=MessageType.TASK,
    sender="manager",
    receiver="researcher",
    content={"task": "Research AI trends in 2024"},
    timestamp=datetime.now(),
    message_id=str(uuid.uuid4())
)

bus.publish(task_message)

Shared Memory Approach

from typing import Dict, Any, Optional
import threading

class SharedMemory:
    """Shared memory between agents"""

    def __init__(self):
        self.data: Dict[str, Any] = {}
        self.lock = threading.Lock()
        self.subscribers: Dict[str, List[callable]] = {}

    def write(self, key: str, value: Any, agent_name: str):
        """Write data"""
        with self.lock:
            self.data[key] = {
                "value": value,
                "author": agent_name,
                "timestamp": datetime.now()
            }

            # Notify changes
            self._notify_subscribers(key, value, agent_name)

    def read(self, key: str) -> Optional[Any]:
        """Read data"""
        with self.lock:
            if key in self.data:
                return self.data[key]["value"]
            return None

    def subscribe_to_key(self, key: str, callback: callable):
        """Watch for changes to a specific key"""
        if key not in self.subscribers:
            self.subscribers[key] = []
        self.subscribers[key].append(callback)

    def _notify_subscribers(self, key: str, value: Any, agent_name: str):
        """Notify subscribers"""
        if key in self.subscribers:
            for callback in self.subscribers[key]:
                callback(key, value, agent_name)

# Usage example
memory = SharedMemory()

def on_research_complete(key, value, agent_name):
    print(f"Research completed by {agent_name}: {value}")

memory.subscribe_to_key("research_result", on_research_complete)

# Researcher writes result
memory.write("research_result", "Major AI trends in 2024...", "researcher")

Collaboration Patterns

1. Parallel Execution Pattern

import asyncio
from typing import List, Dict, Any

class ParallelAgentSystem:
    """Parallel execution agent system"""

    def __init__(self, agents: List[Any]):
        self.agents = agents

    async def execute_parallel(self, task: str) -> List[Dict[str, Any]]:
        """Execute all agents in parallel"""
        tasks = [
            agent.process(task)
            for agent in self.agents
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Aggregate results
        successful_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Agent {i} failed: {str(result)}")
            else:
                successful_results.append({
                    "agent": self.agents[i].name,
                    "result": result
                })

        return successful_results

# Usage example (pseudo-code)
class ResearchAgent:
    def __init__(self, name: str, specialty: str):
        self.name = name
        self.specialty = specialty

    async def process(self, query: str) -> Dict[str, Any]:
        # Execute research asynchronously
        await asyncio.sleep(1)  # Simulate API call
        return {
            "specialty": self.specialty,
            "findings": f"Research results on {query} regarding {self.specialty}"
        }

# Execute multiple specialized agents in parallel
agents = [
    ResearchAgent("Tech Researcher", "Technology trends"),
    ResearchAgent("Market Researcher", "Market analysis"),
    ResearchAgent("Academic Researcher", "Academic research")
]

system = ParallelAgentSystem(agents)
results = asyncio.run(system.execute_parallel("AI in 2024"))
print(results)

2. Sequential Execution (Pipeline) Pattern

from typing import Any, List, Callable

class SequentialAgentSystem:
    """Sequential execution agent system (pipeline)"""

    def __init__(self):
        self.pipeline: List[Callable] = []

    def add_stage(self, agent: Callable):
        """Add agent to pipeline"""
        self.pipeline.append(agent)

    def execute(self, initial_input: Any) -> Any:
        """Execute pipeline"""
        current_data = initial_input

        for i, agent in enumerate(self.pipeline):
            print(f"Stage {i+1}: {agent.__name__}")
            current_data = agent(current_data)
            print(f"  Output: {current_data}\n")

        return current_data

# Agents for each pipeline stage
def data_collector(query: str) -> Dict[str, Any]:
    """Stage 1: Data collection"""
    return {
        "query": query,
        "raw_data": f"Raw data regarding {query}...",
        "sources": ["source1", "source2"]
    }

def data_analyzer(data: Dict[str, Any]) -> Dict[str, Any]:
    """Stage 2: Data analysis"""
    data["analysis"] = "Analysis results: Main trends are..."
    data["insights"] = ["Insight 1", "Insight 2"]
    return data

def report_generator(data: Dict[str, Any]) -> str:
    """Stage 3: Report generation"""
    report = f"""
Research Report: {data['query']}

Analysis Results:
{data['analysis']}

Key Insights:
- {data['insights'][0]}
- {data['insights'][1]}

Sources: {', '.join(data['sources'])}
    """
    return report.strip()

# Build and execute pipeline
pipeline = SequentialAgentSystem()
pipeline.add_stage(data_collector)
pipeline.add_stage(data_analyzer)
pipeline.add_stage(report_generator)

final_report = pipeline.execute("Latest AI agent trends")
print("=== Final Report ===")
print(final_report)

3. Hierarchical (Manager-Worker) Pattern

from typing import List, Dict, Any
from openai import OpenAI

class ManagerAgent:
    """Manager agent (task decomposition and coordination)"""

    def __init__(self, api_key: str, workers: List[Any]):
        self.client = OpenAI(api_key=api_key)
        self.workers = workers
        self.task_history = []

    def execute(self, user_request: str) -> str:
        """Process user request"""
        # Step 1: Decompose task
        subtasks = self.decompose_task(user_request)

        # Step 2: Delegate to workers
        results = self.delegate_to_workers(subtasks)

        # Step 3: Synthesize results
        final_result = self.synthesize_results(user_request, results)

        return final_result

    def decompose_task(self, request: str) -> List[Dict[str, Any]]:
        """Decompose task into subtasks"""
        worker_capabilities = "\n".join([
            f"- {w.name}: {w.capabilities}"
            for w in self.workers
        ])

        prompt = f"""Please decompose the following request into subtasks to be assigned to available workers.

Request: {request}

Available workers:
{worker_capabilities}

Output each subtask in the following format:
1. [Worker name] Task description
2. [Worker name] Task description
..."""

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )

        # Parse subtasks (simplified version)
        subtasks = []
        for line in response.choices[0].message.content.split('\n'):
            if line.strip() and line[0].isdigit():
                parts = line.split(']', 1)
                if len(parts) == 2:
                    worker_name = parts[0].split('[')[1].strip()
                    task_desc = parts[1].strip()
                    subtasks.append({
                        "worker": worker_name,
                        "task": task_desc
                    })

        return subtasks

    def delegate_to_workers(self, subtasks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Delegate tasks to workers"""
        results = []

        for subtask in subtasks:
            worker_name = subtask["worker"]
            task = subtask["task"]

            # Find the corresponding worker
            worker = next((w for w in self.workers if w.name == worker_name), None)

            if worker:
                result = worker.execute(task)
                results.append({
                    "worker": worker_name,
                    "task": task,
                    "result": result
                })
            else:
                results.append({
                    "worker": worker_name,
                    "task": task,
                    "result": f"Error: Worker {worker_name} not found"
                })

        return results

    def synthesize_results(self, original_request: str, results: List[Dict[str, Any]]) -> str:
        """Synthesize results and generate final answer"""
        results_text = "\n\n".join([
            f"{r['worker']}'s results:\n{r['result']}"
            for r in results
        ])

        prompt = f"""Please synthesize the results from each worker and generate a final answer for the following request.

Original request: {original_request}

Worker results:
{results_text}

Generate the synthesized answer:"""

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7
        )

        return response.choices[0].message.content

class WorkerAgent:
    """Worker agent"""

    def __init__(self, name: str, capabilities: str, system_prompt: str, api_key: str):
        self.name = name
        self.capabilities = capabilities
        self.system_prompt = system_prompt
        self.client = OpenAI(api_key=api_key)

    def execute(self, task: str) -> str:
        """Execute task"""
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": task}
            ],
            temperature=0.7
        )

        return response.choices[0].message.content

# Usage example
researcher = WorkerAgent(
    name="Researcher",
    capabilities="Web search, data collection",
    system_prompt="You are a research expert.",
    api_key="your-api-key"
)

writer = WorkerAgent(
    name="Writer",
    capabilities="Document creation, editing",
    system_prompt="You are a document creation expert.",
    api_key="your-api-key"
)

manager = ManagerAgent(
    api_key="your-api-key",
    workers=[researcher, writer]
)

result = manager.execute("Please create a 1000-word report on AI trends in 2024")
print(result)

Orchestration Strategies

Dynamic Task Assignment

from typing import List, Dict, Any
import time

class TaskOrchestrator:
    """Task orchestrator"""

    def __init__(self, agents: List[Any]):
        self.agents = agents
        self.task_queue = []
        self.agent_status = {agent.name: "idle" for agent in agents}

    def add_task(self, task: Dict[str, Any]):
        """Add task to queue"""
        self.task_queue.append(task)

    def get_available_agent(self, required_capability: str = None):
        """Get available agent"""
        for agent in self.agents:
            if self.agent_status[agent.name] == "idle":
                if required_capability is None or required_capability in agent.capabilities:
                    return agent
        return None

    def execute_tasks(self):
        """Process task queue"""
        while self.task_queue:
            task = self.task_queue.pop(0)

            # Find appropriate agent
            agent = self.get_available_agent(task.get("required_capability"))

            if agent:
                print(f"Assigning task '{task['name']}' to {agent.name}")
                self.agent_status[agent.name] = "busy"

                # Execute task (assuming asynchronous)
                result = agent.execute(task)

                self.agent_status[agent.name] = "idle"
                print(f"{agent.name} completed task '{task['name']}'")
            else:
                # If agent unavailable, return to queue
                self.task_queue.append(task)
                time.sleep(1)

State Management and Conflict Resolution

Distributed State Synchronization

from typing import Dict, Any, Optional
from datetime import datetime
import json

class StateManager:
    """State management between agents"""

    def __init__(self):
        self.state: Dict[str, Any] = {}
        self.version: Dict[str, int] = {}
        self.history: List[Dict[str, Any]] = []

    def update_state(self, key: str, value: Any, agent_name: str) -> bool:
        """Update state (with versioning)"""
        current_version = self.version.get(key, 0)

        # Record update
        update_record = {
            "key": key,
            "value": value,
            "agent": agent_name,
            "version": current_version + 1,
            "timestamp": datetime.now().isoformat()
        }

        self.state[key] = value
        self.version[key] = current_version + 1
        self.history.append(update_record)

        return True

    def get_state(self, key: str, version: Optional[int] = None) -> Optional[Any]:
        """Get state (with specific version support)"""
        if version is None:
            return self.state.get(key)

        # Search for specific version in history
        for record in reversed(self.history):
            if record["key"] == key and record["version"] == version:
                return record["value"]

        return None

    def resolve_conflict(self, key: str, conflicting_values: List[Dict[str, Any]]) -> Any:
        """Resolve conflict"""
        # Adopt value with latest timestamp (Last-Write-Wins)
        latest = max(conflicting_values, key=lambda x: x["timestamp"])
        return latest["value"]

# Usage example
state_manager = StateManager()

# Multiple agents update same key
state_manager.update_state("document_title", "Introduction to AI Agents", "agent1")
state_manager.update_state("document_title", "Complete Guide to AI Agents", "agent2")

# Get latest value
current_title = state_manager.get_state("document_title")
print(f"Current title: {current_title}")

# Check history
print("\nUpdate history:")
for record in state_manager.history:
    print(f"  v{record['version']}: {record['value']} (by {record['agent']})")

Modern Multi-Agent Frameworks (2025)

Framework Comparison

The multi-agent ecosystem has matured with several production-ready frameworks:

Framework Architecture Best For Key Feature
LangGraph State machine / Graph Complex workflows with branching Explicit state transitions, cycles
AutoGen Conversational Research, collaborative problem-solving Agent-to-agent dialogue
CrewAI Role-based crews Enterprise automation Pre-defined roles, task delegation
OpenAI Swarm Handoff-based Simple multi-agent routing Lightweight, educational

CrewAI Example

from crewai import Agent, Task, Crew

# Define agents with specific roles
researcher = Agent(
    role="Senior Research Analyst",
    goal="Discover cutting-edge developments in AI",
    backstory="Expert at finding and synthesizing information",
    tools=[search_tool, web_scraper],
    llm="gpt-4"
)

writer = Agent(
    role="Technical Writer",
    goal="Create engaging technical content",
    backstory="Skilled at explaining complex topics simply",
    tools=[],
    llm="gpt-4"
)

# Define tasks
research_task = Task(
    description="Research the latest trends in multi-agent AI systems",
    agent=researcher,
    expected_output="Detailed research report with key findings"
)

writing_task = Task(
    description="Write a blog post based on the research",
    agent=writer,
    expected_output="Engaging 1000-word blog post",
    context=[research_task]  # Depends on research output
)

# Create and run crew
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, writing_task],
    verbose=True
)

result = crew.kickoff()

Reflection and Self-Critique Patterns

Why Reflection Matters

Reflection enables agents to evaluate and improve their own outputs, leading to higher quality results without human intervention.

graph LR A[Generate] --> B[Evaluate] B --> C{Good enough?} C -->|No| D[Critique] D --> A C -->|Yes| E[Output] style A fill:#e3f2fd style B fill:#fff3e0 style D fill:#f3e5f5 style E fill:#e8f5e9

Self-Critique Implementation

class ReflectiveAgent:
    """Agent with self-reflection capabilities"""

    def __init__(self, llm, max_iterations=3):
        self.llm = llm
        self.max_iterations = max_iterations

    def generate_with_reflection(self, task: str) -> str:
        """Generate output with iterative self-improvement"""
        output = self.generate(task)

        for i in range(self.max_iterations):
            # Self-critique
            critique = self.critique(task, output)

            if critique["is_satisfactory"]:
                break

            # Improve based on critique
            output = self.improve(task, output, critique["feedback"])

        return output

    def critique(self, task: str, output: str) -> dict:
        """Evaluate output quality"""
        prompt = f"""
        Task: {task}
        Output: {output}

        Evaluate this output:
        1. Does it fully address the task?
        2. Is it accurate and well-reasoned?
        3. What could be improved?

        Return JSON: {{"is_satisfactory": bool, "feedback": str}}
        """
        return self.llm.generate(prompt, json_mode=True)

    def improve(self, task: str, output: str, feedback: str) -> str:
        """Improve output based on feedback"""
        prompt = f"""
        Task: {task}
        Previous output: {output}
        Feedback: {feedback}

        Generate an improved version addressing the feedback.
        """
        return self.llm.generate(prompt)

Agent Memory Systems

Types of Memory

Type Scope Use Case
Working Memory Current conversation Context window, recent messages
Episodic Memory Past experiences Similar task recall, learning from history
Semantic Memory Factual knowledge Domain knowledge, entity relationships
Procedural Memory Skills and procedures Learned workflows, tool usage patterns

Memory Implementation Pattern

from typing import List, Dict
import numpy as np
from sentence_transformers import SentenceTransformer

class AgentMemory:
    """Multi-tier memory system for agents"""

    def __init__(self, embedding_model: str = "all-MiniLM-L6-v2"):
        self.encoder = SentenceTransformer(embedding_model)
        self.episodic: List[Dict] = []  # Past experiences
        self.semantic: Dict = {}         # Knowledge base
        self.working: List[Dict] = []    # Current context

    def add_experience(self, experience: Dict):
        """Store an experience with embedding for retrieval"""
        text = experience.get("description", str(experience))
        embedding = self.encoder.encode(text)
        self.episodic.append({
            **experience,
            "embedding": embedding
        })

    def recall_similar(self, query: str, k: int = 5) -> List[Dict]:
        """Retrieve k most similar past experiences"""
        if not self.episodic:
            return []

        query_embedding = self.encoder.encode(query)
        similarities = [
            np.dot(query_embedding, exp["embedding"])
            for exp in self.episodic
        ]

        top_indices = np.argsort(similarities)[-k:][::-1]
        return [self.episodic[i] for i in top_indices]

    def update_knowledge(self, key: str, value: any):
        """Update semantic memory"""
        self.semantic[key] = value

    def get_context(self, query: str) -> str:
        """Build context from all memory types"""
        similar = self.recall_similar(query, k=3)
        relevant_knowledge = {k: v for k, v in self.semantic.items()
                             if k.lower() in query.lower()}

        return f"""
        Similar past experiences: {similar}
        Relevant knowledge: {relevant_knowledge}
        Recent context: {self.working[-5:]}
        """

Summary

What We Learned in This Chapter

Design Principles

Effective multi-agent systems are realized through clear role distribution, efficient communication, appropriate orchestration, and robust state management

Disclaimer

⚠️ Please Help Us Improve Content Quality

This content was created with AI assistance. If you find errors or areas for improvement, please report them using one of the following methods: