第3章: マルチエージェントシステム

複数エージェントの協調による複雑タスクの解決

📖 読了時間: 30-35分 📊 難易度: 上級 💻 コード例: 5個

マルチエージェントシステムとは

なぜ複数のエージェントが必要か

単一のエージェントでは解決が困難な複雑タスクを、専門化された複数のエージェントが協調して処理することで、より効率的かつ高品質な結果を得られます。

マルチエージェントの利点

マルチエージェントアーキテクチャの種類

アーキテクチャ 特徴 適用場面
並列型 エージェントが独立して並行実行 データ収集、複数視点の分析
順次型 エージェントが順番に処理を引き継ぐ パイプライン処理、段階的改善
階層型 マネージャーが配下のワーカーを制御 複雑な計画と実行の分離
対話型 エージェント間で議論・交渉 意思決定、合意形成

マルチエージェント設計

エージェントの役割分担

graph TD M[Manager Agent
タスク分解と調整] --> R[Researcher Agent
情報収集] M --> W[Writer Agent
文書作成] M --> C[Critic Agent
レビューと評価] R --> M W --> M C --> M style M fill:#e3f2fd style R fill:#fff3e0 style W fill:#f3e5f5 style C fill:#e8f5e9

役割ベース設計の例

from typing import List, Dict, Any
from dataclasses import dataclass

@dataclass
class AgentRole:
    """エージェントの役割定義"""
    name: str
    description: str
    capabilities: List[str]
    system_prompt: str

# エージェント役割の定義
RESEARCHER_ROLE = AgentRole(
    name="Researcher",
    description="情報収集と分析の専門家",
    capabilities=["web_search", "database_query", "data_analysis"],
    system_prompt="""あなたは優秀なリサーチャーです。

役割:
- Web検索やデータベースから関連情報を収集
- 収集した情報の信頼性を評価
- 重要なポイントを要約してチームに報告

重要事項:
- 情報源を明記する
- 複数の情報源をクロスチェック
- 不確実な情報は明示する"""
)

WRITER_ROLE = AgentRole(
    name="Writer",
    description="文書作成の専門家",
    capabilities=["content_generation", "formatting", "editing"],
    system_prompt="""あなたは優秀なライターです。

役割:
- リサーチャーの情報を基に高品質な文書を作成
- 読みやすく、論理的な構成
- 対象読者に適した文体とトーン

重要事項:
- 明確で簡潔な表現
- 適切な見出しと段落構成
- 引用と参照を適切に使用"""
)

CRITIC_ROLE = AgentRole(
    name="Critic",
    description="品質レビューの専門家",
    capabilities=["quality_check", "fact_verification", "feedback"],
    system_prompt="""あなたは批評的思考力のあるレビュアーです。

役割:
- 作成された文書を批判的にレビュー
- 事実の正確性を検証
- 改善点を具体的に提示

重要事項:
- 建設的なフィードバック
- 具体的な改善提案
- 重大な問題は明確に指摘"""
)

通信プロトコル

メッセージパッシング

エージェント間の通信は、構造化されたメッセージを介して行います。

from dataclasses import dataclass
from typing import Optional, Dict, Any
from datetime import datetime
from enum import Enum

class MessageType(Enum):
    """メッセージタイプ"""
    TASK = "task"              # タスク指示
    RESULT = "result"          # タスク結果
    QUERY = "query"            # 情報要求
    RESPONSE = "response"      # 情報応答
    ERROR = "error"            # エラー通知
    STATUS = "status"          # 状態更新

@dataclass
class Message:
    """エージェント間メッセージ"""
    type: MessageType
    sender: str
    receiver: str
    content: Dict[str, Any]
    timestamp: datetime
    message_id: str
    reply_to: Optional[str] = None

class MessageBus:
    """メッセージバス(エージェント間通信)"""

    def __init__(self):
        self.messages: List[Message] = []
        self.subscribers: Dict[str, List[callable]] = {}

    def subscribe(self, agent_name: str, callback: callable):
        """エージェントをメッセージ受信に登録"""
        if agent_name not in self.subscribers:
            self.subscribers[agent_name] = []
        self.subscribers[agent_name].append(callback)

    def publish(self, message: Message):
        """メッセージを配信"""
        self.messages.append(message)

        # 受信者にメッセージを配信
        if message.receiver in self.subscribers:
            for callback in self.subscribers[message.receiver]:
                callback(message)

    def broadcast(self, message: Message):
        """全エージェントにブロードキャスト"""
        self.messages.append(message)

        for agent_name, callbacks in self.subscribers.items():
            if agent_name != message.sender:
                for callback in callbacks:
                    callback(message)

