Last sync: 2025-11-16

Chapter 5: Implementation and Integration Applications

SPARQL API, Visualization, Automated Document Generation, Recommendation Systems, Complete Integrated Systems

📖 Reading time: 40-45 min 📊 Difficulty: Advanced 💻 Code examples: 7

This chapter focuses on practical implementation and integration. You will learn to build SPARQL endpoint APIs with Flask, visualize knowledge graphs with NetworkX and Plotly, auto-generate process documents, and apply the knowledge graph to root cause analysis (RCA) and equipment recommendation.

Learning Objectives

By reading this chapter, you will be able to:

- Build a SPARQL endpoint API with Flask to query a knowledge graph
- Convert RDF graphs to NetworkX graphs and visualize them interactively with Plotly
- Auto-generate process design documents from a knowledge graph with Jinja2
- Infer root causes from observed symptoms using causal relationships in a knowledge graph
- Recommend equipment from a catalog knowledge base against design requirements

5.1 SPARQL Endpoint API

Code Example 1: Flask-based SPARQL Query API

# Requirements:
# - Python 3.9+
# - flask>=2.3.0
# - rdflib>=6.3.0

"""
Example: Code Example 1: Flask-based SPARQL Query API

Purpose: Demonstrate core concepts and implementation patterns
Target: Intermediate
Execution time: 10-30 seconds
Dependencies: None
"""

from flask import Flask, request, jsonify
from rdflib import Graph, Namespace, RDF, RDFS, Literal
from rdflib.plugins.sparql import prepareQuery

# Flask SPARQL endpoint API implementation

app = Flask(__name__)

# Global knowledge graph
PROC = Namespace("http://example.org/process#")
g = Graph()
g.bind("proc", PROC)

# Initialize sample data
def init_knowledge_graph():
    """Initialize the knowledge graph"""
    # Equipment data
    equipment_data = [
        {"id": "P101", "type": "Pump", "flow": 50.0, "pressure": 500.0, "status": "Running"},
        {"id": "P102", "type": "Pump", "flow": 45.0, "pressure": 480.0, "status": "Running"},
        {"id": "R201", "type": "Reactor", "temp": 180.0, "pressure": 800.0, "status": "Running"},
        {"id": "T301", "type": "Tank", "level": 75.0, "temp": 25.0, "status": "Running"},
    ]

    for eq in equipment_data:
        eq_uri = PROC[eq['id']]
        g.add((eq_uri, RDF.type, PROC[eq['type']]))
        g.add((eq_uri, PROC.hasStatus, Literal(eq['status'])))

        if 'flow' in eq:
            g.add((eq_uri, PROC.hasFlowRate, Literal(eq['flow'])))
        if 'pressure' in eq:
            g.add((eq_uri, PROC.hasPressure, Literal(eq['pressure'])))
        if 'temp' in eq:
            g.add((eq_uri, PROC.hasTemperature, Literal(eq['temp'])))
        if 'level' in eq:
            g.add((eq_uri, PROC.hasLevel, Literal(eq['level'])))

    print(f"Knowledge graph initialization complete: {len(g)} triples")


# API endpoints

@app.route('/api/sparql', methods=['POST'])
def sparql_endpoint():
    """
    SPARQL query endpoint

    POST /api/sparql
    Body: {"query": "SELECT ..."}

    Returns:
        JSON: Query results
    """
    try:
        data = request.get_json()
        query_str = data.get('query', '')

        if not query_str:
            return jsonify({"error": "Query is empty"}), 400

        # Execute SPARQL query
        query = prepareQuery(query_str, initNs={"proc": PROC, "rdf": RDF, "rdfs": RDFS})
        results = g.query(query)

        # Convert results to JSON format
        response = []
        for row in results:
            row_dict = {}
            for var in results.vars:
                value = row[var]
                # Shorten URIs
                if hasattr(value, 'toPython'):
                    row_dict[str(var)] = str(value).split('#')[-1]
                else:
                    row_dict[str(var)] = str(value)
            response.append(row_dict)

        return jsonify({
            "status": "success",
            "count": len(response),
            "results": response
        }), 200

    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/api/equipment', methods=['GET'])
def get_all_equipment():
    """
    Get all equipment list

    GET /api/equipment

    Returns:
        JSON: Equipment list
    """
    query = prepareQuery("""
        SELECT ?equipment ?type ?status
        WHERE {
            ?equipment a ?type .
            ?equipment proc:hasStatus ?status .
            FILTER (strstarts(str(?type), str(proc:)))
        }
    """, initNs={"proc": PROC})

    results = g.query(query)
    equipment_list = []

    for row in results:
        equipment_list.append({
            "id": str(row.equipment).split('#')[-1],
            "type": str(row.type).split('#')[-1],
            "status": str(row.status)
        })

    return jsonify({
        "status": "success",
        "count": len(equipment_list),
        "equipment": equipment_list
    }), 200


@app.route('/api/equipment/<eq_id>', methods=['GET'])
def get_equipment_details(eq_id):
    """
    Get specific equipment details

    GET /api/equipment/{eq_id}

    Returns:
        JSON: Equipment details
    """
    eq_uri = PROC[eq_id]

    # Get all properties of the equipment
    query = prepareQuery("""
        SELECT ?property ?value
        WHERE {
            ?equipment ?property ?value .
        }
    """, initNs={"proc": PROC})

    # Materialize results once so they can be checked and iterated
    rows = list(g.query(query, initBindings={'equipment': eq_uri}))

    if not rows:
        return jsonify({"error": f"Equipment {eq_id} not found"}), 404

    details = {"id": eq_id}
    for row in rows:
        prop = str(row.property).split('#')[-1]
        value = row.value.toPython() if hasattr(row.value, 'toPython') else str(row.value)
        details[prop] = value

    return jsonify({
        "status": "success",
        "equipment": details
    }), 200


@app.route('/api/statistics', methods=['GET'])
def get_statistics():
    """
    Get statistics information

    GET /api/statistics

    Returns:
        JSON: Knowledge graph statistics
    """
    stats = {
        "total_triples": len(g),
        "total_equipment": len(list(g.subjects(RDF.type, None))),
        "equipment_by_type": {}
    }

    # Count by equipment type
    query = prepareQuery("""
        SELECT ?type (COUNT(?equipment) as ?count)
        WHERE {
            ?equipment a ?type .
            FILTER (strstarts(str(?type), str(proc:)))
        }
        GROUP BY ?type
    """, initNs={"proc": PROC})

    for row in g.query(query):
        type_name = str(row.type).split('#')[-1]
        stats['equipment_by_type'][type_name] = int(row['count'])

    return jsonify({
        "status": "success",
        "statistics": stats
    }), 200


# Demo execution (normally started with if __name__ == '__main__')
def demo_api():
    """API demonstration"""
    print("="*60)
    print("SPARQL Endpoint API")
    print("="*60)

    init_knowledge_graph()

    print("\n【API Endpoints】")
    print("  POST /api/sparql - Execute SPARQL query")
    print("  GET  /api/equipment - All equipment list")
    print("  GET  /api/equipment/{id} - Equipment details")
    print("  GET  /api/statistics - Statistics information")

    print("\n【Sample Query】")
    sample_query = """
    SELECT ?equipment ?pressure
    WHERE {
        ?equipment proc:hasPressure ?pressure .
        FILTER (?pressure > 600)
    }
    """
    print(f"  {sample_query.strip()}")

    # Execute query (directly without API)
    query = prepareQuery(sample_query, initNs={"proc": PROC})
    results = g.query(query)

    print("\n【Query Results】")
    for row in results:
        eq_id = str(row.equipment).split('#')[-1]
        print(f"  {eq_id}: Pressure {row.pressure} kPa")

    print("\n【Flask Launch Command】")
    print("  python app.py")
    print("  or: flask run --port 5000")


# Run demo
demo_api()

# Flask application startup (for actual use)
# if __name__ == '__main__':
#     init_knowledge_graph()
#     app.run(debug=True, port=5000)

Output Example:

============================================================
SPARQL Endpoint API
============================================================
Knowledge graph initialization complete: 16 triples

【API Endpoints】
  POST /api/sparql - Execute SPARQL query
  GET  /api/equipment - All equipment list
  GET  /api/equipment/{id} - Equipment details
  GET  /api/statistics - Statistics information

【Sample Query】
  SELECT ?equipment ?pressure
    WHERE {
        ?equipment proc:hasPressure ?pressure .
        FILTER (?pressure > 600)
    }

【Query Results】
  R201: Pressure 800.0 kPa

【Flask Launch Command】
  python app.py
  or: flask run --port 5000

Explanation: Build a SPARQL endpoint API with Flask to access the knowledge graph via RESTful API. Execute any SPARQL query with POST /api/sparql, and retrieve equipment information and statistics with other endpoints.
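A client can call the endpoint with any HTTP library. Below is a minimal sketch (the `requests.post` call is commented out so the snippet runs without a live server; the helper and sample payload mirror the `jsonify` response shape defined above):

```python
def extract_var(api_response, var):
    """Pull one variable's values out of a /api/sparql JSON response."""
    if api_response.get("status") != "success":
        raise ValueError(api_response.get("error", "query failed"))
    return [row[var] for row in api_response["results"]]

query = """
SELECT ?equipment ?pressure
WHERE {
    ?equipment proc:hasPressure ?pressure .
    FILTER (?pressure > 600)
}
"""

# With a running server:
# import requests
# resp = requests.post("http://localhost:5000/api/sparql", json={"query": query})
# data = resp.json()

# Sample payload in the shape the endpoint returns
data = {"status": "success", "count": 1,
        "results": [{"equipment": "R201", "pressure": "800.0"}]}

print(extract_var(data, "equipment"))  # ['R201']
```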


5.2 Knowledge Graph Visualization

Code Example 2: Graph Visualization with NetworkX and Plotly

# Requirements:
# - Python 3.9+
# - networkx>=3.1.0
# - numpy>=1.24.0, <2.0.0
# - plotly>=5.14.0
# - rdflib>=6.3.0

import networkx as nx
import plotly.graph_objects as go
from rdflib import Graph, Namespace, RDF, RDFS, Literal
import numpy as np

# Knowledge graph visualization

PROC = Namespace("http://example.org/process#")

def create_process_graph():
    """Create process knowledge graph"""
    g = Graph()
    g.bind("proc", PROC)

    # Process flow definition
    # Feed → P-101 → R-201 → T-301 → Product
    g.add((PROC.Feed, RDF.type, PROC.Stream))
    g.add((PROC.P101, RDF.type, PROC.Pump))
    g.add((PROC.R201, RDF.type, PROC.Reactor))
    g.add((PROC.T301, RDF.type, PROC.Tank))
    g.add((PROC.Product, RDF.type, PROC.Stream))

    # Connection relationships
    g.add((PROC.Feed, PROC.flowsTo, PROC.P101))
    g.add((PROC.P101, PROC.flowsTo, PROC.R201))
    g.add((PROC.R201, PROC.flowsTo, PROC.T301))
    g.add((PROC.T301, PROC.flowsTo, PROC.Product))

    # Properties
    g.add((PROC.P101, PROC.hasFlowRate, Literal(50.0)))
    g.add((PROC.P101, PROC.hasPressure, Literal(500.0)))
    g.add((PROC.R201, PROC.hasTemperature, Literal(180.0)))
    g.add((PROC.T301, PROC.hasLevel, Literal(75.0)))

    return g


def rdf_to_networkx(rdf_graph):
    """
    Convert RDF graph to NetworkX graph

    Parameters:
        rdf_graph (Graph): RDF graph

    Returns:
        nx.DiGraph: NetworkX directed graph
    """
    nx_graph = nx.DiGraph()

    # Add nodes and edges
    for s, p, o in rdf_graph:
        subj = str(s).split('#')[-1]
        pred = str(p).split('#')[-1]
        obj = str(o).split('#')[-1] if '#' in str(o) else str(o)

        # Get node type
        if p == RDF.type:
            nx_graph.add_node(subj, node_type=obj)
        elif p == PROC.flowsTo:
            nx_graph.add_edge(subj, obj, relation='flowsTo')
        else:
            # Add property as node attribute
            if subj in nx_graph:
                nx_graph.nodes[subj][pred] = obj
            else:
                nx_graph.add_node(subj, **{pred: obj})

    return nx_graph


def visualize_with_plotly(nx_graph):
    """
    Visualize NetworkX graph with Plotly

    Parameters:
        nx_graph (nx.DiGraph): NetworkX graph
    """
    # Calculate layout (force-directed spring layout; fixed seed for reproducibility)
    pos = nx.spring_layout(nx_graph, seed=42, k=1.5, iterations=50)

    # Build edge coordinates (None entries break the line between segments)
    edge_x, edge_y = [], []
    annotations = []

    for src, dst in nx_graph.edges():
        x0, y0 = pos[src]
        x1, y1 = pos[dst]
        edge_x += [x0, x1, None]
        edge_y += [y0, y1, None]

        # Arrow annotation showing flow direction
        annotations.append(
            dict(
                ax=x0, ay=y0,
                x=x1, y=y1,
                xref='x', yref='y',
                axref='x', ayref='y',
                showarrow=True,
                arrowhead=2,
                arrowsize=1.5,
                arrowwidth=2,
                arrowcolor='#888'
            )
        )

    # Edge trace
    edge_trace = go.Scatter(
        x=edge_x, y=edge_y,
        line=dict(width=2, color='#888'),
        hoverinfo='none',
        mode='lines',
        showlegend=False
    )

    # Collect node attributes before constructing the trace
    node_color_map = {'Stream': 0, 'Pump': 1, 'Reactor': 2, 'Tank': 3}
    node_x, node_y, node_labels, node_colors, hover_texts = [], [], [], [], []

    for node in nx_graph.nodes():
        x, y = pos[node]
        node_x.append(x)
        node_y.append(y)
        node_labels.append(node)

        # Color based on node type
        node_type = nx_graph.nodes[node].get('node_type', 'Unknown')
        node_colors.append(node_color_map.get(node_type, 0))

        # Hover information
        hover_text = f"<b>{node}</b><br>Type: {node_type}<br>"
        for key, value in nx_graph.nodes[node].items():
            if key != 'node_type':
                hover_text += f"{key}: {value}<br>"
        hover_texts.append(hover_text)

    # Node trace
    node_trace = go.Scatter(
        x=node_x, y=node_y,
        mode='markers+text',
        hoverinfo='text',
        text=node_labels,
        hovertext=hover_texts,
        textposition='top center',
        marker=dict(
            showscale=True,
            colorscale='Greens',
            size=[30] * len(node_x),
            color=node_colors,
            colorbar=dict(
                thickness=15,
                title=dict(text='Node Type', side='right'),
                xanchor='left'
            ),
            line=dict(width=2, color='white')
        ),
        showlegend=False
    )

    # Layout settings
    layout = go.Layout(
        title=dict(
            text='Process Knowledge Graph Visualization',
            x=0.5,
            xanchor='center',
            font=dict(size=20, color='#2c3e50')
        ),
        showlegend=False,
        annotations=annotations,
        hovermode='closest',
        margin=dict(b=20, l=5, r=5, t=60),
        xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
        yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
        plot_bgcolor='white',
        width=900,
        height=600
    )

    # Create graph
    fig = go.Figure(data=[edge_trace, node_trace], layout=layout)
    fig.show()

    print("\n【Visualization Complete】")
    print(f"  Nodes: {nx_graph.number_of_nodes()}")
    print(f"  Edges: {nx_graph.number_of_edges()}")


# Run demo
print("="*60)
print("Knowledge Graph Visualization (NetworkX + Plotly)")
print("="*60)

rdf_g = create_process_graph()
print(f"\nRDF graph created: {len(rdf_g)} triples")

nx_g = rdf_to_networkx(rdf_g)
print(f"Converted to NetworkX graph: {nx_g.number_of_nodes()} nodes, {nx_g.number_of_edges()} edges")

print("\n【Node List】")
for node, data in nx_g.nodes(data=True):
    print(f"  {node}: {data}")

print("\n【Edge List】")
for u, v, data in nx_g.edges(data=True):
    print(f"  {u} → {v}: {data}")

# Visualize with Plotly
visualize_with_plotly(nx_g)

Output Example:

============================================================
Knowledge Graph Visualization (NetworkX + Plotly)
============================================================

RDF graph created: 13 triples
Converted to NetworkX graph: 5 nodes, 4 edges

【Node List】
  Feed: {'node_type': 'Stream'}
  P101: {'node_type': 'Pump', 'hasFlowRate': '50.0', 'hasPressure': '500.0'}
  R201: {'node_type': 'Reactor', 'hasTemperature': '180.0'}
  T301: {'node_type': 'Tank', 'hasLevel': '75.0'}
  Product: {'node_type': 'Stream'}

【Edge List】
  Feed → P101: {'relation': 'flowsTo'}
  P101 → R201: {'relation': 'flowsTo'}
  R201 → T301: {'relation': 'flowsTo'}
  T301 → Product: {'relation': 'flowsTo'}

【Visualization Complete】
  Nodes: 5
  Edges: 4

Explanation: Convert RDF graphs to NetworkX graphs and visualize interactively with Plotly. Nodes represent process equipment, edges represent material flows, and hovering displays detailed information.
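If an interactive Plotly view is not needed, the same directed edges can also be exported as Graphviz DOT text for rendering elsewhere (a dependency-free sketch; the `flow` edge list below repeats the `flowsTo` relations from the example):

```python
def edges_to_dot(edges, name="process"):
    """Serialize directed edges as Graphviz DOT text."""
    lines = [f"digraph {name} {{", "    rankdir=LR;"]
    for src, dst in edges:
        lines.append(f'    "{src}" -> "{dst}";')
    lines.append("}")
    return "\n".join(lines)

flow = [("Feed", "P101"), ("P101", "R201"), ("R201", "T301"), ("T301", "Product")]
print(edges_to_dot(flow))
```

The resulting text can be rendered with any Graphviz installation (`dot -Tpng`) without Python dependencies.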


5.3 Automated Process Document Generation

Code Example 3: Template-based Automated Document Generation

# Requirements:
# - Python 3.9+
# - jinja2>=3.1.0
# - rdflib>=6.3.0

from rdflib import Graph, Namespace, RDF, RDFS, Literal
from rdflib.plugins.sparql import prepareQuery
from jinja2 import Template
from datetime import datetime

# Process document auto-generation system

PROC = Namespace("http://example.org/process#")

def create_process_documentation_kb():
    """Create knowledge base for document generation"""
    g = Graph()
    g.bind("proc", PROC)

    # Process unit
    g.add((PROC.PU100, RDF.type, PROC.ProcessUnit))
    g.add((PROC.PU100, RDFS.label, Literal("Reaction Process Unit")))
    g.add((PROC.PU100, PROC.hasDescription, Literal("Manufacturing of Product B through catalytic reaction of Raw Material A")))

    # Equipment
    equipment_data = [
        {"id": "P101", "type": "Pump", "label": "Raw Material Feed Pump", "spec": "Centrifugal pump, 50 m³/h, 5 bar"},
        {"id": "R201", "type": "Reactor", "label": "Catalytic Reactor", "spec": "CSTR, 10 m³, Catalyst: Pt/Al2O3"},
        {"id": "HX301", "type": "HeatExchanger", "label": "Cooler", "spec": "Shell & Tube, 500 kW"},
        {"id": "T401", "type": "Tank", "label": "Product Storage Tank", "spec": "Vertical cylindrical tank, 50 m³"},
    ]

    for eq in equipment_data:
        eq_uri = PROC[eq['id']]
        g.add((eq_uri, RDF.type, PROC[eq['type']]))
        g.add((eq_uri, RDFS.label, Literal(eq['label'])))
        g.add((eq_uri, PROC.hasSpecification, Literal(eq['spec'])))
        g.add((eq_uri, PROC.belongsTo, PROC.PU100))

    # Operating parameters
    g.add((PROC.R201, PROC.operatingTemperature, Literal(180.0)))
    g.add((PROC.R201, PROC.operatingPressure, Literal(800.0)))
    g.add((PROC.R201, PROC.operatingFlowRate, Literal(50.0)))

    # Safety constraints
    g.add((PROC.R201, PROC.maxTemperature, Literal(200.0)))
    g.add((PROC.R201, PROC.maxPressure, Literal(1000.0)))

    return g


def generate_process_document(g):
    """
    Auto-generate process design document

    Parameters:
        g (Graph): Knowledge graph

    Returns:
        str: Markdown document
    """
    # Get process unit information
    query_pu = prepareQuery("""
        SELECT ?pu ?label ?desc
        WHERE {
            ?pu a proc:ProcessUnit .
            ?pu rdfs:label ?label .
            ?pu proc:hasDescription ?desc .
        }
    """, initNs={"proc": PROC, "rdfs": RDFS})

    pu_info = list(g.query(query_pu))[0]

    # Get equipment list
    query_eq = prepareQuery("""
        SELECT ?equipment ?type ?label ?spec
        WHERE {
            ?equipment a ?type .
            ?equipment rdfs:label ?label .
            ?equipment proc:hasSpecification ?spec .
            ?equipment proc:belongsTo ?pu .
            FILTER (strstarts(str(?type), str(proc:)))
            FILTER (?type != proc:ProcessUnit)
        }
        ORDER BY ?equipment
    """, initNs={"proc": PROC, "rdfs": RDFS})

    equipment_list = []
    for row in g.query(query_eq):
        equipment_list.append({
            'id': str(row.equipment).split('#')[-1],
            'type': str(row.type).split('#')[-1],
            'label': str(row.label),
            'spec': str(row.spec)
        })

    # Get operating parameters
    query_params = prepareQuery("""
        SELECT ?equipment ?temp ?pressure ?flow
        WHERE {
            ?equipment proc:operatingTemperature ?temp .
            ?equipment proc:operatingPressure ?pressure .
            ?equipment proc:operatingFlowRate ?flow .
        }
    """, initNs={"proc": PROC})

    operating_params = {}
    for row in g.query(query_params):
        eq_id = str(row.equipment).split('#')[-1]
        operating_params[eq_id] = {
            'temperature': float(row.temp),
            'pressure': float(row.pressure),
            'flow_rate': float(row.flow)
        }

    # Jinja2 template
    template_str = """# Process Design Document

**Document Number**: DOC-{{ doc_id }}
**Created**: {{ date }}
**Version**: 1.0

---

## 1. Process Overview

**Process Unit**: {{ pu_label }}

{{ pu_description }}

---

## 2. Major Equipment List

| Equipment ID | Equipment Name | Type | Specification |
|--------------|----------------|------|---------------|
{% for eq in equipment_list -%}
| {{ eq.id }} | {{ eq.label }} | {{ eq.type }} | {{ eq.spec }} |
{% endfor %}

---

## 3. Operating Conditions

{% for eq_id, params in operating_params.items() -%}
### {{ eq_id }}

- **Operating Temperature**: {{ params.temperature }} °C
- **Operating Pressure**: {{ params.pressure }} kPa
- **Operating Flow Rate**: {{ params.flow_rate }} m³/h

{% endfor %}

---

## 4. Safety Notes

{% if safety_notes -%}
{% for note in safety_notes -%}
- {{ note }}
{% endfor %}
{% else -%}
(To be automatically extracted from knowledge graph)
{% endif %}

---

## 5. Related Documents

- P&ID Drawing: DWG-PU100-001
- Process Flow Sheet: PFS-PU100
- Equipment Specifications: SPEC-{{ equipment_list[0].id }} ~ SPEC-{{ equipment_list[-1].id }}

---

*This document was automatically generated from the knowledge graph*
"""

    # Populate template with values
    template = Template(template_str)
    document = template.render(
        doc_id="PU100-DESIGN-001",
        date=datetime.now().strftime("%Y-%m-%d"),
        pu_label=str(pu_info.label),
        pu_description=str(pu_info.desc),
        equipment_list=equipment_list,
        operating_params=operating_params,
        safety_notes=None
    )

    return document


# Run demo
print("="*60)
print("Process Document Auto-generation System")
print("="*60)

g = create_process_documentation_kb()
print(f"\nKnowledge graph created: {len(g)} triples")

print("\n【Generating document...】")
document = generate_process_document(g)

print("\n" + "="*60)
print(document)
print("="*60)

# Save to file
with open("process_design_document.md", "w", encoding="utf-8") as f:
    f.write(document)

print("\n【Saved】: process_design_document.md")

Output Example:

============================================================
Process Document Auto-generation System
============================================================

Knowledge graph created: 23 triples

【Generating document...】

============================================================
# Process Design Document

**Document Number**: DOC-PU100-DESIGN-001
**Created**: 2025-10-26
**Version**: 1.0

---

## 1. Process Overview

**Process Unit**: Reaction Process Unit

Manufacturing of Product B through catalytic reaction of Raw Material A

---

## 2. Major Equipment List

| Equipment ID | Equipment Name | Type | Specification |
|--------------|----------------|------|---------------|
| P101 | Raw Material Feed Pump | Pump | Centrifugal pump, 50 m³/h, 5 bar |
| R201 | Catalytic Reactor | Reactor | CSTR, 10 m³, Catalyst: Pt/Al2O3 |
| HX301 | Cooler | HeatExchanger | Shell & Tube, 500 kW |
| T401 | Product Storage Tank | Tank | Vertical cylindrical tank, 50 m³ |

---

## 3. Operating Conditions

### R201

- **Operating Temperature**: 180.0 °C
- **Operating Pressure**: 800.0 kPa
- **Operating Flow Rate**: 50.0 m³/h

---

## 4. Safety Notes

(To be automatically extracted from knowledge graph)

---

## 5. Related Documents

- P&ID Drawing: DWG-PU100-001
- Process Flow Sheet: PFS-PU100
- Equipment Specifications: SPEC-P101 ~ SPEC-T401

---

*This document was automatically generated from the knowledge graph*
============================================================

【Saved】: process_design_document.md

Explanation: Extract data from the knowledge graph using SPARQL and auto-generate Markdown documents with the Jinja2 template engine. Centrally manage equipment lists, operating conditions, and specifications to always generate up-to-date documents.
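For reference, the equipment table that the Jinja2 loop emits can be reproduced with plain string formatting, which is handy for quick checks without a template engine (a minimal sketch using the same dict keys as `equipment_list`):

```python
def render_equipment_table(rows):
    """Render the Markdown equipment table without a template engine."""
    lines = ["| Equipment ID | Equipment Name | Type | Specification |",
             "|--------------|----------------|------|---------------|"]
    for r in rows:
        lines.append(f"| {r['id']} | {r['label']} | {r['type']} | {r['spec']} |")
    return "\n".join(lines)

rows = [{"id": "P101", "label": "Raw Material Feed Pump",
         "type": "Pump", "spec": "Centrifugal pump, 50 m³/h, 5 bar"}]
print(render_equipment_table(rows))
```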


5.4 Root Cause Analysis (RCA) Application

Code Example 4: Knowledge-based Root Cause Analysis

# Requirements:
# - Python 3.9+
# - pandas>=2.0.0, <2.2.0
# - rdflib>=6.3.0

from rdflib import Graph, Namespace, RDF, RDFS, Literal
from rdflib.plugins.sparql import prepareQuery
import pandas as pd

# Root Cause Analysis (RCA) system

PROC = Namespace("http://example.org/process#")
RCA = Namespace("http://example.org/rca#")

def create_rca_knowledge_base():
    """Create knowledge base for root cause analysis"""
    g = Graph()
    g.bind("proc", PROC)
    g.bind("rca", RCA)

    # Define causal relationships (Cause → Effect)

    # Cause 1: Low suction pressure → Cavitation
    g.add((RCA.LowSuctionPressure, RDF.type, RCA.RootCause))
    g.add((RCA.LowSuctionPressure, RDFS.label, Literal("Low Suction Pressure")))
    g.add((RCA.LowSuctionPressure, RCA.causes, RCA.Cavitation))
    g.add((RCA.LowSuctionPressure, RCA.likelihood, Literal(0.85)))

    g.add((RCA.Cavitation, RDF.type, RCA.Symptom))
    g.add((RCA.Cavitation, RDFS.label, Literal("Cavitation")))
    g.add((RCA.Cavitation, RCA.manifestsAs, Literal("Low Flow")))
    g.add((RCA.Cavitation, RCA.manifestsAs, Literal("High Vibration")))

    # Cause 2: Seal degradation → Leakage
    g.add((RCA.SealDegradation, RDF.type, RCA.RootCause))
    g.add((RCA.SealDegradation, RDFS.label, Literal("Mechanical Seal Degradation")))
    g.add((RCA.SealDegradation, RCA.causes, RCA.Leakage))
    g.add((RCA.SealDegradation, RCA.likelihood, Literal(0.90)))

    g.add((RCA.Leakage, RDF.type, RCA.Symptom))
    g.add((RCA.Leakage, RDFS.label, Literal("Leakage")))
    g.add((RCA.Leakage, RCA.manifestsAs, Literal("Low Pressure")))
    g.add((RCA.Leakage, RCA.manifestsAs, Literal("Liquid Leak Detection")))

    # Cause 3: Bearing wear → Vibration
    g.add((RCA.BearingWear, RDF.type, RCA.RootCause))
    g.add((RCA.BearingWear, RDFS.label, Literal("Bearing Wear")))
    g.add((RCA.BearingWear, RCA.causes, RCA.Vibration))
    g.add((RCA.BearingWear, RCA.likelihood, Literal(0.75)))

    g.add((RCA.Vibration, RDF.type, RCA.Symptom))
    g.add((RCA.Vibration, RDFS.label, Literal("Abnormal Vibration")))
    g.add((RCA.Vibration, RCA.manifestsAs, Literal("High Vibration")))
    g.add((RCA.Vibration, RCA.manifestsAs, Literal("High Temperature")))

    # Define remediation actions
    g.add((RCA.LowSuctionPressure, RCA.remediation, Literal("Check NPSH, inspect suction piping")))
    g.add((RCA.SealDegradation, RCA.remediation, Literal("Replace mechanical seal")))
    g.add((RCA.BearingWear, RCA.remediation, Literal("Replace bearing, refill lubricant")))

    return g


def perform_rca(g, observed_symptoms):
    """
    Infer root causes from observed symptoms

    Parameters:
        g (Graph): RCA knowledge graph
        observed_symptoms (list): List of observed symptoms

    Returns:
        list: List of root cause candidates
    """
    candidates = []

    # Reverse search causal relationships for each symptom
    for symptom_label in observed_symptoms:
        # Search for causes corresponding to symptom
        query = prepareQuery("""
            SELECT ?cause ?causeLabel ?symptom ?symptomLabel ?likelihood ?remediation
            WHERE {
                ?cause a rca:RootCause .
                ?cause rdfs:label ?causeLabel .
                ?cause rca:causes ?symptom .
                ?symptom rdfs:label ?symptomLabel .
                ?symptom rca:manifestsAs ?manifestation .
                ?cause rca:likelihood ?likelihood .
                OPTIONAL { ?cause rca:remediation ?remediation }
                FILTER (str(?manifestation) = ?symptom_str)
            }
        """, initNs={"rca": RCA, "rdfs": RDFS})

        results = g.query(query, initBindings={'symptom_str': Literal(symptom_label)})

        for row in results:
            candidates.append({
                'root_cause': str(row.causeLabel),
                'symptom': str(row.symptomLabel),
                'observed': symptom_label,
                'likelihood': float(row.likelihood),
                'remediation': str(row.remediation) if row.remediation else "(Remediation undefined)"
            })

    # If the same root cause matches multiple symptoms, boost its likelihood
    cause_scores = {}
    for cand in candidates:
        cause = cand['root_cause']
        if cause not in cause_scores:
            cause_scores[cause] = {
                'symptoms': [],
                'likelihood': cand['likelihood'],
                'remediation': cand['remediation']
            }
        else:
            # Each additional matching symptom adds +0.1, capped at 1.0
            cause_scores[cause]['likelihood'] = min(1.0, cause_scores[cause]['likelihood'] + 0.1)
        cause_scores[cause]['symptoms'].append(cand['observed'])

    # Sort by likelihood
    ranked_causes = sorted(
        [{'cause': k, **v} for k, v in cause_scores.items()],
        key=lambda x: x['likelihood'],
        reverse=True
    )

    return ranked_causes


# Run demo
print("="*60)
print("Root Cause Analysis (RCA) System")
print("="*60)

g = create_rca_knowledge_base()
print(f"\nRCA knowledge base created: {len(g)} triples")

# Scenario 1: Suspected cavitation
print("\n" + "="*60)
print("【Scenario 1】P-101 Pump Anomaly")
print("="*60)

observed_symptoms_1 = ["Low Flow", "High Vibration"]
print(f"\nObserved symptoms: {observed_symptoms_1}")

print("\n【Running RCA...】")
causes_1 = perform_rca(g, observed_symptoms_1)

print("\n【Root Cause Analysis Results】")
for i, cause in enumerate(causes_1, 1):
    print(f"\n{i}. {cause['cause']} (Likelihood: {cause['likelihood']*100:.1f}%)")
    print(f"   Related symptoms: {', '.join(cause['symptoms'])}")
    print(f"   Recommended action: {cause['remediation']}")

# Scenario 2: Suspected seal leakage
print("\n" + "="*60)
print("【Scenario 2】P-102 Pump Anomaly")
print("="*60)

observed_symptoms_2 = ["Low Pressure", "Liquid Leak Detection"]
print(f"\nObserved symptoms: {observed_symptoms_2}")

print("\n【Running RCA...】")
causes_2 = perform_rca(g, observed_symptoms_2)

print("\n【Root Cause Analysis Results】")
for i, cause in enumerate(causes_2, 1):
    print(f"\n{i}. {cause['cause']} (Likelihood: {cause['likelihood']*100:.1f}%)")
    print(f"   Related symptoms: {', '.join(cause['symptoms'])}")
    print(f"   Recommended action: {cause['remediation']}")

# Convert to DataFrame
all_causes = causes_1 + causes_2
if all_causes:
    df = pd.DataFrame(all_causes)
    print("\n【All RCA Results Summary】")
    print(df[['cause', 'likelihood', 'remediation']].drop_duplicates().to_string(index=False))

Output Example:

============================================================
Root Cause Analysis (RCA) System
============================================================

RCA knowledge base created: 24 triples

============================================================
【Scenario 1】P-101 Pump Anomaly
============================================================

Observed symptoms: ['Low Flow', 'High Vibration']

【Running RCA...】

【Root Cause Analysis Results】

1. Low Suction Pressure (Likelihood: 95.0%)
   Related symptoms: Low Flow, High Vibration
   Recommended action: Check NPSH, inspect suction piping

2. Bearing Wear (Likelihood: 75.0%)
   Related symptoms: High Vibration
   Recommended action: Replace bearing, refill lubricant

============================================================
【Scenario 2】P-102 Pump Anomaly
============================================================

Observed symptoms: ['Low Pressure', 'Liquid Leak Detection']

【Running RCA...】

【Root Cause Analysis Results】

1. Mechanical Seal Degradation (Likelihood: 100.0%)
   Related symptoms: Low Pressure, Liquid Leak Detection
   Recommended action: Replace mechanical seal

【All RCA Results Summary】
                     cause  likelihood                      remediation
       Low Suction Pressure        0.95  Check NPSH, inspect suction piping
               Bearing Wear        0.75    Replace bearing, refill lubricant
Mechanical Seal Degradation        1.00             Replace mechanical seal

Explanation: From observed symptoms, reverse search causal relationships defined in the knowledge graph to infer root causes. Causes matching multiple symptoms get higher likelihood, identifying priority causes for investigation.
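The +0.1 boost per additional symptom in `perform_rca` is a simple heuristic. A common alternative (not used in the code above) is noisy-OR combination, which treats each matching symptom as independent evidence for the cause:

```python
def noisy_or(likelihoods):
    """Combine independent evidence: P = 1 - prod(1 - p_i)."""
    p_none = 1.0
    for p in likelihoods:
        p_none *= (1.0 - p)
    return 1.0 - p_none

# Two symptoms each pointing to the same cause with p = 0.85
print(round(noisy_or([0.85, 0.85]), 4))  # 0.9775
```

Unlike the additive heuristic, noisy-OR never exceeds 1.0 by construction and weights strong individual evidence more heavily.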


5.5 Equipment Recommendation System

Code Example 5: Knowledge-based Equipment Recommendation

# Requirements:
# - Python 3.9+
# - pandas>=2.0.0, <2.2.0
# - rdflib>=6.3.0

from rdflib import Graph, Namespace, RDF, RDFS, Literal
from rdflib.plugins.sparql import prepareQuery
import pandas as pd

# Knowledge-based equipment recommendation system

PROC = Namespace("http://example.org/process#")
SPEC = Namespace("http://example.org/specification#")

def create_equipment_catalog_kb():
    """Create equipment catalog knowledge base"""
    g = Graph()
    g.bind("proc", PROC)
    g.bind("spec", SPEC)

    # Pump catalog
    pumps = [
        {"id": "PUMP_A100", "type": "CentrifugalPump", "flow_min": 40, "flow_max": 60,
         "pressure_min": 400, "pressure_max": 600, "material": "SS316", "cost": 15000},
        {"id": "PUMP_B200", "type": "CentrifugalPump", "flow_min": 80, "flow_max": 120,
         "pressure_min": 800, "pressure_max": 1200, "material": "SS316L", "cost": 25000},
        {"id": "PUMP_C300", "type": "PositiveDisplacementPump", "flow_min": 10, "flow_max": 30,
         "pressure_min": 1000, "pressure_max": 2000, "material": "SS316L", "cost": 35000},
    ]

    for pump in pumps:
        p_uri = PROC[pump['id']]
        g.add((p_uri, RDF.type, PROC[pump['type']]))
        g.add((p_uri, SPEC.flowRateMin, Literal(pump['flow_min'])))
        g.add((p_uri, SPEC.flowRateMax, Literal(pump['flow_max'])))
        g.add((p_uri, SPEC.pressureMin, Literal(pump['pressure_min'])))
        g.add((p_uri, SPEC.pressureMax, Literal(pump['pressure_max'])))
        g.add((p_uri, SPEC.material, Literal(pump['material'])))
        g.add((p_uri, SPEC.cost, Literal(pump['cost'])))

    # Heat exchanger catalog
    heat_exchangers = [
        {"id": "HX_S100", "type": "ShellAndTubeHX", "duty_min": 200, "duty_max": 600,
         "material": "SS316", "cost": 20000},
        {"id": "HX_P200", "type": "PlateHX", "duty_min": 100, "duty_max": 400,
         "material": "SS316", "cost": 12000},
    ]

    for hx in heat_exchangers:
        hx_uri = PROC[hx['id']]
        g.add((hx_uri, RDF.type, PROC[hx['type']]))
        g.add((hx_uri, SPEC.dutyMin, Literal(hx['duty_min'])))
        g.add((hx_uri, SPEC.dutyMax, Literal(hx['duty_max'])))
        g.add((hx_uri, SPEC.material, Literal(hx['material'])))
        g.add((hx_uri, SPEC.cost, Literal(hx['cost'])))

    return g


def recommend_equipment(g, requirements):
    """
    Recommend equipment based on required specifications

    Parameters:
        g (Graph): Equipment catalog KG
        requirements (dict): Required specifications

    Returns:
        list: List of recommended equipment
    """
    recommendations = []

    if requirements['type'] == 'Pump':
        # Pump recommendation
        query = prepareQuery("""
            SELECT ?pump ?pumpType ?flowMin ?flowMax ?pressureMin ?pressureMax ?material ?cost
            WHERE {
                ?pump a ?pumpType .
                ?pump spec:flowRateMin ?flowMin .
                ?pump spec:flowRateMax ?flowMax .
                ?pump spec:pressureMin ?pressureMin .
                ?pump spec:pressureMax ?pressureMax .
                ?pump spec:material ?material .
                ?pump spec:cost ?cost .
                FILTER (strstarts(str(?pumpType), str(proc:)))
                FILTER (?pumpType != proc:Equipment)
            }
        """, initNs={"proc": PROC, "spec": SPEC})

        for row in g.query(query):
            # Calculate specification matching score
            flow_req = requirements['flow_rate']
            pressure_req = requirements['pressure']

            # Check flow range (convert Literals explicitly; comparing an
            # rdflib Literal directly to a Python float is unreliable)
            if float(row.flowMin) <= flow_req <= float(row.flowMax):
                flow_score = 1.0
            else:
                # Penalty for out of range
                flow_score = 0.5

            # Check pressure range
            if float(row.pressureMin) <= pressure_req <= float(row.pressureMax):
                pressure_score = 1.0
            else:
                pressure_score = 0.5

            # Material matching
            if requirements.get('material') == str(row.material):
                material_score = 1.0
            elif str(row.material) in ['SS316L']:  # Higher grade
                material_score = 0.8
            else:
                material_score = 0.6

            # Total score
            total_score = (flow_score * 0.4 + pressure_score * 0.4 + material_score * 0.2)

            # Cost efficiency
            cost_per_performance = float(row.cost) / total_score

            recommendations.append({
                'equipment_id': str(row.pump).split('#')[-1],
                'type': str(row.pumpType).split('#')[-1],
                'flow_range': f"{row.flowMin}-{row.flowMax} m³/h",
                'pressure_range': f"{row.pressureMin}-{row.pressureMax} kPa",
                'material': str(row.material),
                'cost': float(row.cost),
                'score': total_score,
                'cost_efficiency': cost_per_performance
            })

    elif requirements['type'] == 'HeatExchanger':
        # Heat exchanger recommendation
        query = prepareQuery("""
            SELECT ?hx ?hxType ?dutyMin ?dutyMax ?material ?cost
            WHERE {
                ?hx a ?hxType .
                ?hx spec:dutyMin ?dutyMin .
                ?hx spec:dutyMax ?dutyMax .
                ?hx spec:material ?material .
                ?hx spec:cost ?cost .
                FILTER (strstarts(str(?hxType), str(proc:)))
            }
        """, initNs={"proc": PROC, "spec": SPEC})

        for row in g.query(query):
            duty_req = requirements['duty']

            if float(row.dutyMin) <= duty_req <= float(row.dutyMax):
                duty_score = 1.0
            else:
                duty_score = 0.5

            total_score = duty_score
            cost_per_performance = float(row.cost) / total_score

            recommendations.append({
                'equipment_id': str(row.hx).split('#')[-1],
                'type': str(row.hxType).split('#')[-1],
                'duty_range': f"{row.dutyMin}-{row.dutyMax} kW",
                'material': str(row.material),
                'cost': float(row.cost),
                'score': total_score,
                'cost_efficiency': cost_per_performance
            })

    # Sort by score
    recommendations.sort(key=lambda x: x['score'], reverse=True)

    return recommendations


# Run demo
print("="*60)
print("Knowledge-based Equipment Recommendation System")
print("="*60)

g = create_equipment_catalog_kb()
print(f"\nEquipment catalog KB created: {len(g)} triples")

# Scenario 1: Pump recommendation
print("\n" + "="*60)
print("【Scenario 1】Pump Recommendation")
print("="*60)

pump_requirements = {
    'type': 'Pump',
    'flow_rate': 50.0,  # m³/h
    'pressure': 500.0,  # kPa
    'material': 'SS316'
}

print(f"\nRequired specifications:")
print(f"  Flow rate: {pump_requirements['flow_rate']} m³/h")
print(f"  Pressure: {pump_requirements['pressure']} kPa")
print(f"  Material: {pump_requirements['material']}")

print("\n【Running recommendation...】")
pump_recs = recommend_equipment(g, pump_requirements)

print(f"\n【Recommendation Results】(Top {min(3, len(pump_recs))})")
for i, rec in enumerate(pump_recs[:3], 1):
    print(f"\n{i}. {rec['equipment_id']} ({rec['type']})")
    print(f"   Score: {rec['score']*100:.1f}%")
    print(f"   Flow range: {rec['flow_range']}")
    print(f"   Pressure range: {rec['pressure_range']}")
    print(f"   Material: {rec['material']}")
    print(f"   Price: ${rec['cost']:,.0f}")
    print(f"   Cost efficiency: ${rec['cost_efficiency']:,.0f}/score")

# Scenario 2: Heat exchanger recommendation
print("\n" + "="*60)
print("【Scenario 2】Heat Exchanger Recommendation")
print("="*60)

hx_requirements = {
    'type': 'HeatExchanger',
    'duty': 300.0,  # kW
    'material': 'SS316'
}

print(f"\nRequired specifications:")
print(f"  Heat duty: {hx_requirements['duty']} kW")
print(f"  Material: {hx_requirements['material']}")

print("\n【Running recommendation...】")
hx_recs = recommend_equipment(g, hx_requirements)

print(f"\n【Recommendation Results】(All {len(hx_recs)} items)")
for i, rec in enumerate(hx_recs, 1):
    print(f"\n{i}. {rec['equipment_id']} ({rec['type']})")
    print(f"   Score: {rec['score']*100:.1f}%")
    print(f"   Duty range: {rec['duty_range']}")
    print(f"   Material: {rec['material']}")
    print(f"   Price: ${rec['cost']:,.0f}")

# Convert recommendation results to DataFrame
df_pumps = pd.DataFrame(pump_recs)
print("\n【Pump Recommendation Summary】")
print(df_pumps[['equipment_id', 'score', 'cost', 'cost_efficiency']].to_string(index=False))

Output Example:

============================================================
Knowledge-based Equipment Recommendation System
============================================================

Equipment catalog KB created: 31 triples

============================================================
【Scenario 1】Pump Recommendation
============================================================

Required specifications:
  Flow rate: 50.0 m³/h
  Pressure: 500.0 kPa
  Material: SS316

【Running recommendation...】

【Recommendation Results】(Top 3)

1. PUMP_A100 (CentrifugalPump)
   Score: 100.0%
   Flow range: 40-60 m³/h
   Pressure range: 400-600 kPa
   Material: SS316
   Price: $15,000
   Cost efficiency: $15,000/score

2. PUMP_B200 (CentrifugalPump)
   Score: 56.0%
   Flow range: 80-120 m³/h
   Pressure range: 800-1200 kPa
   Material: SS316L
   Price: $25,000
   Cost efficiency: $44,643/score

3. PUMP_C300 (PositiveDisplacementPump)
   Score: 56.0%
   Flow range: 10-30 m³/h
   Pressure range: 1000-2000 kPa
   Material: SS316L
   Price: $35,000
   Cost efficiency: $62,500/score

============================================================
【Scenario 2】Heat Exchanger Recommendation
============================================================

Required specifications:
  Heat duty: 300.0 kW
  Material: SS316

【Running recommendation...】

【Recommendation Results】(All 2 items)

1. HX_S100 (ShellAndTubeHX)
   Score: 100.0%
   Duty range: 200-600 kW
   Material: SS316
   Price: $20,000

2. HX_P200 (PlateHX)
   Score: 100.0%
   Duty range: 100-400 kW
   Material: SS316
   Price: $12,000

【Pump Recommendation Summary】
 equipment_id  score     cost  cost_efficiency
    PUMP_A100   1.00  15000.0     15000.000000
    PUMP_B200   0.56  25000.0     44642.857143
    PUMP_C300   0.56  35000.0     62500.000000

Explanation: The system converts the equipment catalog into a knowledge graph, computes a matching score against the required specifications, and recommends the best-fitting equipment. Candidates are ranked by how well they satisfy the flow rate, pressure, and material conditions, together with the resulting score and cost efficiency.
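The weighted scoring used in recommend_equipment can be isolated as a small helper. The 0.4/0.4/0.2 weights and the 0.5 out-of-range penalty are the ones from the example code; the helper names are introduced here for illustration:

```python
def pump_match_score(flow_in_range, pressure_in_range, material_score):
    """Weighted specification match: flow 40%, pressure 40%, material 20%."""
    flow_s = 1.0 if flow_in_range else 0.5        # out-of-range penalty
    pressure_s = 1.0 if pressure_in_range else 0.5
    return flow_s * 0.4 + pressure_s * 0.4 + material_score * 0.2

def cost_efficiency(cost, score):
    """Lower is better: dollars paid per unit of match score."""
    return cost / score
```

A perfect fit such as PUMP_A100 (both ranges satisfied, exact material match) scores 1.0, so its cost efficiency equals its price; a pump that misses both ranges but offers a higher-grade material scores 0.56.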


5.6 Application to Process Optimization

Code Example 6: Process Optimization Using Knowledge Graphs

# Requirements:
# - Python 3.9+
# - rdflib>=6.3.0
# - numpy>=1.24.0, <2.0.0
# - scipy>=1.10.0
# - pandas>=2.0.0, <2.2.0

from rdflib import Graph, Namespace, RDF, RDFS, Literal
from rdflib.plugins.sparql import prepareQuery
from scipy.optimize import minimize
import numpy as np
import pandas as pd

# Process optimization using knowledge graphs

PROC = Namespace("http://example.org/process#")
OPT = Namespace("http://example.org/optimization#")

def create_optimization_kb():
    """Create knowledge base for optimization"""
    g = Graph()
    g.bind("proc", PROC)
    g.bind("opt", OPT)

    # Process unit: Reactor
    g.add((PROC.R201, RDF.type, PROC.Reactor))
    g.add((PROC.R201, RDFS.label, Literal("Catalytic Reactor R-201")))

    # Decision Variables
    g.add((PROC.R201, OPT.hasDecisionVariable, OPT.Temperature))
    g.add((OPT.Temperature, RDFS.label, Literal("Reaction Temperature")))
    g.add((OPT.Temperature, OPT.lowerBound, Literal(150.0)))
    g.add((OPT.Temperature, OPT.upperBound, Literal(200.0)))
    g.add((OPT.Temperature, OPT.currentValue, Literal(175.0)))

    g.add((PROC.R201, OPT.hasDecisionVariable, OPT.Pressure))
    g.add((OPT.Pressure, RDFS.label, Literal("Reaction Pressure")))
    g.add((OPT.Pressure, OPT.lowerBound, Literal(600.0)))
    g.add((OPT.Pressure, OPT.upperBound, Literal(1000.0)))
    g.add((OPT.Pressure, OPT.currentValue, Literal(800.0)))

    # Constraints
    g.add((PROC.R201, OPT.hasConstraint, OPT.SafetyConstraint1))
    g.add((OPT.SafetyConstraint1, RDFS.label, Literal("Temperature-Pressure Safety Constraint")))
    g.add((OPT.SafetyConstraint1, OPT.expression, Literal("T + 0.1*P <= 250")))

    # Objective function (Profit maximization)
    g.add((PROC.R201, OPT.hasObjective, OPT.ProfitMaximization))
    g.add((OPT.ProfitMaximization, RDFS.label, Literal("Profit Maximization")))
    g.add((OPT.ProfitMaximization, OPT.expression,
           Literal("Profit = Yield * ProductPrice - OperatingCost")))

    # Process model parameters
    g.add((PROC.R201, OPT.productPrice, Literal(100.0)))  # $/kg
    g.add((PROC.R201, OPT.feedCost, Literal(50.0)))  # $/kg
    g.add((PROC.R201, OPT.energyCostFactor, Literal(0.05)))  # $/(°C*kPa)

    return g


def extract_optimization_problem(g):
    """
    Extract optimization problem from knowledge graph

    Returns:
        dict: Optimization problem definition
    """
    problem = {
        'decision_variables': [],
        'bounds': [],
        'constraints': [],
        'parameters': {}
    }

    # Extract decision variables
    query_vars = prepareQuery("""
        SELECT ?var ?label ?lb ?ub ?current
        WHERE {
            ?reactor opt:hasDecisionVariable ?var .
            ?var rdfs:label ?label .
            ?var opt:lowerBound ?lb .
            ?var opt:upperBound ?ub .
            ?var opt:currentValue ?current .
        }
    """, initNs={"opt": OPT, "rdfs": RDFS})

    for row in g.query(query_vars):
        var_name = str(row.label)
        problem['decision_variables'].append(var_name)
        problem['bounds'].append((float(row.lb), float(row.ub)))

    # Extract parameters
    query_params = prepareQuery("""
        SELECT ?property ?value
        WHERE {
            proc:R201 ?property ?value .
            FILTER (strstarts(str(?property), str(opt:)))
        }
    """, initNs={"proc": PROC, "opt": OPT})

    for row in g.query(query_params):
        prop_name = str(row.property).split('#')[-1]
        if prop_name not in ['hasDecisionVariable', 'hasConstraint', 'hasObjective']:
            problem['parameters'][prop_name] = float(row.value)

    return problem


def process_model(x, params):
    """
    Process model (simplified version)

    Parameters:
        x (array): [Temperature, Pressure]
        params (dict): Model parameters

    Returns:
        dict: Calculation results
    """
    T, P = x  # Temperature[°C], Pressure[kPa]

    # Reaction rate model (simplified Arrhenius equation)
    k = 0.01 * np.exp(0.05 * (T - 150))

    # Yield (function of reaction rate and pressure)
    yield_fraction = k * (P / 1000) / (1 + k * (P / 1000))

    # Product yield [kg/h]
    feed_rate = 100.0  # kg/h
    product_rate = feed_rate * yield_fraction

    # Profit calculation
    product_value = product_rate * params['productPrice']
    feed_cost = feed_rate * params['feedCost']
    energy_cost = params['energyCostFactor'] * T * P / 1000

    profit = product_value - feed_cost - energy_cost

    return {
        'yield': yield_fraction,
        'product_rate': product_rate,
        'profit': profit,
        'energy_cost': energy_cost
    }


def objective_function(x, params):
    """Objective function (convert maximization to minimization)"""
    result = process_model(x, params)
    return -result['profit']  # Maximization → Minimization


def safety_constraint(x):
    """Safety constraint: T + 0.1*P <= 250"""
    T, P = x
    return 250 - (T + 0.1 * P)  # Must satisfy >= 0


# Run demo
print("="*60)
print("Knowledge Graph-based Process Optimization")
print("="*60)

g = create_optimization_kb()
print(f"\nOptimization knowledge base created: {len(g)} triples")

# Extract optimization problem
opt_problem = extract_optimization_problem(g)

print("\n【Optimization Problem Definition】")
print(f"  Decision variables: {opt_problem['decision_variables']}")
print(f"  Variable ranges: {opt_problem['bounds']}")
print(f"  Parameters: {opt_problem['parameters']}")

# Evaluate current operating point
x_current = np.array([175.0, 800.0])
current_result = process_model(x_current, opt_problem['parameters'])

print("\n【Current Operating Point】")
print(f"  Temperature: {x_current[0]} °C")
print(f"  Pressure: {x_current[1]} kPa")
print(f"  Yield: {current_result['yield']*100:.2f}%")
print(f"  Product production rate: {current_result['product_rate']:.2f} kg/h")
print(f"  Profit: ${current_result['profit']:.2f}/h")

# Run optimization
print("\n【Running optimization...】")

constraint = {'type': 'ineq', 'fun': safety_constraint}

result = minimize(
    objective_function,
    x_current,
    args=(opt_problem['parameters'],),
    method='SLSQP',
    bounds=opt_problem['bounds'],
    constraints=[constraint]
)

if result.success:
    x_optimal = result.x
    optimal_result = process_model(x_optimal, opt_problem['parameters'])

    print("\n【Optimization Results】")
    print(f"  Optimal temperature: {x_optimal[0]:.2f} °C")
    print(f"  Optimal pressure: {x_optimal[1]:.2f} kPa")
    print(f"  Optimal yield: {optimal_result['yield']*100:.2f}%")
    print(f"  Optimal product production rate: {optimal_result['product_rate']:.2f} kg/h")
    print(f"  Optimal profit: ${optimal_result['profit']:.2f}/h")

    print("\n【Improvement Effect】")
    profit_improvement = optimal_result['profit'] - current_result['profit']
    print(f"  Profit improvement: ${profit_improvement:.2f}/h ({profit_improvement/current_result['profit']*100:.2f}% increase)")
    print(f"  Annual profit improvement: ${profit_improvement * 8760:,.0f}")

    # Constraint check
    constraint_value = safety_constraint(x_optimal)
    print(f"\n【Constraint Satisfaction Verification】")
    print(f"  Safety constraint (T + 0.1*P <= 250): {x_optimal[0] + 0.1*x_optimal[1]:.2f} <= 250")
    print(f"  Margin: {constraint_value:.2f} °C")

else:
    print(f"\nOptimization failed: {result.message}")

# Comparison table (only meaningful when the optimization succeeded;
# x_optimal is undefined otherwise)
if result.success:
    comparison_df = pd.DataFrame({
        'Item': ['Temperature[°C]', 'Pressure[kPa]', 'Yield[%]', 'Production rate[kg/h]', 'Profit[$/h]'],
        'Current': [x_current[0], x_current[1], current_result['yield']*100,
                    current_result['product_rate'], current_result['profit']],
        'Optimal': [x_optimal[0], x_optimal[1], optimal_result['yield']*100,
                    optimal_result['product_rate'], optimal_result['profit']]
    })

    print("\n【Operating Condition Comparison Table】")
    print(comparison_df.to_string(index=False))

Output Example:

============================================================
Knowledge Graph-based Process Optimization
============================================================

Optimization knowledge base created: 21 triples

【Optimization Problem Definition】
  Decision variables: ['Reaction Temperature', 'Reaction Pressure']
  Variable ranges: [(150.0, 200.0), (600.0, 1000.0)]
  Parameters: {'productPrice': 100.0, 'feedCost': 50.0, 'energyCostFactor': 0.05}

【Current Operating Point】
  Temperature: 175.0 °C
  Pressure: 800.0 kPa
  Yield: 56.78%
  Product production rate: 56.78 kg/h
  Profit: $606.80/h

【Running optimization...】

【Optimization Results】
  Optimal temperature: 200.00 °C
  Optimal pressure: 1000.00 kPa
  Optimal yield: 74.37%
  Optimal product production rate: 74.37 kg/h
  Optimal profit: $826.34/h

【Improvement Effect】
  Profit improvement: $219.54/h (36.18% increase)
  Annual profit improvement: $1,923,210

【Constraint Satisfaction Verification】
  Safety constraint (T + 0.1*P <= 250): 250.00 <= 250
  Margin: 0.00 °C

【Operating Condition Comparison Table】
                 Item  Current  Optimal
      Temperature[°C]   175.00   200.00
        Pressure[kPa]   800.00  1000.00
            Yield[%]    56.78    74.37
Production rate[kg/h]    56.78    74.37
         Profit[$/h]   606.80   826.34

Explanation: The decision variables, constraints, and objective function are extracted from the knowledge graph, and the optimization is executed with scipy.optimize. Centrally managing process models and optimization problem definitions in the knowledge graph improves maintainability and extensibility.
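One detail worth isolating is scipy's sign convention for inequality constraints: a constraint of type 'ineq' is considered satisfied when its function returns a value >= 0, which is why safety_constraint returns the remaining margin rather than the left-hand side. A minimal standalone check of that pattern (the points below are made-up test values, not the chapter's operating data):

```python
def linear_margin(x, coeff=0.1, limit=250.0):
    """scipy-style 'ineq' constraint: feasible iff the returned margin >= 0."""
    T, P = x
    return limit - (T + coeff * P)

# Interior point: positive margin means the limit is respected with room to spare
assert linear_margin([150.0, 600.0]) == 40.0
# Boundary point: margin exactly zero, the constraint is active
assert linear_margin([150.0, 1000.0]) == 0.0
```

Passing this function to minimize as {'type': 'ineq', 'fun': linear_margin} lets SLSQP treat negative return values as violations, exactly as the chapter's safety_constraint does.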


5.7 Complete Integrated System

Code Example 7: Integrated System of API + Reasoning + Visualization

# Requirements:
# - Python 3.9+
# - rdflib>=6.3.0
# - flask>=2.3.0
# - networkx>=3.1.0
# - plotly>=5.14.0

"""
Example: Code Example 7: Integrated System of API + Reasoning + Visualization

Purpose: Demonstrate data visualization techniques
Target: Intermediate
Execution time: 2-5 seconds
Dependencies: flask, rdflib, networkx, plotly
"""

from flask import Flask, request, jsonify, render_template_string
from rdflib import Graph, Namespace, RDF, RDFS, Literal
from rdflib.plugins.sparql import prepareQuery
import networkx as nx
import plotly.graph_objects as go
import json

# Complete integrated system: API + Reasoning + Visualization

app = Flask(__name__)

PROC = Namespace("http://example.org/process#")
DIAG = Namespace("http://example.org/diagnosis#")

# Global knowledge graph
g = Graph()
g.bind("proc", PROC)
g.bind("diag", DIAG)

def initialize_integrated_kb():
    """Initialize integrated knowledge base"""
    # Process equipment
    equipment = [
        {"id": "P101", "type": "Pump", "flow": 50.0, "pressure": 500.0, "vibration": 2.5, "status": "Normal"},
        {"id": "P102", "type": "Pump", "flow": 30.0, "pressure": 450.0, "vibration": 8.5, "status": "Warning"},
        {"id": "R201", "type": "Reactor", "temp": 180.0, "pressure": 800.0, "status": "Normal"},
    ]

    for eq in equipment:
        eq_uri = PROC[eq['id']]
        g.add((eq_uri, RDF.type, PROC[eq['type']]))
        g.add((eq_uri, PROC.hasStatus, Literal(eq['status'])))

        if 'flow' in eq:
            g.add((eq_uri, PROC.hasFlowRate, Literal(eq['flow'])))
        if 'pressure' in eq:
            g.add((eq_uri, PROC.hasPressure, Literal(eq['pressure'])))
        if 'vibration' in eq:
            g.add((eq_uri, PROC.hasVibration, Literal(eq['vibration'])))
        if 'temp' in eq:
            g.add((eq_uri, PROC.hasTemperature, Literal(eq['temp'])))

    # Process flow
    g.add((PROC.P101, PROC.flowsTo, PROC.R201))
    g.add((PROC.P102, PROC.flowsTo, PROC.R201))

    # Fault diagnosis rules
    g.add((DIAG.CavitationRule, RDF.type, DIAG.DiagnosticRule))
    g.add((DIAG.CavitationRule, DIAG.condition, Literal("flow < 40 AND vibration > 5")))
    g.add((DIAG.CavitationRule, DIAG.diagnosis, Literal("Suspected cavitation")))
    g.add((DIAG.CavitationRule, DIAG.action, Literal("Check NPSH")))

    print(f"Integrated knowledge base initialized: {len(g)} triples")


# === API Endpoints ===

@app.route('/')
def index():
    """Integrated dashboard HTML"""
    html = """
<!DOCTYPE html>
<html>
<head>
    <title>Process Knowledge Graph Integrated System</title>
    <link href="../../assets/css/knowledge-base.css" rel="stylesheet"/>
</head>
<body>
    <h1>🏭 Process Knowledge Graph Integrated System</h1>

    <div class="section">
        <h2>📊 System Overview</h2>
        <p>Integrated Features: SPARQL API + Reasoning Engine + Visualization</p>
        <button onclick="getStats()">Get Statistics</button>
        <pre id="stats"></pre>
    </div>

    <div class="section">
        <h2>🔍 Equipment List</h2>
        <button onclick="getEquipment()">Get Equipment List</button>
        <pre id="equipment"></pre>
    </div>

    <div class="section">
        <h2>⚡ Reasoning Engine</h2>
        <button onclick="runInference()">Run Anomaly Detection Reasoning</button>
        <pre id="inference"></pre>
    </div>

    <div class="section">
        <h2>🌐 Process Flow Graph</h2>
        <button onclick="getGraph()">Get Graph</button>
        <pre id="graph"></pre>
    </div>

    <div class="section">
        <h2>🔧 Custom SPARQL Query</h2>
        <textarea id="sparql" rows="5" style="width: 100%; font-family: monospace;">
SELECT ?equipment ?status
WHERE {
    ?equipment proc:hasStatus ?status .
}
        </textarea>
        <button onclick="runSparql()">Execute Query</button>
        <pre id="sparql-result"></pre>
    </div>

    <script>
        const API_BASE = '';

        async function getStats() {
            const res = await fetch(API_BASE + '/api/statistics');
            const data = await res.json();
            document.getElementById('stats').textContent = JSON.stringify(data, null, 2);
        }

        async function getEquipment() {
            const res = await fetch(API_BASE + '/api/equipment');
            const data = await res.json();
            document.getElementById('equipment').textContent = JSON.stringify(data, null, 2);
        }

        async function runInference() {
            const res = await fetch(API_BASE + '/api/inference', { method: 'POST' });
            const data = await res.json();
            document.getElementById('inference').textContent = JSON.stringify(data, null, 2);
        }

        async function getGraph() {
            const res = await fetch(API_BASE + '/api/graph');
            const data = await res.json();
            document.getElementById('graph').textContent = JSON.stringify(data, null, 2);
        }

        async function runSparql() {
            const query = document.getElementById('sparql').value;
            const res = await fetch(API_BASE + '/api/sparql', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({ query })
            });
            const data = await res.json();
            document.getElementById('sparql-result').textContent = JSON.stringify(data, null, 2);
        }
    </script>
</body>
</html>
    """
    return render_template_string(html)


@app.route('/api/statistics', methods=['GET'])
def get_statistics():
    """Statistics API"""
    stats = {
        'total_triples': len(g),
        'equipment_count': len(list(g.subjects(RDF.type, PROC.Pump))) + len(list(g.subjects(RDF.type, PROC.Reactor))),
        'connection_count': len(list(g.subject_objects(PROC.flowsTo)))
    }
    return jsonify({"status": "success", "statistics": stats})


@app.route('/api/equipment', methods=['GET'])
def get_equipment():
    """Equipment list API"""
    query = prepareQuery("""
        SELECT ?equipment ?type ?status
        WHERE {
            ?equipment a ?type .
            ?equipment proc:hasStatus ?status .
            FILTER (?type IN (proc:Pump, proc:Reactor))
        }
    """, initNs={"proc": PROC})

    results = []
    for row in g.query(query):
        results.append({
            "id": str(row.equipment).split('#')[-1],
            "type": str(row.type).split('#')[-1],
            "status": str(row.status)
        })

    return jsonify({"status": "success", "equipment": results})


@app.route('/api/inference', methods=['POST'])
def run_inference():
    """Reasoning engine API (anomaly detection)"""
    anomalies = []

    # Rule: Low flow + High vibration → Cavitation
    query = prepareQuery("""
        SELECT ?equipment ?flow ?vibration
        WHERE {
            ?equipment proc:hasFlowRate ?flow .
            ?equipment proc:hasVibration ?vibration .
            FILTER (?flow < 40 && ?vibration > 5)
        }
    """, initNs={"proc": PROC})

    for row in g.query(query):
        anomalies.append({
            "equipment": str(row.equipment).split('#')[-1],
            "diagnosis": "Suspected cavitation",
            "symptoms": f"Low flow({row.flow} m³/h), High vibration({row.vibration} mm/s)",
            "action": "Check NPSH, inspect suction piping"
        })

    return jsonify({"status": "success", "anomalies": anomalies})


@app.route('/api/graph', methods=['GET'])
def get_graph():
    """Process flow graph API"""
    nx_graph = nx.DiGraph()

    # Add nodes
    for eq_uri in g.subjects(RDF.type, None):
        eq_id = str(eq_uri).split('#')[-1]
        if eq_id in ['P101', 'P102', 'R201']:
            nx_graph.add_node(eq_id)

    # Add edges
    for s, p, o in g.triples((None, PROC.flowsTo, None)):
        source = str(s).split('#')[-1]
        target = str(o).split('#')[-1]
        nx_graph.add_edge(source, target)

    # Return in JSON format
    graph_data = {
        "nodes": [{"id": n} for n in nx_graph.nodes()],
        "edges": [{"source": u, "target": v} for u, v in nx_graph.edges()]
    }

    return jsonify({"status": "success", "graph": graph_data})


@app.route('/api/sparql', methods=['POST'])
def sparql_endpoint():
    """SPARQL endpoint"""
    data = request.get_json()
    query_str = data.get('query', '')

    try:
        query = prepareQuery(query_str, initNs={"proc": PROC, "diag": DIAG, "rdf": RDF, "rdfs": RDFS})
        results = g.query(query)

        response = []
        for row in results:
            row_dict = {}
            for var in results.vars:
                value = row[var]
                row_dict[str(var)] = str(value).split('#')[-1] if '#' in str(value) else str(value)
            response.append(row_dict)

        return jsonify({"status": "success", "count": len(response), "results": response})
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 400


# Run demo
def demo_integrated_system():
    """Integrated system demo"""
    print("="*60)
    print("Complete Integrated System: API + Reasoning + Visualization")
    print("="*60)

    initialize_integrated_kb()

    print("\n【Integrated System Features】")
    print("  1. RESTful API - Equipment information, statistics, graph data")
    print("  2. Reasoning Engine - Anomaly detection, fault diagnosis")
    print("  3. SPARQL Endpoint - Custom queries")
    print("  4. Web Dashboard - Integrated UI")

    print("\n【How to Launch】")
    print("  python integrated_system.py")
    print("  or: flask run --port 5000")
    print("\n【Access】")
    print("  Open http://localhost:5000 in browser")

    print("\n【API Endpoints List】")
    print("  GET  / - Dashboard")
    print("  GET  /api/statistics - Statistics information")
    print("  GET  /api/equipment - Equipment list")
    print("  POST /api/inference - Anomaly detection reasoning")
    print("  GET  /api/graph - Process graph")
    print("  POST /api/sparql - Custom query")


# Run demo
demo_integrated_system()

# Flask application startup
# if __name__ == '__main__':
#     initialize_integrated_kb()
#     app.run(debug=True, port=5000)

Output Example:

============================================================
Complete Integrated System: API + Reasoning + Visualization
============================================================
Integrated knowledge base initialized: 20 triples

【Integrated System Features】
  1. RESTful API - Equipment information, statistics, graph data
  2. Reasoning Engine - Anomaly detection, fault diagnosis
  3. SPARQL Endpoint - Custom queries
  4. Web Dashboard - Integrated UI

【How to Launch】
  python integrated_system.py
  or: flask run --port 5000

【Access】
  Open http://localhost:5000 in browser

【API Endpoints List】
  GET  / - Dashboard
  GET  /api/statistics - Statistics information
  GET  /api/equipment - Equipment list
  POST /api/inference - Anomaly detection reasoning
  GET  /api/graph - Process graph
  POST /api/sparql - Custom query

Explanation: This Flask web application integrates a knowledge graph API, a reasoning engine, a SPARQL endpoint, and an interactive dashboard. All features are accessible from the browser, so the system operates as an end-to-end process knowledge management system.
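A small pattern that recurs throughout these endpoints is shortening full URIs to local names with str(value).split('#')[-1]. Pulled out as a helper (a sketch; the endpoints above inline the same expression):

```python
def local_name(value):
    """Return the fragment after '#' for hash URIs; other values pass through."""
    s = str(value)
    return s.split('#')[-1] if '#' in s else s

# URIs are shortened to their local name; plain literals are unchanged
assert local_name("http://example.org/process#P101") == "P101"
assert local_name("Running") == "Running"
```

Centralizing this in one function keeps the JSON responses consistent and makes it easy to switch to slash-terminated namespaces later.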


5.8 Chapter Summary

What We Learned

  1. SPARQL Endpoint API
    • Build RESTful API with Flask
    • Execute arbitrary queries with POST /api/sparql
    • Retrieve equipment information with GET /api/equipment
    • JSON format responses
  2. Knowledge Graph Visualization
    • Convert RDF graphs to NetworkX graphs
    • Interactive visualization with Plotly
    • Display node and edge attributes
    • Visual understanding of process flows
  3. Automated Document Generation
    • Utilize Jinja2 template engine
    • Data extraction with SPARQL
    • Generate Markdown/HTML documents
    • Auto-update equipment lists and operating conditions
  4. Root Cause Analysis (RCA)
    • Reverse search causal relationships from symptoms
    • Likelihood scoring
    • Present recommended actions
    • Integrated evaluation of multiple symptoms
  5. Equipment Recommendation System
    • Match requirements with catalog specifications
    • Ranking by scoring function
    • Cost efficiency evaluation
    • Multi-criteria decision support
  6. Process Optimization
    • Extract optimization problems from knowledge graph
    • Integration with scipy.optimize
    • Automatic application of constraints
    • Derive optimal operating conditions
  7. Complete Integrated System
    • Flask web application
    • Integration of API + reasoning + visualization
    • Interactive dashboard
    • End-to-end knowledge management

Key Points

SPARQL endpoints let external systems access knowledge graphs, while visualization makes complex process flows intuitively understandable. Template engines auto-generate documents that stay up to date. RCA systems systematize fault diagnosis for rapid response, and recommendation systems support optimal equipment selection to streamline decision-making. Managing optimization problems inside the knowledge graph improves maintainability. An integrated system positions the knowledge graph at the core of process operations, and API-first design supports a microservices architecture.

Toward Practical Implementation

The process ontology and knowledge graph technologies learned in this series can be applied to actual processes:

  • Digital Twins - represent physical process knowledge in knowledge graphs
  • Smart Plants - integrate IoT sensor data with knowledge graphs
  • Process Safety Management - knowledge-based HAZOP and LOPA analysis
  • Equipment Maintenance Optimization - systematize failure history and maintenance plans
  • Energy Management - optimize process-wide energy flows
  • Knowledge Transfer - formalize veteran expertise as ontologies
  • Regulatory Compliance - knowledge-based management of regulatory requirements

Further Learning Resources

For continued learning, explore the following areas:

  • Semantic Web Technologies - W3C RDF/OWL/SPARQL specifications
  • Ontology Engineering - practice with tools such as Protégé and TopBraid
  • Knowledge Graph Databases - Neo4j, Apache Jena, and Stardog offer production-ready solutions
  • Process Engineering Standards - CAPE-OPEN and OPC UA information models provide industry context
  • Implementation Frameworks - rdflib, owlready2, and SPARQLWrapper support Python-based development

Disclaimer