from collections import deque from typing import Set class DiGraph: """Really simple unweighted directed graph data structure to track dependencies. The API is pretty much the same as networkx so if you add something just copy their API. """ def __init__(self): # Dict of node -> dict of arbitrary attributes self._node = {} # Nested dict of node -> successor node -> nothing. # (didn't implement edge data) self._succ = {} # Nested dict of node -> predecessor node -> nothing. self._pred = {} def add_node(self, n, **kwargs): """Add a node to the graph. Args: n: the node. Can we any object that is a valid dict key. **kwargs: any attributes you want to attach to the node. """ if n not in self._node: self._node[n] = kwargs self._succ[n] = {} self._pred[n] = {} else: self._node[n].update(kwargs) def add_edge(self, u, v): """Add an edge to graph between nodes ``u`` and ``v`` ``u`` and ``v`` will be created if they do not already exist. """ # add nodes if u not in self._node: self._node[u] = {} self._succ[u] = {} self._pred[u] = {} if v not in self._node: self._node[v] = {} self._succ[v] = {} self._pred[v] = {} # add the edge self._succ[u][v] = True self._pred[v][u] = True def successors(self, n): """Returns an iterator over successor nodes of n.""" try: return iter(self._succ[n]) except KeyError as e: raise ValueError(f"The node {n} is not in the digraph.") from e def predecessors(self, n): """Returns an iterator over predecessors nodes of n.""" try: return iter(self._pred[n]) except KeyError as e: raise ValueError(f"The node {n} is not in the digraph.") from e @property def edges(self): """Returns an iterator over all edges (u, v) in the graph""" for n, successors in self._succ.items(): for succ in successors: yield n, succ @property def nodes(self): """Returns a dictionary of all nodes to their attributes.""" return self._node def __iter__(self): """Iterate over the nodes.""" return iter(self._node) def __contains__(self, n): """Returns True if ``n`` is a node in the graph, False otherwise.""" try: return n in self._node except TypeError: return False def forward_transitive_closure(self, src: str) -> Set[str]: """Returns a set of nodes that are reachable from src""" result = set(src) working_set = deque(src) while len(working_set) > 0: cur = working_set.popleft() for n in self.successors(cur): if n not in result: result.add(n) working_set.append(n) return result def backward_transitive_closure(self, src: str) -> Set[str]: """Returns a set of nodes that are reachable from src in reverse direction""" result = set(src) working_set = deque(src) while len(working_set) > 0: cur = working_set.popleft() for n in self.predecessors(cur): if n not in result: result.add(n) working_set.append(n) return result def all_paths(self, src: str, dst: str): """Returns a subgraph rooted at src that shows all the paths to dst.""" result_graph = DiGraph() # First compute forward transitive closure of src (all things reachable from src). forward_reachable_from_src = self.forward_transitive_closure(src) if dst not in forward_reachable_from_src: return result_graph # Second walk the reverse dependencies of dst, adding each node to # the output graph iff it is also present in forward_reachable_from_src. # we don't use backward_transitive_closures for optimization purposes working_set = deque(dst) while len(working_set) > 0: cur = working_set.popleft() for n in self.predecessors(cur): if n in forward_reachable_from_src: result_graph.add_edge(n, cur) # only explore further if its reachable from src working_set.append(n) return result_graph.to_dot() def to_dot(self) -> str: """Returns the dot representation of the graph. Returns: A dot representation of the graph. """ edges = "\n".join(f'"{f}" -> "{t}";' for f, t in self.edges) return f"""\ digraph G {{ rankdir = LR; node [shape=box]; {edges} }} """