3.1 Fundamentals of Markov Chains
Definition: Markov Chain
A stochastic process \(\{X_n\}_{n=0,1,2,\ldots}\) is called a Markov chain if it satisfies the following property: \[P(X_{n+1} = j \mid X_n = i, X_{n-1} = i_{n-1}, \ldots, X_0 = i_0) = P(X_{n+1} = j \mid X_n = i)\] This is called the Markov property (memorylessness): the future depends only on the current state and not on the past history. Transition probability: \[p_{ij} = P(X_{n+1} = j \mid X_n = i)\] Transition probability matrix: \[P = (p_{ij}), \quad \sum_{j} p_{ij} = 1 \text{ for all } i\]
Code Example 1: Discrete Markov Chain Simulation
import numpy as np
import matplotlib.pyplot as plt
from scipy.linalg import eig
from scipy import stats
# Example of 3-state Markov chain: Weather model (sunny, cloudy, rainy)
states = ['Sunny', 'Cloudy', 'Rainy']
n_states = len(states)
# Transition probability matrix
# P[i,j] = P(tomorrow's weather=j | today's weather=i)
P = np.array([
    [0.7, 0.2, 0.1],   # Sunny → Sunny/Cloudy/Rainy
    [0.3, 0.4, 0.3],   # Cloudy → Sunny/Cloudy/Rainy
    [0.2, 0.3, 0.5]    # Rainy → Sunny/Cloudy/Rainy
])
print("Transition probability matrix P:")
print(P)
print(f"\nSum of each row (probability sum=1): {P.sum(axis=1)}")
# Markov chain simulation
def simulate_markov_chain(P, initial_state, n_steps):
    """Simulate Markov chain"""
    n_states = P.shape[0]
    states_sequence = [initial_state]
    current_state = initial_state
    for _ in range(n_steps):
        # Stochastically select next state from current state
        next_state = np.random.choice(n_states, p=P[current_state])
        states_sequence.append(next_state)
        current_state = next_state
    return states_sequence
# Execute simulation
np.random.seed(42)
n_days = 100
initial_state = 0 # Start with sunny
trajectory = simulate_markov_chain(P, initial_state, n_days)
# Visualization
fig, axes = plt.subplots(2, 1, figsize=(14, 8))
# (1) Time series of states
time = np.arange(len(trajectory))
axes[0].step(time, trajectory, where='post', color='#667eea', linewidth=2)
axes[0].set_yticks([0, 1, 2])
axes[0].set_yticklabels(states)
axes[0].set_xlabel('Days', fontsize=11)
axes[0].set_ylabel('Weather', fontsize=11)
axes[0].set_title('Weather Evolution by Markov Chain', fontsize=12, fontweight='bold')
axes[0].grid(alpha=0.3)
axes[0].set_xlim([0, n_days])
# (2) Frequency of each state
state_counts = [trajectory.count(i) for i in range(n_states)]
state_freqs = np.array(state_counts) / len(trajectory)
x_pos = np.arange(n_states)
axes[1].bar(x_pos, state_freqs, color='#667eea', alpha=0.7, edgecolor='black')
axes[1].set_xticks(x_pos)
axes[1].set_xticklabels(states)
axes[1].set_ylabel('Relative Frequency', fontsize=11)
axes[1].set_title('Frequency of Each State', fontsize=12, fontweight='bold')
axes[1].grid(axis='y', alpha=0.3)
plt.tight_layout()
plt.show()
print(f"\nFrequency of each state:")
for i, state in enumerate(states):
    print(f"  {state}: {state_freqs[i]:.4f} ({state_counts[i]}/{len(trajectory)} days)")
Transition probability matrix P:
[[0.7 0.2 0.1]
[0.3 0.4 0.3]
[0.2 0.3 0.5]]
Sum of each row (probability sum=1): [1. 1. 1.]
Frequency of each state:
Sunny: 0.4653 (47/101 days)
Cloudy: 0.2970 (30/101 days)
Rainy: 0.2376 (24/101 days)
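As a quick sanity check on the definition of \(p_{ij}\), the transition probabilities can also be estimated directly from the simulated path by counting observed transitions and row-normalizing. The sketch below is a minimal illustration that reuses trajectory, P, and n_states from Code Example 1; with only 100 steps the estimate is rough, and the guard below avoids division by zero if some state were never visited.
# Estimate p_ij from the simulated path: count i -> j transitions, then row-normalize.
# Assumes `trajectory`, `P`, and `n_states` from Code Example 1 are still in scope.
transition_counts = np.zeros((n_states, n_states))
for a, b in zip(trajectory[:-1], trajectory[1:]):
    transition_counts[a, b] += 1
row_sums = transition_counts.sum(axis=1, keepdims=True)
P_hat = transition_counts / np.maximum(row_sums, 1)  # guard against an unvisited state
print("Empirical transition matrix (100 steps):")
print(np.round(P_hat, 3))
print("True transition matrix P:")
print(P)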
3.2 Transition Probability Matrix and Stationary Distribution
Theorem: Stationary Distribution
A probability vector \(\pi = (\pi_1, \pi_2, \ldots, \pi_n)\) is called a stationary distribution if: \[\pi P = \pi, \quad \sum_i \pi_i = 1, \quad \pi_i \geq 0\] The stationary distribution is the left eigenvector corresponding to eigenvalue 1 of the transition probability matrix \(P\). Ergodic theorem: An irreducible and aperiodic Markov chain converges to a unique stationary distribution \(\pi\): \[\lim_{n \to \infty} P^n = \begin{pmatrix} \pi \\ \pi \\ \vdots \\ \pi \end{pmatrix}\]
Code Example 2: Transition Probability Matrix and Stationary Distribution Calculation
# Calculate stationary distribution (solve as eigenvalue problem)
def compute_stationary_distribution(P):
    """Calculate stationary distribution of transition probability matrix P"""
    # Calculate eigenvalues and eigenvectors of P^T
    eigenvalues, eigenvectors = eig(P.T)
    # Find eigenvector corresponding to eigenvalue 1
    idx = np.argmin(np.abs(eigenvalues - 1))
    stationary = np.real(eigenvectors[:, idx])
    # Normalize (sum=1)
    stationary = stationary / stationary.sum()
    return stationary
# Stationary distribution for weather model
pi_stationary = compute_stationary_distribution(P)
print("Stationary distribution calculation:")
print("="*60)
for i, state in enumerate(states):
    print(f"  π({state}) = {pi_stationary[i]:.6f}")
# Verification: Ļ * P = Ļ
pi_P = pi_stationary @ P
print(f"\nVerification: Ļ * P = Ļ")
print(f" Ļ * P = {pi_P}")
print(f" Ļ = {pi_stationary}")
print(f" Error = {np.linalg.norm(pi_P - pi_stationary):.10f}")
# Long-term behavior: calculation of P^n
n_steps = [1, 5, 10, 20, 50, 100]
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.flatten()
for idx, n in enumerate(n_steps):
    P_n = np.linalg.matrix_power(P, n)
    # Heatmap
    im = axes[idx].imshow(P_n, cmap='Blues', vmin=0, vmax=1)
    axes[idx].set_xticks(range(n_states))
    axes[idx].set_yticks(range(n_states))
    axes[idx].set_xticklabels(states, rotation=45)
    axes[idx].set_yticklabels(states)
    axes[idx].set_title(f'P^{n}', fontsize=12, fontweight='bold')
    # Display values
    for i in range(n_states):
        for j in range(n_states):
            axes[idx].text(j, i, f'{P_n[i, j]:.3f}',
                           ha="center", va="center", color="black", fontsize=9)
    axes[idx].set_xlabel('Next State', fontsize=10)
    axes[idx].set_ylabel('Current State', fontsize=10)
plt.colorbar(im, ax=axes, fraction=0.046, pad=0.04)
plt.suptitle('Powers of Transition Probability Matrix P^n (Convergence to Stationary Distribution)',
fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()
# Convergence from different initial distributions
initial_distributions = [
np.array([1, 0, 0]), # Start sunny
np.array([0, 1, 0]), # Start cloudy
np.array([0, 0, 1]), # Start rainy
np.array([1/3, 1/3, 1/3]) # Uniform distribution
]
fig, ax = plt.subplots(figsize=(12, 6))
for i, pi_0 in enumerate(initial_distributions):
    distributions = [pi_0]
    for n in range(50):
        pi_n = distributions[-1] @ P
        distributions.append(pi_n)
    distributions = np.array(distributions)
    for j, state in enumerate(states):
        ax.plot(distributions[:, j], linewidth=2, alpha=0.7,
                label=f'{states[i]} start → {state}' if i == 0 else None,
                linestyle=['-', '--', ':'][j])
ax.axhline(y=pi_stationary[0], color='red', linestyle='--', alpha=0.5, linewidth=2)
ax.axhline(y=pi_stationary[1], color='green', linestyle='--', alpha=0.5, linewidth=2)
ax.axhline(y=pi_stationary[2], color='blue', linestyle='--', alpha=0.5, linewidth=2)
ax.set_xlabel('Step number n', fontsize=11)
ax.set_ylabel('Probability', fontsize=11)
ax.set_title('Convergence to Stationary Distribution from Different Initial Distributions', fontsize=12, fontweight='bold')
ax.legend(fontsize=9, ncol=2)
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()
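The eigenvector computation above can be cross-checked by treating \(\pi P = \pi\) together with \(\sum_i \pi_i = 1\) as an (overdetermined) linear system and solving it by least squares. A minimal sketch, reusing P, n_states, and pi_stationary from the weather model:
# Cross-check: stationary distribution as a linear system (I - P^T) pi = 0 with sum(pi) = 1.
A = np.vstack([np.eye(n_states) - P.T, np.ones((1, n_states))])
b = np.concatenate([np.zeros(n_states), [1.0]])
pi_ls, *_ = np.linalg.lstsq(A, b, rcond=None)
print("Stationary distribution via least squares:", np.round(pi_ls, 6))
print("Stationary distribution via eigenvector:  ", np.round(pi_stationary, 6))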
3.3 Random Walk
Definition: Random Walk
A one-dimensional random walk is a Markov chain that moves right by 1 with probability \(p\) or left by 1 with probability \(1-p\): \[X_{n+1} = X_n + \epsilon_{n+1}, \quad \epsilon_{n+1} = \begin{cases} +1 & \text{w.p. } p \\ -1 & \text{w.p. } 1-p \end{cases}\] A symmetric random walk (\(p = 1/2\)) is a discrete model of diffusion phenomena in materials science.
Code Example 3: Random Walk
# Simulation of one-dimensional random walk
np.random.seed(42)
def random_walk_1d(n_steps, p=0.5, initial_pos=0):
    """One-dimensional random walk"""
    positions = [initial_pos]
    current_pos = initial_pos
    for _ in range(n_steps):
        step = 1 if np.random.rand() < p else -1
        current_pos += step
        positions.append(current_pos)
    return np.array(positions)
# Simulate multiple paths
n_steps = 1000
n_paths = 100
p = 0.5 # Symmetric random walk
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# (1) Visualization of multiple paths
for _ in range(50):
    path = random_walk_1d(n_steps, p)
    axes[0, 0].plot(path, alpha=0.2, linewidth=0.8, color='#667eea')
axes[0, 0].axhline(y=0, color='red', linestyle='--', linewidth=2, label='Starting position')
axes[0, 0].set_xlabel('Step number', fontsize=11)
axes[0, 0].set_ylabel('Position', fontsize=11)
axes[0, 0].set_title('Symmetric Random Walk (p=0.5)', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)
# (2) Distribution of final positions
final_positions = [random_walk_1d(n_steps, p)[-1] for _ in range(n_paths)]
axes[0, 1].hist(final_positions, bins=30, density=True, alpha=0.6,
color='#667eea', edgecolor='black')
# Theoretical distribution (approaches normal distribution by central limit theorem)
# E[Xn] = 0 (symmetric), Var(Xn) = n (variance of each step = 1)
x = np.linspace(min(final_positions), max(final_positions), 1000)
theoretical_pdf = stats.norm.pdf(x, 0, np.sqrt(n_steps))
axes[0, 1].plot(x, theoretical_pdf, 'r-', linewidth=2.5, label='N(0, n)')
axes[0, 1].set_xlabel('Final position', fontsize=11)
axes[0, 1].set_ylabel('Density', fontsize=11)
axes[0, 1].set_title(f'Distribution of Final Position (n={n_steps})', fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(alpha=0.3)
# (3) Asymmetric random walk
p_biased = 0.6
paths_biased = [random_walk_1d(n_steps, p_biased) for _ in range(50)]
for path in paths_biased:
    axes[1, 0].plot(path, alpha=0.2, linewidth=0.8, color='#764ba2')
axes[1, 0].axhline(y=0, color='red', linestyle='--', linewidth=2)
axes[1, 0].set_xlabel('Step number', fontsize=11)
axes[1, 0].set_ylabel('Position', fontsize=11)
axes[1, 0].set_title(f'Asymmetric Random Walk (p={p_biased})', fontsize=12, fontweight='bold')
axes[1, 0].grid(alpha=0.3)
# (4) Comparison of mean displacement with theoretical values
n_checkpoints = np.arange(0, n_steps+1, 10)
mean_positions = []
std_positions = []
for n in n_checkpoints:
    if n == 0:
        mean_positions.append(0)
        std_positions.append(0)
    else:
        paths = [random_walk_1d(n, p_biased) for _ in range(n_paths)]
        final_pos = [path[-1] for path in paths]
        mean_positions.append(np.mean(final_pos))
        std_positions.append(np.std(final_pos))
mean_positions = np.array(mean_positions)
std_positions = np.array(std_positions)
# Theoretical values
theoretical_mean = n_checkpoints * (2*p_biased - 1) # E[Xn] = n(2p-1)
theoretical_std = np.sqrt(n_checkpoints * 4*p_biased*(1-p_biased)) # Var(Xn) = 4np(1-p)
axes[1, 1].plot(n_checkpoints, mean_positions, 'o-', color='#667eea',
linewidth=2, markersize=4, label='Observed mean')
axes[1, 1].plot(n_checkpoints, theoretical_mean, '--', color='red',
linewidth=2, label='Theoretical mean')
axes[1, 1].fill_between(n_checkpoints,
theoretical_mean - theoretical_std,
theoretical_mean + theoretical_std,
alpha=0.2, color='red', label='Theoretical ±1σ')
axes[1, 1].set_xlabel('Step number', fontsize=11)
axes[1, 1].set_ylabel('Mean position', fontsize=11)
axes[1, 1].set_title('Mean Displacement of Asymmetric Random Walk', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)
plt.tight_layout()
plt.show()
print(f"Symmetric random walk (p=0.5):")
print(f" Theory: E[X{n_steps}] = 0, Var(X{n_steps}) = {n_steps}")
print(f" Observed: Mean={np.mean(final_positions):.2f}, Variance={np.var(final_positions):.2f}")
print(f"\nAsymmetric random walk (p={p_biased}):")
print(f" Theory: E[X{n_steps}] = {theoretical_mean[-1]:.2f}, Var(X{n_steps}) = {theoretical_std[-1]**2:.2f}")
print(f" Observed: Mean={mean_positions[-1]:.2f}, Standard deviation={std_positions[-1]:.2f}")
3.4 Continuous-Time Markov Process
Definition: Continuous-Time Markov Process
A continuous-time Markov process is a Markov process \(\{X(t)\}_{t \geq 0}\) with continuous time parameter \(t \geq 0\). Transition rate matrix \(Q\) (generator matrix): \[q_{ij} = \lim_{h \to 0} \frac{P(X(t+h) = j \mid X(t) = i)}{h} \quad (i \neq j)\] \[q_{ii} = -\sum_{j \neq i} q_{ij}\] Kolmogorov differential equation (backward form), which follows from the Chapman-Kolmogorov relation \(P(t+s) = P(t)P(s)\): \[\frac{dP(t)}{dt} = Q P(t)\] Its solution is \(P(t) = e^{Qt}\).
Code Example 4: Continuous-Time Markov Process
from scipy.linalg import expm
# Example of continuous-time Markov process: simplified SIR infection model
# States: Healthy (H), Infected (I), Recovered (R)
states_sir = ['Healthy', 'Infected', 'Recovered']
# Transition rate matrix Q
# Q[i,j] = transition rate from state i to state j (for i ≠ j)
lambda_infection = 0.3 # Infection rate
mu_recovery = 0.2 # Recovery rate
Q = np.array([
[-lambda_infection, lambda_infection, 0],
[0, -mu_recovery, mu_recovery],
[0, 0, 0] # Recovery is absorbing state
])
print("Transition rate matrix Q:")
print(Q)
print(f"\nSum of each row (=0): {Q.sum(axis=1)}")
# Time evolution P(t) = exp(Qt)
times = np.linspace(0, 30, 100)
initial_state = np.array([1, 0, 0]) # Start healthy
probabilities = []
for t in times:
    P_t = expm(Q * t)
    prob_t = initial_state @ P_t
    probabilities.append(prob_t)
probabilities = np.array(probabilities)
# Visualization
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# (1) Time evolution of probabilities
for i, state in enumerate(states_sir):
    axes[0].plot(times, probabilities[:, i], linewidth=2.5, label=state)
axes[0].set_xlabel('Time', fontsize=11)
axes[0].set_ylabel('Probability', fontsize=11)
axes[0].set_title('Continuous-Time Markov Process (SIR Model)', fontsize=12, fontweight='bold')
axes[0].legend(fontsize=10)
axes[0].grid(alpha=0.3)
axes[0].set_ylim([0, 1])
# (2) Simulation (as jump process)
def simulate_ctmc(Q, initial_state, T_max):
    """Simulate continuous-time Markov chain (Gillespie algorithm)"""
    n_states = Q.shape[0]
    current_state = initial_state
    current_time = 0
    times = [current_time]
    states = [current_state]
    while current_time < T_max:
        # Sojourn time at current state (exponential distribution)
        rate = -Q[current_state, current_state]
        if rate == 0:  # Absorbing state
            break
        waiting_time = np.random.exponential(1/rate)
        current_time += waiting_time
        if current_time > T_max:
            break
        # Select next state
        transition_probs = Q[current_state].copy()
        transition_probs[current_state] = 0
        transition_probs /= transition_probs.sum()
        next_state = np.random.choice(n_states, p=transition_probs)
        times.append(current_time)
        states.append(next_state)
        current_state = next_state
    return times, states
# Simulate multiple paths
np.random.seed(42)
n_simulations = 20
T_max = 30
for _ in range(n_simulations):
    sim_times, sim_states = simulate_ctmc(Q, 0, T_max)
    axes[1].step(sim_times, sim_states, where='post',
                 alpha=0.4, linewidth=1.5, color='#667eea')
axes[1].set_yticks([0, 1, 2])
axes[1].set_yticklabels(states_sir)
axes[1].set_xlabel('Time', fontsize=11)
axes[1].set_ylabel('State', fontsize=11)
axes[1].set_title('CTMC Simulation (Sample Paths)', fontsize=12, fontweight='bold')
axes[1].grid(alpha=0.3)
axes[1].set_xlim([0, T_max])
plt.tight_layout()
plt.show()
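As a consistency check between the two panels, the fraction of Gillespie sample paths occupying each state at a fixed time should approach the corresponding row of \(e^{Qt}\). A minimal sketch, reusing Q, states_sir, simulate_ctmc, and expm (the number of runs and the check time are arbitrary choices):
# Compare the empirical state distribution at t_check with the matrix exponential.
t_check = 10.0
n_runs = 2000
state_counts = np.zeros(len(states_sir))
for _ in range(n_runs):
    _, path_states = simulate_ctmc(Q, 0, t_check)
    state_counts[path_states[-1]] += 1  # state occupied at t_check (last recorded jump)
empirical = state_counts / n_runs
theoretical = np.array([1, 0, 0]) @ expm(Q * t_check)
print("Empirical P(X(t_check) = state):", np.round(empirical, 3))
print("Theoretical (row of exp(Qt)):   ", np.round(theoretical, 3))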
3.5 Properties of Poisson Process
Definition: Poisson Process
A counting process \(\{N(t)\}_{t \geq 0}\) is called a Poisson process (with intensity \(\lambda\)) if:
- \(N(0) = 0\)
- Independent increments: increments over non-overlapping time intervals are independent
- Stationary increments: distribution of \(N(t+s) - N(s)\) depends only on \(t\)
- \(N(t) \sim Pois(\lambda t)\), i.e., \(P(N(t) = k) = \frac{(\lambda t)^k e^{-\lambda t}}{k!}\)
Code Example 5: Poisson Process Generation
# Poisson process simulation
np.random.seed(42)
lambda_rate = 2.0 # Intensity (average arrivals per unit time)
T_max = 10
def simulate_poisson_process(lambda_rate, T_max):
    """Simulate Poisson process"""
    arrival_times = []
    current_time = 0
    while current_time < T_max:
        # Time until next arrival (exponential distribution)
        inter_arrival_time = np.random.exponential(1/lambda_rate)
        current_time += inter_arrival_time
        if current_time < T_max:
            arrival_times.append(current_time)
    return np.array(arrival_times)
# Simulate multiple paths
n_paths = 10
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# (1) Sample paths
for i in range(n_paths):
    arrivals = simulate_poisson_process(lambda_rate, T_max)
    counts = np.arange(1, len(arrivals) + 1)
    axes[0, 0].step(arrivals, counts, where='post', alpha=0.6, linewidth=2)
# Theoretical expected value E[N(t)] = λt
t_theory = np.linspace(0, T_max, 1000)
axes[0, 0].plot(t_theory, lambda_rate * t_theory, 'r--', linewidth=2.5,
label=f'E[N(t)] = λt = {lambda_rate}t')
axes[0, 0].set_xlabel('Time t', fontsize=11)
axes[0, 0].set_ylabel('Cumulative arrival count N(t)', fontsize=11)
axes[0, 0].set_title(f'Poisson Process (λ={lambda_rate})', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)
# (2) Distribution of inter-arrival times
all_inter_arrivals = []
for _ in range(1000):
    arrivals = simulate_poisson_process(lambda_rate, T_max)
    if len(arrivals) > 1:
        inter_arrivals = np.diff(np.concatenate([[0], arrivals]))
        all_inter_arrivals.extend(inter_arrivals)
all_inter_arrivals = np.array(all_inter_arrivals)
axes[0, 1].hist(all_inter_arrivals, bins=50, density=True, alpha=0.6,
color='#667eea', edgecolor='black', label='Simulation')
# Theoretical distribution Exp(λ)
x = np.linspace(0, max(all_inter_arrivals), 1000)
theoretical_pdf = lambda_rate * np.exp(-lambda_rate * x)
axes[0, 1].plot(x, theoretical_pdf, 'r-', linewidth=2.5, label=f'Exp({lambda_rate})')
axes[0, 1].set_xlabel('Inter-arrival time', fontsize=11)
axes[0, 1].set_ylabel('Density', fontsize=11)
axes[0, 1].set_title('Distribution of Inter-arrival Times', fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(alpha=0.3)
# (3) Distribution of arrival count at time t
t_fixed = 5
counts_at_t = []
for _ in range(5000):
    arrivals = simulate_poisson_process(lambda_rate, t_fixed)
    counts_at_t.append(len(arrivals))
counts_at_t = np.array(counts_at_t)
# Histogram
unique, counts = np.unique(counts_at_t, return_counts=True)
axes[1, 0].bar(unique, counts/len(counts_at_t), alpha=0.6,
color='#667eea', edgecolor='black', label='Simulation')
# Theoretical distribution Pois(λt)
k = np.arange(0, max(counts_at_t) + 1)
theoretical_pmf = stats.poisson.pmf(k, lambda_rate * t_fixed)
axes[1, 0].plot(k, theoretical_pmf, 'ro-', linewidth=2, markersize=6,
label=f'Pois({lambda_rate * t_fixed})')
axes[1, 0].set_xlabel(f'Arrival count at t={t_fixed}', fontsize=11)
axes[1, 0].set_ylabel('Probability', fontsize=11)
axes[1, 0].set_title(f'Distribution of Arrival Count at Time t={t_fixed}', fontsize=12, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(alpha=0.3)
# (4) Comparison at different intensities
lambdas = [0.5, 1.0, 2.0, 4.0]
colors = ['blue', 'green', 'orange', 'red']
for lam, color in zip(lambdas, colors):
    arrivals = simulate_poisson_process(lam, T_max)
    counts = np.arange(1, len(arrivals) + 1)
    axes[1, 1].step(arrivals, counts, where='post', linewidth=2,
                    alpha=0.7, color=color, label=f'λ={lam}')
axes[1, 1].set_xlabel('Time t', fontsize=11)
axes[1, 1].set_ylabel('Cumulative arrival count N(t)', fontsize=11)
axes[1, 1].set_title('Poisson Process at Different Intensities', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)
plt.tight_layout()
plt.show()
print("Verification of Poisson process:")
print(f"Theory: E[inter-arrival] = 1/Ī» = {1/lambda_rate:.4f}")
print(f"Observed: E[inter-arrival] = {np.mean(all_inter_arrivals):.4f}")
print(f"\nTheory: E[N({t_fixed})] = λt = {lambda_rate * t_fixed:.4f}")
print(f"Observed: E[N({t_fixed})] = {np.mean(counts_at_t):.4f}")
3.6 Distribution of Inter-Arrival Times
Because inter-arrival times are independent \(Exp(\lambda)\) random variables, the \(k\)-th arrival time \(T_k\) is their sum and therefore follows the Erlang distribution \(\text{Gamma}(k, \lambda)\), with mean \(k/\lambda\) and variance \(k/\lambda^2\). The code below verifies this by simulation.
Code Example 6: Distribution of the k-th Arrival Time (Erlang Distribution)
# Distribution of k-th arrival time (Erlang distribution)
np.random.seed(42)
lambda_rate = 1.5
n_simulations = 5000
# k=1, 2, 3, 4th arrival times
k_values = [1, 2, 3, 4]
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
axes = axes.flatten()
for idx, k in enumerate(k_values):
    # Simulation
    arrival_times_k = []
    for _ in range(n_simulations):
        arrivals = simulate_poisson_process(lambda_rate, 20)
        if len(arrivals) >= k:
            arrival_times_k.append(arrivals[k-1])
    arrival_times_k = np.array(arrival_times_k)
    # Histogram
    axes[idx].hist(arrival_times_k, bins=50, density=True, alpha=0.6,
                   color='#667eea', edgecolor='black', label='Simulation')
    # Theoretical distribution (Gamma distribution = Erlang distribution)
    # T_k ~ Gamma(k, λ)
    x = np.linspace(0, max(arrival_times_k), 1000)
    theoretical_pdf = stats.gamma.pdf(x, a=k, scale=1/lambda_rate)
    axes[idx].plot(x, theoretical_pdf, 'r-', linewidth=2.5,
                   label=f'Gamma({k}, {lambda_rate})')
    axes[idx].set_xlabel(f'{k}th arrival time', fontsize=11)
    axes[idx].set_ylabel('Density', fontsize=11)
    axes[idx].set_title(f'Distribution of {k}th Arrival Time', fontsize=12, fontweight='bold')
    axes[idx].legend()
    axes[idx].grid(alpha=0.3)
    # Statistics
    theoretical_mean = k / lambda_rate
    theoretical_var = k / lambda_rate**2
    print(f"k={k}th arrival time:")
    print(f"  Theory: E[T{k}] = {theoretical_mean:.4f}, Var(T{k}) = {theoretical_var:.4f}")
    print(f"  Observed: E[T{k}] = {np.mean(arrival_times_k):.4f}, Var(T{k}) = {np.var(arrival_times_k):.4f}\n")
plt.tight_layout()
plt.show()
3.7 Application to Process Engineering (Failure Modeling)
Code Example 7: Application to Process Engineering (Failure Modeling)
# Manufacturing process failure modeling (application of Poisson process)
np.random.seed(42)
# Parameters
lambda_failure = 0.05 # Failure rate (0.05 times per hour = once every 20 hours)
T_operation = 1000 # Operation time (hours)
repair_time_mean = 2 # Mean repair time (hours)
# Simulation of failure times
failure_times = simulate_poisson_process(lambda_failure, T_operation)
n_failures = len(failure_times)
# Repair times (assuming lognormal distribution)
repair_times = np.random.lognormal(mean=np.log(repair_time_mean), sigma=0.3, size=n_failures)
# Calculate availability
total_repair_time = repair_times.sum()
uptime = T_operation - total_repair_time
availability = uptime / T_operation
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# (1) Failure times and downtime
time_grid = np.linspace(0, T_operation, 10000)
operational_status = np.ones_like(time_grid)
for i, (failure_time, repair_time) in enumerate(zip(failure_times, repair_times)):
    end_repair = failure_time + repair_time
    mask = (time_grid >= failure_time) & (time_grid < end_repair)
    operational_status[mask] = 0
axes[0, 0].fill_between(time_grid, 0, operational_status,
color='green', alpha=0.6, label='Operating')
axes[0, 0].fill_between(time_grid, 0, 1-operational_status,
color='red', alpha=0.6, label='Downtime')
for failure_time in failure_times[:10]:  # Display only first 10
    axes[0, 0].axvline(failure_time, color='red', linestyle='--', alpha=0.5, linewidth=1)
axes[0, 0].set_xlabel('Time (hours)', fontsize=11)
axes[0, 0].set_ylabel('Status', fontsize=11)
axes[0, 0].set_title('Manufacturing Process Operational Status', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].set_ylim([0, 1.2])
axes[0, 0].grid(alpha=0.3)
# (2) Distribution of inter-failure times
if len(failure_times) > 1:
    inter_failure_times = np.diff(failure_times)
    axes[0, 1].hist(inter_failure_times, bins=20, density=True, alpha=0.6,
                    color='#667eea', edgecolor='black', label='Observed')
    # Theoretical distribution Exp(λ)
    x = np.linspace(0, max(inter_failure_times), 1000)
    theoretical_pdf = lambda_failure * np.exp(-lambda_failure * x)
    axes[0, 1].plot(x, theoretical_pdf, 'r-', linewidth=2.5,
                    label=f'Exp({lambda_failure})')
axes[0, 1].set_xlabel('Inter-failure time (hours)', fontsize=11)
axes[0, 1].set_ylabel('Density', fontsize=11)
axes[0, 1].set_title('Distribution of Inter-failure Times (MTBF Analysis)', fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(alpha=0.3)
# (3) Cumulative failure count
counts = np.arange(1, len(failure_times) + 1)
axes[1, 0].step(failure_times, counts, where='post', linewidth=2.5,
color='#667eea', label='Observed')
axes[1, 0].plot(time_grid, lambda_failure * time_grid, 'r--', linewidth=2,
label=f'E[N(t)] = {lambda_failure}t')
axes[1, 0].set_xlabel('Time (hours)', fontsize=11)
axes[1, 0].set_ylabel('Cumulative failure count', fontsize=11)
axes[1, 0].set_title('Cumulative Failure Count Evolution', fontsize=12, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(alpha=0.3)
# (4) Availability time evolution
window_size = 100 # Moving window size (hours)
time_points = np.arange(window_size, T_operation, 10)
availabilities = []
for t in time_points:
    # Availability in interval [t-window_size, t]
    start_time = t - window_size
    in_window = (failure_times >= start_time) & (failure_times < t)
    # Sum the repair times of the failures that actually fall in this window
    total_downtime = repair_times[in_window].sum()
    avail = 1 - total_downtime / window_size
    availabilities.append(avail)
axes[1, 1].plot(time_points, availabilities, linewidth=2, color='#667eea',
label='Moving window availability')
axes[1, 1].axhline(y=availability, color='red', linestyle='--', linewidth=2,
label=f'Overall availability = {availability:.4f}')
axes[1, 1].set_xlabel('Time (hours)', fontsize=11)
axes[1, 1].set_ylabel('Availability', fontsize=11)
axes[1, 1].set_title(f'Availability Time Evolution (window size={window_size}h)', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)
axes[1, 1].set_ylim([0, 1])
plt.tight_layout()
plt.show()
# Statistical summary
print("Failure modeling statistics:")
print("="*60)
print(f"Operation time: {T_operation} hours")
print(f"Number of failures: {n_failures} times")
print(f"Mean time between failures (MTBF): {T_operation / n_failures:.2f} hours")
print(f"Theoretical MTBF: {1 / lambda_failure:.2f} hours")
print(f"\nTotal repair time: {total_repair_time:.2f} hours")
print(f"Mean time to repair (MTTR): {repair_times.mean():.2f} hours")
print(f"\nAvailability: {availability:.4f} ({availability*100:.2f}%)")
print(f"Theoretical availability: {1 / (1 + lambda_failure * repair_time_mean):.4f}")
Note: Poisson processes are widely applied in failure modeling, quality control, and maintenance planning in manufacturing processes. Metrics such as MTBF (mean time between failures) = 1/λ and availability = MTBF / (MTBF + MTTR) are used for process optimization decision-making.
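For the parameters used in Code Example 7 (\(\lambda = 0.05\) per hour, mean repair time \(\approx 2\) hours), these formulas give \[\text{MTBF} = \frac{1}{\lambda} = 20 \text{ h}, \qquad A = \frac{\text{MTBF}}{\text{MTBF} + \text{MTTR}} = \frac{20}{20 + 2} \approx 0.909,\] which matches the theoretical availability printed by the code.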
Exercises
Exercise 1: Stationary Distribution of a Markov Chain
Consider a Markov chain with the following transition probability matrix: \[P = \begin{pmatrix} 0.8 & 0.2 & 0 \\ 0.1 & 0.7 & 0.2 \\ 0 & 0.3 & 0.7 \end{pmatrix}\]
- Calculate the stationary distribution π as an eigenvalue problem
- Simulate 100 steps from different initial distributions and verify convergence to the stationary distribution
- Calculate \(P^{50}\) and confirm that each row converges to the stationary distribution
Exercise 2: Random Walk Analysis
Simulate a one-dimensional random walk (p=0.6) for 1000 steps and:
- Visualize the distribution of final positions with a histogram
- Compare the theoretical expected value E[X_n] = n(2p-1) with the observed value
- Verify that the standardized final position follows a normal distribution by the central limit theorem
Exercise 3: Application of the Poisson Process
Assume that inquiries to a call center follow a Poisson process (λ = 5 [inquiries/hour]); a starter sketch for the analytical parts follows the list below:
- Calculate the probability of receiving 10 or more inquiries in one hour
- Calculate the probability that the waiting time until the next inquiry is 15 minutes or more
- Simulate the distribution of inquiry counts over 8 hours of operation and compare with the Poisson distribution
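One possible starting point for the analytical parts of Exercise 3, using the Poisson tail probability and the exponential waiting-time distribution (the variable names below are placeholders, not part of the exercise statement):
# Starter sketch for Exercise 3 (analytical parts), assuming a rate of 5 inquiries/hour.
from scipy import stats
import numpy as np
lam = 5.0  # inquiries per hour
# (1) P(10 or more inquiries in one hour): N(1) ~ Pois(5)
p_ge_10 = 1 - stats.poisson.cdf(9, lam)
# (2) P(waiting time >= 15 min): waiting time ~ Exp(5), 15 min = 0.25 h
p_wait = np.exp(-lam * 0.25)
print(f"P(N(1) >= 10)         = {p_ge_10:.4f}")
print(f"P(wait >= 15 minutes) = {p_wait:.4f}")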