第3章:FMEAと故障の木解析

システム故障モードの体系的分析と信頼性評価

📚 プロセス安全性評価入門 ⏱️ 読了時間: 30-35分 🎯 難易度: 中級

この章で学ぶこと

3.1 FMEA(故障モード影響解析)

FMEA(Failure Mode and Effects Analysis)は、システムの各コンポーネントについて、 考えられる故障モード、その影響、発生確率、検出性を体系的に分析する手法です。 プロセス産業では、設備の信頼性評価や保全計画の策定に広く活用されています。

3.1.1 基本的なFMEA実装

FMEAの核心は、リスク優先度数(RPN: Risk Priority Number)の計算です。 RPN = 深刻度(S) × 発生度(O) × 検出度(D) で算出され、1000点満点で評価します。

# Example 1: Basic FMEA Implementation
import pandas as pd
import numpy as np
from typing import List, Dict

class FMEA:
    """故障モード影響解析(FMEA)システム

    各機器の故障モード、影響、リスクを体系的に分析
    """

    def __init__(self):
        self.analysis_data = []

    def add_failure_mode(self, component: str, failure_mode: str,
                        effects: str, causes: str,
                        severity: int, occurrence: int, detection: int,
                        current_controls: str = ""):
        """故障モードを追加

        Args:
            component: 対象コンポーネント
            failure_mode: 故障モード
            effects: 影響
            causes: 原因
            severity: 深刻度 (1-10)
            occurrence: 発生度 (1-10)
            detection: 検出度 (1-10)
            current_controls: 現在の管理策
        """
        rpn = severity * occurrence * detection

        self.analysis_data.append({
            'Component': component,
            'Failure Mode': failure_mode,
            'Effects': effects,
            'Causes': causes,
            'Severity': severity,
            'Occurrence': occurrence,
            'Detection': detection,
            'RPN': rpn,
            'Current Controls': current_controls
        })

    def get_analysis(self) -> pd.DataFrame:
        """FMEA分析結果を取得"""
        df = pd.DataFrame(self.analysis_data)
        return df.sort_values('RPN', ascending=False)

    def get_high_risk_items(self, threshold: int = 200) -> pd.DataFrame:
        """高リスク項目を抽出

        Args:
            threshold: RPNしきい値(デフォルト200以上を高リスクと判定)
        """
        df = self.get_analysis()
        return df[df['RPN'] >= threshold]

# 実使用例:化学反応器のFMEA
fmea = FMEA()

# 温度制御システムの故障モード
fmea.add_failure_mode(
    component="温度センサー",
    failure_mode="センサー故障(高温側)",
    effects="反応暴走、圧力上昇、爆発リスク",
    causes="センサー劣化、配線断線",
    severity=9,
    occurrence=3,
    detection=4,
    current_controls="冗長センサー、定期校正"
)

fmea.add_failure_mode(
    component="冷却水ポンプ",
    failure_mode="ポンプ停止",
    effects="反応器過熱、製品品質低下",
    causes="モーター故障、軸受摩耗",
    severity=8,
    occurrence=4,
    detection=2,
    current_controls="流量計アラーム、予備ポンプ"
)

fmea.add_failure_mode(
    component="圧力安全弁",
    failure_mode="作動不良(開かない)",
    effects="過圧による容器破裂",
    causes="弁座固着、スプリング劣化",
    severity=10,
    occurrence=2,
    detection=8,
    current_controls="年次点検、圧力試験"
)

# 分析結果表示
analysis_df = fmea.get_analysis()
print("=== FMEA分析結果(RPN順) ===")
print(analysis_df[['Component', 'Failure Mode', 'RPN']].to_string(index=False))
print(f"\n平均RPN: {analysis_df['RPN'].mean():.1f}")
print(f"最大RPN: {analysis_df['RPN'].max()}")

# 高リスク項目の抽出
high_risk = fmea.get_high_risk_items(threshold=150)
print(f"\n高リスク項目(RPN≥150): {len(high_risk)}件")
print(high_risk[['Component', 'RPN']].to_string(index=False))

# 出力例:
# === FMEA分析結果(RPN順) ===
# Component        Failure Mode                    RPN
# 圧力安全弁       作動不良(開かない)            160
# 温度センサー     センサー故障(高温側)          108
# 冷却水ポンプ     ポンプ停止                       64
#
# 平均RPN: 110.7
# 最大RPN: 160
#
# 高リスク項目(RPN≥150): 1件
# Component    RPN
# 圧力安全弁   160

3.1.2 臨界度解析(Criticality Analysis)

臨界度解析は、FMEAを拡張し、故障モードの影響を定量的に評価します。 各故障モードの臨界度 = 発生確率 × 深刻度 で計算し、重要度をランク付けします。

# Example 2: Criticality Analysis
import matplotlib.pyplot as plt

class CriticalityAnalysis:
    """臨界度解析(Criticality Analysis)

    故障モードの重要度を定量的に評価し、優先順位を決定
    """

    def __init__(self, fmea_data: pd.DataFrame):
        self.data = fmea_data.copy()
        self._calculate_criticality()

    def _calculate_criticality(self):
        """臨界度を計算

        Criticality = (Occurrence/10) * Severity * α
        α: 検出性による重み係数(検出困難ほど大きい)
        """
        # 検出度から重み係数を計算(検出困難 = 係数大)
        detection_weight = self.data['Detection'] / 10

        # 臨界度 = 発生確率 × 深刻度 × 検出困難度
        self.data['Criticality'] = (
            (self.data['Occurrence'] / 10) *
            self.data['Severity'] *
            detection_weight
        )

        # 臨界度ランク付け(1-4)
        criticality_values = self.data['Criticality']
        self.data['Criticality Rank'] = pd.cut(
            criticality_values,
            bins=[0, 2, 4, 6, 10],
            labels=['Low', 'Medium', 'High', 'Critical']
        )

    def get_critical_items(self, rank: str = 'Critical') -> pd.DataFrame:
        """臨界度ランク別の項目取得"""
        return self.data[self.data['Criticality Rank'] == rank]

    def plot_criticality_matrix(self):
        """臨界度マトリクスをプロット(深刻度 vs 発生度)"""
        fig, ax = plt.subplots(figsize=(10, 8))

        # バブルチャート(バブルサイズ = 検出度)
        scatter = ax.scatter(
            self.data['Occurrence'],
            self.data['Severity'],
            s=self.data['Detection'] * 50,  # バブルサイズ
            c=self.data['Criticality'],
            cmap='YlOrRd',
            alpha=0.6,
            edgecolors='black'
        )

        # 各点にラベル付け
        for idx, row in self.data.iterrows():
            ax.annotate(
                row['Component'],
                (row['Occurrence'], row['Severity']),
                fontsize=8,
                ha='center'
            )

        ax.set_xlabel('Occurrence (発生度)', fontsize=12)
        ax.set_ylabel('Severity (深刻度)', fontsize=12)
        ax.set_title('Criticality Matrix (臨界度マトリクス)', fontsize=14)
        ax.grid(True, alpha=0.3)

        # カラーバー
        cbar = plt.colorbar(scatter)
        cbar.set_label('Criticality (臨界度)', fontsize=10)

        return fig

# 実使用例
ca = CriticalityAnalysis(analysis_df)

print("=== 臨界度解析結果 ===")
result = ca.data[['Component', 'Criticality', 'Criticality Rank']]
result = result.sort_values('Criticality', ascending=False)
print(result.to_string(index=False))

# ランク別の統計
print("\n=== 臨界度ランク別統計 ===")
rank_stats = ca.data['Criticality Rank'].value_counts().sort_index()
print(rank_stats)

# Critical項目の詳細
critical_items = ca.get_critical_items('Critical')
if len(critical_items) > 0:
    print(f"\n⚠️  Critical項目(最優先対策): {len(critical_items)}件")
    print(critical_items[['Component', 'Failure Mode', 'Criticality']].to_string(index=False))

# 出力例:
# === 臨界度解析結果 ===
# Component        Criticality  Criticality Rank
# 圧力安全弁            1.60            Low
# 温度センサー          1.08            Low
# 冷却水ポンプ          0.64            Low
#
# === 臨界度ランク別統計 ===
# Low         3
# Medium      0
# High        0
# Critical    0

3.2 故障の木解析(FTA: Fault Tree Analysis)

故障の木解析(FTA)は、望ましくない事象(トップイベント)から、 その原因となる基本事象までを論理ゲート(AND/OR)で結んで木構造で表現します。 これにより、システム全体の信頼性を定量的に評価できます。

3.2.1 故障の木構築

# Example 3: Fault Tree Construction
from dataclasses import dataclass
from typing import Optional, List
from enum import Enum

class GateType(Enum):
    """論理ゲートの種類"""
    AND = "AND"  # すべての入力が真の時に真
    OR = "OR"    # 少なくとも1つの入力が真の時に真

@dataclass
class FaultTreeNode:
    """故障の木のノード"""
    name: str
    probability: float = 0.0  # 故障確率
    gate_type: Optional[GateType] = None
    children: List['FaultTreeNode'] = None

    def __post_init__(self):
        if self.children is None:
            self.children = []

