第3章: LLMの訓練と整合性

事前学習からRLHF、DPO、そして推論時スケーリングへ

読了時間: 35-40分 難易度: 上級 最終更新: 2026年1月
免責事項: 本章は2026年初頭時点の最新の訓練手法を解説しています。この分野は急速に進化しており、具体的な実装については最新の研究論文と公式ドキュメントを参照してください。

1. 事前学習(Pre-training)

LLMの訓練は、膨大なテキストデータを用いた自己教師あり学習から始まります。この「事前学習」フェーズでは、言語の統計的パターン、世界知識、推論能力の基盤が形成されます。

1.1 訓練目標

次トークン予測

Decoder-OnlyモデルのLLMは、与えられたコンテキストから次のトークンを予測するように訓練されます。この単純な目標が、驚くほど汎用的な能力を生み出します:

$\mathcal{L} = -\sum_{t=1}^{T} \log P(x_t | x_1, ..., x_{t-1}; \theta)$

この目標を最適化することで、モデルは文法、事実、推論パターン、さらには数学や コードまで学習します。

1.2 訓練データ

データソース 規模 品質 用途
Common Crawl 数兆トークン 低〜中 一般知識
Wikipedia 数十億トークン 事実知識
書籍 数十億トークン 長文理解
GitHub 数兆トークン 中〜高 コード生成
arXiv 数十億トークン 科学推論
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from transformers import AutoModelForCausalLM, AutoTokenizer

def pretrain_step(
    model: torch.nn.Module,
    batch: dict,
    optimizer: torch.optim.Optimizer,
    device: torch.device
) -> float:
    """
    事前学習の1ステップ

    Args:
        model: 言語モデル
        batch: 入力バッチ(input_ids, attention_mask, labels)
        optimizer: オプティマイザ
        device: 計算デバイス

    Returns:
        ステップの損失値
    """
    model.train()
    optimizer.zero_grad()

    input_ids = batch["input_ids"].to(device)
    attention_mask = batch["attention_mask"].to(device)

    # ラベルは入力を1トークンシフトしたもの
    labels = input_ids.clone()
    labels[:, :-1] = input_ids[:, 1:]
    labels[:, -1] = -100  # 最後のトークンは予測不要

    # 順伝播
    outputs = model(
        input_ids=input_ids,
        attention_mask=attention_mask,
        labels=labels
    )

    loss = outputs.loss

    # 逆伝播
    loss.backward()

    # 勾配クリッピング(安定性のため)
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

    optimizer.step()

    return loss.item()

def calculate_perplexity(loss: float) -> float:
    """損失からパープレキシティを計算"""
    return torch.exp(torch.tensor(loss)).item()

2. RLHF(人間のフィードバックからの強化学習)

RLHFは、人間の選好データを用いてLLMを「整合」させる手法です。有害な出力を減らし、有用で誠実な応答を促進します。

2.1 RLHFパイプライン

flowchart LR subgraph "ステップ1: SFT" D1[デモデータ] --> M1[教師ありファインチューニング] M1 --> SFT[SFTモデル] end subgraph "ステップ2: 報酬モデル" SFT --> Gen[応答生成] Gen --> Pref[人間の選好] Pref --> RM[報酬モデル訓練] end subgraph "ステップ3: RLファインチューニング" SFT --> PPO[PPO最適化] RM --> PPO PPO --> Final[最終モデル] end

RLHFの3ステップ

  1. SFT(教師ありファインチューニング): 高品質なデモンストレーションデータで微調整
  2. 報酬モデル訓練: 人間の選好データから「良い応答」を学習
  3. PPO最適化: 報酬モデルを用いた強化学習で方策を改善
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Tuple

class RewardModel(nn.Module):
    """
    報酬モデルの実装

    言語モデルをベースに、応答の品質を
    スカラー値として出力するようファインチューニング
    """

    def __init__(self, base_model: nn.Module, hidden_dim: int):
        super().__init__()
        self.base_model = base_model
        self.reward_head = nn.Linear(hidden_dim, 1)

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: torch.Tensor
    ) -> torch.Tensor:
        """入力シーケンスの報酬スコアを計算"""
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            output_hidden_states=True
        )

        # 最後のトークンの隠れ状態を使用
        last_hidden = outputs.hidden_states[-1]
        last_token_idx = attention_mask.sum(dim=1) - 1

        # バッチ内の各サンプルの最後のトークンを取得
        batch_size = input_ids.shape[0]
        last_token_hidden = last_hidden[
            torch.arange(batch_size),
            last_token_idx
        ]

        reward = self.reward_head(last_token_hidden)
        return reward.squeeze(-1)

