Network_attack_propagation/network_attack_propagation.py
Marcin Kostrzewski 78e3b287fb Merge remote-tracking branch 'origin/stars'
# Conflicts:
#	network_attack_propagation.py
2022-06-16 22:06:23 +02:00

196 lines
5.4 KiB
Python

import random
from statistics import mean
import matplotlib.pyplot as plt
import networkx as nx
from matplotlib import animation
class Node:
    """A single host in the network that is either healthy or infected."""

    def __init__(self, is_infected=False):
        """
        :param is_infected: Initial infection state of the node.
        """
        self.is_infected = is_infected
        # Ids are drawn at random, so two nodes are not guaranteed to get
        # different ids.
        self.id = random.randint(1, 2000000)

    def __repr__(self):
        return f'id: {self.id}, infected: {self.is_infected}'

    def as_tuple(self):
        """
        :return: The pair (id, is_infected)
        """
        return (self.id, self.is_infected)
class Edge:
    """A weighted link between two nodes."""

    def __init__(self, node_a: Node, node_b: Node, weight: float):
        """
        :param node_a: First endpoint
        :param node_b: Second endpoint
        :param weight: Weight attached to the link
        """
        self.node_a, self.node_b = node_a, node_b
        self.weight = weight

    def has_node(self, node: Node) -> bool:
        """
        :param node: Node to look for (compared by identity, not equality)
        :return: True when the node is one of the two endpoints
        """
        return any(endpoint is node for endpoint in (self.node_a, self.node_b))

    def as_tuple(self):
        """
        :return: The triple (node_a, node_b, {'weight': weight})
        """
        attributes = {'weight': self.weight}
        return self.node_a, self.node_b, attributes
class Graph:
    """
    An edge list with a simple infection-spreading model.

    Nodes are not stored separately; they are derived from the endpoints of
    the stored edges.
    """

    def __init__(self, use_weights=False):
        """
        :param use_weights: When true, infect_step picks the neighbour to infect
            with probability proportional to the edge weight; otherwise every
            neighbour is equally likely.
        """
        self.edges = []
        self.use_weights = use_weights

    def add_edge(self, edge: "Edge"):
        """
        :param edge: Edge to append to the graph
        """
        self.edges.append(edge)

    def add_edges(self, edges: "list[Edge]"):
        """
        :param edges: Edges to append, in order (any iterable is accepted)
        """
        self.edges.extend(edges)

    def get_nodes(self) -> "set[Node]":
        """
        :return: The set of all nodes that appear as an endpoint of some edge
        """
        nodes = set()
        for edge in self.edges:
            nodes.add(edge.node_a)
            nodes.add(edge.node_b)
        return nodes

    def get_adjacent_nodes(self, node: "Node") -> "set[tuple[Node, float]]":
        """
        :param node: Node to search for
        :return: A set of tuples (node, weight), one per distinct
            neighbour/weight combination
        """
        adjacent = set()
        for edge in self.edges:
            if not edge.has_node(node):
                continue
            # The neighbour is whichever endpoint is not the node itself.
            other = edge.node_b if edge.node_a is node else edge.node_a
            adjacent.add((other, edge.weight))
        return adjacent

    def infect_step(self):
        """
        Let every currently infected node try to infect one of its neighbours.

        The neighbour is chosen at random and may already be infected. With
        use_weights set, a node whose adjacent weights do not sum to a
        positive value cannot pick a neighbour and is skipped.
        """
        # Snapshot first, so nodes infected during this step only start
        # spreading on the next one.
        infected_nodes = [n for n in self.get_nodes() if n.is_infected]
        for node in infected_nodes:
            # Materialise once so candidates and weights share one ordering.
            adjacent = list(self.get_adjacent_nodes(node))
            if not adjacent:
                continue
            candidates = [neighbour for neighbour, _ in adjacent]
            if self.use_weights:
                weights = [weight for _, weight in adjacent]
                # random.choices raises ValueError when the total weight is
                # not positive; treat that as "nothing can be infected".
                if sum(weights) <= 0:
                    continue
                to_be_infected = random.choices(candidates, weights=weights)[0]
            else:
                to_be_infected = random.choice(candidates)
            to_be_infected.is_infected = True
def update(num, layout, g_repr, ax, our_graph: Graph):
    """
    Redraw the graph for one animation frame, then advance the infection by one step.

    This function is called every 'step', so if you wish to update the graph, do it here

    :param num: Frame value supplied by the animation; not used here
    :param layout: Node positions passed to nx.draw as ``pos``
    :param g_repr: networkx graph whose nodes expose ``is_infected``
    :param ax: Axes that are cleared and drawn on
    :param our_graph: Graph whose infect_step is called after drawing
    """
    ax.clear()

    # Infected nodes are drawn red and large, healthy ones blue and small.
    node_colors = []
    node_sizes = []
    for vertex in g_repr:
        if vertex.is_infected:
            node_colors.append('red')
            node_sizes.append(50)
        else:
            node_colors.append('blue')
            node_sizes.append(1)

    draw_options = {
        'ax': ax,
        'pos': layout,
        'node_color': node_colors,
        'with_labels': False,
        'node_size': node_sizes,
        'node_shape': "s",
        'alpha': 0.5,
        'linewidths': 40,
    }
    nx.draw(g_repr, **draw_options)

    # The state is drawn first and advanced afterwards.
    our_graph.infect_step()