# 使用例
import uuid

bus = MessageBus()

def researcher_receive(message: Message):
    print(f"Researcher received: {message.type.value} from {message.sender}")

def writer_receive(message: Message):
    print(f"Writer received: {message.type.value} from {message.sender}")

# エージェントを登録
bus.subscribe("researcher", researcher_receive)
bus.subscribe("writer", writer_receive)

# メッセージ送信
task_message = Message(
    type=MessageType.TASK,
    sender="manager",
    receiver="researcher",
    content={"task": "2024年のAI動向を調査"},
    timestamp=datetime.now(),
    message_id=str(uuid.uuid4())
)

bus.publish(task_message)

共有メモリアプローチ

from typing import Dict, Any, Optional
import threading

class SharedMemory:
    """エージェント間の共有メモリ"""

    def __init__(self):
        self.data: Dict[str, Any] = {}
        self.lock = threading.Lock()
        self.subscribers: Dict[str, List[callable]] = {}

    def write(self, key: str, value: Any, agent_name: str):
        """データを書き込み"""
        with self.lock:
            self.data[key] = {
                "value": value,
                "author": agent_name,
                "timestamp": datetime.now()
            }

            # 変更を通知
            self._notify_subscribers(key, value, agent_name)

    def read(self, key: str) -> Optional[Any]:
        """データを読み込み"""
        with self.lock:
            if key in self.data:
                return self.data[key]["value"]
            return None

    def subscribe_to_key(self, key: str, callback: callable):
        """特定のキーの変更を監視"""
        if key not in self.subscribers:
            self.subscribers[key] = []
        self.subscribers[key].append(callback)

    def _notify_subscribers(self, key: str, value: Any, agent_name: str):
        """サブスクライバーに通知"""
        if key in self.subscribers:
            for callback in self.subscribers[key]:
                callback(key, value, agent_name)

# 使用例
memory = SharedMemory()

def on_research_complete(key, value, agent_name):
    print(f"Research completed by {agent_name}: {value}")

memory.subscribe_to_key("research_result", on_research_complete)

# Researcherが結果を書き込み
memory.write("research_result", "2024年のAI主要トレンド...", "researcher")

協調パターン

1. 並列実行パターン

import asyncio
from typing import List, Dict, Any

