システム故障モードの体系的分析と信頼性評価
FMEA(Failure Mode and Effects Analysis)は、システムの各コンポーネントについて、 考えられる故障モード、その影響、発生確率、検出性を体系的に分析する手法です。 プロセス産業では、設備の信頼性評価や保全計画の策定に広く活用されています。
FMEAの核心は、リスク優先度数(RPN: Risk Priority Number)の計算です。 RPN = 深刻度(S) × 発生度(O) × 検出度(D) で算出され、1000点満点で評価します。
# Example 1: Basic FMEA Implementation
import pandas as pd
import numpy as np
from typing import List, Dict
class FMEA:
"""故障モード影響解析(FMEA)システム
各機器の故障モード、影響、リスクを体系的に分析
"""
def __init__(self):
self.analysis_data = []
def add_failure_mode(self, component: str, failure_mode: str,
effects: str, causes: str,
severity: int, occurrence: int, detection: int,
current_controls: str = ""):
"""故障モードを追加
Args:
component: 対象コンポーネント
failure_mode: 故障モード
effects: 影響
causes: 原因
severity: 深刻度 (1-10)
occurrence: 発生度 (1-10)
detection: 検出度 (1-10)
current_controls: 現在の管理策
"""
rpn = severity * occurrence * detection
self.analysis_data.append({
'Component': component,
'Failure Mode': failure_mode,
'Effects': effects,
'Causes': causes,
'Severity': severity,
'Occurrence': occurrence,
'Detection': detection,
'RPN': rpn,
'Current Controls': current_controls
})
def get_analysis(self) -> pd.DataFrame:
"""FMEA分析結果を取得"""
df = pd.DataFrame(self.analysis_data)
return df.sort_values('RPN', ascending=False)
def get_high_risk_items(self, threshold: int = 200) -> pd.DataFrame:
"""高リスク項目を抽出
Args:
threshold: RPNしきい値(デフォルト200以上を高リスクと判定)
"""
df = self.get_analysis()
return df[df['RPN'] >= threshold]
# 実使用例:化学反応器のFMEA
fmea = FMEA()
# 温度制御システムの故障モード
fmea.add_failure_mode(
component="温度センサー",
failure_mode="センサー故障(高温側)",
effects="反応暴走、圧力上昇、爆発リスク",
causes="センサー劣化、配線断線",
severity=9,
occurrence=3,
detection=4,
current_controls="冗長センサー、定期校正"
)
fmea.add_failure_mode(
component="冷却水ポンプ",
failure_mode="ポンプ停止",
effects="反応器過熱、製品品質低下",
causes="モーター故障、軸受摩耗",
severity=8,
occurrence=4,
detection=2,
current_controls="流量計アラーム、予備ポンプ"
)
fmea.add_failure_mode(
component="圧力安全弁",
failure_mode="作動不良(開かない)",
effects="過圧による容器破裂",
causes="弁座固着、スプリング劣化",
severity=10,
occurrence=2,
detection=8,
current_controls="年次点検、圧力試験"
)
# 分析結果表示
analysis_df = fmea.get_analysis()
print("=== FMEA分析結果(RPN順) ===")
print(analysis_df[['Component', 'Failure Mode', 'RPN']].to_string(index=False))
print(f"\n平均RPN: {analysis_df['RPN'].mean():.1f}")
print(f"最大RPN: {analysis_df['RPN'].max()}")
# 高リスク項目の抽出
high_risk = fmea.get_high_risk_items(threshold=150)
print(f"\n高リスク項目(RPN≥150): {len(high_risk)}件")
print(high_risk[['Component', 'RPN']].to_string(index=False))
# 出力例:
# === FMEA分析結果(RPN順) ===
# Component Failure Mode RPN
# 圧力安全弁 作動不良(開かない) 160
# 温度センサー センサー故障(高温側) 108
# 冷却水ポンプ ポンプ停止 64
#
# 平均RPN: 110.7
# 最大RPN: 160
#
# 高リスク項目(RPN≥150): 1件
# Component RPN
# 圧力安全弁 160
臨界度解析は、FMEAを拡張し、故障モードの影響を定量的に評価します。 各故障モードの臨界度 = 発生確率 × 深刻度 で計算し、重要度をランク付けします。
# Example 2: Criticality Analysis
import matplotlib.pyplot as plt
class CriticalityAnalysis:
"""臨界度解析(Criticality Analysis)
故障モードの重要度を定量的に評価し、優先順位を決定
"""
def __init__(self, fmea_data: pd.DataFrame):
self.data = fmea_data.copy()
self._calculate_criticality()
def _calculate_criticality(self):
"""臨界度を計算
Criticality = (Occurrence/10) * Severity * α
α: 検出性による重み係数(検出困難ほど大きい)
"""
# 検出度から重み係数を計算(検出困難 = 係数大)
detection_weight = self.data['Detection'] / 10
# 臨界度 = 発生確率 × 深刻度 × 検出困難度
self.data['Criticality'] = (
(self.data['Occurrence'] / 10) *
self.data['Severity'] *
detection_weight
)
# 臨界度ランク付け(1-4)
criticality_values = self.data['Criticality']
self.data['Criticality Rank'] = pd.cut(
criticality_values,
bins=[0, 2, 4, 6, 10],
labels=['Low', 'Medium', 'High', 'Critical']
)
def get_critical_items(self, rank: str = 'Critical') -> pd.DataFrame:
"""臨界度ランク別の項目取得"""
return self.data[self.data['Criticality Rank'] == rank]
def plot_criticality_matrix(self):
"""臨界度マトリクスをプロット(深刻度 vs 発生度)"""
fig, ax = plt.subplots(figsize=(10, 8))
# バブルチャート(バブルサイズ = 検出度)
scatter = ax.scatter(
self.data['Occurrence'],
self.data['Severity'],
s=self.data['Detection'] * 50, # バブルサイズ
c=self.data['Criticality'],
cmap='YlOrRd',
alpha=0.6,
edgecolors='black'
)
# 各点にラベル付け
for idx, row in self.data.iterrows():
ax.annotate(
row['Component'],
(row['Occurrence'], row['Severity']),
fontsize=8,
ha='center'
)
ax.set_xlabel('Occurrence (発生度)', fontsize=12)
ax.set_ylabel('Severity (深刻度)', fontsize=12)
ax.set_title('Criticality Matrix (臨界度マトリクス)', fontsize=14)
ax.grid(True, alpha=0.3)
# カラーバー
cbar = plt.colorbar(scatter)
cbar.set_label('Criticality (臨界度)', fontsize=10)
return fig
# 実使用例
ca = CriticalityAnalysis(analysis_df)
print("=== 臨界度解析結果 ===")
result = ca.data[['Component', 'Criticality', 'Criticality Rank']]
result = result.sort_values('Criticality', ascending=False)
print(result.to_string(index=False))
# ランク別の統計
print("\n=== 臨界度ランク別統計 ===")
rank_stats = ca.data['Criticality Rank'].value_counts().sort_index()
print(rank_stats)
# Critical項目の詳細
critical_items = ca.get_critical_items('Critical')
if len(critical_items) > 0:
print(f"\n⚠️ Critical項目(最優先対策): {len(critical_items)}件")
print(critical_items[['Component', 'Failure Mode', 'Criticality']].to_string(index=False))
# 出力例:
# === 臨界度解析結果 ===
# Component Criticality Criticality Rank
# 圧力安全弁 1.60 Low
# 温度センサー 1.08 Low
# 冷却水ポンプ 0.64 Low
#
# === 臨界度ランク別統計 ===
# Low 3
# Medium 0
# High 0
# Critical 0
故障の木解析(FTA)は、望ましくない事象(トップイベント)から、 その原因となる基本事象までを論理ゲート(AND/OR)で結んで木構造で表現します。 これにより、システム全体の信頼性を定量的に評価できます。
# Example 3: Fault Tree Construction
from dataclasses import dataclass
from typing import Optional, List
from enum import Enum
class GateType(Enum):
"""論理ゲートの種類"""
AND = "AND" # すべての入力が真の時に真
OR = "OR" # 少なくとも1つの入力が真の時に真
@dataclass
class FaultTreeNode:
"""故障の木のノード"""
name: str
probability: float = 0.0 # 故障確率
gate_type: Optional[GateType] = None
children: List['FaultTreeNode'] = None
def __post_init__(self):
if self.children is None:
self.children = []
class FaultTree:
"""故障の木解析(FTA)システム"""
def __init__(self, top_event: str):
self.root = FaultTreeNode(name=top_event)
def add_gate(self, parent_name: str, gate_type: GateType,
children: List[tuple]):
"""論理ゲートを追加
Args:
parent_name: 親ノード名
gate_type: ゲートタイプ(AND/OR)
children: [(子ノード名, 故障確率), ...]
"""
parent = self._find_node(self.root, parent_name)
if parent is None:
raise ValueError(f"Node '{parent_name}' not found")
parent.gate_type = gate_type
for child_name, prob in children:
child_node = FaultTreeNode(name=child_name, probability=prob)
parent.children.append(child_node)
def _find_node(self, node: FaultTreeNode, name: str) -> Optional[FaultTreeNode]:
"""ノードを名前で検索"""
if node.name == name:
return node
for child in node.children:
result = self._find_node(child, name)
if result:
return result
return None
def calculate_top_event_probability(self) -> float:
"""トップイベントの発生確率を計算"""
return self._calculate_node_probability(self.root)
def _calculate_node_probability(self, node: FaultTreeNode) -> float:
"""ノードの故障確率を再帰的に計算"""
# 基本事象(リーフノード)の場合
if not node.children:
return node.probability
# 子ノードの確率を計算
child_probs = [self._calculate_node_probability(child)
for child in node.children]
# ゲートタイプに応じて計算
if node.gate_type == GateType.AND:
# ANDゲート: すべてが故障する確率
node.probability = np.prod(child_probs)
elif node.gate_type == GateType.OR:
# ORゲート: 少なくとも1つが故障する確率
# P(A∪B) = 1 - P(A'∩B') = 1 - (1-P(A))(1-P(B))
node.probability = 1 - np.prod([1 - p for p in child_probs])
return node.probability
def print_tree(self, node: Optional[FaultTreeNode] = None,
level: int = 0):
"""故障の木を表示"""
if node is None:
node = self.root
indent = " " * level
gate_str = f" [{node.gate_type.value}]" if node.gate_type else ""
prob_str = f" (P={node.probability:.4f})" if node.probability > 0 else ""
print(f"{indent}{node.name}{gate_str}{prob_str}")
for child in node.children:
self.print_tree(child, level + 1)
# 実使用例:反応器過圧事故の故障の木
ft = FaultTree(top_event="反応器過圧事故")
# レベル1: 過圧の直接原因(ORゲート)
ft.add_gate(
parent_name="反応器過圧事故",
gate_type=GateType.OR,
children=[
("反応暴走", 0.0),
("圧力制御系故障", 0.0),
("安全弁作動不良", 0.002) # 年間故障確率0.2%
]
)
# レベル2: 反応暴走の原因(ORゲート)
ft.add_gate(
parent_name="反応暴走",
gate_type=GateType.OR,
children=[
("温度制御失敗", 0.0),
("原料過剰投入", 0.001)
]
)
# レベル3: 温度制御失敗の原因(ANDゲート - すべて同時に発生)
ft.add_gate(
parent_name="温度制御失敗",
gate_type=GateType.AND,
children=[
("冷却系故障", 0.01), # 年間1%
("温度センサー故障", 0.005) # 年間0.5%
]
)
# レベル3: 圧力制御系故障の原因(ORゲート)
ft.add_gate(
parent_name="圧力制御系故障",
gate_type=GateType.OR,
children=[
("圧力センサー故障", 0.003),
("制御弁故障", 0.004)
]
)
# トップイベント確率の計算
top_prob = ft.calculate_top_event_probability()
print("=== 故障の木解析結果 ===\n")
ft.print_tree()
print(f"\n【トップイベント発生確率】")
print(f"反応器過圧事故: {top_prob:.6f} (年間{top_prob*100:.4f}%)")
print(f"期待発生頻度: {1/top_prob:.1f}年に1回" if top_prob > 0 else "")
# 出力例:
# === 故障の木解析結果 ===
#
# 反応器過圧事故 [OR] (P=0.0090)
# 反応暴走 [OR] (P=0.0010)
# 温度制御失敗 [AND] (P=0.0001)
# 冷却系故障 (P=0.0100)
# 温度センサー故障 (P=0.0050)
# 原料過剰投入 (P=0.0010)
# 圧力制御系故障 [OR] (P=0.0070)
# 圧力センサー故障 (P=0.0030)
# 制御弁故障 (P=0.0040)
# 安全弁作動不良 (P=0.0020)
#
# 【トップイベント発生確率】
# 反応器過圧事故: 0.009000 (年間0.9000%)
# 期待発生頻度: 111.1年に1回
最小カットセット(Minimal Cut Set)は、同時に故障するとトップイベントが発生する 基本事象の最小組合せです。これを特定することで、最も重要な故障経路を把握できます。
# Example 4: Minimal Cut Set Analysis
from itertools import combinations
class MinimalCutSetAnalyzer:
"""最小カットセット解析
トップイベントを引き起こす基本事象の最小組合せを導出
"""
def __init__(self, fault_tree: FaultTree):
self.fault_tree = fault_tree
self.minimal_cut_sets = []
def find_minimal_cut_sets(self) -> List[List[str]]:
"""最小カットセットを導出"""
self.minimal_cut_sets = []
self._traverse_tree(self.fault_tree.root, [])
# 最小性チェック(他のカットセットの部分集合を除外)
self.minimal_cut_sets = self._remove_supersets(self.minimal_cut_sets)
return self.minimal_cut_sets
def _traverse_tree(self, node: FaultTreeNode, current_path: List[str]):
"""故障の木を走査してカットセットを抽出"""
# リーフノード(基本事象)に到達
if not node.children:
self.minimal_cut_sets.append(current_path + [node.name])
return
if node.gate_type == GateType.AND:
# ANDゲート: すべての子を同じパスに追加
new_path = current_path.copy()
for child in node.children:
if not child.children: # 基本事象
new_path.append(child.name)
else:
self._traverse_tree(child, new_path)
# すべて基本事象の場合
if all(not child.children for child in node.children):
self.minimal_cut_sets.append(new_path)
elif node.gate_type == GateType.OR:
# ORゲート: 各子を個別のパスとして展開
for child in node.children:
self._traverse_tree(child, current_path.copy())
def _remove_supersets(self, cut_sets: List[List[str]]) -> List[List[str]]:
"""最小カットセット以外(上位集合)を除外"""
minimal = []
sorted_sets = sorted(cut_sets, key=len)
for cs in sorted_sets:
cs_set = set(cs)
is_minimal = True
for existing in minimal:
if set(existing).issubset(cs_set):
is_minimal = False
break
if is_minimal:
minimal.append(cs)
return minimal
def calculate_cut_set_probabilities(self) -> pd.DataFrame:
"""各カットセットの発生確率を計算"""
results = []
for i, cut_set in enumerate(self.minimal_cut_sets, 1):
# カットセット内の基本事象の確率を取得
probabilities = []
for event_name in cut_set:
node = self.fault_tree._find_node(
self.fault_tree.root, event_name
)
if node:
probabilities.append(node.probability)
# カットセット発生確率 = すべての事象が同時発生
cut_set_prob = np.prod(probabilities) if probabilities else 0
results.append({
'Cut Set ID': f"CS{i}",
'Events': ' AND '.join(cut_set),
'Order': len(cut_set),
'Probability': cut_set_prob
})
df = pd.DataFrame(results)
return df.sort_values('Probability', ascending=False)
def get_importance_measures(self) -> pd.DataFrame:
"""各基本事象の重要度指標を計算
Fussell-Vesely重要度: その事象を含むカットセットの寄与度
"""
# すべての基本事象を抽出
all_events = set()
for cut_set in self.minimal_cut_sets:
all_events.update(cut_set)
# トップイベント確率
top_prob = self.fault_tree.calculate_top_event_probability()
results = []
for event in all_events:
# この事象を含むカットセットの確率合計
containing_prob = 0
for cut_set in self.minimal_cut_sets:
if event in cut_set:
probs = []
for e in cut_set:
node = self.fault_tree._find_node(
self.fault_tree.root, e
)
if node:
probs.append(node.probability)
containing_prob += np.prod(probs)
# Fussell-Vesely重要度
fv_importance = containing_prob / top_prob if top_prob > 0 else 0
results.append({
'Event': event,
'FV Importance': fv_importance,
'Cut Sets': sum(1 for cs in self.minimal_cut_sets if event in cs)
})
df = pd.DataFrame(results)
return df.sort_values('FV Importance', ascending=False)
# 実使用例
mcs_analyzer = MinimalCutSetAnalyzer(ft)
minimal_cuts = mcs_analyzer.find_minimal_cut_sets()
print("\n=== 最小カットセット ===")
for i, cut_set in enumerate(minimal_cuts, 1):
print(f"CS{i}: {' AND '.join(cut_set)}")
# カットセット確率
cut_set_probs = mcs_analyzer.calculate_cut_set_probabilities()
print("\n=== カットセット発生確率 ===")
print(cut_set_probs.to_string(index=False))
# 重要度解析
importance = mcs_analyzer.get_importance_measures()
print("\n=== 基本事象の重要度 ===")
print(importance.to_string(index=False))
# 出力例:
# === 最小カットセット ===
# CS1: 冷却系故障 AND 温度センサー故障
# CS2: 原料過剰投入
# CS3: 圧力センサー故障
# CS4: 制御弁故障
# CS5: 安全弁作動不良
#
# === カットセット発生確率 ===
# Cut Set ID Events Order Probability
# CS2 原料過剰投入 1 0.0010
# CS4 制御弁故障 1 0.0004
# CS3 圧力センサー故障 1 0.0003
# CS5 安全弁作動不良 1 0.0002
# CS1 冷却系故障 AND 温度センサー故障 2 0.0001
# Example 5: Advanced Reliability Calculation
import scipy.stats as stats
class ReliabilityCalculator:
"""システム信頼性計算
故障率、MTBF、アベイラビリティを計算
"""
@staticmethod
def failure_rate_to_probability(lambda_rate: float, time: float) -> float:
"""故障率から故障確率を計算
Args:
lambda_rate: 故障率 [1/hour]
time: 時間 [hour]
Returns:
故障確率 P(t) = 1 - exp(-λt)
"""
return 1 - np.exp(-lambda_rate * time)
@staticmethod
def calculate_mtbf(failure_rate: float) -> float:
"""平均故障間隔(MTBF)を計算
Args:
failure_rate: 故障率 [1/hour]
Returns:
MTBF [hour]
"""
return 1 / failure_rate if failure_rate > 0 else float('inf')
@staticmethod
def calculate_availability(mtbf: float, mttr: float) -> float:
"""アベイラビリティ(稼働率)を計算
Args:
mtbf: 平均故障間隔 [hour]
mttr: 平均修復時間 [hour]
Returns:
Availability = MTBF / (MTBF + MTTR)
"""
return mtbf / (mtbf + mttr)
@staticmethod
def weibull_reliability(t: float, beta: float, eta: float) -> float:
"""ワイブル分布による信頼度関数
Args:
t: 時間
beta: 形状パラメータ(β<1:初期故障、β=1:ランダム故障、β>1:摩耗故障)
eta: 尺度パラメータ(特性寿命)
Returns:
R(t) = exp(-(t/η)^β)
"""
return np.exp(-((t / eta) ** beta))
@staticmethod
def series_system_reliability(reliabilities: List[float]) -> float:
"""直列システムの信頼度
R_sys = R1 × R2 × ... × Rn
"""
return np.prod(reliabilities)
@staticmethod
def parallel_system_reliability(reliabilities: List[float]) -> float:
"""並列システム(冗長化)の信頼度
R_sys = 1 - (1-R1) × (1-R2) × ... × (1-Rn)
"""
return 1 - np.prod([1 - r for r in reliabilities])
# 実使用例
rc = ReliabilityCalculator()
# 機器仕様
equipment = {
'温度センサー': {'lambda': 5.7e-6, 'mttr': 4}, # 故障率 [1/h], MTTR [h]
'圧力センサー': {'lambda': 6.2e-6, 'mttr': 4},
'制御弁': {'lambda': 3.8e-6, 'mttr': 8},
'冷却ポンプ': {'lambda': 1.2e-5, 'mttr': 12}
}
print("=== 機器信頼性解析(1年間運転) ===\n")
operation_time = 8760 # 1年 = 8760時間
for name, spec in equipment.items():
lambda_rate = spec['lambda']
mttr = spec['mttr']
# 各種指標計算
failure_prob = rc.failure_rate_to_probability(lambda_rate, operation_time)
reliability = 1 - failure_prob
mtbf = rc.calculate_mtbf(lambda_rate)
availability = rc.calculate_availability(mtbf, mttr)
print(f"【{name}】")
print(f" 故障率: {lambda_rate:.2e} [1/h]")
print(f" MTBF: {mtbf:.0f} h ({mtbf/8760:.1f} 年)")
print(f" MTTR: {mttr} h")
print(f" 年間故障確率: {failure_prob*100:.2f}%")
print(f" 年間信頼度: {reliability*100:.2f}%")
print(f" アベイラビリティ: {availability*100:.4f}%\n")
# 直列システム(すべて正常でないとシステム稼働不可)
series_reliabilities = [
1 - rc.failure_rate_to_probability(spec['lambda'], operation_time)
for spec in equipment.values()
]
series_reliability = rc.series_system_reliability(series_reliabilities)
print(f"【直列システム全体】")
print(f" システム年間信頼度: {series_reliability*100:.2f}%")
print(f" システム年間故障確率: {(1-series_reliability)*100:.2f}%")
# 並列システム(センサーを2重化した場合)
sensor_reliability = 1 - rc.failure_rate_to_probability(5.7e-6, operation_time)
redundant_reliability = rc.parallel_system_reliability([sensor_reliability, sensor_reliability])
print(f"\n【温度センサー2重化の効果】")
print(f" 単独: {sensor_reliability*100:.2f}%")
print(f" 2重化: {redundant_reliability*100:.4f}%")
print(f" 信頼度向上: {(redundant_reliability-sensor_reliability)*100:.4f}%")
# 出力例:
# === 機器信頼性解析(1年間運転) ===
#
# 【温度センサー】
# 故障率: 5.70e-06 [1/h]
# MTBF: 175439 h (20.0 年)
# MTTR: 4 h
# 年間故障確率: 4.90%
# 年間信頼度: 95.10%
# アベイラビリティ: 99.9977%
# Example 6: Common Cause Failure Analysis
class CommonCauseFailureAnalysis:
"""共通原因故障(CCF)解析
冗長システムにおける共通原因による同時故障を評価
"""
def __init__(self):
self.beta_factors = {} # β因子(CCF発生割合)
def set_beta_factor(self, component: str, beta: float):
"""β因子を設定
Args:
component: 対象コンポーネント
beta: β因子(0-1、典型的には0.05-0.15)
"""
if not 0 <= beta <= 1:
raise ValueError("β因子は0-1の範囲で指定してください")
self.beta_factors[component] = beta
def calculate_redundant_reliability_with_ccf(
self,
component: str,
base_reliability: float,
n_redundant: int = 2
) -> float:
"""CCFを考慮した冗長システムの信頼度
Args:
component: コンポーネント名
base_reliability: 単体の信頼度
n_redundant: 冗長数
Returns:
CCF考慮後のシステム信頼度
"""
beta = self.beta_factors.get(component, 0)
# 独立故障確率
independent_failure = (1 - base_reliability) * (1 - beta)
# CCF故障確率
ccf_failure = (1 - base_reliability) * beta
# 冗長システムの信頼度
# R_sys = (独立故障を考慮した冗長) × (CCF非発生)
independent_reliability = 1 - (independent_failure ** n_redundant)
ccf_reliability = 1 - ccf_failure
total_reliability = independent_reliability * ccf_reliability
return total_reliability
def compare_ccf_impact(
self,
component: str,
base_reliability: float,
n_redundant: int = 2
) -> Dict:
"""CCFの影響を比較
Returns:
CCF考慮前後の信頼度比較
"""
# CCF無視した場合の冗長化効果
without_ccf = 1 - ((1 - base_reliability) ** n_redundant)
# CCF考慮した場合
with_ccf = self.calculate_redundant_reliability_with_ccf(
component, base_reliability, n_redundant
)
# 信頼度低下
degradation = without_ccf - with_ccf
return {
'Component': component,
'Base Reliability': base_reliability,
'Redundancy': n_redundant,
'Beta Factor': self.beta_factors.get(component, 0),
'Without CCF': without_ccf,
'With CCF': with_ccf,
'Degradation': degradation,
'Degradation %': (degradation / without_ccf * 100) if without_ccf > 0 else 0
}
# 実使用例
ccf = CommonCauseFailureAnalysis()
# 各コンポーネントのβ因子設定(産業データに基づく典型値)
ccf.set_beta_factor('温度センサー', beta=0.10) # 10%がCCF
ccf.set_beta_factor('圧力センサー', beta=0.12)
ccf.set_beta_factor('制御弁', beta=0.08)
# 単体信頼度(1年間)
base_reliabilities = {
'温度センサー': 0.95,
'圧力センサー': 0.95,
'制御弁': 0.97
}
print("=== 共通原因故障(CCF)の影響評価 ===\n")
results = []
for component, base_rel in base_reliabilities.items():
comparison = ccf.compare_ccf_impact(component, base_rel, n_redundant=2)
results.append(comparison)
print(f"【{component}】(2重化)")
print(f" 単体信頼度: {comparison['Base Reliability']*100:.2f}%")
print(f" β因子: {comparison['Beta Factor']:.2f}")
print(f" CCF無視: {comparison['Without CCF']*100:.4f}%")
print(f" CCF考慮: {comparison['With CCF']*100:.4f}%")
print(f" 信頼度低下: {comparison['Degradation']*100:.4f}% "
f"({comparison['Degradation %']:.2f}%低下)\n")
# 結果をDataFrameで比較
df_ccf = pd.DataFrame(results)
print("=== CCF影響まとめ ===")
print(df_ccf[['Component', 'Beta Factor', 'Without CCF', 'With CCF', 'Degradation %']])
# 出力例:
# === 共通原因故障(CCF)の影響評価 ===
#
# 【温度センサー】(2重化)
# 単体信頼度: 95.00%
# β因子: 0.10
# CCF無視: 99.7500%
# CCF考慮: 99.2525%
# 信頼度低下: 0.4975% (0.50%低下)
FMEAとFTAを統合することで、コンポーネントレベルからシステムレベルまでの 包括的な安全性評価が可能になります。FMEAで特定した高リスク項目をFTAで定量評価します。
# Example 7: Integrated FMEA/FTA Analysis System
class IntegratedSafetyAnalysis:
"""FMEA/FTA統合安全性評価システム
FMEAで特定したリスクをFTAで定量評価し、最適な対策を提案
"""
def __init__(self):
self.fmea = FMEA()
self.fault_trees = {}
self.mitigation_strategies = []
def conduct_fmea(self, components: List[Dict]):
"""FMEA実施
Args:
components: [{component, failure_mode, ...}, ...]
"""
for comp in components:
self.fmea.add_failure_mode(**comp)
def build_fault_tree_for_high_risk(self, threshold: int = 200):
"""高リスク項目に対して故障の木を構築"""
high_risk_items = self.fmea.get_high_risk_items(threshold)
for _, item in high_risk_items.iterrows():
component = item['Component']
failure_mode = item['Failure Mode']
# 故障の木を構築(簡略化)
ft_name = f"{component}_{failure_mode}"
self.fault_trees[ft_name] = {
'component': component,
'failure_mode': failure_mode,
'rpn': item['RPN'],
'severity': item['Severity'],
'occurrence': item['Occurrence'],
'detection': item['Detection']
}
def generate_mitigation_strategies(self) -> pd.DataFrame:
"""リスク軽減策を生成
RPN値に基づいて優先度付けされた対策を提案
"""
analysis_df = self.fmea.get_analysis()
strategies = []
for _, row in analysis_df.iterrows():
rpn = row['RPN']
severity = row['Severity']
occurrence = row['Occurrence']
detection = row['Detection']
# 対策の優先順位決定
if severity >= 8:
priority = 'Critical'
action = '即座の設計変更・冗長化'
elif rpn >= 200:
priority = 'High'
action = '検出性向上・予防保全強化'
elif rpn >= 100:
priority = 'Medium'
action = '定期監視・手順改善'
else:
priority = 'Low'
action = '現状維持・定期レビュー'
# 具体的な対策提案
specific_actions = []
if occurrence >= 5:
specific_actions.append('発生頻度低減: 予防保全、部品交換サイクル短縮')
if detection >= 7:
specific_actions.append('検出性向上: センサー追加、アラーム設定')
if severity >= 8:
specific_actions.append('深刻度低減: 冗長化、フェイルセーフ設計')
strategies.append({
'Component': row['Component'],
'Failure Mode': row['Failure Mode'],
'Current RPN': rpn,
'Priority': priority,
'Recommended Action': action,
'Specific Measures': '; '.join(specific_actions)
})
df = pd.DataFrame(strategies)
return df.sort_values('Current RPN', ascending=False)
def calculate_risk_reduction(
self,
component: str,
new_occurrence: int = None,
new_detection: int = None
) -> Dict:
"""対策実施後のリスク低減効果を計算
Args:
component: 対象コンポーネント
new_occurrence: 対策後の発生度
new_detection: 対策後の検出度
Returns:
リスク低減効果の分析結果
"""
# 現在の値を取得
analysis_df = self.fmea.get_analysis()
current = analysis_df[analysis_df['Component'] == component].iloc[0]
current_rpn = current['RPN']
severity = current['Severity']
current_occ = current['Occurrence']
current_det = current['Detection']
# 新しいRPN計算
new_occ = new_occurrence if new_occurrence is not None else current_occ
new_det = new_detection if new_detection is not None else current_det
new_rpn = severity * new_occ * new_det
# リスク低減率
reduction = ((current_rpn - new_rpn) / current_rpn * 100) if current_rpn > 0 else 0
return {
'Component': component,
'Current RPN': current_rpn,
'New RPN': new_rpn,
'Reduction': reduction,
'Current O-D': f"{current_occ}-{current_det}",
'New O-D': f"{new_occ}-{new_det}",
'Status': 'Improved' if new_rpn < current_rpn else 'No Change'
}
def generate_safety_report(self) -> str:
"""総合安全性評価レポートを生成"""
analysis_df = self.fmea.get_analysis()
strategies_df = self.generate_mitigation_strategies()
report = []
report.append("=" * 60)
report.append("総合安全性評価レポート")
report.append("=" * 60)
report.append("")
# サマリー統計
report.append("【全体統計】")
report.append(f" 評価項目数: {len(analysis_df)}")
report.append(f" 平均RPN: {analysis_df['RPN'].mean():.1f}")
report.append(f" 最大RPN: {analysis_df['RPN'].max()}")
report.append(f" 高リスク項目(RPN≥200): {len(analysis_df[analysis_df['RPN'] >= 200])}")
report.append("")
# 優先度別対策件数
report.append("【優先度別対策件数】")
priority_counts = strategies_df['Priority'].value_counts()
for priority in ['Critical', 'High', 'Medium', 'Low']:
count = priority_counts.get(priority, 0)
report.append(f" {priority}: {count}件")
report.append("")
# Critical項目の詳細
critical = strategies_df[strategies_df['Priority'] == 'Critical']
if len(critical) > 0:
report.append("【Critical項目(即対応必要)】")
for _, item in critical.iterrows():
report.append(f" ⚠️ {item['Component']} - {item['Failure Mode']}")
report.append(f" RPN: {item['Current RPN']}")
report.append(f" 対策: {item['Recommended Action']}")
report.append("")
return "\n".join(report)
# 実使用例:反応器システムの統合解析
isa = IntegratedSafetyAnalysis()
# FMEA実施(複数コンポーネント)
components_data = [
{
'component': '温度センサー',
'failure_mode': 'センサー故障(高温側)',
'effects': '反応暴走、圧力上昇',
'causes': 'センサー劣化、配線断線',
'severity': 9,
'occurrence': 3,
'detection': 4,
'current_controls': '冗長センサー、定期校正'
},
{
'component': '冷却水ポンプ',
'failure_mode': 'ポンプ停止',
'effects': '反応器過熱',
'causes': 'モーター故障、軸受摩耗',
'severity': 8,
'occurrence': 4,
'detection': 2,
'current_controls': '流量計、予備ポンプ'
},
{
'component': '圧力安全弁',
'failure_mode': '作動不良',
'effects': '過圧破裂',
'causes': '弁座固着',
'severity': 10,
'occurrence': 2,
'detection': 8,
'current_controls': '年次点検'
},
{
'component': '原料供給弁',
'failure_mode': '全開固着',
'effects': '原料過剰投入',
'causes': 'アクチュエータ故障',
'severity': 7,
'occurrence': 3,
'detection': 5,
'current_controls': '流量制御'
}
]
isa.conduct_fmea(components_data)
# 軽減策の生成
strategies = isa.generate_mitigation_strategies()
print("=== リスク軽減策 ===")
print(strategies[['Component', 'Current RPN', 'Priority', 'Recommended Action']].to_string(index=False))
# 対策効果の予測
print("\n=== 対策効果シミュレーション ===")
# 温度センサー: 3重化で検出度を4→2に改善
temp_sensor_effect = isa.calculate_risk_reduction(
'温度センサー',
new_detection=2
)
print(f"【温度センサー3重化】")
print(f" 改善前RPN: {temp_sensor_effect['Current RPN']}")
print(f" 改善後RPN: {temp_sensor_effect['New RPN']}")
print(f" リスク低減: {temp_sensor_effect['Reduction']:.1f}%")
# 冷却水ポンプ: 予防保全で発生度を4→2に改善
pump_effect = isa.calculate_risk_reduction(
'冷却水ポンプ',
new_occurrence=2
)
print(f"\n【冷却水ポンプ予防保全強化】")
print(f" 改善前RPN: {pump_effect['Current RPN']}")
print(f" 改善後RPN: {pump_effect['New RPN']}")
print(f" リスク低減: {pump_effect['Reduction']:.1f}%")
# 総合レポート
print("\n" + isa.generate_safety_report())
# 出力例:
# === リスク軽減策 ===
# Component Current RPN Priority Recommended Action
# 圧力安全弁 160 Critical 即座の設計変更・冗長化
# 温度センサー 108 Medium 定期監視・手順改善
# 原料供給弁 105 Medium 定期監視・手順改善
# 冷却水ポンプ 64 Low 現状維持・定期レビュー
この章を完了すると、以下ができるようになります: