第1章:データ前処理基礎

特徴量エンジニアリングの基盤 - データの品質を高める

📖 読了時間: 20-25分 📊 難易度: 初級 💻 コード例: 12個 📝 演習問題: 5問

学習目標

この章を読むことで、以下を習得できます:


1.1 データ前処理の重要性

データ前処理とは

データ前処理(Data Preprocessing)は、生データを機械学習モデルに適した形式に変換するプロセスです。

「Garbage In, Garbage Out(GIGO)」- データの品質が、モデルの性能を決定します。

前処理が必要な理由

問題 影響 対処法
欠損値 学習エラー、偏った予測 補完、削除
外れ値 モデルの歪み、過学習 検出、変換、削除
スケール差 学習の不安定化 正規化、標準化
不要な特徴 次元の呪い、過学習 特徴選択、次元削減

データ前処理の全体像

graph TD A[生データ] --> B[欠損値処理] B --> C[外れ値処理] C --> D[スケーリング・正規化] D --> E[特徴量エンジニアリング] E --> F[特徴選択] F --> G[学習準備完了] style A fill:#ffebee style B fill:#fff3e0 style C fill:#f3e5f5 style D fill:#e3f2fd style E fill:#e8f5e9 style F fill:#fce4ec style G fill:#c8e6c9

実例:前処理の効果

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score

# サンプルデータ生成(異なるスケールの特徴量)
np.random.seed(42)
n_samples = 1000

# 特徴量1: 年齢(20-60)
X1 = np.random.uniform(20, 60, n_samples)

# 特徴量2: 年収(300-1000万円)
X2 = np.random.uniform(300, 1000, n_samples)

# ターゲット: 年齢と年収に基づく
y = ((X1 > 40) & (X2 > 600)).astype(int)

# データフレーム作成
X = pd.DataFrame({'age': X1, 'income': X2})

# 訓練・テストデータ分割
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42
)

# 前処理なしでの学習
model_raw = LogisticRegression(random_state=42, max_iter=1000)
model_raw.fit(X_train, y_train)
acc_raw = accuracy_score(y_test, model_raw.predict(X_test))

# 前処理ありでの学習
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

model_scaled = LogisticRegression(random_state=42, max_iter=1000)
model_scaled.fit(X_train_scaled, y_train)
acc_scaled = accuracy_score(y_test, model_scaled.predict(X_test_scaled))

print("=== 前処理の効果比較 ===")
print(f"前処理なし: 精度 = {acc_raw:.3f}")
print(f"前処理あり: 精度 = {acc_scaled:.3f}")
print(f"改善: {(acc_scaled - acc_raw) * 100:.1f}%")

出力

=== 前処理の効果比較 ===
前処理なし: 精度 = 0.890
前処理あり: 精度 = 0.920
改善: 3.0%

重要: スケーリングにより、収束速度と精度が向上します。


1.2 欠損値処理

欠損値のタイプ

欠損値は発生メカニズムにより3種類に分類されます:

タイプ 説明
MCAR
(Missing Completely At Random)
完全にランダムに欠損 機器の故障によるデータ損失
MAR
(Missing At Random)
他の変数に依存して欠損 高齢者ほど年収を答えない
MNAR
(Missing Not At Random)
欠損値自体に意味がある 低所得者が年収を記入しない

欠損値の可視化

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# 欠損値を含むサンプルデータ生成
np.random.seed(42)
n = 100

df = pd.DataFrame({
    'age': np.random.randint(20, 70, n),
    'income': np.random.randint(300, 1200, n),
    'score': np.random.uniform(0, 100, n)
})

# 意図的に欠損値を作成
# 年齢に10%の欠損
missing_age = np.random.choice(n, size=int(n * 0.1), replace=False)
df.loc[missing_age, 'age'] = np.nan

# 年収に20%の欠損(年齢依存)
missing_income = df[df['age'] > 50].sample(frac=0.4).index
df.loc[missing_income, 'income'] = np.nan

# スコアに15%の欠損
missing_score = np.random.choice(n, size=int(n * 0.15), replace=False)
df.loc[missing_score, 'score'] = np.nan

# 欠損値の確認
print("=== 欠損値の状況 ===")
print(df.isnull().sum())
print(f"\n欠損率:")
print((df.isnull().sum() / len(df) * 100).round(2))

# ヒートマップで可視化
plt.figure(figsize=(10, 6))
sns.heatmap(df.isnull(), cbar=True, yticklabels=False, cmap='viridis')
plt.title('欠損値パターンの可視化(黄色 = 欠損)', fontsize=14)
plt.xlabel('特徴量')
plt.tight_layout()
plt.show()

出力

=== 欠損値の状況 ===
age        10
income     16
score      15
dtype: int64

欠損率:
age        10.0
income     16.0
score      15.0
dtype: float64