class FaultTree:
    """故障の木解析(FTA)システム"""

    def __init__(self, top_event: str):
        self.root = FaultTreeNode(name=top_event)

    def add_gate(self, parent_name: str, gate_type: GateType,
                 children: List[tuple]):
        """論理ゲートを追加

        Args:
            parent_name: 親ノード名
            gate_type: ゲートタイプ(AND/OR)
            children: [(子ノード名, 故障確率), ...]
        """
        parent = self._find_node(self.root, parent_name)
        if parent is None:
            raise ValueError(f"Node '{parent_name}' not found")

        parent.gate_type = gate_type
        for child_name, prob in children:
            child_node = FaultTreeNode(name=child_name, probability=prob)
            parent.children.append(child_node)

    def _find_node(self, node: FaultTreeNode, name: str) -> Optional[FaultTreeNode]:
        """ノードを名前で検索"""
        if node.name == name:
            return node
        for child in node.children:
            result = self._find_node(child, name)
            if result:
                return result
        return None

    def calculate_top_event_probability(self) -> float:
        """トップイベントの発生確率を計算"""
        return self._calculate_node_probability(self.root)

    def _calculate_node_probability(self, node: FaultTreeNode) -> float:
        """ノードの故障確率を再帰的に計算"""
        # 基本事象(リーフノード)の場合
        if not node.children:
            return node.probability

        # 子ノードの確率を計算
        child_probs = [self._calculate_node_probability(child)
                      for child in node.children]

        # ゲートタイプに応じて計算
        if node.gate_type == GateType.AND:
            # ANDゲート: すべてが故障する確率
            node.probability = np.prod(child_probs)
        elif node.gate_type == GateType.OR:
            # ORゲート: 少なくとも1つが故障する確率
            # P(A∪B) = 1 - P(A'∩B') = 1 - (1-P(A))(1-P(B))
            node.probability = 1 - np.prod([1 - p for p in child_probs])

        return node.probability

    def print_tree(self, node: Optional[FaultTreeNode] = None,
                   level: int = 0):
        """故障の木を表示"""
        if node is None:
            node = self.root

        indent = "  " * level
        gate_str = f" [{node.gate_type.value}]" if node.gate_type else ""
        prob_str = f" (P={node.probability:.4f})" if node.probability > 0 else ""

        print(f"{indent}{node.name}{gate_str}{prob_str}")

        for child in node.children:
            self.print_tree(child, level + 1)

# 実使用例:反応器過圧事故の故障の木
ft = FaultTree(top_event="反応器過圧事故")

# レベル1: 過圧の直接原因(ORゲート)
ft.add_gate(
    parent_name="反応器過圧事故",
    gate_type=GateType.OR,
    children=[
        ("反応暴走", 0.0),
        ("圧力制御系故障", 0.0),
        ("安全弁作動不良", 0.002)  # 年間故障確率0.2%
    ]
)

# レベル2: 反応暴走の原因(ORゲート)
ft.add_gate(
    parent_name="反応暴走",
    gate_type=GateType.OR,
    children=[
        ("温度制御失敗", 0.0),
        ("原料過剰投入", 0.001)
    ]
)

# レベル3: 温度制御失敗の原因(ANDゲート - すべて同時に発生)
ft.add_gate(
    parent_name="温度制御失敗",
    gate_type=GateType.AND,
    children=[
        ("冷却系故障", 0.01),      # 年間1%
        ("温度センサー故障", 0.005)  # 年間0.5%
    ]
)

# レベル3: 圧力制御系故障の原因(ORゲート)
ft.add_gate(
    parent_name="圧力制御系故障",
    gate_type=GateType.OR,
    children=[
        ("圧力センサー故障", 0.003),
        ("制御弁故障", 0.004)
    ]
)

# トップイベント確率の計算
top_prob = ft.calculate_top_event_probability()

print("=== 故障の木解析結果 ===\n")
ft.print_tree()
print(f"\n【トップイベント発生確率】")
print(f"反応器過圧事故: {top_prob:.6f} (年間{top_prob*100:.4f}%)")
print(f"期待発生頻度: {1/top_prob:.1f}年に1回" if top_prob > 0 else "")

# 出力例:
# === 故障の木解析結果 ===
#
# 反応器過圧事故 [OR] (P=0.0090)
#   反応暴走 [OR] (P=0.0010)
#     温度制御失敗 [AND] (P=0.0001)
#       冷却系故障 (P=0.0100)
#       温度センサー故障 (P=0.0050)
#     原料過剰投入 (P=0.0010)
#   圧力制御系故障 [OR] (P=0.0070)
#     圧力センサー故障 (P=0.0030)
#     制御弁故障 (P=0.0040)
#   安全弁作動不良 (P=0.0020)
#
# 【トップイベント発生確率】
# 反応器過圧事故: 0.009000 (年間0.9000%)
# 期待発生頻度: 111.1年に1回

3.2.2 最小カットセット解析

最小カットセット(Minimal Cut Set)は、同時に故障するとトップイベントが発生する 基本事象の最小組合せです。これを特定することで、最も重要な故障経路を把握できます。