class ParallelAgentSystem:
    """並列実行エージェントシステム"""

    def __init__(self, agents: List[Any]):
        self.agents = agents

    async def execute_parallel(self, task: str) -> List[Dict[str, Any]]:
        """全エージェントを並列実行"""
        tasks = [
            agent.process(task)
            for agent in self.agents
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # 結果を集約
        successful_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Agent {i} failed: {str(result)}")
            else:
                successful_results.append({
                    "agent": self.agents[i].name,
                    "result": result
                })

        return successful_results

# 使用例(擬似コード)
class ResearchAgent:
    def __init__(self, name: str, specialty: str):
        self.name = name
        self.specialty = specialty

    async def process(self, query: str) -> Dict[str, Any]:
        # 非同期でリサーチを実行
        await asyncio.sleep(1)  # API呼び出しをシミュレート
        return {
            "specialty": self.specialty,
            "findings": f"{self.specialty}に関する{query}の調査結果"
        }

# 複数の専門エージェントを並列実行
agents = [
    ResearchAgent("Tech Researcher", "技術動向"),
    ResearchAgent("Market Researcher", "市場分析"),
    ResearchAgent("Academic Researcher", "学術研究")
]

system = ParallelAgentSystem(agents)
results = asyncio.run(system.execute_parallel("2024年のAI"))
print(results)

2. 順次実行(パイプライン)パターン

from typing import Any, List, Callable

class SequentialAgentSystem:
    """順次実行エージェントシステム(パイプライン)"""

    def __init__(self):
        self.pipeline: List[Callable] = []

    def add_stage(self, agent: Callable):
        """パイプラインにエージェントを追加"""
        self.pipeline.append(agent)

    def execute(self, initial_input: Any) -> Any:
        """パイプラインを実行"""
        current_data = initial_input

        for i, agent in enumerate(self.pipeline):
            print(f"Stage {i+1}: {agent.__name__}")
            current_data = agent(current_data)
            print(f"  Output: {current_data}\n")

        return current_data

# パイプライン各段階のエージェント
def data_collector(query: str) -> Dict[str, Any]:
    """ステージ1: データ収集"""
    return {
        "query": query,
        "raw_data": f"{query}に関する生データ...",
        "sources": ["source1", "source2"]
    }

def data_analyzer(data: Dict[str, Any]) -> Dict[str, Any]:
    """ステージ2: データ分析"""
    data["analysis"] = "分析結果: 主要なトレンドは..."
    data["insights"] = ["インサイト1", "インサイト2"]
    return data

def report_generator(data: Dict[str, Any]) -> str:
    """ステージ3: レポート生成"""
    report = f"""
調査レポート: {data['query']}

分析結果:
{data['analysis']}

主要インサイト:
- {data['insights'][0]}
- {data['insights'][1]}

情報源: {', '.join(data['sources'])}
    """
    return report.strip()

# パイプライン構築と実行
pipeline = SequentialAgentSystem()
pipeline.add_stage(data_collector)
pipeline.add_stage(data_analyzer)
pipeline.add_stage(report_generator)

final_report = pipeline.execute("AIエージェントの最新動向")
print("=== 最終レポート ===")
print(final_report)

3. 階層型(Manager-Worker)パターン

from typing import List, Dict, Any
from openai import OpenAI

class ManagerAgent:
    """マネージャーエージェント(タスク分解と調整)"""

    def __init__(self, api_key: str, workers: List[Any]):
        self.client = OpenAI(api_key=api_key)
        self.workers = workers
        self.task_history = []

    def execute(self, user_request: str) -> str:
        """ユーザーリクエストを処理"""
        # ステップ1: タスクを分解
        subtasks = self.decompose_task(user_request)

        # ステップ2: ワーカーに割り当て
        results = self.delegate_to_workers(subtasks)

        # ステップ3: 結果を統合
        final_result = self.synthesize_results(user_request, results)

        return final_result

    def decompose_task(self, request: str) -> List[Dict[str, Any]]:
        """タスクをサブタスクに分解"""
        worker_capabilities = "\n".join([
            f"- {w.name}: {w.capabilities}"
            for w in self.workers
        ])

        prompt = f"""以下のリクエストを、利用可能なワーカーに割り当てるサブタスクに分解してください。

リクエスト: {request}

利用可能なワーカー:
{worker_capabilities}

各サブタスクを以下の形式で出力してください:
1. [ワーカー名] タスク内容
2. [ワーカー名] タスク内容
..."""

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )

        # サブタスクをパース(簡易版)
        subtasks = []
        for line in response.choices[0].message.content.split('\n'):
            if line.strip() and line[0].isdigit():
                parts = line.split(']', 1)
                if len(parts) == 2:
                    worker_name = parts[0].split('[')[1].strip()
                    task_desc = parts[1].strip()
                    subtasks.append({
                        "worker": worker_name,
                        "task": task_desc
                    })

        return subtasks

    def delegate_to_workers(self, subtasks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """ワーカーにタスクを委譲"""
        results = []

        for subtask in subtasks:
            worker_name = subtask["worker"]
            task = subtask["task"]

            # 該当するワーカーを見つける
            worker = next((w for w in self.workers if w.name == worker_name), None)

            if worker:
                result = worker.execute(task)
                results.append({
                    "worker": worker_name,
                    "task": task,
                    "result": result
                })
            else:
                results.append({
                    "worker": worker_name,
                    "task": task,
                    "result": f"エラー: ワーカー {worker_name} が見つかりません"
                })

        return results

    def synthesize_results(self, original_request: str, results: List[Dict[str, Any]]) -> str:
        """結果を統合して最終回答を生成"""
        results_text = "\n\n".join([
            f"{r['worker']}の結果:\n{r['result']}"
            for r in results
        ])

        prompt = f"""以下のリクエストに対して、各ワーカーの結果を統合して最終回答を生成してください。

元のリクエスト: {original_request}

ワーカーの結果:
{results_text}

統合された回答を生成してください:"""

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7
        )

        return response.choices[0].message.content

class WorkerAgent:
    """ワーカーエージェント"""

    def __init__(self, name: str, capabilities: str, system_prompt: str, api_key: str):
        self.name = name
        self.capabilities = capabilities
        self.system_prompt = system_prompt
        self.client = OpenAI(api_key=api_key)

    def execute(self, task: str) -> str:
        """タスクを実行"""
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": task}
            ],
            temperature=0.7
        )

        return response.choices[0].message.content