削除法

行削除(Listwise Deletion)

# 欠損値を含む行を削除
df_droprows = df.dropna()

print(f"元のデータ: {len(df)}行")
print(f"削除後: {len(df_droprows)}行")
print(f"削除された行: {len(df) - len(df_droprows)}行 ({(1 - len(df_droprows)/len(df))*100:.1f}%)")

出力

元のデータ: 100行
削除後: 64行
削除された行: 36行 (36.0%)

注意: データ量が大幅に減少する可能性があります。

列削除

# 欠損率が30%以上の列を削除
threshold = 0.3
df_dropcols = df.loc[:, df.isnull().mean() < threshold]

print(f"元の特徴量: {df.shape[1]}個")
print(f"削除後: {df_dropcols.shape[1]}個")
print(f"\n削除された特徴量: {set(df.columns) - set(df_dropcols.columns)}")

補完法(Imputation)

単純補完

from sklearn.impute import SimpleImputer

# 平均値補完
imputer_mean = SimpleImputer(strategy='mean')
df_mean = pd.DataFrame(
    imputer_mean.fit_transform(df),
    columns=df.columns
)

# 中央値補完
imputer_median = SimpleImputer(strategy='median')
df_median = pd.DataFrame(
    imputer_median.fit_transform(df),
    columns=df.columns
)

# 最頻値補完
imputer_mode = SimpleImputer(strategy='most_frequent')
df_mode = pd.DataFrame(
    imputer_mode.fit_transform(df),
    columns=df.columns
)

# 定数補完
imputer_constant = SimpleImputer(strategy='constant', fill_value=0)
df_constant = pd.DataFrame(
    imputer_constant.fit_transform(df),
    columns=df.columns
)

print("=== 補完方法の比較 ===\n")
print(f"元データの年齢平均: {df['age'].mean():.2f}")
print(f"平均値補完後: {df_mean['age'].mean():.2f}")
print(f"中央値補完後: {df_median['age'].median():.2f}")
print(f"最頻値補完: {df_mode['age'].mode()[0]:.2f}")

K近傍法補完(KNN Imputer)

from sklearn.impute import KNNImputer

# KNN補完(k=5)
knn_imputer = KNNImputer(n_neighbors=5)
df_knn = pd.DataFrame(
    knn_imputer.fit_transform(df),
    columns=df.columns
)

print("\n=== KNN補完の詳細 ===")
print(f"欠損前の年齢平均: {df['age'].mean():.2f}")
print(f"KNN補完後の年齢平均: {df_knn['age'].mean():.2f}")

# 可視化:補完方法の比較
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
methods = [
    ('元データ', df),
    ('平均値補完', df_mean),
    ('中央値補完', df_median),
    ('最頻値補完', df_mode),
    ('定数補完', df_constant),
    ('KNN補完', df_knn)
]

for ax, (name, data) in zip(axes.flat, methods):
    ax.scatter(data['age'], data['income'], alpha=0.6)
    ax.set_xlabel('年齢')
    ax.set_ylabel('年収')
    ax.set_title(name)
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

多重代入法(Multiple Imputation)

from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer

# 多重代入法(MICE: Multivariate Imputation by Chained Equations)
mice_imputer = IterativeImputer(random_state=42, max_iter=10)
df_mice = pd.DataFrame(
    mice_imputer.fit_transform(df),
    columns=df.columns
)

print("=== 多重代入法(MICE)===")
print(f"補完前の欠損数: {df.isnull().sum().sum()}")
print(f"補完後の欠損数: {df_mice.isnull().sum().sum()}")
print(f"\n各特徴量の補完後の統計:")
print(df_mice.describe())

補完方法の選択ガイドライン

状況 推奨手法 理由
欠損率 < 5% 削除法 情報損失が少ない
数値、正規分布 平均値補完 分布を保持
数値、外れ値あり 中央値補完 ロバスト
カテゴリカル 最頻値補完 妥当な推定
特徴間に相関 KNN、MICE 関係性を利用
MNAR ドメイン知識活用 欠損自体が情報

1.3 外れ値処理

外れ値とは

外れ値(Outlier)は、他のデータと大きく異なる値で、測定エラーまたは真の異常値です。

外れ値検出方法

1. IQR法(四分位範囲)

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# サンプルデータ(外れ値を含む)
np.random.seed(42)
data_normal = np.random.normal(50, 10, 95)
outliers = np.array([100, 105, 110, 0, -5])  # 外れ値
data = np.concatenate([data_normal, outliers])

# IQR法による外れ値検出
Q1 = np.percentile(data, 25)
Q3 = np.percentile(data, 75)
IQR = Q3 - Q1

lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

outliers_iqr = (data < lower_bound) | (data > upper_bound)