# Example 4: Minimal Cut Set Analysis
from itertools import combinations

class MinimalCutSetAnalyzer:
    """最小カットセット解析

    トップイベントを引き起こす基本事象の最小組合せを導出
    """

    def __init__(self, fault_tree: FaultTree):
        self.fault_tree = fault_tree
        self.minimal_cut_sets = []

    def find_minimal_cut_sets(self) -> List[List[str]]:
        """最小カットセットを導出"""
        self.minimal_cut_sets = []
        self._traverse_tree(self.fault_tree.root, [])

        # 最小性チェック(他のカットセットの部分集合を除外)
        self.minimal_cut_sets = self._remove_supersets(self.minimal_cut_sets)

        return self.minimal_cut_sets

    def _traverse_tree(self, node: FaultTreeNode, current_path: List[str]):
        """故障の木を走査してカットセットを抽出"""
        # リーフノード(基本事象)に到達
        if not node.children:
            self.minimal_cut_sets.append(current_path + [node.name])
            return

        if node.gate_type == GateType.AND:
            # ANDゲート: すべての子を同じパスに追加
            new_path = current_path.copy()
            for child in node.children:
                if not child.children:  # 基本事象
                    new_path.append(child.name)
                else:
                    self._traverse_tree(child, new_path)

            # すべて基本事象の場合
            if all(not child.children for child in node.children):
                self.minimal_cut_sets.append(new_path)

        elif node.gate_type == GateType.OR:
            # ORゲート: 各子を個別のパスとして展開
            for child in node.children:
                self._traverse_tree(child, current_path.copy())

    def _remove_supersets(self, cut_sets: List[List[str]]) -> List[List[str]]:
        """最小カットセット以外(上位集合)を除外"""
        minimal = []
        sorted_sets = sorted(cut_sets, key=len)

        for cs in sorted_sets:
            cs_set = set(cs)
            is_minimal = True

            for existing in minimal:
                if set(existing).issubset(cs_set):
                    is_minimal = False
                    break

            if is_minimal:
                minimal.append(cs)

        return minimal

    def calculate_cut_set_probabilities(self) -> pd.DataFrame:
        """各カットセットの発生確率を計算"""
        results = []

        for i, cut_set in enumerate(self.minimal_cut_sets, 1):
            # カットセット内の基本事象の確率を取得
            probabilities = []
            for event_name in cut_set:
                node = self.fault_tree._find_node(
                    self.fault_tree.root, event_name
                )
                if node:
                    probabilities.append(node.probability)

            # カットセット発生確率 = すべての事象が同時発生
            cut_set_prob = np.prod(probabilities) if probabilities else 0

            results.append({
                'Cut Set ID': f"CS{i}",
                'Events': ' AND '.join(cut_set),
                'Order': len(cut_set),
                'Probability': cut_set_prob
            })

        df = pd.DataFrame(results)
        return df.sort_values('Probability', ascending=False)

    def get_importance_measures(self) -> pd.DataFrame:
        """各基本事象の重要度指標を計算

        Fussell-Vesely重要度: その事象を含むカットセットの寄与度
        """
        # すべての基本事象を抽出
        all_events = set()
        for cut_set in self.minimal_cut_sets:
            all_events.update(cut_set)

        # トップイベント確率
        top_prob = self.fault_tree.calculate_top_event_probability()

        results = []
        for event in all_events:
            # この事象を含むカットセットの確率合計
            containing_prob = 0
            for cut_set in self.minimal_cut_sets:
                if event in cut_set:
                    probs = []
                    for e in cut_set:
                        node = self.fault_tree._find_node(
                            self.fault_tree.root, e
                        )
                        if node:
                            probs.append(node.probability)
                    containing_prob += np.prod(probs)

            # Fussell-Vesely重要度
            fv_importance = containing_prob / top_prob if top_prob > 0 else 0

            results.append({
                'Event': event,
                'FV Importance': fv_importance,
                'Cut Sets': sum(1 for cs in self.minimal_cut_sets if event in cs)
            })

        df = pd.DataFrame(results)
        return df.sort_values('FV Importance', ascending=False)

# 実使用例
mcs_analyzer = MinimalCutSetAnalyzer(ft)
minimal_cuts = mcs_analyzer.find_minimal_cut_sets()

print("\n=== 最小カットセット ===")
for i, cut_set in enumerate(minimal_cuts, 1):
    print(f"CS{i}: {' AND '.join(cut_set)}")

# カットセット確率
cut_set_probs = mcs_analyzer.calculate_cut_set_probabilities()
print("\n=== カットセット発生確率 ===")
print(cut_set_probs.to_string(index=False))

# 重要度解析
importance = mcs_analyzer.get_importance_measures()
print("\n=== 基本事象の重要度 ===")
print(importance.to_string(index=False))

# 出力例:
# === 最小カットセット ===
# CS1: 冷却系故障 AND 温度センサー故障
# CS2: 原料過剰投入
# CS3: 圧力センサー故障
# CS4: 制御弁故障
# CS5: 安全弁作動不良
#
# === カットセット発生確率 ===
# Cut Set ID                         Events  Order  Probability
#       CS2                     原料過剰投入      1       0.0010
#       CS4                       制御弁故障      1       0.0004
#       CS3                   圧力センサー故障      1       0.0003
#       CS5                   安全弁作動不良      1       0.0002
#       CS1  冷却系故障 AND 温度センサー故障      2       0.0001

3.2.3 信頼性計算の実装

# Example 5: Advanced Reliability Calculation
import scipy.stats as stats

class ReliabilityCalculator:
    """システム信頼性計算

    故障率、MTBF、アベイラビリティを計算
    """

    @staticmethod
    def failure_rate_to_probability(lambda_rate: float, time: float) -> float:
        """故障率から故障確率を計算

        Args:
            lambda_rate: 故障率 [1/hour]
            time: 時間 [hour]

        Returns:
            故障確率 P(t) = 1 - exp(-λt)
        """
        return 1 - np.exp(-lambda_rate * time)

    @staticmethod
    def calculate_mtbf(failure_rate: float) -> float:
        """平均故障間隔(MTBF)を計算

        Args:
            failure_rate: 故障率 [1/hour]

        Returns:
            MTBF [hour]
        """
        return 1 / failure_rate if failure_rate > 0 else float('inf')

    @staticmethod
    def calculate_availability(mtbf: float, mttr: float) -> float:
        """アベイラビリティ(稼働率)を計算

        Args:
            mtbf: 平均故障間隔 [hour]
            mttr: 平均修復時間 [hour]

        Returns:
            Availability = MTBF / (MTBF + MTTR)
        """
        return mtbf / (mtbf + mttr)

    @staticmethod
    def weibull_reliability(t: float, beta: float, eta: float) -> float:
        """ワイブル分布による信頼度関数

        Args:
            t: 時間
            beta: 形状パラメータ(β<1:初期故障、β=1:ランダム故障、β>1:摩耗故障)
            eta: 尺度パラメータ(特性寿命)

        Returns:
            R(t) = exp(-(t/η)^β)
        """
        return np.exp(-((t / eta) ** beta))

    @staticmethod
    def series_system_reliability(reliabilities: List[float]) -> float:
        """直列システムの信頼度

        R_sys = R1 × R2 × ... × Rn
        """
        return np.prod(reliabilities)

    @staticmethod
    def parallel_system_reliability(reliabilities: List[float]) -> float:
        """並列システム(冗長化)の信頼度

        R_sys = 1 - (1-R1) × (1-R2) × ... × (1-Rn)
        """
        return 1 - np.prod([1 - r for r in reliabilities])

# 実使用例
rc = ReliabilityCalculator()

# 機器仕様
equipment = {
    '温度センサー': {'lambda': 5.7e-6, 'mttr': 4},    # 故障率 [1/h], MTTR [h]
    '圧力センサー': {'lambda': 6.2e-6, 'mttr': 4},
    '制御弁': {'lambda': 3.8e-6, 'mttr': 8},
    '冷却ポンプ': {'lambda': 1.2e-5, 'mttr': 12}
}

print("=== 機器信頼性解析(1年間運転) ===\n")

operation_time = 8760  # 1年 = 8760時間

for name, spec in equipment.items():
    lambda_rate = spec['lambda']
    mttr = spec['mttr']

    # 各種指標計算
    failure_prob = rc.failure_rate_to_probability(lambda_rate, operation_time)
    reliability = 1 - failure_prob
    mtbf = rc.calculate_mtbf(lambda_rate)
    availability = rc.calculate_availability(mtbf, mttr)

    print(f"【{name}】")
    print(f"  故障率: {lambda_rate:.2e} [1/h]")
    print(f"  MTBF: {mtbf:.0f} h ({mtbf/8760:.1f} 年)")
    print(f"  MTTR: {mttr} h")
    print(f"  年間故障確率: {failure_prob*100:.2f}%")
    print(f"  年間信頼度: {reliability*100:.2f}%")
    print(f"  アベイラビリティ: {availability*100:.4f}%\n")

# 直列システム(すべて正常でないとシステム稼働不可)
series_reliabilities = [
    1 - rc.failure_rate_to_probability(spec['lambda'], operation_time)
    for spec in equipment.values()
]
series_reliability = rc.series_system_reliability(series_reliabilities)

print(f"【直列システム全体】")
print(f"  システム年間信頼度: {series_reliability*100:.2f}%")
print(f"  システム年間故障確率: {(1-series_reliability)*100:.2f}%")

# 並列システム(センサーを2重化した場合)
sensor_reliability = 1 - rc.failure_rate_to_probability(5.7e-6, operation_time)
redundant_reliability = rc.parallel_system_reliability([sensor_reliability, sensor_reliability])

print(f"\n【温度センサー2重化の効果】")
print(f"  単独: {sensor_reliability*100:.2f}%")
print(f"  2重化: {redundant_reliability*100:.4f}%")
print(f"  信頼度向上: {(redundant_reliability-sensor_reliability)*100:.4f}%")

# 出力例:
# === 機器信頼性解析(1年間運転) ===
#
# 【温度センサー】
#   故障率: 5.70e-06 [1/h]
#   MTBF: 175439 h (20.0 年)
#   MTTR: 4 h
#   年間故障確率: 4.90%
#   年間信頼度: 95.10%
#   アベイラビリティ: 99.9977%

3.2.4 共通原因故障解析

# Example 6: Common Cause Failure Analysis
class CommonCauseFailureAnalysis:
    """共通原因故障(CCF)解析

    冗長システムにおける共通原因による同時故障を評価
    """

    def __init__(self):
        self.beta_factors = {}  # β因子(CCF発生割合)

    def set_beta_factor(self, component: str, beta: float):
        """β因子を設定

        Args:
            component: 対象コンポーネント
            beta: β因子(0-1、典型的には0.05-0.15)
        """
        if not 0 <= beta <= 1:
            raise ValueError("β因子は0-1の範囲で指定してください")
        self.beta_factors[component] = beta

    def calculate_redundant_reliability_with_ccf(
        self,
        component: str,
        base_reliability: float,
        n_redundant: int = 2
    ) -> float:
        """CCFを考慮した冗長システムの信頼度

        Args:
            component: コンポーネント名
            base_reliability: 単体の信頼度
            n_redundant: 冗長数

        Returns:
            CCF考慮後のシステム信頼度
        """
        beta = self.beta_factors.get(component, 0)

        # 独立故障確率
        independent_failure = (1 - base_reliability) * (1 - beta)

        # CCF故障確率
        ccf_failure = (1 - base_reliability) * beta

        # 冗長システムの信頼度
        # R_sys = (独立故障を考慮した冗長) × (CCF非発生)
        independent_reliability = 1 - (independent_failure ** n_redundant)
        ccf_reliability = 1 - ccf_failure

        total_reliability = independent_reliability * ccf_reliability

        return total_reliability

    def compare_ccf_impact(
        self,
        component: str,
        base_reliability: float,
        n_redundant: int = 2
    ) -> Dict:
        """CCFの影響を比較

        Returns:
            CCF考慮前後の信頼度比較
        """
        # CCF無視した場合の冗長化効果
        without_ccf = 1 - ((1 - base_reliability) ** n_redundant)

        # CCF考慮した場合
        with_ccf = self.calculate_redundant_reliability_with_ccf(
            component, base_reliability, n_redundant
        )

        # 信頼度低下
        degradation = without_ccf - with_ccf

        return {
            'Component': component,
            'Base Reliability': base_reliability,
            'Redundancy': n_redundant,
            'Beta Factor': self.beta_factors.get(component, 0),
            'Without CCF': without_ccf,
            'With CCF': with_ccf,
            'Degradation': degradation,
            'Degradation %': (degradation / without_ccf * 100) if without_ccf > 0 else 0
        }

# 実使用例
ccf = CommonCauseFailureAnalysis()

# 各コンポーネントのβ因子設定(産業データに基づく典型値)
ccf.set_beta_factor('温度センサー', beta=0.10)  # 10%がCCF
ccf.set_beta_factor('圧力センサー', beta=0.12)
ccf.set_beta_factor('制御弁', beta=0.08)

# 単体信頼度(1年間)
base_reliabilities = {
    '温度センサー': 0.95,
    '圧力センサー': 0.95,
    '制御弁': 0.97
}

print("=== 共通原因故障(CCF)の影響評価 ===\n")

results = []
for component, base_rel in base_reliabilities.items():
    comparison = ccf.compare_ccf_impact(component, base_rel, n_redundant=2)
    results.append(comparison)

    print(f"【{component}】(2重化)")
    print(f"  単体信頼度: {comparison['Base Reliability']*100:.2f}%")
    print(f"  β因子: {comparison['Beta Factor']:.2f}")
    print(f"  CCF無視: {comparison['Without CCF']*100:.4f}%")
    print(f"  CCF考慮: {comparison['With CCF']*100:.4f}%")
    print(f"  信頼度低下: {comparison['Degradation']*100:.4f}% "
          f"({comparison['Degradation %']:.2f}%低下)\n")

# 結果をDataFrameで比較
df_ccf = pd.DataFrame(results)
print("=== CCF影響まとめ ===")
print(df_ccf[['Component', 'Beta Factor', 'Without CCF', 'With CCF', 'Degradation %']])

# 出力例:
# === 共通原因故障(CCF)の影響評価 ===
#
# 【温度センサー】(2重化)
#   単体信頼度: 95.00%
#   β因子: 0.10
#   CCF無視: 99.7500%
#   CCF考慮: 99.2525%
#   信頼度低下: 0.4975% (0.50%低下)

3.3 FMEA/FTA統合分析

FMEAとFTAを統合することで、コンポーネントレベルからシステムレベルまでの 包括的な安全性評価が可能になります。FMEAで特定した高リスク項目をFTAで定量評価します。

# Example 7: Integrated FMEA/FTA Analysis System
class IntegratedSafetyAnalysis:
    """FMEA/FTA統合安全性評価システム

    FMEAで特定したリスクをFTAで定量評価し、最適な対策を提案
    """

    def __init__(self):
        self.fmea = FMEA()
        self.fault_trees = {}
        self.mitigation_strategies = []

    def conduct_fmea(self, components: List[Dict]):
        """FMEA実施

        Args:
            components: [{component, failure_mode, ...}, ...]
        """
        for comp in components:
            self.fmea.add_failure_mode(**comp)

    def build_fault_tree_for_high_risk(self, threshold: int = 200):
        """高リスク項目に対して故障の木を構築"""
        high_risk_items = self.fmea.get_high_risk_items(threshold)

        for _, item in high_risk_items.iterrows():
            component = item['Component']
            failure_mode = item['Failure Mode']

            # 故障の木を構築(簡略化)
            ft_name = f"{component}_{failure_mode}"
            self.fault_trees[ft_name] = {
                'component': component,
                'failure_mode': failure_mode,
                'rpn': item['RPN'],
                'severity': item['Severity'],
                'occurrence': item['Occurrence'],
                'detection': item['Detection']
            }

    def generate_mitigation_strategies(self) -> pd.DataFrame:
        """リスク軽減策を生成

        RPN値に基づいて優先度付けされた対策を提案
        """
        analysis_df = self.fmea.get_analysis()

        strategies = []
        for _, row in analysis_df.iterrows():
            rpn = row['RPN']
            severity = row['Severity']
            occurrence = row['Occurrence']
            detection = row['Detection']

            # 対策の優先順位決定
            if severity >= 8:
                priority = 'Critical'
                action = '即座の設計変更・冗長化'
            elif rpn >= 200:
                priority = 'High'
                action = '検出性向上・予防保全強化'
            elif rpn >= 100:
                priority = 'Medium'
                action = '定期監視・手順改善'
            else:
                priority = 'Low'
                action = '現状維持・定期レビュー'

            # 具体的な対策提案
            specific_actions = []
            if occurrence >= 5:
                specific_actions.append('発生頻度低減: 予防保全、部品交換サイクル短縮')
            if detection >= 7:
                specific_actions.append('検出性向上: センサー追加、アラーム設定')
            if severity >= 8:
                specific_actions.append('深刻度低減: 冗長化、フェイルセーフ設計')

            strategies.append({
                'Component': row['Component'],
                'Failure Mode': row['Failure Mode'],
                'Current RPN': rpn,
                'Priority': priority,
                'Recommended Action': action,
                'Specific Measures': '; '.join(specific_actions)
            })

        df = pd.DataFrame(strategies)
        return df.sort_values('Current RPN', ascending=False)

    def calculate_risk_reduction(
        self,
        component: str,
        new_occurrence: int = None,
        new_detection: int = None
    ) -> Dict:
        """対策実施後のリスク低減効果を計算

        Args:
            component: 対象コンポーネント
            new_occurrence: 対策後の発生度
            new_detection: 対策後の検出度

        Returns:
            リスク低減効果の分析結果
        """
        # 現在の値を取得
        analysis_df = self.fmea.get_analysis()
        current = analysis_df[analysis_df['Component'] == component].iloc[0]

        current_rpn = current['RPN']
        severity = current['Severity']
        current_occ = current['Occurrence']
        current_det = current['Detection']

        # 新しいRPN計算
        new_occ = new_occurrence if new_occurrence is not None else current_occ
        new_det = new_detection if new_detection is not None else current_det
        new_rpn = severity * new_occ * new_det

        # リスク低減率
        reduction = ((current_rpn - new_rpn) / current_rpn * 100) if current_rpn > 0 else 0

        return {
            'Component': component,
            'Current RPN': current_rpn,
            'New RPN': new_rpn,
            'Reduction': reduction,
            'Current O-D': f"{current_occ}-{current_det}",
            'New O-D': f"{new_occ}-{new_det}",
            'Status': 'Improved' if new_rpn < current_rpn else 'No Change'
        }

    def generate_safety_report(self) -> str:
        """総合安全性評価レポートを生成"""
        analysis_df = self.fmea.get_analysis()
        strategies_df = self.generate_mitigation_strategies()

        report = []
        report.append("=" * 60)
        report.append("総合安全性評価レポート")
        report.append("=" * 60)
        report.append("")

        # サマリー統計
        report.append("【全体統計】")
        report.append(f"  評価項目数: {len(analysis_df)}")
        report.append(f"  平均RPN: {analysis_df['RPN'].mean():.1f}")
        report.append(f"  最大RPN: {analysis_df['RPN'].max()}")
        report.append(f"  高リスク項目(RPN≥200): {len(analysis_df[analysis_df['RPN'] >= 200])}")
        report.append("")

        # 優先度別対策件数
        report.append("【優先度別対策件数】")
        priority_counts = strategies_df['Priority'].value_counts()
        for priority in ['Critical', 'High', 'Medium', 'Low']:
            count = priority_counts.get(priority, 0)
            report.append(f"  {priority}: {count}件")
        report.append("")

        # Critical項目の詳細
        critical = strategies_df[strategies_df['Priority'] == 'Critical']
        if len(critical) > 0:
            report.append("【Critical項目(即対応必要)】")
            for _, item in critical.iterrows():
                report.append(f"  ⚠️  {item['Component']} - {item['Failure Mode']}")
                report.append(f"      RPN: {item['Current RPN']}")
                report.append(f"      対策: {item['Recommended Action']}")
            report.append("")

        return "\n".join(report)

