from Graph.Edge import Edge
from Graph.Node import Node
class DiGraph:
    """
    General purpose directed graph library.

    Nodes and edges are kept in two plain lists. ``add_edge`` keeps at most
    one edge per ordered (source, destination) pair. Nodes, labels and edge
    endpoints are always compared with ``==``.
    NOTE(review): how a Node compares against a label string is decided by
    Node.__eq__, which is not visible from this class - confirm there.
    """

    def __init__(self):
        self._nodes = []  # Node objects, in insertion order
        self._edges = []  # Edge objects, in insertion order

    def add_node(self, node_object, label):
        """
        Add a node to the graph.

        :param node_object: an object of any type to be stored in the node.
        :param label: the node's label. This label must be unique for each node.
        :return: True if the node was added, False if find_node() already
                 returns a node for this label.
        """
        if self.find_node(label) is not None:
            return False
        self._nodes.append(Node(label, node_object))
        return True

    def add_edge(self, source_label, dest_label, edge_label=""):
        """
        Add an edge to the graph.

        If an edge with the same source and destination already exists it is
        replaced: the old edge is dropped and the new one is appended at the
        end of the edge list. The endpoints are not checked against the
        node list.

        :param source_label: the source node or label of the source node
        :param dest_label: the destination node or label
        :param edge_label: the label for the edge, this is optional for edges.
        """
        for index, edge in enumerate(self._edges):
            if source_label == edge.get_source() and dest_label == edge.get_destination():
                # Delete by position and stop immediately: the list is never
                # touched while the loop is still running, and the edge that
                # was matched is the one removed regardless of Edge equality.
                del self._edges[index]
                break
        self._edges.append(Edge(edge_label, source_label, dest_label))

    def get_edges_for_node(self, source_label):
        """
        Return the edges leaving the specified node.

        :param source_label: the source node or label of the source node
        :return: a new list with every edge whose source equals source_label
        """
        return [edge for edge in self._edges if edge.get_source() == source_label]

    def find_node(self, label):
        """
        Return a node given its label.

        :param label: the label of the node
        :return: the first stored node that compares equal to label, or None
                 if there is no such node
        """
        for node in self._nodes:
            if node == label:
                return node
        return None

    def get_nodes(self):
        """
        Return the graph's nodes.

        :return: the internal node list itself (not a copy)
        """
        return self._nodes

    def get_edges(self):
        """
        Return the graph's edges.

        :return: the internal edge list itself (not a copy)
        """
        return self._edges

    def clear(self):
        """Remove every node and every edge from the graph."""
        self._nodes.clear()
        self._edges.clear()

    def get_edge_from_nodes(self, source_label, dest_label):
        """
        Find the edge from source_label to dest_label.

        :param source_label: label of source node
        :param dest_label: label of dest node
        :return: the edge, or None if not found
        """
        for edge in self._edges:
            if edge.get_destination() == dest_label and edge.get_source() == source_label:
                return edge
        return None

    def remove_node(self, label, remove_edges=True):
        """
        Remove a node from the graph.

        :param label: the label of the node to remove
        :param remove_edges: when True (the default) every edge that starts or
                             ends at the node is removed too. When False those
                             edges are left in place, pointing at a node that
                             no longer exists.
        :return: True if node was removed, False if node was not removed due to the node not existing
        """
        node = self.find_node(label)
        if node is None:
            return False
        if remove_edges:
            # Edges touching this node are not valid any more.
            self.remove_edges_containing_node(label)
        self._nodes.remove(node)
        return True

    def remove_edge(self, source_label, dest_label):
        """
        Remove an edge from the graph.

        :param source_label: Source node
        :param dest_label: Destination node
        :return: True if edge was removed, False if the edge does not exist
        """
        for index, edge in enumerate(self._edges):
            if edge.get_destination() == dest_label and edge.get_source() == source_label:
                del self._edges[index]  # safe: we return right away
                return True
        return False

    def remove_edges_containing_node(self, node_label):
        """
        Remove every edge that starts or ends at the given node.

        :param node_label: the node (or label) whose edges are removed
        :return: None
        """
        # Slice assignment filters in place, so list references previously
        # handed out by get_edges() stay valid.
        self._edges[:] = [
            edge for edge in self._edges
            if not (edge.get_destination() == node_label or edge.get_source() == node_label)
        ]

    def merge(self, other_graph):
        """
        Merge another graph into this graph and empty the other graph.

        Edges are merged first (through add_edge, so an edge of this graph
        with the same endpoints is replaced), then nodes (through add_node,
        so a node whose label already exists here is skipped).
        Merging a graph into itself is a no-op.

        :param other_graph: The graph to merge
        """
        if other_graph is self:
            # Without this guard the loops would iterate lists that are being
            # modified and the final clear() would wipe this graph.
            return
        # NOTE(review): the Edge / Node objects themselves are passed as the
        # new label, exactly as before - confirm Edge and Node accept that.
        for edge in list(other_graph.get_edges()):  # merge edges
            self.add_edge(edge.get_source(), edge.get_destination(), edge)
        for node in list(other_graph.get_nodes()):  # merge nodes
            self.add_node(node.get_object(), node)
        other_graph.clear()

    def get_undirected_edges(self):
        """
        Collapse pairs of opposite edges into a single edge each.

        For every edge, the edges leaving its destination are searched for
        ones that lead back to its source; those reverse edges are collected.
        An edge that is already in the result is not processed again, which
        is what keeps only one edge of each pair.
        NOTE(review): an edge that has no counterpart in the opposite
        direction never appears in the result - confirm that is intended.

        :return: List of edges, undirected.
        """
        undirected = []
        for edge in self._edges:
            if edge in undirected:  # already kept as the reverse of an earlier edge
                continue
            source = edge.get_source()
            for reverse in self.get_edges_for_node(edge.get_destination()):
                if reverse.get_destination() == source:
                    undirected.append(reverse)
        return undirected

    def dijkstra(self, source_node, dest_node):
        """
        Dijkstra's algorithm to find the shortest path from source_node to dest_node.

        The weight of an edge is ``int(edge)``. Weights are expected to be
        non-negative, as Dijkstra's algorithm requires. Nodes are used as
        dictionary keys and therefore have to be hashable. Edges whose
        destination is not a node of this graph are ignored.

        :param source_node: source
        :param dest_node: destination
        :return: the shortest path as a list that starts with source_node and
                 ends with dest_node. None if either node is not in the graph,
                 if there is no path, or if source_node equals dest_node.
        """
        nodes = self._nodes
        if source_node not in nodes or dest_node not in nodes:  # make sure the nodes exist
            return None

        infinity = float('inf')
        dist = {source_node: 0}  # best known distance from the source
        prev = {}                # predecessor on the best known path
        unvisited = []
        for node in nodes:
            if node != source_node:
                dist[node] = infinity
            prev[node] = None
            unvisited.append(node)

        while unvisited:
            # min() returns the first of several equally near nodes, i.e. ties
            # are resolved in node insertion order.
            current = min(unvisited, key=dist.__getitem__)
            if dist[current] == infinity:
                break  # everything still unvisited is unreachable
            unvisited.remove(current)
            if current == dest_node:
                break  # destination settled, its distance is final
            for edge in self.get_edges_for_node(current):
                neighbor = edge.get_destination()
                if neighbor not in dist:
                    continue  # dangling edge: destination is not a node of this graph
                candidate = dist[current] + int(edge)
                if candidate < dist[neighbor]:  # shorter path to neighbor found
                    dist[neighbor] = candidate
                    prev[neighbor] = current

        if prev[dest_node] is None:
            # Destination was never reached (or it is the source itself).
            return None

        path = []  # built backwards, destination first
        node = dest_node
        while prev[node] is not None:
            path.append(node)
            node = prev[node]
        path.append(source_node)
        path.reverse()
        return path
def _min_dist_node(dict_dist, unvisited):
"""
Helper function for dijkstra
Find the unvisited node with shortest distance
:param dict_dist: dictionary of node distances
:param unvisited: list: unvisited nodes
:return: node with shortest distance
"""
min_node = None # node with shortest distance
for node in unvisited:
if min_node is None or dict_dist[node] < dict_dist[min_node]: # shorter node found, update min_node
min_node = node
return min_node