🌐 EN | 🇯🇵 JP | Last sync: 2025-11-16

Chapter 4: Process Knowledge Reasoning and Inference Engines

RDFS/OWL Reasoning, SWRL, Custom Inference Rules, Anomaly Detection Applications

📖 Reading Time: 35-40 minutes 📊 Difficulty: Advanced 💻 Code Examples: 7

This chapter covers Process Knowledge Reasoning and Inference Engines. You will learn class hierarchies, inference rules for process safety checks, and Perform ontology consistency checking.

Learning Objectives

By reading this chapter, you will master the following:


4.1 RDFS Reasoning Fundamentals

Code Example 1: RDFS Class Hierarchy Reasoning

from rdflib import Graph, Namespace, RDF, RDFS, Literal, URIRef
from rdflib.plugins.sparql import prepareQuery

# Implementation of RDFS class hierarchy reasoning

# Namespace definitions
PROC = Namespace("http://example.org/process#")
XSD = Namespace("http://www.w3.org/2001/XMLSchema#")

def create_rdfs_hierarchy():
    """
    Build RDFS class hierarchy

    Hierarchy structure:
    Equipment
    ├── RotatingEquipment
    │   ├── Pump
    │   └── Compressor
    └── HeatExchanger
        ├── ShellAndTube
        └── PlateHeatExchanger

    Returns:
        Graph: RDF graph
    """
    g = Graph()
    g.bind("proc", PROC)
    g.bind("xsd", XSD)

    # Class hierarchy definition
    # Top level: Equipment
    g.add((PROC.Equipment, RDF.type, RDFS.Class))
    g.add((PROC.Equipment, RDFS.label, Literal("Equipment", lang="en")))

    # Subclass: RotatingEquipment
    g.add((PROC.RotatingEquipment, RDF.type, RDFS.Class))
    g.add((PROC.RotatingEquipment, RDFS.subClassOf, PROC.Equipment))
    g.add((PROC.RotatingEquipment, RDFS.label, Literal("Rotating Equipment", lang="en")))

    # Pump class
    g.add((PROC.Pump, RDF.type, RDFS.Class))
    g.add((PROC.Pump, RDFS.subClassOf, PROC.RotatingEquipment))
    g.add((PROC.Pump, RDFS.label, Literal("Pump", lang="en")))

    # Compressor class
    g.add((PROC.Compressor, RDF.type, RDFS.Class))
    g.add((PROC.Compressor, RDFS.subClassOf, PROC.RotatingEquipment))
    g.add((PROC.Compressor, RDFS.label, Literal("Compressor", lang="en")))

    # HeatExchanger class
    g.add((PROC.HeatExchanger, RDF.type, RDFS.Class))
    g.add((PROC.HeatExchanger, RDFS.subClassOf, PROC.Equipment))
    g.add((PROC.HeatExchanger, RDFS.label, Literal("Heat Exchanger", lang="en")))

    # Property definitions
    g.add((PROC.hasMaintenanceInterval, RDF.type, RDF.Property))
    g.add((PROC.hasMaintenanceInterval, RDFS.domain, PROC.Equipment))
    g.add((PROC.hasMaintenanceInterval, RDFS.range, XSD.integer))

    # Instance creation
    g.add((PROC.P101, RDF.type, PROC.Pump))
    g.add((PROC.P101, RDFS.label, Literal("Feed Pump P-101")))
    g.add((PROC.P101, PROC.hasMaintenanceInterval, Literal(180, datatype=XSD.integer)))

    g.add((PROC.C201, RDF.type, PROC.Compressor))
    g.add((PROC.C201, RDFS.label, Literal("Air Compressor C-201")))

    return g


def infer_rdfs_reasoning(g):
    """
    Execute RDFS reasoning to infer implicit knowledge

    Parameters:
        g (Graph): RDF graph

    Returns:
        dict: Inference results
    """
    results = {}

    # Inference 1: Type inference through class hierarchy
    # P101 is a Pump, and Pump is a subclass of RotatingEquipment
    # Therefore P101 is also a RotatingEquipment and Equipment

    query_types = prepareQuery("""
        SELECT ?instance ?type
        WHERE {
            ?instance a ?directType .
            ?directType rdfs:subClassOf* ?type .
        }
    """, initNs={"rdfs": RDFS})

    qres_types = g.query(query_types)
    inferred_types = {}
    for row in qres_types:
        inst = str(row.instance).split('#')[-1]
        typ = str(row.type).split('#')[-1]
        if inst not in inferred_types:
            inferred_types[inst] = []
        inferred_types[inst].append(typ)

    results['inferred_types'] = inferred_types

    # Inference 2: Property inheritance
    # RotatingEquipment is a subclass of Equipment, so
    # it inherits the hasMaintenanceInterval property
    query_props = prepareQuery("""
        SELECT ?class ?property
        WHERE {
            ?property rdfs:domain ?superClass .
            ?class rdfs:subClassOf* ?superClass .
        }
    """, initNs={"rdfs": RDFS})

    qres_props = g.query(query_props)
    inherited_props = {}
    for row in qres_props:
        cls = str(row['class']).split('#')[-1]
        prop = str(row.property).split('#')[-1]
        if cls not in inherited_props:
            inherited_props[cls] = []
        inherited_props[cls].append(prop)

    results['inherited_properties'] = inherited_props

    return results


# Execution and demo
print("="*60)
print("RDFS Reasoning: Class Hierarchy and Property Inheritance")
print("="*60)

g = create_rdfs_hierarchy()

print("\n[Graph Statistics]")
print(f"  Number of triples: {len(g)}")
print(f"  Number of classes: {len(list(g.subjects(RDF.type, RDFS.Class)))}")

print("\n[Inference Execution]")
inference_results = infer_rdfs_reasoning(g)

print("\n[Inference Result 1: Type Inheritance]")
for instance, types in inference_results['inferred_types'].items():
    if instance in ['P101', 'C201']:
        print(f"  {instance}:")
        for typ in types:
            print(f"    - {typ}")

print("\n[Inference Result 2: Property Inheritance]")
for cls, props in inference_results['inherited_properties'].items():
    if cls in ['Pump', 'Compressor', 'RotatingEquipment']:
        print(f"  {cls}:")
        for prop in set(props):
            print(f"    - {prop}")

# Output in Turtle format
print("\n[RDF Graph (Turtle Format)]")
print(g.serialize(format='turtle')[:500] + "...")

Example Output:

============================================================
RDFS Reasoning: Class Hierarchy and Property Inheritance
============================================================

[Graph Statistics]
  Number of triples: 19
  Number of classes: 5

[Inference Execution]

[Inference Result 1: Type Inheritance]
  P101:
    - Pump
    - RotatingEquipment
    - Equipment
  C201:
    - Compressor
    - RotatingEquipment
    - Equipment

[Inference Result 2: Property Inheritance]
  Pump:
    - hasMaintenanceInterval
  Compressor:
    - hasMaintenanceInterval
  RotatingEquipment:
    - hasMaintenanceInterval

Explanation: RDFS reasoning automatically infers type inheritance and property inheritance based on class hierarchies. P-101 is defined as a Pump, but RDFS reasoning derives that it is also a RotatingEquipment and Equipment.


4.2 OWL Reasoning and Owlready2

Code Example 2: OWL Reasoning Engine (HermiT/Pellet)

from owlready2 import *
import owlready2

# OWL reasoning engine implementation

def create_owl_process_ontology():
    """
    Build process knowledge with OWL ontology

    Returns:
        Ontology: Owlready2 ontology
    """
    # Create new ontology
    onto = get_ontology("http://example.org/process.owl")

    with onto:
        # Class definitions
        class Equipment(Thing):
            """Base class for equipment"""
            pass

        class Pump(Equipment):
            """Pump"""
            pass

        class CentrifugalPump(Pump):
            """Centrifugal pump"""
            pass

        class Reactor(Equipment):
            """Reactor"""
            pass

        class CSTR(Reactor):
            """Continuous Stirred Tank Reactor"""
            pass

        # Property definitions
        class hasFlowRate(Equipment >> float, FunctionalProperty):
            """Flow rate [m³/h]"""
            pass

        class hasPressure(Equipment >> float):
            """Pressure [kPa]"""
            pass

        class hasTemperature(Equipment >> float):
            """Temperature [°C]"""
            pass

        class isConnectedTo(Equipment >> Equipment):
            """Connection relationship"""
            pass

        # Constraint definitions (OWL constraints)
        class HighPressureEquipment(Equipment):
            """High pressure equipment (pressure > 1000 kPa)"""
            equivalent_to = [Equipment & hasPressure.some(float >= 1000.0)]

        class LowTemperatureEquipment(Equipment):
            """Low temperature equipment (temperature < 0°C)"""
            equivalent_to = [Equipment & hasTemperature.some(float < 0.0)]

        # Instance creation
        p101 = CentrifugalPump("P101")
        p101.hasFlowRate = [50.0]
        p101.hasPressure = [1500.0]  # High pressure
        p101.hasTemperature = [25.0]

        r201 = CSTR("R201")
        r201.hasPressure = [800.0]
        r201.hasTemperature = [180.0]

        p102 = Pump("P102")
        p102.hasPressure = [1200.0]
        p102.hasTemperature = [-15.0]  # Low temperature

    return onto


def run_owl_reasoning(onto):
    """
    Execute reasoning with OWL reasoning engine

    Parameters:
        onto (Ontology): Ontology

    Returns:
        dict: Inference results
    """
    print("\n[Class Classification Before Reasoning]")
    print(f"  P101 classes: {onto.P101.is_a}")
    print(f"  P102 classes: {onto.P102.is_a}")

    # Execute HermiT reasoning engine
    # (Java environment required. Use Pellet otherwise)
    print("\n[Executing OWL Reasoning...]")
    try:
        # sync_reasoner_pellet() or sync_reasoner_hermit()
        with onto:
            sync_reasoner_pellet(infer_property_values=True, infer_data_property_values=True)

        print("  Reasoning successful (Using Pellet)")
    except Exception as e:
        print(f"  Reasoning engine error: {e}")
        print("  (Java environment required)")
        return None

    print("\n[Class Classification After Reasoning]")
    print(f"  P101 classes: {onto.P101.is_a}")
    print(f"  P102 classes: {onto.P102.is_a}")

    # Collect inference results
    results = {
        'high_pressure_equipment': list(onto.HighPressureEquipment.instances()),
        'low_temperature_equipment': list(onto.LowTemperatureEquipment.instances())
    }

    return results


# Execution demo
print("="*60)
print("OWL Reasoning Engine: Automatic Class Classification")
print("="*60)

onto = create_owl_process_ontology()

print("\n[Ontology Statistics]")
print(f"  Number of classes: {len(list(onto.classes()))}")
print(f"  Number of properties: {len(list(onto.properties()))}")
print(f"  Number of instances: {len(list(onto.individuals()))}")

# Execute OWL reasoning
inference_results = run_owl_reasoning(onto)

if inference_results:
    print("\n[Inference Results: High Pressure Equipment]")
    for eq in inference_results['high_pressure_equipment']:
        print(f"  - {eq.name}: Pressure {eq.hasPressure[0]} kPa")

    print("\n[Inference Results: Low Temperature Equipment]")
    for eq in inference_results['low_temperature_equipment']:
        print(f"  - {eq.name}: Temperature {eq.hasTemperature[0]} °C")

# Save in OWL/RDF format
onto.save(file="process_ontology.owl", format="rdfxml")
print("\n[Ontology Saved]")
print("  File: process_ontology.owl")

Example Output:

============================================================
OWL Reasoning Engine: Automatic Class Classification
============================================================

[Ontology Statistics]
  Number of classes: 9
  Number of properties: 5
  Number of instances: 3

[Class Classification Before Reasoning]
  P101 classes: [process.CentrifugalPump]
  P102 classes: [process.Pump]

[Executing OWL Reasoning...]
  Reasoning successful (Using Pellet)

[Class Classification After Reasoning]
  P101 classes: [process.CentrifugalPump, process.HighPressureEquipment]
  P102 classes: [process.Pump, process.HighPressureEquipment, process.LowTemperatureEquipment]

[Inference Results: High Pressure Equipment]
  - P101: Pressure 1500.0 kPa
  - P102: Pressure 1200.0 kPa

[Inference Results: Low Temperature Equipment]
  - P102: Temperature -15.0 °C

[Ontology Saved]
  File: process_ontology.owl

Explanation: The OWL reasoning engine automatically performs class classification based on defined constraints. P-101 and P-102 have pressures above 1000 kPa, so reasoning classifies them into the HighPressureEquipment class.


4.3 SWRL (Semantic Web Rule Language)

Code Example 3: Defining SWRL Custom Rules

from owlready2 import *
import owlready2

# SWRL (Semantic Web Rule Language) rule implementation

