🌐 EN | 🇯🇵 JP | Last sync: 2025-11-16

Chapter 3: Multi-Agent Systems

Solving Complex Tasks through Collaboration of Multiple Agents

📖 Reading Time: 30-35 minutes 📊 Difficulty: Advanced 💻 Code Examples: 5

This chapter covers Multi. You will learn essential concepts and techniques.

What are Multi-Agent Systems?

Why Multiple Agents are Needed

Complex tasks that are difficult to solve with a single agent can be processed more efficiently and with higher quality when multiple specialized agents collaborate.

Advantages of Multi-Agent Systems:

Types of Multi-Agent Architectures

Architecture Characteristics Application Scenarios
Parallel Agents execute independently in parallel Data collection, multi-perspective analysis
Sequential Agents hand over processing in sequence Pipeline processing, incremental refinement
Hierarchical Manager controls subordinate workers Separation of complex planning and execution
Interactive Agents discuss and negotiate among themselves Decision making, consensus building

Multi-Agent Design

Agent Role Distribution

graph TD M[Manager Agent
Task decomposition and coordination] --> R[Researcher Agent
Information gathering] M --> W[Writer Agent
Document creation] M --> C[Critic Agent
Review and evaluation] R --> M W --> M C --> M style M fill:#e3f2fd style R fill:#fff3e0 style W fill:#f3e5f5 style C fill:#e8f5e9

Example of Role-Based Design

from typing import List, Dict, Any
from dataclasses import dataclass

@dataclass
class AgentRole:
    """Agent role definition"""
    name: str
    description: str
    capabilities: List[str]
    system_prompt: str

# Agent role definitions
RESEARCHER_ROLE = AgentRole(
    name="Researcher",
    description="Expert in information gathering and analysis",
    capabilities=["web_search", "database_query", "data_analysis"],
    system_prompt="""You are an excellent researcher.

Role:
- Gather related information from web searches and databases
- Evaluate the credibility of collected information
- Summarize key points and report to the team

Important points:
- Clearly cite information sources
- Cross-check multiple information sources
- Explicitly indicate uncertain information"""
)

WRITER_ROLE = AgentRole(
    name="Writer",
    description="Expert in document creation",
    capabilities=["content_generation", "formatting", "editing"],
    system_prompt="""You are an excellent writer.

Role:
- Create high-quality documents based on researcher's information
- Readable and logical structure
- Writing style and tone appropriate for the target audience

Important points:
- Clear and concise expression
- Appropriate heading and paragraph structure
- Proper use of citations and references"""
)

CRITIC_ROLE = AgentRole(
    name="Critic",
    description="Expert in quality review",
    capabilities=["quality_check", "fact_verification", "feedback"],
    system_prompt="""You are a reviewer with critical thinking skills.

Role:
- Critically review created documents
- Verify factual accuracy
- Provide specific improvement points

Important points:
- Constructive feedback
- Specific improvement suggestions
- Clearly point out serious issues"""
)

Communication Protocols

Message Passing

Communication between agents is conducted through structured messages.

from dataclasses import dataclass
from typing import Optional, Dict, Any
from datetime import datetime
from enum import Enum

class MessageType(Enum):
    """Message types"""
    TASK = "task"              # Task instruction
    RESULT = "result"          # Task result
    QUERY = "query"            # Information request
    RESPONSE = "response"      # Information response
    ERROR = "error"            # Error notification
    STATUS = "status"          # Status update

@dataclass
class Message:
    """Inter-agent message"""
    type: MessageType
    sender: str
    receiver: str
    content: Dict[str, Any]
    timestamp: datetime
    message_id: str
    reply_to: Optional[str] = None

class MessageBus:
    """Message bus (inter-agent communication)"""

    def __init__(self):
        self.messages: List[Message] = []
        self.subscribers: Dict[str, List[callable]] = {}

    def subscribe(self, agent_name: str, callback: callable):
        """Register agent for message reception"""
        if agent_name not in self.subscribers:
            self.subscribers[agent_name] = []
        self.subscribers[agent_name].append(callback)

    def publish(self, message: Message):
        """Deliver message"""
        self.messages.append(message)

        # Deliver message to receiver
        if message.receiver in self.subscribers:
            for callback in self.subscribers[message.receiver]:
                callback(message)

    def broadcast(self, message: Message):
        """Broadcast to all agents"""
        self.messages.append(message)

        for agent_name, callbacks in self.subscribers.items():
            if agent_name != message.sender:
                for callback in callbacks:
                    callback(message)