def reward_model_loss(
    reward_chosen: torch.Tensor,
    reward_rejected: torch.Tensor
) -> torch.Tensor:
    """
    報酬モデルのペアワイズ損失

    選好ペア(選択された応答、拒否された応答)から
    報酬モデルを訓練する

    損失 = -log(sigmoid(r_chosen - r_rejected))
    """
    return -F.logsigmoid(reward_chosen - reward_rejected).mean()

# PPOによるファインチューニング(簡略化)
class PPOTrainer:
    """
    PPO(Proximal Policy Optimization)トレーナー

    報酬モデルを使用してLLMを最適化
    """

    def __init__(
        self,
        policy_model: nn.Module,
        ref_model: nn.Module,
        reward_model: nn.Module,
        kl_coef: float = 0.1,
        clip_range: float = 0.2
    ):
        self.policy = policy_model
        self.ref = ref_model
        self.reward = reward_model
        self.kl_coef = kl_coef
        self.clip_range = clip_range

    def compute_rewards(
        self,
        input_ids: torch.Tensor,
        response_ids: torch.Tensor,
        attention_mask: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """報酬とKLペナルティを計算"""

        # 報酬モデルからのスコア
        full_ids = torch.cat([input_ids, response_ids], dim=1)
        full_mask = torch.cat([
            attention_mask,
            torch.ones_like(response_ids)
        ], dim=1)

        reward_score = self.reward(full_ids, full_mask)

        # KLダイバージェンス(参照モデルからの乖離ペナルティ)
        with torch.no_grad():
            ref_logits = self.ref(full_ids, full_mask).logits
        policy_logits = self.policy(full_ids, full_mask).logits

        kl = F.kl_div(
            F.log_softmax(policy_logits, dim=-1),
            F.softmax(ref_logits, dim=-1),
            reduction='batchmean'
        )

        # 最終報酬 = 報酬スコア - KLペナルティ
        final_reward = reward_score - self.kl_coef * kl

        return final_reward, kl

3. DPO(直接選好最適化)

DPOは、RLHFを単純化し、報酬モデルの明示的な訓練と強化学習ループを不要にする手法です。選好データから直接方策を最適化します。

整合手法の進化

RLHF(2022)DPO(2023)GRPO(2024-2025)

3.1 DPOの数学的背景

DPOは、最適な方策が報酬関数と直接関連しているという洞察に基づいています:

$\pi^*(y|x) = \frac{1}{Z(x)} \pi_{ref}(y|x) \exp\left(\frac{r(x,y)}{\beta}\right)$

この関係を逆転させることで、報酬モデルなしに直接選好から学習できます:

import torch
import torch.nn.functional as F
from typing import Dict, Tuple

def dpo_loss(
    policy_chosen_logps: torch.Tensor,
    policy_rejected_logps: torch.Tensor,
    reference_chosen_logps: torch.Tensor,
    reference_rejected_logps: torch.Tensor,
    beta: float = 0.1
) -> Tuple[torch.Tensor, Dict]:
    """
    DPO損失の計算

    選好データ(選択/拒否ペア)から直接方策を最適化

    Args:
        policy_chosen_logps: 現方策での選択応答の対数確率
        policy_rejected_logps: 現方策での拒否応答の対数確率
        reference_chosen_logps: 参照方策での選択応答の対数確率
        reference_rejected_logps: 参照方策での拒否応答の対数確率
        beta: KLペナルティの強度

    Returns:
        損失値とメトリクス辞書
    """
    # 対数確率比を計算
    chosen_logratios = policy_chosen_logps - reference_chosen_logps
    rejected_logratios = policy_rejected_logps - reference_rejected_logps

    # DPO損失
    logits = beta * (chosen_logratios - rejected_logratios)
    loss = -F.logsigmoid(logits).mean()

    # メトリクス
    chosen_rewards = beta * chosen_logratios.detach()
    rejected_rewards = beta * rejected_logratios.detach()

    metrics = {
        "loss": loss.item(),
        "chosen_reward": chosen_rewards.mean().item(),
        "rejected_reward": rejected_rewards.mean().item(),
        "reward_margin": (chosen_rewards - rejected_rewards).mean().item(),
        "accuracy": (chosen_rewards > rejected_rewards).float().mean().item()
    }

    return loss, metrics

class DPOTrainer:
    """
    DPOトレーナーの実装

    報酬モデルなしで選好データから直接学習
    """

    def __init__(
        self,
        model: torch.nn.Module,
        ref_model: torch.nn.Module,
        beta: float = 0.1,
        learning_rate: float = 1e-6
    ):
        self.model = model
        self.ref_model = ref_model
        self.beta = beta

        # 参照モデルは凍結
        for param in self.ref_model.parameters():
            param.requires_grad = False

        self.optimizer = torch.optim.AdamW(
            self.model.parameters(),
            lr=learning_rate
        )

    def get_logps(
        self,
        model: torch.nn.Module,
        input_ids: torch.Tensor,
        attention_mask: torch.Tensor,
        labels: torch.Tensor
    ) -> torch.Tensor:
        """シーケンスの対数確率を計算"""
        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        logits = outputs.logits

        # 対数確率を計算
        logps = F.log_softmax(logits, dim=-1)

        # ラベルに対応する対数確率を抽出
        labels_shifted = labels[:, 1:]
        logps_shifted = logps[:, :-1]

        per_token_logps = torch.gather(
            logps_shifted,
            dim=-1,
            index=labels_shifted.unsqueeze(-1)
        ).squeeze(-1)

        # パディングをマスク
        mask = (labels_shifted != -100).float()
        return (per_token_logps * mask).sum(dim=-1)

    def train_step(
        self,
        chosen_batch: Dict,
        rejected_batch: Dict
    ) -> Dict:
        """訓練ステップ"""
        self.model.train()

        # 現方策の対数確率
        policy_chosen_logps = self.get_logps(
            self.model,
            chosen_batch["input_ids"],
            chosen_batch["attention_mask"],
            chosen_batch["labels"]
        )
        policy_rejected_logps = self.get_logps(
            self.model,
            rejected_batch["input_ids"],
            rejected_batch["attention_mask"],
            rejected_batch["labels"]
        )

        # 参照方策の対数確率
        with torch.no_grad():
            ref_chosen_logps = self.get_logps(
                self.ref_model,
                chosen_batch["input_ids"],
                chosen_batch["attention_mask"],
                chosen_batch["labels"]
            )
            ref_rejected_logps = self.get_logps(
                self.ref_model,
                rejected_batch["input_ids"],
                rejected_batch["attention_mask"],
                rejected_batch["labels"]
            )

        # DPO損失を計算
        loss, metrics = dpo_loss(
            policy_chosen_logps,
            policy_rejected_logps,
            ref_chosen_logps,
            ref_rejected_logps,
            self.beta
        )

        # 最適化
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return metrics

4. Constitutional AI

Constitutional AI(CAI)は、AIシステムに「憲法」として明示的な原則セットを与え、自己改善を促す手法です。人間のラベリングへの依存を減らしながら、価値観の整合を実現します。

憲法の例

flowchart TB subgraph "CAIプロセス" Input[プロンプト] --> Response[初期応答] Response --> Critique[自己批評] Critique --> Revision[修正版生成] Revision --> Check{原則に準拠?} Check -->|No| Critique Check -->|Yes| Output[最終応答] end
from typing import List, Dict

class ConstitutionalAI:
    """
    Constitutional AI の実装

    明示的な原則に基づいて応答を生成・改善
    """

    def __init__(self, model, principles: List[str]):
        self.model = model
        self.principles = principles

    def generate_initial_response(self, prompt: str) -> str:
        """初期応答を生成"""
        return self.model.generate(prompt)

    def critique_response(
        self,
        prompt: str,
        response: str,
        principle: str
    ) -> str:
        """原則に基づいて応答を批評"""
        critique_prompt = f"""以下の原則に照らして、この応答を批評してください。

原則: {principle}

元のプロンプト: {prompt}

応答: {response}

批評:"""
        return self.model.generate(critique_prompt)

    def revise_response(
        self,
        prompt: str,
        response: str,
        critique: str
    ) -> str:
        """批評に基づいて応答を修正"""
        revision_prompt = f"""以下の批評に基づいて、応答を改善してください。

元のプロンプト: {prompt}

元の応答: {response}

批評: {critique}

改善された応答:"""
        return self.model.generate(revision_prompt)

    def generate(self, prompt: str, max_iterations: int = 3) -> Dict:
        """Constitutional AIによる応答生成"""
        response = self.generate_initial_response(prompt)
        history = [{"type": "initial", "content": response}]

        for iteration in range(max_iterations):
            improved = False

            for principle in self.principles:
                # 批評
                critique = self.critique_response(prompt, response, principle)
                history.append({
                    "type": "critique",
                    "principle": principle,
                    "content": critique
                })

                # 問題が見つかった場合は修正
                if "問題" in critique or "改善" in critique:
                    response = self.revise_response(prompt, response, critique)
                    history.append({
                        "type": "revision",
                        "content": response
                    })
                    improved = True

            if not improved:
                break

        return {
            "final_response": response,
            "iterations": iteration + 1,
            "history": history
        }

5. 推論時スケーリング

推論時スケーリングは、訓練時のスケーリングに加えて、推論時の計算量を増やすことで性能を向上させる新しいパラダイムです。OpenAIのo1/o3モデルがこのアプローチを実証しました。

推論時スケーリングの原理

従来のスケーリング則は訓練時の計算に焦点を当てていましたが、推論時スケーリングでは:

モデル アプローチ 特徴
o1 (OpenAI) Chain-of-Thought 数学、コーディングに強い
o3 (OpenAI) 拡張推論 ARC-AGIで画期的スコア
Claude 3.5 Extended Thinking 透明な推論プロセス
Gemini 2.0 Deep Think マルチモーダル推論
DeepSeek-R1 推論特化 オープンソース
class InferenceTimeScaling:
    """
    推論時スケーリングのシミュレーション

    より多くの計算リソースを推論時に使用して
    回答品質を向上させる
    """

    def __init__(self, model, verifier_model=None):
        self.model = model
        self.verifier = verifier_model or model

    def chain_of_thought(
        self,
        prompt: str,
        num_reasoning_steps: int = 5
    ) -> Dict:
        """
        Chain-of-Thoughtによる段階的推論
        """
        cot_prompt = f"""{prompt}

この問題について段階的に考えましょう:
ステップ1:"""

        response = self.model.generate(cot_prompt, max_tokens=2000)

        return {
            "reasoning": response,
            "answer": self._extract_answer(response)
        }

    def self_consistency(
        self,
        prompt: str,
        num_samples: int = 5,
        temperature: float = 0.7
    ) -> Dict:
        """
        Self-Consistency: 複数の推論パスをサンプリングし、
        多数決で最終回答を決定
        """
        responses = []

        for _ in range(num_samples):
            response = self.model.generate(
                prompt,
                temperature=temperature
            )
            answer = self._extract_answer(response)
            responses.append({
                "full_response": response,
                "answer": answer
            })

        # 多数決
        answers = [r["answer"] for r in responses]
        from collections import Counter
        final_answer = Counter(answers).most_common(1)[0][0]

        return {
            "samples": responses,
            "final_answer": final_answer,
            "consistency": answers.count(final_answer) / len(answers)
        }

    def beam_search_reasoning(
        self,
        prompt: str,
        beam_width: int = 3,
        max_depth: int = 5
    ) -> Dict:
        """
        ビームサーチによる推論パス探索
        """
        beams = [{"path": [], "score": 0.0, "state": prompt}]

        for depth in range(max_depth):
            all_candidates = []

            for beam in beams:
                # 次のステップの候補を生成
                candidates = self._generate_next_steps(
                    beam["state"],
                    num_candidates=beam_width
                )

                for candidate in candidates:
                    score = self._score_reasoning_step(
                        beam["state"],
                        candidate
                    )
                    all_candidates.append({
                        "path": beam["path"] + [candidate],
                        "score": beam["score"] + score,
                        "state": beam["state"] + "\n" + candidate
                    })

            # 上位beam_width個を保持
            beams = sorted(
                all_candidates,
                key=lambda x: x["score"],
                reverse=True
            )[:beam_width]

        best = beams[0]
        return {
            "reasoning_path": best["path"],
            "total_score": best["score"],
            "final_answer": self._extract_answer(best["state"])
        }

    def _extract_answer(self, text: str) -> str:
        """回答を抽出(実装はタスク依存)"""
        if "答え:" in text:
            return text.split("答え:")[-1].strip()
        return text.split("\n")[-1].strip()

    def _generate_next_steps(self, state: str, num_candidates: int) -> List[str]:
        """次の推論ステップの候補を生成"""
        # 実際の実装ではモデルを使用
        return [f"ステップ: ..." for _ in range(num_candidates)]

    def _score_reasoning_step(self, context: str, step: str) -> float:
        """推論ステップをスコアリング"""
        # 実際の実装では検証モデルを使用
        return 0.0

6. スケーリング則

スケーリング則は、モデルサイズ、データ量、計算量と性能の関係を記述します。これらの法則は、最適なリソース配分を決定するための重要な指針となります。

6.1 Chinchillaスケーリング則

Chinchilla論文(2022)は、計算予算が固定された場合、モデルサイズとトレーニングデータを均等にスケールすべきことを示しました:

$N_{opt} \propto C^{0.5}$、$D_{opt} \propto C^{0.5}$

ここで$N$はパラメータ数、$D$はトークン数、$C$は計算量です。

6.2 Densing Law(2024-2025)

Densing Law

MIT、Anthropic、Databricksの共同研究で発見された新しいスケーリング則。「能力密度」(パラメータあたりの性能)が約3.5ヶ月ごとに倍増することを示しています。

これは、同じ性能を達成するのに必要なパラメータ数が指数関数的に減少していることを意味します。

時期 70Bパラメータモデルの性能 同等性能の小型モデル
2023年初頭 ベースライン 70B
2024年初頭 +20% ~35B
2025年初頭 +45% ~17B
2026年初頭 +75% ~8B

まとめ

第3章の重要ポイント

前へ: Transformerアーキテクチャ 次へ: LLMの推論と最適化
English