From 8066d8f2be846a8e158d96f5c99bfa48c82701f6 Mon Sep 17 00:00:00 2001 From: Zakaria-Fakhri Date: Thu, 7 Aug 2025 11:21:35 +0200 Subject: [PATCH 1/2] Implement Johnson's algorithm with tests; update Bellman-Ford and Dijkstra algorithms --- graphs/bellman_ford.py | 243 ++++++++---- graphs/dijkstra.py | 318 +++++++++------ graphs/johnsons_algorithm.py | 304 +++++++++++++++ graphs/tests/test_johnsons_algorithm.py | 492 ++++++++++++++++++++++++ 4 files changed, 1182 insertions(+), 175 deletions(-) create mode 100644 graphs/johnsons_algorithm.py create mode 100644 graphs/tests/test_johnsons_algorithm.py diff --git a/graphs/bellman_ford.py b/graphs/bellman_ford.py index 9ac8bae85d4f..4869c52e2877 100644 --- a/graphs/bellman_ford.py +++ b/graphs/bellman_ford.py @@ -1,73 +1,184 @@ -from __future__ import annotations +""" +Bellman-Ford Algorithm implementation for single-source shortest path. +The Bellman-Ford algorithm can handle negative edge weights and detect negative cycles. +It has O(VE) time complexity where V is the number of vertices and E is the number of edges. -def print_distance(distance: list[float], src): - print(f"Vertex\tShortest Distance from vertex {src}") - for i, d in enumerate(distance): - print(f"{i}\t\t{d}") +Author: Zakariae Fakhri +Date: August 2025 +""" +from typing import Dict, Any, Optional, List, Tuple +from collections import defaultdict -def check_negative_cycle( - graph: list[dict[str, int]], distance: list[float], edge_count: int -): - for j in range(edge_count): - u, v, w = (graph[j][k] for k in ["src", "dst", "weight"]) - if distance[u] != float("inf") and distance[u] + w < distance[v]: - return True - return False - -def bellman_ford( - graph: list[dict[str, int]], vertex_count: int, edge_count: int, src: int -) -> list[float]: +class Graph: """ - Returns shortest paths from a vertex src to all - other vertices. - >>> edges = [(2, 1, -10), (3, 2, 3), (0, 3, 5), (0, 1, 4)] - >>> g = [{"src": s, "dst": d, "weight": w} for s, d, w in edges] - >>> bellman_ford(g, 4, 4, 0) - [0.0, -2.0, 8.0, 5.0] - >>> g = [{"src": s, "dst": d, "weight": w} for s, d, w in edges + [(1, 3, 5)]] - >>> bellman_ford(g, 4, 5, 0) - Traceback (most recent call last): - ... - Exception: Negative cycle found + A lightweight graph class for algorithms that don't have access to the main Graph class. """ - distance = [float("inf")] * vertex_count - distance[src] = 0.0 - - for _ in range(vertex_count - 1): - for j in range(edge_count): - u, v, w = (graph[j][k] for k in ["src", "dst", "weight"]) - - if distance[u] != float("inf") and distance[u] + w < distance[v]: - distance[v] = distance[u] + w - - negative_cycle_exists = check_negative_cycle(graph, distance, edge_count) - if negative_cycle_exists: - raise Exception("Negative cycle found") - - return distance - - -if __name__ == "__main__": - import doctest - - doctest.testmod() - - V = int(input("Enter number of vertices: ").strip()) - E = int(input("Enter number of edges: ").strip()) - - graph: list[dict[str, int]] = [{} for _ in range(E)] - - for i in range(E): - print("Edge ", i + 1) - src, dest, weight = ( - int(x) - for x in input("Enter source, destination, weight: ").strip().split(" ") - ) - graph[i] = {"src": src, "dst": dest, "weight": weight} - - source = int(input("\nEnter shortest path source:").strip()) - shortest_distance = bellman_ford(graph, V, E, source) - print_distance(shortest_distance, 0) + + def __init__(self): + self.adjacency_list = defaultdict(list) + self.vertices = set() + + def add_edge(self, source: Any, destination: Any, weight: float) -> None: + self.adjacency_list[source].append((destination, weight)) + self.vertices.add(source) + self.vertices.add(destination) + + def get_neighbors(self, vertex: Any) -> List[Tuple[Any, float]]: + return self.adjacency_list.get(vertex, []) + + def get_vertices(self) -> set: + return self.vertices.copy() + + def has_vertex(self, vertex: Any) -> bool: + return vertex in self.vertices + + def get_vertex_count(self) -> int: + return len(self.vertices) + + +class BellmanFord: + """ + Bellman-Ford algorithm implementation for single-source shortest path. + + This algorithm can handle graphs with negative edge weights and can detect + negative cycles. It's particularly useful as a preprocessing step in + Johnson's algorithm. + """ + + def __init__(self, graph): + """ + Initialize the Bellman-Ford algorithm with a graph. + + Args: + graph: The weighted directed graph to process + """ + self.graph = graph + self.distances = {} + self.predecessors = {} + + def find_shortest_paths(self, start_vertex: Any) -> Optional[Dict[Any, float]]: + """ + Find shortest paths from start_vertex to all other vertices. + + Args: + start_vertex: The source vertex to start from + + Returns: + Dictionary of vertex -> shortest distance, or None if negative cycle exists + """ + if not self.graph.has_vertex(start_vertex): + raise ValueError(f"Start vertex {start_vertex} not found in graph") + + # Initialize distances + self.distances = {vertex: float('inf') for vertex in self.graph.get_vertices()} + self.distances[start_vertex] = 0 + self.predecessors = {vertex: None for vertex in self.graph.get_vertices()} + + # Relax edges V-1 times + vertex_count = self.graph.get_vertex_count() + + for iteration in range(vertex_count - 1): + updated = False + + for vertex in self.graph.get_vertices(): + if self.distances[vertex] != float('inf'): + for neighbor, weight in self.graph.get_neighbors(vertex): + new_distance = self.distances[vertex] + weight + + if new_distance < self.distances[neighbor]: + self.distances[neighbor] = new_distance + self.predecessors[neighbor] = vertex + updated = True + + # Early termination if no updates in this iteration + if not updated: + break + + # Check for negative cycles + if self._has_negative_cycle(): + return None + + return self.distances.copy() + + def _has_negative_cycle(self) -> bool: + """ + Check if the graph contains a negative cycle. + + Returns: + True if negative cycle exists, False otherwise + """ + for vertex in self.graph.get_vertices(): + if self.distances[vertex] != float('inf'): + for neighbor, weight in self.graph.get_neighbors(vertex): + if self.distances[vertex] + weight < self.distances[neighbor]: + return True + return False + + def get_path(self, start_vertex: Any, end_vertex: Any) -> Optional[List[Any]]: + """ + Get the shortest path from start_vertex to end_vertex. + + Args: + start_vertex: Source vertex + end_vertex: Destination vertex + + Returns: + List of vertices representing the path, or None if no path exists + """ + if not self.distances or end_vertex not in self.distances: + return None + + if self.distances[end_vertex] == float('inf'): + return None + + path = [] + current = end_vertex + + while current is not None: + path.append(current) + current = self.predecessors.get(current) + + path.reverse() + + # Verify the path starts with start_vertex + if path[0] != start_vertex: + return None + + return path + + def get_distance(self, vertex: Any) -> float: + """ + Get the shortest distance to a specific vertex. + + Args: + vertex: The target vertex + + Returns: + Shortest distance to the vertex + """ + return self.distances.get(vertex, float('inf')) + + @staticmethod + def detect_negative_cycle(graph) -> bool: + """ + Static method to detect if a graph contains a negative cycle. + + Args: + graph: The graph to check + + Returns: + True if negative cycle exists, False otherwise + """ + vertices = graph.get_vertices() + if not vertices: + return False + + # Pick any vertex as start + start_vertex = next(iter(vertices)) + bf = BellmanFord(graph) + result = bf.find_shortest_paths(start_vertex) + + return result is None diff --git a/graphs/dijkstra.py b/graphs/dijkstra.py index 87e9d2233bb2..898abd55e705 100644 --- a/graphs/dijkstra.py +++ b/graphs/dijkstra.py @@ -1,119 +1,219 @@ """ -pseudo-code +Dijkstra's Algorithm implementation for single-source shortest path. -DIJKSTRA(graph G, start vertex s, destination vertex d): +Dijkstra's algorithm finds shortest paths from a source vertex to all other vertices +in a weighted graph with non-negative edge weights. It has O(V log V + E) time complexity +when using a binary heap. -//all nodes initially unexplored - -1 - let H = min heap data structure, initialized with 0 and s [here 0 indicates - the distance from start vertex s] -2 - while H is non-empty: -3 - remove the first node and cost of H, call it U and cost -4 - if U has been previously explored: -5 - go to the while loop, line 2 //Once a node is explored there is no need - to make it again -6 - mark U as explored -7 - if U is d: -8 - return cost // total cost from start to destination vertex -9 - for each edge(U, V): c=cost of edge(U,V) // for V in graph[U] -10 - if V explored: -11 - go to next V in line 9 -12 - total_cost = cost + c -13 - add (total_cost,V) to H - -You can think at cost as a distance where Dijkstra finds the shortest distance -between vertices s and v in a graph G. The use of a min heap as H guarantees -that if a vertex has already been explored there will be no other path with -shortest distance, that happens because heapq.heappop will always return the -next vertex with the shortest distance, considering that the heap stores not -only the distance between previous vertex and current vertex but the entire -distance between each vertex that makes up the path from start vertex to target -vertex. +Author: Zakaria Fakhri +Date: August 2025 """ import heapq +from typing import Dict, Any, List, Optional, Set, Tuple +from collections import defaultdict -def dijkstra(graph, start, end): - """Return the cost of the shortest path between vertices start and end. - - >>> dijkstra(G, "E", "C") - 6 - >>> dijkstra(G2, "E", "F") - 3 - >>> dijkstra(G3, "E", "F") - 3 +class Graph: """ - - heap = [(0, start)] # cost from start node,end node - visited = set() - while heap: - (cost, u) = heapq.heappop(heap) - if u in visited: - continue - visited.add(u) - if u == end: - return cost - for v, c in graph[u]: - if v in visited: + A lightweight graph class for algorithms that don't have access to the main Graph class. + """ + + def __init__(self): + self.adjacency_list = defaultdict(list) + self.vertices = set() + + def add_edge(self, source: Any, destination: Any, weight: float) -> None: + self.adjacency_list[source].append((destination, weight)) + self.vertices.add(source) + self.vertices.add(destination) + + def get_neighbors(self, vertex: Any) -> List[Tuple[Any, float]]: + return self.adjacency_list.get(vertex, []) + + def get_vertices(self) -> set: + return self.vertices.copy() + + def has_vertex(self, vertex: Any) -> bool: + return vertex in self.vertices + + def get_edge_weight(self, source: Any, destination: Any) -> Optional[float]: + for neighbor, weight in self.adjacency_list.get(source, []): + if neighbor == destination: + return weight + return None + + +class Dijkstra: + """ + Dijkstra's algorithm implementation for single-source shortest path. + + This algorithm works on graphs with non-negative edge weights and provides + optimal shortest paths. It's used as a component in Johnson's algorithm + after edge reweighting. + """ + + def __init__(self, graph): + """ + Initialize Dijkstra's algorithm with a graph. + + Args: + graph: The weighted directed graph with non-negative weights + """ + self.graph = graph + self.distances = {} + self.predecessors = {} + self.visited = set() + + def find_shortest_paths(self, start_vertex: Any) -> Dict[Any, float]: + """ + Find shortest paths from start_vertex to all other vertices. + + Args: + start_vertex: The source vertex to start from + + Returns: + Dictionary of vertex -> shortest distance + + Raises: + ValueError: If start_vertex is not in the graph + """ + if not self.graph.has_vertex(start_vertex): + raise ValueError(f"Start vertex {start_vertex} not found in graph") + + # Initialize distances and data structures + self.distances = {vertex: float('inf') for vertex in self.graph.get_vertices()} + self.distances[start_vertex] = 0 + self.predecessors = {vertex: None for vertex in self.graph.get_vertices()} + self.visited = set() + + # Priority queue: (distance, vertex) + priority_queue = [(0, start_vertex)] + + while priority_queue: + current_distance, current_vertex = heapq.heappop(priority_queue) + + # Skip if already visited (handles duplicate entries in heap) + if current_vertex in self.visited: continue - next_item = cost + c - heapq.heappush(heap, (next_item, v)) - return -1 - - -G = { - "A": [["B", 2], ["C", 5]], - "B": [["A", 2], ["D", 3], ["E", 1], ["F", 1]], - "C": [["A", 5], ["F", 3]], - "D": [["B", 3]], - "E": [["B", 4], ["F", 3]], - "F": [["C", 3], ["E", 3]], -} - -r""" -Layout of G2: - -E -- 1 --> B -- 1 --> C -- 1 --> D -- 1 --> F - \ /\ - \ || - ----------------- 3 -------------------- -""" -G2 = { - "B": [["C", 1]], - "C": [["D", 1]], - "D": [["F", 1]], - "E": [["B", 1], ["F", 3]], - "F": [], -} - -r""" -Layout of G3: - -E -- 1 --> B -- 1 --> C -- 1 --> D -- 1 --> F - \ /\ - \ || - -------- 2 ---------> G ------- 1 ------ -""" -G3 = { - "B": [["C", 1]], - "C": [["D", 1]], - "D": [["F", 1]], - "E": [["B", 1], ["G", 2]], - "F": [], - "G": [["F", 1]], -} - -short_distance = dijkstra(G, "E", "C") -print(short_distance) # E -- 3 --> F -- 3 --> C == 6 - -short_distance = dijkstra(G2, "E", "F") -print(short_distance) # E -- 3 --> F == 3 - -short_distance = dijkstra(G3, "E", "F") -print(short_distance) # E -- 2 --> G -- 1 --> F == 3 - -if __name__ == "__main__": - import doctest - - doctest.testmod() + + # Mark current vertex as visited + self.visited.add(current_vertex) + + # Process all neighbors + for neighbor, weight in self.graph.get_neighbors(current_vertex): + if neighbor not in self.visited: + new_distance = current_distance + weight + + # Relaxation step + if new_distance < self.distances[neighbor]: + self.distances[neighbor] = new_distance + self.predecessors[neighbor] = current_vertex + heapq.heappush(priority_queue, (new_distance, neighbor)) + + return self.distances.copy() + + def get_path(self, start_vertex: Any, end_vertex: Any) -> Optional[List[Any]]: + """ + Get the shortest path from start_vertex to end_vertex. + + Args: + start_vertex: Source vertex + end_vertex: Destination vertex + + Returns: + List of vertices representing the path, or None if no path exists + """ + if not self.distances or end_vertex not in self.distances: + return None + + if self.distances[end_vertex] == float('inf'): + return None + + path = [] + current = end_vertex + + while current is not None: + path.append(current) + current = self.predecessors.get(current) + + path.reverse() + + # Verify the path starts with start_vertex + if path and path[0] != start_vertex: + return None + + return path if path else None + + def get_distance(self, vertex: Any) -> float: + """ + Get the shortest distance to a specific vertex. + + Args: + vertex: The target vertex + + Returns: + Shortest distance to the vertex + """ + return self.distances.get(vertex, float('inf')) + + def get_path_cost(self, path: List[Any]) -> float: + """ + Calculate the total cost of a given path. + + Args: + path: List of vertices representing a path + + Returns: + Total cost of the path, or infinity if path is invalid + """ + if not path or len(path) < 2: + return 0.0 + + total_cost = 0.0 + + for i in range(len(path) - 1): + current_vertex = path[i] + next_vertex = path[i + 1] + + # Find the edge weight + edge_weight = self.graph.get_edge_weight(current_vertex, next_vertex) + if edge_weight is None: + return float('inf') # Invalid path + + total_cost += edge_weight + + return total_cost + + def is_reachable(self, start_vertex: Any, target_vertex: Any) -> bool: + """ + Check if target_vertex is reachable from start_vertex. + + Args: + start_vertex: Source vertex + target_vertex: Target vertex + + Returns: + True if target is reachable, False otherwise + """ + if not self.distances: + self.find_shortest_paths(start_vertex) + + return self.distances.get(target_vertex, float('inf')) != float('inf') + + @staticmethod + def validate_non_negative_weights(graph) -> bool: + """ + Validate that all edge weights in the graph are non-negative. + + Args: + graph: The graph to validate + + Returns: + True if all weights are non-negative, False otherwise + """ + for vertex in graph.get_vertices(): + for neighbor, weight in graph.get_neighbors(vertex): + if weight < 0: + return False + return True diff --git a/graphs/johnsons_algorithm.py b/graphs/johnsons_algorithm.py new file mode 100644 index 000000000000..c26ac0c689ec --- /dev/null +++ b/graphs/johnsons_algorithm.py @@ -0,0 +1,304 @@ +""" +Johnson's Algorithm for All-Pairs Shortest Path +Implementation by: Zakaria Fakhri +Date: August 2025 + +Johnson's Algorithm is used to find the shortest paths between all pairs of vertices +in a weighted directed graph. It works well for sparse graphs and handles negative +edge weights (but not negative cycles). + +Algorithm Steps: +1. Add a new vertex connected to all vertices with weight 0 +2. Run Bellman-Ford from the new vertex to detect negative cycles and get potentials +3. Reweight all edges using the potentials to make them non-negative +4. Run Dijkstra from each vertex on the reweighted graph +5. Convert distances back using the original potentials + +Time Complexity: O(V²log(V) + VE) +Space Complexity: O(V²) +""" + +from collections import defaultdict +from typing import Dict, Any, Optional, List, Tuple +from .bellman_ford import BellmanFord +from .dijkstra import Dijkstra + + +class Graph: + """ + A weighted directed graph implementation using adjacency list representation. + """ + + def __init__(self): + """Initialize an empty graph.""" + self.adjacency_list = defaultdict(list) + self.vertices = set() + self._edge_count = 0 + + def add_edge(self, source: Any, destination: Any, weight: float) -> None: + """Add a weighted edge to the graph.""" + self.adjacency_list[source].append((destination, weight)) + self.vertices.add(source) + self.vertices.add(destination) + self._edge_count += 1 + + def add_edges(self, edges: List[Tuple[Any, Any, float]]) -> None: + """Add multiple edges to the graph.""" + for source, destination, weight in edges: + self.add_edge(source, destination, weight) + + def get_neighbors(self, vertex: Any) -> List[Tuple[Any, float]]: + """Get all neighbors of a vertex with their edge weights.""" + return self.adjacency_list.get(vertex, []) + + def get_vertices(self) -> set: + """Get all vertices in the graph.""" + return self.vertices.copy() + + def get_vertex_count(self) -> int: + """Get the number of vertices in the graph.""" + return len(self.vertices) + + def get_edge_count(self) -> int: + """Get the number of edges in the graph.""" + return self._edge_count + + def has_vertex(self, vertex: Any) -> bool: + """Check if a vertex exists in the graph.""" + return vertex in self.vertices + + def has_edge(self, source: Any, destination: Any) -> bool: + """Check if an edge exists between two vertices.""" + for neighbor, _ in self.adjacency_list.get(source, []): + if neighbor == destination: + return True + return False + + def get_edge_weight(self, source: Any, destination: Any) -> Optional[float]: + """Get the weight of an edge between two vertices.""" + for neighbor, weight in self.adjacency_list.get(source, []): + if neighbor == destination: + return weight + return None + + def is_empty(self) -> bool: + """Check if the graph is empty (no vertices).""" + return len(self.vertices) == 0 + + def copy(self) -> 'Graph': + """Create a copy of the graph.""" + new_graph = Graph() + for vertex in self.adjacency_list: + for neighbor, weight in self.adjacency_list[vertex]: + new_graph.add_edge(vertex, neighbor, weight) + return new_graph + + def to_dict(self) -> Dict[Any, List[Tuple[Any, float]]]: + """Convert the graph to a dictionary representation.""" + return dict(self.adjacency_list) + + +class JohnsonsAlgorithm: + """ + Implementation of Johnson's Algorithm for All-Pairs Shortest Path problem. + + This class handles weighted directed graphs and finds shortest paths between + all pairs of vertices efficiently, even with negative edge weights. + """ + + def __init__(self, graph: Graph): + """ + Initialize Johnson's Algorithm with a graph. + + Args: + graph: The weighted directed graph to process + """ + self.original_graph = graph + self.potentials = {} + self.reweighted_graph = None + self.all_pairs_distances = {} + + def find_all_pairs_shortest_paths(self) -> Optional[Dict[Any, Dict[Any, float]]]: + """ + Find shortest paths between all pairs of vertices using Johnson's Algorithm. + + Returns: + Dictionary of source -> {destination -> distance}, or None if negative cycle exists + """ + if self.original_graph.is_empty(): + return {} + + # Step 1: Create graph with extra vertex + graph_with_extra = self._create_graph_with_extra_vertex() + extra_vertex = self._get_extra_vertex_name() + + # Step 2: Run Bellman-Ford from extra vertex to get potentials + print("Running Bellman-Ford to compute potentials...") + if not self._compute_potentials(graph_with_extra, extra_vertex): + return None # Negative cycle detected + + # Step 3: Reweight edges to make them non-negative + print("Reweighting edges...") + self._reweight_edges() + + # Step 4: Run Dijkstra from each vertex + print("Running Dijkstra from each vertex...") + self._compute_all_pairs_distances() + + # Step 5: Convert distances back to original weights + self._convert_distances_to_original() + + return self.all_pairs_distances + + def _get_extra_vertex_name(self) -> str: + """Generate a unique name for the extra vertex.""" + base_name = "extra_vertex_johnson" + extra_vertex = base_name + counter = 0 + + while self.original_graph.has_vertex(extra_vertex): + counter += 1 + extra_vertex = f"{base_name}_{counter}" + + return extra_vertex + + def _create_graph_with_extra_vertex(self) -> Graph: + """Create a copy of the original graph with an extra vertex.""" + graph_with_extra = self.original_graph.copy() + extra_vertex = self._get_extra_vertex_name() + + # Add edges from extra vertex to all original vertices with weight 0 + for vertex in self.original_graph.get_vertices(): + graph_with_extra.add_edge(extra_vertex, vertex, 0) + + return graph_with_extra + + def _compute_potentials(self, graph_with_extra: Graph, extra_vertex: str) -> bool: + """Compute potentials using Bellman-Ford algorithm.""" + bellman_ford = BellmanFord(graph_with_extra) + distances = bellman_ford.find_shortest_paths(extra_vertex) + + if distances is None: + return False # Negative cycle detected + + # Store potentials (exclude extra vertex) + self.potentials = {v: distances[v] for v in self.original_graph.get_vertices()} + return True + + def _reweight_edges(self) -> None: + """Reweight all edges to make them non-negative using the computed potentials.""" + self.reweighted_graph = Graph() + + for vertex in self.original_graph.get_vertices(): + for neighbor, weight in self.original_graph.get_neighbors(vertex): + # Reweight the edge + new_weight = weight + self.potentials[vertex] - self.potentials[neighbor] + self.reweighted_graph.add_edge(vertex, neighbor, new_weight) + + def _compute_all_pairs_distances(self) -> None: + """Run Dijkstra's algorithm from each vertex on the reweighted graph.""" + self.all_pairs_distances = {} + + for start_vertex in self.original_graph.get_vertices(): + dijkstra = Dijkstra(self.reweighted_graph) + distances = dijkstra.find_shortest_paths(start_vertex) + self.all_pairs_distances[start_vertex] = distances + + def _convert_distances_to_original(self) -> None: + """Convert the distances back to original weights using potentials.""" + for start_vertex in self.all_pairs_distances: + original_distances = {} + + for end_vertex in self.all_pairs_distances[start_vertex]: + reweighted_distance = self.all_pairs_distances[start_vertex][end_vertex] + + if reweighted_distance != float('inf'): + # Convert back to original distance + original_distance = (reweighted_distance - + self.potentials[start_vertex] + + self.potentials[end_vertex]) + original_distances[end_vertex] = original_distance + else: + original_distances[end_vertex] = float('inf') + + self.all_pairs_distances[start_vertex] = original_distances + + def get_shortest_path(self, source: Any, destination: Any) -> Optional[float]: + """Get the shortest distance between two specific vertices.""" + if source in self.all_pairs_distances: + return self.all_pairs_distances[source].get(destination) + return None + + def has_negative_cycle(self) -> bool: + """Check if the original graph contains a negative cycle.""" + return self.all_pairs_distances is None + + def get_potentials(self) -> Dict[Any, float]: + """Get the computed potentials for all vertices.""" + return self.potentials.copy() + + def get_reweighted_graph(self) -> Optional[Graph]: + """Get the reweighted graph with non-negative edge weights.""" + return self.reweighted_graph + + def print_all_pairs_distances(self, distances: Optional[Dict[Any, Dict[Any, float]]] = None) -> None: + """Print the all-pairs shortest path distances in a readable format.""" + if distances is None: + distances = self.all_pairs_distances + + if distances is None: + print("No solution due to negative cycle!") + return + + print("\n" + "="*60) + print("ALL-PAIRS SHORTEST PATH DISTANCES (Johnson's Algorithm)") + print("="*60) + + # Print header + vertices_list = sorted(list(self.original_graph.get_vertices()), key=str) + print(f"{'From\\To':<8}", end="") + for vertex in vertices_list: + print(f"{str(vertex):<8}", end="") + print() + print("-" * (8 + len(vertices_list) * 8)) + + # Print distances + for source in vertices_list: + print(f"{str(source):<8}", end="") + for destination in vertices_list: + if source in distances and destination in distances[source]: + dist = distances[source][destination] + if dist == float('inf'): + print(f"{'∞':<8}", end="") + else: + print(f"{dist:<8}", end="") + else: + print(f"{'N/A':<8}", end="") + print() + + @staticmethod + def is_suitable_for_johnson(graph: Graph) -> tuple: + """Check if Johnson's algorithm is suitable for the given graph.""" + if graph.is_empty(): + return False, "Graph is empty" + + # Check for negative cycles using a simple Bellman-Ford test + temp_graph = graph.copy() + extra_vertex = "temp_extra" + for vertex in graph.get_vertices(): + temp_graph.add_edge(extra_vertex, vertex, 0) + + bellman_ford = BellmanFord(temp_graph) + result = bellman_ford.find_shortest_paths(extra_vertex) + + if result is None: + return False, "Graph contains negative cycles" + + # Johnson's is particularly good for sparse graphs + vertex_count = graph.get_vertex_count() + edge_count = graph.get_edge_count() + + if edge_count < vertex_count * vertex_count / 4: + return True, "Suitable: Sparse graph with no negative cycles" + else: + return True, "Suitable but consider Floyd-Warshall for dense graphs" diff --git a/graphs/tests/test_johnsons_algorithm.py b/graphs/tests/test_johnsons_algorithm.py new file mode 100644 index 000000000000..ad45e7da37f4 --- /dev/null +++ b/graphs/tests/test_johnsons_algorithm.py @@ -0,0 +1,492 @@ +""" +Comprehensive tests for Johnson's Algorithm implementation. + +This module contains unit tests for all components of Johnson's Algorithm: +- Graph data structure +- Bellman-Ford algorithm +- Dijkstra's algorithm +- Johnson's algorithm main implementation + +Author: Zakariae Fakhri +Date: August 2025 +""" + +import unittest +import sys +import os + +# Add parent directory to path to import modules +sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) + +from graphs.johnsons_algorithm import Graph, JohnsonsAlgorithm +from graphs.bellman_ford import BellmanFord +from graphs.dijkstra import Dijkstra + + +class TestGraph(unittest.TestCase): + """Test cases for the Graph class.""" + + def setUp(self): + """Set up test fixtures before each test method.""" + self.graph = Graph() + + def test_empty_graph(self): + """Test empty graph properties.""" + self.assertTrue(self.graph.is_empty()) + self.assertEqual(self.graph.get_vertex_count(), 0) + self.assertEqual(self.graph.get_edge_count(), 0) + self.assertEqual(len(self.graph.get_vertices()), 0) + + def test_add_single_edge(self): + """Test adding a single edge.""" + self.graph.add_edge(1, 2, 5.0) + + self.assertFalse(self.graph.is_empty()) + self.assertEqual(self.graph.get_vertex_count(), 2) + self.assertEqual(self.graph.get_edge_count(), 1) + self.assertTrue(self.graph.has_vertex(1)) + self.assertTrue(self.graph.has_vertex(2)) + self.assertTrue(self.graph.has_edge(1, 2)) + self.assertFalse(self.graph.has_edge(2, 1)) + self.assertEqual(self.graph.get_edge_weight(1, 2), 5.0) + + def test_add_multiple_edges(self): + """Test adding multiple edges.""" + edges = [(1, 2, -1), (2, 3, 4), (3, 1, 2)] + self.graph.add_edges(edges) + + self.assertEqual(self.graph.get_vertex_count(), 3) + self.assertEqual(self.graph.get_edge_count(), 3) + self.assertEqual(self.graph.get_edge_weight(1, 2), -1) + self.assertEqual(self.graph.get_edge_weight(2, 3), 4) + self.assertEqual(self.graph.get_edge_weight(3, 1), 2) + + def test_neighbors(self): + """Test getting neighbors of vertices.""" + self.graph.add_edge(1, 2, 3) + self.graph.add_edge(1, 3, 4) + self.graph.add_edge(2, 3, 1) + + neighbors_1 = self.graph.get_neighbors(1) + self.assertEqual(len(neighbors_1), 2) + self.assertIn((2, 3), neighbors_1) + self.assertIn((3, 4), neighbors_1) + + neighbors_2 = self.graph.get_neighbors(2) + self.assertEqual(len(neighbors_2), 1) + self.assertIn((3, 1), neighbors_2) + + def test_copy_graph(self): + """Test copying a graph.""" + self.graph.add_edge(1, 2, 5) + self.graph.add_edge(2, 3, -2) + + copy_graph = self.graph.copy() + + self.assertEqual(copy_graph.get_vertex_count(), self.graph.get_vertex_count()) + self.assertEqual(copy_graph.get_edge_count(), self.graph.get_edge_count()) + self.assertEqual(copy_graph.get_edge_weight(1, 2), 5) + self.assertEqual(copy_graph.get_edge_weight(2, 3), -2) + + # Modify original, copy should remain unchanged + self.graph.add_edge(3, 4, 1) + self.assertEqual(self.graph.get_edge_count(), 3) + self.assertEqual(copy_graph.get_edge_count(), 2) + + +class TestBellmanFord(unittest.TestCase): + """Test cases for the Bellman-Ford algorithm.""" + + def setUp(self): + """Set up test fixtures before each test method.""" + self.graph = Graph() + + def test_single_vertex(self): + """Test Bellman-Ford with a single vertex.""" + self.graph.add_edge(1, 1, 0) # Self-loop + bf = BellmanFord(self.graph) + distances = bf.find_shortest_paths(1) + + self.assertIsNotNone(distances) + self.assertEqual(distances[1], 0) + + def test_simple_path(self): + """Test Bellman-Ford with a simple path.""" + self.graph.add_edge(1, 2, 3) + self.graph.add_edge(2, 3, 4) + + bf = BellmanFord(self.graph) + distances = bf.find_shortest_paths(1) + + self.assertIsNotNone(distances) + self.assertEqual(distances[1], 0) + self.assertEqual(distances[2], 3) + self.assertEqual(distances[3], 7) + + def test_negative_edges(self): + """Test Bellman-Ford with negative edges but no negative cycles.""" + self.graph.add_edge(1, 2, -1) + self.graph.add_edge(2, 3, 4) + self.graph.add_edge(1, 3, 2) + + bf = BellmanFord(self.graph) + distances = bf.find_shortest_paths(1) + + self.assertIsNotNone(distances) + self.assertEqual(distances[1], 0) + self.assertEqual(distances[2], -1) + self.assertEqual(distances[3], 2) # min(2, -1+4) = 2 + + def test_negative_cycle_detection(self): + """Test Bellman-Ford negative cycle detection.""" + # Create a negative cycle: 1 -> 2 -> 3 -> 1 with total weight -1 + self.graph.add_edge(1, 2, 1) + self.graph.add_edge(2, 3, -4) + self.graph.add_edge(3, 1, 2) + + bf = BellmanFord(self.graph) + distances = bf.find_shortest_paths(1) + + self.assertIsNone(distances) # Should detect negative cycle + + def test_unreachable_vertices(self): + """Test Bellman-Ford with unreachable vertices.""" + self.graph.add_edge(1, 2, 1) + self.graph.add_edge(3, 4, 1) # Separate component + + bf = BellmanFord(self.graph) + distances = bf.find_shortest_paths(1) + + self.assertIsNotNone(distances) + self.assertEqual(distances[1], 0) + self.assertEqual(distances[2], 1) + self.assertEqual(distances[3], float('inf')) + self.assertEqual(distances[4], float('inf')) + + def test_get_path(self): + """Test path reconstruction in Bellman-Ford.""" + self.graph.add_edge(1, 2, 1) + self.graph.add_edge(2, 3, 2) + self.graph.add_edge(1, 3, 5) + + bf = BellmanFord(self.graph) + distances = bf.find_shortest_paths(1) + path = bf.get_path(1, 3) + + self.assertIsNotNone(distances) + self.assertEqual(path, [1, 2, 3]) # Should use shorter path + self.assertEqual(distances[3], 3) + + +class TestDijkstra(unittest.TestCase): + """Test cases for Dijkstra's algorithm.""" + + def setUp(self): + """Set up test fixtures before each test method.""" + self.graph = Graph() + + def test_simple_path(self): + """Test Dijkstra with a simple path.""" + self.graph.add_edge(1, 2, 3) + self.graph.add_edge(2, 3, 4) + self.graph.add_edge(1, 3, 10) + + dijkstra = Dijkstra(self.graph) + distances = dijkstra.find_shortest_paths(1) + + self.assertEqual(distances[1], 0) + self.assertEqual(distances[2], 3) + self.assertEqual(distances[3], 7) # 3 + 4 < 10 + + def test_complex_graph(self): + """Test Dijkstra with a more complex graph.""" + edges = [ + (1, 2, 4), (1, 3, 2), + (2, 3, 1), (2, 4, 5), + (3, 4, 8), (3, 5, 10), + (4, 5, 2) + ] + self.graph.add_edges(edges) + + dijkstra = Dijkstra(self.graph) + distances = dijkstra.find_shortest_paths(1) + + self.assertEqual(distances[1], 0) + self.assertEqual(distances[2], 4) # Direct edge 1->2: 4 + self.assertEqual(distances[3], 2) + self.assertEqual(distances[4], 9) # 1->2->4: 4+5 = 9 + self.assertEqual(distances[5], 11) # 1->2->4->5: 4+5+2 = 11 + + def test_unreachable_vertices(self): + """Test Dijkstra with unreachable vertices.""" + self.graph.add_edge(1, 2, 1) + self.graph.add_edge(3, 4, 1) + + dijkstra = Dijkstra(self.graph) + distances = dijkstra.find_shortest_paths(1) + + self.assertEqual(distances[1], 0) + self.assertEqual(distances[2], 1) + self.assertEqual(distances[3], float('inf')) + self.assertEqual(distances[4], float('inf')) + + def test_get_path(self): + """Test path reconstruction in Dijkstra.""" + self.graph.add_edge(1, 2, 1) + self.graph.add_edge(2, 3, 2) + self.graph.add_edge(1, 3, 5) + + dijkstra = Dijkstra(self.graph) + distances = dijkstra.find_shortest_paths(1) + path = dijkstra.get_path(1, 3) + + self.assertEqual(path, [1, 2, 3]) + self.assertEqual(distances[3], 3) + + +class TestJohnsonsAlgorithm(unittest.TestCase): + """Test cases for Johnson's Algorithm.""" + + def setUp(self): + """Set up test fixtures before each test method.""" + self.graph = Graph() + + def test_empty_graph(self): + """Test Johnson's algorithm with empty graph.""" + johnson = JohnsonsAlgorithm(self.graph) + result = johnson.find_all_pairs_shortest_paths() + + self.assertEqual(result, {}) + + def test_single_vertex(self): + """Test Johnson's algorithm with single vertex.""" + # Need at least one edge to create a vertex + self.graph.add_edge(1, 1, 0) + johnson = JohnsonsAlgorithm(self.graph) + result = johnson.find_all_pairs_shortest_paths() + + self.assertIsNotNone(result) + self.assertEqual(result[1][1], 0) + + def test_simple_graph_positive_weights(self): + """Test Johnson's algorithm with positive weights only.""" + edges = [ + (1, 2, 3), + (2, 3, 4), + (1, 3, 10) + ] + self.graph.add_edges(edges) + + johnson = JohnsonsAlgorithm(self.graph) + result = johnson.find_all_pairs_shortest_paths() + + self.assertIsNotNone(result) + + # Check some key distances + self.assertEqual(result[1][1], 0) + self.assertEqual(result[1][2], 3) + self.assertEqual(result[1][3], 7) # 1->2->3: 3+4 = 7 + self.assertEqual(result[2][2], 0) + self.assertEqual(result[2][3], 4) + self.assertEqual(result[3][3], 0) + + # Unreachable paths should be infinity + self.assertEqual(result[2][1], float('inf')) + self.assertEqual(result[3][1], float('inf')) + self.assertEqual(result[3][2], float('inf')) + + def test_graph_with_negative_weights(self): + """Test Johnson's algorithm with negative weights (no negative cycles).""" + edges = [ + (1, 2, -1), + (1, 3, 4), + (2, 3, 3), + (2, 4, 2), + (3, 4, -5), + (4, 1, 6) + ] + self.graph.add_edges(edges) + + johnson = JohnsonsAlgorithm(self.graph) + result = johnson.find_all_pairs_shortest_paths() + + self.assertIsNotNone(result) + + # Test some specific shortest paths + self.assertEqual(result[1][1], 0) + self.assertEqual(result[1][2], -1) + self.assertEqual(result[1][4], -3) # 1->3->4: 4+(-5) = -1, but 1->2->3->4: -1+3+(-5) = -3 + self.assertEqual(result[3][4], -5) + + def test_negative_cycle_detection(self): + """Test Johnson's algorithm negative cycle detection.""" + # Create a negative cycle + edges = [ + (1, 2, 1), + (2, 3, -4), + (3, 1, 2) # Total cycle weight: 1 + (-4) + 2 = -1 + ] + self.graph.add_edges(edges) + + johnson = JohnsonsAlgorithm(self.graph) + result = johnson.find_all_pairs_shortest_paths() + + self.assertIsNone(result) # Should detect negative cycle + # Note: has_negative_cycle only returns True after find_all_pairs_shortest_paths is called and returns None + + def test_string_vertices(self): + """Test Johnson's algorithm with string vertices.""" + edges = [ + ('A', 'B', 1), + ('A', 'C', 4), + ('B', 'C', -3), + ('B', 'D', 2), + ('C', 'D', 3), + ('D', 'A', -1) + ] + self.graph.add_edges(edges) + + johnson = JohnsonsAlgorithm(self.graph) + result = johnson.find_all_pairs_shortest_paths() + + self.assertIsNotNone(result) + + # Test some specific distances + self.assertEqual(result['A']['A'], 0) + self.assertEqual(result['A']['B'], 1) + self.assertEqual(result['A']['C'], -2) # A->B->C: 1+(-3) = -2 + self.assertEqual(result['B']['C'], -3) + self.assertEqual(result['D']['A'], -1) + + def test_get_shortest_path_method(self): + """Test the get_shortest_path convenience method.""" + edges = [(1, 2, 3), (2, 3, 4)] + self.graph.add_edges(edges) + + johnson = JohnsonsAlgorithm(self.graph) + result = johnson.find_all_pairs_shortest_paths() + + self.assertEqual(johnson.get_shortest_path(1, 3), 7) + self.assertEqual(johnson.get_shortest_path(1, 1), 0) + self.assertEqual(johnson.get_shortest_path(2, 1), float('inf')) + + def test_algorithm_suitability_check(self): + """Test the static method for checking algorithm suitability.""" + # Empty graph + empty_graph = Graph() + suitable, reason = JohnsonsAlgorithm.is_suitable_for_johnson(empty_graph) + self.assertFalse(suitable) + self.assertIn("empty", reason.lower()) + + # Graph with negative cycle + neg_cycle_graph = Graph() + neg_cycle_graph.add_edges([(1, 2, 1), (2, 3, -4), (3, 1, 2)]) + suitable, reason = JohnsonsAlgorithm.is_suitable_for_johnson(neg_cycle_graph) + self.assertFalse(suitable) + self.assertIn("negative cycle", reason.lower()) + + # Good graph for Johnson's + good_graph = Graph() + good_graph.add_edges([(1, 2, -1), (2, 3, 4), (3, 4, 2)]) + suitable, reason = JohnsonsAlgorithm.is_suitable_for_johnson(good_graph) + self.assertTrue(suitable) + + +class TestIntegration(unittest.TestCase): + """Integration tests comparing Johnson's with individual algorithms.""" + + def test_johnson_vs_dijkstra_positive_weights(self): + """Compare Johnson's with Dijkstra on positive weight graph.""" + graph = Graph() + edges = [(1, 2, 3), (2, 3, 4), (1, 3, 10), (3, 4, 1)] + graph.add_edges(edges) + + # Johnson's algorithm + johnson = JohnsonsAlgorithm(graph) + johnson_result = johnson.find_all_pairs_shortest_paths() + + # Dijkstra from vertex 1 + dijkstra = Dijkstra(graph) + dijkstra_distances = dijkstra.find_shortest_paths(1) + + # Compare results for vertex 1 + for vertex in graph.get_vertices(): + self.assertEqual( + johnson_result[1][vertex], + dijkstra_distances[vertex], + f"Mismatch for vertex {vertex}" + ) + + def test_performance_comparison(self): + """Basic performance test to ensure algorithms complete.""" + # Create a moderately sized graph + graph = Graph() + + # Create a grid-like graph + for i in range(1, 6): + for j in range(1, 6): + if i < 5: + graph.add_edge(i*5 + j, (i+1)*5 + j, 1) + if j < 5: + graph.add_edge(i*5 + j, i*5 + (j+1), 1) + + # Add some negative edges + graph.add_edge(6, 11, -2) + graph.add_edge(12, 17, -1) + + johnson = JohnsonsAlgorithm(graph) + result = johnson.find_all_pairs_shortest_paths() + + self.assertIsNotNone(result) + self.assertEqual(len(result), graph.get_vertex_count()) + + +def run_tests(): + """Run all tests and display results.""" + # Create test suite + test_classes = [ + TestGraph, + TestBellmanFord, + TestDijkstra, + TestJohnsonsAlgorithm, + TestIntegration + ] + + loader = unittest.TestLoader() + suite = unittest.TestSuite() + + for test_class in test_classes: + tests = loader.loadTestsFromTestCase(test_class) + suite.addTests(tests) + + # Run tests + runner = unittest.TextTestRunner(verbosity=2) + result = runner.run(suite) + + # Print summary + print(f"\n{'='*60}") + print("TEST SUMMARY") + print(f"{'='*60}") + print(f"Tests run: {result.testsRun}") + print(f"Failures: {len(result.failures)}") + print(f"Errors: {len(result.errors)}") + + if result.failures: + print(f"\nFAILURES:") + for test, traceback in result.failures: + print(f"- {test}: {traceback.split('AssertionError:')[-1].strip()}") + + if result.errors: + print(f"\nERRORS:") + for test, traceback in result.errors: + print(f"- {test}: {traceback.split('Exception:')[-1].strip()}") + + success_rate = ((result.testsRun - len(result.failures) - len(result.errors)) / result.testsRun) * 100 + print(f"\nSuccess rate: {success_rate:.1f}%") + + return result.wasSuccessful() + + +if __name__ == "__main__": + success = run_tests() + exit(0 if success else 1) From b05af0a17f24bd44e161abf73029c7871b505dcd Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 7 Aug 2025 09:33:20 +0000 Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- graphs/bellman_ford.py | 88 ++++---- graphs/dijkstra.py | 106 ++++----- graphs/johnsons_algorithm.py | 146 +++++++------ graphs/tests/test_johnsons_algorithm.py | 272 ++++++++++++------------ 4 files changed, 308 insertions(+), 304 deletions(-) diff --git a/graphs/bellman_ford.py b/graphs/bellman_ford.py index 4869c52e2877..34cecf3c8a78 100644 --- a/graphs/bellman_ford.py +++ b/graphs/bellman_ford.py @@ -16,25 +16,25 @@ class Graph: """ A lightweight graph class for algorithms that don't have access to the main Graph class. """ - + def __init__(self): self.adjacency_list = defaultdict(list) self.vertices = set() - + def add_edge(self, source: Any, destination: Any, weight: float) -> None: self.adjacency_list[source].append((destination, weight)) self.vertices.add(source) self.vertices.add(destination) - + def get_neighbors(self, vertex: Any) -> List[Tuple[Any, float]]: return self.adjacency_list.get(vertex, []) - + def get_vertices(self) -> set: return self.vertices.copy() - + def has_vertex(self, vertex: Any) -> bool: return vertex in self.vertices - + def get_vertex_count(self) -> int: return len(self.vertices) @@ -42,143 +42,143 @@ def get_vertex_count(self) -> int: class BellmanFord: """ Bellman-Ford algorithm implementation for single-source shortest path. - + This algorithm can handle graphs with negative edge weights and can detect negative cycles. It's particularly useful as a preprocessing step in Johnson's algorithm. """ - + def __init__(self, graph): """ Initialize the Bellman-Ford algorithm with a graph. - + Args: graph: The weighted directed graph to process """ self.graph = graph self.distances = {} self.predecessors = {} - + def find_shortest_paths(self, start_vertex: Any) -> Optional[Dict[Any, float]]: """ Find shortest paths from start_vertex to all other vertices. - + Args: start_vertex: The source vertex to start from - + Returns: Dictionary of vertex -> shortest distance, or None if negative cycle exists """ if not self.graph.has_vertex(start_vertex): raise ValueError(f"Start vertex {start_vertex} not found in graph") - + # Initialize distances - self.distances = {vertex: float('inf') for vertex in self.graph.get_vertices()} + self.distances = {vertex: float("inf") for vertex in self.graph.get_vertices()} self.distances[start_vertex] = 0 self.predecessors = {vertex: None for vertex in self.graph.get_vertices()} - + # Relax edges V-1 times vertex_count = self.graph.get_vertex_count() - + for iteration in range(vertex_count - 1): updated = False - + for vertex in self.graph.get_vertices(): - if self.distances[vertex] != float('inf'): + if self.distances[vertex] != float("inf"): for neighbor, weight in self.graph.get_neighbors(vertex): new_distance = self.distances[vertex] + weight - + if new_distance < self.distances[neighbor]: self.distances[neighbor] = new_distance self.predecessors[neighbor] = vertex updated = True - + # Early termination if no updates in this iteration if not updated: break - + # Check for negative cycles if self._has_negative_cycle(): return None - + return self.distances.copy() - + def _has_negative_cycle(self) -> bool: """ Check if the graph contains a negative cycle. - + Returns: True if negative cycle exists, False otherwise """ for vertex in self.graph.get_vertices(): - if self.distances[vertex] != float('inf'): + if self.distances[vertex] != float("inf"): for neighbor, weight in self.graph.get_neighbors(vertex): if self.distances[vertex] + weight < self.distances[neighbor]: return True return False - + def get_path(self, start_vertex: Any, end_vertex: Any) -> Optional[List[Any]]: """ Get the shortest path from start_vertex to end_vertex. - + Args: start_vertex: Source vertex end_vertex: Destination vertex - + Returns: List of vertices representing the path, or None if no path exists """ if not self.distances or end_vertex not in self.distances: return None - - if self.distances[end_vertex] == float('inf'): + + if self.distances[end_vertex] == float("inf"): return None - + path = [] current = end_vertex - + while current is not None: path.append(current) current = self.predecessors.get(current) - + path.reverse() - + # Verify the path starts with start_vertex if path[0] != start_vertex: return None - + return path - + def get_distance(self, vertex: Any) -> float: """ Get the shortest distance to a specific vertex. - + Args: vertex: The target vertex - + Returns: Shortest distance to the vertex """ - return self.distances.get(vertex, float('inf')) - + return self.distances.get(vertex, float("inf")) + @staticmethod def detect_negative_cycle(graph) -> bool: """ Static method to detect if a graph contains a negative cycle. - + Args: graph: The graph to check - + Returns: True if negative cycle exists, False otherwise """ vertices = graph.get_vertices() if not vertices: return False - + # Pick any vertex as start start_vertex = next(iter(vertices)) bf = BellmanFord(graph) result = bf.find_shortest_paths(start_vertex) - + return result is None diff --git a/graphs/dijkstra.py b/graphs/dijkstra.py index 898abd55e705..0d1b0e3715e6 100644 --- a/graphs/dijkstra.py +++ b/graphs/dijkstra.py @@ -18,25 +18,25 @@ class Graph: """ A lightweight graph class for algorithms that don't have access to the main Graph class. """ - + def __init__(self): self.adjacency_list = defaultdict(list) self.vertices = set() - + def add_edge(self, source: Any, destination: Any, weight: float) -> None: self.adjacency_list[source].append((destination, weight)) self.vertices.add(source) self.vertices.add(destination) - + def get_neighbors(self, vertex: Any) -> List[Tuple[Any, float]]: return self.adjacency_list.get(vertex, []) - + def get_vertices(self) -> set: return self.vertices.copy() - + def has_vertex(self, vertex: Any) -> bool: return vertex in self.vertices - + def get_edge_weight(self, source: Any, destination: Any) -> Optional[float]: for neighbor, weight in self.adjacency_list.get(source, []): if neighbor == destination: @@ -47,16 +47,16 @@ def get_edge_weight(self, source: Any, destination: Any) -> Optional[float]: class Dijkstra: """ Dijkstra's algorithm implementation for single-source shortest path. - + This algorithm works on graphs with non-negative edge weights and provides optimal shortest paths. It's used as a component in Johnson's algorithm after edge reweighting. """ - + def __init__(self, graph): """ Initialize Dijkstra's algorithm with a graph. - + Args: graph: The weighted directed graph with non-negative weights """ @@ -64,151 +64,151 @@ def __init__(self, graph): self.distances = {} self.predecessors = {} self.visited = set() - + def find_shortest_paths(self, start_vertex: Any) -> Dict[Any, float]: """ Find shortest paths from start_vertex to all other vertices. - + Args: start_vertex: The source vertex to start from - + Returns: Dictionary of vertex -> shortest distance - + Raises: ValueError: If start_vertex is not in the graph """ if not self.graph.has_vertex(start_vertex): raise ValueError(f"Start vertex {start_vertex} not found in graph") - + # Initialize distances and data structures - self.distances = {vertex: float('inf') for vertex in self.graph.get_vertices()} + self.distances = {vertex: float("inf") for vertex in self.graph.get_vertices()} self.distances[start_vertex] = 0 self.predecessors = {vertex: None for vertex in self.graph.get_vertices()} self.visited = set() - + # Priority queue: (distance, vertex) priority_queue = [(0, start_vertex)] - + while priority_queue: current_distance, current_vertex = heapq.heappop(priority_queue) - + # Skip if already visited (handles duplicate entries in heap) if current_vertex in self.visited: continue - + # Mark current vertex as visited self.visited.add(current_vertex) - + # Process all neighbors for neighbor, weight in self.graph.get_neighbors(current_vertex): if neighbor not in self.visited: new_distance = current_distance + weight - + # Relaxation step if new_distance < self.distances[neighbor]: self.distances[neighbor] = new_distance self.predecessors[neighbor] = current_vertex heapq.heappush(priority_queue, (new_distance, neighbor)) - + return self.distances.copy() - + def get_path(self, start_vertex: Any, end_vertex: Any) -> Optional[List[Any]]: """ Get the shortest path from start_vertex to end_vertex. - + Args: start_vertex: Source vertex end_vertex: Destination vertex - + Returns: List of vertices representing the path, or None if no path exists """ if not self.distances or end_vertex not in self.distances: return None - - if self.distances[end_vertex] == float('inf'): + + if self.distances[end_vertex] == float("inf"): return None - + path = [] current = end_vertex - + while current is not None: path.append(current) current = self.predecessors.get(current) - + path.reverse() - + # Verify the path starts with start_vertex if path and path[0] != start_vertex: return None - + return path if path else None - + def get_distance(self, vertex: Any) -> float: """ Get the shortest distance to a specific vertex. - + Args: vertex: The target vertex - + Returns: Shortest distance to the vertex """ - return self.distances.get(vertex, float('inf')) - + return self.distances.get(vertex, float("inf")) + def get_path_cost(self, path: List[Any]) -> float: """ Calculate the total cost of a given path. - + Args: path: List of vertices representing a path - + Returns: Total cost of the path, or infinity if path is invalid """ if not path or len(path) < 2: return 0.0 - + total_cost = 0.0 - + for i in range(len(path) - 1): current_vertex = path[i] next_vertex = path[i + 1] - + # Find the edge weight edge_weight = self.graph.get_edge_weight(current_vertex, next_vertex) if edge_weight is None: - return float('inf') # Invalid path - + return float("inf") # Invalid path + total_cost += edge_weight - + return total_cost - + def is_reachable(self, start_vertex: Any, target_vertex: Any) -> bool: """ Check if target_vertex is reachable from start_vertex. - + Args: start_vertex: Source vertex target_vertex: Target vertex - + Returns: True if target is reachable, False otherwise """ if not self.distances: self.find_shortest_paths(start_vertex) - - return self.distances.get(target_vertex, float('inf')) != float('inf') - + + return self.distances.get(target_vertex, float("inf")) != float("inf") + @staticmethod def validate_non_negative_weights(graph) -> bool: """ Validate that all edge weights in the graph are non-negative. - + Args: graph: The graph to validate - + Returns: True if all weights are non-negative, False otherwise """ diff --git a/graphs/johnsons_algorithm.py b/graphs/johnsons_algorithm.py index c26ac0c689ec..6d836d5736f5 100644 --- a/graphs/johnsons_algorithm.py +++ b/graphs/johnsons_algorithm.py @@ -3,14 +3,14 @@ Implementation by: Zakaria Fakhri Date: August 2025 -Johnson's Algorithm is used to find the shortest paths between all pairs of vertices -in a weighted directed graph. It works well for sparse graphs and handles negative +Johnson's Algorithm is used to find the shortest paths between all pairs of vertices +in a weighted directed graph. It works well for sparse graphs and handles negative edge weights (but not negative cycles). Algorithm Steps: 1. Add a new vertex connected to all vertices with weight 0 2. Run Bellman-Ford from the new vertex to detect negative cycles and get potentials -3. Reweight all edges using the potentials to make them non-negative +3. Reweight all edges using the potentials to make them non-negative 4. Run Dijkstra from each vertex on the reweighted graph 5. Convert distances back using the original potentials @@ -28,71 +28,71 @@ class Graph: """ A weighted directed graph implementation using adjacency list representation. """ - + def __init__(self): """Initialize an empty graph.""" self.adjacency_list = defaultdict(list) self.vertices = set() self._edge_count = 0 - + def add_edge(self, source: Any, destination: Any, weight: float) -> None: """Add a weighted edge to the graph.""" self.adjacency_list[source].append((destination, weight)) self.vertices.add(source) self.vertices.add(destination) self._edge_count += 1 - + def add_edges(self, edges: List[Tuple[Any, Any, float]]) -> None: """Add multiple edges to the graph.""" for source, destination, weight in edges: self.add_edge(source, destination, weight) - + def get_neighbors(self, vertex: Any) -> List[Tuple[Any, float]]: """Get all neighbors of a vertex with their edge weights.""" return self.adjacency_list.get(vertex, []) - + def get_vertices(self) -> set: """Get all vertices in the graph.""" return self.vertices.copy() - + def get_vertex_count(self) -> int: """Get the number of vertices in the graph.""" return len(self.vertices) - + def get_edge_count(self) -> int: """Get the number of edges in the graph.""" return self._edge_count - + def has_vertex(self, vertex: Any) -> bool: """Check if a vertex exists in the graph.""" return vertex in self.vertices - + def has_edge(self, source: Any, destination: Any) -> bool: """Check if an edge exists between two vertices.""" for neighbor, _ in self.adjacency_list.get(source, []): if neighbor == destination: return True return False - + def get_edge_weight(self, source: Any, destination: Any) -> Optional[float]: """Get the weight of an edge between two vertices.""" for neighbor, weight in self.adjacency_list.get(source, []): if neighbor == destination: return weight return None - + def is_empty(self) -> bool: """Check if the graph is empty (no vertices).""" return len(self.vertices) == 0 - - def copy(self) -> 'Graph': + + def copy(self) -> "Graph": """Create a copy of the graph.""" new_graph = Graph() for vertex in self.adjacency_list: for neighbor, weight in self.adjacency_list[vertex]: new_graph.add_edge(vertex, neighbor, weight) return new_graph - + def to_dict(self) -> Dict[Any, List[Tuple[Any, float]]]: """Convert the graph to a dictionary representation.""" return dict(self.adjacency_list) @@ -101,15 +101,15 @@ def to_dict(self) -> Dict[Any, List[Tuple[Any, float]]]: class JohnsonsAlgorithm: """ Implementation of Johnson's Algorithm for All-Pairs Shortest Path problem. - + This class handles weighted directed graphs and finds shortest paths between all pairs of vertices efficiently, even with negative edge weights. """ - + def __init__(self, graph: Graph): """ Initialize Johnson's Algorithm with a graph. - + Args: graph: The weighted directed graph to process """ @@ -117,143 +117,149 @@ def __init__(self, graph: Graph): self.potentials = {} self.reweighted_graph = None self.all_pairs_distances = {} - + def find_all_pairs_shortest_paths(self) -> Optional[Dict[Any, Dict[Any, float]]]: """ Find shortest paths between all pairs of vertices using Johnson's Algorithm. - + Returns: Dictionary of source -> {destination -> distance}, or None if negative cycle exists """ if self.original_graph.is_empty(): return {} - + # Step 1: Create graph with extra vertex graph_with_extra = self._create_graph_with_extra_vertex() extra_vertex = self._get_extra_vertex_name() - + # Step 2: Run Bellman-Ford from extra vertex to get potentials print("Running Bellman-Ford to compute potentials...") if not self._compute_potentials(graph_with_extra, extra_vertex): return None # Negative cycle detected - + # Step 3: Reweight edges to make them non-negative print("Reweighting edges...") self._reweight_edges() - + # Step 4: Run Dijkstra from each vertex print("Running Dijkstra from each vertex...") self._compute_all_pairs_distances() - + # Step 5: Convert distances back to original weights self._convert_distances_to_original() - + return self.all_pairs_distances - + def _get_extra_vertex_name(self) -> str: """Generate a unique name for the extra vertex.""" base_name = "extra_vertex_johnson" extra_vertex = base_name counter = 0 - + while self.original_graph.has_vertex(extra_vertex): counter += 1 extra_vertex = f"{base_name}_{counter}" - + return extra_vertex - + def _create_graph_with_extra_vertex(self) -> Graph: """Create a copy of the original graph with an extra vertex.""" graph_with_extra = self.original_graph.copy() extra_vertex = self._get_extra_vertex_name() - + # Add edges from extra vertex to all original vertices with weight 0 for vertex in self.original_graph.get_vertices(): graph_with_extra.add_edge(extra_vertex, vertex, 0) - + return graph_with_extra - + def _compute_potentials(self, graph_with_extra: Graph, extra_vertex: str) -> bool: """Compute potentials using Bellman-Ford algorithm.""" bellman_ford = BellmanFord(graph_with_extra) distances = bellman_ford.find_shortest_paths(extra_vertex) - + if distances is None: return False # Negative cycle detected - + # Store potentials (exclude extra vertex) self.potentials = {v: distances[v] for v in self.original_graph.get_vertices()} return True - + def _reweight_edges(self) -> None: """Reweight all edges to make them non-negative using the computed potentials.""" self.reweighted_graph = Graph() - + for vertex in self.original_graph.get_vertices(): for neighbor, weight in self.original_graph.get_neighbors(vertex): # Reweight the edge - new_weight = weight + self.potentials[vertex] - self.potentials[neighbor] + new_weight = ( + weight + self.potentials[vertex] - self.potentials[neighbor] + ) self.reweighted_graph.add_edge(vertex, neighbor, new_weight) - + def _compute_all_pairs_distances(self) -> None: """Run Dijkstra's algorithm from each vertex on the reweighted graph.""" self.all_pairs_distances = {} - + for start_vertex in self.original_graph.get_vertices(): dijkstra = Dijkstra(self.reweighted_graph) distances = dijkstra.find_shortest_paths(start_vertex) self.all_pairs_distances[start_vertex] = distances - + def _convert_distances_to_original(self) -> None: """Convert the distances back to original weights using potentials.""" for start_vertex in self.all_pairs_distances: original_distances = {} - + for end_vertex in self.all_pairs_distances[start_vertex]: reweighted_distance = self.all_pairs_distances[start_vertex][end_vertex] - - if reweighted_distance != float('inf'): + + if reweighted_distance != float("inf"): # Convert back to original distance - original_distance = (reweighted_distance - - self.potentials[start_vertex] + - self.potentials[end_vertex]) + original_distance = ( + reweighted_distance + - self.potentials[start_vertex] + + self.potentials[end_vertex] + ) original_distances[end_vertex] = original_distance else: - original_distances[end_vertex] = float('inf') - + original_distances[end_vertex] = float("inf") + self.all_pairs_distances[start_vertex] = original_distances - + def get_shortest_path(self, source: Any, destination: Any) -> Optional[float]: """Get the shortest distance between two specific vertices.""" if source in self.all_pairs_distances: return self.all_pairs_distances[source].get(destination) return None - + def has_negative_cycle(self) -> bool: """Check if the original graph contains a negative cycle.""" return self.all_pairs_distances is None - + def get_potentials(self) -> Dict[Any, float]: """Get the computed potentials for all vertices.""" return self.potentials.copy() - + def get_reweighted_graph(self) -> Optional[Graph]: """Get the reweighted graph with non-negative edge weights.""" return self.reweighted_graph - - def print_all_pairs_distances(self, distances: Optional[Dict[Any, Dict[Any, float]]] = None) -> None: + + def print_all_pairs_distances( + self, distances: Optional[Dict[Any, Dict[Any, float]]] = None + ) -> None: """Print the all-pairs shortest path distances in a readable format.""" if distances is None: distances = self.all_pairs_distances - + if distances is None: print("No solution due to negative cycle!") return - - print("\n" + "="*60) + + print("\n" + "=" * 60) print("ALL-PAIRS SHORTEST PATH DISTANCES (Johnson's Algorithm)") - print("="*60) - + print("=" * 60) + # Print header vertices_list = sorted(list(self.original_graph.get_vertices()), key=str) print(f"{'From\\To':<8}", end="") @@ -261,43 +267,43 @@ def print_all_pairs_distances(self, distances: Optional[Dict[Any, Dict[Any, floa print(f"{str(vertex):<8}", end="") print() print("-" * (8 + len(vertices_list) * 8)) - + # Print distances for source in vertices_list: print(f"{str(source):<8}", end="") for destination in vertices_list: if source in distances and destination in distances[source]: dist = distances[source][destination] - if dist == float('inf'): + if dist == float("inf"): print(f"{'∞':<8}", end="") else: print(f"{dist:<8}", end="") else: print(f"{'N/A':<8}", end="") print() - + @staticmethod def is_suitable_for_johnson(graph: Graph) -> tuple: """Check if Johnson's algorithm is suitable for the given graph.""" if graph.is_empty(): return False, "Graph is empty" - + # Check for negative cycles using a simple Bellman-Ford test temp_graph = graph.copy() extra_vertex = "temp_extra" for vertex in graph.get_vertices(): temp_graph.add_edge(extra_vertex, vertex, 0) - + bellman_ford = BellmanFord(temp_graph) result = bellman_ford.find_shortest_paths(extra_vertex) - + if result is None: return False, "Graph contains negative cycles" - + # Johnson's is particularly good for sparse graphs vertex_count = graph.get_vertex_count() edge_count = graph.get_edge_count() - + if edge_count < vertex_count * vertex_count / 4: return True, "Suitable: Sparse graph with no negative cycles" else: diff --git a/graphs/tests/test_johnsons_algorithm.py b/graphs/tests/test_johnsons_algorithm.py index ad45e7da37f4..1a931cd80c48 100644 --- a/graphs/tests/test_johnsons_algorithm.py +++ b/graphs/tests/test_johnsons_algorithm.py @@ -16,7 +16,9 @@ import os # Add parent directory to path to import modules -sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) +sys.path.append( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +) from graphs.johnsons_algorithm import Graph, JohnsonsAlgorithm from graphs.bellman_ford import BellmanFord @@ -25,22 +27,22 @@ class TestGraph(unittest.TestCase): """Test cases for the Graph class.""" - + def setUp(self): """Set up test fixtures before each test method.""" self.graph = Graph() - + def test_empty_graph(self): """Test empty graph properties.""" self.assertTrue(self.graph.is_empty()) self.assertEqual(self.graph.get_vertex_count(), 0) self.assertEqual(self.graph.get_edge_count(), 0) self.assertEqual(len(self.graph.get_vertices()), 0) - + def test_add_single_edge(self): """Test adding a single edge.""" self.graph.add_edge(1, 2, 5.0) - + self.assertFalse(self.graph.is_empty()) self.assertEqual(self.graph.get_vertex_count(), 2) self.assertEqual(self.graph.get_edge_count(), 1) @@ -49,45 +51,45 @@ def test_add_single_edge(self): self.assertTrue(self.graph.has_edge(1, 2)) self.assertFalse(self.graph.has_edge(2, 1)) self.assertEqual(self.graph.get_edge_weight(1, 2), 5.0) - + def test_add_multiple_edges(self): """Test adding multiple edges.""" edges = [(1, 2, -1), (2, 3, 4), (3, 1, 2)] self.graph.add_edges(edges) - + self.assertEqual(self.graph.get_vertex_count(), 3) self.assertEqual(self.graph.get_edge_count(), 3) self.assertEqual(self.graph.get_edge_weight(1, 2), -1) self.assertEqual(self.graph.get_edge_weight(2, 3), 4) self.assertEqual(self.graph.get_edge_weight(3, 1), 2) - + def test_neighbors(self): """Test getting neighbors of vertices.""" self.graph.add_edge(1, 2, 3) self.graph.add_edge(1, 3, 4) self.graph.add_edge(2, 3, 1) - + neighbors_1 = self.graph.get_neighbors(1) self.assertEqual(len(neighbors_1), 2) self.assertIn((2, 3), neighbors_1) self.assertIn((3, 4), neighbors_1) - + neighbors_2 = self.graph.get_neighbors(2) self.assertEqual(len(neighbors_2), 1) self.assertIn((3, 1), neighbors_2) - + def test_copy_graph(self): """Test copying a graph.""" self.graph.add_edge(1, 2, 5) self.graph.add_edge(2, 3, -2) - + copy_graph = self.graph.copy() - + self.assertEqual(copy_graph.get_vertex_count(), self.graph.get_vertex_count()) self.assertEqual(copy_graph.get_edge_count(), self.graph.get_edge_count()) self.assertEqual(copy_graph.get_edge_weight(1, 2), 5) self.assertEqual(copy_graph.get_edge_weight(2, 3), -2) - + # Modify original, copy should remain unchanged self.graph.add_edge(3, 4, 1) self.assertEqual(self.graph.get_edge_count(), 3) @@ -96,83 +98,83 @@ def test_copy_graph(self): class TestBellmanFord(unittest.TestCase): """Test cases for the Bellman-Ford algorithm.""" - + def setUp(self): """Set up test fixtures before each test method.""" self.graph = Graph() - + def test_single_vertex(self): """Test Bellman-Ford with a single vertex.""" self.graph.add_edge(1, 1, 0) # Self-loop bf = BellmanFord(self.graph) distances = bf.find_shortest_paths(1) - + self.assertIsNotNone(distances) self.assertEqual(distances[1], 0) - + def test_simple_path(self): """Test Bellman-Ford with a simple path.""" self.graph.add_edge(1, 2, 3) self.graph.add_edge(2, 3, 4) - + bf = BellmanFord(self.graph) distances = bf.find_shortest_paths(1) - + self.assertIsNotNone(distances) self.assertEqual(distances[1], 0) self.assertEqual(distances[2], 3) self.assertEqual(distances[3], 7) - + def test_negative_edges(self): """Test Bellman-Ford with negative edges but no negative cycles.""" self.graph.add_edge(1, 2, -1) self.graph.add_edge(2, 3, 4) self.graph.add_edge(1, 3, 2) - + bf = BellmanFord(self.graph) distances = bf.find_shortest_paths(1) - + self.assertIsNotNone(distances) self.assertEqual(distances[1], 0) self.assertEqual(distances[2], -1) self.assertEqual(distances[3], 2) # min(2, -1+4) = 2 - + def test_negative_cycle_detection(self): """Test Bellman-Ford negative cycle detection.""" # Create a negative cycle: 1 -> 2 -> 3 -> 1 with total weight -1 self.graph.add_edge(1, 2, 1) self.graph.add_edge(2, 3, -4) self.graph.add_edge(3, 1, 2) - + bf = BellmanFord(self.graph) distances = bf.find_shortest_paths(1) - + self.assertIsNone(distances) # Should detect negative cycle - + def test_unreachable_vertices(self): """Test Bellman-Ford with unreachable vertices.""" self.graph.add_edge(1, 2, 1) self.graph.add_edge(3, 4, 1) # Separate component - + bf = BellmanFord(self.graph) distances = bf.find_shortest_paths(1) - + self.assertIsNotNone(distances) self.assertEqual(distances[1], 0) self.assertEqual(distances[2], 1) - self.assertEqual(distances[3], float('inf')) - self.assertEqual(distances[4], float('inf')) - + self.assertEqual(distances[3], float("inf")) + self.assertEqual(distances[4], float("inf")) + def test_get_path(self): """Test path reconstruction in Bellman-Ford.""" self.graph.add_edge(1, 2, 1) self.graph.add_edge(2, 3, 2) self.graph.add_edge(1, 3, 5) - + bf = BellmanFord(self.graph) distances = bf.find_shortest_paths(1) path = bf.get_path(1, 3) - + self.assertIsNotNone(distances) self.assertEqual(path, [1, 2, 3]) # Should use shorter path self.assertEqual(distances[3], 3) @@ -180,108 +182,107 @@ def test_get_path(self): class TestDijkstra(unittest.TestCase): """Test cases for Dijkstra's algorithm.""" - + def setUp(self): """Set up test fixtures before each test method.""" self.graph = Graph() - + def test_simple_path(self): """Test Dijkstra with a simple path.""" self.graph.add_edge(1, 2, 3) self.graph.add_edge(2, 3, 4) self.graph.add_edge(1, 3, 10) - + dijkstra = Dijkstra(self.graph) distances = dijkstra.find_shortest_paths(1) - + self.assertEqual(distances[1], 0) self.assertEqual(distances[2], 3) self.assertEqual(distances[3], 7) # 3 + 4 < 10 - + def test_complex_graph(self): """Test Dijkstra with a more complex graph.""" edges = [ - (1, 2, 4), (1, 3, 2), - (2, 3, 1), (2, 4, 5), - (3, 4, 8), (3, 5, 10), - (4, 5, 2) + (1, 2, 4), + (1, 3, 2), + (2, 3, 1), + (2, 4, 5), + (3, 4, 8), + (3, 5, 10), + (4, 5, 2), ] self.graph.add_edges(edges) - + dijkstra = Dijkstra(self.graph) distances = dijkstra.find_shortest_paths(1) - + self.assertEqual(distances[1], 0) self.assertEqual(distances[2], 4) # Direct edge 1->2: 4 self.assertEqual(distances[3], 2) self.assertEqual(distances[4], 9) # 1->2->4: 4+5 = 9 - self.assertEqual(distances[5], 11) # 1->2->4->5: 4+5+2 = 11 - + self.assertEqual(distances[5], 11) # 1->2->4->5: 4+5+2 = 11 + def test_unreachable_vertices(self): """Test Dijkstra with unreachable vertices.""" self.graph.add_edge(1, 2, 1) self.graph.add_edge(3, 4, 1) - + dijkstra = Dijkstra(self.graph) distances = dijkstra.find_shortest_paths(1) - + self.assertEqual(distances[1], 0) self.assertEqual(distances[2], 1) - self.assertEqual(distances[3], float('inf')) - self.assertEqual(distances[4], float('inf')) - + self.assertEqual(distances[3], float("inf")) + self.assertEqual(distances[4], float("inf")) + def test_get_path(self): """Test path reconstruction in Dijkstra.""" self.graph.add_edge(1, 2, 1) self.graph.add_edge(2, 3, 2) self.graph.add_edge(1, 3, 5) - + dijkstra = Dijkstra(self.graph) distances = dijkstra.find_shortest_paths(1) path = dijkstra.get_path(1, 3) - + self.assertEqual(path, [1, 2, 3]) self.assertEqual(distances[3], 3) class TestJohnsonsAlgorithm(unittest.TestCase): """Test cases for Johnson's Algorithm.""" - + def setUp(self): """Set up test fixtures before each test method.""" self.graph = Graph() - + def test_empty_graph(self): """Test Johnson's algorithm with empty graph.""" johnson = JohnsonsAlgorithm(self.graph) result = johnson.find_all_pairs_shortest_paths() - + self.assertEqual(result, {}) - + def test_single_vertex(self): """Test Johnson's algorithm with single vertex.""" # Need at least one edge to create a vertex self.graph.add_edge(1, 1, 0) johnson = JohnsonsAlgorithm(self.graph) result = johnson.find_all_pairs_shortest_paths() - + self.assertIsNotNone(result) self.assertEqual(result[1][1], 0) - + def test_simple_graph_positive_weights(self): """Test Johnson's algorithm with positive weights only.""" - edges = [ - (1, 2, 3), - (2, 3, 4), - (1, 3, 10) - ] + edges = [(1, 2, 3), (2, 3, 4), (1, 3, 10)] self.graph.add_edges(edges) - + johnson = JohnsonsAlgorithm(self.graph) result = johnson.find_all_pairs_shortest_paths() - + self.assertIsNotNone(result) - + # Check some key distances self.assertEqual(result[1][1], 0) self.assertEqual(result[1][2], 3) @@ -289,87 +290,82 @@ def test_simple_graph_positive_weights(self): self.assertEqual(result[2][2], 0) self.assertEqual(result[2][3], 4) self.assertEqual(result[3][3], 0) - + # Unreachable paths should be infinity - self.assertEqual(result[2][1], float('inf')) - self.assertEqual(result[3][1], float('inf')) - self.assertEqual(result[3][2], float('inf')) - + self.assertEqual(result[2][1], float("inf")) + self.assertEqual(result[3][1], float("inf")) + self.assertEqual(result[3][2], float("inf")) + def test_graph_with_negative_weights(self): """Test Johnson's algorithm with negative weights (no negative cycles).""" - edges = [ - (1, 2, -1), - (1, 3, 4), - (2, 3, 3), - (2, 4, 2), - (3, 4, -5), - (4, 1, 6) - ] + edges = [(1, 2, -1), (1, 3, 4), (2, 3, 3), (2, 4, 2), (3, 4, -5), (4, 1, 6)] self.graph.add_edges(edges) - + johnson = JohnsonsAlgorithm(self.graph) result = johnson.find_all_pairs_shortest_paths() - + self.assertIsNotNone(result) - + # Test some specific shortest paths self.assertEqual(result[1][1], 0) self.assertEqual(result[1][2], -1) - self.assertEqual(result[1][4], -3) # 1->3->4: 4+(-5) = -1, but 1->2->3->4: -1+3+(-5) = -3 + self.assertEqual( + result[1][4], -3 + ) # 1->3->4: 4+(-5) = -1, but 1->2->3->4: -1+3+(-5) = -3 self.assertEqual(result[3][4], -5) - + def test_negative_cycle_detection(self): """Test Johnson's algorithm negative cycle detection.""" # Create a negative cycle edges = [ (1, 2, 1), (2, 3, -4), - (3, 1, 2) # Total cycle weight: 1 + (-4) + 2 = -1 + (3, 1, 2), # Total cycle weight: 1 + (-4) + 2 = -1 ] self.graph.add_edges(edges) - + johnson = JohnsonsAlgorithm(self.graph) result = johnson.find_all_pairs_shortest_paths() - + self.assertIsNone(result) # Should detect negative cycle # Note: has_negative_cycle only returns True after find_all_pairs_shortest_paths is called and returns None - + def test_string_vertices(self): """Test Johnson's algorithm with string vertices.""" edges = [ - ('A', 'B', 1), - ('A', 'C', 4), - ('B', 'C', -3), - ('B', 'D', 2), - ('C', 'D', 3), - ('D', 'A', -1) + ("A", "B", 1), + ("A", "C", 4), + ("B", "C", -3), + ("B", "D", 2), + ("C", "D", 3), + ("D", "A", -1), ] self.graph.add_edges(edges) - + johnson = JohnsonsAlgorithm(self.graph) result = johnson.find_all_pairs_shortest_paths() - + self.assertIsNotNone(result) - + # Test some specific distances - self.assertEqual(result['A']['A'], 0) - self.assertEqual(result['A']['B'], 1) - self.assertEqual(result['A']['C'], -2) # A->B->C: 1+(-3) = -2 - self.assertEqual(result['B']['C'], -3) - self.assertEqual(result['D']['A'], -1) - + self.assertEqual(result["A"]["A"], 0) + self.assertEqual(result["A"]["B"], 1) + self.assertEqual(result["A"]["C"], -2) # A->B->C: 1+(-3) = -2 + self.assertEqual(result["B"]["C"], -3) + self.assertEqual(result["D"]["A"], -1) + def test_get_shortest_path_method(self): """Test the get_shortest_path convenience method.""" edges = [(1, 2, 3), (2, 3, 4)] self.graph.add_edges(edges) - + johnson = JohnsonsAlgorithm(self.graph) result = johnson.find_all_pairs_shortest_paths() - + self.assertEqual(johnson.get_shortest_path(1, 3), 7) self.assertEqual(johnson.get_shortest_path(1, 1), 0) - self.assertEqual(johnson.get_shortest_path(2, 1), float('inf')) - + self.assertEqual(johnson.get_shortest_path(2, 1), float("inf")) + def test_algorithm_suitability_check(self): """Test the static method for checking algorithm suitability.""" # Empty graph @@ -377,14 +373,14 @@ def test_algorithm_suitability_check(self): suitable, reason = JohnsonsAlgorithm.is_suitable_for_johnson(empty_graph) self.assertFalse(suitable) self.assertIn("empty", reason.lower()) - + # Graph with negative cycle neg_cycle_graph = Graph() neg_cycle_graph.add_edges([(1, 2, 1), (2, 3, -4), (3, 1, 2)]) suitable, reason = JohnsonsAlgorithm.is_suitable_for_johnson(neg_cycle_graph) self.assertFalse(suitable) self.assertIn("negative cycle", reason.lower()) - + # Good graph for Johnson's good_graph = Graph() good_graph.add_edges([(1, 2, -1), (2, 3, 4), (3, 4, 2)]) @@ -394,49 +390,49 @@ def test_algorithm_suitability_check(self): class TestIntegration(unittest.TestCase): """Integration tests comparing Johnson's with individual algorithms.""" - + def test_johnson_vs_dijkstra_positive_weights(self): """Compare Johnson's with Dijkstra on positive weight graph.""" graph = Graph() edges = [(1, 2, 3), (2, 3, 4), (1, 3, 10), (3, 4, 1)] graph.add_edges(edges) - + # Johnson's algorithm johnson = JohnsonsAlgorithm(graph) johnson_result = johnson.find_all_pairs_shortest_paths() - + # Dijkstra from vertex 1 dijkstra = Dijkstra(graph) dijkstra_distances = dijkstra.find_shortest_paths(1) - + # Compare results for vertex 1 for vertex in graph.get_vertices(): self.assertEqual( - johnson_result[1][vertex], + johnson_result[1][vertex], dijkstra_distances[vertex], - f"Mismatch for vertex {vertex}" + f"Mismatch for vertex {vertex}", ) - + def test_performance_comparison(self): """Basic performance test to ensure algorithms complete.""" # Create a moderately sized graph graph = Graph() - + # Create a grid-like graph for i in range(1, 6): for j in range(1, 6): if i < 5: - graph.add_edge(i*5 + j, (i+1)*5 + j, 1) + graph.add_edge(i * 5 + j, (i + 1) * 5 + j, 1) if j < 5: - graph.add_edge(i*5 + j, i*5 + (j+1), 1) - + graph.add_edge(i * 5 + j, i * 5 + (j + 1), 1) + # Add some negative edges graph.add_edge(6, 11, -2) graph.add_edge(12, 17, -1) - + johnson = JohnsonsAlgorithm(graph) result = johnson.find_all_pairs_shortest_paths() - + self.assertIsNotNone(result) self.assertEqual(len(result), graph.get_vertex_count()) @@ -446,44 +442,46 @@ def run_tests(): # Create test suite test_classes = [ TestGraph, - TestBellmanFord, + TestBellmanFord, TestDijkstra, TestJohnsonsAlgorithm, - TestIntegration + TestIntegration, ] - + loader = unittest.TestLoader() suite = unittest.TestSuite() - + for test_class in test_classes: tests = loader.loadTestsFromTestCase(test_class) suite.addTests(tests) - + # Run tests runner = unittest.TextTestRunner(verbosity=2) result = runner.run(suite) - + # Print summary - print(f"\n{'='*60}") + print(f"\n{'=' * 60}") print("TEST SUMMARY") - print(f"{'='*60}") + print(f"{'=' * 60}") print(f"Tests run: {result.testsRun}") print(f"Failures: {len(result.failures)}") print(f"Errors: {len(result.errors)}") - + if result.failures: print(f"\nFAILURES:") for test, traceback in result.failures: print(f"- {test}: {traceback.split('AssertionError:')[-1].strip()}") - + if result.errors: print(f"\nERRORS:") for test, traceback in result.errors: print(f"- {test}: {traceback.split('Exception:')[-1].strip()}") - - success_rate = ((result.testsRun - len(result.failures) - len(result.errors)) / result.testsRun) * 100 + + success_rate = ( + (result.testsRun - len(result.failures) - len(result.errors)) / result.testsRun + ) * 100 print(f"\nSuccess rate: {success_rate:.1f}%") - + return result.wasSuccessful()