def do_graph_animation(output_file_name: str, in_graph: Graph, frame_count: int, layout):
    """
    Animate the spread of infection through a graph, save it and show it.

    :param output_file_name: File name handed to the animation's ``save``
    :param in_graph: Graph to animate; ``update`` calls its infect_step while
        frames are drawn, so the graph is mutated
    :param frame_count: Value passed as ``frames`` to FuncAnimation
    :param layout: Callable taking the networkx graph and returning node
        positions (e.g. nx.spring_layout)
    """
    # Select the style before the figure is created so that it applies to this
    # animation too, not only to figures made by later calls. Newer Matplotlib
    # releases ship the seaborn style as 'seaborn-v0_8'; older ones as 'seaborn'.
    for style_name in ('seaborn-v0_8', 'seaborn'):
        if style_name in plt.style.available:
            plt.style.use(style_name)
            break

    g_repr = nx.Graph()
    # Convert our graph class into tuples understood by networkx
    g_repr.add_edges_from([e.as_tuple() for e in in_graph.edges])
    positions = layout(g_repr)

    fig, ax = plt.subplots()
    fig.set_figwidth(15)
    fig.set_figheight(15)

    anim = animation.FuncAnimation(
        fig, update, frames=frame_count, interval=500, fargs=(positions, g_repr, ax, in_graph)
    )
    anim.save(output_file_name)
    plt.show()
def bus_network(n=30, infected_idx=0) -> Graph:
    """
    Build a bus topology: n nodes in a row, each linked to the next one.

    :param n: Number of nodes
    :param infected_idx: Index of the node that starts out infected
    :return: Graph holding the n - 1 connecting edges, all with weight 1.0
    """
    chain = [Node() for _ in range(n)]
    chain[infected_idx].is_infected = True

    bus = Graph()
    # Pair every node with its successor.
    for left, right in zip(chain, chain[1:]):
        bus.add_edge(Edge(left, right, 1.0))
    return bus
def rank_avg(edges, digits=2):
    """
    Compute the average node rank (number of edge endpoints per node).

    :param edges: Edges exposing ``node_a`` and ``node_b``
    :param digits: Number of decimal places to round the result to
    :return: The rounded mean rank over all nodes that appear in the edges
    """
    degree = {}
    for edge in edges:
        # Each edge contributes one to the rank of both of its endpoints.
        for endpoint in (edge.node_a, edge.node_b):
            degree[endpoint] = degree.get(endpoint, 0) + 1
    average = mean(degree.values())
    return round(average, digits)
def star_network(cluster_count=5, starsize=6, infected_idx=0) -> tuple[Graph, float]:
    """
    Build a network of star-shaped clusters joined through one shared hub node.

    Nodes are laid out cluster by cluster: a center node followed by its
    ``starsize`` leaves. The very last node is the hub, which is linked to
    every cluster center. All edges have weight 1.0.

    :param cluster_count: Number of star clusters
    :param starsize: Number of leaves attached to each cluster center
    :param infected_idx: Index of the node that starts out infected (0 is the
        center of the first cluster); pass None to leave every node healthy
    :return: Tuple (graph, average node rank as computed by rank_avg)
    """
    node_count = cluster_count * (starsize + 1) + 1
    nodes = [Node() for _ in range(node_count)]
    # Without an initially infected node there is nothing to propagate.
    if infected_idx is not None:
        nodes[infected_idx].is_infected = True

    hub = nodes[-1]
    edges = []
    for cluster in range(cluster_count):
        center_idx = cluster * (starsize + 1)
        center = nodes[center_idx]
        leaves = nodes[center_idx + 1:center_idx + starsize + 1]
        edges.extend(Edge(center, leaf, 1.0) for leaf in leaves)
        edges.append(Edge(hub, center, 1.0))

    network = Graph()
    network.add_edges(edges)
    return network, rank_avg(edges)
def ring_network(n=30, infected_idx=0) -> Graph:
    """
    Build a ring topology: n nodes, each linked to the next, with the last
    node linked back to the first.

    :param n: Number of nodes
    :param infected_idx: Index of the node that starts out infected
    :return: Graph holding the n ring edges, all with weight 1.0
    """
    nodes = [Node() for _ in range(n)]
    nodes[infected_idx].is_infected = True

    # The modulo wraps the successor of the last node around to the first one,
    # which closes the ring. With n == 1 this yields a single self-loop.
    edges = [Edge(nodes[i], nodes[(i + 1) % n], 1.0) for i in range(n)]

    network = Graph()
    network.add_edges(edges)
    return network
def main():
    """
    Render the demo animations: a small hand-built graph, followed by the
    bus, ring and star topologies.
    """
    hosts = [Node(True), Node(), Node(), Node(True), Node()]
    # Each entry is (index of first endpoint, index of second endpoint, weight).
    # NOTE(review): the graph is created with the default use_weights=False, so
    # infect_step does not use these weights -- confirm this is intended.
    links = (
        (1, 0, 0.02),
        (1, 2, 0.2),
        (2, 0, 0.7),
        (3, 2, 0.2),
        (3, 1, 0.2),
        (4, 3, 0.2),
    )
    demo = Graph()
    for first, second, weight in links:
        demo.add_edge(Edge(hosts[first], hosts[second], weight))
    do_graph_animation('test.gif', demo, 5, nx.spring_layout)

    do_graph_animation('bus.gif', bus_network(), 20, nx.spiral_layout)

    do_graph_animation('ring.gif', ring_network(), 20, nx.circular_layout)

    # The average rank returned alongside the star graph is not used here.
    star, _star_avg_rank = star_network()
    do_graph_animation('star.gif', star, 5, nx.spring_layout)


if __name__ == "__main__":
    main()