# Usage example
import uuid

bus = MessageBus()

def researcher_receive(message: Message):
    print(f"Researcher received: {message.type.value} from {message.sender}")

def writer_receive(message: Message):
    print(f"Writer received: {message.type.value} from {message.sender}")

# Register agents
bus.subscribe("researcher", researcher_receive)
bus.subscribe("writer", writer_receive)

# Send message
task_message = Message(
    type=MessageType.TASK,
    sender="manager",
    receiver="researcher",
    content={"task": "Research AI trends in 2024"},
    timestamp=datetime.now(),
    message_id=str(uuid.uuid4())
)

bus.publish(task_message)

Shared Memory Approach

from typing import Dict, Any, Optional
import threading

class SharedMemory:
    """Shared memory between agents"""

    def __init__(self):
        self.data: Dict[str, Any] = {}
        self.lock = threading.Lock()
        self.subscribers: Dict[str, List[callable]] = {}

    def write(self, key: str, value: Any, agent_name: str):
        """Write data"""
        with self.lock:
            self.data[key] = {
                "value": value,
                "author": agent_name,
                "timestamp": datetime.now()
            }

            # Notify changes
            self._notify_subscribers(key, value, agent_name)

    def read(self, key: str) -> Optional[Any]:
        """Read data"""
        with self.lock:
            if key in self.data:
                return self.data[key]["value"]
            return None

    def subscribe_to_key(self, key: str, callback: callable):
        """Watch for changes to a specific key"""
        if key not in self.subscribers:
            self.subscribers[key] = []
        self.subscribers[key].append(callback)

    def _notify_subscribers(self, key: str, value: Any, agent_name: str):
        """Notify subscribers"""
        if key in self.subscribers:
            for callback in self.subscribers[key]:
                callback(key, value, agent_name)

# Usage example
memory = SharedMemory()

def on_research_complete(key, value, agent_name):
    print(f"Research completed by {agent_name}: {value}")

memory.subscribe_to_key("research_result", on_research_complete)

# Researcher writes result
memory.write("research_result", "Major AI trends in 2024...", "researcher")

Collaboration Patterns

1. Parallel Execution Pattern

import asyncio
from typing import List, Dict, Any

class ParallelAgentSystem:
    """Parallel execution agent system"""

    def __init__(self, agents: List[Any]):
        self.agents = agents

    async def execute_parallel(self, task: str) -> List[Dict[str, Any]]:
        """Execute all agents in parallel"""
        tasks = [
            agent.process(task)
            for agent in self.agents
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Aggregate results
        successful_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Agent {i} failed: {str(result)}")
            else:
                successful_results.append({
                    "agent": self.agents[i].name,
                    "result": result
                })

        return successful_results

# Usage example (pseudo-code)
class ResearchAgent:
    def __init__(self, name: str, specialty: str):
        self.name = name
        self.specialty = specialty

    async def process(self, query: str) -> Dict[str, Any]:
        # Execute research asynchronously
        await asyncio.sleep(1)  # Simulate API call
        return {
            "specialty": self.specialty,
            "findings": f"Research results on {query} regarding {self.specialty}"
        }

# Execute multiple specialized agents in parallel
agents = [
    ResearchAgent("Tech Researcher", "Technology trends"),
    ResearchAgent("Market Researcher", "Market analysis"),
    ResearchAgent("Academic Researcher", "Academic research")
]

system = ParallelAgentSystem(agents)
results = asyncio.run(system.execute_parallel("AI in 2024"))
print(results)

2. Sequential Execution (Pipeline) Pattern

from typing import Any, List, Callable

class SequentialAgentSystem:
    """Sequential execution agent system (pipeline)"""

    def __init__(self):
        self.pipeline: List[Callable] = []

    def add_stage(self, agent: Callable):
        """Add agent to pipeline"""
        self.pipeline.append(agent)

    def execute(self, initial_input: Any) -> Any:
        """Execute pipeline"""
        current_data = initial_input

        for i, agent in enumerate(self.pipeline):
            print(f"Stage {i+1}: {agent.__name__}")
            current_data = agent(current_data)
            print(f"  Output: {current_data}\n")

        return current_data

# Agents for each pipeline stage
def data_collector(query: str) -> Dict[str, Any]:
    """Stage 1: Data collection"""
    return {
        "query": query,
        "raw_data": f"Raw data regarding {query}...",
        "sources": ["source1", "source2"]
    }

def data_analyzer(data: Dict[str, Any]) -> Dict[str, Any]:
    """Stage 2: Data analysis"""
    data["analysis"] = "Analysis results: Main trends are..."
    data["insights"] = ["Insight 1", "Insight 2"]
    return data

def report_generator(data: Dict[str, Any]) -> str:
    """Stage 3: Report generation"""
    report = f"""
Research Report: {data['query']}

Analysis Results:
{data['analysis']}

Key Insights:
- {data['insights'][0]}
- {data['insights'][1]}

Sources: {', '.join(data['sources'])}
    """
    return report.strip()

# Build and execute pipeline
pipeline = SequentialAgentSystem()
pipeline.add_stage(data_collector)
pipeline.add_stage(data_analyzer)
pipeline.add_stage(report_generator)

final_report = pipeline.execute("Latest AI agent trends")
print("=== Final Report ===")
print(final_report)

3. Hierarchical (Manager-Worker) Pattern

from typing import List, Dict, Any
from openai import OpenAI

class ManagerAgent:
    """Manager agent (task decomposition and coordination)"""

    def __init__(self, api_key: str, workers: List[Any]):
        self.client = OpenAI(api_key=api_key)
        self.workers = workers
        self.task_history = []

    def execute(self, user_request: str) -> str:
        """Process user request"""
        # Step 1: Decompose task
        subtasks = self.decompose_task(user_request)

        # Step 2: Delegate to workers
        results = self.delegate_to_workers(subtasks)

        # Step 3: Synthesize results
        final_result = self.synthesize_results(user_request, results)

        return final_result

    def decompose_task(self, request: str) -> List[Dict[str, Any]]:
        """Decompose task into subtasks"""
        worker_capabilities = "\n".join([
            f"- {w.name}: {w.capabilities}"
            for w in self.workers
        ])

        prompt = f"""Please decompose the following request into subtasks to be assigned to available workers.

Request: {request}

Available workers:
{worker_capabilities}

Output each subtask in the following format:
1. [Worker name] Task description
2. [Worker name] Task description
..."""

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )

        # Parse subtasks (simplified version)
        subtasks = []
        for line in response.choices[0].message.content.split('\n'):
            if line.strip() and line[0].isdigit():
                parts = line.split(']', 1)
                if len(parts) == 2:
                    worker_name = parts[0].split('[')[1].strip()
                    task_desc = parts[1].strip()
                    subtasks.append({
                        "worker": worker_name,
                        "task": task_desc
                    })

        return subtasks

    def delegate_to_workers(self, subtasks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Delegate tasks to workers"""
        results = []

        for subtask in subtasks:
            worker_name = subtask["worker"]
            task = subtask["task"]

            # Find the corresponding worker
            worker = next((w for w in self.workers if w.name == worker_name), None)

            if worker:
                result = worker.execute(task)
                results.append({
                    "worker": worker_name,
                    "task": task,
                    "result": result
                })
            else:
                results.append({
                    "worker": worker_name,
                    "task": task,
                    "result": f"Error: Worker {worker_name} not found"
                })

        return results

    def synthesize_results(self, original_request: str, results: List[Dict[str, Any]]) -> str:
        """Synthesize results and generate final answer"""
        results_text = "\n\n".join([
            f"{r['worker']}'s results:\n{r['result']}"
            for r in results
        ])

        prompt = f"""Please synthesize the results from each worker and generate a final answer for the following request.

Original request: {original_request}

Worker results:
{results_text}

Generate the synthesized answer:"""

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7
        )

        return response.choices[0].message.content

class WorkerAgent:
    """Worker agent"""

    def __init__(self, name: str, capabilities: str, system_prompt: str, api_key: str):
        self.name = name
        self.capabilities = capabilities
        self.system_prompt = system_prompt
        self.client = OpenAI(api_key=api_key)

    def execute(self, task: str) -> str:
        """Execute task"""
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": task}
            ],
            temperature=0.7
        )

        return response.choices[0].message.content

# Usage example
researcher = WorkerAgent(
    name="Researcher",
    capabilities="Web search, data collection",
    system_prompt="You are a research expert.",
    api_key="your-api-key"
)

writer = WorkerAgent(
    name="Writer",
    capabilities="Document creation, editing",
    system_prompt="You are a document creation expert.",
    api_key="your-api-key"
)

manager = ManagerAgent(
    api_key="your-api-key",
    workers=[researcher, writer]
)

result = manager.execute("Please create a 1000-word report on AI trends in 2024")
print(result)

Orchestration Strategies

Dynamic Task Assignment

from typing import List, Dict, Any
import time

class TaskOrchestrator:
    """Task orchestrator"""

    def __init__(self, agents: List[Any]):
        self.agents = agents
        self.task_queue = []
        self.agent_status = {agent.name: "idle" for agent in agents}

    def add_task(self, task: Dict[str, Any]):
        """Add task to queue"""
        self.task_queue.append(task)

    def get_available_agent(self, required_capability: str = None):
        """Get available agent"""
        for agent in self.agents:
            if self.agent_status[agent.name] == "idle":
                if required_capability is None or required_capability in agent.capabilities:
                    return agent
        return None

    def execute_tasks(self):
        """Process task queue"""
        while self.task_queue:
            task = self.task_queue.pop(0)

            # Find appropriate agent
            agent = self.get_available_agent(task.get("required_capability"))

            if agent:
                print(f"Assigning task '{task['name']}' to {agent.name}")
                self.agent_status[agent.name] = "busy"

                # Execute task (assuming asynchronous)
                result = agent.execute(task)

                self.agent_status[agent.name] = "idle"
                print(f"{agent.name} completed task '{task['name']}'")
            else:
                # If agent unavailable, return to queue
                self.task_queue.append(task)
                time.sleep(1)

State Management and Conflict Resolution

Distributed State Synchronization

from typing import Dict, Any, Optional
from datetime import datetime
import json

class StateManager:
    """State management between agents"""

    def __init__(self):
        self.state: Dict[str, Any] = {}
        self.version: Dict[str, int] = {}
        self.history: List[Dict[str, Any]] = []

    def update_state(self, key: str, value: Any, agent_name: str) -> bool:
        """Update state (with versioning)"""
        current_version = self.version.get(key, 0)

        # Record update
        update_record = {
            "key": key,
            "value": value,
            "agent": agent_name,
            "version": current_version + 1,
            "timestamp": datetime.now().isoformat()
        }

        self.state[key] = value
        self.version[key] = current_version + 1
        self.history.append(update_record)

        return True

    def get_state(self, key: str, version: Optional[int] = None) -> Optional[Any]:
        """Get state (with specific version support)"""
        if version is None:
            return self.state.get(key)

        # Search for specific version in history
        for record in reversed(self.history):
            if record["key"] == key and record["version"] == version:
                return record["value"]

        return None

    def resolve_conflict(self, key: str, conflicting_values: List[Dict[str, Any]]) -> Any:
        """Resolve conflict"""
        # Adopt value with latest timestamp (Last-Write-Wins)
        latest = max(conflicting_values, key=lambda x: x["timestamp"])
        return latest["value"]

# Usage example
state_manager = StateManager()

# Multiple agents update same key
state_manager.update_state("document_title", "Introduction to AI Agents", "agent1")
state_manager.update_state("document_title", "Complete Guide to AI Agents", "agent2")

# Get latest value
current_title = state_manager.get_state("document_title")
print(f"Current title: {current_title}")

# Check history
print("\nUpdate history:")
for record in state_manager.history:
    print(f"  v{record['version']}: {record['value']} (by {record['agent']})")

Summary

What We Learned in This Chapter

Design Principles

Effective multi-agent systems are realized through clear role distribution, efficient communication, appropriate orchestration, and robust state management

Disclaimer

⚠️ Please Help Us Improve Content Quality

This content was created with AI assistance. If you find errors or areas for improvement, please report them using one of the following methods: