132 lines
3.5 KiB
Python
132 lines
3.5 KiB
Python
import random
|
|
from statistics import mean
|
|
|
|
import matplotlib.pyplot as plt
|
|
import networkx as nx
|
|
from matplotlib import animation
|
|
|
|
|
|
class Node:
    """A single vertex of the network that is either infected or healthy.

    Every instance draws a random integer ``id`` between 1 and 2000000
    (inclusive) when it is constructed; the value is not checked for
    uniqueness against other nodes.
    """

    def __init__(self, is_infected=False):
        """Create a node, optionally marking it as infected from the start."""
        self.id = random.randint(1, 2000000)
        self.is_infected = is_infected

    def __repr__(self):
        """Return a short human-readable description of the node."""
        return f'id: {self.id}, infected: {self.is_infected}'

    def as_tuple(self):
        """Return the node's state as an ``(id, is_infected)`` pair."""
        return (self.id, self.is_infected)
|
|
|
|
|
|
class Edge:
    """A weighted connection between two nodes."""

    def __init__(self, node_a: Node, node_b: Node, weight: float):
        """Store both endpoints and the weight of the connection."""
        self.node_a, self.node_b = node_a, node_b
        self.weight = weight

    def as_tuple(self):
        """Return the edge as ``(node_a, node_b, {'weight': weight})``.

        This is the tuple form handed to networkx's ``add_edges_from`` when
        the graph is drawn.
        """
        attributes = {'weight': self.weight}
        return self.node_a, self.node_b, attributes
|
|
|
|
|
|
class Graph:
    """A graph stored as a plain list of edges.

    Nodes are not tracked separately: a node is part of the graph only while
    at least one stored edge references it.
    """

    def __init__(self):
        """Create a graph without any edges."""
        self.edges = []

    def add_edge(self, edge: Edge) -> None:
        """Append a single edge to the graph."""
        self.edges.append(edge)

    def add_edges(self, edges: list[Edge]) -> None:
        """Append every edge in ``edges`` to the graph, preserving their order."""
        # extend() rather than a list comprehension run only for its side
        # effects: no throwaway list of None values is built, and passing
        # ``self.edges`` itself can no longer loop forever.
        self.edges.extend(edges)

    def get_nodes(self) -> set[Node]:
        """Return the set of distinct nodes referenced by the stored edges."""
        nodes = set()
        for edge in self.edges:
            nodes.add(edge.node_a)
            nodes.add(edge.node_b)
        return nodes
|
|
|
|
|
|
def update(num, layout, g_repr, ax, our_graph: Graph):
    """
    Redraw one animation frame; this is the callback invoked on every 'step'.

    Each node of ``our_graph`` receives a fresh, randomly chosen infection
    state, then ``g_repr`` is drawn on the cleared axes at the positions in
    ``layout`` with infected nodes in red and all others in blue.
    ``num`` is supplied by the animation driver and is not used here.
    """
    ax.clear()

    # Re-roll the infection state of every node (one random bit per node).
    for node in our_graph.get_nodes():
        node.is_infected = random.getrandbits(1) == 1

    colors = []
    for node in g_repr:
        colors.append('red' if node.is_infected else 'blue')

    nx.draw_networkx(g_repr, ax=ax, pos=layout, node_color=colors, with_labels=False)
|
|
|
|
|
|
def do_graph_animation(output_file_name: str, in_graph: Graph, frame_count: int):
    """Animate ``in_graph`` for ``frame_count`` frames, save it, then show it.

    The graph is converted to a networkx graph, positioned with a spring
    layout and redrawn by ``update`` on every frame. The animation is written
    to ``output_file_name`` before the figure is displayed.
    """
    # Convert our graph class into tuples understood by networkx
    edge_tuples = [edge.as_tuple() for edge in in_graph.edges]
    nx_graph = nx.Graph()
    nx_graph.add_edges_from(edge_tuples)

    positions = nx.spring_layout(nx_graph)
    figure, axes = plt.subplots()

    # Extra positional arguments passed to update() after the frame number.
    frame_args = (positions, nx_graph, axes, in_graph)
    movie = animation.FuncAnimation(figure, update, frames=frame_count, fargs=frame_args)
    movie.save(output_file_name)
    plt.show()
|
|
|
|
|
|
def bus_network(n=30) -> Graph:
    """Build a bus (line) topology from ``n`` newly created nodes.

    Each node is joined to the next one by an edge of weight 1.0, so for
    ``n >= 1`` the graph holds ``n - 1`` edges.
    """
    chain = [Node() for _ in range(n)]

    bus = Graph()
    # Pair every node with its successor in the chain.
    bus.add_edges([Edge(left, right, 1.0) for left, right in zip(chain, chain[1:])])
    return bus
|
|
|
|
|
|
def rank_avg(edges, digits=2):
    """Return the average node rank over ``edges``, rounded to ``digits`` places.

    A node's rank is the number of edge endpoints that refer to it, counted
    over every edge's ``node_a`` and ``node_b``.

    Args:
        edges: iterable of objects exposing hashable ``node_a`` and ``node_b``.
        digits: number of decimal places passed to ``round``.

    Returns:
        The rounded mean rank, or ``0.0`` when ``edges`` is empty.
    """
    ranks = {}
    for edge in edges:
        for endpoint in (edge.node_a, edge.node_b):
            ranks[endpoint] = ranks.get(endpoint, 0) + 1

    # mean() raises StatisticsError on empty data; a graph with no edges has
    # no ranked nodes, so report 0.0 instead of crashing.
    if not ranks:
        return 0.0
    return round(mean(ranks.values()), digits)
|
|
|
|
|
|
def star_network(cluster_count=5, starsize=6) -> tuple[Graph, float]:
    """Build a star-of-stars network and return it with its average node rank.

    The network consists of ``cluster_count`` stars, each made of one center
    node joined to ``starsize`` leaf nodes, plus one extra hub node (the last
    node created) that is joined to every star center. All edges have weight
    1.0.

    Returns:
        The graph and the value of ``rank_avg`` for its edges.
    """
    stride = starsize + 1  # nodes per star: the center followed by its leaves
    nodes = [Node() for _ in range(cluster_count * stride + 1)]

    edges = []
    for star_index in range(cluster_count):
        center_index = star_index * stride
        # Spokes from this star's center to each of its leaves.
        for leaf_index in range(center_index + 1, center_index + stride):
            edges.append(Edge(nodes[center_index], nodes[leaf_index], 1.0))
        # Link from the hub (last node) to this star's center.
        edges.append(Edge(nodes[-1], nodes[center_index], 1.0))

    network = Graph()
    network.add_edges(edges)

    return network, rank_avg(edges)
|
|
|
|
|
|
def main():
    """Render three demo animations: a hand-built graph, a bus and a star.

    Each network is animated for 5 frames and saved to ``test.gif``,
    ``bus.gif`` and ``star.gif`` respectively.
    """
    # Nodes 0 and 3 start out infected.
    nodes = [Node(infected) for infected in (True, False, False, True, False)]

    # (index of node_a, index of node_b, edge weight)
    links = (
        (1, 0, 0.02),
        (1, 2, 0.2),
        (2, 0, 0.7),
        (3, 2, 0.2),
        (3, 1, 0.2),
        (4, 3, 0.2),
    )
    network = Graph()
    network.add_edges([Edge(nodes[a], nodes[b], weight) for a, b, weight in links])
    do_graph_animation('test.gif', network, 5)

    do_graph_animation('bus.gif', bus_network(), 5)

    # star_network() also returns the average rank, which is not needed here.
    star, _ = star_network()
    do_graph_animation('star.gif', star, 5)
|
|
|
|
|
|
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|