This chapter focuses on practical applications of network analysis. You will learn to analyze influence propagation in social networks, build knowledge graphs, analyze biological networks, and implement graph-based recommendation systems.
Learning Objectives
By reading this chapter, you will be able to:
- Analyze influence propagation and information diffusion in social networks
- Build knowledge graphs and semantic networks
- Analyze biological networks and identify pathways
- Implement graph-based recommendation systems
- Design and execute practical network analysis projects
5.1 Social Network Analysis
Influence Propagation Models
Influence propagation studies how information, opinions, and behaviors spread across social networks.
Linear Threshold Model
Each node has a threshold and becomes activated when the fraction of its neighbors that are already active meets or exceeds that threshold. (The classic formulation weights each neighbor's influence; the simulation below uses the unweighted fraction of active neighbors.)
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - networkx>=3.1.0
# - numpy>=1.24.0, <2.0.0
import networkx as nx
import numpy as np
import matplotlib.pyplot as plt
def linear_threshold_model(G, seed_nodes, thresholds=None, iterations=20):
"""
Influence propagation simulation using Linear Threshold Model
Parameters:
-----------
G : NetworkX graph
Social network
seed_nodes : list
Initial influenced nodes (seeds)
thresholds : dict
Threshold for each node (random if None)
iterations : int
Maximum number of simulation iterations
Returns:
--------
history : list
Set of activated nodes at each iteration
"""
# Initialize thresholds
if thresholds is None:
thresholds = {node: np.random.uniform(0.3, 0.7) for node in G.nodes()}
# Initial state
active = set(seed_nodes)
history = [active.copy()]
for iteration in range(iterations):
new_active = set()
for node in G.nodes():
if node in active:
continue
# Calculate influence from neighbors
neighbors = list(G.neighbors(node))
if len(neighbors) == 0:
continue
active_neighbors = sum(1 for n in neighbors if n in active)
influence = active_neighbors / len(neighbors)
# Activate if threshold exceeded
if influence >= thresholds[node]:
new_active.add(node)
if len(new_active) == 0:
break
active.update(new_active)
history.append(active.copy())
return history
# Create sample network
np.random.seed(42)
G = nx.watts_strogatz_graph(50, 6, 0.3)
# Select seed nodes (high centrality nodes)
centrality = nx.degree_centrality(G)
seed_nodes = sorted(centrality, key=centrality.get, reverse=True)[:3]
# Run simulation
history = linear_threshold_model(G, seed_nodes)
print("=== Linear Threshold Model Simulation ===")
print(f"Number of seed nodes: {len(seed_nodes)}")
print(f"Total nodes: {G.number_of_nodes()}")
print(f"Influence spread iterations: {len(history)}")
print(f"\nActivated nodes at each iteration:")
for i, active_set in enumerate(history):
print(f" Iteration {i}: {len(active_set)} nodes ({len(active_set)/G.number_of_nodes()*100:.1f}%)")
# Visualization
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
pos = nx.spring_layout(G, seed=42)
snapshots = [0, 1, 2, 3, 4, len(history)-1]
for idx, (ax, snap_idx) in enumerate(zip(axes.flat, snapshots)):
if snap_idx >= len(history):
snap_idx = len(history) - 1
active_nodes = history[snap_idx]
node_colors = ['red' if n in active_nodes else 'lightblue' for n in G.nodes()]
nx.draw(G, pos, node_color=node_colors, node_size=100,
edge_color='gray', alpha=0.6, ax=ax)
ax.set_title(f'Iteration {snap_idx}\nActivated: {len(active_nodes)} nodes',
fontsize=12)
plt.tight_layout()
plt.show()
Independent Cascade Model
Each newly activated node gets a single chance to activate each of its inactive neighbors, succeeding independently with a fixed probability.
def independent_cascade_model(G, seed_nodes, prob=0.1, iterations=20):
"""
Influence propagation simulation using Independent Cascade Model
Parameters:
-----------
G : NetworkX graph
Social network
seed_nodes : list
Initial influenced nodes
prob : float
Activation probability for each edge
iterations : int
Maximum number of iterations
Returns:
--------
history : list
Set of activated nodes at each iteration
"""
active = set(seed_nodes)
newly_active = set(seed_nodes)
history = [active.copy()]
for iteration in range(iterations):
next_active = set()
for node in newly_active:
for neighbor in G.neighbors(node):
if neighbor not in active:
# Probabilistic activation
if np.random.random() < prob:
next_active.add(neighbor)
if len(next_active) == 0:
break
active.update(next_active)
newly_active = next_active
history.append(active.copy())
return history
# Run simulation (average over multiple runs)
num_simulations = 100
all_spreads = []
for _ in range(num_simulations):
history = independent_cascade_model(G, seed_nodes, prob=0.15)
all_spreads.append(len(history[-1]))
print("\n=== Independent Cascade Model Simulation ===")
print(f"Number of simulations: {num_simulations}")
print(f"Influence spread statistics:")
print(f" Mean: {np.mean(all_spreads):.2f} nodes ({np.mean(all_spreads)/G.number_of_nodes()*100:.1f}%)")
print(f" Std dev: {np.std(all_spreads):.2f}")
print(f" Min: {np.min(all_spreads)} nodes")
print(f" Max: {np.max(all_spreads)} nodes")
# Visualize distribution
plt.figure(figsize=(10, 6))
plt.hist(all_spreads, bins=20, alpha=0.7, edgecolor='black')
plt.axvline(np.mean(all_spreads), color='red', linestyle='--',
linewidth=2, label=f'Mean: {np.mean(all_spreads):.2f}')
plt.xlabel('Number of influenced nodes', fontsize=12)
plt.ylabel('Frequency', fontsize=12)
plt.title('Independent Cascade Model: Distribution of Influence Spread', fontsize=14)
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
Influencer Identification
For effective information diffusion, we identify nodes with high influence (influencers).
def identify_influencers(G, k=5, method='degree'):
"""
Identify influencers
Parameters:
-----------
G : NetworkX graph
Social network
k : int
Number of influencers to identify
method : str
Centrality metric to use ('degree', 'betweenness', 'closeness', 'pagerank')
Returns:
--------
influencers : list
List of top k nodes
scores : dict
Score for each node
"""
if method == 'degree':
scores = nx.degree_centrality(G)
elif method == 'betweenness':
scores = nx.betweenness_centrality(G)
elif method == 'closeness':
scores = nx.closeness_centrality(G)
elif method == 'pagerank':
scores = nx.pagerank(G)
else:
raise ValueError(f"Unknown method: {method}")
influencers = sorted(scores, key=scores.get, reverse=True)[:k]
return influencers, scores
# Experiment on real social network (Karate Club data)
G_karate = nx.karate_club_graph()
# Identify influencers using different methods
methods = ['degree', 'betweenness', 'closeness', 'pagerank']
k = 5
print("\n=== Comparison of Influencer Identification ===")
print(f"Network: Karate Club ({G_karate.number_of_nodes()} nodes)")
print(f"Identifying top {k} influencers:\n")
fig, axes = plt.subplots(2, 2, figsize=(14, 14))
pos = nx.spring_layout(G_karate, seed=42)
for ax, method in zip(axes.flat, methods):
influencers, scores = identify_influencers(G_karate, k=k, method=method)
print(f"{method.capitalize()}:")
for i, node in enumerate(influencers):
print(f" {i+1}. Node {node}: Score = {scores[node]:.4f}")
print()
# Visualization
node_colors = ['red' if n in influencers else 'lightblue' for n in G_karate.nodes()]
node_sizes = [scores[n] * 2000 for n in G_karate.nodes()]
nx.draw(G_karate, pos, node_color=node_colors, node_size=node_sizes,
edge_color='gray', alpha=0.6, with_labels=True, ax=ax)
ax.set_title(f'{method.capitalize()} Centrality\n(Red = Top {k} influencers)',
fontsize=12)
plt.tight_layout()
plt.show()
# Compare influence spread for each method
print("\n=== Effectiveness Comparison of Influencer Selection Methods ===")
results = {}
for method in methods:
influencers, _ = identify_influencers(G_karate, k=3, method=method)
# Average over 100 simulations
spreads = []
for _ in range(100):
history = independent_cascade_model(G_karate, influencers, prob=0.2)
spreads.append(len(history[-1]))
results[method] = {
'mean': np.mean(spreads),
'std': np.std(spreads)
}
print(f"{method.capitalize()}:")
print(f" Average influenced nodes: {results[method]['mean']:.2f} Β± {results[method]['std']:.2f}")
print(f" Coverage: {results[method]['mean']/G_karate.number_of_nodes()*100:.1f}%")
# Visualize results
plt.figure(figsize=(10, 6))
methods_list = list(results.keys())
means = [results[m]['mean'] for m in methods_list]
stds = [results[m]['std'] for m in methods_list]
plt.bar(methods_list, means, yerr=stds, alpha=0.7, capsize=5, edgecolor='black')
plt.xlabel('Centrality Metric', fontsize=12)
plt.ylabel('Average Influenced Nodes', fontsize=12)
plt.title('Effectiveness Comparison of Influencer Selection Methods', fontsize=14)
plt.grid(True, alpha=0.3, axis='y')
plt.tight_layout()
plt.show()
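Centrality heuristics are fast but carry no optimality guarantee. The classic alternative is greedy hill-climbing (Kempe, Kleinberg & Tardos, 2003), which repeatedly adds the node with the largest estimated marginal gain in simulated spread and achieves a (1 - 1/e) approximation under the Independent Cascade model. Below is a minimal sketch reusing the independent_cascade_model function defined above; the simulation count n_sims is kept deliberately small for speed.
def greedy_influence_maximization(G, k=3, prob=0.2, n_sims=50):
    """Greedy seed selection by estimated marginal spread gain."""
    seeds = []
    for _ in range(k):
        best_node, best_spread = None, -1.0
        for node in G.nodes():
            if node in seeds:
                continue
            # Monte Carlo estimate of the spread of seeds + candidate
            spread = np.mean([
                len(independent_cascade_model(G, seeds + [node], prob=prob)[-1])
                for _ in range(n_sims)
            ])
            if spread > best_spread:
                best_node, best_spread = node, spread
        seeds.append(best_node)
    return seeds
greedy_seeds = greedy_influence_maximization(G_karate, k=3)
print(f"Greedy seed set: {greedy_seeds}")
This is feasible on the Karate Club graph; on large networks the inner Monte Carlo loop dominates, and in practice it is replaced by accelerations such as CELF or reverse influence sampling.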
5.2 Knowledge Graphs and Semantic Networks
Building Knowledge Graphs
Knowledge Graphs are networks that represent entities and their relationships.
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - networkx>=3.1.0
import networkx as nx
import matplotlib.pyplot as plt
class KnowledgeGraph:
"""Knowledge graph construction and management"""
def __init__(self):
self.G = nx.DiGraph()
def add_triple(self, subject, predicate, obj):
"""
Add a triple (subject-predicate-object)
Parameters:
-----------
subject : str
Subject (entity)
predicate : str
Predicate (relationship)
obj : str
Object (entity or value)
"""
self.G.add_edge(subject, obj, relation=predicate)
def query_relations(self, entity):
"""Get all relationships related to an entity"""
outgoing = [(entity, self.G[entity][neighbor]['relation'], neighbor)
for neighbor in self.G.successors(entity)]
incoming = [(source, self.G[source][entity]['relation'], entity)
for source in self.G.predecessors(entity)]
return {'outgoing': outgoing, 'incoming': incoming}
    def find_path(self, start, end, max_length=3):
        """Search for paths between two entities"""
        try:
            # all_simple_paths yields nothing (rather than raising) when no path exists
            paths = list(nx.all_simple_paths(self.G, start, end, cutoff=max_length))
            return paths
        except nx.NodeNotFound:
            return []
def visualize(self, figsize=(12, 8)):
"""Visualize the knowledge graph"""
plt.figure(figsize=figsize)
pos = nx.spring_layout(self.G, k=2, seed=42)
# Draw nodes
nx.draw_networkx_nodes(self.G, pos, node_color='lightblue',
node_size=2000, alpha=0.9)
# Draw edges
nx.draw_networkx_edges(self.G, pos, edge_color='gray',
arrows=True, arrowsize=20, alpha=0.6,
connectionstyle='arc3,rad=0.1')
# Draw labels
nx.draw_networkx_labels(self.G, pos, font_size=10, font_weight='bold')
# Draw edge labels (relationships)
edge_labels = nx.get_edge_attributes(self.G, 'relation')
nx.draw_networkx_edge_labels(self.G, pos, edge_labels, font_size=8)
plt.title('Knowledge Graph', fontsize=14, fontweight='bold')
plt.axis('off')
plt.tight_layout()
plt.show()
# Build a knowledge graph about movies
kg = KnowledgeGraph()
# Add triples
triples = [
# Movies and genres
('The Matrix', 'genre', 'Sci-Fi'),
('The Matrix', 'genre', 'Action'),
('Inception', 'genre', 'Sci-Fi'),
('Inception', 'genre', 'Thriller'),
# Directors
('The Matrix', 'directed_by', 'Wachowski'),
('Inception', 'directed_by', 'Nolan'),
('Interstellar', 'directed_by', 'Nolan'),
# Actors
('The Matrix', 'starring', 'Keanu Reeves'),
('Inception', 'starring', 'Leonardo DiCaprio'),
('Interstellar', 'starring', 'Matthew McConaughey'),
# Release years
('The Matrix', 'released_in', '1999'),
('Inception', 'released_in', '2010'),
('Interstellar', 'released_in', '2014'),
]
for subject, predicate, obj in triples:
kg.add_triple(subject, predicate, obj)
print("=== Knowledge Graph Construction ===")
print(f"Number of entities: {kg.G.number_of_nodes()}")
print(f"Number of relationships: {kg.G.number_of_edges()}")
# Query entities
print("\n=== Relationships about 'Inception' ===")
relations = kg.query_relations('Inception')
print("Outgoing relations:")
for s, p, o in relations['outgoing']:
print(f" {s} --[{p}]--> {o}")
# Path search
print("\n=== Relationships between 'The Matrix' and 'Interstellar' ===")
paths = kg.find_path('The Matrix', 'Interstellar')
if paths:
print(f"Found {len(paths)} paths:")
for i, path in enumerate(paths):
print(f" Path {i+1}: {' -> '.join(path)}")
else:
print("No direct paths found")
# Visualization
kg.visualize()
Entity Relationship Extraction and Neo4j Basics
The following example sketches how entities and relationships extracted from text can be stored in a graph database; the extraction step itself is stubbed out with hand-written triples.
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - networkx>=3.1.0
# - spacy>=3.6.0
import spacy
import networkx as nx
import matplotlib.pyplot as plt
# Simple entity relationship extraction (in practice, use NLP libraries like spaCy)
def extract_triples_simple(text):
"""
Simple triple extraction
(In actual projects, use spaCy or BERT-based models)
"""
# Define triples manually as a sample
triples = [
('Albert Einstein', 'born_in', 'Germany'),
('Albert Einstein', 'developed', 'Theory of Relativity'),
('Albert Einstein', 'won', 'Nobel Prize'),
('Nobel Prize', 'awarded_for', 'Physics'),
('Theory of Relativity', 'is_type_of', 'Physics Theory'),
]
return triples
# Neo4j-style Cypher query simulation
class SimpleGraphDB:
"""Simple implementation of graph database (Neo4j-style)"""
def __init__(self):
self.kg = KnowledgeGraph()
def create_node(self, label, properties=None):
"""Create node"""
        node_id = f"{label}_{len(self.kg.G.nodes())}"
        self.kg.G.add_node(node_id, label=label, **(properties or {}))
return node_id
def create_relationship(self, node1, node2, rel_type):
"""Create relationship"""
self.kg.add_triple(node1, rel_type, node2)
def match(self, pattern):
"""
Pattern matching (simplified version)
Example: MATCH (a)-[r]->(b) WHERE a.label = 'Person'
"""
results = []
for u, v, data in self.kg.G.edges(data=True):
u_label = self.kg.G.nodes[u].get('label', '')
v_label = self.kg.G.nodes[v].get('label', '')
if pattern.get('source_label') and u_label != pattern['source_label']:
continue
if pattern.get('relation') and data['relation'] != pattern['relation']:
continue
if pattern.get('target_label') and v_label != pattern['target_label']:
continue
results.append((u, data['relation'], v))
return results
# Example usage of graph database
print("\n=== Graph Database Usage Example ===")
db = SimpleGraphDB()
# Extract triples from text
text = "Albert Einstein was born in Germany and developed the Theory of Relativity."
triples = extract_triples_simple(text)
# Store in graph database
for subject, predicate, obj in triples:
db.create_relationship(subject, predicate, obj)
print(f"Stored triples: {len(triples)}")
print("\nAll triples:")
for s, p, o in triples:
print(f" ({s}) -[{p}]-> ({o})")
# Execute queries
print("\n=== Query: 'developed' relationships (pattern matching) ===")
for s, p, o in db.match({'relation': 'developed'}):
    print(f"  ({s}) -[{p}]-> ({o})")
print("\n=== Query: All relationships of 'Albert Einstein' ===")
einstein_relations = db.kg.query_relations('Albert Einstein')
for s, p, o in einstein_relations['outgoing']:
print(f" {s} -[{p}]-> {o}")
# Visualization
db.kg.visualize(figsize=(14, 10))
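For reference, a rough dependency-based subject-verb-object extractor is one simple way to obtain triples from real text. The sketch below is illustrative only: it assumes spaCy's en_core_web_sm model has been downloaded (python -m spacy download en_core_web_sm), and production pipelines typically use trained relation-extraction models instead.
import spacy
def extract_svo_triples(text):
    """Extract rough (subject, verb, object) triples via dependency parsing."""
    nlp = spacy.load('en_core_web_sm')
    doc = nlp(text)
    triples = []
    for token in doc:
        if token.pos_ == 'VERB':
            subjects = [c for c in token.children if c.dep_ in ('nsubj', 'nsubjpass')]
            objects = [c for c in token.children if c.dep_ in ('dobj', 'attr')]
            for s in subjects:
                for o in objects:
                    triples.append((s.text, token.lemma_, o.text))
    return triples
print(extract_svo_triples("Einstein developed the theory of relativity."))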
Practical Tip: In production environments, using dedicated graph databases like Neo4j, Amazon Neptune, or TigerGraph enables efficient management and querying of large-scale knowledge graphs.
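To make the tip concrete, here is a minimal, hypothetical sketch of loading the triples above into Neo4j with the official Python driver (pip install neo4j, driver version 5.x). The URI and credentials are placeholders for a local instance.
from neo4j import GraphDatabase
# Placeholder connection details for a local Neo4j instance
driver = GraphDatabase.driver('bolt://localhost:7687', auth=('neo4j', 'password'))
def store_triple(tx, subject, predicate, obj):
    # MERGE avoids duplicate nodes; the relation is stored as a property
    # because Cypher does not allow parameterized relationship types
    tx.run(
        'MERGE (s:Entity {name: $subject}) '
        'MERGE (o:Entity {name: $obj}) '
        'MERGE (s)-[:RELATED {type: $predicate}]->(o)',
        subject=subject, predicate=predicate, obj=obj
    )
with driver.session() as session:
    for s, p, o in triples:
        session.execute_write(store_triple, s, p, o)
driver.close()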
5.3 Biological Networks
Protein-Protein Interaction Networks (PPI)
Protein-Protein Interaction Networks represent interactions between proteins within cells.
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - networkx>=3.1.0
# - numpy>=1.24.0, <2.0.0
import networkx as nx
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
def create_ppi_network(n_proteins=100, interaction_prob=0.05):
"""
Protein-protein interaction network simulation
Parameters:
-----------
n_proteins : int
Number of proteins
interaction_prob : float
Probability of interaction
Returns:
--------
G : NetworkX graph
PPI network
"""
G = nx.erdos_renyi_graph(n_proteins, interaction_prob)
# Randomly assign protein functions
functions = ['Metabolism', 'Signaling', 'Transport', 'Regulation', 'Structure']
for node in G.nodes():
G.nodes[node]['function'] = np.random.choice(functions)
G.nodes[node]['expression'] = np.random.uniform(0, 10) # Expression level
return G
def identify_protein_complexes(G, min_size=3):
"""
Identify protein complexes (clique detection)
Parameters:
-----------
G : NetworkX graph
PPI network
min_size : int
Minimum complex size
Returns:
--------
complexes : list
Detected protein complexes
"""
cliques = list(nx.find_cliques(G))
complexes = [c for c in cliques if len(c) >= min_size]
return complexes
def analyze_hub_proteins(G, top_k=10):
"""
Analyze hub proteins (high degree nodes)
Parameters:
-----------
G : NetworkX graph
PPI network
top_k : int
Top k hubs
Returns:
--------
hubs : list
List of hub proteins
"""
degrees = dict(G.degree())
hubs = sorted(degrees, key=degrees.get, reverse=True)[:top_k]
return hubs, degrees
# Create PPI network
np.random.seed(42)
ppi = create_ppi_network(n_proteins=80, interaction_prob=0.08)
print("=== Protein-Protein Interaction Network (PPI) ===")
print(f"Number of proteins: {ppi.number_of_nodes()}")
print(f"Number of interactions: {ppi.number_of_edges()}")
print(f"Average degree: {np.mean([d for _, d in ppi.degree()]):.2f}")
# Identify protein complexes
complexes = identify_protein_complexes(ppi, min_size=3)
print(f"\nDetected protein complexes: {len(complexes)}")
print(f"Complex size distribution:")
complex_sizes = [len(c) for c in complexes]
for size in sorted(set(complex_sizes)):
count = complex_sizes.count(size)
print(f" Size {size}: {count} complexes")
# Analyze hub proteins
hubs, degrees = analyze_hub_proteins(ppi, top_k=5)
print(f"\nTop 5 hub proteins:")
for i, hub in enumerate(hubs):
func = ppi.nodes[hub]['function']
deg = degrees[hub]
print(f" {i+1}. Protein {hub}: degree={deg}, function={func}")
# Visualization
fig, axes = plt.subplots(1, 2, figsize=(16, 7))
# Entire network
pos = nx.spring_layout(ppi, seed=42)
node_colors = [ppi.nodes[n]['expression'] for n in ppi.nodes()]
node_sizes = [degrees[n] * 30 for n in ppi.nodes()]
nx.draw(ppi, pos, node_color=node_colors, node_size=node_sizes,
cmap='YlOrRd', edge_color='gray', alpha=0.7, ax=axes[0])
axes[0].set_title('PPI Network (color = expression level, size = degree)', fontsize=12)
# Highlight largest protein complex
if complexes:
largest_complex = max(complexes, key=len)
subgraph = ppi.subgraph(largest_complex)
sub_pos = nx.spring_layout(subgraph, seed=42)
nx.draw(subgraph, sub_pos, node_color='lightcoral', node_size=500,
edge_color='black', width=2, with_labels=True, ax=axes[1])
axes[1].set_title(f'Largest Protein Complex (size: {len(largest_complex)})',
fontsize=12)
plt.tight_layout()
plt.show()
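Degree hubs are not the only biologically interesting nodes. Cut vertices (articulation points), whose removal disconnects part of the network, are often interpreted as bottleneck proteins, and NetworkX can enumerate them directly on the undirected PPI graph:
# Articulation points as candidate "bottleneck" proteins
bottlenecks = list(nx.articulation_points(ppi))
print(f"\nArticulation points (potential bottlenecks): {len(bottlenecks)}")
print(f"Examples: {bottlenecks[:5]}")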
Gene Regulatory Networks and Pathway Analysis
Gene regulatory networks are directed graphs that represent regulatory relationships between genes.
def create_gene_regulatory_network(n_genes=50, regulation_prob=0.1):
"""
Generate gene regulatory network
Parameters:
-----------
n_genes : int
Number of genes
regulation_prob : float
Probability of regulatory relationship
Returns:
--------
G : NetworkX DiGraph
Gene regulatory network
"""
G = nx.DiGraph()
# Add gene nodes
for i in range(n_genes):
G.add_node(f'Gene_{i}',
expression=np.random.uniform(0, 1),
type=np.random.choice(['TF', 'target'])) # TF = transcription factor
# Add regulatory relationships
for i in range(n_genes):
for j in range(n_genes):
if i != j and np.random.random() < regulation_prob:
# activation (+1) or repression (-1)
regulation_type = np.random.choice([1, -1])
G.add_edge(f'Gene_{i}', f'Gene_{j}',
weight=regulation_type)
return G
def find_regulatory_motifs(G):
"""
Detect regulatory motifs (feedback loops, etc.)
Parameters:
-----------
G : NetworkX DiGraph
Gene regulatory network
Returns:
--------
motifs : dict
Detected motifs
"""
    motifs = {
        'feedback_loops': [],
        'feedforward_loops': [],  # not populated in this simplified version
        'cascades': []
    }
    # Feedback loops (cycles); note that enumerating simple cycles
    # can be expensive on dense graphs
    cycles = list(nx.simple_cycles(G))
    motifs['feedback_loops'] = [c for c in cycles if len(c) <= 4]
# Cascades (long paths)
for node in G.nodes():
descendants = nx.descendants(G, node)
if len(descendants) >= 3:
motifs['cascades'].append((node, len(descendants)))
return motifs
def pathway_enrichment_analysis(G, target_genes):
"""
Pathway enrichment analysis (simplified version)
Parameters:
-----------
G : NetworkX DiGraph
Gene regulatory network
target_genes : list
List of genes of interest
Returns:
--------
enriched_regulators : list
Enriched upstream regulators
"""
    # Count, for each upstream gene, how many target genes it can reach
regulators = defaultdict(int)
for target in target_genes:
if target in G:
# Get all upstream regulators
ancestors = nx.ancestors(G, target)
for anc in ancestors:
regulators[anc] += 1
# Sort by score
enriched = sorted(regulators.items(), key=lambda x: x[1], reverse=True)
return enriched
# Create gene regulatory network
np.random.seed(42)
grn = create_gene_regulatory_network(n_genes=40, regulation_prob=0.12)
print("\n=== Gene Regulatory Network (GRN) ===")
print(f"Number of genes: {grn.number_of_nodes()}")
print(f"Number of regulatory relationships: {grn.number_of_edges()}")
# Identify transcription factors
tfs = [n for n in grn.nodes() if grn.nodes[n]['type'] == 'TF']
print(f"Number of transcription factors: {len(tfs)}")
# Motif detection
motifs = find_regulatory_motifs(grn)
print(f"\n=== Regulatory Motifs ===")
print(f"Feedback loops: {len(motifs['feedback_loops'])}")
if motifs['feedback_loops']:
print(f" Example: {motifs['feedback_loops'][0]}")
print(f"Regulatory cascades: {len(motifs['cascades'])}")
if motifs['cascades']:
top_cascade = max(motifs['cascades'], key=lambda x: x[1])
print(f" Largest cascade: {top_cascade[0]} β {top_cascade[1]} downstream genes")
# Pathway enrichment analysis
target_genes = list(grn.nodes())[:10] # First 10 genes as targets
enriched = pathway_enrichment_analysis(grn, target_genes)
print(f"\n=== Pathway Enrichment Analysis ===")
print(f"Number of target genes: {len(target_genes)}")
print(f"Top 5 enriched regulators:")
for i, (reg, score) in enumerate(enriched[:5]):
print(f" {i+1}. {reg}: score={score}")
# Visualization
fig, axes = plt.subplots(1, 2, figsize=(16, 7))
# Entire network
pos = nx.spring_layout(grn, seed=42, k=0.5)
node_colors = ['red' if grn.nodes[n]['type'] == 'TF' else 'lightblue'
for n in grn.nodes()]
# Edge colors (activation=green, repression=red)
edge_colors = ['green' if grn[u][v]['weight'] > 0 else 'red'
for u, v in grn.edges()]
nx.draw(grn, pos, node_color=node_colors, edge_color=edge_colors,
node_size=300, alpha=0.7, arrows=True, arrowsize=10, ax=axes[0])
axes[0].set_title('Gene Regulatory Network\n(red nodes=TF, green edges=activation, red edges=repression)',
                  fontsize=12)
# Highlight feedback loop
if motifs['feedback_loops']:
loop = motifs['feedback_loops'][0]
subgraph = grn.subgraph(loop)
sub_pos = nx.circular_layout(subgraph)
edge_colors_sub = ['green' if subgraph[u][v]['weight'] > 0 else 'red'
for u, v in subgraph.edges()]
nx.draw(subgraph, sub_pos, node_color='yellow', edge_color=edge_colors_sub,
node_size=800, width=2, with_labels=True, arrows=True,
arrowsize=20, ax=axes[1])
axes[1].set_title(f'Example Feedback Loop\n({len(loop)} genes)',
fontsize=12)
plt.tight_layout()
plt.show()
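The reachability count above is only a rough proxy for enrichment. Standard pathway enrichment analysis instead asks whether the overlap between a gene set and a pathway is larger than expected by chance, typically with a hypergeometric test. A minimal sketch using scipy.stats (an additional dependency), with purely illustrative numbers:
from scipy.stats import hypergeom
# Illustrative numbers: M background genes, n genes annotated to the pathway,
# N genes in the target set, k of which fall in the pathway
M, n, N, k = 20000, 150, 40, 8
# P(X >= k): probability of at least k overlapping genes by chance
p_value = hypergeom.sf(k - 1, M, n, N)
print(f"Hypergeometric enrichment p-value: {p_value:.2e}")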
5.4 Applications to Recommendation Systems
Graph-Based Collaborative Filtering
We represent users and items as a bipartite graph and perform recommendations.
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - networkx>=3.1.0
# - numpy>=1.24.0, <2.0.0
import networkx as nx
import numpy as np
import matplotlib.pyplot as plt
class GraphBasedRecommender:
"""Graph-based recommendation system"""
def __init__(self):
self.G = nx.Graph()
self.users = set()
self.items = set()
def add_interaction(self, user, item, rating=1.0):
"""Add user-item interaction"""
self.G.add_node(user, bipartite=0) # User side
self.G.add_node(item, bipartite=1) # Item side
self.G.add_edge(user, item, weight=rating)
self.users.add(user)
self.items.add(item)
def recommend_by_neighbors(self, user, top_k=5):
"""
Neighbor-based recommendation
1. Get user's neighboring items
2. Find other users connected to those items
3. Recommend items those users like
"""
if user not in self.G:
return []
# Items already rated by user
user_items = set(self.G.neighbors(user))
# Calculate candidate item scores
candidate_scores = {}
for item in user_items:
# Other users who like this item
for other_user in self.G.neighbors(item):
if other_user == user:
continue
# Items liked by other users
for candidate in self.G.neighbors(other_user):
if candidate not in user_items:
# Score = product of path weights
score = (self.G[user][item]['weight'] *
self.G[other_user][candidate]['weight'])
candidate_scores[candidate] = candidate_scores.get(candidate, 0) + score
# Sort by score
recommendations = sorted(candidate_scores.items(),
key=lambda x: x[1], reverse=True)[:top_k]
return recommendations
def recommend_by_random_walk(self, user, walk_length=10, n_walks=100, top_k=5):
"""
Random walk-based recommendation
Parameters:
-----------
user : str
Target user
walk_length : int
Length of each walk
n_walks : int
Number of walks to perform
top_k : int
Number of items to recommend
"""
if user not in self.G:
return []
user_items = set(self.G.neighbors(user))
visit_counts = {}
for _ in range(n_walks):
current = user
for step in range(walk_length):
neighbors = list(self.G.neighbors(current))
if not neighbors:
break
# Weighted random selection
weights = [self.G[current][n]['weight'] for n in neighbors]
weights = np.array(weights) / sum(weights)
current = np.random.choice(neighbors, p=weights)
# Count item visits
if current in self.items and current not in user_items:
visit_counts[current] = visit_counts.get(current, 0) + 1
# Sort by visit count
recommendations = sorted(visit_counts.items(),
key=lambda x: x[1], reverse=True)[:top_k]
return recommendations
# Create sample data (MovieLens-style)
np.random.seed(42)
recommender = GraphBasedRecommender()
# Users and items
users = [f'User_{i}' for i in range(20)]
items = [f'Movie_{i}' for i in range(15)]
# Generate random rating data
for user in users:
n_ratings = np.random.randint(3, 8)
rated_items = np.random.choice(items, n_ratings, replace=False)
for item in rated_items:
rating = np.random.uniform(3, 5)
recommender.add_interaction(user, item, rating)
print("=== Graph-Based Recommendation System ===")
print(f"Number of users: {len(recommender.users)}")
print(f"Number of items: {len(recommender.items)}")
print(f"Number of ratings: {recommender.G.number_of_edges()}")
# Execute recommendations
test_user = 'User_0'
print(f"\n=== Recommendations for '{test_user}' ===")
# Items already rated
user_items = list(recommender.G.neighbors(test_user))
print(f"Items already rated: {user_items}")
# Neighbor-based recommendations
neighbor_recs = recommender.recommend_by_neighbors(test_user, top_k=5)
print(f"\nNeighbor-based recommendations:")
for i, (item, score) in enumerate(neighbor_recs):
print(f" {i+1}. {item}: score={score:.3f}")
# Random walk-based recommendations
rw_recs = recommender.recommend_by_random_walk(test_user, top_k=5)
print(f"\nRandom walk-based recommendations:")
for i, (item, count) in enumerate(rw_recs):
print(f" {i+1}. {item}: visit count={count}")
# Visualization
plt.figure(figsize=(14, 10))
pos = nx.spring_layout(recommender.G, seed=42, k=2)
# Color code users and items
node_colors = ['lightblue' if n in recommender.users else 'lightcoral'
for n in recommender.G.nodes()]
# Highlight target user
node_colors = ['yellow' if n == test_user else c
for n, c in zip(recommender.G.nodes(), node_colors)]
nx.draw(recommender.G, pos, node_color=node_colors, node_size=300,
alpha=0.7, edge_color='gray', with_labels=True, font_size=8)
plt.title('Bipartite Graph Recommendation System\n(blue=users, red=items, yellow=target user)',
fontsize=14)
plt.tight_layout()
plt.show()
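Monte Carlo walks are simple but noisy. A common deterministic alternative is personalized PageRank: the stationary distribution of a random walk that teleports back to the target user, which NetworkX exposes through the personalization argument of pagerank. A sketch on the recommender built above:
def recommend_by_ppr(recommender, user, alpha=0.85, top_k=5):
    """Personalized PageRank recommendation on the user-item graph."""
    # Teleport distribution concentrated entirely on the target user
    personalization = {n: 0.0 for n in recommender.G.nodes()}
    personalization[user] = 1.0
    ppr = nx.pagerank(recommender.G, alpha=alpha,
                      personalization=personalization, weight='weight')
    user_items = set(recommender.G.neighbors(user))
    # Rank unseen items by their PPR score
    candidates = {n: s for n, s in ppr.items()
                  if n in recommender.items and n not in user_items}
    return sorted(candidates.items(), key=lambda x: x[1], reverse=True)[:top_k]
ppr_recs = recommend_by_ppr(recommender, test_user, top_k=5)
print("\nPersonalized PageRank recommendations:")
for i, (item, score) in enumerate(ppr_recs):
    print(f"  {i+1}. {item}: score={score:.4f}")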
Recommendation Using Network Embeddings (Node2Vec)
We embed nodes into a vector space using Node2Vec and perform similarity-based recommendations.
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
def simple_node2vec(G, dimensions=64, walk_length=10, num_walks=80, p=1, q=1):
"""
Simple implementation of Node2Vec (for proof of concept)
Parameters:
-----------
G : NetworkX graph
Network
dimensions : int
Embedding dimensions
walk_length : int
Length of random walks
num_walks : int
Number of walks per node
    p : float
        Return parameter (unused in this simplified implementation)
    q : float
        In-out parameter (unused in this simplified implementation)
Returns:
--------
embeddings : dict
Embedding vectors for nodes
"""
# Generate random walks
walks = []
nodes = list(G.nodes())
for _ in range(num_walks):
np.random.shuffle(nodes)
for node in nodes:
walk = [node]
for _ in range(walk_length - 1):
current = walk[-1]
neighbors = list(G.neighbors(current))
if neighbors:
walk.append(np.random.choice(neighbors))
else:
break
walks.append(walk)
# Simple embedding (in practice, use Skip-gram, etc.)
# Here we use a simple co-occurrence matrix-based implementation
node_to_id = {node: i for i, node in enumerate(G.nodes())}
cooccurrence = np.zeros((len(nodes), len(nodes)))
for walk in walks:
for i, node in enumerate(walk):
for j in range(max(0, i-2), min(len(walk), i+3)):
if i != j:
cooccurrence[node_to_id[node]][node_to_id[walk[j]]] += 1
# Dimensionality reduction with SVD
from sklearn.decomposition import TruncatedSVD
svd = TruncatedSVD(n_components=min(dimensions, len(nodes)-1))
embeddings_matrix = svd.fit_transform(cooccurrence)
embeddings = {node: embeddings_matrix[node_to_id[node]]
for node in G.nodes()}
return embeddings
# Apply Node2Vec
print("\n=== Node2Vec Embeddings ===")
embeddings = simple_node2vec(recommender.G, dimensions=32, walk_length=10, num_walks=50)
print(f"Embedding dimensions: {len(list(embeddings.values())[0])}")
# Embedding-based recommendations
def recommend_by_embedding(embeddings, user, items, user_items, top_k=5):
"""Embedding-based recommendations"""
user_emb = embeddings[user].reshape(1, -1)
# Only unrated items as candidates
candidate_items = [item for item in items if item not in user_items]
if not candidate_items:
return []
# Calculate similarity
similarities = {}
for item in candidate_items:
item_emb = embeddings[item].reshape(1, -1)
sim = cosine_similarity(user_emb, item_emb)[0][0]
similarities[item] = sim
# Sort by score
recommendations = sorted(similarities.items(),
key=lambda x: x[1], reverse=True)[:top_k]
return recommendations
# Execute recommendations
user_items_set = set(recommender.G.neighbors(test_user))
emb_recs = recommend_by_embedding(embeddings, test_user,
recommender.items, user_items_set, top_k=5)
print(f"\n=== Node2Vec Embedding-Based Recommendations ===")
for i, (item, sim) in enumerate(emb_recs):
print(f" {i+1}. {item}: similarity={sim:.3f}")
# Visualize embeddings (t-SNE)
from sklearn.manifold import TSNE
print("\n=== Embedding Visualization ===")
node_list = list(embeddings.keys())
embedding_matrix = np.array([embeddings[n] for n in node_list])
tsne = TSNE(n_components=2, random_state=42)
embeddings_2d = tsne.fit_transform(embedding_matrix)
plt.figure(figsize=(12, 8))
user_indices = [i for i, n in enumerate(node_list) if n in recommender.users]
item_indices = [i for i, n in enumerate(node_list) if n in recommender.items]
plt.scatter(embeddings_2d[user_indices, 0], embeddings_2d[user_indices, 1],
c='blue', label='Users', alpha=0.6, s=100)
plt.scatter(embeddings_2d[item_indices, 0], embeddings_2d[item_indices, 1],
c='red', label='Items', alpha=0.6, s=100)
# Highlight target user
test_user_idx = node_list.index(test_user)
plt.scatter(embeddings_2d[test_user_idx, 0], embeddings_2d[test_user_idx, 1],
c='yellow', s=300, marker='*', edgecolors='black', linewidths=2,
label='Target User')
plt.xlabel('Dimension 1', fontsize=12)
plt.ylabel('Dimension 2', fontsize=12)
plt.title('Node2Vec Embeddings (t-SNE Visualization)', fontsize=14)
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
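The co-occurrence/SVD embedding above is only a stand-in for the Skip-gram training used by real Node2Vec. A closer approximation, assuming gensim is installed, feeds random walks to Word2Vec as "sentences"; the dedicated node2vec package on PyPI wraps exactly this pipeline and additionally implements the p/q-biased walks.
from gensim.models import Word2Vec
def node2vec_gensim(G, dimensions=32, walk_length=10, num_walks=50, seed=42):
    """Uniform random walks + Skip-gram (gensim) as a Node2Vec approximation."""
    rng = np.random.default_rng(seed)
    walks = []
    for _ in range(num_walks):
        for node in G.nodes():
            walk = [node]
            for _ in range(walk_length - 1):
                neighbors = list(G.neighbors(walk[-1]))
                if not neighbors:
                    break
                walk.append(neighbors[rng.integers(len(neighbors))])
            # Word2Vec expects sentences as lists of string tokens
            walks.append([str(n) for n in walk])
    model = Word2Vec(walks, vector_size=dimensions, window=5,
                     min_count=0, sg=1, workers=1, seed=seed)
    return {node: model.wv[str(node)] for node in G.nodes()}
emb_gensim = node2vec_gensim(recommender.G)
print(f"gensim embedding dimensions: {len(next(iter(emb_gensim.values())))}")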
5.5 Practical Project: Comprehensive Network Analysis
Project: Analysis of a Twitter-like Social Network
As a practical project, we will conduct a comprehensive analysis of a social media network.
Project Design
Goal: Analyze a social network and achieve the following:
- Community detection and user clustering
- Influencer identification
- Information diffusion simulation
- Recommendation system construction
- Report generation and insight extraction
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - networkx>=3.1.0
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0
import networkx as nx
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter, defaultdict
import pandas as pd
class SocialNetworkAnalyzer:
"""Comprehensive social network analysis system"""
def __init__(self, name="Social Network"):
self.name = name
self.G = nx.DiGraph() # Directed graph (follow relationships)
self.metrics = {}
self.communities = None
self.influencers = None
def load_network(self, edges, user_attributes=None):
"""
Load network data
Parameters:
-----------
edges : list of tuples
List of (follower, followee) pairs
user_attributes : dict
User attributes (optional)
"""
self.G.add_edges_from(edges)
if user_attributes:
for user, attrs in user_attributes.items():
if user in self.G:
for key, value in attrs.items():
self.G.nodes[user][key] = value
print(f"β Network loaded")
print(f" Number of users: {self.G.number_of_nodes()}")
print(f" Number of follow relationships: {self.G.number_of_edges()}")
def compute_basic_metrics(self):
"""Calculate basic network metrics"""
# Degree (follower count, following count)
in_degree = dict(self.G.in_degree()) # Follower count
out_degree = dict(self.G.out_degree()) # Following count
# Centrality metrics
pagerank = nx.pagerank(self.G)
# Weakly connected components
undirected = self.G.to_undirected()
n_components = nx.number_connected_components(undirected)
# Clustering coefficient
clustering = nx.average_clustering(undirected)
self.metrics = {
'in_degree': in_degree,
'out_degree': out_degree,
'pagerank': pagerank,
'n_components': n_components,
'avg_clustering': clustering,
'density': nx.density(self.G)
}
print(f"\nβ Basic metrics computed")
print(f" Number of components: {n_components}")
print(f" Average clustering coefficient: {clustering:.3f}")
print(f" Density: {self.metrics['density']:.4f}")
print(f" Average followers: {np.mean(list(in_degree.values())):.2f}")
print(f" Average following: {np.mean(list(out_degree.values())):.2f}")
def detect_communities(self, method='louvain'):
"""Community detection"""
undirected = self.G.to_undirected()
        if method == 'louvain':
            # Greedy modularity maximization (Clauset-Newman-Moore);
            # NetworkX >= 2.8 also offers nx_comm.louvain_communities for true Louvain
            import networkx.algorithms.community as nx_comm
            self.communities = list(nx_comm.greedy_modularity_communities(undirected))
print(f"\nβ Community detection completed")
print(f" Number of communities detected: {len(self.communities)}")
print(f" Community size distribution:")
sizes = sorted([len(c) for c in self.communities], reverse=True)
for i, size in enumerate(sizes[:5]):
print(f" Community {i+1}: {size} users")
def identify_influencers(self, top_k=10):
"""Identify influencers"""
# Scoring by combining multiple metrics
scores = {}
for user in self.G.nodes():
score = (
0.4 * self.metrics['in_degree'][user] + # Follower count
0.3 * self.metrics['pagerank'][user] * 1000 + # PageRank
0.3 * self.metrics['out_degree'][user] # Engagement
)
scores[user] = score
self.influencers = sorted(scores, key=scores.get, reverse=True)[:top_k]
print(f"\nβ Influencer identification completed")
print(f" Top {top_k} influencers:")
for i, user in enumerate(self.influencers):
print(f" {i+1}. {user}:")
print(f" Followers: {self.metrics['in_degree'][user]}")
print(f" Following: {self.metrics['out_degree'][user]}")
print(f" PageRank: {self.metrics['pagerank'][user]:.4f}")
def simulate_information_diffusion(self, seed_users, prob=0.1, iterations=10):
"""Information diffusion simulation"""
active = set(seed_users)
history = [len(active)]
for _ in range(iterations):
new_active = set()
for user in active:
for follower in self.G.predecessors(user): # Followers
if follower not in active and np.random.random() < prob:
new_active.add(follower)
if not new_active:
break
active.update(new_active)
history.append(len(active))
print(f"\nβ Information diffusion simulation completed")
print(f" Initial users: {len(seed_users)}")
print(f" Final reach: {history[-1]} users ({history[-1]/self.G.number_of_nodes()*100:.1f}%)")
return history
def generate_report(self):
"""Generate analysis report"""
report = f"""
{'='*60}
Social Network Analysis Report: {self.name}
{'='*60}
1. Network Overview
- Number of users: {self.G.number_of_nodes():,}
- Number of follow relationships: {self.G.number_of_edges():,}
- Network density: {self.metrics['density']:.4f}
- Number of components: {self.metrics['n_components']}
2. User Behavior
- Average followers: {np.mean(list(self.metrics['in_degree'].values())):.2f}
- Average following: {np.mean(list(self.metrics['out_degree'].values())):.2f}
- Average clustering coefficient: {self.metrics['avg_clustering']:.3f}
3. Community Structure
- Number of communities: {len(self.communities) if self.communities else 'N/A'}
- Largest community size: {max(len(c) for c in self.communities) if self.communities else 'N/A'}
4. Influencers
- Top influencers: {', '.join(self.influencers[:5]) if self.influencers else 'N/A'}
5. Recommendations
- Marketing targets: Leverage top 5 influencers
- Community strategy: Create content specific to each community
- Engagement improvement: Use users with high clustering coefficients as hubs
{'='*60}
"""
return report
# Generate sample network and execute project
print("=== Social Network Analysis Project ===\n")
# Data generation (in practice, obtain from Twitter API, etc.)
np.random.seed(42)
n_users = 100
# Scale-free network (realistic degree distribution for a social network)
G_sample = nx.scale_free_graph(n_users, seed=42)  # returns a MultiDiGraph
edges = [(u, v) for u, v in G_sample.edges() if u != v]  # drop self-follow loops
# User attributes
user_attributes = {
i: {
'join_date': f'2020-{np.random.randint(1, 13):02d}',
'posts': np.random.randint(10, 1000),
'active': np.random.choice([True, False], p=[0.7, 0.3])
}
for i in range(n_users)
}
# Execute analysis
analyzer = SocialNetworkAnalyzer(name="Twitter-like Network")
analyzer.load_network(edges, user_attributes)
analyzer.compute_basic_metrics()
analyzer.detect_communities()
analyzer.identify_influencers(top_k=10)
# Information diffusion simulation (starting from top influencers)
diffusion_history = analyzer.simulate_information_diffusion(
seed_users=analyzer.influencers[:3],
prob=0.15,
iterations=10
)
# Generate report
report = analyzer.generate_report()
print(report)
# Visualization
fig = plt.figure(figsize=(16, 12))
gs = fig.add_gridspec(3, 2, hspace=0.3, wspace=0.3)
# 1. Overall network diagram
ax1 = fig.add_subplot(gs[0, :])
pos = nx.spring_layout(analyzer.G, seed=42, k=0.5)
node_colors = [analyzer.metrics['pagerank'][n] * 1000 for n in analyzer.G.nodes()]
node_sizes = [analyzer.metrics['in_degree'][n] * 20 + 50 for n in analyzer.G.nodes()]
nx.draw(analyzer.G, pos, node_color=node_colors, node_size=node_sizes,
cmap='YlOrRd', edge_color='gray', alpha=0.6, arrows=False, ax=ax1)
ax1.set_title('Overall Social Network\n(color=PageRank, size=follower count)',
fontsize=14, fontweight='bold')
# 2. Degree distribution
ax2 = fig.add_subplot(gs[1, 0])
in_degrees = list(analyzer.metrics['in_degree'].values())
ax2.hist(in_degrees, bins=30, alpha=0.7, edgecolor='black')
ax2.set_xlabel('Follower count', fontsize=11)
ax2.set_ylabel('Number of users', fontsize=11)
ax2.set_title('Follower Count Distribution', fontsize=12)
ax2.grid(True, alpha=0.3)
# 3. Community visualization
ax3 = fig.add_subplot(gs[1, 1])
if analyzer.communities:
community_map = {}
for i, comm in enumerate(analyzer.communities):
for node in comm:
community_map[node] = i
comm_colors = [community_map.get(n, 0) for n in analyzer.G.nodes()]
nx.draw(analyzer.G, pos, node_color=comm_colors, node_size=100,
cmap='tab10', edge_color='gray', alpha=0.6, arrows=False, ax=ax3)
ax3.set_title(f'Community Structure ({len(analyzer.communities)} communities)',
fontsize=12)
# 4. Information diffusion
ax4 = fig.add_subplot(gs[2, 0])
ax4.plot(range(len(diffusion_history)), diffusion_history,
marker='o', linewidth=2, markersize=8)
ax4.set_xlabel('Iteration', fontsize=11)
ax4.set_ylabel('Reached users', fontsize=11)
ax4.set_title('Information Diffusion Simulation', fontsize=12)
ax4.grid(True, alpha=0.3)
# 5. Influencer comparison
ax5 = fig.add_subplot(gs[2, 1])
influencer_data = {
'User': analyzer.influencers[:5],
'Followers': [analyzer.metrics['in_degree'][u] for u in analyzer.influencers[:5]],
}
df_inf = pd.DataFrame(influencer_data)
ax5.barh(df_inf['User'], df_inf['Followers'], alpha=0.7, edgecolor='black')
ax5.set_xlabel('Follower count', fontsize=11)
ax5.set_title('Top 5 Influencers', fontsize=12)
ax5.grid(True, alpha=0.3, axis='x')
plt.tight_layout()
plt.show()
print("\nβ Project completed!")
print(" Generated visualizations, analysis reports, and insights.")
Storytelling and Report Creation
Practical Tip: Use storytelling to communicate analysis results effectively:
- Background and Purpose: Clarify why this analysis is needed
- Data Characteristics: Present basic network statistics
- Key Findings: Focus on 3-5 important insights
- Visualization: Use intuitive and clear graphs
- Recommendations: Present specific action plans
- Next Steps: Indicate directions for further analysis
Chapter Summary
What We Learned
Social Network Analysis
- Influence propagation using Linear Threshold and Independent Cascade models
- Influencer identification using multiple centrality metrics
- Simulation and prediction of information diffusion
Knowledge Graphs
- Modeling entities and relationships (triple structure)
- Graph database basics (Neo4j-style queries)
- Construction and utilization of semantic networks
Biological Networks
- Analysis of protein-protein interaction networks
- Gene regulatory networks and motif detection
- Basics of pathway enrichment analysis
Recommendation Systems
- Graph-based collaborative filtering
- Recommendations using random walks
- Similarity recommendations using Node2Vec embeddings
Practical Project
- Design of comprehensive network analysis pipelines
- Integration of multiple analysis methods
- Effective reporting and storytelling
Real-World Applications
| Field | Application Examples | Key Technologies |
|---|---|---|
| Marketing | Influencer marketing, viral strategies | Influence propagation, centrality analysis |
| E-commerce | Product recommendation, cross-selling | Graph-based recommendations, Node2Vec |
| Pharmaceuticals | Drug discovery, disease mechanism elucidation | PPI analysis, pathway analysis |
| Finance | Fraud detection, risk analysis | Anomaly detection, community detection |
| AI/NLP | Knowledge base construction, Q&A systems | Knowledge graphs, semantic search |
Next Steps
To further deepen network analysis:
- Integration with Deep Learning: Graph Neural Networks (GNN), Graph Attention Networks (GAT)
- Dynamic Networks: Analysis of time-evolving networks
- Large-Scale Networks: Distributed processing, approximation algorithms
- Causal Inference: Identifying causal relationships in networks
- Practice with Real Data: Kaggle competitions, research projects
References
- BarabΓ‘si, A. L. (2016). Network Science. Cambridge University Press.
- Newman, M. (2018). Networks: An Introduction (2nd ed.). Oxford University Press.
- Easley, D., & Kleinberg, J. (2010). Networks, Crowds, and Markets. Cambridge University Press.
- Hamilton, W. L. (2020). Graph Representation Learning. Morgan & Claypool.
- Grover, A., & Leskovec, J. (2016). node2vec: Scalable Feature Learning for Networks. KDD.