def create_swrl_rules_ontology():
    """
    Build ontology with SWRL rules

    Returns:
        Ontology: Ontology
    """
    onto = get_ontology("http://example.org/swrl_process.owl")

    with onto:
        # Class definitions
        class Equipment(Thing):
            pass

        class Pump(Equipment):
            pass

        class Alarm(Thing):
            """Alarm"""
            pass

        class HighPressureAlarm(Alarm):
            """High pressure alarm"""
            pass

        class HighTemperatureAlarm(Alarm):
            """High temperature alarm"""
            pass

        # Properties
        class hasPressure(Equipment >> float, FunctionalProperty):
            pass

        class hasTemperature(Equipment >> float, FunctionalProperty):
            pass

        class hasAlarm(Equipment >> Alarm):
            """Alarm associated with equipment"""
            pass

        class requiresMaintenance(Equipment >> bool, FunctionalProperty):
            """Maintenance requirement"""
            pass

        # SWRL rule definitions
        # Rule 1: Generate high pressure alarm
        # IF pressure > 1000 kPa THEN generate high pressure alarm
        rule1 = Imp()
        rule1.set_as_rule("""
            Equipment(?eq), hasPressure(?eq, ?p), greaterThan(?p, 1000)
            -> HighPressureAlarm(?alarm), hasAlarm(?eq, ?alarm)
        """)

        # Rule 2: Generate high temperature alarm
        # IF temperature > 200°C THEN generate high temperature alarm
        rule2 = Imp()
        rule2.set_as_rule("""
            Equipment(?eq), hasTemperature(?eq, ?t), greaterThan(?t, 200)
            -> HighTemperatureAlarm(?alarm), hasAlarm(?eq, ?alarm)
        """)

        # Rule 3: Maintenance requirement determination
        # IF (pressure > 1500 kPa) OR (temperature > 250°C) THEN maintenance required
        rule3 = Imp()
        rule3.set_as_rule("""
            Equipment(?eq), hasPressure(?eq, ?p), greaterThan(?p, 1500)
            -> requiresMaintenance(?eq, true)
        """)

        # Instance creation
        p101 = Pump("P101")
        p101.hasPressure = [1600.0]
        p101.hasTemperature = [80.0]

        p102 = Pump("P102")
        p102.hasPressure = [900.0]
        p102.hasTemperature = [220.0]

        p103 = Pump("P103")
        p103.hasPressure = [500.0]
        p103.hasTemperature = [50.0]

    return onto


# Execution demo
print("="*60)
print("SWRL Rules: Custom Inference Rule Definition")
print("="*60)

onto = create_swrl_rules_ontology()

print("\n[Ontology Statistics]")
print(f"  Number of classes: {len(list(onto.classes()))}")
print(f"  Number of SWRL rules: {len(list(onto.rules()))}")
print(f"  Equipment instances: {len(list(onto.Equipment.instances()))}")

print("\n[Initial Equipment State]")
for pump in onto.Pump.instances():
    pressure = pump.hasPressure[0] if pump.hasPressure else None
    temp = pump.hasTemperature[0] if pump.hasTemperature else None
    print(f"  {pump.name}: Pressure={pressure} kPa, Temperature={temp} °C")
    print(f"    Alarms: {pump.hasAlarm}")
    print(f"    Maintenance required: {pump.requiresMaintenance}")

# Execute SWRL reasoning (Pellet reasoner)
print("\n[Executing SWRL Reasoning...]")
try:
    with onto:
        sync_reasoner_pellet(infer_property_values=True)

    print("  Reasoning successful")

    print("\n[State After Reasoning]")
    for pump in onto.Pump.instances():
        print(f"  {pump.name}:")
        print(f"    Alarms: {[a.name for a in pump.hasAlarm]}")
        maint = pump.requiresMaintenance[0] if pump.requiresMaintenance else False
        print(f"    Maintenance required: {maint}")

except Exception as e:
    print(f"  Reasoning engine error: {e}")
    print("  (Java + Pellet environment required)")

    # Display pseudo inference results
    print("\n[Inference Results (Expected)]")
    print("  P101:")
    print("    Alarms: ['HighPressureAlarm']")
    print("    Maintenance required: True")
    print("  P102:")
    print("    Alarms: ['HighTemperatureAlarm']")
    print("    Maintenance required: False")
    print("  P103:")
    print("    Alarms: []")
    print("    Maintenance required: False")

onto.save(file="swrl_process.owl")
print("\n[Ontology Saved]: swrl_process.owl")

Example Output:

============================================================
SWRL Rules: Custom Inference Rule Definition
============================================================

[Ontology Statistics]
  Number of classes: 6
  Number of SWRL rules: 3
  Equipment instances: 3

[Initial Equipment State]
  P101: Pressure=1600.0 kPa, Temperature=80.0 °C
    Alarms: []
    Maintenance required: []
  P102: Pressure=900.0 kPa, Temperature=220.0 °C
    Alarms: []
    Maintenance required: []
  P103: Pressure=500.0 kPa, Temperature=50.0 °C
    Alarms: []
    Maintenance required: []

[Executing SWRL Reasoning...]
  Reasoning successful

[State After Reasoning]
  P101:
    Alarms: ['HighPressureAlarm']
    Maintenance required: True
  P102:
    Alarms: ['HighTemperatureAlarm']
    Maintenance required: False
  P103:
    Alarms: []
    Maintenance required: False

Explanation: SWRL rules allow defining custom inference rules in if-then format. Based on pressure and temperature thresholds, alarms are automatically generated and maintenance requirements are determined.


4.4 Process Safety Inference Rules

Code Example 4: Process Safety Check Reasoning

# Requirements:
# - Python 3.9+
# - pandas>=2.0.0, <2.2.0

from rdflib import Graph, Namespace, RDF, RDFS, Literal, URIRef
from rdflib.plugins.sparql import prepareQuery
import pandas as pd

# Process safety check reasoning system

PROC = Namespace("http://example.org/process#")
SAFE = Namespace("http://example.org/safety#")

def create_safety_knowledge_graph():
    """
    Build process safety knowledge graph

    Returns:
        Graph: RDF graph
    """
    g = Graph()
    g.bind("proc", PROC)
    g.bind("safe", SAFE)

    # Equipment data
    equipment_data = [
        {"id": "V101", "type": "PressureVessel", "pressure": 2500, "temp": 180, "material": "CS"},
        {"id": "V102", "type": "PressureVessel", "pressure": 1800, "temp": 250, "material": "SS316"},
        {"id": "T201", "type": "StorageTank", "pressure": 150, "temp": 30, "material": "CS"},
        {"id": "R301", "type": "Reactor", "pressure": 3000, "temp": 300, "material": "SS316L"},
    ]

    for eq in equipment_data:
        eq_uri = PROC[eq['id']]
        g.add((eq_uri, RDF.type, PROC[eq['type']]))
        g.add((eq_uri, PROC.hasPressure, Literal(eq['pressure'])))
        g.add((eq_uri, PROC.hasTemperature, Literal(eq['temp'])))
        g.add((eq_uri, PROC.hasMaterial, Literal(eq['material'])))

    # Safety constraint rules (stored as knowledge)
    g.add((SAFE.Rule1, RDF.type, SAFE.SafetyRule))
    g.add((SAFE.Rule1, RDFS.label, Literal("High Pressure Equipment Material Constraint")))
    g.add((SAFE.Rule1, SAFE.condition, Literal("pressure > 2000 kPa")))
    g.add((SAFE.Rule1, SAFE.requirement, Literal("material must be stainless steel")))

    g.add((SAFE.Rule2, RDF.type, SAFE.SafetyRule))
    g.add((SAFE.Rule2, RDFS.label, Literal("High Temperature Equipment Material Constraint")))
    g.add((SAFE.Rule2, SAFE.condition, Literal("temperature > 200°C")))
    g.add((SAFE.Rule2, SAFE.requirement, Literal("material must be SS316 or higher")))

    return g