# 実使用例:反応器システムの統合解析
isa = IntegratedSafetyAnalysis()

# FMEA実施(複数コンポーネント)
components_data = [
    {
        'component': '温度センサー',
        'failure_mode': 'センサー故障(高温側)',
        'effects': '反応暴走、圧力上昇',
        'causes': 'センサー劣化、配線断線',
        'severity': 9,
        'occurrence': 3,
        'detection': 4,
        'current_controls': '冗長センサー、定期校正'
    },
    {
        'component': '冷却水ポンプ',
        'failure_mode': 'ポンプ停止',
        'effects': '反応器過熱',
        'causes': 'モーター故障、軸受摩耗',
        'severity': 8,
        'occurrence': 4,
        'detection': 2,
        'current_controls': '流量計、予備ポンプ'
    },
    {
        'component': '圧力安全弁',
        'failure_mode': '作動不良',
        'effects': '過圧破裂',
        'causes': '弁座固着',
        'severity': 10,
        'occurrence': 2,
        'detection': 8,
        'current_controls': '年次点検'
    },
    {
        'component': '原料供給弁',
        'failure_mode': '全開固着',
        'effects': '原料過剰投入',
        'causes': 'アクチュエータ故障',
        'severity': 7,
        'occurrence': 3,
        'detection': 5,
        'current_controls': '流量制御'
    }
]

isa.conduct_fmea(components_data)

# 軽減策の生成
strategies = isa.generate_mitigation_strategies()
print("=== リスク軽減策 ===")
print(strategies[['Component', 'Current RPN', 'Priority', 'Recommended Action']].to_string(index=False))

# 対策効果の予測
print("\n=== 対策効果シミュレーション ===")

# 温度センサー: 3重化で検出度を4→2に改善
temp_sensor_effect = isa.calculate_risk_reduction(
    '温度センサー',
    new_detection=2
)
print(f"【温度センサー3重化】")
print(f"  改善前RPN: {temp_sensor_effect['Current RPN']}")
print(f"  改善後RPN: {temp_sensor_effect['New RPN']}")
print(f"  リスク低減: {temp_sensor_effect['Reduction']:.1f}%")

# 冷却水ポンプ: 予防保全で発生度を4→2に改善
pump_effect = isa.calculate_risk_reduction(
    '冷却水ポンプ',
    new_occurrence=2
)
print(f"\n【冷却水ポンプ予防保全強化】")
print(f"  改善前RPN: {pump_effect['Current RPN']}")
print(f"  改善後RPN: {pump_effect['New RPN']}")
print(f"  リスク低減: {pump_effect['Reduction']:.1f}%")

# 総合レポート
print("\n" + isa.generate_safety_report())

# 出力例:
# === リスク軽減策 ===
# Component    Current RPN  Priority                  Recommended Action
# 圧力安全弁           160  Critical  即座の設計変更・冗長化
# 温度センサー         108    Medium  定期監視・手順改善
# 原料供給弁          105    Medium  定期監視・手順改善
# 冷却水ポンプ         64       Low  現状維持・定期レビュー

学習目標の確認

この章を完了すると、以下ができるようになります:

プロセス安全性評価入門 目次に戻る

PI実践技術トップ | Homeに戻る