print("=== IQR法による外れ値検出 ===")
print(f"Q1 (25%点): {Q1:.2f}")
print(f"Q3 (75%点): {Q3:.2f}")
print(f"IQR: {IQR:.2f}")
print(f"下限: {lower_bound:.2f}")
print(f"上限: {upper_bound:.2f}")
print(f"外れ値の数: {outliers_iqr.sum()}")
print(f"外れ値: {data[outliers_iqr]}")

# 可視化
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# ボックスプロット
axes[0].boxplot(data, vert=True)
axes[0].axhline(y=lower_bound, color='r', linestyle='--',
                label=f'下限: {lower_bound:.1f}')
axes[0].axhline(y=upper_bound, color='r', linestyle='--',
                label=f'上限: {upper_bound:.1f}')
axes[0].set_ylabel('値')
axes[0].set_title('ボックスプロット(IQR法)', fontsize=14)
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# ヒストグラム
axes[1].hist(data, bins=20, alpha=0.7, edgecolor='black')
axes[1].axvline(x=lower_bound, color='r', linestyle='--', linewidth=2)
axes[1].axvline(x=upper_bound, color='r', linestyle='--', linewidth=2)
axes[1].set_xlabel('値')
axes[1].set_ylabel('頻度')
axes[1].set_title('ヒストグラム', fontsize=14)
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

出力

=== IQR法による外れ値検出 ===
Q1 (25%点): 43.26
Q3 (75%点): 56.83
IQR: 13.57
下限: 22.90
上限: 77.19
外れ値の数: 5
外れ値: [100. 105. 110.   0.  -5.]

2. Z-scoreによる検出

from scipy import stats

# Z-scoreの計算
z_scores = np.abs(stats.zscore(data))
threshold = 3  # 通常、3を閾値とする

outliers_zscore = z_scores > threshold

print("\n=== Z-scoreによる外れ値検出 ===")
print(f"閾値: {threshold}")
print(f"外れ値の数: {outliers_zscore.sum()}")
print(f"外れ値のZ-score: {z_scores[outliers_zscore]}")
print(f"外れ値: {data[outliers_zscore]}")

# 可視化
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.scatter(range(len(data)), data, c=outliers_zscore,
            cmap='coolwarm', s=50, alpha=0.7, edgecolors='black')
plt.axhline(y=data.mean() + 3*data.std(), color='r',
            linestyle='--', label='+3σ')
plt.axhline(y=data.mean() - 3*data.std(), color='r',
            linestyle='--', label='-3σ')
plt.xlabel('インデックス')
plt.ylabel('値')
plt.title('データポイント(赤 = 外れ値)', fontsize=14)
plt.legend()
plt.grid(True, alpha=0.3)

plt.subplot(1, 2, 2)
plt.scatter(range(len(z_scores)), z_scores,
            c=outliers_zscore, cmap='coolwarm', s=50,
            alpha=0.7, edgecolors='black')
plt.axhline(y=threshold, color='r', linestyle='--',
            label=f'閾値: {threshold}')
plt.xlabel('インデックス')
plt.ylabel('|Z-score|')
plt.title('Z-scoreの分布', fontsize=14)
plt.legend()
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

3. Isolation Forestによる検出

from sklearn.ensemble import IsolationForest

# データを2次元に拡張
np.random.seed(42)
X = np.random.normal(50, 10, (95, 2))
X_outliers = np.array([[100, 100], [105, 105], [0, 0], [-5, -5], [110, 110]])
X_combined = np.vstack([X, X_outliers])

# Isolation Forest
iso_forest = IsolationForest(contamination=0.05, random_state=42)
outliers_iso = iso_forest.fit_predict(X_combined)
# -1: 外れ値, 1: 正常値

print("\n=== Isolation Forestによる外れ値検出 ===")
print(f"外れ値の数: {(outliers_iso == -1).sum()}")
print(f"正常値の数: {(outliers_iso == 1).sum()}")

# 可視化
plt.figure(figsize=(10, 8))
plt.scatter(X_combined[outliers_iso == 1, 0],
            X_combined[outliers_iso == 1, 1],
            c='blue', label='正常値', alpha=0.6, s=50, edgecolors='black')
plt.scatter(X_combined[outliers_iso == -1, 0],
            X_combined[outliers_iso == -1, 1],
            c='red', label='外れ値', alpha=0.8, s=100,
            edgecolors='black', marker='X')
plt.xlabel('特徴量 1')
plt.ylabel('特徴量 2')
plt.title('Isolation Forestによる外れ値検出', fontsize=14)
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

外れ値の扱い方

1. 削除

# 外れ値を削除
data_cleaned = data[~outliers_iqr]

print(f"元のデータ: {len(data)}個")
print(f"削除後: {len(data_cleaned)}個")
print(f"平均(削除前): {data.mean():.2f}")
print(f"平均(削除後): {data_cleaned.mean():.2f}")