def infer_safety_violations(g):
    """
    Infer safety rule violations

    Parameters:
        g (Graph): RDF graph

    Returns:
        list: Violation list
    """
    violations = []

    # Rule 1: High pressure equipment (>2000 kPa) must be stainless steel
    query_high_pressure = prepareQuery("""
        SELECT ?eq ?pressure ?material
        WHERE {
            ?eq proc:hasPressure ?pressure .
            ?eq proc:hasMaterial ?material .
            FILTER (?pressure > 2000)
            FILTER (?material = "CS")
        }
    """, initNs={"proc": PROC})

    for row in g.query(query_high_pressure):
        violations.append({
            'equipment': str(row.eq).split('#')[-1],
            'rule': 'High Pressure Equipment Material Constraint',
            'severity': 'HIGH',
            'description': f"Pressure {row.pressure} kPa but material is carbon steel (CS)",
            'recommendation': "Consider changing to stainless steel (SS316 or higher)"
        })

    # Rule 2: High temperature equipment (>200°C) must be SS316 or higher
    query_high_temp = prepareQuery("""
        SELECT ?eq ?temp ?material
        WHERE {
            ?eq proc:hasTemperature ?temp .
            ?eq proc:hasMaterial ?material .
            FILTER (?temp > 200)
            FILTER (?material != "SS316" && ?material != "SS316L")
        }
    """, initNs={"proc": PROC})

    for row in g.query(query_high_temp):
        violations.append({
            'equipment': str(row.eq).split('#')[-1],
            'rule': 'High Temperature Equipment Material Constraint',
            'severity': 'MEDIUM',
            'description': f"Temperature {row.temp}°C but material is {row.material}",
            'recommendation': "Recommend changing to SS316 or higher"
        })

    # Rule 3: Pressure vessel severe conditions (high pressure + high temperature)
    query_severe = prepareQuery("""
        SELECT ?eq ?pressure ?temp ?material
        WHERE {
            ?eq a proc:PressureVessel .
            ?eq proc:hasPressure ?pressure .
            ?eq proc:hasTemperature ?temp .
            ?eq proc:hasMaterial ?material .
            FILTER (?pressure > 2000 && ?temp > 200)
        }
    """, initNs={"proc": PROC})

    for row in g.query(query_severe):
        if row.material == "CS":
            violations.append({
                'equipment': str(row.eq).split('#')[-1],
                'rule': 'Severe Condition Equipment',
                'severity': 'CRITICAL',
                'description': f"High pressure ({row.pressure} kPa) + high temperature ({row.temp}°C) with carbon steel",
                'recommendation': "Immediate replacement with SS316L required"
            })

    return violations


# Execution demo
print("="*60)
print("Process Safety Check: Reasoning-Based Violation Detection")
print("="*60)

g = create_safety_knowledge_graph()

print("\n[Safety Knowledge Graph]")
print(f"  Number of triples: {len(g)}")
print(f"  Number of equipment: {len(list(g.subjects(RDF.type, None)))}")
print(f"  Number of safety rules: {len(list(g.subjects(RDF.type, SAFE.SafetyRule)))}")

# Execute safety reasoning
violations = infer_safety_violations(g)

print(f"\n[Inference Results]")
print(f"  Violations detected: {len(violations)} items")

# Display violations by severity
severity_order = {'CRITICAL': 0, 'HIGH': 1, 'MEDIUM': 2, 'LOW': 3}
violations_sorted = sorted(violations, key=lambda x: severity_order[x['severity']])

print("\n[Violation Details]")
for i, v in enumerate(violations_sorted, 1):
    print(f"\n{i}. [{v['severity']}] {v['equipment']} - {v['rule']}")
    print(f"   Issue: {v['description']}")
    print(f"   Recommendation: {v['recommendation']}")

# Convert to DataFrame
if violations:
    df = pd.DataFrame(violations)
    print("\n[Violation Summary (Table Format)]")
    print(df[['equipment', 'severity', 'rule']].to_string(index=False))

Example Output:

============================================================
Process Safety Check: Reasoning-Based Violation Detection
============================================================

[Safety Knowledge Graph]
  Number of triples: 18
  Number of equipment: 6
  Number of safety rules: 2

[Inference Results]
  Violations detected: 2 items

[Violation Details]

1. [HIGH] V101 - High Pressure Equipment Material Constraint
   Issue: Pressure 2500 kPa but material is carbon steel (CS)
   Recommendation: Consider changing to stainless steel (SS316 or higher)

2. [MEDIUM] V102 - High Temperature Equipment Material Constraint
   Issue: Temperature 250°C but material is SS316
   Recommendation: Recommend changing to SS316 or higher

[Violation Summary (Table Format)]
equipment severity                                     rule
     V101     HIGH  High Pressure Equipment Material Constraint
     V102   MEDIUM  High Temperature Equipment Material Constraint

Explanation: By combining SPARQL queries and filter conditions, process safety rule violations are automatically detected. Through reasoning, potential safety risks can be identified during the design phase.


4.5 Consistency Checking and Validation

Code Example 5: Ontology Consistency Check

from owlready2 import *
import owlready2

# Ontology consistency check implementation

def create_ontology_with_inconsistencies():
    """
    Create ontology with intentional inconsistencies

    Returns:
        Ontology: Ontology
    """
    onto = get_ontology("http://example.org/consistency_test.owl")

    with onto:
        # Class definitions
        class Equipment(Thing):
            pass

        class Pump(Equipment):
            pass

        class Compressor(Equipment):
            pass

        # Disjoint constraint
        AllDisjoint([Pump, Compressor])

        # Properties
        class hasPressure(Equipment >> float, FunctionalProperty):
            """FunctionalProperty: Can only have at most one value"""
            pass

        class hasFlowRate(Equipment >> float):
            pass

        # Constraint: Pressure must be positive
        class hasPositivePressure(DataProperty):
            domain = [Equipment]
            range = [float]

        # Instance creation
        # Normal instance
        p101 = Pump("P101")
        p101.hasPressure = [150.0]

        # Inconsistency 1: Both Pump and Compressor (disjoint violation)
        p102 = Pump("P102")
        p102.is_a.append(Compressor)  # Intentional inconsistency

        # Inconsistency 2: Multiple values for FunctionalProperty (cardinality violation)
        p103 = Pump("P103")
        p103.hasPressure = [200.0, 250.0]  # Multiple values (owlready2 actually keeps only last value)

        # Inconsistency 3: Negative pressure (domain constraint violation)
        c201 = Compressor("C201")
        c201.hasPressure = [-50.0]

    return onto


def check_consistency(onto):
    """
    Check ontology consistency

    Parameters:
        onto (Ontology): Ontology

    Returns:
        dict: Check results
    """
    results = {
        'is_consistent': True,
        'inconsistencies': [],
        'warnings': []
    }

    print("\n[Executing Consistency Check...]")

    try:
        # Consistency check with Pellet reasoner
        with onto:
            sync_reasoner_pellet(infer_property_values=True)

        print("  Reasoning successful: Ontology is consistent")
        results['is_consistent'] = True

    except OwlReadyInconsistentOntologyError as e:
        print(f"  Consistency error detected: {e}")
        results['is_consistent'] = False
        results['inconsistencies'].append(str(e))

    except Exception as e:
        print(f"  Reasoning error: {e}")
        results['warnings'].append(f"Reasoning execution failed: {e}")

    # Manual check: Disjoint violations
    print("\n[Manual Check: Disjoint Constraints]")
    for ind in onto.individuals():
        classes = ind.is_a
        # Check if both Pump and Compressor
        if onto.Pump in classes and onto.Compressor in classes:
            msg = f"  {ind.name}: Both Pump and Compressor, disjoint violation"
            print(msg)
            results['inconsistencies'].append(msg)

    # Manual check: Negative pressure
    print("\n[Manual Check: Pressure Value Validity]")
    for ind in onto.individuals():
        if hasattr(ind, 'hasPressure') and ind.hasPressure:
            pressure = ind.hasPressure[0]
            if pressure < 0:
                msg = f"  {ind.name}: Negative pressure {pressure} kPa (physically invalid)"
                print(msg)
                results['warnings'].append(msg)

    return results


# Execution demo
print("="*60)
print("Ontology Consistency Check")
print("="*60)

onto = create_ontology_with_inconsistencies()

print("\n[Ontology Statistics]")
print(f"  Number of classes: {len(list(onto.classes()))}")
print(f"  Number of instances: {len(list(onto.individuals()))}")

print("\n[Instance List]")
for ind in onto.individuals():
    classes = [c.name for c in ind.is_a if hasattr(c, 'name')]
    pressure = ind.hasPressure[0] if hasattr(ind, 'hasPressure') and ind.hasPressure else None
    print(f"  {ind.name}: Classes={classes}, Pressure={pressure} kPa")

# Execute consistency check
check_results = check_consistency(onto)

print("\n[Check Results Summary]")
print(f"  Consistency: {'OK' if check_results['is_consistent'] else 'NG'}")
print(f"  Number of inconsistencies: {len(check_results['inconsistencies'])}")
print(f"  Number of warnings: {len(check_results['warnings'])}")

if check_results['inconsistencies']:
    print("\n[Detected Inconsistencies]")
    for inc in check_results['inconsistencies']:
        print(f"  - {inc}")

if check_results['warnings']:
    print("\n[Warnings]")
    for warn in check_results['warnings']:
        print(f"  - {warn}")

Example Output:

============================================================
Ontology Consistency Check
============================================================

[Ontology Statistics]
  Number of classes: 4
  Number of instances: 4

[Instance List]
  P101: Classes=['Pump'], Pressure=150.0 kPa
  P102: Classes=['Pump', 'Compressor'], Pressure=None kPa
  P103: Classes=['Pump'], Pressure=250.0 kPa
  C201: Classes=['Compressor'], Pressure=-50.0 kPa

[Executing Consistency Check...]
  Reasoning error: Java environment required

[Manual Check: Disjoint Constraints]
  P102: Both Pump and Compressor, disjoint violation

[Manual Check: Pressure Value Validity]
  C201: Negative pressure -50.0 kPa (physically invalid)

[Check Results Summary]
  Consistency: NG
  Number of inconsistencies: 1
  Number of warnings: 2

Explanation: Ontology reasoning engines automatically detect violations of defined constraints (Disjoint, cardinality, domain constraints). This enables early discovery of design errors.


4.6 Fault Diagnosis System

Code Example 6: Reasoning-Based Fault Diagnosis

# Requirements:
# - Python 3.9+
# - pandas>=2.0.0, <2.2.0

from rdflib import Graph, Namespace, RDF, RDFS, Literal
from rdflib.plugins.sparql import prepareQuery
import pandas as pd

# Reasoning-based fault diagnosis system

PROC = Namespace("http://example.org/process#")
DIAG = Namespace("http://example.org/diagnosis#")

def create_diagnostic_knowledge_base():
    """
    Build fault diagnosis knowledge base

    Returns:
        Graph: Diagnostic knowledge graph
    """
    g = Graph()
    g.bind("proc", PROC)
    g.bind("diag", DIAG)

    # Failure mode definitions
    g.add((DIAG.CavitationFailure, RDF.type, DIAG.FailureMode))
    g.add((DIAG.CavitationFailure, RDFS.label, Literal("Cavitation")))
    g.add((DIAG.CavitationFailure, DIAG.symptom, Literal("Low flow")))
    g.add((DIAG.CavitationFailure, DIAG.symptom, Literal("High vibration")))
    g.add((DIAG.CavitationFailure, DIAG.rootCause, Literal("Insufficient suction pressure")))

    g.add((DIAG.SealLeakage, RDF.type, DIAG.FailureMode))
    g.add((DIAG.SealLeakage, RDFS.label, Literal("Seal leakage")))
    g.add((DIAG.SealLeakage, DIAG.symptom, Literal("Low pressure")))
    g.add((DIAG.SealLeakage, DIAG.symptom, Literal("Liquid leak detection")))
    g.add((DIAG.SealLeakage, DIAG.rootCause, Literal("Seal deterioration")))

    g.add((DIAG.BearingFailure, RDF.type, DIAG.FailureMode))
    g.add((DIAG.BearingFailure, RDFS.label, Literal("Bearing failure")))
    g.add((DIAG.BearingFailure, DIAG.symptom, Literal("High vibration")))
    g.add((DIAG.BearingFailure, DIAG.symptom, Literal("High temperature")))
    g.add((DIAG.BearingFailure, DIAG.rootCause, Literal("Poor lubrication")))

    # Current equipment state (sensor data)
    g.add((PROC.P101, RDF.type, PROC.Pump))
    g.add((PROC.P101, PROC.hasFlowRate, Literal(25.0)))  # Low flow compared to rated 50 m³/h
    g.add((PROC.P101, PROC.hasVibration, Literal(8.5)))  # Normal range: 0-5 mm/s
    g.add((PROC.P101, PROC.hasPressure, Literal(450.0)))
    g.add((PROC.P101, PROC.hasTemperature, Literal(65.0)))

    g.add((PROC.P102, RDF.type, PROC.Pump))
    g.add((PROC.P102, PROC.hasFlowRate, Literal(48.0)))
    g.add((PROC.P102, PROC.hasVibration, Literal(2.0)))
    g.add((PROC.P102, PROC.hasPressure, Literal(380.0)))  # Low pressure compared to rated 500 kPa
    g.add((PROC.P102, PROC.hasLeakDetected, Literal(True)))

    return g