# 使用例
researcher = WorkerAgent(
    name="Researcher",
    capabilities="Web検索、データ収集",
    system_prompt="あなたはリサーチの専門家です。",
    api_key="your-api-key"
)

writer = WorkerAgent(
    name="Writer",
    capabilities="文書作成、編集",
    system_prompt="あなたは文書作成の専門家です。",
    api_key="your-api-key"
)

manager = ManagerAgent(
    api_key="your-api-key",
    workers=[researcher, writer]
)

result = manager.execute("2024年のAI動向について1000字のレポートを作成してください")
print(result)

オーケストレーション戦略

動的タスク割り当て

from typing import List, Dict, Any
import time

class TaskOrchestrator:
    """タスクオーケストレーター"""

    def __init__(self, agents: List[Any]):
        self.agents = agents
        self.task_queue = []
        self.agent_status = {agent.name: "idle" for agent in agents}

    def add_task(self, task: Dict[str, Any]):
        """タスクをキューに追加"""
        self.task_queue.append(task)

    def get_available_agent(self, required_capability: str = None):
        """利用可能なエージェントを取得"""
        for agent in self.agents:
            if self.agent_status[agent.name] == "idle":
                if required_capability is None or required_capability in agent.capabilities:
                    return agent
        return None

    def execute_tasks(self):
        """タスクキューを処理"""
        while self.task_queue:
            task = self.task_queue.pop(0)

            # 適切なエージェントを見つける
            agent = self.get_available_agent(task.get("required_capability"))

            if agent:
                print(f"タスク '{task['name']}' を {agent.name} に割り当て")
                self.agent_status[agent.name] = "busy"

                # タスク実行(非同期を想定)
                result = agent.execute(task)

                self.agent_status[agent.name] = "idle"
                print(f"{agent.name} がタスク '{task['name']}' を完了")
            else:
                # エージェントが利用不可の場合、キューに戻す
                self.task_queue.append(task)
                time.sleep(1)

状態管理とコンフリクト解決

分散状態の同期

from typing import Dict, Any, Optional
from datetime import datetime
import json

class StateManager:
    """エージェント間の状態管理"""

    def __init__(self):
        self.state: Dict[str, Any] = {}
        self.version: Dict[str, int] = {}
        self.history: List[Dict[str, Any]] = []

    def update_state(self, key: str, value: Any, agent_name: str) -> bool:
        """状態を更新(バージョン管理)"""
        current_version = self.version.get(key, 0)

        # 更新を記録
        update_record = {
            "key": key,
            "value": value,
            "agent": agent_name,
            "version": current_version + 1,
            "timestamp": datetime.now().isoformat()
        }

        self.state[key] = value
        self.version[key] = current_version + 1
        self.history.append(update_record)

        return True

    def get_state(self, key: str, version: Optional[int] = None) -> Optional[Any]:
        """状態を取得(特定バージョン対応)"""
        if version is None:
            return self.state.get(key)

        # 履歴から特定バージョンを検索
        for record in reversed(self.history):
            if record["key"] == key and record["version"] == version:
                return record["value"]

        return None

    def resolve_conflict(self, key: str, conflicting_values: List[Dict[str, Any]]) -> Any:
        """コンフリクトを解決"""
        # 最新タイムスタンプの値を採用(Last-Write-Wins)
        latest = max(conflicting_values, key=lambda x: x["timestamp"])
        return latest["value"]

# 使用例
state_manager = StateManager()

# 複数エージェントが同じキーを更新
state_manager.update_state("document_title", "AIエージェント入門", "agent1")
state_manager.update_state("document_title", "AIエージェント完全ガイド", "agent2")

# 最新の値を取得
current_title = state_manager.get_state("document_title")
print(f"現在のタイトル: {current_title}")

# 履歴を確認
print("\n更新履歴:")
for record in state_manager.history:
    print(f"  v{record['version']}: {record['value']} (by {record['agent']})")

まとめ

この章で学んだこと

設計の原則

効果的なマルチエージェントシステムは、明確な役割分担、効率的な通信、適切なオーケストレーション、そして堅牢な状態管理によって実現される

⚠️ コンテンツの品質向上にご協力ください

このコンテンツはAIを活用して作成されています。誤りや改善点を見つけられた場合は、以下の方法でご報告ください: