本章では、PyTorch GeometricとMaterials Project APIを用いた実践的なワークフローを学びます。カスタムデータセットの作成、分散学習、GPU最適化、そして本番環境へのデプロイメントまで、実際のプロジェクトで必要となる技術を包括的に習得します。
Materials Projectは材料科学における最大級のオープンデータベースであり、148,000以上の結晶構造と物性データを提供しています。pymatgenライブラリとmp-apiを用いて、これらのデータを効率的に取得・処理する方法を学びます。
Materials Project APIを使用するには、無料のAPIキーが必要です。Materials Projectウェブサイトでアカウントを作成し、APIキーを取得してください。
# コード例1: Materials Project API認証と基本的なデータ取得
# Google Colabで実行可能
# 必要なライブラリのインストール
!pip install mp-api pymatgen -q
from mp_api.client import MPRester
from pymatgen.core import Structure
import pandas as pd
# APIキーを設定(ご自身のAPIキーに置き換えてください)
API_KEY = "your_api_key_here"
# MPResterを初期化
with MPRester(API_KEY) as mpr:
# ペロブスカイト構造(ABX3)の材料を検索
# 形成エネルギーが負(安定)で、バンドギャップが1-3 eVの材料
docs = mpr.materials.summary.search(
formula="*3", # ABX3形式
num_elements=(3, 3), # 3元素系
energy_above_hull=(0, 0.01), # ほぼ安定相
band_gap=(1.0, 3.0), # 半導体領域
fields=["material_id", "formula_pretty", "band_gap",
"energy_above_hull", "formation_energy_per_atom"]
)
# 結果をDataFrameに変換
data = []
for doc in docs:
data.append({
"material_id": doc.material_id,
"formula": doc.formula_pretty,
"band_gap": doc.band_gap,
"e_hull": doc.energy_above_hull,
"formation_energy": doc.formation_energy_per_atom
})
df = pd.DataFrame(data)
print(f"検索結果: {len(df)}件の材料")
print(df.head())
# 統計情報
print("\n=== 統計情報 ===")
print(f"バンドギャップ範囲: {df['band_gap'].min():.3f} - {df['band_gap'].max():.3f} eV")
print(f"形成エネルギー範囲: {df['formation_energy'].min():.3f} - {df['formation_energy'].max():.3f} eV/atom")
Materials Projectから取得した結晶構造は、pymatgenのStructureオブジェクトとして扱われます。これをCIF(Crystallographic Information File)形式で保存し、可視化や機械学習モデルへの入力として活用できます。
# コード例2: 結晶構造の取得とCIF形式での保存
# Google Colabで実行可能
from mp_api.client import MPRester
from pymatgen.io.cif import CifWriter
import os
API_KEY = "your_api_key_here"
# 保存ディレクトリの作成
os.makedirs("structures", exist_ok=True)
with MPRester(API_KEY) as mpr:
# 例: mp-1234(仮のMaterial ID)の結晶構造を取得
# 実際のMaterial IDに置き換えてください
structure = mpr.get_structure_by_material_id("mp-1234")
# 構造情報の表示
print("=== 結晶構造情報 ===")
print(f"化学式: {structure.composition.reduced_formula}")
print(f"空間群: {structure.get_space_group_info()}")
print(f"格子定数: {structure.lattice.abc}")
print(f"格子角度: {structure.lattice.angles}")
print(f"原子数: {len(structure)}")
print(f"体積: {structure.volume:.3f} ų")
# 原子サイト情報
print("\n=== 原子サイト ===")
for i, site in enumerate(structure):
print(f"Site {i+1}: {site.species_string} at {site.frac_coords}")
# CIF形式で保存
cif_writer = CifWriter(structure)
cif_writer.write_file(f"structures/mp-1234.cif")
print("\nCIFファイルを保存しました: structures/mp-1234.cif")
# 複数の材料をバッチ取得
material_ids = ["mp-1234", "mp-5678", "mp-9012"] # 実際のIDに置き換え
with MPRester(API_KEY) as mpr:
for mat_id in material_ids:
try:
structure = mpr.get_structure_by_material_id(mat_id)
cif_writer = CifWriter(structure)
cif_writer.write_file(f"structures/{mat_id}.cif")
print(f"✓ {mat_id}: {structure.composition.reduced_formula}")
except Exception as e:
print(f"✗ {mat_id}: エラー - {e}")
time.sleep()でレート制限を遵守し、バッチ処理を検討してください。
Materials Projectから取得したデータをPyTorch GeometricのDataオブジェクトに変換し、訓練用データセットを作成します。InMemoryDatasetクラスを継承して、効率的なデータローディングを実現します。
# コード例3: Materials Project構造をPyTorch Geometric Dataに変換
# Google Colabで実行可能(GPU推奨)
import torch
from torch_geometric.data import Data, InMemoryDataset
from pymatgen.core import Structure
from mp_api.client import MPRester
import numpy as np
from typing import List, Tuple
class StructureToGraph:
"""
pymatgen Structureオブジェクトをグラフ表現に変換
"""
def __init__(self, cutoff: float = 5.0):
"""
Args:
cutoff: 原子間距離のカットオフ半径(Å)
"""
self.cutoff = cutoff
def convert(self, structure: Structure) -> Data:
"""
Structure → PyG Data変換
Args:
structure: pymatgen Structureオブジェクト
Returns:
PyG Dataオブジェクト
"""
# ノード特徴量: 原子番号のワンホット表現(最大原子番号92: U)
atom_numbers = [site.specie.Z for site in structure]
x = torch.zeros((len(atom_numbers), 92))
for i, z in enumerate(atom_numbers):
x[i, z-1] = 1.0 # インデックスは0始まり
# エッジ構築: カットオフ半径内の原子ペア
edge_index = []
edge_attr = []
for i, site_i in enumerate(structure):
# 周期境界条件を考慮した近傍探索
neighbors = structure.get_neighbors(site_i, self.cutoff)
for neighbor in neighbors:
j = neighbor.index
distance = neighbor.nn_distance
# エッジ追加(無向グラフなので双方向)
edge_index.append([i, j])
# エッジ特徴量: 距離のガウス展開
edge_feature = self._gaussian_expansion(distance)
edge_attr.append(edge_feature)
edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()
edge_attr = torch.tensor(edge_attr, dtype=torch.float)
# グラフデータ作成
data = Data(
x=x,
edge_index=edge_index,
edge_attr=edge_attr
)
return data
def _gaussian_expansion(self, distance: float, num_centers: int = 41) -> np.ndarray:
"""
距離をガウス基底関数で展開
Args:
distance: 原子間距離(Å)
num_centers: ガウス基底の数
Returns:
展開係数ベクトル
"""
centers = np.linspace(0, self.cutoff, num_centers)
width = 0.5 # ガウス幅
gamma = -0.5 / (width ** 2)
return np.exp(gamma * (distance - centers) ** 2)
# 使用例
API_KEY = "your_api_key_here"
converter = StructureToGraph(cutoff=5.0)
with MPRester(API_KEY) as mpr:
structure = mpr.get_structure_by_material_id("mp-1234")
data = converter.convert(structure)
print("=== グラフ表現 ===")
print(f"ノード数: {data.x.size(0)}")
print(f"ノード特徴量次元: {data.x.size(1)}")
print(f"エッジ数: {data.edge_index.size(1)}")
print(f"エッジ特徴量次元: {data.edge_attr.size(1)}")
InMemoryDatasetを継承して、Materials Projectデータの前処理とキャッシングを自動化します。これにより、再実行時のデータ取得時間を大幅に削減できます。
# コード例4: Materials Project用カスタムInMemoryDataset
# Google Colabで実行可能(GPU推奨)
import os
import torch
from torch_geometric.data import InMemoryDataset, Data
from mp_api.client import MPRester
import pickle
class MaterialsProjectDataset(InMemoryDataset):
"""
Materials Projectから材料物性予測用データセットを作成
"""
def __init__(self, root, api_key, material_ids=None,
property_name="band_gap", cutoff=5.0,
transform=None, pre_transform=None, pre_filter=None):
"""
Args:
root: データセット保存ディレクトリ
api_key: Materials Project APIキー
material_ids: 取得するMaterial IDのリスト(Noneの場合は検索)
property_name: 予測対象物性('band_gap', 'formation_energy_per_atom'等)
cutoff: グラフ構築カットオフ半径(Å)
"""
self.api_key = api_key
self.material_ids = material_ids
self.property_name = property_name
self.converter = StructureToGraph(cutoff=cutoff)
super().__init__(root, transform, pre_transform, pre_filter)
self.data, self.slices = torch.load(self.processed_paths[0])
@property
def raw_file_names(self):
return ['materials.pkl']
@property
def processed_file_names(self):
return ['data.pt']
def download(self):
"""
Materials Project APIからデータをダウンロード
"""
with MPRester(self.api_key) as mpr:
if self.material_ids is None:
# Material IDが指定されていない場合は検索
docs = mpr.materials.summary.search(
energy_above_hull=(0, 0.05), # 準安定相まで含む
num_elements=(1, 5), # 1-5元素系
fields=["material_id", self.property_name]
)
self.material_ids = [doc.material_id for doc in docs
if getattr(doc, self.property_name) is not None]
print(f"検索結果: {len(self.material_ids)}件の材料")
# 構造と物性データを取得
materials_data = []
for i, mat_id in enumerate(self.material_ids):
try:
structure = mpr.get_structure_by_material_id(mat_id)
doc = mpr.materials.summary.search(
material_ids=[mat_id],
fields=[self.property_name]
)[0]
property_value = getattr(doc, self.property_name)
materials_data.append({
'material_id': mat_id,
'structure': structure,
'property': property_value
})
if (i + 1) % 100 == 0:
print(f"ダウンロード進捗: {i+1}/{len(self.material_ids)}")
except Exception as e:
print(f"エラー ({mat_id}): {e}")
# 保存
os.makedirs(self.raw_dir, exist_ok=True)
with open(self.raw_paths[0], 'wb') as f:
pickle.dump(materials_data, f)
print(f"✓ ダウンロード完了: {len(materials_data)}件")
def process(self):
"""
Raw dataをPyG Data形式に変換
"""
# Raw dataの読み込み
with open(self.raw_paths[0], 'rb') as f:
materials_data = pickle.load(f)
# PyG Data形式に変換
data_list = []
for item in materials_data:
# グラフ変換
data = self.converter.convert(item['structure'])
# ラベル(物性値)を追加
data.y = torch.tensor([item['property']], dtype=torch.float)
data.material_id = item['material_id']
data_list.append(data)
# フィルタリング(オプション)
if self.pre_filter is not None:
data_list = [d for d in data_list if self.pre_filter(d)]
# 前処理(オプション)
if self.pre_transform is not None:
data_list = [self.pre_transform(d) for d in data_list]
# 保存
data, slices = self.collate(data_list)
torch.save((data, slices), self.processed_paths[0])
print(f"✓ 処理完了: {len(data_list)}件")
# 使用例
API_KEY = "your_api_key_here"
# データセット作成(初回は自動ダウンロード & 処理)
dataset = MaterialsProjectDataset(
root='./data/mp_band_gap',
api_key=API_KEY,
property_name='band_gap',
cutoff=5.0
)
print(f"データセットサイズ: {len(dataset)}")
print(f"サンプル: {dataset[0]}")
InMemoryDatasetは処理済みデータを自動保存します。2回目以降の実行では、保存されたデータを読み込むだけで高速に起動します。
大規模データセットや複雑なGNNモデルを効率的に訓練するため、PyTorchの分散学習機能とGPU最適化テクニックを活用します。
# コード例5: DataParallelによるマルチGPU並列学習
# Google Colab Pro/Pro+(複数GPU環境)で実行可能
import torch
import torch.nn as nn
from torch_geometric.nn import CGConv, global_mean_pool
from torch_geometric.loader import DataLoader
import time
class CGCNNModel(nn.Module):
"""
CGCNN(Crystal Graph Convolutional Neural Network)
"""
def __init__(self, atom_fea_len=92, nbr_fea_len=41,
hidden_dim=128, n_conv=3):
super(CGCNNModel, self).__init__()
# 原子埋め込み
self.atom_embedding = nn.Linear(atom_fea_len, hidden_dim)
# CGConv層
self.conv_layers = nn.ModuleList([
CGConv(hidden_dim, nbr_fea_len) for _ in range(n_conv)
])
self.bn_layers = nn.ModuleList([
nn.BatchNorm1d(hidden_dim) for _ in range(n_conv)
])
# 予測ヘッド
self.fc1 = nn.Linear(hidden_dim, 64)
self.fc2 = nn.Linear(64, 1)
self.activation = nn.Softplus()
def forward(self, data):
x, edge_index, edge_attr, batch = data.x, data.edge_index, data.edge_attr, data.batch
# 原子埋め込み
x = self.atom_embedding(x)
# CGConv層(残差接続付き)
for conv, bn in zip(self.conv_layers, self.bn_layers):
x_new = conv(x, edge_index, edge_attr)
x_new = bn(x_new)
x_new = self.activation(x_new)
x = x + x_new # 残差接続
# グラフレベルプーリング
x = global_mean_pool(x, batch)
# 予測
x = self.activation(self.fc1(x))
x = self.fc2(x)
return x.squeeze()
# マルチGPU学習
def train_multigpu(dataset, epochs=100, batch_size=64, lr=0.001):
"""
DataParallelによるマルチGPU並列学習
"""
# データローダー
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=4)
# モデル構築
model = CGCNNModel()
# GPUデバイス確認
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
gpu_count = torch.cuda.device_count()
print(f"使用可能GPU数: {gpu_count}")
if gpu_count > 1:
# マルチGPU並列化
model = nn.DataParallel(model)
print(f"DataParallelモード: {gpu_count}個のGPUを使用")
model = model.to(device)
# 最適化設定
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode='min', factor=0.5, patience=10, verbose=True
)
# 学習ループ
for epoch in range(epochs):
model.train()
total_loss = 0
start_time = time.time()
for batch in train_loader:
batch = batch.to(device)
optimizer.zero_grad()
output = model(batch)
loss = criterion(output, batch.y)
loss.backward()
optimizer.step()
total_loss += loss.item() * batch.num_graphs
avg_loss = total_loss / len(dataset)
epoch_time = time.time() - start_time
# 学習率調整
scheduler.step(avg_loss)
if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}, Time: {epoch_time:.2f}s")
return model
# 実行例
# dataset = MaterialsProjectDataset(...) で作成したデータセット
# model = train_multigpu(dataset, epochs=200, batch_size=64)
PyTorchのtorch.cuda.ampを用いて、FP16(半精度浮動小数点)とFP32(単精度)を混合して学習します。メモリ使用量を削減し、学習速度を最大2倍高速化できます。
# コード例6: Mixed Precision Training(混合精度学習)
# Google Colab(GPU環境)で実行可能
import torch
from torch.cuda.amp import autocast, GradScaler
from torch_geometric.loader import DataLoader
import time
def train_mixed_precision(model, dataset, epochs=100, batch_size=64, lr=0.001):
"""
Mixed Precision Trainingによる高速学習
"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
# データローダー
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=4)
# 最適化設定
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
# Gradient Scaler(勾配スケーリング)
scaler = GradScaler()
print("=== Mixed Precision Training開始 ===")
for epoch in range(epochs):
model.train()
total_loss = 0
start_time = time.time()
for batch in train_loader:
batch = batch.to(device)
optimizer.zero_grad()
# Mixed Precision: FP16で順伝播
with autocast():
output = model(batch)
loss = criterion(output, batch.y)
# スケーリング付き逆伝播
scaler.scale(loss).backward()
# 勾配クリッピング(オプション)
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
# パラメータ更新
scaler.step(optimizer)
scaler.update()
total_loss += loss.item() * batch.num_graphs
avg_loss = total_loss / len(dataset)
epoch_time = time.time() - start_time
if (epoch + 1) % 10 == 0:
# メモリ使用量表示
if torch.cuda.is_available():
memory_allocated = torch.cuda.memory_allocated() / 1024**3 # GB
memory_reserved = torch.cuda.memory_reserved() / 1024**3
print(f"Epoch {epoch+1}: Loss={avg_loss:.4f}, Time={epoch_time:.2f}s, "
f"Memory={memory_allocated:.2f}GB/{memory_reserved:.2f}GB")
else:
print(f"Epoch {epoch+1}: Loss={avg_loss:.4f}, Time={epoch_time:.2f}s")
return model
# 使用例
model = CGCNNModel()
# model = train_mixed_precision(model, dataset, epochs=200)
訓練済みモデルを保存し、後で推論に使用するためのベストプラクティスを学びます。
# コード例7: モデルチェックポイントの保存とロード
# Google Colabで実行可能
import torch
import os
from datetime import datetime
class ModelCheckpoint:
"""
モデルチェックポイント管理
"""
def __init__(self, save_dir='checkpoints', monitor='val_loss', mode='min'):
"""
Args:
save_dir: 保存ディレクトリ
monitor: 監視する指標('val_loss', 'val_mae'等)
mode: 'min'(最小化)または'max'(最大化)
"""
self.save_dir = save_dir
self.monitor = monitor
self.mode = mode
self.best_score = float('inf') if mode == 'min' else float('-inf')
os.makedirs(save_dir, exist_ok=True)
def save(self, model, optimizer, epoch, metrics, filename=None):
"""
チェックポイントを保存
Args:
model: PyTorchモデル
optimizer: オプティマイザー
epoch: 現在のエポック
metrics: メトリクス辞書(例: {'val_loss': 0.025, 'val_mae': 0.18})
filename: 保存ファイル名(Noneの場合は自動生成)
"""
if filename is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"checkpoint_epoch{epoch}_{timestamp}.pt"
filepath = os.path.join(self.save_dir, filename)
# チェックポイントデータ
checkpoint = {
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'metrics': metrics
}
torch.save(checkpoint, filepath)
print(f"✓ Checkpoint saved: {filepath}")
# ベストモデルの場合は別名でも保存
current_score = metrics.get(self.monitor)
if current_score is not None:
is_best = (self.mode == 'min' and current_score < self.best_score) or \
(self.mode == 'max' and current_score > self.best_score)
if is_best:
self.best_score = current_score
best_path = os.path.join(self.save_dir, 'best_model.pt')
torch.save(checkpoint, best_path)
print(f"✓ Best model updated: {self.monitor}={current_score:.4f}")
@staticmethod
def load(filepath, model, optimizer=None):
"""
チェックポイントをロード
Args:
filepath: チェックポイントファイルパス
model: ロード先モデル
optimizer: ロード先オプティマイザー(オプション)
Returns:
epoch, metrics
"""
checkpoint = torch.load(filepath)
model.load_state_dict(checkpoint['model_state_dict'])
if optimizer is not None and 'optimizer_state_dict' in checkpoint:
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
epoch = checkpoint.get('epoch', 0)
metrics = checkpoint.get('metrics', {})
print(f"✓ Checkpoint loaded: {filepath}")
print(f" Epoch: {epoch}, Metrics: {metrics}")
return epoch, metrics
# 使用例: 学習ループ内でのチェックポイント保存
checkpoint_manager = ModelCheckpoint(save_dir='./checkpoints', monitor='val_mae', mode='min')
for epoch in range(100):
# 学習処理
train_loss = 0.0 # 実際の学習で計算
# 検証処理
val_loss = 0.0 # 実際の検証で計算
val_mae = 0.0
# 10エポックごとにチェックポイント保存
if (epoch + 1) % 10 == 0:
metrics = {
'train_loss': train_loss,
'val_loss': val_loss,
'val_mae': val_mae
}
checkpoint_manager.save(model, optimizer, epoch + 1, metrics)
# ベストモデルのロード
model_new = CGCNNModel()
checkpoint_manager.load('./checkpoints/best_model.pt', model_new)
ONNX(Open Neural Network Exchange)形式にエクスポートすることで、推論速度を最大化し、異なるフレームワーク(TensorFlow、C++等)でモデルを使用できます。
# コード例8: ONNXエクスポートと推論
# Google Colabで実行可能
import torch
import torch.onnx
from torch_geometric.data import Batch
def export_to_onnx(model, sample_data, onnx_path='model.onnx'):
"""
PyTorch GeometricモデルをONNX形式にエクスポート
Args:
model: PyTorchモデル
sample_data: サンプル入力データ(Data型)
onnx_path: 保存パス
"""
model.eval()
# サンプルデータをバッチ形式に変換
batch = Batch.from_data_list([sample_data])
# ダミー入力作成(ONNXエクスポートに必要)
dummy_input = (
batch.x,
batch.edge_index,
batch.edge_attr,
batch.batch
)
# ONNXエクスポート
torch.onnx.export(
model,
dummy_input,
onnx_path,
export_params=True,
opset_version=14,
do_constant_folding=True,
input_names=['x', 'edge_index', 'edge_attr', 'batch'],
output_names=['output'],
dynamic_axes={
'x': {0: 'num_nodes'},
'edge_index': {1: 'num_edges'},
'edge_attr': {0: 'num_edges'},
'batch': {0: 'num_nodes'}
}
)
print(f"✓ ONNX export completed: {onnx_path}")
# ONNXモデルの検証
import onnx
onnx_model = onnx.load(onnx_path)
onnx.checker.check_model(onnx_model)
print("✓ ONNX model validation passed")
# ONNX Runtime推論(高速推論)
def inference_onnx(onnx_path, data):
"""
ONNX Runtimeを用いた高速推論
Args:
onnx_path: ONNXモデルパス
data: 入力Data
Returns:
予測値
"""
import onnxruntime as ort
import numpy as np
# ONNX Runtimeセッション作成
ort_session = ort.InferenceSession(onnx_path)
# バッチ化
batch = Batch.from_data_list([data])
# NumPy配列に変換
ort_inputs = {
'x': batch.x.numpy(),
'edge_index': batch.edge_index.numpy(),
'edge_attr': batch.edge_attr.numpy(),
'batch': batch.batch.numpy()
}
# 推論
ort_outputs = ort_session.run(None, ort_inputs)
prediction = ort_outputs[0]
return prediction[0]
# 使用例
# model = CGCNNModel() # 訓練済みモデル
# sample_data = dataset[0] # サンプルデータ
# export_to_onnx(model, sample_data, 'cgcnn_model.onnx')
# prediction = inference_onnx('cgcnn_model.onnx', sample_data)
# print(f"ONNX予測: {prediction:.4f}")
訓練済みモデルをREST APIとして公開し、Webアプリケーションや他のシステムから利用可能にします。FastAPIを用いた実装例を示します。
# コード例9: FastAPI REST APIデプロイメント
# ローカル環境またはサーバーで実行
# requirements.txt:
# fastapi
# uvicorn
# torch
# torch-geometric
# pymatgen
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import torch
from pymatgen.core import Structure
import json
app = FastAPI(title="Materials Property Prediction API")
# グローバル変数としてモデルをロード
MODEL = None
DEVICE = None
class CrystalInput(BaseModel):
"""
入力データスキーマ
"""
structure: dict # pymatgen Structure辞書表現
# または
cif_string: str = None # CIF文字列
class PredictionResponse(BaseModel):
"""
予測結果スキーマ
"""
prediction: float
uncertainty: float = None
material_id: str = None
@app.on_event("startup")
async def load_model():
"""
サーバー起動時にモデルをロード
"""
global MODEL, DEVICE
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# モデルロード
MODEL = CGCNNModel()
checkpoint = torch.load('checkpoints/best_model.pt', map_location=DEVICE)
MODEL.load_state_dict(checkpoint['model_state_dict'])
MODEL.to(DEVICE)
MODEL.eval()
print(f"✓ Model loaded on {DEVICE}")
@app.post("/predict", response_model=PredictionResponse)
async def predict_property(input_data: CrystalInput):
"""
材料物性予測エンドポイント
Args:
input_data: 結晶構造データ
Returns:
予測結果
"""
try:
# 構造データのパース
if input_data.cif_string:
structure = Structure.from_str(input_data.cif_string, fmt='cif')
else:
structure = Structure.from_dict(input_data.structure)
# グラフ変換
converter = StructureToGraph(cutoff=5.0)
data = converter.convert(structure)
data = data.to(DEVICE)
# 推論
with torch.no_grad():
prediction = MODEL(data).item()
return PredictionResponse(
prediction=prediction,
material_id=input_data.structure.get('material_id', 'unknown')
)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/health")
async def health_check():
"""
ヘルスチェックエンドポイント
"""
return {
"status": "healthy",
"model_loaded": MODEL is not None,
"device": str(DEVICE)
}
@app.get("/")
async def root():
"""
APIルート
"""
return {
"message": "Materials Property Prediction API",
"endpoints": {
"POST /predict": "Predict material property from structure",
"GET /health": "Health check",
"GET /docs": "API documentation (Swagger UI)"
}
}
# サーバー起動コマンド:
# uvicorn api:app --host 0.0.0.0 --port 8000 --reload
# クライアント使用例(Python):
"""
import requests
import json
# CIF文字列(例)
cif_string = '''
data_mp-1234
_cell_length_a 3.905
_cell_length_b 3.905
_cell_length_c 3.905
_cell_angle_alpha 90.0
_cell_angle_beta 90.0
_cell_angle_gamma 90.0
_symmetry_space_group_name_H-M 'P 1'
loop_
_atom_site_label
_atom_site_type_symbol
_atom_site_fract_x
_atom_site_fract_y
_atom_site_fract_z
Ti1 Ti 0.0 0.0 0.0
O1 O 0.5 0.5 0.0
O2 O 0.5 0.0 0.5
O3 O 0.0 0.5 0.5
'''
# API呼び出し
response = requests.post(
'http://localhost:8000/predict',
json={'cif_string': cif_string}
)
result = response.json()
print(f"予測バンドギャップ: {result['prediction']:.3f} eV")
"""
http://localhost:8000/docsでインタラクティブなAPI文書を確認できます。
本章では、PyTorch GeometricとMaterials Project APIを用いた実践的なワークフローを学びました。カスタムデータセットの作成から本番デプロイメントまで、実際のプロジェクトで必要となる技術を包括的に習得しました。
InMemoryDataset継承による効率的なデータローディング、キャッシング機能DataParallelによるマルチGPU学習、Mixed Precision Training(1.5-2倍高速化、40%メモリ削減)本シリーズで学んだ知識を活用して、以下のような実践プロジェクトに挑戦してみましょう:
Materials Project APIを用いて、形成エネルギーが-2.0 eV/atom以下の安定な酸化物材料(O含有)を100件取得し、以下の統計情報を表示するコードを作成してください。
from mp_api.client import MPRester
import pandas as pd
from collections import Counter
API_KEY = "your_api_key_here"
with MPRester(API_KEY) as mpr:
docs = mpr.materials.summary.search(
elements=["O"], # 酸素含有
formation_energy_per_atom=(None, -2.0), # -2.0 eV/atom以下
num_elements=(2, 5), # 2-5元素系
fields=["material_id", "formula_pretty", "formation_energy_per_atom",
"elements", "symmetry"]
)
# 統計情報計算
formation_energies = [doc.formation_energy_per_atom for doc in docs]
all_elements = []
space_groups = []
for doc in docs:
all_elements.extend([str(el) for el in doc.elements])
space_groups.append(doc.symmetry.symbol)
print(f"=== 統計情報({len(docs)}件) ===")
print(f"平均形成エネルギー: {sum(formation_energies)/len(formation_energies):.3f} eV/atom")
print(f"\n元素頻度(上位10):")
for elem, count in Counter(all_elements).most_common(10):
print(f" {elem}: {count}回")
print(f"\n空間群分布(上位5):")
for sg, count in Counter(space_groups).most_common(5):
print(f" {sg}: {count}件")
Materials Projectから任意の材料(material_id指定)の結晶構造を取得し、CIF形式で保存後、再度ロードして原子サイト情報を表示するコードを作成してください。
from mp_api.client import MPRester
from pymatgen.io.cif import CifWriter, CifParser
API_KEY = "your_api_key_here"
material_id = "mp-1234" # 実際のIDに置き換え
# 1. データ取得とCIF保存
with MPRester(API_KEY) as mpr:
structure = mpr.get_structure_by_material_id(material_id)
cif_writer = CifWriter(structure)
cif_writer.write_file(f"{material_id}.cif")
print(f"✓ CIF保存: {material_id}.cif")
# 2. CIFロード
parser = CifParser(f"{material_id}.cif")
structure_loaded = parser.get_structures()[0]
# 3. 原子サイト情報表示
print(f"\n=== 原子サイト情報 ===")
print(f"化学式: {structure_loaded.composition.reduced_formula}")
for i, site in enumerate(structure_loaded):
print(f"Site {i+1}: {site.species_string} at fractional coords {site.frac_coords}")
コード例4のMaterialsProjectDatasetを拡張し、以下の機能を追加してください:
__len__()と__getitem__()メソッドの明示的実装statistics()メソッドの追加class MultiPropertyDataset(InMemoryDataset):
def __init__(self, root, api_key, property_names=['band_gap', 'formation_energy_per_atom'],
cutoff=5.0, transform=None, pre_transform=None, pre_filter=None):
self.api_key = api_key
self.property_names = property_names
self.converter = StructureToGraph(cutoff=cutoff)
super().__init__(root, transform, pre_transform, pre_filter)
self.data, self.slices = torch.load(self.processed_paths[0])
@property
def raw_file_names(self):
return ['materials.pkl']
@property
def processed_file_names(self):
return ['data.pt']
def download(self):
with MPRester(self.api_key) as mpr:
docs = mpr.materials.summary.search(
energy_above_hull=(0, 0.05),
fields=["material_id"] + self.property_names
)
materials_data = []
for doc in docs:
try:
structure = mpr.get_structure_by_material_id(doc.material_id)
properties = {prop: getattr(doc, prop) for prop in self.property_names}
materials_data.append({
'material_id': doc.material_id,
'structure': structure,
'properties': properties
})
except:
pass
with open(self.raw_paths[0], 'wb') as f:
pickle.dump(materials_data, f)
def process(self):
with open(self.raw_paths[0], 'rb') as f:
materials_data = pickle.load(f)
data_list = []
for item in materials_data:
data = self.converter.convert(item['structure'])
# 複数物性をテンソル化
y = torch.tensor([item['properties'][prop] for prop in self.property_names],
dtype=torch.float)
data.y = y
data.material_id = item['material_id']
data_list.append(data)
data, slices = self.collate(data_list)
torch.save((data, slices), self.processed_paths[0])
def __len__(self):
return len(self.slices['x']) - 1
def __getitem__(self, idx):
data = self.get(idx)
return data
def statistics(self):
"""データセットの統計情報を返す"""
stats = {
'num_samples': len(self),
'properties': {}
}
for i, prop in enumerate(self.property_names):
values = [self[j].y[i].item() for j in range(len(self))]
stats['properties'][prop] = {
'mean': np.mean(values),
'std': np.std(values),
'min': np.min(values),
'max': np.max(values)
}
return stats
通常のFP32学習とMixed Precision Training(FP16)を比較し、以下を検証するコードを作成してください:
import torch
from torch.cuda.amp import autocast, GradScaler
import time
def compare_training_precision(model, dataset, epochs=50):
device = torch.device('cuda')
train_loader = DataLoader(dataset, batch_size=64, shuffle=True)
results = {}
# 1. FP32学習
print("=== FP32学習 ===")
model_fp32 = model.to(device)
optimizer = torch.optim.Adam(model_fp32.parameters(), lr=0.001)
criterion = torch.nn.MSELoss()
torch.cuda.reset_peak_memory_stats()
start_time = time.time()
for epoch in range(epochs):
for batch in train_loader:
batch = batch.to(device)
optimizer.zero_grad()
output = model_fp32(batch)
loss = criterion(output, batch.y)
loss.backward()
optimizer.step()
fp32_time = time.time() - start_time
fp32_memory = torch.cuda.max_memory_allocated() / 1024**3 # GB
results['fp32'] = {'time': fp32_time, 'memory': fp32_memory}
# 2. Mixed Precision学習
print("\n=== Mixed Precision学習 ===")
model_fp16 = model.to(device)
optimizer = torch.optim.Adam(model_fp16.parameters(), lr=0.001)
scaler = GradScaler()
torch.cuda.reset_peak_memory_stats()
start_time = time.time()
for epoch in range(epochs):
for batch in train_loader:
batch = batch.to(device)
optimizer.zero_grad()
with autocast():
output = model_fp16(batch)
loss = criterion(output, batch.y)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
fp16_time = time.time() - start_time
fp16_memory = torch.cuda.max_memory_allocated() / 1024**3
results['fp16'] = {'time': fp16_time, 'memory': fp16_memory}
# 結果表示
print("\n=== 比較結果 ===")
print(f"学習時間: FP32={fp32_time:.2f}s, FP16={fp16_time:.2f}s (高速化率: {fp32_time/fp16_time:.2f}x)")
print(f"GPU メモリ: FP32={fp32_memory:.2f}GB, FP16={fp16_memory:.2f}GB (削減率: {(1-fp16_memory/fp32_memory)*100:.1f}%)")
return results
学習を途中で中断し、保存されたチェックポイントから学習を再開するコードを作成してください。エポック番号、loss、optimizer状態を正しく復元してください。
import torch
def train_with_resume(model, dataset, total_epochs=100, checkpoint_path=None):
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = torch.nn.MSELoss()
train_loader = DataLoader(dataset, batch_size=64, shuffle=True)
start_epoch = 0
# チェックポイントから再開
if checkpoint_path and os.path.exists(checkpoint_path):
checkpoint = torch.load(checkpoint_path)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
start_epoch = checkpoint['epoch']
print(f"✓ チェックポイントから再開: Epoch {start_epoch}")
# 学習ループ
for epoch in range(start_epoch, total_epochs):
model.train()
total_loss = 0
for batch in train_loader:
batch = batch.to(device)
optimizer.zero_grad()
output = model(batch)
loss = criterion(output, batch.y)
loss.backward()
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(train_loader)
print(f"Epoch {epoch+1}/{total_epochs}, Loss: {avg_loss:.4f}")
# 10エポックごとにチェックポイント保存
if (epoch + 1) % 10 == 0:
checkpoint = {
'epoch': epoch + 1,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'loss': avg_loss
}
torch.save(checkpoint, f'checkpoint_epoch{epoch+1}.pt')
print(f"✓ Checkpoint saved")
return model
# 使用例
# model = train_with_resume(CGCNNModel(), dataset, total_epochs=100, checkpoint_path='checkpoint_epoch50.pt')
PyTorchネイティブ推論とONNX Runtime推論の速度を比較するベンチマークコードを作成してください。以下の条件で測定してください:
import torch
import onnxruntime as ort
import time
import numpy as np
from torch_geometric.data import DataLoader, Batch
def benchmark_inference(model, dataset, onnx_path, batch_sizes=[1, 32, 64, 128], n_iterations=100):
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
model.eval()
# ONNX Runtime セッション
ort_session = ort.InferenceSession(onnx_path)
results = []
for batch_size in batch_sizes:
print(f"\n=== Batch Size: {batch_size} ===")
loader = DataLoader(dataset, batch_size=batch_size, shuffle=False)
# PyTorch推論
torch_times = []
for _ in range(n_iterations):
batch = next(iter(loader))
batch = batch.to(device)
start = time.time()
with torch.no_grad():
_ = model(batch)
torch.cuda.synchronize() if torch.cuda.is_available() else None
torch_times.append((time.time() - start) * 1000) # ms
torch_avg = np.mean(torch_times)
torch_throughput = batch_size * 1000 / torch_avg
# ONNX Runtime推論
onnx_times = []
for _ in range(n_iterations):
batch = next(iter(loader))
ort_inputs = {
'x': batch.x.numpy(),
'edge_index': batch.edge_index.numpy(),
'edge_attr': batch.edge_attr.numpy(),
'batch': batch.batch.numpy()
}
start = time.time()
_ = ort_session.run(None, ort_inputs)
onnx_times.append((time.time() - start) * 1000)
onnx_avg = np.mean(onnx_times)
onnx_throughput = batch_size * 1000 / onnx_avg
# 結果保存
results.append({
'batch_size': batch_size,
'pytorch_ms': torch_avg,
'onnx_ms': onnx_avg,
'speedup': torch_avg / onnx_avg,
'pytorch_throughput': torch_throughput,
'onnx_throughput': onnx_throughput
})
print(f"PyTorch: {torch_avg:.2f} ms/batch ({torch_throughput:.1f} samples/sec)")
print(f"ONNX: {onnx_avg:.2f} ms/batch ({onnx_throughput:.1f} samples/sec)")
print(f"高速化率: {torch_avg/onnx_avg:.2f}x")
return results
# 使用例
# results = benchmark_inference(model, dataset, 'cgcnn_model.onnx')
FastAPIのバックグラウンドタスク機能を用いて、複数の予測リクエストをバッチ処理する非同期APIを実装してください。以下の要件を満たしてください:
/result/{job_id}エンドポイントで結果を取得from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
import asyncio
import uuid
from collections import defaultdict
import torch
app = FastAPI()
# グローバル状態
pending_requests = []
results_store = {}
MODEL = None
class PredictionRequest(BaseModel):
structure: dict
class JobResponse(BaseModel):
job_id: str
status: str
class ResultResponse(BaseModel):
job_id: str
prediction: float = None
status: str
async def batch_processor():
"""バックグラウンドでバッチ処理を実行"""
while True:
await asyncio.sleep(1.0) # 1秒ごとにバッチ処理
if len(pending_requests) == 0:
continue
# バッファされたリクエストを取得
batch_requests = pending_requests.copy()
pending_requests.clear()
# バッチ推論
job_ids = [req['job_id'] for req in batch_requests]
structures = [req['structure'] for req in batch_requests]
# グラフ変換(並列処理)
data_list = []
for structure in structures:
converter = StructureToGraph(cutoff=5.0)
data = converter.convert(Structure.from_dict(structure))
data_list.append(data)
# バッチ化
from torch_geometric.data import Batch
batch = Batch.from_data_list(data_list)
batch = batch.to('cuda' if torch.cuda.is_available() else 'cpu')
# 推論
with torch.no_grad():
predictions = MODEL(batch).cpu().numpy()
# 結果保存
for job_id, pred in zip(job_ids, predictions):
results_store[job_id] = {
'status': 'completed',
'prediction': float(pred)
}
@app.on_event("startup")
async def startup_event():
global MODEL
MODEL = CGCNNModel()
checkpoint = torch.load('checkpoints/best_model.pt')
MODEL.load_state_dict(checkpoint['model_state_dict'])
MODEL.eval()
# バックグラウンドタスク開始
asyncio.create_task(batch_processor())
@app.post("/predict/async", response_model=JobResponse)
async def predict_async(request: PredictionRequest):
"""非同期予測リクエスト"""
job_id = str(uuid.uuid4())
# リクエストをバッファに追加
pending_requests.append({
'job_id': job_id,
'structure': request.structure
})
# 結果ストアに初期状態を保存
results_store[job_id] = {'status': 'pending'}
return JobResponse(job_id=job_id, status='pending')
@app.get("/result/{job_id}", response_model=ResultResponse)
async def get_result(job_id: str):
"""結果取得"""
if job_id not in results_store:
return ResultResponse(job_id=job_id, status='not_found')
result = results_store[job_id]
return ResultResponse(
job_id=job_id,
prediction=result.get('prediction'),
status=result['status']
)
Monte Carlo Dropout(MCドロップアウト)を用いて、予測値の不確実性を推定するAPIを実装してください。以下を含めてください:
import torch
import torch.nn as nn
from torch_geometric.nn import CGConv, global_mean_pool
import numpy as np
class CGCNNWithDropout(nn.Module):
"""ドロップアウト層を持つCGCNN"""
def __init__(self, atom_fea_len=92, nbr_fea_len=41,
hidden_dim=128, n_conv=3, dropout=0.1):
super().__init__()
self.atom_embedding = nn.Linear(atom_fea_len, hidden_dim)
self.conv_layers = nn.ModuleList([
CGConv(hidden_dim, nbr_fea_len) for _ in range(n_conv)
])
self.bn_layers = nn.ModuleList([
nn.BatchNorm1d(hidden_dim) for _ in range(n_conv)
])
self.dropout = nn.Dropout(p=dropout)
self.fc1 = nn.Linear(hidden_dim, 64)
self.fc2 = nn.Linear(64, 1)
self.activation = nn.Softplus()
def forward(self, data):
x, edge_index, edge_attr, batch = data.x, data.edge_index, data.edge_attr, data.batch
x = self.atom_embedding(x)
for conv, bn in zip(self.conv_layers, self.bn_layers):
x_new = conv(x, edge_index, edge_attr)
x_new = bn(x_new)
x_new = self.activation(x_new)
x_new = self.dropout(x_new) # ドロップアウト
x = x + x_new
x = global_mean_pool(x, batch)
x = self.dropout(self.activation(self.fc1(x))) # ドロップアウト
x = self.fc2(x)
return x.squeeze()
def predict_with_uncertainty(self, data, n_samples=30):
"""
MC Dropoutによる不確実性推定
Returns:
mean, std (予測値の平均と標準偏差)
"""
self.train() # ドロップアウトを有効化
predictions = []
with torch.no_grad():
for _ in range(n_samples):
pred = self.forward(data).item()
predictions.append(pred)
mean = np.mean(predictions)
std = np.std(predictions)
return mean, std
# FastAPI統合
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
MODEL = None
class UncertaintyResponse(BaseModel):
prediction: float
uncertainty: float
confidence_interval_95: tuple
@app.post("/predict/uncertainty", response_model=UncertaintyResponse)
async def predict_with_uncertainty(request: CrystalInput):
"""不確実性推定付き予測"""
# 構造データのパース
structure = Structure.from_dict(request.structure)
# グラフ変換
converter = StructureToGraph(cutoff=5.0)
data = converter.convert(structure)
data = data.to('cuda' if torch.cuda.is_available() else 'cpu')
# MC Dropout推論
mean, std = MODEL.predict_with_uncertainty(data, n_samples=30)
# 95%信頼区間
ci_lower = mean - 1.96 * std
ci_upper = mean + 1.96 * std
return UncertaintyResponse(
prediction=mean,
uncertainty=std,
confidence_interval_95=(ci_lower, ci_upper)
)