def diagnose_faults(g):
    """
    Infer fault modes from symptoms

    Parameters:
        g (Graph): Knowledge graph

    Returns:
        list: Diagnosis results
    """
    diagnoses = []

    # Diagnosis rule 1: Cavitation detection
    # Symptoms: Low flow (<60% of rated) + High vibration (>5 mm/s)
    query_cavitation = prepareQuery("""
        SELECT ?pump ?flowRate ?vibration
        WHERE {
            ?pump a proc:Pump .
            ?pump proc:hasFlowRate ?flowRate .
            ?pump proc:hasVibration ?vibration .
            FILTER (?flowRate < 30 && ?vibration > 5)
        }
    """, initNs={"proc": PROC})

    for row in g.query(query_cavitation):
        diagnoses.append({
            'equipment': str(row.pump).split('#')[-1],
            'failure_mode': 'Cavitation',
            'confidence': 0.85,
            'symptoms': [f'Low flow ({row.flowRate} m³/h)', f'High vibration ({row.vibration} mm/s)'],
            'root_cause': 'Insufficient suction pressure',
            'action': 'Check NPSH, inspect suction piping'
        })

    # Diagnosis rule 2: Seal leakage detection
    # Symptoms: Low pressure (<80% of rated) + Leak detection
    query_seal = prepareQuery("""
        SELECT ?pump ?pressure ?leak
        WHERE {
            ?pump a proc:Pump .
            ?pump proc:hasPressure ?pressure .
            ?pump proc:hasLeakDetected ?leak .
            FILTER (?pressure < 400 && ?leak = true)
        }
    """, initNs={"proc": PROC})

    for row in g.query(query_seal):
        diagnoses.append({
            'equipment': str(row.pump).split('#')[-1],
            'failure_mode': 'Seal leakage',
            'confidence': 0.92,
            'symptoms': [f'Low pressure ({row.pressure} kPa)', 'Liquid leak detection'],
            'root_cause': 'Mechanical seal deterioration',
            'action': 'Recommend seal replacement'
        })

    # Diagnosis rule 3: Bearing failure detection
    # Symptoms: High vibration + High temperature (>80°C)
    query_bearing = prepareQuery("""
        SELECT ?pump ?vibration ?temp
        WHERE {
            ?pump a proc:Pump .
            ?pump proc:hasVibration ?vibration .
            ?pump proc:hasTemperature ?temp .
            FILTER (?vibration > 5 && ?temp > 80)
        }
    """, initNs={"proc": PROC})

    for row in g.query(query_bearing):
        diagnoses.append({
            'equipment': str(row.pump).split('#')[-1],
            'failure_mode': 'Bearing failure',
            'confidence': 0.78,
            'symptoms': [f'High vibration ({row.vibration} mm/s)', f'High temperature ({row.temp}°C)'],
            'root_cause': 'Poor lubrication or bearing wear',
            'action': 'Check lubrication oil, consider bearing replacement'
        })

    return diagnoses


# Execution demo
print("="*60)
print("Reasoning-Based Fault Diagnosis System")
print("="*60)

g = create_diagnostic_knowledge_base()

print("\n[Diagnostic Knowledge Base]")
print(f"  Number of failure modes: {len(list(g.subjects(RDF.type, DIAG.FailureMode)))}")
print(f"  Number of monitored equipment: {len(list(g.subjects(RDF.type, PROC.Pump)))}")

# Display equipment state
print("\n[Current Equipment State]")
for pump_uri in g.subjects(RDF.type, PROC.Pump):
    pump = str(pump_uri).split('#')[-1]
    flow = list(g.objects(pump_uri, PROC.hasFlowRate))[0] if list(g.objects(pump_uri, PROC.hasFlowRate)) else None
    vib = list(g.objects(pump_uri, PROC.hasVibration))[0] if list(g.objects(pump_uri, PROC.hasVibration)) else None
    pressure = list(g.objects(pump_uri, PROC.hasPressure))[0] if list(g.objects(pump_uri, PROC.hasPressure)) else None
    temp = list(g.objects(pump_uri, PROC.hasTemperature))[0] if list(g.objects(pump_uri, PROC.hasTemperature)) else None

    print(f"\n  {pump}:")
    print(f"    Flow: {flow} m³/h, Vibration: {vib} mm/s")
    print(f"    Pressure: {pressure} kPa, Temperature: {temp}°C")

# Execute fault diagnosis
diagnoses = diagnose_faults(g)

print(f"\n[Diagnosis Results]")
print(f"  Detected faults: {len(diagnoses)} items")

for i, diag in enumerate(diagnoses, 1):
    print(f"\n{i}. {diag['equipment']} - {diag['failure_mode']}")
    print(f"   Confidence: {diag['confidence']*100:.1f}%")
    print(f"   Symptoms: {', '.join(diag['symptoms'])}")
    print(f"   Root cause: {diag['root_cause']}")
    print(f"   Recommended action: {diag['action']}")

# Convert to DataFrame
if diagnoses:
    df = pd.DataFrame(diagnoses)
    print("\n[Diagnosis Summary (Table Format)]")
    print(df[['equipment', 'failure_mode', 'confidence', 'action']].to_string(index=False))

Example Output:

============================================================
Reasoning-Based Fault Diagnosis System
============================================================

[Diagnostic Knowledge Base]
  Number of failure modes: 3
  Number of monitored equipment: 2

[Current Equipment State]

  P101:
    Flow: 25.0 m³/h, Vibration: 8.5 mm/s
    Pressure: 450.0 kPa, Temperature: 65.0°C

  P102:
    Flow: 48.0 m³/h, Vibration: 2.0 mm/s
    Pressure: 380.0 kPa, Temperature: None°C

[Diagnosis Results]
  Detected faults: 2 items

1. P101 - Cavitation
   Confidence: 85.0%
   Symptoms: Low flow (25.0 m³/h), High vibration (8.5 mm/s)
   Root cause: Insufficient suction pressure
   Recommended action: Check NPSH, inspect suction piping

