
Chapter 3: Markov Processes and Poisson Processes


3.1 Fundamentals of Markov Chains

šŸ“ Definition: Markov Chain
A stochastic process \(\{X_n\}_{n=0,1,2,\ldots}\) is called a Markov chain if it satisfies the following property: \[P(X_{n+1} = j \mid X_n = i, X_{n-1} = i_{n-1}, \ldots, X_0 = i_0) = P(X_{n+1} = j \mid X_n = i)\] This is called the Markov property (memorylessness): the future depends only on the current state and not on the past history. Transition probability: \[p_{ij} = P(X_{n+1} = j \mid X_n = i)\] Transition probability matrix: \[P = (p_{ij}), \quad \sum_{j} p_{ij} = 1 \text{ for all } i\]
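The memorylessness in the definition can be checked empirically: in a simulated chain, the transition frequency conditioned on the current state alone should match the frequency conditioned on the current and previous states. A minimal sketch (the two-state chain below is an illustrative assumption, not the weather model used later):

```python
import numpy as np

rng = np.random.default_rng(0)
P = np.array([[0.9, 0.1],    # hypothetical 2-state transition matrix
              [0.5, 0.5]])

# Simulate a long trajectory; next state is 1 with probability P[current, 1]
n = 200_000
x = np.empty(n, dtype=int)
x[0] = 0
u = rng.random(n)
for t in range(1, n):
    x[t] = int(u[t] < P[x[t - 1], 1])

# Estimate P(X_{n+1}=1 | X_n=0), with and without conditioning on X_{n-1}
prev, cur, nxt = x[:-2], x[1:-1], x[2:]
p_cur = nxt[cur == 0].mean()
p_cur_prev0 = nxt[(cur == 0) & (prev == 0)].mean()
p_cur_prev1 = nxt[(cur == 0) & (prev == 1)].mean()
print(p_cur, p_cur_prev0, p_cur_prev1)  # all close to P[0, 1] = 0.1
```

All three estimates agree up to sampling noise, which is exactly the Markov property: knowing \(X_{n-1}\) adds nothing once \(X_n\) is given.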

šŸ’» Code Example 1: Discrete Markov Chain Simulation

import numpy as np
import matplotlib.pyplot as plt
from scipy.linalg import eig
from scipy import stats

# Example of 3-state Markov chain: Weather model (sunny, cloudy, rainy)
states = ['Sunny', 'Cloudy', 'Rainy']
n_states = len(states)

# Transition probability matrix
# P[i,j] = P(tomorrow's weather=j | today's weather=i)
P = np.array([
    [0.7, 0.2, 0.1],  # Sunny → Sunny/Cloudy/Rainy
    [0.3, 0.4, 0.3],  # Cloudy → Sunny/Cloudy/Rainy
    [0.2, 0.3, 0.5]   # Rainy → Sunny/Cloudy/Rainy
])

print("Transition probability matrix P:")
print(P)
print(f"\nSum of each row (probability sum=1): {P.sum(axis=1)}")

# Markov chain simulation
def simulate_markov_chain(P, initial_state, n_steps):
    """Simulate Markov chain"""
    n_states = P.shape[0]
    states_sequence = [initial_state]
    current_state = initial_state
    for _ in range(n_steps):
        # Stochastically select next state from current state
        next_state = np.random.choice(n_states, p=P[current_state])
        states_sequence.append(next_state)
        current_state = next_state
    return states_sequence

# Execute simulation
np.random.seed(42)
n_days = 100
initial_state = 0  # Start with sunny
trajectory = simulate_markov_chain(P, initial_state, n_days)

# Visualization
fig, axes = plt.subplots(2, 1, figsize=(14, 8))

# (1) Time series of states
time = np.arange(len(trajectory))
axes[0].step(time, trajectory, where='post', color='#667eea', linewidth=2)
axes[0].set_yticks([0, 1, 2])
axes[0].set_yticklabels(states)
axes[0].set_xlabel('Days', fontsize=11)
axes[0].set_ylabel('Weather', fontsize=11)
axes[0].set_title('Weather Evolution by Markov Chain', fontsize=12, fontweight='bold')
axes[0].grid(alpha=0.3)
axes[0].set_xlim([0, n_days])

# (2) Frequency of each state
state_counts = [trajectory.count(i) for i in range(n_states)]
state_freqs = np.array(state_counts) / len(trajectory)
x_pos = np.arange(n_states)
axes[1].bar(x_pos, state_freqs, color='#667eea', alpha=0.7, edgecolor='black')
axes[1].set_xticks(x_pos)
axes[1].set_xticklabels(states)
axes[1].set_ylabel('Relative Frequency', fontsize=11)
axes[1].set_title('Frequency of Each State', fontsize=12, fontweight='bold')
axes[1].grid(axis='y', alpha=0.3)

plt.tight_layout()
plt.show()

print(f"\nFrequency of each state:")
for i, state in enumerate(states):
    print(f"  {state}: {state_freqs[i]:.4f} ({state_counts[i]}/{len(trajectory)} days)")
Transition probability matrix P:
[[0.7 0.2 0.1]
 [0.3 0.4 0.3]
 [0.2 0.3 0.5]]

Sum of each row (probability sum=1): [1. 1. 1.]

Frequency of each state:
  Sunny: 0.4653 (47/101 days)
  Cloudy: 0.2970 (30/101 days)
  Rainy: 0.2376 (24/101 days)

3.2 Transition Probability Matrix and Stationary Distribution

šŸ“Š Theorem: Stationary Distribution

A probability vector \(\pi = (\pi_1, \pi_2, \ldots, \pi_n)\) is called a stationary distribution if: \[\pi P = \pi, \quad \sum_i \pi_i = 1, \quad \pi_i \geq 0\] The stationary distribution is the left eigenvector corresponding to eigenvalue 1 of the transition probability matrix \(P\).

Ergodic theorem: An irreducible and aperiodic Markov chain converges to a unique stationary distribution \(\pi\): \[\lim_{n \to \infty} P^n = \begin{pmatrix} \pi \\ \pi \\ \vdots \\ \pi \end{pmatrix}\]
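Besides the eigenvector computation used in the code example below, the stationary distribution can be found by solving the linear system \(\pi P = \pi\) together with the normalization \(\sum_i \pi_i = 1\). A minimal sketch, reusing the weather transition matrix (for this chain the exact answer is \(\pi = (21/46,\ 13/46,\ 12/46)\)):

```python
import numpy as np

P = np.array([[0.7, 0.2, 0.1],
              [0.3, 0.4, 0.3],
              [0.2, 0.3, 0.5]])
n = P.shape[0]

# pi P = pi  <=>  (P^T - I) pi^T = 0; one balance equation is redundant,
# so replace the last row with the normalization sum(pi) = 1
A = np.vstack([(P.T - np.eye(n))[:-1], np.ones(n)])
b = np.zeros(n)
b[-1] = 1.0
pi = np.linalg.solve(A, b)

print(pi)        # ≈ [0.4565, 0.2826, 0.2609] = (21/46, 13/46, 12/46)
print(pi @ P)    # equals pi (invariance check)
```

Both routes give the same \(\pi\); the linear-system form makes the "redundant balance equation plus normalization" structure explicit.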

šŸ’» Code Example 2: Transition Probability Matrix and Stationary Distribution Calculation

# Calculate stationary distribution (solve as eigenvalue problem)
def compute_stationary_distribution(P):
    """Calculate stationary distribution of transition probability matrix P"""
    # Calculate eigenvalues and eigenvectors of P^T
    eigenvalues, eigenvectors = eig(P.T)
    # Find eigenvector corresponding to eigenvalue 1
    idx = np.argmin(np.abs(eigenvalues - 1))
    stationary = np.real(eigenvectors[:, idx])
    # Normalize (sum=1)
    stationary = stationary / stationary.sum()
    return stationary

# Stationary distribution for weather model
pi_stationary = compute_stationary_distribution(P)
print("Stationary distribution calculation:")
print("="*60)
for i, state in enumerate(states):
    print(f"  π({state}) = {pi_stationary[i]:.6f}")

# Verification: π * P = π
pi_P = pi_stationary @ P
print(f"\nVerification: π * P = π")
print(f"  π * P = {pi_P}")
print(f"  π     = {pi_stationary}")
print(f"  Error = {np.linalg.norm(pi_P - pi_stationary):.10f}")

# Long-term behavior: calculation of P^n
n_steps = [1, 5, 10, 20, 50, 100]
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.flatten()

for idx, n in enumerate(n_steps):
    P_n = np.linalg.matrix_power(P, n)
    # Heatmap
    im = axes[idx].imshow(P_n, cmap='Blues', vmin=0, vmax=1)
    axes[idx].set_xticks(range(n_states))
    axes[idx].set_yticks(range(n_states))
    axes[idx].set_xticklabels(states, rotation=45)
    axes[idx].set_yticklabels(states)
    axes[idx].set_title(f'P^{n}', fontsize=12, fontweight='bold')
    # Display values
    for i in range(n_states):
        for j in range(n_states):
            axes[idx].text(j, i, f'{P_n[i, j]:.3f}',
                           ha="center", va="center", color="black", fontsize=9)
    axes[idx].set_xlabel('Next State', fontsize=10)
    axes[idx].set_ylabel('Current State', fontsize=10)

plt.colorbar(im, ax=axes, fraction=0.046, pad=0.04)
plt.suptitle('Powers of Transition Probability Matrix P^n (Convergence to Stationary Distribution)',
             fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

# Convergence from different initial distributions
initial_distributions = [
    np.array([1, 0, 0]),        # Start sunny
    np.array([0, 1, 0]),        # Start cloudy
    np.array([0, 0, 1]),        # Start rainy
    np.array([1/3, 1/3, 1/3])   # Uniform distribution
]

fig, ax = plt.subplots(figsize=(12, 6))
for i, pi_0 in enumerate(initial_distributions):
    distributions = [pi_0]
    for n in range(50):
        pi_n = distributions[-1] @ P
        distributions.append(pi_n)
    distributions = np.array(distributions)
    for j, state in enumerate(states):
        ax.plot(distributions[:, j], linewidth=2, alpha=0.7,
                label=f'{states[i]} start → {state}' if i == 0 else None,
                linestyle=['-', '--', ':'][j])

ax.axhline(y=pi_stationary[0], color='red', linestyle='--', alpha=0.5, linewidth=2)
ax.axhline(y=pi_stationary[1], color='green', linestyle='--', alpha=0.5, linewidth=2)
ax.axhline(y=pi_stationary[2], color='blue', linestyle='--', alpha=0.5, linewidth=2)
ax.set_xlabel('Step number n', fontsize=11)
ax.set_ylabel('Probability', fontsize=11)
ax.set_title('Convergence to Stationary Distribution from Different Initial Distributions',
             fontsize=12, fontweight='bold')
ax.legend(fontsize=9, ncol=2)
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()

3.3 Random Walk

šŸ“ Definition: Random Walk
A one-dimensional random walk is a Markov chain that moves right by 1 with probability \(p\) or left by 1 with probability \(1-p\): \[X_{n+1} = X_n + \epsilon_{n+1}, \quad \epsilon_{n+1} = \begin{cases} +1 & \text{w.p. } p \\ -1 & \text{w.p. } 1-p \end{cases}\] A symmetric random walk (\(p = 1/2\)) is a discrete model of diffusion phenomena in materials science.
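The moments used in the code example below follow directly from this definition: each step has \(E[\epsilon] = 2p-1\) and \(\mathrm{Var}(\epsilon) = 1 - (2p-1)^2 = 4p(1-p)\), so \(E[X_n] = n(2p-1)\) and \(\mathrm{Var}(X_n) = 4np(1-p)\). A quick vectorized Monte Carlo check (the seed and sample sizes here are arbitrary choices):

```python
import numpy as np

rng = np.random.default_rng(1)
p, n_steps, n_paths = 0.6, 500, 20_000

# Each step is +1 w.p. p and -1 w.p. 1-p; X_n is the sum of the steps
steps = rng.choice([1, -1], size=(n_paths, n_steps), p=[p, 1 - p])
X_n = steps.sum(axis=1)

print(X_n.mean(), n_steps * (2 * p - 1))       # E[X_n] = n(2p-1) = 100
print(X_n.var(), n_steps * 4 * p * (1 - p))    # Var(X_n) = 4np(1-p) = 480
```

The sample mean and variance land within Monte Carlo error of the theoretical values.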

šŸ’» Code Example 3: Random Walk

# Simulation of one-dimensional random walk
np.random.seed(42)

def random_walk_1d(n_steps, p=0.5, initial_pos=0):
    """One-dimensional random walk"""
    positions = [initial_pos]
    current_pos = initial_pos
    for _ in range(n_steps):
        step = 1 if np.random.rand() < p else -1
        current_pos += step
        positions.append(current_pos)
    return np.array(positions)

# Simulate multiple paths
n_steps = 1000
n_paths = 100
p = 0.5  # Symmetric random walk

fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# (1) Visualization of multiple paths
for _ in range(50):
    path = random_walk_1d(n_steps, p)
    axes[0, 0].plot(path, alpha=0.2, linewidth=0.8, color='#667eea')
axes[0, 0].axhline(y=0, color='red', linestyle='--', linewidth=2, label='Starting position')
axes[0, 0].set_xlabel('Step number', fontsize=11)
axes[0, 0].set_ylabel('Position', fontsize=11)
axes[0, 0].set_title('Symmetric Random Walk (p=0.5)', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)

# (2) Distribution of final positions
final_positions = [random_walk_1d(n_steps, p)[-1] for _ in range(n_paths)]
axes[0, 1].hist(final_positions, bins=30, density=True, alpha=0.6,
                color='#667eea', edgecolor='black')
# Theoretical distribution (approaches normal distribution by central limit theorem)
# E[Xn] = 0 (symmetric), Var(Xn) = n (variance of each step = 1)
x = np.linspace(min(final_positions), max(final_positions), 1000)
theoretical_pdf = stats.norm.pdf(x, 0, np.sqrt(n_steps))
axes[0, 1].plot(x, theoretical_pdf, 'r-', linewidth=2.5, label='N(0, n)')
axes[0, 1].set_xlabel('Final position', fontsize=11)
axes[0, 1].set_ylabel('Density', fontsize=11)
axes[0, 1].set_title(f'Distribution of Final Position (n={n_steps})', fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(alpha=0.3)

# (3) Asymmetric random walk
p_biased = 0.6
paths_biased = [random_walk_1d(n_steps, p_biased) for _ in range(50)]
for path in paths_biased:
    axes[1, 0].plot(path, alpha=0.2, linewidth=0.8, color='#764ba2')
axes[1, 0].axhline(y=0, color='red', linestyle='--', linewidth=2)
axes[1, 0].set_xlabel('Step number', fontsize=11)
axes[1, 0].set_ylabel('Position', fontsize=11)
axes[1, 0].set_title(f'Asymmetric Random Walk (p={p_biased})', fontsize=12, fontweight='bold')
axes[1, 0].grid(alpha=0.3)

# (4) Comparison of mean displacement with theoretical values
n_checkpoints = np.arange(0, n_steps+1, 10)
mean_positions = []
std_positions = []
for n in n_checkpoints:
    if n == 0:
        mean_positions.append(0)
        std_positions.append(0)
    else:
        paths = [random_walk_1d(n, p_biased) for _ in range(n_paths)]
        final_pos = [path[-1] for path in paths]
        mean_positions.append(np.mean(final_pos))
        std_positions.append(np.std(final_pos))
mean_positions = np.array(mean_positions)
std_positions = np.array(std_positions)

# Theoretical values
theoretical_mean = n_checkpoints * (2*p_biased - 1)                 # E[Xn] = n(2p-1)
theoretical_std = np.sqrt(n_checkpoints * 4*p_biased*(1-p_biased))  # Var(Xn) = 4np(1-p)

axes[1, 1].plot(n_checkpoints, mean_positions, 'o-', color='#667eea',
                linewidth=2, markersize=4, label='Observed mean')
axes[1, 1].plot(n_checkpoints, theoretical_mean, '--', color='red',
                linewidth=2, label='Theoretical mean')
axes[1, 1].fill_between(n_checkpoints,
                        theoretical_mean - theoretical_std,
                        theoretical_mean + theoretical_std,
                        alpha=0.2, color='red', label='Theoretical ±1σ')
axes[1, 1].set_xlabel('Step number', fontsize=11)
axes[1, 1].set_ylabel('Mean position', fontsize=11)
axes[1, 1].set_title('Mean Displacement of Asymmetric Random Walk', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)

plt.tight_layout()
plt.show()

print(f"Symmetric random walk (p=0.5):")
print(f"  Theory:   E[X{n_steps}] = 0, Var(X{n_steps}) = {n_steps}")
print(f"  Observed: Mean={np.mean(final_positions):.2f}, Variance={np.var(final_positions):.2f}")
print(f"\nAsymmetric random walk (p={p_biased}):")
print(f"  Theory:   E[X{n_steps}] = {theoretical_mean[-1]:.2f}, Var(X{n_steps}) = {theoretical_std[-1]**2:.2f}")
print(f"  Observed: Mean={mean_positions[-1]:.2f}, Variance={std_positions[-1]**2:.2f}")

3.4 Continuous-Time Markov Process

šŸ“ Definition: Continuous-Time Markov Process
A continuous-time Markov process is a Markov process \(\{X(t)\}_{t \geq 0}\) with continuous time parameter \(t \geq 0\). Transition rate matrix \(Q\) (generator matrix): \[q_{ij} = \lim_{h \to 0} \frac{P(X(t+h) = j \mid X(t) = i)}{h} \quad (i \neq j)\] \[q_{ii} = -\sum_{j \neq i} q_{ij}\] Chapman-Kolmogorov equation: \[\frac{dP(t)}{dt} = Q P(t)\] Solution is \(P(t) = e^{Qt}\)
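Since \(P(t) = e^{Qt}\), the relation \(dP(t)/dt = QP(t)\) can be verified numerically with a central finite difference, and each \(P(t)\) should remain row-stochastic. A minimal sketch, reusing the infection/recovery rates of the code example below (the step size h is an arbitrary choice):

```python
import numpy as np
from scipy.linalg import expm

# Generator of the 3-state infection model (rows sum to 0)
Q = np.array([[-0.3,  0.3, 0.0],
              [ 0.0, -0.2, 0.2],
              [ 0.0,  0.0, 0.0]])

t, h = 2.0, 1e-5
P_t = expm(Q * t)

# Central finite difference of P(t) compared against Q P(t)
dP_num = (expm(Q * (t + h)) - expm(Q * (t - h))) / (2 * h)
print(np.abs(dP_num - Q @ P_t).max())   # ~0 (finite-difference error only)
print(P_t.sum(axis=1))                  # rows stay stochastic
```

Because \(Q\) commutes with \(e^{Qt}\), the same check passes for \(P(t)Q\), the forward form of the equation.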

šŸ’» Code Example 4: Continuous-Time Markov Process

from scipy.linalg import expm

# Example of continuous-time Markov process: simplified SIR infection model
# States: Healthy (H), Infected (I), Recovered (R)
states_sir = ['Healthy', 'Infected', 'Recovered']

# Transition rate matrix Q
# Q[i,j] = transition rate from state i to state j (when i ≠ j)
lambda_infection = 0.3  # Infection rate
mu_recovery = 0.2       # Recovery rate

Q = np.array([
    [-lambda_infection, lambda_infection, 0],
    [0, -mu_recovery, mu_recovery],
    [0, 0, 0]  # Recovered is an absorbing state
])

print("Transition rate matrix Q:")
print(Q)
print(f"\nSum of each row (=0): {Q.sum(axis=1)}")

# Time evolution P(t) = exp(Qt)
times = np.linspace(0, 30, 100)
initial_state = np.array([1, 0, 0])  # Start healthy
probabilities = []
for t in times:
    P_t = expm(Q * t)
    prob_t = initial_state @ P_t
    probabilities.append(prob_t)
probabilities = np.array(probabilities)

# Visualization
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# (1) Time evolution of probabilities
for i, state in enumerate(states_sir):
    axes[0].plot(times, probabilities[:, i], linewidth=2.5, label=state)
axes[0].set_xlabel('Time', fontsize=11)
axes[0].set_ylabel('Probability', fontsize=11)
axes[0].set_title('Continuous-Time Markov Process (SIR Model)', fontsize=12, fontweight='bold')
axes[0].legend(fontsize=10)
axes[0].grid(alpha=0.3)
axes[0].set_ylim([0, 1])

# (2) Simulation (as jump process)
def simulate_ctmc(Q, initial_state, T_max):
    """Simulate continuous-time Markov chain (Gillespie algorithm)"""
    n_states = Q.shape[0]
    current_state = initial_state
    current_time = 0
    times = [current_time]
    states = [current_state]
    while current_time < T_max:
        # Sojourn time at current state (exponential distribution)
        rate = -Q[current_state, current_state]
        if rate == 0:  # Absorbing state
            break
        waiting_time = np.random.exponential(1/rate)
        current_time += waiting_time
        if current_time > T_max:
            break
        # Select next state
        transition_probs = Q[current_state].copy()
        transition_probs[current_state] = 0
        transition_probs /= transition_probs.sum()
        next_state = np.random.choice(n_states, p=transition_probs)
        times.append(current_time)
        states.append(next_state)
        current_state = next_state
    return times, states

# Simulate multiple paths
np.random.seed(42)
n_simulations = 20
T_max = 30
for _ in range(n_simulations):
    sim_times, sim_states = simulate_ctmc(Q, 0, T_max)
    axes[1].step(sim_times, sim_states, where='post', alpha=0.4,
                 linewidth=1.5, color='#667eea')
axes[1].set_yticks([0, 1, 2])
axes[1].set_yticklabels(states_sir)
axes[1].set_xlabel('Time', fontsize=11)
axes[1].set_ylabel('State', fontsize=11)
axes[1].set_title('CTMC Simulation (Sample Paths)', fontsize=12, fontweight='bold')
axes[1].grid(alpha=0.3)
axes[1].set_xlim([0, T_max])

plt.tight_layout()
plt.show()

3.5 Properties of Poisson Process

šŸ“ Definition: Poisson Process
A counting process \(\{N(t)\}_{t \geq 0}\) is called a Poisson process (with intensity \(\lambda\)) if:
  1. \(N(0) = 0\)
  2. Independent increments: increments over non-overlapping time intervals are independent
  3. Stationary increments: distribution of \(N(t+s) - N(s)\) depends only on \(t\)
  4. \(N(t) \sim Pois(\lambda t)\), i.e., \(P(N(t) = k) = \frac{(\lambda t)^k e^{-\lambda t}}{k!}\)
Inter-arrival times: The waiting time between events in a Poisson process follows \(Exp(\lambda)\).
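Property 4 and the exponential inter-arrival times are two sides of the same construction: summing i.i.d. \(Exp(\lambda)\) gaps gives the arrival times, and counting how many arrivals fall in \([0, t]\) recovers \(Pois(\lambda t)\). A small numerical check (the seed and sample sizes are arbitrary):

```python
import numpy as np

rng = np.random.default_rng(2)
lam, t = 2.0, 1.5

# Inter-arrival survival function: P(T > t) = exp(-lam * t)
gaps = rng.exponential(1 / lam, size=200_000)
print((gaps > t).mean(), np.exp(-lam * t))   # both ~0.0498

# Arrival times are cumulative sums of gaps; N(1) counts arrivals in [0, 1]
# (20 gaps per path is ample: P(N(1) >= 20) is negligible for lam = 2)
arrivals = np.cumsum(rng.exponential(1 / lam, size=(100_000, 20)), axis=1)
counts = (arrivals <= 1.0).sum(axis=1)
print(counts.mean(), lam)                    # E[N(1)] = lam
```

The empirical survival probability and mean count agree with \(e^{-\lambda t}\) and \(\lambda t\) respectively.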

šŸ’» Code Example 5: Poisson Process Generation

# Poisson process simulation
np.random.seed(42)
lambda_rate = 2.0  # Intensity (average arrivals per unit time)
T_max = 10

def simulate_poisson_process(lambda_rate, T_max):
    """Simulate Poisson process"""
    arrival_times = []
    current_time = 0
    while current_time < T_max:
        # Time until next arrival (exponential distribution)
        inter_arrival_time = np.random.exponential(1/lambda_rate)
        current_time += inter_arrival_time
        if current_time < T_max:
            arrival_times.append(current_time)
    return np.array(arrival_times)

# Simulate multiple paths
n_paths = 10
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# (1) Sample paths
for i in range(n_paths):
    arrivals = simulate_poisson_process(lambda_rate, T_max)
    counts = np.arange(1, len(arrivals) + 1)
    axes[0, 0].step(arrivals, counts, where='post', alpha=0.6, linewidth=2)
# Theoretical expected value E[N(t)] = λt
t_theory = np.linspace(0, T_max, 1000)
axes[0, 0].plot(t_theory, lambda_rate * t_theory, 'r--', linewidth=2.5,
                label=f'E[N(t)] = λt = {lambda_rate}t')
axes[0, 0].set_xlabel('Time t', fontsize=11)
axes[0, 0].set_ylabel('Cumulative arrival count N(t)', fontsize=11)
axes[0, 0].set_title(f'Poisson Process (λ={lambda_rate})', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)

# (2) Distribution of inter-arrival times
all_inter_arrivals = []
for _ in range(1000):
    arrivals = simulate_poisson_process(lambda_rate, T_max)
    if len(arrivals) > 1:
        inter_arrivals = np.diff(np.concatenate([[0], arrivals]))
        all_inter_arrivals.extend(inter_arrivals)
all_inter_arrivals = np.array(all_inter_arrivals)
axes[0, 1].hist(all_inter_arrivals, bins=50, density=True, alpha=0.6,
                color='#667eea', edgecolor='black', label='Simulation')
# Theoretical distribution Exp(λ)
x = np.linspace(0, max(all_inter_arrivals), 1000)
theoretical_pdf = lambda_rate * np.exp(-lambda_rate * x)
axes[0, 1].plot(x, theoretical_pdf, 'r-', linewidth=2.5, label=f'Exp({lambda_rate})')
axes[0, 1].set_xlabel('Inter-arrival time', fontsize=11)
axes[0, 1].set_ylabel('Density', fontsize=11)
axes[0, 1].set_title('Distribution of Inter-arrival Times', fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(alpha=0.3)

# (3) Distribution of arrival count at time t
t_fixed = 5
counts_at_t = []
for _ in range(5000):
    arrivals = simulate_poisson_process(lambda_rate, t_fixed)
    counts_at_t.append(len(arrivals))
counts_at_t = np.array(counts_at_t)
# Histogram
unique, counts = np.unique(counts_at_t, return_counts=True)
axes[1, 0].bar(unique, counts/len(counts_at_t), alpha=0.6,
               color='#667eea', edgecolor='black', label='Simulation')
# Theoretical distribution Pois(λt)
k = np.arange(0, max(counts_at_t) + 1)
theoretical_pmf = stats.poisson.pmf(k, lambda_rate * t_fixed)
axes[1, 0].plot(k, theoretical_pmf, 'ro-', linewidth=2, markersize=6,
                label=f'Pois({lambda_rate * t_fixed})')
axes[1, 0].set_xlabel(f'Arrival count at t={t_fixed}', fontsize=11)
axes[1, 0].set_ylabel('Probability', fontsize=11)
axes[1, 0].set_title(f'Distribution of Arrival Count at Time t={t_fixed}',
                     fontsize=12, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(alpha=0.3)

# (4) Comparison at different intensities
lambdas = [0.5, 1.0, 2.0, 4.0]
colors = ['blue', 'green', 'orange', 'red']
for lam, color in zip(lambdas, colors):
    arrivals = simulate_poisson_process(lam, T_max)
    counts = np.arange(1, len(arrivals) + 1)
    axes[1, 1].step(arrivals, counts, where='post', linewidth=2,
                    alpha=0.7, color=color, label=f'λ={lam}')
axes[1, 1].set_xlabel('Time t', fontsize=11)
axes[1, 1].set_ylabel('Cumulative arrival count N(t)', fontsize=11)
axes[1, 1].set_title('Poisson Process at Different Intensities', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)

plt.tight_layout()
plt.show()

print("Verification of Poisson process:")
print(f"Theory:   E[inter-arrival] = 1/λ = {1/lambda_rate:.4f}")
print(f"Observed: E[inter-arrival] = {np.mean(all_inter_arrivals):.4f}")
print(f"\nTheory:   E[N({t_fixed})] = λt = {lambda_rate * t_fixed:.4f}")
print(f"Observed: E[N({t_fixed})] = {np.mean(counts_at_t):.4f}")

3.6 Distribution of Inter-Arrival Times

šŸ’» Code Example 6: Distribution of Inter-Arrival Times

# Distribution of k-th arrival time (Erlang distribution)
np.random.seed(42)
lambda_rate = 1.5
n_simulations = 5000

# k=1, 2, 3, 4th arrival times
k_values = [1, 2, 3, 4]
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
axes = axes.flatten()

for idx, k in enumerate(k_values):
    # Simulation
    arrival_times_k = []
    for _ in range(n_simulations):
        arrivals = simulate_poisson_process(lambda_rate, 20)
        if len(arrivals) >= k:
            arrival_times_k.append(arrivals[k-1])
    arrival_times_k = np.array(arrival_times_k)

    # Histogram
    axes[idx].hist(arrival_times_k, bins=50, density=True, alpha=0.6,
                   color='#667eea', edgecolor='black', label='Simulation')

    # Theoretical distribution (Gamma distribution = Erlang distribution)
    # T_k ~ Gamma(k, λ)
    x = np.linspace(0, max(arrival_times_k), 1000)
    theoretical_pdf = stats.gamma.pdf(x, a=k, scale=1/lambda_rate)
    axes[idx].plot(x, theoretical_pdf, 'r-', linewidth=2.5, label=f'Gamma({k}, {lambda_rate})')
    axes[idx].set_xlabel(f'{k}th arrival time', fontsize=11)
    axes[idx].set_ylabel('Density', fontsize=11)
    axes[idx].set_title(f'Distribution of {k}th Arrival Time', fontsize=12, fontweight='bold')
    axes[idx].legend()
    axes[idx].grid(alpha=0.3)

    # Statistics
    theoretical_mean = k / lambda_rate
    theoretical_var = k / lambda_rate**2
    print(f"k={k}th arrival time:")
    print(f"  Theory:   E[T{k}] = {theoretical_mean:.4f}, Var(T{k}) = {theoretical_var:.4f}")
    print(f"  Observed: E[T{k}] = {np.mean(arrival_times_k):.4f}, Var(T{k}) = {np.var(arrival_times_k):.4f}\n")

plt.tight_layout()
plt.show()

3.7 Application to Process Engineering (Failure Modeling)

šŸ’» Code Example 7: Application to Process Engineering (Failure Modeling)

# Manufacturing process failure modeling (application of Poisson process)
np.random.seed(42)

# Parameters
lambda_failure = 0.05   # Failure rate (0.05 times per hour = once every 20 hours)
T_operation = 1000      # Operation time (hours)
repair_time_mean = 2    # Mean repair time (hours)

# Simulation of failure times
failure_times = simulate_poisson_process(lambda_failure, T_operation)
n_failures = len(failure_times)

# Repair times (assuming lognormal distribution)
repair_times = np.random.lognormal(mean=np.log(repair_time_mean), sigma=0.3, size=n_failures)

# Calculate availability
total_repair_time = repair_times.sum()
uptime = T_operation - total_repair_time
availability = uptime / T_operation

fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# (1) Failure times and downtime
time_grid = np.linspace(0, T_operation, 10000)
operational_status = np.ones_like(time_grid)
for failure_time, repair_time in zip(failure_times, repair_times):
    end_repair = failure_time + repair_time
    mask = (time_grid >= failure_time) & (time_grid < end_repair)
    operational_status[mask] = 0
axes[0, 0].fill_between(time_grid, 0, operational_status, color='green', alpha=0.6, label='Operating')
axes[0, 0].fill_between(time_grid, 0, 1-operational_status, color='red', alpha=0.6, label='Downtime')
for failure_time in failure_times[:10]:  # Display only first 10
    axes[0, 0].axvline(failure_time, color='red', linestyle='--', alpha=0.5, linewidth=1)
axes[0, 0].set_xlabel('Time (hours)', fontsize=11)
axes[0, 0].set_ylabel('Status', fontsize=11)
axes[0, 0].set_title('Manufacturing Process Operational Status', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].set_ylim([0, 1.2])
axes[0, 0].grid(alpha=0.3)

# (2) Distribution of inter-failure times
if len(failure_times) > 1:
    inter_failure_times = np.diff(failure_times)
    axes[0, 1].hist(inter_failure_times, bins=20, density=True, alpha=0.6,
                    color='#667eea', edgecolor='black', label='Observed')
    # Theoretical distribution Exp(λ)
    x = np.linspace(0, max(inter_failure_times), 1000)
    theoretical_pdf = lambda_failure * np.exp(-lambda_failure * x)
    axes[0, 1].plot(x, theoretical_pdf, 'r-', linewidth=2.5, label=f'Exp({lambda_failure})')
    axes[0, 1].set_xlabel('Inter-failure time (hours)', fontsize=11)
    axes[0, 1].set_ylabel('Density', fontsize=11)
    axes[0, 1].set_title('Distribution of Inter-failure Times (MTBF Analysis)', fontsize=12, fontweight='bold')
    axes[0, 1].legend()
    axes[0, 1].grid(alpha=0.3)

# (3) Cumulative failure count
counts = np.arange(1, len(failure_times) + 1)
axes[1, 0].step(failure_times, counts, where='post', linewidth=2.5, color='#667eea', label='Observed')
axes[1, 0].plot(time_grid, lambda_failure * time_grid, 'r--', linewidth=2, label=f'E[N(t)] = {lambda_failure}t')
axes[1, 0].set_xlabel('Time (hours)', fontsize=11)
axes[1, 0].set_ylabel('Cumulative failure count', fontsize=11)
axes[1, 0].set_title('Cumulative Failure Count Evolution', fontsize=12, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(alpha=0.3)

# (4) Availability time evolution
window_size = 100  # Moving window size (hours)
time_points = np.arange(window_size, T_operation, 10)
availabilities = []
for t in time_points:
    # Availability in interval [t-window_size, t]
    start_time = t - window_size
    # Select the repair times belonging to failures inside the window
    in_window = (failure_times >= start_time) & (failure_times < t)
    total_downtime = repair_times[in_window].sum()
    avail = 1 - total_downtime / window_size
    availabilities.append(avail)
axes[1, 1].plot(time_points, availabilities, linewidth=2, color='#667eea', label='Moving window availability')
axes[1, 1].axhline(y=availability, color='red', linestyle='--', linewidth=2,
                   label=f'Overall availability = {availability:.4f}')
axes[1, 1].set_xlabel('Time (hours)', fontsize=11)
axes[1, 1].set_ylabel('Availability', fontsize=11)
axes[1, 1].set_title(f'Availability Time Evolution (window size={window_size}h)', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)
axes[1, 1].set_ylim([0, 1])

plt.tight_layout()
plt.show()

# Statistical summary
print("Failure modeling statistics:")
print("="*60)
print(f"Operation time: {T_operation} hours")
print(f"Number of failures: {n_failures} times")
print(f"Mean time between failures (MTBF): {T_operation / n_failures:.2f} hours")
print(f"Theoretical MTBF: {1 / lambda_failure:.2f} hours")
print(f"\nTotal repair time: {total_repair_time:.2f} hours")
print(f"Mean time to repair (MTTR): {repair_times.mean():.2f} hours")
print(f"\nAvailability: {availability:.4f} ({availability*100:.2f}%)")
print(f"Theoretical availability: {1 / (1 + lambda_failure * repair_time_mean):.4f}")
šŸ’” Note: Poisson processes are widely applied in failure modeling, quality control, and maintenance planning in manufacturing processes. Metrics such as MTBF (mean time between failures) = 1/λ and availability = MTBF / (MTBF + MTTR) are used for process optimization decision-making.
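The two formulas in the note can be checked with the parameters used in Code Example 7 (failure rate 0.05 per hour, mean repair time 2 hours):

```python
# MTBF and availability arithmetic from the note above
lambda_failure = 0.05   # failures per hour (as in Code Example 7)
mttr = 2.0              # mean time to repair, hours

mtbf = 1 / lambda_failure              # MTBF = 1/lambda = 20.0 hours
availability = mtbf / (mtbf + mttr)    # 20 / 22 ≈ 0.9091

print(mtbf, availability)
```

This matches the "theoretical availability" printed by Code Example 7, since MTBF/(MTBF + MTTR) = 1/(1 + λ·MTTR).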

Exercises

šŸ“ Exercise 1: Stationary Distribution of Markov Chain
Consider a Markov chain with the following transition probability matrix: \[P = \begin{pmatrix} 0.8 & 0.2 & 0 \\ 0.1 & 0.7 & 0.2 \\ 0 & 0.3 & 0.7 \end{pmatrix}\]
  1. Calculate the stationary distribution π as an eigenvalue problem
  2. Simulate 100 steps from different initial distributions and verify convergence to the stationary distribution
  3. Calculate \(P^{50}\) and confirm that each row converges to the stationary distribution
šŸ“ Exercise 2: Random Walk Analysis
Simulate a one-dimensional random walk (p=0.6) for 1000 steps and:
  1. Visualize the distribution of final positions with a histogram
  2. Compare the theoretical expected value E[X_n] = n(2p-1) with the observed value
  3. Verify that the standardized final position follows a normal distribution by the central limit theorem
šŸ“ Exercise 3: Application of Poisson Process
Assume that inquiries to a call center follow a Poisson process (Ī»=5 [inquiries/hour]):
  1. Calculate the probability of receiving 10 or more inquiries in one hour
  2. Calculate the probability that the waiting time until the next inquiry is 15 minutes or more
  3. Simulate the distribution of inquiry counts over 8 hours of operation and compare with the Poisson distribution