2. 変換(対数変換)

# 対数変換(正の値のみ)
data_positive = data[data > 0]
data_log = np.log1p(data_positive)  # log(1 + x) で0を回避

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

axes[0].hist(data_positive, bins=20, alpha=0.7, edgecolor='black')
axes[0].set_xlabel('値')
axes[0].set_ylabel('頻度')
axes[0].set_title('元のデータ', fontsize=14)
axes[0].grid(True, alpha=0.3)

axes[1].hist(data_log, bins=20, alpha=0.7, edgecolor='black', color='orange')
axes[1].set_xlabel('log(値)')
axes[1].set_ylabel('頻度')
axes[1].set_title('対数変換後', fontsize=14)
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

3. キャップ処理(Winsorization)

from scipy.stats import mstats

# Winsorization: 外れ値を上下限にキャップ
data_winsorized = mstats.winsorize(data, limits=[0.05, 0.05])

print("\n=== Winsorization(上下5%をキャップ)===")
print(f"元のデータ範囲: [{data.min():.2f}, {data.max():.2f}]")
print(f"処理後の範囲: [{data_winsorized.min():.2f}, {data_winsorized.max():.2f}]")

# 可視化
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

axes[0].boxplot(data)
axes[0].set_ylabel('値')
axes[0].set_title('元のデータ', fontsize=14)
axes[0].grid(True, alpha=0.3)

axes[1].boxplot(data_winsorized)
axes[1].set_ylabel('値')
axes[1].set_title('Winsorization後', fontsize=14)
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

外れ値処理の選択ガイドライン

状況 推奨手法 理由
明らかなエラー 削除 データ品質向上
真の極端な値 保持またはキャップ 情報を維持
歪んだ分布 対数変換 正規分布に近づける
頑健性が必要 Winsorization 影響を抑える
多次元データ Isolation Forest 複雑なパターン検出

1.4 スケーリングと正規化

なぜスケーリングが必要か

特徴量のスケールが異なると、以下の問題が発生します:

1. StandardScaler(標準化)

標準化(Standardization)は、平均0、標準偏差1に変換します。

$$ z = \frac{x - \mu}{\sigma} $$

from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# サンプルデータ(異なるスケール)
np.random.seed(42)
data = pd.DataFrame({
    'age': np.random.randint(20, 70, 100),
    'income': np.random.randint(300, 1500, 100),
    'score': np.random.uniform(0, 100, 100)
})

print("=== 元のデータの統計 ===")
print(data.describe())

# StandardScalerの適用
scaler = StandardScaler()
data_scaled = pd.DataFrame(
    scaler.fit_transform(data),
    columns=data.columns
)

print("\n=== 標準化後のデータの統計 ===")
print(data_scaled.describe())

# 可視化
fig, axes = plt.subplots(2, 3, figsize=(15, 10))

for i, col in enumerate(data.columns):
    # 元のデータ
    axes[0, i].hist(data[col], bins=20, alpha=0.7, edgecolor='black')
    axes[0, i].set_xlabel(col)
    axes[0, i].set_ylabel('頻度')
    axes[0, i].set_title(f'{col} (元データ)', fontsize=12)
    axes[0, i].grid(True, alpha=0.3)

    # 標準化後
    axes[1, i].hist(data_scaled[col], bins=20, alpha=0.7,
                    edgecolor='black', color='orange')
    axes[1, i].set_xlabel(col)
    axes[1, i].set_ylabel('頻度')
    axes[1, i].set_title(f'{col} (標準化後)', fontsize=12)
    axes[1, i].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

2. MinMaxScaler(正規化)

正規化(Normalization)は、値を指定範囲(通常[0, 1])にスケーリングします。

$$ x_{\text{norm}} = \frac{x - x_{\min}}{x_{\max} - x_{\min}} $$

from sklearn.preprocessing import MinMaxScaler

# MinMaxScalerの適用
minmax_scaler = MinMaxScaler(feature_range=(0, 1))
data_minmax = pd.DataFrame(
    minmax_scaler.fit_transform(data),
    columns=data.columns
)

print("=== MinMaxScaler(正規化)後の統計 ===")
print(data_minmax.describe())

# 可視化
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

for i, col in enumerate(data.columns):
    axes[i].hist(data_minmax[col], bins=20, alpha=0.7,
                 edgecolor='black', color='green')
    axes[i].set_xlabel(col)
    axes[i].set_ylabel('頻度')
    axes[i].set_title(f'{col} (MinMax: [0,1])', fontsize=12)
    axes[i].set_xlim(-0.1, 1.1)
    axes[i].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

3. RobustScaler(ロバストスケーリング)

RobustScalerは、中央値とIQRを使い、外れ値に頑健です。

$$ x_{\text{robust}} = \frac{x - \text{median}}{\text{IQR}} $$

from sklearn.preprocessing import RobustScaler

# 外れ値を含むデータ
data_with_outliers = data.copy()
data_with_outliers.loc[0:5, 'income'] = [5000, 5500, 6000, 100, 50, 10000]

# RobustScalerの適用
robust_scaler = RobustScaler()
data_robust = pd.DataFrame(
    robust_scaler.fit_transform(data_with_outliers),
    columns=data.columns
)

# 比較: StandardScaler vs RobustScaler
standard_scaler = StandardScaler()
data_standard = pd.DataFrame(
    standard_scaler.fit_transform(data_with_outliers),
    columns=data.columns
)

print("=== 外れ値を含むデータでの比較 ===")
print("\nStandardScaler:")
print(data_standard['income'].describe())
print("\nRobustScaler:")
print(data_robust['income'].describe())

# 可視化
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

axes[0].boxplot(data_with_outliers['income'])
axes[0].set_ylabel('income')
axes[0].set_title('元データ(外れ値あり)', fontsize=12)
axes[0].grid(True, alpha=0.3)

axes[1].boxplot(data_standard['income'])
axes[1].set_ylabel('income (scaled)')
axes[1].set_title('StandardScaler', fontsize=12)
axes[1].grid(True, alpha=0.3)

axes[2].boxplot(data_robust['income'])
axes[2].set_ylabel('income (scaled)')
axes[2].set_title('RobustScaler(外れ値に頑健)', fontsize=12)
axes[2].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

スケーラーの使い分けガイドライン

スケーラー 使用場面 長所 短所
StandardScaler 正規分布、外れ値なし 多くのアルゴリズムで標準 外れ値に敏感
MinMaxScaler 範囲が重要、ニューラルネット 解釈しやすい[0,1] 外れ値の影響大
RobustScaler 外れ値あり 外れ値に頑健 範囲が不定

アルゴリズムごとの推奨

アルゴリズム スケーリング必要? 推奨スケーラー
線形回帰 推奨 StandardScaler
ロジスティック回帰 必須 StandardScaler
SVM 必須 StandardScaler
KNN 必須 StandardScaler, MinMaxScaler
ニューラルネットワーク 必須 MinMaxScaler, StandardScaler
決定木 不要 -
ランダムフォレスト 不要 -
XGBoost 不要 -

1.5 実践例:完全な前処理パイプライン

データ準備

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report

# 実世界を模したデータ生成
np.random.seed(42)
n = 1000

# 特徴量の生成
df = pd.DataFrame({
    'age': np.random.randint(18, 80, n),
    'income': np.random.normal(500, 200, n),
    'credit_score': np.random.uniform(300, 850, n),
    'loan_amount': np.random.uniform(1000, 50000, n),
    'employment_years': np.random.randint(0, 40, n)
})

# ターゲット変数(ローン承認)
df['approved'] = (
    (df['credit_score'] > 600) &
    (df['income'] > 400) &
    (df['age'] > 25)
).astype(int)

# 意図的にデータ品質の問題を追加
# 1. 欠損値
missing_idx = np.random.choice(n, size=100, replace=False)
df.loc[missing_idx[:50], 'income'] = np.nan
df.loc[missing_idx[50:], 'credit_score'] = np.nan

# 2. 外れ値
outlier_idx = np.random.choice(n, size=20, replace=False)
df.loc[outlier_idx, 'loan_amount'] = df.loc[outlier_idx, 'loan_amount'] * 10

print("=== データの概要 ===")
print(df.head(10))
print(f"\n形状: {df.shape}")
print(f"\n欠損値:")
print(df.isnull().sum())
print(f"\n基本統計:")
print(df.describe())

前処理パイプラインの構築

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, RobustScaler

# 特徴量とターゲットの分離
X = df.drop('approved', axis=1)
y = df['approved']

# 訓練・テストデータ分割
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# パイプラインの定義
# 数値特徴量を2つのグループに分ける
sensitive_features = ['loan_amount']  # 外れ値に敏感
regular_features = ['age', 'income', 'credit_score', 'employment_years']

# パイプライン構築
preprocessor = ColumnTransformer(
    transformers=[
        # 通常の特徴量: 欠損値補完 → 標準化
        ('regular', Pipeline([
            ('imputer', SimpleImputer(strategy='median')),
            ('scaler', StandardScaler())
        ]), regular_features),

        # 外れ値に敏感な特徴量: 欠損値補完 → ロバストスケーリング
        ('sensitive', Pipeline([
            ('imputer', SimpleImputer(strategy='median')),
            ('scaler', RobustScaler())
        ]), sensitive_features)
    ]
)

# 完全なパイプライン(前処理 + モデル)
full_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])

print("=== パイプライン構造 ===")
print(full_pipeline)