2. P102 - Seal leakage
   Confidence: 92.0%
   Symptoms: Low pressure (380.0 kPa), Liquid leak detection
   Root cause: Mechanical seal deterioration
   Recommended action: Recommend seal replacement

[Diagnosis Summary (Table Format)]
equipment failure_mode  confidence                                      action
     P101     Cavitation        0.85           Check NPSH, inspect suction piping
     P102  Seal leakage        0.92                   Recommend seal replacement

Explanation: By combining sensor data with a failure mode knowledge base, faults are diagnosed through symptom pattern matching. SPARQL FILTER conditions evaluate combinations of multiple symptoms.


4.7 Anomaly Detection Applications

Code Example 7: Knowledge Graph-Based Anomaly Detection

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0

from rdflib import Graph, Namespace, RDF, RDFS, Literal
from rdflib.plugins.sparql import prepareQuery
import numpy as np
import pandas as pd

# Knowledge graph-based anomaly detection system

PROC = Namespace("http://example.org/process#")
ANOM = Namespace("http://example.org/anomaly#")

def create_anomaly_detection_kb():
    """
    Build anomaly detection knowledge base

    Returns:
        Graph: RDF graph
    """
    g = Graph()
    g.bind("proc", PROC)
    g.bind("anom", ANOM)

    # Normal operating range definitions
    g.add((PROC.Pump, RDF.type, RDFS.Class))
    g.add((PROC.Pump, ANOM.normalFlowRateMin, Literal(45.0)))
    g.add((PROC.Pump, ANOM.normalFlowRateMax, Literal(55.0)))
    g.add((PROC.Pump, ANOM.normalPressureMin, Literal(480.0)))
    g.add((PROC.Pump, ANOM.normalPressureMax, Literal(520.0)))
    g.add((PROC.Pump, ANOM.normalVibrationMax, Literal(4.0)))
    g.add((PROC.Pump, ANOM.normalTempMax, Literal(70.0)))

    # Time series sensor data (pseudo stream)
    sensor_data = [
        {"time": 0, "pump": "P101", "flow": 50.0, "pressure": 500.0, "vib": 2.5, "temp": 55.0},
        {"time": 1, "pump": "P101", "flow": 48.0, "pressure": 495.0, "vib": 3.0, "temp": 56.0},
        {"time": 2, "pump": "P101", "flow": 42.0, "pressure": 485.0, "vib": 5.2, "temp": 58.0},  # Anomaly start
        {"time": 3, "pump": "P101", "flow": 35.0, "pressure": 470.0, "vib": 7.8, "temp": 62.0},  # Anomaly progression
        {"time": 4, "pump": "P101", "flow": 30.0, "pressure": 450.0, "vib": 9.5, "temp": 68.0},  # Severe anomaly
    ]

    # Add sensor data to graph
    for data in sensor_data:
        measurement_uri = PROC[f"Measurement_{data['pump']}_{data['time']}"]
        g.add((measurement_uri, RDF.type, PROC.Measurement))
        g.add((measurement_uri, PROC.equipment, PROC[data['pump']]))
        g.add((measurement_uri, PROC.timestamp, Literal(data['time'])))
        g.add((measurement_uri, PROC.flowRate, Literal(data['flow'])))
        g.add((measurement_uri, PROC.pressure, Literal(data['pressure'])))
        g.add((measurement_uri, PROC.vibration, Literal(data['vib'])))
        g.add((measurement_uri, PROC.temperature, Literal(data['temp'])))

    return g, sensor_data


def detect_anomalies(g):
    """
    Detect anomalies using knowledge graph

    Parameters:
        g (Graph): RDF graph

    Returns:
        list: Anomaly detection results
    """
    anomalies = []

    # Get normal ranges
    normal_ranges = {}
    for cls in [PROC.Pump]:
        normal_ranges[cls] = {
            'flow_min': float(list(g.objects(cls, ANOM.normalFlowRateMin))[0]),
            'flow_max': float(list(g.objects(cls, ANOM.normalFlowRateMax))[0]),
            'pressure_min': float(list(g.objects(cls, ANOM.normalPressureMin))[0]),
            'pressure_max': float(list(g.objects(cls, ANOM.normalPressureMax))[0]),
            'vib_max': float(list(g.objects(cls, ANOM.normalVibrationMax))[0]),
            'temp_max': float(list(g.objects(cls, ANOM.normalTempMax))[0])
        }

    # Check each measurement
    query_measurements = prepareQuery("""
        SELECT ?measurement ?eq ?time ?flow ?pressure ?vib ?temp
        WHERE {
            ?measurement a proc:Measurement .
            ?measurement proc:equipment ?eq .
            ?measurement proc:timestamp ?time .
            ?measurement proc:flowRate ?flow .
            ?measurement proc:pressure ?pressure .
            ?measurement proc:vibration ?vib .
            ?measurement proc:temperature ?temp .
        }
        ORDER BY ?time
    """, initNs={"proc": PROC})

    for row in g.query(query_measurements):
        ranges = normal_ranges[PROC.Pump]
        violations = []
        severity_score = 0

        # Flow check
        if row.flow < ranges['flow_min']:
            deviation = ((ranges['flow_min'] - row.flow) / ranges['flow_min']) * 100
            violations.append(f"Low flow (-{deviation:.1f}%)")
            severity_score += deviation
        elif row.flow > ranges['flow_max']:
            deviation = ((row.flow - ranges['flow_max']) / ranges['flow_max']) * 100
            violations.append(f"High flow (+{deviation:.1f}%)")
            severity_score += deviation

        # Pressure check
        if row.pressure < ranges['pressure_min']:
            deviation = ((ranges['pressure_min'] - row.pressure) / ranges['pressure_min']) * 100
            violations.append(f"Low pressure (-{deviation:.1f}%)")
            severity_score += deviation

        # Vibration check
        if row.vib > ranges['vib_max']:
            deviation = ((row.vib - ranges['vib_max']) / ranges['vib_max']) * 100
            violations.append(f"High vibration (+{deviation:.1f}%)")
            severity_score += deviation * 2  # Higher importance for vibration

        # Temperature check
        if row.temp > ranges['temp_max']:
            deviation = ((row.temp - ranges['temp_max']) / ranges['temp_max']) * 100
            violations.append(f"High temperature (+{deviation:.1f}%)")
            severity_score += deviation

        # Record if anomaly exists
        if violations:
            severity = 'CRITICAL' if severity_score > 100 else 'HIGH' if severity_score > 50 else 'MEDIUM'

            anomalies.append({
                'timestamp': int(row.time),
                'equipment': str(row.eq).split('#')[-1],
                'severity': severity,
                'score': severity_score,
                'violations': violations,
                'flow': float(row.flow),
                'pressure': float(row.pressure),
                'vibration': float(row.vib),
                'temperature': float(row.temp)
            })

    return anomalies


# Execution demo
print("="*60)
print("Knowledge Graph-Based Anomaly Detection System")
print("="*60)

g, sensor_data = create_anomaly_detection_kb()

print("\n[System Overview]")
print(f"  Monitoring targets: {len(list(g.subjects(PROC.equipment, None)))} measurement points")
print(f"  Number of time series data: {len(sensor_data)}")

print("\n[Normal Operating Range]")
print("  Flow: 45.0 - 55.0 m³/h")
print("  Pressure: 480.0 - 520.0 kPa")
print("  Vibration: < 4.0 mm/s")
print("  Temperature: < 70.0 °C")

# Execute anomaly detection
anomalies = detect_anomalies(g)

print(f"\n[Anomaly Detection Results]")
print(f"  Detected anomalies: {len(anomalies)}/{len(sensor_data)} measurement points")

# Display in time series
print("\n[Time Series Anomaly Report]")
for anom in anomalies:
    print(f"\nTime {anom['timestamp']}: [{anom['severity']}] {anom['equipment']}")
    print(f"  Anomaly score: {anom['score']:.1f}")
    print(f"  Violation items: {', '.join(anom['violations'])}")
    print(f"  Measurements: Flow={anom['flow']} m³/h, Pressure={anom['pressure']} kPa, "
          f"Vibration={anom['vibration']} mm/s, Temperature={anom['temperature']}°C")

# Anomaly trend analysis
if anomalies:
    df = pd.DataFrame(anomalies)
    print("\n[Anomaly Trend Analysis]")
    print(df[['timestamp', 'severity', 'score', 'flow', 'vibration']].to_string(index=False))

    # Score trend
    print(f"\nAnomaly score trend:")
    print(f"  Minimum: {df['score'].min():.1f}")
    print(f"  Maximum: {df['score'].max():.1f}")
    print(f"  Average: {df['score'].mean():.1f}")
    print(f"  Trend: {'Worsening' if df['score'].iloc[-1] > df['score'].iloc[0] else 'Improving'}")

Example Output:

============================================================
Knowledge Graph-Based Anomaly Detection System
============================================================

[System Overview]
  Monitoring targets: 5 measurement points
  Number of time series data: 5

[Normal Operating Range]
  Flow: 45.0 - 55.0 m³/h
  Pressure: 480.0 - 520.0 kPa
  Vibration: < 4.0 mm/s
  Temperature: < 70.0 °C

[Anomaly Detection Results]
  Detected anomalies: 3/5 measurement points

[Time Series Anomaly Report]

Time 2: [MEDIUM] P101
  Anomaly score: 36.7
  Violation items: Low flow (-6.7%), High vibration (+30.0%)
  Measurements: Flow=42.0 m³/h, Pressure=485.0 kPa, Vibration=5.2 mm/s, Temperature=58.0°C

Time 3: [HIGH] P101
  Anomaly score: 117.8
  Violation items: Low flow (-22.2%), Low pressure (-2.1%), High vibration (+95.0%)
  Measurements: Flow=35.0 m³/h, Pressure=470.0 kPa, Vibration=7.8 mm/s, Temperature=62.0°C

Time 4: [CRITICAL] P101
  Anomaly score: 183.3
  Violation items: Low flow (-33.3%), Low pressure (-6.2%), High vibration (+137.5%)
  Measurements: Flow=30.0 m³/h, Pressure=450.0 kPa, Vibration=9.5 mm/s, Temperature=68.0°C

[Anomaly Trend Analysis]
timestamp severity  score  flow  vibration
        2   MEDIUM   36.7  42.0        5.2
        3     HIGH  117.8  35.0        7.8
        4 CRITICAL  183.3  30.0        9.5

Anomaly score trend:
  Minimum: 36.7
  Maximum: 183.3
  Average: 112.6
  Trend: Worsening

Explanation: By defining normal operating ranges in the knowledge graph and comparing them with time series sensor data, anomalies are detected. Simultaneous monitoring of multiple parameters quantifies anomaly severity and enables trend analysis.


4.8 Chapter Summary

What We Learned

  1. RDFS Reasoning
    • Type inheritance through class hierarchies (subClassOf reasoning)
    • Property domain and range constraints
    • Querying inference results with SPARQL
  2. OWL Reasoning Engines
    • Advanced reasoning with HermiT/Pellet
    • Automatic class classification (equivalent_to constraints)
    • Disjoint constraints, cardinality constraints
  3. SWRL Rules
    • Custom rule definition in if-then format
    • Reasoning based on numerical conditions (greaterThan, etc.)
    • Applications in alarm generation and maintenance determination
  4. Process Safety Reasoning
    • Automatic detection of design constraint violations
    • Condition evaluation using SPARQL FILTER
    • Severity classification and risk assessment
  5. Consistency Checking
    • Ontology inconsistency detection
    • Verification of Disjoint and FunctionalProperty violations
  6. Fault Diagnosis System
    • Symptom pattern matching reasoning
    • Root cause analysis
    • Confidence scoring
  7. Anomaly Detection
    • Knowledge base representation of normal operating ranges
    • Anomaly detection by comparison with time series data
    • Anomaly scoring and trend analysis

Key Points

RDFS reasoning specializes in class hierarchies and property inheritance, making it lightweight and fast. OWL reasoning enables complex constraints and automatic classification but has higher computational cost. SWRL rules allow flexible definition of domain-specific reasoning logic. Reasoning engines are effective for consistency checking during design phases. In fault diagnosis, systematizing knowledge of symptoms and causes is important. For anomaly detection, clearly defining normal ranges and performing quantitative evaluation is essential. Always validating reasoning results and reducing false positives ensures reliable system performance.

Next Chapter

In Chapter 5, we will learn about Implementation and Integrated Applications, including building SPARQL endpoint APIs, knowledge graph visualization with NetworkX and Plotly, and automatic process documentation generation. The chapter covers root cause analysis applications, equipment recommendation systems, applications in process optimization, and concludes with fully integrated systems combining APIs, reasoning, and visualization capabilities.

References

  1. Montgomery, D. C. (2019). Design and Analysis of Experiments (9th ed.). Wiley.
  2. Box, G. E. P., Hunter, J. S., & Hunter, W. G. (2005). Statistics for Experimenters: Design, Innovation, and Discovery (2nd ed.). Wiley.
  3. Seborg, D. E., Edgar, T. F., Mellichamp, D. A., & Doyle III, F. J. (2016). Process Dynamics and Control (4th ed.). Wiley.
  4. McKay, M. D., Beckman, R. J., & Conover, W. J. (2000). "A Comparison of Three Methods for Selecting Values of Input Variables in the Analysis of Output from a Computer Code." Technometrics, 42(1), 55-61.

Disclaimer