3.1 マルコフ連鎖の基礎
📐 定義: マルコフ連鎖(Markov Chain)
確率過程 \(\{X_n\}_{n=0,1,2,\ldots}\) がマルコフ連鎖であるとは、次の性質を満たすことです: \[P(X_{n+1} = j \mid X_n = i, X_{n-1} = i_{n-1}, \ldots, X_0 = i_0) = P(X_{n+1} = j \mid X_n = i)\] これをマルコフ性(無記憶性)といいます:未来は現在の状態のみに依存し、過去の履歴には依存しません。 推移確率: \[p_{ij} = P(X_{n+1} = j \mid X_n = i)\] 推移確率行列: \[P = (p_{ij}), \quad \sum_{j} p_{ij} = 1 \text{ for all } i\]
確率過程 \(\{X_n\}_{n=0,1,2,\ldots}\) がマルコフ連鎖であるとは、次の性質を満たすことです: \[P(X_{n+1} = j \mid X_n = i, X_{n-1} = i_{n-1}, \ldots, X_0 = i_0) = P(X_{n+1} = j \mid X_n = i)\] これをマルコフ性(無記憶性)といいます:未来は現在の状態のみに依存し、過去の履歴には依存しません。 推移確率: \[p_{ij} = P(X_{n+1} = j \mid X_n = i)\] 推移確率行列: \[P = (p_{ij}), \quad \sum_{j} p_{ij} = 1 \text{ for all } i\]
💻 コード例1: 離散マルコフ連鎖のシミュレーション
import numpy as np
import matplotlib.pyplot as plt
from scipy.linalg import eig
# 3状態マルコフ連鎖の例:天気モデル(晴れ、曇り、雨)
states = ['晴れ', '曇り', '雨']
n_states = len(states)
# 推移確率行列
# P[i,j] = P(明日の天気=j | 今日の天気=i)
P = np.array([
[0.7, 0.2, 0.1], # 晴れ → 晴れ/曇り/雨
[0.3, 0.4, 0.3], # 曇り → 晴れ/曇り/雨
[0.2, 0.3, 0.5] # 雨 → 晴れ/曇り/雨
])
print("推移確率行列 P:")
print(P)
print(f"\n各行の和(確率の和=1): {P.sum(axis=1)}")
# マルコフ連鎖のシミュレーション
def simulate_markov_chain(P, initial_state, n_steps):
"""マルコフ連鎖のシミュレーション"""
n_states = P.shape[0]
states_sequence = [initial_state]
current_state = initial_state
for _ in range(n_steps):
# 現在の状態から次の状態を確率的に選択
next_state = np.random.choice(n_states, p=P[current_state])
states_sequence.append(next_state)
current_state = next_state
return states_sequence
# シミュレーション実行
np.random.seed(42)
n_days = 100
initial_state = 0 # 最初は晴れ
trajectory = simulate_markov_chain(P, initial_state, n_days)
# 可視化
fig, axes = plt.subplots(2, 1, figsize=(14, 8))
# (1) 状態の時系列
time = np.arange(len(trajectory))
axes[0].step(time, trajectory, where='post', color='#667eea', linewidth=2)
axes[0].set_yticks([0, 1, 2])
axes[0].set_yticklabels(states)
axes[0].set_xlabel('日数', fontsize=11)
axes[0].set_ylabel('天気', fontsize=11)
axes[0].set_title('マルコフ連鎖による天気の推移', fontsize=12, fontweight='bold')
axes[0].grid(alpha=0.3)
axes[0].set_xlim([0, n_days])
# (2) 各状態の出現頻度
state_counts = [trajectory.count(i) for i in range(n_states)]
state_freqs = np.array(state_counts) / len(trajectory)
x_pos = np.arange(n_states)
axes[1].bar(x_pos, state_freqs, color='#667eea', alpha=0.7, edgecolor='black')
axes[1].set_xticks(x_pos)
axes[1].set_xticklabels(states)
axes[1].set_ylabel('相対頻度', fontsize=11)
axes[1].set_title('各状態の出現頻度', fontsize=12, fontweight='bold')
axes[1].grid(axis='y', alpha=0.3)
plt.tight_layout()
plt.show()
print(f"\n各状態の出現頻度:")
for i, state in enumerate(states):
print(f" {state}: {state_freqs[i]:.4f} ({state_counts[i]}/{len(trajectory)}日)")
推移確率行列 P:
[[0.7 0.2 0.1]
[0.3 0.4 0.3]
[0.2 0.3 0.5]]
各行の和(確率の和=1): [1. 1. 1.]
各状態の出現頻度:
晴れ: 0.4653 (47/101日)
曇り: 0.2970 (30/101日)
雨: 0.2376 (24/101日)
3.2 推移確率行列と定常分布
📊 定理: 定常分布
確率ベクトル \(\pi = (\pi_1, \pi_2, \ldots, \pi_n)\) が定常分布であるとは: \[\pi P = \pi, \quad \sum_i \pi_i = 1, \quad \pi_i \geq 0\] を満たすことです。定常分布は推移確率行列 \(P\) の固有値1に対応する左固有ベクトルです。 エルゴード定理: 既約かつ非周期的なマルコフ連鎖は、唯一の定常分布 \(\pi\) に収束します: \[\lim_{n \to \infty} P^n = \begin{pmatrix} \pi \\ \pi \\ \vdots \\ \pi \end{pmatrix}\]
確率ベクトル \(\pi = (\pi_1, \pi_2, \ldots, \pi_n)\) が定常分布であるとは: \[\pi P = \pi, \quad \sum_i \pi_i = 1, \quad \pi_i \geq 0\] を満たすことです。定常分布は推移確率行列 \(P\) の固有値1に対応する左固有ベクトルです。 エルゴード定理: 既約かつ非周期的なマルコフ連鎖は、唯一の定常分布 \(\pi\) に収束します: \[\lim_{n \to \infty} P^n = \begin{pmatrix} \pi \\ \pi \\ \vdots \\ \pi \end{pmatrix}\]
💻 コード例2: 推移確率行列と定常分布の計算
# 定常分布の計算(固有値問題として解く)
def compute_stationary_distribution(P):
"""推移確率行列Pの定常分布を計算"""
# P^T の固有値・固有ベクトルを計算
eigenvalues, eigenvectors = eig(P.T)
# 固有値1に対応する固有ベクトルを探す
idx = np.argmin(np.abs(eigenvalues - 1))
stationary = np.real(eigenvectors[:, idx])
# 正規化(和=1)
stationary = stationary / stationary.sum()
return stationary
# 天気モデルの定常分布
pi_stationary = compute_stationary_distribution(P)
print("定常分布の計算:")
print("="*60)
for i, state in enumerate(states):
print(f" π({state}) = {pi_stationary[i]:.6f}")
# 検証: π * P = π
pi_P = pi_stationary @ P
print(f"\n検証: π * P = π")
print(f" π * P = {pi_P}")
print(f" π = {pi_stationary}")
print(f" 誤差 = {np.linalg.norm(pi_P - pi_stationary):.10f}")
# 長期的な挙動: P^n の計算
n_steps = [1, 5, 10, 20, 50, 100]
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.flatten()
for idx, n in enumerate(n_steps):
P_n = np.linalg.matrix_power(P, n)
# ヒートマップ
im = axes[idx].imshow(P_n, cmap='Blues', vmin=0, vmax=1)
axes[idx].set_xticks(range(n_states))
axes[idx].set_yticks(range(n_states))
axes[idx].set_xticklabels(states, rotation=45)
axes[idx].set_yticklabels(states)
axes[idx].set_title(f'P^{n}', fontsize=12, fontweight='bold')
# 数値を表示
for i in range(n_states):
for j in range(n_states):
text = axes[idx].text(j, i, f'{P_n[i, j]:.3f}',
ha="center", va="center", color="black", fontsize=9)
axes[idx].set_xlabel('次の状態', fontsize=10)
axes[idx].set_ylabel('現在の状態', fontsize=10)
plt.colorbar(im, ax=axes, fraction=0.046, pad=0.04)
plt.suptitle('推移確率行列の累乗 P^n(定常分布への収束)',
fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()
# 異なる初期分布からの収束
initial_distributions = [
np.array([1, 0, 0]), # 晴れスタート
np.array([0, 1, 0]), # 曇りスタート
np.array([0, 0, 1]), # 雨スタート
np.array([1/3, 1/3, 1/3]) # 一様分布
]
fig, ax = plt.subplots(figsize=(12, 6))
for i, pi_0 in enumerate(initial_distributions):
distributions = [pi_0]
for n in range(50):
pi_n = distributions[-1] @ P
distributions.append(pi_n)
distributions = np.array(distributions)
for j, state in enumerate(states):
ax.plot(distributions[:, j], linewidth=2, alpha=0.7,
label=f'{states[i]}スタート → {state}' if i == 0 else None,
linestyle=['-', '--', ':'][j])
ax.axhline(y=pi_stationary[0], color='red', linestyle='--', alpha=0.5, linewidth=2)
ax.axhline(y=pi_stationary[1], color='green', linestyle='--', alpha=0.5, linewidth=2)
ax.axhline(y=pi_stationary[2], color='blue', linestyle='--', alpha=0.5, linewidth=2)
ax.set_xlabel('ステップ数 n', fontsize=11)
ax.set_ylabel('確率', fontsize=11)
ax.set_title('異なる初期分布からの定常分布への収束', fontsize=12, fontweight='bold')
ax.legend(fontsize=9, ncol=2)
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()
3.3 ランダムウォーク
📐 定義: ランダムウォーク
1次元ランダムウォークは、確率 \(p\) で右に1移動、確率 \(1-p\) で左に1移動するマルコフ連鎖です: \[X_{n+1} = X_n + \epsilon_{n+1}, \quad \epsilon_{n+1} = \begin{cases} +1 & \text{w.p. } p \\ -1 & \text{w.p. } 1-p \end{cases}\] 対称ランダムウォーク(\(p = 1/2\))は、材料科学における拡散現象の離散モデルです。
1次元ランダムウォークは、確率 \(p\) で右に1移動、確率 \(1-p\) で左に1移動するマルコフ連鎖です: \[X_{n+1} = X_n + \epsilon_{n+1}, \quad \epsilon_{n+1} = \begin{cases} +1 & \text{w.p. } p \\ -1 & \text{w.p. } 1-p \end{cases}\] 対称ランダムウォーク(\(p = 1/2\))は、材料科学における拡散現象の離散モデルです。
💻 コード例3: ランダムウォーク
# 1次元ランダムウォークのシミュレーション
np.random.seed(42)
def random_walk_1d(n_steps, p=0.5, initial_pos=0):
"""1次元ランダムウォーク"""
positions = [initial_pos]
current_pos = initial_pos
for _ in range(n_steps):
step = 1 if np.random.rand() < p else -1
current_pos += step
positions.append(current_pos)
return np.array(positions)
# 複数の経路をシミュレーション
n_steps = 1000
n_paths = 100
p = 0.5 # 対称ランダムウォーク
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# (1) 複数経路の可視化
for _ in range(50):
path = random_walk_1d(n_steps, p)
axes[0, 0].plot(path, alpha=0.2, linewidth=0.8, color='#667eea')
axes[0, 0].axhline(y=0, color='red', linestyle='--', linewidth=2, label='開始位置')
axes[0, 0].set_xlabel('ステップ数', fontsize=11)
axes[0, 0].set_ylabel('位置', fontsize=11)
axes[0, 0].set_title('対称ランダムウォーク(p=0.5)', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)
# (2) 最終位置の分布
final_positions = [random_walk_1d(n_steps, p)[-1] for _ in range(n_paths)]
axes[0, 1].hist(final_positions, bins=30, density=True, alpha=0.6,
color='#667eea', edgecolor='black')
# 理論分布(中心極限定理により正規分布に近づく)
# E[Xn] = 0 (対称), Var(Xn) = n (各ステップの分散=1)
x = np.linspace(min(final_positions), max(final_positions), 1000)
theoretical_pdf = stats.norm.pdf(x, 0, np.sqrt(n_steps))
axes[0, 1].plot(x, theoretical_pdf, 'r-', linewidth=2.5, label='N(0, n)')
axes[0, 1].set_xlabel('最終位置', fontsize=11)
axes[0, 1].set_ylabel('密度', fontsize=11)
axes[0, 1].set_title(f'最終位置の分布(n={n_steps})', fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(alpha=0.3)
# (3) 非対称ランダムウォーク
p_biased = 0.6
paths_biased = [random_walk_1d(n_steps, p_biased) for _ in range(50)]
for path in paths_biased:
axes[1, 0].plot(path, alpha=0.2, linewidth=0.8, color='#764ba2')
axes[1, 0].axhline(y=0, color='red', linestyle='--', linewidth=2)
axes[1, 0].set_xlabel('ステップ数', fontsize=11)
axes[1, 0].set_ylabel('位置', fontsize=11)
axes[1, 0].set_title(f'非対称ランダムウォーク(p={p_biased})', fontsize=12, fontweight='bold')
axes[1, 0].grid(alpha=0.3)
# (4) 平均変位と理論値の比較
n_checkpoints = np.arange(0, n_steps+1, 10)
mean_positions = []
std_positions = []
for n in n_checkpoints:
if n == 0:
mean_positions.append(0)
std_positions.append(0)
else:
paths = [random_walk_1d(n, p_biased) for _ in range(n_paths)]
final_pos = [path[-1] for path in paths]
mean_positions.append(np.mean(final_pos))
std_positions.append(np.std(final_pos))
mean_positions = np.array(mean_positions)
std_positions = np.array(std_positions)
# 理論値
theoretical_mean = n_checkpoints * (2*p_biased - 1) # E[Xn] = n(2p-1)
theoretical_std = np.sqrt(n_checkpoints * 4*p_biased*(1-p_biased)) # Var(Xn) = 4np(1-p)
axes[1, 1].plot(n_checkpoints, mean_positions, 'o-', color='#667eea',
linewidth=2, markersize=4, label='実測平均')
axes[1, 1].plot(n_checkpoints, theoretical_mean, '--', color='red',
linewidth=2, label='理論平均')
axes[1, 1].fill_between(n_checkpoints,
theoretical_mean - theoretical_std,
theoretical_mean + theoretical_std,
alpha=0.2, color='red', label='理論±1σ')
axes[1, 1].set_xlabel('ステップ数', fontsize=11)
axes[1, 1].set_ylabel('平均位置', fontsize=11)
axes[1, 1].set_title('非対称ランダムウォークの平均変位', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)
plt.tight_layout()
plt.show()
print(f"対称ランダムウォーク(p=0.5):")
print(f" 理論: E[X{n_steps}] = 0, Var(X{n_steps}) = {n_steps}")
print(f" 実測: 平均={np.mean(final_positions):.2f}, 分散={np.var(final_positions):.2f}")
print(f"\n非対称ランダムウォーク(p={p_biased}):")
print(f" 理論: E[X{n_steps}] = {theoretical_mean[-1]:.2f}, Var(X{n_steps}) = {theoretical_std[-1]**2:.2f}")
print(f" 実測: 平均={mean_positions[-1]:.2f}, 標準偏差={std_positions[-1]:.2f}")
3.4 連続時間マルコフ過程
📐 定義: 連続時間マルコフ過程
連続時間マルコフ過程は、連続時間パラメータ \(t \geq 0\) を持つマルコフ過程 \(\{X(t)\}_{t \geq 0}\) です。 推移率行列 \(Q\)(generator matrix): \[q_{ij} = \lim_{h \to 0} \frac{P(X(t+h) = j \mid X(t) = i)}{h} \quad (i \neq j)\] \[q_{ii} = -\sum_{j \neq i} q_{ij}\] チャップマン-コルモゴロフ方程式: \[\frac{dP(t)}{dt} = Q P(t)\] 解は \(P(t) = e^{Qt}\)
連続時間マルコフ過程は、連続時間パラメータ \(t \geq 0\) を持つマルコフ過程 \(\{X(t)\}_{t \geq 0}\) です。 推移率行列 \(Q\)(generator matrix): \[q_{ij} = \lim_{h \to 0} \frac{P(X(t+h) = j \mid X(t) = i)}{h} \quad (i \neq j)\] \[q_{ii} = -\sum_{j \neq i} q_{ij}\] チャップマン-コルモゴロフ方程式: \[\frac{dP(t)}{dt} = Q P(t)\] 解は \(P(t) = e^{Qt}\)
💻 コード例4: 連続時間マルコフ過程
from scipy.linalg import expm
# 連続時間マルコフ過程の例:SIR感染モデルの簡略版
# 状態: 健康(H), 感染(I), 回復(R)
states_sir = ['健康', '感染', '回復']
# 推移率行列 Q
# Q[i,j] = 状態iから状態jへの推移率(i≠jのとき)
lambda_infection = 0.3 # 感染率
mu_recovery = 0.2 # 回復率
Q = np.array([
[-lambda_infection, lambda_infection, 0],
[0, -mu_recovery, mu_recovery],
[0, 0, 0] # 回復は吸収状態
])
print("推移率行列 Q:")
print(Q)
print(f"\n各行の和(=0): {Q.sum(axis=1)}")
# 時間発展 P(t) = exp(Qt)
times = np.linspace(0, 30, 100)
initial_state = np.array([1, 0, 0]) # 最初は健康
probabilities = []
for t in times:
P_t = expm(Q * t)
prob_t = initial_state @ P_t
probabilities.append(prob_t)
probabilities = np.array(probabilities)
# 可視化
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# (1) 確率の時間発展
for i, state in enumerate(states_sir):
axes[0].plot(times, probabilities[:, i], linewidth=2.5, label=state)
axes[0].set_xlabel('時間', fontsize=11)
axes[0].set_ylabel('確率', fontsize=11)
axes[0].set_title('連続時間マルコフ過程(SIRモデル)', fontsize=12, fontweight='bold')
axes[0].legend(fontsize=10)
axes[0].grid(alpha=0.3)
axes[0].set_ylim([0, 1])
# (2) シミュレーション(ジャンプ過程として)
def simulate_ctmc(Q, initial_state, T_max):
"""連続時間マルコフ連鎖のシミュレーション(Gillespieアルゴリズム)"""
n_states = Q.shape[0]
current_state = initial_state
current_time = 0
times = [current_time]
states = [current_state]
while current_time < T_max:
# 現在の状態での滞在時間(指数分布)
rate = -Q[current_state, current_state]
if rate == 0: # 吸収状態
break
waiting_time = np.random.exponential(1/rate)
current_time += waiting_time
if current_time > T_max:
break
# 次の状態を選択
transition_probs = Q[current_state].copy()
transition_probs[current_state] = 0
transition_probs /= transition_probs.sum()
next_state = np.random.choice(n_states, p=transition_probs)
times.append(current_time)
states.append(next_state)
current_state = next_state
return times, states
# 複数経路のシミュレーション
np.random.seed(42)
n_simulations = 20
T_max = 30
for _ in range(n_simulations):
sim_times, sim_states = simulate_ctmc(Q, 0, T_max)
axes[1].step(sim_times, sim_states, where='post',
alpha=0.4, linewidth=1.5, color='#667eea')
axes[1].set_yticks([0, 1, 2])
axes[1].set_yticklabels(states_sir)
axes[1].set_xlabel('時間', fontsize=11)
axes[1].set_ylabel('状態', fontsize=11)
axes[1].set_title('CTMC シミュレーション(サンプル経路)', fontsize=12, fontweight='bold')
axes[1].grid(alpha=0.3)
axes[1].set_xlim([0, T_max])
plt.tight_layout()
plt.show()
3.5 ポアソン過程の性質
📐 定義: ポアソン過程
計数過程 \(\{N(t)\}_{t \geq 0}\) がポアソン過程(強度 \(\lambda\))であるとは:
計数過程 \(\{N(t)\}_{t \geq 0}\) がポアソン過程(強度 \(\lambda\))であるとは:
- \(N(0) = 0\)
- 独立増分性: 互いに重ならない時間区間での増分は独立
- 定常増分性: \(N(t+s) - N(s)\) の分布は \(t\) のみに依存
- \(N(t) \sim Pois(\lambda t)\)、すなわち \(P(N(t) = k) = \frac{(\lambda t)^k e^{-\lambda t}}{k!}\)
💻 コード例5: ポアソン過程の生成
# ポアソン過程のシミュレーション
np.random.seed(42)
lambda_rate = 2.0 # 強度(単位時間あたりの平均到着数)
T_max = 10
def simulate_poisson_process(lambda_rate, T_max):
"""ポアソン過程のシミュレーション"""
arrival_times = []
current_time = 0
while current_time < T_max:
# 次の到着までの時間(指数分布)
inter_arrival_time = np.random.exponential(1/lambda_rate)
current_time += inter_arrival_time
if current_time < T_max:
arrival_times.append(current_time)
return np.array(arrival_times)
# 複数経路のシミュレーション
n_paths = 10
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# (1) サンプル経路
for i in range(n_paths):
arrivals = simulate_poisson_process(lambda_rate, T_max)
counts = np.arange(1, len(arrivals) + 1)
axes[0, 0].step(arrivals, counts, where='post', alpha=0.6, linewidth=2)
# 理論的期待値 E[N(t)] = λt
t_theory = np.linspace(0, T_max, 1000)
axes[0, 0].plot(t_theory, lambda_rate * t_theory, 'r--', linewidth=2.5,
label=f'E[N(t)] = λt = {lambda_rate}t')
axes[0, 0].set_xlabel('時間 t', fontsize=11)
axes[0, 0].set_ylabel('累積到着数 N(t)', fontsize=11)
axes[0, 0].set_title(f'ポアソン過程(λ={lambda_rate})', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)
# (2) 到着間隔時間の分布
all_inter_arrivals = []
for _ in range(1000):
arrivals = simulate_poisson_process(lambda_rate, T_max)
if len(arrivals) > 1:
inter_arrivals = np.diff(np.concatenate([[0], arrivals]))
all_inter_arrivals.extend(inter_arrivals)
all_inter_arrivals = np.array(all_inter_arrivals)
axes[0, 1].hist(all_inter_arrivals, bins=50, density=True, alpha=0.6,
color='#667eea', edgecolor='black', label='シミュレーション')
# 理論分布 Exp(λ)
x = np.linspace(0, max(all_inter_arrivals), 1000)
theoretical_pdf = lambda_rate * np.exp(-lambda_rate * x)
axes[0, 1].plot(x, theoretical_pdf, 'r-', linewidth=2.5, label=f'Exp({lambda_rate})')
axes[0, 1].set_xlabel('到着間隔時間', fontsize=11)
axes[0, 1].set_ylabel('密度', fontsize=11)
axes[0, 1].set_title('到着間隔時間の分布', fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(alpha=0.3)
# (3) 時刻tでの到着数の分布
t_fixed = 5
counts_at_t = []
for _ in range(5000):
arrivals = simulate_poisson_process(lambda_rate, t_fixed)
counts_at_t.append(len(arrivals))
counts_at_t = np.array(counts_at_t)
# ヒストグラム
unique, counts = np.unique(counts_at_t, return_counts=True)
axes[1, 0].bar(unique, counts/len(counts_at_t), alpha=0.6,
color='#667eea', edgecolor='black', label='シミュレーション')
# 理論分布 Pois(λt)
k = np.arange(0, max(counts_at_t) + 1)
theoretical_pmf = stats.poisson.pmf(k, lambda_rate * t_fixed)
axes[1, 0].plot(k, theoretical_pmf, 'ro-', linewidth=2, markersize=6,
label=f'Pois({lambda_rate * t_fixed})')
axes[1, 0].set_xlabel(f't={t_fixed}での到着数', fontsize=11)
axes[1, 0].set_ylabel('確率', fontsize=11)
axes[1, 0].set_title(f'時刻t={t_fixed}での到着数の分布', fontsize=12, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(alpha=0.3)
# (4) 異なる強度での比較
lambdas = [0.5, 1.0, 2.0, 4.0]
colors = ['blue', 'green', 'orange', 'red']
for lam, color in zip(lambdas, colors):
arrivals = simulate_poisson_process(lam, T_max)
counts = np.arange(1, len(arrivals) + 1)
axes[1, 1].step(arrivals, counts, where='post', linewidth=2,
alpha=0.7, color=color, label=f'λ={lam}')
axes[1, 1].set_xlabel('時間 t', fontsize=11)
axes[1, 1].set_ylabel('累積到着数 N(t)', fontsize=11)
axes[1, 1].set_title('異なる強度でのポアソン過程', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)
plt.tight_layout()
plt.show()
print("ポアソン過程の検証:")
print(f"理論: E[到着間隔] = 1/λ = {1/lambda_rate:.4f}")
print(f"実測: E[到着間隔] = {np.mean(all_inter_arrivals):.4f}")
print(f"\n理論: E[N({t_fixed})] = λt = {lambda_rate * t_fixed:.4f}")
print(f"実測: E[N({t_fixed})] = {np.mean(counts_at_t):.4f}")
3.6 到着間隔時間の分布
💻 コード例6: 到着間隔時間の分布
# k番目の到着時刻の分布(アーラン分布)
np.random.seed(42)
lambda_rate = 1.5
n_simulations = 5000
# k=1, 2, 3, 4番目の到着時刻
k_values = [1, 2, 3, 4]
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
axes = axes.flatten()
for idx, k in enumerate(k_values):
# シミュレーション
arrival_times_k = []
for _ in range(n_simulations):
arrivals = simulate_poisson_process(lambda_rate, 20)
if len(arrivals) >= k:
arrival_times_k.append(arrivals[k-1])
arrival_times_k = np.array(arrival_times_k)
# ヒストグラム
axes[idx].hist(arrival_times_k, bins=50, density=True, alpha=0.6,
color='#667eea', edgecolor='black', label='シミュレーション')
# 理論分布(ガンマ分布 = アーラン分布)
# T_k ~ Gamma(k, λ)
x = np.linspace(0, max(arrival_times_k), 1000)
theoretical_pdf = stats.gamma.pdf(x, a=k, scale=1/lambda_rate)
axes[idx].plot(x, theoretical_pdf, 'r-', linewidth=2.5,
label=f'Gamma({k}, {lambda_rate})')
axes[idx].set_xlabel(f'{k}番目の到着時刻', fontsize=11)
axes[idx].set_ylabel('密度', fontsize=11)
axes[idx].set_title(f'{k}番目の到着時刻の分布', fontsize=12, fontweight='bold')
axes[idx].legend()
axes[idx].grid(alpha=0.3)
# 統計量
theoretical_mean = k / lambda_rate
theoretical_var = k / lambda_rate**2
print(f"k={k}番目の到着時刻:")
print(f" 理論: E[T{k}] = {theoretical_mean:.4f}, Var(T{k}) = {theoretical_var:.4f}")
print(f" 実測: E[T{k}] = {np.mean(arrival_times_k):.4f}, Var(T{k}) = {np.var(arrival_times_k):.4f}\n")
plt.tight_layout()
plt.show()
3.7 プロセス工学への応用(故障モデリング)
💻 コード例7: プロセス工学への応用(故障モデリング)
# 製造プロセスの故障モデリング(ポアソン過程の応用)
np.random.seed(42)
# パラメータ
lambda_failure = 0.05 # 故障率(1時間あたり0.05回 = 20時間に1回)
T_operation = 1000 # 運転時間(時間)
repair_time_mean = 2 # 平均修理時間(時間)
# 故障時刻のシミュレーション
failure_times = simulate_poisson_process(lambda_failure, T_operation)
n_failures = len(failure_times)
# 修理時間(対数正規分布と仮定)
repair_times = np.random.lognormal(mean=np.log(repair_time_mean), sigma=0.3, size=n_failures)
# 稼働率の計算
total_repair_time = repair_times.sum()
uptime = T_operation - total_repair_time
availability = uptime / T_operation
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# (1) 故障時刻とダウンタイム
time_grid = np.linspace(0, T_operation, 10000)
operational_status = np.ones_like(time_grid)
for i, (failure_time, repair_time) in enumerate(zip(failure_times, repair_times)):
end_repair = failure_time + repair_time
mask = (time_grid >= failure_time) & (time_grid < end_repair)
operational_status[mask] = 0
axes[0, 0].fill_between(time_grid, 0, operational_status,
color='green', alpha=0.6, label='稼働中')
axes[0, 0].fill_between(time_grid, 0, 1-operational_status,
color='red', alpha=0.6, label='ダウンタイム')
for failure_time in failure_times[:10]: # 最初の10個のみ表示
axes[0, 0].axvline(failure_time, color='red', linestyle='--', alpha=0.5, linewidth=1)
axes[0, 0].set_xlabel('時間(時間)', fontsize=11)
axes[0, 0].set_ylabel('状態', fontsize=11)
axes[0, 0].set_title('製造プロセスの稼働状況', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].set_ylim([0, 1.2])
axes[0, 0].grid(alpha=0.3)
# (2) 故障間隔時間の分布
if len(failure_times) > 1:
inter_failure_times = np.diff(failure_times)
axes[0, 1].hist(inter_failure_times, bins=20, density=True, alpha=0.6,
color='#667eea', edgecolor='black', label='実測')
# 理論分布 Exp(λ)
x = np.linspace(0, max(inter_failure_times), 1000)
theoretical_pdf = lambda_failure * np.exp(-lambda_failure * x)
axes[0, 1].plot(x, theoretical_pdf, 'r-', linewidth=2.5,
label=f'Exp({lambda_failure})')
axes[0, 1].set_xlabel('故障間隔時間(時間)', fontsize=11)
axes[0, 1].set_ylabel('密度', fontsize=11)
axes[0, 1].set_title('故障間隔時間の分布(MTBF解析)', fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(alpha=0.3)
# (3) 累積故障数
counts = np.arange(1, len(failure_times) + 1)
axes[1, 0].step(failure_times, counts, where='post', linewidth=2.5,
color='#667eea', label='実測')
axes[1, 0].plot(time_grid, lambda_failure * time_grid, 'r--', linewidth=2,
label=f'E[N(t)] = {lambda_failure}t')
axes[1, 0].set_xlabel('時間(時間)', fontsize=11)
axes[1, 0].set_ylabel('累積故障数', fontsize=11)
axes[1, 0].set_title('累積故障数の推移', fontsize=12, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(alpha=0.3)
# (4) 稼働率の時間推移
window_size = 100 # 移動窓のサイズ(時間)
time_points = np.arange(window_size, T_operation, 10)
availabilities = []
for t in time_points:
# [t-window_size, t] の区間での稼働率
start_time = t - window_size
failures_in_window = failure_times[(failure_times >= start_time) & (failure_times < t)]
repairs_in_window = repair_times[:(len(failures_in_window))]
total_downtime = repairs_in_window.sum()
avail = 1 - total_downtime / window_size
availabilities.append(avail)
axes[1, 1].plot(time_points, availabilities, linewidth=2, color='#667eea',
label='移動窓稼働率')
axes[1, 1].axhline(y=availability, color='red', linestyle='--', linewidth=2,
label=f'全体稼働率 = {availability:.4f}')
axes[1, 1].set_xlabel('時間(時間)', fontsize=11)
axes[1, 1].set_ylabel('稼働率', fontsize=11)
axes[1, 1].set_title(f'稼働率の時間推移(窓サイズ={window_size}h)', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)
axes[1, 1].set_ylim([0, 1])
plt.tight_layout()
plt.show()
# 統計サマリー
print("故障モデリング統計:")
print("="*60)
print(f"運転時間: {T_operation} 時間")
print(f"故障回数: {n_failures} 回")
print(f"平均故障間隔(MTBF): {T_operation / n_failures:.2f} 時間")
print(f"理論MTBF: {1 / lambda_failure:.2f} 時間")
print(f"\n総修理時間: {total_repair_time:.2f} 時間")
print(f"平均修理時間(MTTR): {repair_times.mean():.2f} 時間")
print(f"\n稼働率: {availability:.4f} ({availability*100:.2f}%)")
print(f"理論稼働率: {1 / (1 + lambda_failure * repair_time_mean):.4f}")
💡 Note: ポアソン過程は、製造プロセスにおける故障モデリング、品質管理、保守計画に広く応用されます。MTBF(平均故障間隔)= 1/λ、稼働率 = MTBF / (MTBF + MTTR) などの指標が、プロセス最適化の意思決定に使われます。
演習問題
📝 演習1: マルコフ連鎖の定常分布
次の推移確率行列を持つマルコフ連鎖を考える: \[P = \begin{pmatrix} 0.8 & 0.2 & 0 \\ 0.1 & 0.7 & 0.2 \\ 0 & 0.3 & 0.7 \end{pmatrix}\]
次の推移確率行列を持つマルコフ連鎖を考える: \[P = \begin{pmatrix} 0.8 & 0.2 & 0 \\ 0.1 & 0.7 & 0.2 \\ 0 & 0.3 & 0.7 \end{pmatrix}\]
- 定常分布 π を固有値問題として計算せよ
- 異なる初期分布から100ステップシミュレーションし、定常分布への収束を確認せよ
- \(P^{50}\) を計算し、各行が定常分布に収束していることを確認せよ
📝 演習2: ランダムウォークの解析
1次元ランダムウォーク(p=0.6)を1000ステップシミュレートし:
1次元ランダムウォーク(p=0.6)を1000ステップシミュレートし:
- 最終位置の分布をヒストグラムで可視化せよ
- 理論的な期待値 E[X_n] = n(2p-1) と実測値を比較せよ
- 中心極限定理により、標準化した最終位置が正規分布に従うことを確認せよ
📝 演習3: ポアソン過程の応用
コールセンターへの問い合わせがポアソン過程(λ=5 [件/時])に従うとする:
コールセンターへの問い合わせがポアソン過程(λ=5 [件/時])に従うとする:
- 1時間で10件以上の問い合わせが来る確率を計算せよ
- 次の問い合わせまでの待ち時間が15分以上になる確率を計算せよ
- 8時間稼働で、問い合わせ数の分布をシミュレーションし、ポアソン分布と比較せよ