モデルの学習と評価

# パイプラインの実行
full_pipeline.fit(X_train, y_train)

# 予測
y_pred = full_pipeline.predict(X_test)

# 評価
accuracy = accuracy_score(y_test, y_pred)

print("\n=== モデルの性能 ===")
print(f"精度: {accuracy:.3f}")
print(f"\n詳細レポート:")
print(classification_report(y_test, y_pred,
                           target_names=['Rejected', 'Approved']))

# 前処理なしとの比較
from sklearn.ensemble import RandomForestClassifier

# 前処理なし(欠損値を単純に削除)
X_train_raw = X_train.dropna()
y_train_raw = y_train[X_train.dropna().index]
X_test_raw = X_test.fillna(X_test.median())

model_raw = RandomForestClassifier(n_estimators=100, random_state=42)
model_raw.fit(X_train_raw, y_train_raw)
y_pred_raw = model_raw.predict(X_test_raw)
accuracy_raw = accuracy_score(y_test, y_pred_raw)

print(f"\n=== パイプライン vs 前処理なし ===")
print(f"パイプラインあり: {accuracy:.3f}")
print(f"前処理なし: {accuracy_raw:.3f}")
print(f"改善: {(accuracy - accuracy_raw) * 100:.1f}%")
print(f"\n訓練データサイズ:")
print(f"  パイプライン: {len(X_train)}行")
print(f"  前処理なし: {len(X_train_raw)}行({len(X_train) - len(X_train_raw)}行削除)")

前処理の詳細分析

# 前処理されたデータの取得
X_train_processed = preprocessor.fit_transform(X_train)
X_test_processed = preprocessor.transform(X_test)

# 特徴量名の取得
feature_names = regular_features + sensitive_features

print("\n=== 前処理後のデータ ===")
print(f"形状: {X_train_processed.shape}")
print(f"\n前処理後の統計(訓練データ):")
df_processed = pd.DataFrame(X_train_processed, columns=feature_names)
print(df_processed.describe())

# 可視化:前処理の効果
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
features_to_plot = ['age', 'income', 'loan_amount']

for i, feature in enumerate(features_to_plot):
    # 前処理前
    axes[0, i].hist(X_train[feature].dropna(), bins=30,
                    alpha=0.7, edgecolor='black')
    axes[0, i].set_xlabel(feature)
    axes[0, i].set_ylabel('頻度')
    axes[0, i].set_title(f'{feature} (前処理前)', fontsize=12)
    axes[0, i].grid(True, alpha=0.3)

    # 前処理後
    feature_idx = feature_names.index(feature)
    axes[1, i].hist(X_train_processed[:, feature_idx], bins=30,
                    alpha=0.7, edgecolor='black', color='orange')
    axes[1, i].set_xlabel(feature)
    axes[1, i].set_ylabel('頻度')
    axes[1, i].set_title(f'{feature} (前処理後)', fontsize=12)
    axes[1, i].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

パイプラインの保存と再利用

import joblib

# パイプラインの保存
joblib.dump(full_pipeline, 'loan_approval_pipeline.pkl')
print("パイプラインを保存しました: loan_approval_pipeline.pkl")

# 読み込みと使用
loaded_pipeline = joblib.load('loan_approval_pipeline.pkl')

# 新しいデータでの予測
new_data = pd.DataFrame({
    'age': [35, 22, 50],
    'income': [700, 300, np.nan],  # 欠損値を含む
    'credit_score': [750, 550, 800],
    'loan_amount': [25000, 5000, 100000],  # 外れ値を含む
    'employment_years': [10, 1, 25]
})

predictions = loaded_pipeline.predict(new_data)
probabilities = loaded_pipeline.predict_proba(new_data)

print("\n=== 新しいデータでの予測 ===")
for i, (pred, prob) in enumerate(zip(predictions, probabilities)):
    print(f"\nサンプル {i+1}:")
    print(f"  予測: {'承認' if pred == 1 else '却下'}")
    print(f"  確率: 却下={prob[0]:.2%}, 承認={prob[1]:.2%}")

1.6 本章のまとめ

学んだこと

  1. データ前処理の重要性

    • データ品質がモデル性能を決定する
    • 前処理により精度とロバスト性が向上
  2. 欠損値処理

    • MCAR、MAR、MNARの3タイプ
    • 削除法、単純補完、KNN補完、多重代入法
    • 状況に応じた適切な手法の選択
  3. 外れ値処理

    • IQR法、Z-score、Isolation Forestによる検出
    • 削除、変換、キャップ処理での対処
    • ドメイン知識との組み合わせ
  4. スケーリングと正規化

    • StandardScaler: 平均0、標準偏差1
    • MinMaxScaler: 指定範囲にスケーリング
    • RobustScaler: 外れ値に頑健
    • アルゴリズムごとの使い分け
  5. パイプライン構築

    • 再現性のある前処理フロー
    • 訓練・テストでの一貫性
    • 本番環境への容易なデプロイ

