This chapter focuses on practical implementation and integration of knowledge graphs. You will learn to build SPARQL endpoint APIs with Flask/FastAPI, visualize knowledge graphs using NetworkX and Plotly, auto-generate process documents, and implement Root Cause Analysis (RCA) applications.
Learning Objectives
By reading this chapter, you will be able to:
- ✅ Build SPARQL endpoint APIs with Flask/FastAPI
- ✅ Visualize knowledge graphs using NetworkX and Plotly
- ✅ Auto-generate process documents with template engines
- ✅ Implement Root Cause Analysis (RCA) applications
- ✅ Build knowledge-based equipment recommendation systems
- ✅ Apply knowledge graphs to optimization problems
- ✅ Develop end-to-end systems integrating API + reasoning + visualization
5.1 SPARQL Endpoint API
Code Example 1: Flask-based SPARQL Query API
# Requirements:
# - Python 3.9+
# - flask>=2.3.0
# - rdflib>=6.3.0
"""
Example: Code Example 1: Flask-based SPARQL Query API
Purpose: Demonstrate core concepts and implementation patterns
Target: Intermediate
Execution time: 10-30 seconds
Dependencies: flask, rdflib
"""
from flask import Flask, request, jsonify
from rdflib import Graph, Namespace, RDF, RDFS, Literal
from rdflib.plugins.sparql import prepareQuery
import json
# Flask SPARQL endpoint API implementation
app = Flask(__name__)
# Global knowledge graph
PROC = Namespace("http://example.org/process#")
g = Graph()
g.bind("proc", PROC)
# Initialize sample data
def init_knowledge_graph():
"""Initialize the knowledge graph"""
# Equipment data
equipment_data = [
{"id": "P101", "type": "Pump", "flow": 50.0, "pressure": 500.0, "status": "Running"},
{"id": "P102", "type": "Pump", "flow": 45.0, "pressure": 480.0, "status": "Running"},
{"id": "R201", "type": "Reactor", "temp": 180.0, "pressure": 800.0, "status": "Running"},
{"id": "T301", "type": "Tank", "level": 75.0, "temp": 25.0, "status": "Running"},
]
for eq in equipment_data:
eq_uri = PROC[eq['id']]
g.add((eq_uri, RDF.type, PROC[eq['type']]))
g.add((eq_uri, PROC.hasStatus, Literal(eq['status'])))
if 'flow' in eq:
g.add((eq_uri, PROC.hasFlowRate, Literal(eq['flow'])))
if 'pressure' in eq:
g.add((eq_uri, PROC.hasPressure, Literal(eq['pressure'])))
if 'temp' in eq:
g.add((eq_uri, PROC.hasTemperature, Literal(eq['temp'])))
if 'level' in eq:
g.add((eq_uri, PROC.hasLevel, Literal(eq['level'])))
print(f"Knowledge graph initialization complete: {len(g)} triples")
# API endpoints
@app.route('/api/sparql', methods=['POST'])
def sparql_endpoint():
"""
SPARQL query endpoint
POST /api/sparql
Body: {"query": "SELECT ..."}
Returns:
JSON: Query results
"""
try:
data = request.get_json()
query_str = data.get('query', '')
if not query_str:
return jsonify({"error": "Query is empty"}), 400
# Execute SPARQL query
query = prepareQuery(query_str, initNs={"proc": PROC, "rdf": RDF, "rdfs": RDFS})
results = g.query(query)
# Convert results to JSON format
response = []
for row in results:
row_dict = {}
for var in results.vars:
value = row[var]
# Shorten URIs
if hasattr(value, 'toPython'):
row_dict[str(var)] = str(value).split('#')[-1]
else:
row_dict[str(var)] = str(value)
response.append(row_dict)
return jsonify({
"status": "success",
"count": len(response),
"results": response
}), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/equipment', methods=['GET'])
def get_all_equipment():
"""
Get all equipment list
GET /api/equipment
Returns:
JSON: Equipment list
"""
query = prepareQuery("""
SELECT ?equipment ?type ?status
WHERE {
?equipment a ?type .
?equipment proc:hasStatus ?status .
FILTER (strstarts(str(?type), str(proc:)))
}
""", initNs={"proc": PROC})
results = g.query(query)
equipment_list = []
for row in results:
equipment_list.append({
"id": str(row.equipment).split('#')[-1],
"type": str(row.type).split('#')[-1],
"status": str(row.status)
})
return jsonify({
"status": "success",
"count": len(equipment_list),
"equipment": equipment_list
}), 200
@app.route('/api/equipment/<eq_id>', methods=['GET'])
def get_equipment_details(eq_id):
"""
Get specific equipment details
GET /api/equipment/{eq_id}
Returns:
JSON: Equipment details
"""
eq_uri = PROC[eq_id]
# Get all properties of the equipment
query = prepareQuery("""
SELECT ?property ?value
WHERE {
?equipment ?property ?value .
}
""", initNs={"proc": PROC})
    # Materialize results once (query result iterators are single-use)
    rows = list(g.query(query, initBindings={'equipment': eq_uri}))
    if not rows:
        return jsonify({"error": f"Equipment {eq_id} not found"}), 404
    details = {"id": eq_id}
    for row in rows:
prop = str(row.property).split('#')[-1]
value = row.value.toPython() if hasattr(row.value, 'toPython') else str(row.value)
details[prop] = value
return jsonify({
"status": "success",
"equipment": details
}), 200
@app.route('/api/statistics', methods=['GET'])
def get_statistics():
"""
Get statistics information
GET /api/statistics
Returns:
JSON: Knowledge graph statistics
"""
stats = {
"total_triples": len(g),
"total_equipment": len(list(g.subjects(RDF.type, None))),
"equipment_by_type": {}
}
# Count by equipment type
query = prepareQuery("""
SELECT ?type (COUNT(?equipment) as ?count)
WHERE {
?equipment a ?type .
FILTER (strstarts(str(?type), str(proc:)))
}
GROUP BY ?type
""", initNs={"proc": PROC})
for row in g.query(query):
type_name = str(row.type).split('#')[-1]
stats['equipment_by_type'][type_name] = int(row['count'])
return jsonify({
"status": "success",
"statistics": stats
}), 200
# Demo execution (normally started with if __name__ == '__main__')
def demo_api():
"""API demonstration"""
print("="*60)
print("SPARQL Endpoint API")
print("="*60)
init_knowledge_graph()
print("\n【API Endpoints】")
print(" POST /api/sparql - Execute SPARQL query")
print(" GET /api/equipment - All equipment list")
print(" GET /api/equipment/{id} - Equipment details")
print(" GET /api/statistics - Statistics information")
print("\n【Sample Query】")
sample_query = """
SELECT ?equipment ?pressure
WHERE {
?equipment proc:hasPressure ?pressure .
FILTER (?pressure > 600)
}
"""
print(f" {sample_query.strip()}")
# Execute query (directly without API)
query = prepareQuery(sample_query, initNs={"proc": PROC})
results = g.query(query)
print("\n【Query Results】")
for row in results:
eq_id = str(row.equipment).split('#')[-1]
print(f" {eq_id}: Pressure {row.pressure} kPa")
print("\n【Flask Launch Command】")
print(" python app.py")
print(" or: flask run --port 5000")
# Run demo
demo_api()
# Flask application startup (for actual use)
# if __name__ == '__main__':
# init_knowledge_graph()
# app.run(debug=True, port=5000)
Output Example:
============================================================
SPARQL Endpoint API
============================================================
Knowledge graph initialization complete: 16 triples
【API Endpoints】
POST /api/sparql - Execute SPARQL query
GET /api/equipment - All equipment list
GET /api/equipment/{id} - Equipment details
GET /api/statistics - Statistics information
【Sample Query】
SELECT ?equipment ?pressure
WHERE {
?equipment proc:hasPressure ?pressure .
FILTER (?pressure > 600)
}
【Query Results】
R201: Pressure 800.0 kPa
【Flask Launch Command】
python app.py
or: flask run --port 5000
Explanation: Build a SPARQL endpoint API with Flask to access the knowledge graph via RESTful API. Execute any SPARQL query with POST /api/sparql, and retrieve equipment information and statistics with other endpoints.
5.2 Knowledge Graph Visualization
Code Example 2: Graph Visualization with NetworkX and Plotly
# Requirements:
# - Python 3.9+
# - networkx>=3.1.0
# - numpy>=1.24.0, <2.0.0
# - plotly>=5.14.0
# - rdflib>=6.3.0
import networkx as nx
import plotly.graph_objects as go
from rdflib import Graph, Namespace, RDF, RDFS, Literal
import numpy as np
# Knowledge graph visualization
PROC = Namespace("http://example.org/process#")
def create_process_graph():
"""Create process knowledge graph"""
g = Graph()
g.bind("proc", PROC)
# Process flow definition
# Feed → P-101 → R-201 → T-301 → Product
g.add((PROC.Feed, RDF.type, PROC.Stream))
g.add((PROC.P101, RDF.type, PROC.Pump))
g.add((PROC.R201, RDF.type, PROC.Reactor))
g.add((PROC.T301, RDF.type, PROC.Tank))
g.add((PROC.Product, RDF.type, PROC.Stream))
# Connection relationships
g.add((PROC.Feed, PROC.flowsTo, PROC.P101))
g.add((PROC.P101, PROC.flowsTo, PROC.R201))
g.add((PROC.R201, PROC.flowsTo, PROC.T301))
g.add((PROC.T301, PROC.flowsTo, PROC.Product))
# Properties
g.add((PROC.P101, PROC.hasFlowRate, Literal(50.0)))
g.add((PROC.P101, PROC.hasPressure, Literal(500.0)))
g.add((PROC.R201, PROC.hasTemperature, Literal(180.0)))
g.add((PROC.T301, PROC.hasLevel, Literal(75.0)))
return g
def rdf_to_networkx(rdf_graph):
"""
Convert RDF graph to NetworkX graph
Parameters:
rdf_graph (Graph): RDF graph
Returns:
nx.DiGraph: NetworkX directed graph
"""
nx_graph = nx.DiGraph()
# Add nodes and edges
for s, p, o in rdf_graph:
subj = str(s).split('#')[-1]
pred = str(p).split('#')[-1]
obj = str(o).split('#')[-1] if '#' in str(o) else str(o)
# Get node type
if p == RDF.type:
nx_graph.add_node(subj, node_type=obj)
elif p == PROC.flowsTo:
nx_graph.add_edge(subj, obj, relation='flowsTo')
else:
# Add property as node attribute
if subj in nx_graph:
nx_graph.nodes[subj][pred] = obj
else:
nx_graph.add_node(subj, **{pred: obj})
return nx_graph
def visualize_with_plotly(nx_graph):
"""
Visualize NetworkX graph with Plotly
Parameters:
nx_graph (nx.DiGraph): NetworkX graph
"""
    # Calculate layout (force-directed spring layout; fixed seed for reproducibility)
    pos = nx.spring_layout(nx_graph, seed=42, k=1.5, iterations=50)
# Edge trace
edge_trace = go.Scatter(
x=[], y=[],
line=dict(width=2, color='#888'),
hoverinfo='none',
mode='lines',
showlegend=False
)
# Annotations for edge arrows
annotations = []
for edge in nx_graph.edges():
x0, y0 = pos[edge[0]]
x1, y1 = pos[edge[1]]
edge_trace['x'] += tuple([x0, x1, None])
edge_trace['y'] += tuple([y0, y1, None])
# Arrow annotation
annotations.append(
dict(
ax=x0, ay=y0,
x=x1, y=y1,
xref='x', yref='y',
axref='x', ayref='y',
showarrow=True,
arrowhead=2,
arrowsize=1.5,
arrowwidth=2,
arrowcolor='#888'
)
)
# Node trace
node_trace = go.Scatter(
x=[], y=[],
mode='markers+text',
hoverinfo='text',
text=[],
textposition='top center',
marker=dict(
showscale=True,
colorscale='Greens',
size=[],
color=[],
colorbar=dict(
thickness=15,
title='Node Type',
xanchor='left',
titleside='right'
),
line=dict(width=2, color='white')
),
showlegend=False
)
# Add node information
node_colors = {'Stream': 0, 'Pump': 1, 'Reactor': 2, 'Tank': 3}
for node in nx_graph.nodes():
x, y = pos[node]
node_trace['x'] += tuple([x])
node_trace['y'] += tuple([y])
node_trace['text'] += tuple([node])
# Color based on node type
node_type = nx_graph.nodes[node].get('node_type', 'Unknown')
color_idx = node_colors.get(node_type, 0)
node_trace['marker']['color'] += tuple([color_idx])
node_trace['marker']['size'] += tuple([30])
        # Hover information (Plotly stores list attributes as tuples, so
        # extend by tuple concatenation rather than list append)
        hover_text = f"<b>{node}</b><br>"
        hover_text += f"Type: {node_type}<br>"
        for key, value in nx_graph.nodes[node].items():
            if key != 'node_type':
                hover_text += f"{key}: {value}<br>"
        node_trace['hovertext'] = (node_trace['hovertext'] or ()) + (hover_text,)
# Layout settings
layout = go.Layout(
title=dict(
text='Process Knowledge Graph Visualization',
x=0.5,
xanchor='center',
font=dict(size=20, color='#2c3e50')
),
showlegend=False,
hovermode='closest',
margin=dict(b=20, l=5, r=5, t=60),
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
plot_bgcolor='white',
width=900,
height=600
)
# Create graph
fig = go.Figure(data=[edge_trace, node_trace], layout=layout)
fig.show()
print("\n【Visualization Complete】")
print(f" Nodes: {nx_graph.number_of_nodes()}")
print(f" Edges: {nx_graph.number_of_edges()}")
# Run demo
print("="*60)
print("Knowledge Graph Visualization (NetworkX + Plotly)")
print("="*60)
rdf_g = create_process_graph()
print(f"\nRDF graph created: {len(rdf_g)} triples")
nx_g = rdf_to_networkx(rdf_g)
print(f"Converted to NetworkX graph: {nx_g.number_of_nodes()} nodes, {nx_g.number_of_edges()} edges")
print("\n【Node List】")
for node, data in nx_g.nodes(data=True):
print(f" {node}: {data}")
print("\n【Edge List】")
for u, v, data in nx_g.edges(data=True):
print(f" {u} → {v}: {data}")
# Visualize with Plotly
visualize_with_plotly(nx_g)
Output Example:
============================================================
Knowledge Graph Visualization (NetworkX + Plotly)
============================================================
RDF graph created: 13 triples
Converted to NetworkX graph: 5 nodes, 4 edges
【Node List】
Feed: {'node_type': 'Stream'}
P101: {'node_type': 'Pump', 'hasFlowRate': '50.0', 'hasPressure': '500.0'}
R201: {'node_type': 'Reactor', 'hasTemperature': '180.0'}
T301: {'node_type': 'Tank', 'hasLevel': '75.0'}
Product: {'node_type': 'Stream'}
【Edge List】
Feed → P101: {'relation': 'flowsTo'}
P101 → R201: {'relation': 'flowsTo'}
R201 → T301: {'relation': 'flowsTo'}
T301 → Product: {'relation': 'flowsTo'}
【Visualization Complete】
Nodes: 5
Edges: 4
Explanation: Convert RDF graphs to NetworkX graphs and visualize interactively with Plotly. Nodes represent process equipment, edges represent material flows, and hovering displays detailed information.
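Beyond drawing, the NetworkX conversion makes standard graph algorithms available for free. A small sketch (the flowsTo edges rebuilt by hand so it stands alone) traces the material path and finds equipment downstream of the reactor:

```python
# Sketch: graph algorithms on the converted process graph. The flowsTo
# edges from the example are rebuilt directly as a DiGraph so the
# snippet is self-contained.
import networkx as nx

flow = nx.DiGraph()
flow.add_edges_from([
    ("Feed", "P101"), ("P101", "R201"),
    ("R201", "T301"), ("T301", "Product"),
])

# Material path from feed to product
path = nx.shortest_path(flow, "Feed", "Product")
print(" -> ".join(path))  # Feed -> P101 -> R201 -> T301 -> Product

# Everything downstream of the reactor (impact analysis for an R201 fault)
print(sorted(nx.descendants(flow, "R201")))  # ['Product', 'T301']
```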
5.3 Automated Process Document Generation
Code Example 3: Template-based Automated Document Generation
# Requirements:
# - Python 3.9+
# - rdflib>=6.3.0
# - jinja2>=3.1.0
from rdflib import Graph, Namespace, RDF, RDFS, Literal
from rdflib.plugins.sparql import prepareQuery
from jinja2 import Template
from datetime import datetime
# Process document auto-generation system
PROC = Namespace("http://example.org/process#")
def create_process_documentation_kb():
"""Create knowledge base for document generation"""
g = Graph()
g.bind("proc", PROC)
# Process unit
g.add((PROC.PU100, RDF.type, PROC.ProcessUnit))
g.add((PROC.PU100, RDFS.label, Literal("Reaction Process Unit")))
g.add((PROC.PU100, PROC.hasDescription, Literal("Manufacturing of Product B through catalytic reaction of Raw Material A")))
# Equipment
equipment_data = [
{"id": "P101", "type": "Pump", "label": "Raw Material Feed Pump", "spec": "Centrifugal pump, 50 m³/h, 5 bar"},
{"id": "R201", "type": "Reactor", "label": "Catalytic Reactor", "spec": "CSTR, 10 m³, Catalyst: Pt/Al2O3"},
{"id": "HX301", "type": "HeatExchanger", "label": "Cooler", "spec": "Shell & Tube, 500 kW"},
{"id": "T401", "type": "Tank", "label": "Product Storage Tank", "spec": "Vertical cylindrical tank, 50 m³"},
]
for eq in equipment_data:
eq_uri = PROC[eq['id']]
g.add((eq_uri, RDF.type, PROC[eq['type']]))
g.add((eq_uri, RDFS.label, Literal(eq['label'])))
g.add((eq_uri, PROC.hasSpecification, Literal(eq['spec'])))
g.add((eq_uri, PROC.belongsTo, PROC.PU100))
# Operating parameters
g.add((PROC.R201, PROC.operatingTemperature, Literal(180.0)))
g.add((PROC.R201, PROC.operatingPressure, Literal(800.0)))
g.add((PROC.R201, PROC.operatingFlowRate, Literal(50.0)))
# Safety constraints
g.add((PROC.R201, PROC.maxTemperature, Literal(200.0)))
g.add((PROC.R201, PROC.maxPressure, Literal(1000.0)))
return g
def generate_process_document(g):
"""
Auto-generate process design document
Parameters:
g (Graph): Knowledge graph
Returns:
str: Markdown document
"""
# Get process unit information
query_pu = prepareQuery("""
SELECT ?pu ?label ?desc
WHERE {
?pu a proc:ProcessUnit .
?pu rdfs:label ?label .
?pu proc:hasDescription ?desc .
}
""", initNs={"proc": PROC, "rdfs": RDFS})
pu_info = list(g.query(query_pu))[0]
# Get equipment list
query_eq = prepareQuery("""
SELECT ?equipment ?type ?label ?spec
WHERE {
?equipment a ?type .
?equipment rdfs:label ?label .
?equipment proc:hasSpecification ?spec .
?equipment proc:belongsTo ?pu .
FILTER (strstarts(str(?type), str(proc:)))
FILTER (?type != proc:ProcessUnit)
}
ORDER BY ?equipment
""", initNs={"proc": PROC, "rdfs": RDFS})
equipment_list = []
for row in g.query(query_eq):
equipment_list.append({
'id': str(row.equipment).split('#')[-1],
'type': str(row.type).split('#')[-1],
'label': str(row.label),
'spec': str(row.spec)
})
# Get operating parameters
query_params = prepareQuery("""
SELECT ?equipment ?temp ?pressure ?flow
WHERE {
?equipment proc:operatingTemperature ?temp .
?equipment proc:operatingPressure ?pressure .
?equipment proc:operatingFlowRate ?flow .
}
""", initNs={"proc": PROC})
operating_params = {}
for row in g.query(query_params):
eq_id = str(row.equipment).split('#')[-1]
operating_params[eq_id] = {
'temperature': float(row.temp),
'pressure': float(row.pressure),
'flow_rate': float(row.flow)
}
# Jinja2 template
template_str = """# Process Design Document
**Document Number**: DOC-{{ doc_id }}
**Created**: {{ date }}
**Version**: 1.0
---
## 1. Process Overview
**Process Unit**: {{ pu_label }}
{{ pu_description }}
---
## 2. Major Equipment List
| Equipment ID | Equipment Name | Type | Specification |
|--------------|----------------|------|---------------|
{% for eq in equipment_list -%}
| {{ eq.id }} | {{ eq.label }} | {{ eq.type }} | {{ eq.spec }} |
{% endfor %}
---
## 3. Operating Conditions
{% for eq_id, params in operating_params.items() -%}
### {{ eq_id }}
- **Operating Temperature**: {{ params.temperature }} °C
- **Operating Pressure**: {{ params.pressure }} kPa
- **Operating Flow Rate**: {{ params.flow_rate }} m³/h
{% endfor %}
---
## 4. Safety Notes
{% if safety_notes -%}
{% for note in safety_notes -%}
- {{ note }}
{% endfor %}
{% else -%}
(To be automatically extracted from knowledge graph)
{% endif %}
---
## 5. Related Documents
- P&ID Drawing: DWG-PU100-001
- Process Flow Sheet: PFS-PU100
- Equipment Specifications: SPEC-{{ equipment_list[0].id }} ~ SPEC-{{ equipment_list[-1].id }}
---
*This document was automatically generated from the knowledge graph*
"""
# Populate template with values
template = Template(template_str)
document = template.render(
doc_id="PU100-DESIGN-001",
date=datetime.now().strftime("%Y-%m-%d"),
pu_label=str(pu_info.label),
pu_description=str(pu_info.desc),
equipment_list=equipment_list,
operating_params=operating_params,
safety_notes=None
)
return document
# Run demo
print("="*60)
print("Process Document Auto-generation System")
print("="*60)
g = create_process_documentation_kb()
print(f"\nKnowledge graph created: {len(g)} triples")
print("\n【Generating document...】")
document = generate_process_document(g)
print("\n" + "="*60)
print(document)
print("="*60)
# Save to file
with open("process_design_document.md", "w", encoding="utf-8") as f:
f.write(document)
print("\n【Saved】: process_design_document.md")
Output Example:
============================================================
Process Document Auto-generation System
============================================================
Knowledge graph created: 23 triples
【Generating document...】
============================================================
# Process Design Document
**Document Number**: DOC-PU100-DESIGN-001
**Created**: 2025-10-26
**Version**: 1.0
---
## 1. Process Overview
**Process Unit**: Reaction Process Unit
Manufacturing of Product B through catalytic reaction of Raw Material A
---
## 2. Major Equipment List
| Equipment ID | Equipment Name | Type | Specification |
|--------------|----------------|------|---------------|
| P101 | Raw Material Feed Pump | Pump | Centrifugal pump, 50 m³/h, 5 bar |
| R201 | Catalytic Reactor | Reactor | CSTR, 10 m³, Catalyst: Pt/Al2O3 |
| HX301 | Cooler | HeatExchanger | Shell & Tube, 500 kW |
| T401 | Product Storage Tank | Tank | Vertical cylindrical tank, 50 m³ |
---
## 3. Operating Conditions
### R201
- **Operating Temperature**: 180.0 °C
- **Operating Pressure**: 800.0 kPa
- **Operating Flow Rate**: 50.0 m³/h
---
## 4. Safety Notes
(To be automatically extracted from knowledge graph)
---
## 5. Related Documents
- P&ID Drawing: DWG-PU100-001
- Process Flow Sheet: PFS-PU100
- Equipment Specifications: SPEC-P101 ~ SPEC-T401
---
*This document was automatically generated from the knowledge graph*
============================================================
【Saved】: process_design_document.md
Explanation: Extract data from the knowledge graph using SPARQL and auto-generate Markdown documents with the Jinja2 template engine. Centrally manage equipment lists, operating conditions, and specifications to always generate up-to-date documents.
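The rendering step itself reduces to one pattern: loop over the extracted rows inside the template, using Jinja2's `-` whitespace control so each Markdown table row lands on its own line. A minimal, self-contained sketch with illustrative data:

```python
# Sketch: the Jinja2 table-row loop used in the document template, in
# isolation. The "-" in "{% for ... -%}" trims the newline after the tag
# so rows are not separated by blank lines. Data is illustrative.
from jinja2 import Template

template = Template(
    "| ID | Type |\n"
    "|----|------|\n"
    "{% for eq in rows -%}\n"
    "| {{ eq.id }} | {{ eq.type }} |\n"
    "{% endfor %}"
)
table = template.render(rows=[{"id": "P101", "type": "Pump"},
                              {"id": "R201", "type": "Reactor"}])
print(table)
```

Note that `{{ eq.id }}` works on plain dicts because Jinja2 falls back from attribute access to item lookup.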
5.4 Root Cause Analysis (RCA) Application
Code Example 4: Knowledge-based Root Cause Analysis
# Requirements:
# - Python 3.9+
# - rdflib>=6.3.0
# - pandas>=2.0.0, <2.2.0
from rdflib import Graph, Namespace, RDF, RDFS, Literal
from rdflib.plugins.sparql import prepareQuery
import pandas as pd
# Root Cause Analysis (RCA) system
PROC = Namespace("http://example.org/process#")
RCA = Namespace("http://example.org/rca#")
def create_rca_knowledge_base():
"""Create knowledge base for root cause analysis"""
g = Graph()
g.bind("proc", PROC)
g.bind("rca", RCA)
# Define causal relationships (Cause → Effect)
# Cause 1: Low suction pressure → Cavitation
g.add((RCA.LowSuctionPressure, RDF.type, RCA.RootCause))
g.add((RCA.LowSuctionPressure, RDFS.label, Literal("Low Suction Pressure")))
g.add((RCA.LowSuctionPressure, RCA.causes, RCA.Cavitation))
g.add((RCA.LowSuctionPressure, RCA.likelihood, Literal(0.85)))
g.add((RCA.Cavitation, RDF.type, RCA.Symptom))
g.add((RCA.Cavitation, RDFS.label, Literal("Cavitation")))
g.add((RCA.Cavitation, RCA.manifestsAs, Literal("Low Flow")))
g.add((RCA.Cavitation, RCA.manifestsAs, Literal("High Vibration")))
# Cause 2: Seal degradation → Leakage
g.add((RCA.SealDegradation, RDF.type, RCA.RootCause))
g.add((RCA.SealDegradation, RDFS.label, Literal("Mechanical Seal Degradation")))
g.add((RCA.SealDegradation, RCA.causes, RCA.Leakage))
g.add((RCA.SealDegradation, RCA.likelihood, Literal(0.90)))
g.add((RCA.Leakage, RDF.type, RCA.Symptom))
g.add((RCA.Leakage, RDFS.label, Literal("Leakage")))
g.add((RCA.Leakage, RCA.manifestsAs, Literal("Low Pressure")))
g.add((RCA.Leakage, RCA.manifestsAs, Literal("Liquid Leak Detection")))
# Cause 3: Bearing wear → Vibration
g.add((RCA.BearingWear, RDF.type, RCA.RootCause))
g.add((RCA.BearingWear, RDFS.label, Literal("Bearing Wear")))
g.add((RCA.BearingWear, RCA.causes, RCA.Vibration))
g.add((RCA.BearingWear, RCA.likelihood, Literal(0.75)))
g.add((RCA.Vibration, RDF.type, RCA.Symptom))
g.add((RCA.Vibration, RDFS.label, Literal("Abnormal Vibration")))
g.add((RCA.Vibration, RCA.manifestsAs, Literal("High Vibration")))
g.add((RCA.Vibration, RCA.manifestsAs, Literal("High Temperature")))
# Define remediation actions
g.add((RCA.LowSuctionPressure, RCA.remediation, Literal("Check NPSH, inspect suction piping")))
g.add((RCA.SealDegradation, RCA.remediation, Literal("Replace mechanical seal")))
g.add((RCA.BearingWear, RCA.remediation, Literal("Replace bearing, refill lubricant")))
return g
def perform_rca(g, observed_symptoms):
"""
Infer root causes from observed symptoms
Parameters:
g (Graph): RCA knowledge graph
observed_symptoms (list): List of observed symptoms
Returns:
list: List of root cause candidates
"""
candidates = []
# Reverse search causal relationships for each symptom
for symptom_label in observed_symptoms:
# Search for causes corresponding to symptom
query = prepareQuery("""
SELECT ?cause ?causeLabel ?symptom ?symptomLabel ?likelihood ?remediation
WHERE {
?cause a rca:RootCause .
?cause rdfs:label ?causeLabel .
?cause rca:causes ?symptom .
?symptom rdfs:label ?symptomLabel .
?symptom rca:manifestsAs ?manifestation .
?cause rca:likelihood ?likelihood .
OPTIONAL { ?cause rca:remediation ?remediation }
FILTER (str(?manifestation) = ?symptom_str)
}
""", initNs={"rca": RCA, "rdfs": RDFS})
results = g.query(query, initBindings={'symptom_str': Literal(symptom_label)})
for row in results:
candidates.append({
'root_cause': str(row.causeLabel),
'symptom': str(row.symptomLabel),
'observed': symptom_label,
'likelihood': float(row.likelihood),
'remediation': str(row.remediation) if row.remediation else "(Remediation undefined)"
})
    # If the same root cause matches multiple symptoms, integrate likelihood
    cause_scores = {}
    for cand in candidates:
        cause = cand['root_cause']
        if cause not in cause_scores:
            cause_scores[cause] = {
                'symptoms': [cand['observed']],
                'likelihood': cand['likelihood'],
                'remediation': cand['remediation']
            }
        else:
            cause_scores[cause]['symptoms'].append(cand['observed'])
            # Boost likelihood for each additional matching symptom (capped at 1.0)
            cause_scores[cause]['likelihood'] = min(1.0, cause_scores[cause]['likelihood'] + 0.1)
# Sort by likelihood
ranked_causes = sorted(
[{'cause': k, **v} for k, v in cause_scores.items()],
key=lambda x: x['likelihood'],
reverse=True
)
return ranked_causes
# Run demo
print("="*60)
print("Root Cause Analysis (RCA) System")
print("="*60)
g = create_rca_knowledge_base()
print(f"\nRCA knowledge base created: {len(g)} triples")
# Scenario 1: Suspected cavitation
print("\n" + "="*60)
print("【Scenario 1】P-101 Pump Anomaly")
print("="*60)
observed_symptoms_1 = ["Low Flow", "High Vibration"]
print(f"\nObserved symptoms: {observed_symptoms_1}")
print("\n【Running RCA...】")
causes_1 = perform_rca(g, observed_symptoms_1)
print("\n【Root Cause Analysis Results】")
for i, cause in enumerate(causes_1, 1):
print(f"\n{i}. {cause['cause']} (Likelihood: {cause['likelihood']*100:.1f}%)")
print(f" Related symptoms: {', '.join(cause['symptoms'])}")
print(f" Recommended action: {cause['remediation']}")
# Scenario 2: Suspected seal leakage
print("\n" + "="*60)
print("【Scenario 2】P-102 Pump Anomaly")
print("="*60)
observed_symptoms_2 = ["Low Pressure", "Liquid Leak Detection"]
print(f"\nObserved symptoms: {observed_symptoms_2}")
print("\n【Running RCA...】")
causes_2 = perform_rca(g, observed_symptoms_2)
print("\n【Root Cause Analysis Results】")
for i, cause in enumerate(causes_2, 1):
print(f"\n{i}. {cause['cause']} (Likelihood: {cause['likelihood']*100:.1f}%)")
print(f" Related symptoms: {', '.join(cause['symptoms'])}")
print(f" Recommended action: {cause['remediation']}")
# Convert to DataFrame
all_causes = causes_1 + causes_2
if all_causes:
df = pd.DataFrame(all_causes)
print("\n【All RCA Results Summary】")
print(df[['cause', 'likelihood', 'remediation']].drop_duplicates().to_string(index=False))
Output Example:
============================================================
Root Cause Analysis (RCA) System
============================================================
RCA knowledge base created: 24 triples
============================================================
【Scenario 1】P-101 Pump Anomaly
============================================================
Observed symptoms: ['Low Flow', 'High Vibration']
【Running RCA...】
【Root Cause Analysis Results】
1. Low Suction Pressure (Likelihood: 95.0%)
Related symptoms: Low Flow, High Vibration
Recommended action: Check NPSH, inspect suction piping
2. Bearing Wear (Likelihood: 75.0%)
Related symptoms: High Vibration
Recommended action: Replace bearing, refill lubricant
============================================================
【Scenario 2】P-102 Pump Anomaly
============================================================
Observed symptoms: ['Low Pressure', 'Liquid Leak Detection']
【Running RCA...】
【Root Cause Analysis Results】
1. Mechanical Seal Degradation (Likelihood: 100.0%)
Related symptoms: Low Pressure, Liquid Leak Detection
Recommended action: Replace mechanical seal
【All RCA Results Summary】
cause likelihood remediation
Low Suction Pressure 0.95 Check NPSH, inspect suction piping
Bearing Wear 0.75 Replace bearing, refill lubricant
Mechanical Seal Degradation 1.00 Replace mechanical seal
Explanation: From observed symptoms, reverse search causal relationships defined in the knowledge graph to infer root causes. Causes matching multiple symptoms get higher likelihood, identifying priority causes for investigation.
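The scoring rule behind these numbers can be stated on its own: the first matching symptom contributes the cause's base likelihood, and each additional matching symptom adds 0.1, capped at 1.0. A standalone sketch, with values mirroring the scenarios above:

```python
# Sketch: the RCA likelihood-accumulation rule in isolation. The base
# likelihood comes from the first matching symptom; each further
# matching symptom adds 0.1, capped at 1.0.
def accumulated_likelihood(base, n_matching_symptoms):
    score = base
    for _ in range(n_matching_symptoms - 1):
        score = min(1.0, score + 0.1)
    return round(score, 2)

print(accumulated_likelihood(0.85, 2))  # Low Suction Pressure -> 0.95
print(accumulated_likelihood(0.75, 1))  # Bearing Wear -> 0.75
print(accumulated_likelihood(0.90, 2))  # Seal Degradation -> 1.0 (capped)
```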
5.5 Equipment Recommendation System
Code Example 5: Knowledge-based Equipment Recommendation
# Requirements:
# - Python 3.9+
# - rdflib>=6.3.0
# - pandas>=2.0.0, <2.2.0
from rdflib import Graph, Namespace, RDF, RDFS, Literal
from rdflib.plugins.sparql import prepareQuery
import pandas as pd
# Knowledge-based equipment recommendation system
PROC = Namespace("http://example.org/process#")
SPEC = Namespace("http://example.org/specification#")
def create_equipment_catalog_kb():
"""Create equipment catalog knowledge base"""
g = Graph()
g.bind("proc", PROC)
g.bind("spec", SPEC)
# Pump catalog
pumps = [
{"id": "PUMP_A100", "type": "CentrifugalPump", "flow_min": 40, "flow_max": 60,
"pressure_min": 400, "pressure_max": 600, "material": "SS316", "cost": 15000},
{"id": "PUMP_B200", "type": "CentrifugalPump", "flow_min": 80, "flow_max": 120,
"pressure_min": 800, "pressure_max": 1200, "material": "SS316L", "cost": 25000},
{"id": "PUMP_C300", "type": "PositiveDisplacementPump", "flow_min": 10, "flow_max": 30,
"pressure_min": 1000, "pressure_max": 2000, "material": "SS316L", "cost": 35000},
]
for pump in pumps:
p_uri = PROC[pump['id']]
g.add((p_uri, RDF.type, PROC[pump['type']]))
g.add((p_uri, SPEC.flowRateMin, Literal(pump['flow_min'])))
g.add((p_uri, SPEC.flowRateMax, Literal(pump['flow_max'])))
g.add((p_uri, SPEC.pressureMin, Literal(pump['pressure_min'])))
g.add((p_uri, SPEC.pressureMax, Literal(pump['pressure_max'])))
g.add((p_uri, SPEC.material, Literal(pump['material'])))
g.add((p_uri, SPEC.cost, Literal(pump['cost'])))
# Heat exchanger catalog
heat_exchangers = [
{"id": "HX_S100", "type": "ShellAndTubeHX", "duty_min": 200, "duty_max": 600,
"material": "SS316", "cost": 20000},
{"id": "HX_P200", "type": "PlateHX", "duty_min": 100, "duty_max": 400,
"material": "SS316", "cost": 12000},
]
for hx in heat_exchangers:
hx_uri = PROC[hx['id']]
g.add((hx_uri, RDF.type, PROC[hx['type']]))
g.add((hx_uri, SPEC.dutyMin, Literal(hx['duty_min'])))
g.add((hx_uri, SPEC.dutyMax, Literal(hx['duty_max'])))
g.add((hx_uri, SPEC.material, Literal(hx['material'])))
g.add((hx_uri, SPEC.cost, Literal(hx['cost'])))
return g
def recommend_equipment(g, requirements):
"""
Recommend equipment based on required specifications
Parameters:
g (Graph): Equipment catalog KG
requirements (dict): Required specifications
Returns:
list: List of recommended equipment
"""
recommendations = []
if requirements['type'] == 'Pump':
# Pump recommendation
query = prepareQuery("""
SELECT ?pump ?pumpType ?flowMin ?flowMax ?pressureMin ?pressureMax ?material ?cost
WHERE {
?pump a ?pumpType .
?pump spec:flowRateMin ?flowMin .
?pump spec:flowRateMax ?flowMax .
?pump spec:pressureMin ?pressureMin .
?pump spec:pressureMax ?pressureMax .
?pump spec:material ?material .
?pump spec:cost ?cost .
FILTER (strstarts(str(?pumpType), str(proc:)))
FILTER (?pumpType != proc:Equipment)
}
""", initNs={"proc": PROC, "spec": SPEC})
for row in g.query(query):
# Calculate specification matching score
flow_req = requirements['flow_rate']
pressure_req = requirements['pressure']
            # Check flow range (cast rdflib Literals to float before comparing)
            if float(row.flowMin) <= flow_req <= float(row.flowMax):
                flow_score = 1.0
            else:
                # Penalty for out of range
                flow_score = 0.5
            # Check pressure range
            if float(row.pressureMin) <= pressure_req <= float(row.pressureMax):
                pressure_score = 1.0
            else:
                pressure_score = 0.5
# Material matching
if requirements.get('material') == str(row.material):
material_score = 1.0
elif str(row.material) in ['SS316L']: # Higher grade
material_score = 0.8
else:
material_score = 0.6
# Total score
total_score = (flow_score * 0.4 + pressure_score * 0.4 + material_score * 0.2)
# Cost efficiency
cost_per_performance = float(row.cost) / total_score
recommendations.append({
'equipment_id': str(row.pump).split('#')[-1],
'type': str(row.pumpType).split('#')[-1],
'flow_range': f"{row.flowMin}-{row.flowMax} m³/h",
'pressure_range': f"{row.pressureMin}-{row.pressureMax} kPa",
'material': str(row.material),
'cost': float(row.cost),
'score': total_score,
'cost_efficiency': cost_per_performance
})
elif requirements['type'] == 'HeatExchanger':
# Heat exchanger recommendation
query = prepareQuery("""
SELECT ?hx ?hxType ?dutyMin ?dutyMax ?material ?cost
WHERE {
?hx a ?hxType .
?hx spec:dutyMin ?dutyMin .
?hx spec:dutyMax ?dutyMax .
?hx spec:material ?material .
?hx spec:cost ?cost .
FILTER (strstarts(str(?hxType), str(proc:)))
}
""", initNs={"proc": PROC, "spec": SPEC})
for row in g.query(query):
duty_req = requirements['duty']
if float(row.dutyMin) <= duty_req <= float(row.dutyMax):
duty_score = 1.0
else:
duty_score = 0.5
total_score = duty_score
cost_per_performance = float(row.cost) / total_score
recommendations.append({
'equipment_id': str(row.hx).split('#')[-1],
'type': str(row.hxType).split('#')[-1],
'duty_range': f"{row.dutyMin}-{row.dutyMax} kW",
'material': str(row.material),
'cost': float(row.cost),
'score': total_score,
'cost_efficiency': cost_per_performance
})
# Sort by score
recommendations.sort(key=lambda x: x['score'], reverse=True)
return recommendations
# Run demo
print("="*60)
print("Knowledge-based Equipment Recommendation System")
print("="*60)
g = create_equipment_catalog_kb()
print(f"\nEquipment catalog KB created: {len(g)} triples")
# Scenario 1: Pump recommendation
print("\n" + "="*60)
print("【Scenario 1】Pump Recommendation")
print("="*60)
pump_requirements = {
'type': 'Pump',
'flow_rate': 50.0, # m³/h
'pressure': 500.0, # kPa
'material': 'SS316'
}
print(f"\nRequired specifications:")
print(f" Flow rate: {pump_requirements['flow_rate']} m³/h")
print(f" Pressure: {pump_requirements['pressure']} kPa")
print(f" Material: {pump_requirements['material']}")
print("\n【Running recommendation...】")
pump_recs = recommend_equipment(g, pump_requirements)
print(f"\n【Recommendation Results】(Top {min(3, len(pump_recs))})")
for i, rec in enumerate(pump_recs[:3], 1):
print(f"\n{i}. {rec['equipment_id']} ({rec['type']})")
print(f" Score: {rec['score']*100:.1f}%")
print(f" Flow range: {rec['flow_range']}")
print(f" Pressure range: {rec['pressure_range']}")
print(f" Material: {rec['material']}")
print(f" Price: ${rec['cost']:,.0f}")
print(f" Cost efficiency: ${rec['cost_efficiency']:,.0f}/score")
# Scenario 2: Heat exchanger recommendation
print("\n" + "="*60)
print("【Scenario 2】Heat Exchanger Recommendation")
print("="*60)
hx_requirements = {
'type': 'HeatExchanger',
'duty': 300.0, # kW
'material': 'SS316'
}
print(f"\nRequired specifications:")
print(f" Heat duty: {hx_requirements['duty']} kW")
print(f" Material: {hx_requirements['material']}")
print("\n【Running recommendation...】")
hx_recs = recommend_equipment(g, hx_requirements)
print(f"\n【Recommendation Results】(All {len(hx_recs)} items)")
for i, rec in enumerate(hx_recs, 1):
print(f"\n{i}. {rec['equipment_id']} ({rec['type']})")
print(f" Score: {rec['score']*100:.1f}%")
print(f" Duty range: {rec['duty_range']}")
print(f" Material: {rec['material']}")
print(f" Price: ${rec['cost']:,.0f}")
# Convert recommendation results to DataFrame
df_pumps = pd.DataFrame(pump_recs)
print("\n【Pump Recommendation Summary】")
print(df_pumps[['equipment_id', 'score', 'cost', 'cost_efficiency']].to_string(index=False))
Output Example:
============================================================
Knowledge-based Equipment Recommendation System
============================================================
Equipment catalog KB created: 35 triples
============================================================
【Scenario 1】Pump Recommendation
============================================================
Required specifications:
Flow rate: 50.0 m³/h
Pressure: 500.0 kPa
Material: SS316
【Running recommendation...】
【Recommendation Results】(Top 3)
1. PUMP_A100 (CentrifugalPump)
Score: 100.0%
Flow range: 40-60 m³/h
Pressure range: 400-600 kPa
Material: SS316
Price: $15,000
Cost efficiency: $15,000/score
2. PUMP_B200 (CentrifugalPump)
Score: 80.0%
Flow range: 80-120 m³/h
Pressure range: 800-1200 kPa
Material: SS316L
Price: $25,000
Cost efficiency: $31,250/score
3. PUMP_C300 (PositiveDisplacementPump)
Score: 80.0%
Flow range: 10-30 m³/h
Pressure range: 1000-2000 kPa
Material: SS316L
Price: $35,000
Cost efficiency: $43,750/score
============================================================
【Scenario 2】Heat Exchanger Recommendation
============================================================
Required specifications:
Heat duty: 300.0 kW
Material: SS316
【Running recommendation...】
【Recommendation Results】(All 2 items)
1. HX_S100 (ShellAndTubeHX)
Score: 100.0%
Duty range: 200-600 kW
Material: SS316
Price: $20,000
2. HX_P200 (PlateHX)
Score: 100.0%
Duty range: 100-400 kW
Material: SS316
Price: $12,000
【Pump Recommendation Summary】
equipment_id score cost cost_efficiency
PUMP_A100 1.0 15000.0 15000.0
PUMP_B200 0.8 25000.0 31250.0
PUMP_C300 0.8 35000.0 43750.0
Explanation: The equipment catalog is converted into a knowledge graph, each candidate is scored against the required specifications (flow rate, pressure, and material), and the results are ranked by total score and cost efficiency.
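The weighted scoring scheme above can also be sketched standalone, without rdflib, which makes it easy to unit-test. The candidate dicts, field names, and weights below are illustrative stand-ins for the catalog data, not part of the original code.

```python
# Standalone sketch of the weighted matching score (illustrative field names).
def match_score(req, cand, weights=(0.4, 0.4, 0.2)):
    """Score a candidate pump against required flow, pressure, and material."""
    flow_ok = cand["flow_min"] <= req["flow_rate"] <= cand["flow_max"]
    pres_ok = cand["pressure_min"] <= req["pressure"] <= cand["pressure_max"]
    flow_score = 1.0 if flow_ok else 0.5       # out-of-range penalty
    pressure_score = 1.0 if pres_ok else 0.5
    if req["material"] == cand["material"]:
        material_score = 1.0
    elif cand["material"] == "SS316L":         # higher grade than requested
        material_score = 0.8
    else:
        material_score = 0.6
    w_f, w_p, w_m = weights
    return w_f * flow_score + w_p * pressure_score + w_m * material_score

req = {"flow_rate": 50.0, "pressure": 500.0, "material": "SS316"}
cand = {"flow_min": 40, "flow_max": 60, "pressure_min": 400,
        "pressure_max": 600, "material": "SS316"}
print(match_score(req, cand))  # 1.0 for a full match
```

Separating the scoring function from the SPARQL layer like this lets the weights and penalty values be tuned independently of the knowledge graph queries.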
5.6 Application to Process Optimization
Code Example 6: Process Optimization Using Knowledge Graphs
# Requirements:
# - Python 3.9+
# - rdflib>=6.3.0
# - scipy>=1.10.0
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0
from rdflib import Graph, Namespace, RDF, RDFS, Literal
from rdflib.plugins.sparql import prepareQuery
from scipy.optimize import minimize
import numpy as np
import pandas as pd
# Process optimization using knowledge graphs
PROC = Namespace("http://example.org/process#")
OPT = Namespace("http://example.org/optimization#")
def create_optimization_kb():
"""Create knowledge base for optimization"""
g = Graph()
g.bind("proc", PROC)
g.bind("opt", OPT)
# Process unit: Reactor
g.add((PROC.R201, RDF.type, PROC.Reactor))
g.add((PROC.R201, RDFS.label, Literal("Catalytic Reactor R-201")))
# Decision Variables
g.add((PROC.R201, OPT.hasDecisionVariable, OPT.Temperature))
g.add((OPT.Temperature, RDFS.label, Literal("Reaction Temperature")))
g.add((OPT.Temperature, OPT.lowerBound, Literal(150.0)))
g.add((OPT.Temperature, OPT.upperBound, Literal(200.0)))
g.add((OPT.Temperature, OPT.currentValue, Literal(175.0)))
g.add((PROC.R201, OPT.hasDecisionVariable, OPT.Pressure))
g.add((OPT.Pressure, RDFS.label, Literal("Reaction Pressure")))
g.add((OPT.Pressure, OPT.lowerBound, Literal(600.0)))
g.add((OPT.Pressure, OPT.upperBound, Literal(1000.0)))
g.add((OPT.Pressure, OPT.currentValue, Literal(800.0)))
# Constraints
g.add((PROC.R201, OPT.hasConstraint, OPT.SafetyConstraint1))
g.add((OPT.SafetyConstraint1, RDFS.label, Literal("Temperature-Pressure Safety Constraint")))
g.add((OPT.SafetyConstraint1, OPT.expression, Literal("T + 0.1*P <= 250")))
# Objective function (Profit maximization)
g.add((PROC.R201, OPT.hasObjective, OPT.ProfitMaximization))
g.add((OPT.ProfitMaximization, RDFS.label, Literal("Profit Maximization")))
g.add((OPT.ProfitMaximization, OPT.expression,
Literal("Profit = Yield * ProductPrice - OperatingCost")))
# Process model parameters
g.add((PROC.R201, OPT.productPrice, Literal(100.0))) # $/kg
g.add((PROC.R201, OPT.feedCost, Literal(50.0))) # $/kg
g.add((PROC.R201, OPT.energyCostFactor, Literal(0.05))) # $/(°C*kPa)
return g
def extract_optimization_problem(g):
"""
Extract optimization problem from knowledge graph
Returns:
dict: Optimization problem definition
"""
problem = {
'decision_variables': [],
'bounds': [],
'constraints': [],
'parameters': {}
}
# Extract decision variables
query_vars = prepareQuery("""
SELECT ?var ?label ?lb ?ub ?current
WHERE {
?reactor opt:hasDecisionVariable ?var .
?var rdfs:label ?label .
?var opt:lowerBound ?lb .
?var opt:upperBound ?ub .
?var opt:currentValue ?current .
}
""", initNs={"opt": OPT, "rdfs": RDFS})
for row in g.query(query_vars):
var_name = str(row.label)
problem['decision_variables'].append(var_name)
problem['bounds'].append((float(row.lb), float(row.ub)))
# Extract parameters
query_params = prepareQuery("""
SELECT ?property ?value
WHERE {
proc:R201 ?property ?value .
FILTER (strstarts(str(?property), str(opt:)))
}
""", initNs={"proc": PROC, "opt": OPT})
for row in g.query(query_params):
prop_name = str(row.property).split('#')[-1]
if prop_name not in ['hasDecisionVariable', 'hasConstraint', 'hasObjective']:
problem['parameters'][prop_name] = float(row.value)
return problem
def process_model(x, params):
"""
Process model (simplified version)
Parameters:
x (array): [Temperature, Pressure]
params (dict): Model parameters
Returns:
dict: Calculation results
"""
T, P = x # Temperature[°C], Pressure[kPa]
# Reaction rate model (simplified Arrhenius equation)
k = 0.01 * np.exp(0.05 * (T - 150))
# Yield (function of reaction rate and pressure)
yield_fraction = k * (P / 1000) / (1 + k * (P / 1000))
# Product yield [kg/h]
feed_rate = 100.0 # kg/h
product_rate = feed_rate * yield_fraction
# Profit calculation
product_value = product_rate * params['productPrice']
feed_cost = feed_rate * params['feedCost']
energy_cost = params['energyCostFactor'] * T * P / 1000
profit = product_value - feed_cost - energy_cost
return {
'yield': yield_fraction,
'product_rate': product_rate,
'profit': profit,
'energy_cost': energy_cost
}
def objective_function(x, params):
"""Objective function (convert maximization to minimization)"""
result = process_model(x, params)
return -result['profit'] # Maximization → Minimization
def safety_constraint(x):
"""Safety constraint: T + 0.1*P <= 250"""
T, P = x
return 250 - (T + 0.1 * P) # Must satisfy >= 0
# Run demo
print("="*60)
print("Knowledge Graph-based Process Optimization")
print("="*60)
g = create_optimization_kb()
print(f"\nOptimization knowledge base created: {len(g)} triples")
# Extract optimization problem
opt_problem = extract_optimization_problem(g)
print("\n【Optimization Problem Definition】")
print(f" Decision variables: {opt_problem['decision_variables']}")
print(f" Variable ranges: {opt_problem['bounds']}")
print(f" Parameters: {opt_problem['parameters']}")
# Evaluate current operating point
x_current = np.array([175.0, 800.0])
current_result = process_model(x_current, opt_problem['parameters'])
print("\n【Current Operating Point】")
print(f" Temperature: {x_current[0]} °C")
print(f" Pressure: {x_current[1]} kPa")
print(f" Yield: {current_result['yield']*100:.2f}%")
print(f" Product production rate: {current_result['product_rate']:.2f} kg/h")
print(f" Profit: ${current_result['profit']:.2f}/h")
# Run optimization
print("\n【Running optimization...】")
constraint = {'type': 'ineq', 'fun': safety_constraint}
result = minimize(
objective_function,
x_current,
args=(opt_problem['parameters'],),
method='SLSQP',
bounds=opt_problem['bounds'],
constraints=[constraint]
)
if result.success:
x_optimal = result.x
optimal_result = process_model(x_optimal, opt_problem['parameters'])
print("\n【Optimization Results】")
print(f" Optimal temperature: {x_optimal[0]:.2f} °C")
print(f" Optimal pressure: {x_optimal[1]:.2f} kPa")
print(f" Optimal yield: {optimal_result['yield']*100:.2f}%")
print(f" Optimal product production rate: {optimal_result['product_rate']:.2f} kg/h")
print(f" Optimal profit: ${optimal_result['profit']:.2f}/h")
print("\n【Improvement Effect】")
profit_improvement = optimal_result['profit'] - current_result['profit']
print(f" Profit improvement: ${profit_improvement:.2f}/h ({profit_improvement/current_result['profit']*100:.2f}% increase)")
print(f" Annual profit improvement: ${profit_improvement * 8760:,.0f}")
# Constraint check
constraint_value = safety_constraint(x_optimal)
print(f"\n【Constraint Satisfaction Verification】")
print(f" Safety constraint (T + 0.1*P <= 250): {x_optimal[0] + 0.1*x_optimal[1]:.2f} <= 250")
print(f" Margin: {constraint_value:.2f} °C")
else:
print(f"\nOptimization failed: {result.message}")
# Comparison table (skip when optimization failed, since x_optimal is then undefined)
if result.success:
comparison_df = pd.DataFrame({
'Item': ['Temperature[°C]', 'Pressure[kPa]', 'Yield[%]', 'Production rate[kg/h]', 'Profit[$/h]'],
'Current': [x_current[0], x_current[1], current_result['yield']*100,
current_result['product_rate'], current_result['profit']],
'Optimal': [x_optimal[0], x_optimal[1], optimal_result['yield']*100,
optimal_result['product_rate'], optimal_result['profit']]
})
print("\n【Operating Condition Comparison Table】")
print(comparison_df.to_string(index=False))
Output Example:
============================================================
Knowledge Graph-based Process Optimization
============================================================
Optimization knowledge base created: 18 triples
【Optimization Problem Definition】
Decision variables: ['Reaction Temperature', 'Reaction Pressure']
Variable ranges: [(150.0, 200.0), (600.0, 1000.0)]
Parameters: {'productPrice': 100.0, 'feedCost': 50.0, 'energyCostFactor': 0.05}
【Current Operating Point】
Temperature: 175.0 °C
Pressure: 800.0 kPa
Yield: 56.78%
Product production rate: 56.78 kg/h
Profit: $606.80/h
【Running optimization...】
【Optimization Results】
Optimal temperature: 200.00 °C
Optimal pressure: 1000.00 kPa
Optimal yield: 74.37%
Optimal product production rate: 74.37 kg/h
Optimal profit: $826.34/h
【Improvement Effect】
Profit improvement: $219.54/h (36.18% increase)
Annual profit improvement: $1,923,170
【Constraint Satisfaction Verification】
Safety constraint (T + 0.1*P <= 250): 250.00 <= 250
Margin: 0.00 °C
【Operating Condition Comparison Table】
Item Current Optimal
Temperature[°C] 175.00 200.00
Pressure[kPa] 800.00 1000.00
Yield[%] 56.78 74.37
Production rate[kg/h] 56.78 74.37
Profit[$/h] 606.80 826.34
Explanation: Decision variables, constraints, and the objective function are extracted from the knowledge graph and solved with scipy.optimize. Managing the process model and the optimization problem definition centrally in the knowledge graph improves maintainability and extensibility.
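Note that the knowledge base stores the safety constraint as the plain string "T + 0.1*P <= 250", while the demo hardcodes it as a Python function. A natural extension is to derive the constraint function from the stored expression. The sketch below assumes a `<lhs> <= <rhs>` format and a fixed variable order, and restricts `eval` to the decision variables (acceptable for a trusted knowledge base, not for untrusted input).

```python
# Hedged sketch: turn a "<lhs> <= <rhs>" constraint string from the KB into a
# scipy-style inequality function (g(x) >= 0 means the constraint is satisfied).
def constraint_from_expression(expr, var_names=("T", "P")):
    lhs, rhs = expr.split("<=")
    rhs_value = float(rhs)
    def g(x):
        env = dict(zip(var_names, x))
        # restricted eval: only the decision variables are visible
        return rhs_value - eval(lhs, {"__builtins__": {}}, env)
    return g

g_fun = constraint_from_expression("T + 0.1*P <= 250")
print(g_fun([170.0, 700.0]))  # 250 - (170 + 70) = 10.0 -> feasible
```

The returned function can be passed directly to `scipy.optimize.minimize` as `{'type': 'ineq', 'fun': g_fun}`, so new constraints added to the knowledge graph require no code change.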
5.7 Complete Integrated System
Code Example 7: Integrated System of API + Reasoning + Visualization
# Requirements:
# - Python 3.9+
# - flask>=2.3.0
# - rdflib>=6.3.0
# - networkx>=3.1.0
# - plotly>=5.14.0
"""
Example: Code Example 7: Integrated System of API + Reasoning + Visualization
Purpose: Demonstrate data visualization techniques
Target: Intermediate
Execution time: 2-5 seconds
Dependencies: None
"""
from flask import Flask, request, jsonify, render_template_string
from rdflib import Graph, Namespace, RDF, RDFS, Literal
from rdflib.plugins.sparql import prepareQuery
import networkx as nx
import plotly.graph_objects as go
import json
# Complete integrated system: API + Reasoning + Visualization
app = Flask(__name__)
PROC = Namespace("http://example.org/process#")
DIAG = Namespace("http://example.org/diagnosis#")
# Global knowledge graph
g = Graph()
g.bind("proc", PROC)
g.bind("diag", DIAG)
def initialize_integrated_kb():
"""Initialize integrated knowledge base"""
# Process equipment
equipment = [
{"id": "P101", "type": "Pump", "flow": 50.0, "pressure": 500.0, "vibration": 2.5, "status": "Normal"},
{"id": "P102", "type": "Pump", "flow": 30.0, "pressure": 450.0, "vibration": 8.5, "status": "Warning"},
{"id": "R201", "type": "Reactor", "temp": 180.0, "pressure": 800.0, "status": "Normal"},
]
for eq in equipment:
eq_uri = PROC[eq['id']]
g.add((eq_uri, RDF.type, PROC[eq['type']]))
g.add((eq_uri, PROC.hasStatus, Literal(eq['status'])))
if 'flow' in eq:
g.add((eq_uri, PROC.hasFlowRate, Literal(eq['flow'])))
if 'pressure' in eq:
g.add((eq_uri, PROC.hasPressure, Literal(eq['pressure'])))
if 'vibration' in eq:
g.add((eq_uri, PROC.hasVibration, Literal(eq['vibration'])))
if 'temp' in eq:
g.add((eq_uri, PROC.hasTemperature, Literal(eq['temp'])))
# Process flow
g.add((PROC.P101, PROC.flowsTo, PROC.R201))
g.add((PROC.P102, PROC.flowsTo, PROC.R201))
# Fault diagnosis rules
g.add((DIAG.CavitationRule, RDF.type, DIAG.DiagnosticRule))
g.add((DIAG.CavitationRule, DIAG.condition, Literal("flow < 40 AND vibration > 5")))
g.add((DIAG.CavitationRule, DIAG.diagnosis, Literal("Suspected cavitation")))
g.add((DIAG.CavitationRule, DIAG.action, Literal("Check NPSH")))
print(f"Integrated knowledge base initialized: {len(g)} triples")
# === API Endpoints ===
@app.route('/')
def index():
"""Integrated dashboard HTML"""
html = """
<!DOCTYPE html>
<title>Process Knowledge Graph Integrated System</title>
<link href="../../assets/css/knowledge-base.css" rel="stylesheet"/>
<h1>🏭 Process Knowledge Graph Integrated System</h1>
<div class="section">
<h2>📊 System Overview</h2>
<p>Integrated Features: SPARQL API + Reasoning Engine + Visualization</p>
<button onclick="getStats()">Get Statistics</button>
<pre id="stats"></pre>
</div>
<div class="section">
<h2>🔍 Equipment List</h2>
<button onclick="getEquipment()">Get Equipment List</button>
<pre id="equipment"></pre>
</div>
<div class="section">
<h2>⚡ Reasoning Engine</h2>
<button onclick="runInference()">Run Anomaly Detection Reasoning</button>
<pre id="inference"></pre>
</div>
<div class="section">
<h2>🌐 Process Flow Graph</h2>
<button onclick="getGraph()">Get Graph</button>
<pre id="graph"></pre>
</div>
<div class="section">
<h2>🔧 Custom SPARQL Query</h2>
<textarea id="sparql" rows="5" style="width: 100%; font-family: monospace;">
SELECT ?equipment ?status
WHERE {
?equipment proc:hasStatus ?status .
}
</textarea>
<button onclick="runSparql()">Execute Query</button>
<pre id="sparql-result"></pre>
</div>
<script>
const API_BASE = '';
async function getStats() {
const res = await fetch(API_BASE + '/api/statistics');
const data = await res.json();
document.getElementById('stats').textContent = JSON.stringify(data, null, 2);
}
async function getEquipment() {
const res = await fetch(API_BASE + '/api/equipment');
const data = await res.json();
document.getElementById('equipment').textContent = JSON.stringify(data, null, 2);
}
async function runInference() {
const res = await fetch(API_BASE + '/api/inference', { method: 'POST' });
const data = await res.json();
document.getElementById('inference').textContent = JSON.stringify(data, null, 2);
}
async function getGraph() {
const res = await fetch(API_BASE + '/api/graph');
const data = await res.json();
document.getElementById('graph').textContent = JSON.stringify(data, null, 2);
}
async function runSparql() {
const query = document.getElementById('sparql').value;
const res = await fetch(API_BASE + '/api/sparql', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ query })
});
const data = await res.json();
document.getElementById('sparql-result').textContent = JSON.stringify(data, null, 2);
}
</script>
"""
return render_template_string(html)
@app.route('/api/statistics', methods=['GET'])
def get_statistics():
"""Statistics API"""
stats = {
'total_triples': len(g),
'equipment_count': len(list(g.subjects(RDF.type, PROC.Pump))) + len(list(g.subjects(RDF.type, PROC.Reactor))),
'connection_count': len(list(g.subject_objects(PROC.flowsTo)))
}
return jsonify({"status": "success", "statistics": stats})
@app.route('/api/equipment', methods=['GET'])
def get_equipment():
"""Equipment list API"""
query = prepareQuery("""
SELECT ?equipment ?type ?status
WHERE {
?equipment a ?type .
?equipment proc:hasStatus ?status .
FILTER (?type IN (proc:Pump, proc:Reactor))
}
""", initNs={"proc": PROC})
results = []
for row in g.query(query):
results.append({
"id": str(row.equipment).split('#')[-1],
"type": str(row.type).split('#')[-1],
"status": str(row.status)
})
return jsonify({"status": "success", "equipment": results})
@app.route('/api/inference', methods=['POST'])
def run_inference():
"""Reasoning engine API (anomaly detection)"""
anomalies = []
# Rule: Low flow + High vibration → Cavitation
query = prepareQuery("""
SELECT ?equipment ?flow ?vibration
WHERE {
?equipment proc:hasFlowRate ?flow .
?equipment proc:hasVibration ?vibration .
FILTER (?flow < 40 && ?vibration > 5)
}
""", initNs={"proc": PROC})
for row in g.query(query):
anomalies.append({
"equipment": str(row.equipment).split('#')[-1],
"diagnosis": "Suspected cavitation",
"symptoms": f"Low flow({row.flow} m³/h), High vibration({row.vibration} mm/s)",
"action": "Check NPSH, inspect suction piping"
})
return jsonify({"status": "success", "anomalies": anomalies})
@app.route('/api/graph', methods=['GET'])
def get_graph():
"""Process flow graph API"""
nx_graph = nx.DiGraph()
# Add nodes
for eq_uri in g.subjects(RDF.type, None):
eq_id = str(eq_uri).split('#')[-1]
if eq_id in ['P101', 'P102', 'R201']:
nx_graph.add_node(eq_id)
# Add edges
for s, p, o in g.triples((None, PROC.flowsTo, None)):
source = str(s).split('#')[-1]
target = str(o).split('#')[-1]
nx_graph.add_edge(source, target)
# Return in JSON format
graph_data = {
"nodes": [{"id": n} for n in nx_graph.nodes()],
"edges": [{"source": u, "target": v} for u, v in nx_graph.edges()]
}
return jsonify({"status": "success", "graph": graph_data})
@app.route('/api/sparql', methods=['POST'])
def sparql_endpoint():
"""SPARQL endpoint"""
data = request.get_json()
query_str = data.get('query', '')
try:
query = prepareQuery(query_str, initNs={"proc": PROC, "diag": DIAG, "rdf": RDF, "rdfs": RDFS})
results = g.query(query)
response = []
for row in results:
row_dict = {}
for var in results.vars:
value = row[var]
row_dict[str(var)] = str(value).split('#')[-1] if '#' in str(value) else str(value)
response.append(row_dict)
return jsonify({"status": "success", "count": len(response), "results": response})
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 400
# Run demo
def demo_integrated_system():
"""Integrated system demo"""
print("="*60)
print("Complete Integrated System: API + Reasoning + Visualization")
print("="*60)
initialize_integrated_kb()
print("\n【Integrated System Features】")
print(" 1. RESTful API - Equipment information, statistics, graph data")
print(" 2. Reasoning Engine - Anomaly detection, fault diagnosis")
print(" 3. SPARQL Endpoint - Custom queries")
print(" 4. Web Dashboard - Integrated UI")
print("\n【How to Launch】")
print(" python integrated_system.py")
print(" or: flask run --port 5000")
print("\n【Access】")
print(" Open http://localhost:5000 in browser")
print("\n【API Endpoints List】")
print(" GET / - Dashboard")
print(" GET /api/statistics - Statistics information")
print(" GET /api/equipment - Equipment list")
print(" POST /api/inference - Anomaly detection reasoning")
print(" GET /api/graph - Process graph")
print(" POST /api/sparql - Custom query")
# Run demo
demo_integrated_system()
# Flask application startup
# if __name__ == '__main__':
# initialize_integrated_kb()
# app.run(debug=True, port=5000)
Output Example:
============================================================
Complete Integrated System: API + Reasoning + Visualization
============================================================
Integrated knowledge base initialized: 17 triples
【Integrated System Features】
1. RESTful API - Equipment information, statistics, graph data
2. Reasoning Engine - Anomaly detection, fault diagnosis
3. SPARQL Endpoint - Custom queries
4. Web Dashboard - Integrated UI
【How to Launch】
python integrated_system.py
or: flask run --port 5000
【Access】
Open http://localhost:5000 in browser
【API Endpoints List】
GET / - Dashboard
GET /api/statistics - Statistics information
GET /api/equipment - Equipment list
POST /api/inference - Anomaly detection reasoning
GET /api/graph - Process graph
POST /api/sparql - Custom query
Explanation: Flask integrates the knowledge graph API, reasoning engine, SPARQL endpoint, and interactive dashboard into a single web application. All features are accessible from the browser, so the system operates as an end-to-end process knowledge management system.
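The anomaly rule behind /api/inference ("flow < 40 AND vibration > 5") can be mirrored as a plain Python predicate over the same equipment records, which is handy for unit-testing the rule logic without starting Flask or building the RDF graph. The dicts below reuse the sample values from the example.

```python
# Plain-Python mirror of the SPARQL FILTER used for cavitation detection.
equipment = [
    {"id": "P101", "flow": 50.0, "vibration": 2.5},
    {"id": "P102", "flow": 30.0, "vibration": 8.5},
]

def suspected_cavitation(eq):
    """Low flow combined with high vibration suggests cavitation."""
    return eq["flow"] < 40 and eq["vibration"] > 5

flagged = [eq["id"] for eq in equipment if suspected_cavitation(eq)]
print(flagged)  # ['P102']
```

Keeping a predicate like this alongside the SPARQL version gives a cheap regression test: both implementations should flag the same equipment for the same sensor values.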
5.8 Chapter Summary
What We Learned
- SPARQL Endpoint API
  - Build RESTful API with Flask
  - Execute arbitrary queries with POST /api/sparql
  - Retrieve equipment information with GET /api/equipment
  - JSON format responses
- Knowledge Graph Visualization
  - Convert RDF graphs to NetworkX graphs
  - Interactive visualization with Plotly
  - Display node and edge attributes
  - Visual understanding of process flows
- Automated Document Generation
  - Utilize the Jinja2 template engine
  - Data extraction with SPARQL
  - Generate Markdown/HTML documents
  - Auto-update equipment lists and operating conditions
- Root Cause Analysis (RCA)
  - Reverse search of causal relationships from symptoms
  - Likelihood scoring
  - Presentation of recommended actions
  - Integrated evaluation of multiple symptoms
- Equipment Recommendation System
  - Match requirements with catalog specifications
  - Ranking by scoring function
  - Cost efficiency evaluation
  - Multi-criteria decision support
- Process Optimization
  - Extract optimization problems from the knowledge graph
  - Integration with scipy.optimize
  - Automatic application of constraints
  - Derive optimal operating conditions
- Complete Integrated System
  - Flask web application
  - Integration of API + reasoning + visualization
  - Interactive dashboard
  - End-to-end knowledge management
Key Points
SPARQL endpoints enable external systems to access knowledge graphs, while visualization provides intuitive understanding of complex process flows. Template engines auto-generate documents maintaining up-to-date status. RCA systems systematize fault diagnosis enabling rapid response, and recommendation systems support optimal equipment selection to streamline decision-making. Managing optimization problems in knowledge graphs improves maintainability. Integrated systems position knowledge graphs at the core of process operations, and API-first design supports microservices architecture.
Toward Practical Implementation
The process ontology and knowledge graph technologies learned in this series can be applied to actual processes. Digital Twins represent physical process knowledge in knowledge graphs. Smart Plants integrate IoT sensor data with knowledge graphs. Process Safety Management enables knowledge-based HAZOP and LOPA analysis. Equipment Maintenance Optimization systematizes failure history and maintenance plans. Energy Management optimizes process-wide energy flows. Knowledge Transfer formalizes veteran knowledge as ontologies. Regulatory Compliance provides knowledge-based management of regulatory requirements.
Further Learning Resources
For continued learning, explore Semantic Web Technologies through W3C RDF/OWL/SPARQL specifications. Ontology Engineering can be practiced with tools like Protege and TopBraid. Knowledge Graph Databases such as Neo4j, Apache Jena, and Stardog offer production-ready solutions. Process Engineering standards including CAPE-OPEN and OPC UA information models provide industry context. Implementation Frameworks including rdflib, owlready2, and SPARQLWrapper support Python-based development.