前処理の原則

原則 説明
データ理解優先 可視化と統計で問題を把握してから処理
ドメイン知識活用 業務知識を前処理の判断に反映
データリーク防止 訓練データでfitし、テストデータでtransform
再現性確保 パイプラインで処理を標準化
段階的アプローチ 一度に多くを変更せず、効果を確認

次の章へ

第2章では、カテゴリカル変数のエンコーディングを学びます:


演習問題

問題1(難易度:easy)

欠損値の3つのタイプ(MCAR、MAR、MNAR)をそれぞれ説明し、具体例を挙げてください。

解答例

解答

  1. MCAR(Missing Completely At Random)

    • 説明: 欠損が完全にランダムで、他の変数と無関係
    • 例: センサーの故障により一部のデータが記録されない
  2. MAR(Missing At Random)

    • 説明: 欠損が観測されている他の変数に依存
    • 例: 高齢者ほど健康データの記入率が低い(年齢は観測済み)
  3. MNAR(Missing Not At Random)

    • 説明: 欠損が欠損値そのものに依存
    • 例: 低所得者が年収の記入を避ける(年収そのものが欠損の原因)

問題2(難易度:medium)

以下のデータに対して、IQR法を用いて外れ値を検出し、その数を報告してください。

data = np.array([12, 15, 14, 10, 8, 12, 15, 14, 100, 13, 12, 14, 15, -5, 11])
解答例
import numpy as np

data = np.array([12, 15, 14, 10, 8, 12, 15, 14, 100, 13, 12, 14, 15, -5, 11])

# IQR法
Q1 = np.percentile(data, 25)
Q3 = np.percentile(data, 75)
IQR = Q3 - Q1

lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

outliers = (data < lower_bound) | (data > upper_bound)

print("=== IQR法による外れ値検出 ===")
print(f"Q1: {Q1}")
print(f"Q3: {Q3}")
print(f"IQR: {IQR}")
print(f"下限: {lower_bound}")
print(f"上限: {upper_bound}")
print(f"\n外れ値の数: {outliers.sum()}")
print(f"外れ値: {data[outliers]}")
print(f"正常値: {data[~outliers]}")

出力

=== IQR法による外れ値検出 ===
Q1: 11.5
Q3: 14.5
IQR: 3.0
下限: 7.0
上限: 19.0

外れ値の数: 2
外れ値: [100  -5]
正常値: [12 15 14 10  8 12 15 14 13 12 14 15 11]

問題3(難易度:medium)

StandardScalerとMinMaxScalerの違いを説明し、それぞれをどのような場面で使うべきか述べてください。

解答例

解答

StandardScaler(標準化)

MinMaxScaler(正規化)

使い分け

場面 推奨
正規分布に近いデータ StandardScaler
範囲が重要(例: [0, 1]が必須) MinMaxScaler
外れ値が少ない どちらでも可
外れ値が多い RobustScaler(または標準化)
ニューラルネットワーク MinMaxScaler(活性化関数の範囲に合わせる)
線形モデル、SVM StandardScaler

問題4(難易度:hard)

以下のデータに対して、欠損値処理と外れ値処理を含む完全な前処理パイプラインを構築してください。

import numpy as np
import pandas as pd

np.random.seed(42)
data = pd.DataFrame({
    'feature1': np.random.normal(50, 10, 100),
    'feature2': np.random.normal(100, 20, 100),
    'feature3': np.random.uniform(0, 1, 100)
})

# 欠損値を追加
data.loc[0:10, 'feature1'] = np.nan
data.loc[20:25, 'feature2'] = np.nan

# 外れ値を追加
data.loc[50, 'feature1'] = 200
data.loc[60, 'feature2'] = 500
解答例
import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.impute import KNNImputer
from sklearn.compose import ColumnTransformer

np.random.seed(42)
data = pd.DataFrame({
    'feature1': np.random.normal(50, 10, 100),
    'feature2': np.random.normal(100, 20, 100),
    'feature3': np.random.uniform(0, 1, 100)
})

# 欠損値を追加
data.loc[0:10, 'feature1'] = np.nan
data.loc[20:25, 'feature2'] = np.nan

# 外れ値を追加
data.loc[50, 'feature1'] = 200
data.loc[60, 'feature2'] = 500

print("=== 前処理前のデータ ===")
print(data.describe())
print(f"\n欠損値:\n{data.isnull().sum()}")

# パイプラインの構築
# feature1, feature2: 欠損値あり、外れ値あり → KNN補完 + RobustScaler
# feature3: 問題なし → StandardScaler

preprocessor = ColumnTransformer(
    transformers=[
        ('features_with_issues', Pipeline([
            ('imputer', KNNImputer(n_neighbors=5)),
            ('scaler', RobustScaler())
        ]), ['feature1', 'feature2']),

        ('clean_features', Pipeline([
            ('scaler', StandardScaler())
        ]), ['feature3'])
    ]
)

# 前処理の実行
data_processed = preprocessor.fit_transform(data)

print("\n=== 前処理後のデータ ===")
df_processed = pd.DataFrame(
    data_processed,
    columns=['feature1', 'feature2', 'feature3']
)
print(df_processed.describe())
print(f"\n欠損値: {df_processed.isnull().sum().sum()}")

# 可視化
import matplotlib.pyplot as plt

fig, axes = plt.subplots(2, 3, figsize=(15, 10))

for i, col in enumerate(['feature1', 'feature2', 'feature3']):
    # 前処理前
    axes[0, i].boxplot(data[col].dropna())
    axes[0, i].set_ylabel(col)
    axes[0, i].set_title(f'{col} (前処理前)', fontsize=12)
    axes[0, i].grid(True, alpha=0.3)

    # 前処理後
    axes[1, i].boxplot(df_processed[col])
    axes[1, i].set_ylabel(col)
    axes[1, i].set_title(f'{col} (前処理後)', fontsize=12)
    axes[1, i].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("\n✓ パイプライン構築完了")
print("✓ 欠損値補完完了(KNN, k=5)")
print("✓ 外れ値に頑健なスケーリング完了(RobustScaler)")

問題5(難易度:hard)

訓練データとテストデータで別々にスケーリングを行うと、なぜデータリークが発生するのか説明してください。正しい方法も示してください。

解答例

解答

データリークが発生する理由

テストデータで別途スケーリングを行うと、テストデータの統計情報(平均、標準偏差など)を使用することになります。これは以下の問題を引き起こします:

  1. 未来の情報を使用: 本番環境では新しいデータの統計情報は事前に不明
  2. 評価の歪み: テストデータの情報を使うため、性能が過大評価される
  3. 再現性の欠如: 実際のデプロイ時に同じ変換ができない

誤った方法(データリークあり)

# ❌ 間違い
scaler_train = StandardScaler()
X_train_scaled = scaler_train.fit_transform(X_train)

scaler_test = StandardScaler()
X_test_scaled = scaler_test.fit_transform(X_test)  # テストデータでfit

正しい方法

# ✅ 正しい
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)  # 訓練データでfit
X_test_scaled = scaler.transform(X_test)  # 訓練データの統計で変換

実例で確認

import numpy as np
from sklearn.preprocessing import StandardScaler

# サンプルデータ
X_train = np.array([[1], [2], [3], [4], [5]])
X_test = np.array([[100], [200], [300]])

# 誤った方法
scaler_train = StandardScaler()
scaler_test = StandardScaler()
X_train_wrong = scaler_train.fit_transform(X_train)
X_test_wrong = scaler_test.fit_transform(X_test)

# 正しい方法
scaler = StandardScaler()
X_train_correct = scaler.fit_transform(X_train)
X_test_correct = scaler.transform(X_test)

print("=== 誤った方法(データリークあり)===")
print(f"訓練データの平均: {X_train_wrong.mean():.3f}")
print(f"テストデータの平均: {X_test_wrong.mean():.3f}")
print("→ 両方とも0に近い(独立にスケーリング)")

print("\n=== 正しい方法 ===")
print(f"訓練データの平均: {X_train_correct.mean():.3f}")
print(f"テストデータの平均: {X_test_correct.mean():.3f}")
print("→ テストデータは訓練データの統計で変換")
print(f"\nテストデータの値: {X_test_correct.flatten()}")
print("→ 訓練データの分布と比較して極端に大きい値(正しく検出)")

出力

=== 誤った方法(データリークあり)===
訓練データの平均: 0.000
テストデータの平均: 0.000
→ 両方とも0に近い(独立にスケーリング)

=== 正しい方法 ===
訓練データの平均: 0.000
テストデータの平均: 63.246
→ テストデータは訓練データの統計で変換

テストデータの値: [63.25 126.49 189.74]
→ 訓練データの分布と比較して極端に大きい値(正しく検出)

参考文献

  1. Géron, A. (2019). Hands-On Machine Learning with Scikit-Learn, Keras, and TensorFlow (2nd ed.). O'Reilly Media.
  2. Kuhn, M., & Johnson, K. (2019). Feature Engineering and Selection: A Practical Approach for Predictive Models. CRC Press.
  3. Zheng, A., & Casari, A. (2018). Feature Engineering for Machine Learning. O'Reilly Media.
  4. Little, R. J., & Rubin, D. B. (2019). Statistical Analysis with Missing Data (3rd ed.). Wiley.

免責事項