diff --git a/graphs/bellman_ford.py b/graphs/bellman_ford.py index 9ac8bae85d4f..34cecf3c8a78 100644 --- a/graphs/bellman_ford.py +++ b/graphs/bellman_ford.py @@ -1,73 +1,184 @@ -from __future__ import annotations +""" +Bellman-Ford Algorithm implementation for single-source shortest path. +The Bellman-Ford algorithm can handle negative edge weights and detect negative cycles. +It has O(VE) time complexity where V is the number of vertices and E is the number of edges. -def print_distance(distance: list[float], src): - print(f"Vertex\tShortest Distance from vertex {src}") - for i, d in enumerate(distance): - print(f"{i}\t\t{d}") +Author: Zakariae Fakhri +Date: August 2025 +""" +from typing import Dict, Any, Optional, List, Tuple +from collections import defaultdict -def check_negative_cycle( - graph: list[dict[str, int]], distance: list[float], edge_count: int -): - for j in range(edge_count): - u, v, w = (graph[j][k] for k in ["src", "dst", "weight"]) - if distance[u] != float("inf") and distance[u] + w < distance[v]: - return True - return False - -def bellman_ford( - graph: list[dict[str, int]], vertex_count: int, edge_count: int, src: int -) -> list[float]: +class Graph: """ - Returns shortest paths from a vertex src to all - other vertices. - >>> edges = [(2, 1, -10), (3, 2, 3), (0, 3, 5), (0, 1, 4)] - >>> g = [{"src": s, "dst": d, "weight": w} for s, d, w in edges] - >>> bellman_ford(g, 4, 4, 0) - [0.0, -2.0, 8.0, 5.0] - >>> g = [{"src": s, "dst": d, "weight": w} for s, d, w in edges + [(1, 3, 5)]] - >>> bellman_ford(g, 4, 5, 0) - Traceback (most recent call last): - ... - Exception: Negative cycle found + A lightweight graph class for algorithms that don't have access to the main Graph class. """ - distance = [float("inf")] * vertex_count - distance[src] = 0.0 - for _ in range(vertex_count - 1): - for j in range(edge_count): - u, v, w = (graph[j][k] for k in ["src", "dst", "weight"]) + def __init__(self): + self.adjacency_list = defaultdict(list) + self.vertices = set() + + def add_edge(self, source: Any, destination: Any, weight: float) -> None: + self.adjacency_list[source].append((destination, weight)) + self.vertices.add(source) + self.vertices.add(destination) - if distance[u] != float("inf") and distance[u] + w < distance[v]: - distance[v] = distance[u] + w + def get_neighbors(self, vertex: Any) -> List[Tuple[Any, float]]: + return self.adjacency_list.get(vertex, []) - negative_cycle_exists = check_negative_cycle(graph, distance, edge_count) - if negative_cycle_exists: - raise Exception("Negative cycle found") + def get_vertices(self) -> set: + return self.vertices.copy() - return distance + def has_vertex(self, vertex: Any) -> bool: + return vertex in self.vertices + def get_vertex_count(self) -> int: + return len(self.vertices) -if __name__ == "__main__": - import doctest - doctest.testmod() +class BellmanFord: + """ + Bellman-Ford algorithm implementation for single-source shortest path. - V = int(input("Enter number of vertices: ").strip()) - E = int(input("Enter number of edges: ").strip()) + This algorithm can handle graphs with negative edge weights and can detect + negative cycles. It's particularly useful as a preprocessing step in + Johnson's algorithm. + """ - graph: list[dict[str, int]] = [{} for _ in range(E)] + def __init__(self, graph): + """ + Initialize the Bellman-Ford algorithm with a graph. + + Args: + graph: The weighted directed graph to process + """ + self.graph = graph + self.distances = {} + self.predecessors = {} + + def find_shortest_paths(self, start_vertex: Any) -> Optional[Dict[Any, float]]: + """ + Find shortest paths from start_vertex to all other vertices. + + Args: + start_vertex: The source vertex to start from - for i in range(E): - print("Edge ", i + 1) - src, dest, weight = ( - int(x) - for x in input("Enter source, destination, weight: ").strip().split(" ") - ) - graph[i] = {"src": src, "dst": dest, "weight": weight} + Returns: + Dictionary of vertex -> shortest distance, or None if negative cycle exists + """ + if not self.graph.has_vertex(start_vertex): + raise ValueError(f"Start vertex {start_vertex} not found in graph") + + # Initialize distances + self.distances = {vertex: float("inf") for vertex in self.graph.get_vertices()} + self.distances[start_vertex] = 0 + self.predecessors = {vertex: None for vertex in self.graph.get_vertices()} + + # Relax edges V-1 times + vertex_count = self.graph.get_vertex_count() + + for iteration in range(vertex_count - 1): + updated = False + + for vertex in self.graph.get_vertices(): + if self.distances[vertex] != float("inf"): + for neighbor, weight in self.graph.get_neighbors(vertex): + new_distance = self.distances[vertex] + weight + + if new_distance < self.distances[neighbor]: + self.distances[neighbor] = new_distance + self.predecessors[neighbor] = vertex + updated = True + + # Early termination if no updates in this iteration + if not updated: + break + + # Check for negative cycles + if self._has_negative_cycle(): + return None + + return self.distances.copy() + + def _has_negative_cycle(self) -> bool: + """ + Check if the graph contains a negative cycle. + + Returns: + True if negative cycle exists, False otherwise + """ + for vertex in self.graph.get_vertices(): + if self.distances[vertex] != float("inf"): + for neighbor, weight in self.graph.get_neighbors(vertex): + if self.distances[vertex] + weight < self.distances[neighbor]: + return True + return False + + def get_path(self, start_vertex: Any, end_vertex: Any) -> Optional[List[Any]]: + """ + Get the shortest path from start_vertex to end_vertex. + + Args: + start_vertex: Source vertex + end_vertex: Destination vertex + + Returns: + List of vertices representing the path, or None if no path exists + """ + if not self.distances or end_vertex not in self.distances: + return None + + if self.distances[end_vertex] == float("inf"): + return None + + path = [] + current = end_vertex + + while current is not None: + path.append(current) + current = self.predecessors.get(current) + + path.reverse() + + # Verify the path starts with start_vertex + if path[0] != start_vertex: + return None + + return path + + def get_distance(self, vertex: Any) -> float: + """ + Get the shortest distance to a specific vertex. + + Args: + vertex: The target vertex + + Returns: + Shortest distance to the vertex + """ + return self.distances.get(vertex, float("inf")) + + @staticmethod + def detect_negative_cycle(graph) -> bool: + """ + Static method to detect if a graph contains a negative cycle. + + Args: + graph: The graph to check + + Returns: + True if negative cycle exists, False otherwise + """ + vertices = graph.get_vertices() + if not vertices: + return False - source = int(input("\nEnter shortest path source:").strip()) - shortest_distance = bellman_ford(graph, V, E, source) - print_distance(shortest_distance, 0) + # Pick any vertex as start + start_vertex = next(iter(vertices)) + bf = BellmanFord(graph) + result = bf.find_shortest_paths(start_vertex) + + return result is None diff --git a/graphs/dijkstra.py b/graphs/dijkstra.py index 87e9d2233bb2..0d1b0e3715e6 100644 --- a/graphs/dijkstra.py +++ b/graphs/dijkstra.py @@ -1,119 +1,219 @@ """ -pseudo-code - -DIJKSTRA(graph G, start vertex s, destination vertex d): - -//all nodes initially unexplored - -1 - let H = min heap data structure, initialized with 0 and s [here 0 indicates - the distance from start vertex s] -2 - while H is non-empty: -3 - remove the first node and cost of H, call it U and cost -4 - if U has been previously explored: -5 - go to the while loop, line 2 //Once a node is explored there is no need - to make it again -6 - mark U as explored -7 - if U is d: -8 - return cost // total cost from start to destination vertex -9 - for each edge(U, V): c=cost of edge(U,V) // for V in graph[U] -10 - if V explored: -11 - go to next V in line 9 -12 - total_cost = cost + c -13 - add (total_cost,V) to H - -You can think at cost as a distance where Dijkstra finds the shortest distance -between vertices s and v in a graph G. The use of a min heap as H guarantees -that if a vertex has already been explored there will be no other path with -shortest distance, that happens because heapq.heappop will always return the -next vertex with the shortest distance, considering that the heap stores not -only the distance between previous vertex and current vertex but the entire -distance between each vertex that makes up the path from start vertex to target -vertex. +Dijkstra's Algorithm implementation for single-source shortest path. + +Dijkstra's algorithm finds shortest paths from a source vertex to all other vertices +in a weighted graph with non-negative edge weights. It has O(V log V + E) time complexity +when using a binary heap. + +Author: Zakaria Fakhri +Date: August 2025 """ import heapq +from typing import Dict, Any, List, Optional, Set, Tuple +from collections import defaultdict + + +class Graph: + """ + A lightweight graph class for algorithms that don't have access to the main Graph class. + """ + + def __init__(self): + self.adjacency_list = defaultdict(list) + self.vertices = set() + def add_edge(self, source: Any, destination: Any, weight: float) -> None: + self.adjacency_list[source].append((destination, weight)) + self.vertices.add(source) + self.vertices.add(destination) -def dijkstra(graph, start, end): - """Return the cost of the shortest path between vertices start and end. + def get_neighbors(self, vertex: Any) -> List[Tuple[Any, float]]: + return self.adjacency_list.get(vertex, []) + + def get_vertices(self) -> set: + return self.vertices.copy() + + def has_vertex(self, vertex: Any) -> bool: + return vertex in self.vertices + + def get_edge_weight(self, source: Any, destination: Any) -> Optional[float]: + for neighbor, weight in self.adjacency_list.get(source, []): + if neighbor == destination: + return weight + return None + + +class Dijkstra: + """ + Dijkstra's algorithm implementation for single-source shortest path. - >>> dijkstra(G, "E", "C") - 6 - >>> dijkstra(G2, "E", "F") - 3 - >>> dijkstra(G3, "E", "F") - 3 + This algorithm works on graphs with non-negative edge weights and provides + optimal shortest paths. It's used as a component in Johnson's algorithm + after edge reweighting. """ - heap = [(0, start)] # cost from start node,end node - visited = set() - while heap: - (cost, u) = heapq.heappop(heap) - if u in visited: - continue - visited.add(u) - if u == end: - return cost - for v, c in graph[u]: - if v in visited: + def __init__(self, graph): + """ + Initialize Dijkstra's algorithm with a graph. + + Args: + graph: The weighted directed graph with non-negative weights + """ + self.graph = graph + self.distances = {} + self.predecessors = {} + self.visited = set() + + def find_shortest_paths(self, start_vertex: Any) -> Dict[Any, float]: + """ + Find shortest paths from start_vertex to all other vertices. + + Args: + start_vertex: The source vertex to start from + + Returns: + Dictionary of vertex -> shortest distance + + Raises: + ValueError: If start_vertex is not in the graph + """ + if not self.graph.has_vertex(start_vertex): + raise ValueError(f"Start vertex {start_vertex} not found in graph") + + # Initialize distances and data structures + self.distances = {vertex: float("inf") for vertex in self.graph.get_vertices()} + self.distances[start_vertex] = 0 + self.predecessors = {vertex: None for vertex in self.graph.get_vertices()} + self.visited = set() + + # Priority queue: (distance, vertex) + priority_queue = [(0, start_vertex)] + + while priority_queue: + current_distance, current_vertex = heapq.heappop(priority_queue) + + # Skip if already visited (handles duplicate entries in heap) + if current_vertex in self.visited: continue - next_item = cost + c - heapq.heappush(heap, (next_item, v)) - return -1 - - -G = { - "A": [["B", 2], ["C", 5]], - "B": [["A", 2], ["D", 3], ["E", 1], ["F", 1]], - "C": [["A", 5], ["F", 3]], - "D": [["B", 3]], - "E": [["B", 4], ["F", 3]], - "F": [["C", 3], ["E", 3]], -} - -r""" -Layout of G2: - -E -- 1 --> B -- 1 --> C -- 1 --> D -- 1 --> F - \ /\ - \ || - ----------------- 3 -------------------- -""" -G2 = { - "B": [["C", 1]], - "C": [["D", 1]], - "D": [["F", 1]], - "E": [["B", 1], ["F", 3]], - "F": [], -} - -r""" -Layout of G3: - -E -- 1 --> B -- 1 --> C -- 1 --> D -- 1 --> F - \ /\ - \ || - -------- 2 ---------> G ------- 1 ------ -""" -G3 = { - "B": [["C", 1]], - "C": [["D", 1]], - "D": [["F", 1]], - "E": [["B", 1], ["G", 2]], - "F": [], - "G": [["F", 1]], -} -short_distance = dijkstra(G, "E", "C") -print(short_distance) # E -- 3 --> F -- 3 --> C == 6 + # Mark current vertex as visited + self.visited.add(current_vertex) + + # Process all neighbors + for neighbor, weight in self.graph.get_neighbors(current_vertex): + if neighbor not in self.visited: + new_distance = current_distance + weight + + # Relaxation step + if new_distance < self.distances[neighbor]: + self.distances[neighbor] = new_distance + self.predecessors[neighbor] = current_vertex + heapq.heappush(priority_queue, (new_distance, neighbor)) + + return self.distances.copy() + + def get_path(self, start_vertex: Any, end_vertex: Any) -> Optional[List[Any]]: + """ + Get the shortest path from start_vertex to end_vertex. + + Args: + start_vertex: Source vertex + end_vertex: Destination vertex + + Returns: + List of vertices representing the path, or None if no path exists + """ + if not self.distances or end_vertex not in self.distances: + return None + + if self.distances[end_vertex] == float("inf"): + return None + + path = [] + current = end_vertex + + while current is not None: + path.append(current) + current = self.predecessors.get(current) + + path.reverse() + + # Verify the path starts with start_vertex + if path and path[0] != start_vertex: + return None + + return path if path else None + + def get_distance(self, vertex: Any) -> float: + """ + Get the shortest distance to a specific vertex. + + Args: + vertex: The target vertex + + Returns: + Shortest distance to the vertex + """ + return self.distances.get(vertex, float("inf")) + + def get_path_cost(self, path: List[Any]) -> float: + """ + Calculate the total cost of a given path. + + Args: + path: List of vertices representing a path + + Returns: + Total cost of the path, or infinity if path is invalid + """ + if not path or len(path) < 2: + return 0.0 + + total_cost = 0.0 + + for i in range(len(path) - 1): + current_vertex = path[i] + next_vertex = path[i + 1] + + # Find the edge weight + edge_weight = self.graph.get_edge_weight(current_vertex, next_vertex) + if edge_weight is None: + return float("inf") # Invalid path + + total_cost += edge_weight + + return total_cost + + def is_reachable(self, start_vertex: Any, target_vertex: Any) -> bool: + """ + Check if target_vertex is reachable from start_vertex. + + Args: + start_vertex: Source vertex + target_vertex: Target vertex + + Returns: + True if target is reachable, False otherwise + """ + if not self.distances: + self.find_shortest_paths(start_vertex) -short_distance = dijkstra(G2, "E", "F") -print(short_distance) # E -- 3 --> F == 3 + return self.distances.get(target_vertex, float("inf")) != float("inf") -short_distance = dijkstra(G3, "E", "F") -print(short_distance) # E -- 2 --> G -- 1 --> F == 3 + @staticmethod + def validate_non_negative_weights(graph) -> bool: + """ + Validate that all edge weights in the graph are non-negative. -if __name__ == "__main__": - import doctest + Args: + graph: The graph to validate - doctest.testmod() + Returns: + True if all weights are non-negative, False otherwise + """ + for vertex in graph.get_vertices(): + for neighbor, weight in graph.get_neighbors(vertex): + if weight < 0: + return False + return True diff --git a/graphs/johnsons_algorithm.py b/graphs/johnsons_algorithm.py new file mode 100644 index 000000000000..6d836d5736f5 --- /dev/null +++ b/graphs/johnsons_algorithm.py @@ -0,0 +1,310 @@ +""" +Johnson's Algorithm for All-Pairs Shortest Path +Implementation by: Zakaria Fakhri +Date: August 2025 + +Johnson's Algorithm is used to find the shortest paths between all pairs of vertices +in a weighted directed graph. It works well for sparse graphs and handles negative +edge weights (but not negative cycles). + +Algorithm Steps: +1. Add a new vertex connected to all vertices with weight 0 +2. Run Bellman-Ford from the new vertex to detect negative cycles and get potentials +3. Reweight all edges using the potentials to make them non-negative +4. Run Dijkstra from each vertex on the reweighted graph +5. Convert distances back using the original potentials + +Time Complexity: O(V²log(V) + VE) +Space Complexity: O(V²) +""" + +from collections import defaultdict +from typing import Dict, Any, Optional, List, Tuple +from .bellman_ford import BellmanFord +from .dijkstra import Dijkstra + + +class Graph: + """ + A weighted directed graph implementation using adjacency list representation. + """ + + def __init__(self): + """Initialize an empty graph.""" + self.adjacency_list = defaultdict(list) + self.vertices = set() + self._edge_count = 0 + + def add_edge(self, source: Any, destination: Any, weight: float) -> None: + """Add a weighted edge to the graph.""" + self.adjacency_list[source].append((destination, weight)) + self.vertices.add(source) + self.vertices.add(destination) + self._edge_count += 1 + + def add_edges(self, edges: List[Tuple[Any, Any, float]]) -> None: + """Add multiple edges to the graph.""" + for source, destination, weight in edges: + self.add_edge(source, destination, weight) + + def get_neighbors(self, vertex: Any) -> List[Tuple[Any, float]]: + """Get all neighbors of a vertex with their edge weights.""" + return self.adjacency_list.get(vertex, []) + + def get_vertices(self) -> set: + """Get all vertices in the graph.""" + return self.vertices.copy() + + def get_vertex_count(self) -> int: + """Get the number of vertices in the graph.""" + return len(self.vertices) + + def get_edge_count(self) -> int: + """Get the number of edges in the graph.""" + return self._edge_count + + def has_vertex(self, vertex: Any) -> bool: + """Check if a vertex exists in the graph.""" + return vertex in self.vertices + + def has_edge(self, source: Any, destination: Any) -> bool: + """Check if an edge exists between two vertices.""" + for neighbor, _ in self.adjacency_list.get(source, []): + if neighbor == destination: + return True + return False + + def get_edge_weight(self, source: Any, destination: Any) -> Optional[float]: + """Get the weight of an edge between two vertices.""" + for neighbor, weight in self.adjacency_list.get(source, []): + if neighbor == destination: + return weight + return None + + def is_empty(self) -> bool: + """Check if the graph is empty (no vertices).""" + return len(self.vertices) == 0 + + def copy(self) -> "Graph": + """Create a copy of the graph.""" + new_graph = Graph() + for vertex in self.adjacency_list: + for neighbor, weight in self.adjacency_list[vertex]: + new_graph.add_edge(vertex, neighbor, weight) + return new_graph + + def to_dict(self) -> Dict[Any, List[Tuple[Any, float]]]: + """Convert the graph to a dictionary representation.""" + return dict(self.adjacency_list) + + +class JohnsonsAlgorithm: + """ + Implementation of Johnson's Algorithm for All-Pairs Shortest Path problem. + + This class handles weighted directed graphs and finds shortest paths between + all pairs of vertices efficiently, even with negative edge weights. + """ + + def __init__(self, graph: Graph): + """ + Initialize Johnson's Algorithm with a graph. + + Args: + graph: The weighted directed graph to process + """ + self.original_graph = graph + self.potentials = {} + self.reweighted_graph = None + self.all_pairs_distances = {} + + def find_all_pairs_shortest_paths(self) -> Optional[Dict[Any, Dict[Any, float]]]: + """ + Find shortest paths between all pairs of vertices using Johnson's Algorithm. + + Returns: + Dictionary of source -> {destination -> distance}, or None if negative cycle exists + """ + if self.original_graph.is_empty(): + return {} + + # Step 1: Create graph with extra vertex + graph_with_extra = self._create_graph_with_extra_vertex() + extra_vertex = self._get_extra_vertex_name() + + # Step 2: Run Bellman-Ford from extra vertex to get potentials + print("Running Bellman-Ford to compute potentials...") + if not self._compute_potentials(graph_with_extra, extra_vertex): + return None # Negative cycle detected + + # Step 3: Reweight edges to make them non-negative + print("Reweighting edges...") + self._reweight_edges() + + # Step 4: Run Dijkstra from each vertex + print("Running Dijkstra from each vertex...") + self._compute_all_pairs_distances() + + # Step 5: Convert distances back to original weights + self._convert_distances_to_original() + + return self.all_pairs_distances + + def _get_extra_vertex_name(self) -> str: + """Generate a unique name for the extra vertex.""" + base_name = "extra_vertex_johnson" + extra_vertex = base_name + counter = 0 + + while self.original_graph.has_vertex(extra_vertex): + counter += 1 + extra_vertex = f"{base_name}_{counter}" + + return extra_vertex + + def _create_graph_with_extra_vertex(self) -> Graph: + """Create a copy of the original graph with an extra vertex.""" + graph_with_extra = self.original_graph.copy() + extra_vertex = self._get_extra_vertex_name() + + # Add edges from extra vertex to all original vertices with weight 0 + for vertex in self.original_graph.get_vertices(): + graph_with_extra.add_edge(extra_vertex, vertex, 0) + + return graph_with_extra + + def _compute_potentials(self, graph_with_extra: Graph, extra_vertex: str) -> bool: + """Compute potentials using Bellman-Ford algorithm.""" + bellman_ford = BellmanFord(graph_with_extra) + distances = bellman_ford.find_shortest_paths(extra_vertex) + + if distances is None: + return False # Negative cycle detected + + # Store potentials (exclude extra vertex) + self.potentials = {v: distances[v] for v in self.original_graph.get_vertices()} + return True + + def _reweight_edges(self) -> None: + """Reweight all edges to make them non-negative using the computed potentials.""" + self.reweighted_graph = Graph() + + for vertex in self.original_graph.get_vertices(): + for neighbor, weight in self.original_graph.get_neighbors(vertex): + # Reweight the edge + new_weight = ( + weight + self.potentials[vertex] - self.potentials[neighbor] + ) + self.reweighted_graph.add_edge(vertex, neighbor, new_weight) + + def _compute_all_pairs_distances(self) -> None: + """Run Dijkstra's algorithm from each vertex on the reweighted graph.""" + self.all_pairs_distances = {} + + for start_vertex in self.original_graph.get_vertices(): + dijkstra = Dijkstra(self.reweighted_graph) + distances = dijkstra.find_shortest_paths(start_vertex) + self.all_pairs_distances[start_vertex] = distances + + def _convert_distances_to_original(self) -> None: + """Convert the distances back to original weights using potentials.""" + for start_vertex in self.all_pairs_distances: + original_distances = {} + + for end_vertex in self.all_pairs_distances[start_vertex]: + reweighted_distance = self.all_pairs_distances[start_vertex][end_vertex] + + if reweighted_distance != float("inf"): + # Convert back to original distance + original_distance = ( + reweighted_distance + - self.potentials[start_vertex] + + self.potentials[end_vertex] + ) + original_distances[end_vertex] = original_distance + else: + original_distances[end_vertex] = float("inf") + + self.all_pairs_distances[start_vertex] = original_distances + + def get_shortest_path(self, source: Any, destination: Any) -> Optional[float]: + """Get the shortest distance between two specific vertices.""" + if source in self.all_pairs_distances: + return self.all_pairs_distances[source].get(destination) + return None + + def has_negative_cycle(self) -> bool: + """Check if the original graph contains a negative cycle.""" + return self.all_pairs_distances is None + + def get_potentials(self) -> Dict[Any, float]: + """Get the computed potentials for all vertices.""" + return self.potentials.copy() + + def get_reweighted_graph(self) -> Optional[Graph]: + """Get the reweighted graph with non-negative edge weights.""" + return self.reweighted_graph + + def print_all_pairs_distances( + self, distances: Optional[Dict[Any, Dict[Any, float]]] = None + ) -> None: + """Print the all-pairs shortest path distances in a readable format.""" + if distances is None: + distances = self.all_pairs_distances + + if distances is None: + print("No solution due to negative cycle!") + return + + print("\n" + "=" * 60) + print("ALL-PAIRS SHORTEST PATH DISTANCES (Johnson's Algorithm)") + print("=" * 60) + + # Print header + vertices_list = sorted(list(self.original_graph.get_vertices()), key=str) + print(f"{'From\\To':<8}", end="") + for vertex in vertices_list: + print(f"{str(vertex):<8}", end="") + print() + print("-" * (8 + len(vertices_list) * 8)) + + # Print distances + for source in vertices_list: + print(f"{str(source):<8}", end="") + for destination in vertices_list: + if source in distances and destination in distances[source]: + dist = distances[source][destination] + if dist == float("inf"): + print(f"{'∞':<8}", end="") + else: + print(f"{dist:<8}", end="") + else: + print(f"{'N/A':<8}", end="") + print() + + @staticmethod + def is_suitable_for_johnson(graph: Graph) -> tuple: + """Check if Johnson's algorithm is suitable for the given graph.""" + if graph.is_empty(): + return False, "Graph is empty" + + # Check for negative cycles using a simple Bellman-Ford test + temp_graph = graph.copy() + extra_vertex = "temp_extra" + for vertex in graph.get_vertices(): + temp_graph.add_edge(extra_vertex, vertex, 0) + + bellman_ford = BellmanFord(temp_graph) + result = bellman_ford.find_shortest_paths(extra_vertex) + + if result is None: + return False, "Graph contains negative cycles" + + # Johnson's is particularly good for sparse graphs + vertex_count = graph.get_vertex_count() + edge_count = graph.get_edge_count() + + if edge_count < vertex_count * vertex_count / 4: + return True, "Suitable: Sparse graph with no negative cycles" + else: + return True, "Suitable but consider Floyd-Warshall for dense graphs" diff --git a/graphs/tests/test_johnsons_algorithm.py b/graphs/tests/test_johnsons_algorithm.py new file mode 100644 index 000000000000..1a931cd80c48 --- /dev/null +++ b/graphs/tests/test_johnsons_algorithm.py @@ -0,0 +1,490 @@ +""" +Comprehensive tests for Johnson's Algorithm implementation. + +This module contains unit tests for all components of Johnson's Algorithm: +- Graph data structure +- Bellman-Ford algorithm +- Dijkstra's algorithm +- Johnson's algorithm main implementation + +Author: Zakariae Fakhri +Date: August 2025 +""" + +import unittest +import sys +import os + +# Add parent directory to path to import modules +sys.path.append( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +) + +from graphs.johnsons_algorithm import Graph, JohnsonsAlgorithm +from graphs.bellman_ford import BellmanFord +from graphs.dijkstra import Dijkstra + + +class TestGraph(unittest.TestCase): + """Test cases for the Graph class.""" + + def setUp(self): + """Set up test fixtures before each test method.""" + self.graph = Graph() + + def test_empty_graph(self): + """Test empty graph properties.""" + self.assertTrue(self.graph.is_empty()) + self.assertEqual(self.graph.get_vertex_count(), 0) + self.assertEqual(self.graph.get_edge_count(), 0) + self.assertEqual(len(self.graph.get_vertices()), 0) + + def test_add_single_edge(self): + """Test adding a single edge.""" + self.graph.add_edge(1, 2, 5.0) + + self.assertFalse(self.graph.is_empty()) + self.assertEqual(self.graph.get_vertex_count(), 2) + self.assertEqual(self.graph.get_edge_count(), 1) + self.assertTrue(self.graph.has_vertex(1)) + self.assertTrue(self.graph.has_vertex(2)) + self.assertTrue(self.graph.has_edge(1, 2)) + self.assertFalse(self.graph.has_edge(2, 1)) + self.assertEqual(self.graph.get_edge_weight(1, 2), 5.0) + + def test_add_multiple_edges(self): + """Test adding multiple edges.""" + edges = [(1, 2, -1), (2, 3, 4), (3, 1, 2)] + self.graph.add_edges(edges) + + self.assertEqual(self.graph.get_vertex_count(), 3) + self.assertEqual(self.graph.get_edge_count(), 3) + self.assertEqual(self.graph.get_edge_weight(1, 2), -1) + self.assertEqual(self.graph.get_edge_weight(2, 3), 4) + self.assertEqual(self.graph.get_edge_weight(3, 1), 2) + + def test_neighbors(self): + """Test getting neighbors of vertices.""" + self.graph.add_edge(1, 2, 3) + self.graph.add_edge(1, 3, 4) + self.graph.add_edge(2, 3, 1) + + neighbors_1 = self.graph.get_neighbors(1) + self.assertEqual(len(neighbors_1), 2) + self.assertIn((2, 3), neighbors_1) + self.assertIn((3, 4), neighbors_1) + + neighbors_2 = self.graph.get_neighbors(2) + self.assertEqual(len(neighbors_2), 1) + self.assertIn((3, 1), neighbors_2) + + def test_copy_graph(self): + """Test copying a graph.""" + self.graph.add_edge(1, 2, 5) + self.graph.add_edge(2, 3, -2) + + copy_graph = self.graph.copy() + + self.assertEqual(copy_graph.get_vertex_count(), self.graph.get_vertex_count()) + self.assertEqual(copy_graph.get_edge_count(), self.graph.get_edge_count()) + self.assertEqual(copy_graph.get_edge_weight(1, 2), 5) + self.assertEqual(copy_graph.get_edge_weight(2, 3), -2) + + # Modify original, copy should remain unchanged + self.graph.add_edge(3, 4, 1) + self.assertEqual(self.graph.get_edge_count(), 3) + self.assertEqual(copy_graph.get_edge_count(), 2) + + +class TestBellmanFord(unittest.TestCase): + """Test cases for the Bellman-Ford algorithm.""" + + def setUp(self): + """Set up test fixtures before each test method.""" + self.graph = Graph() + + def test_single_vertex(self): + """Test Bellman-Ford with a single vertex.""" + self.graph.add_edge(1, 1, 0) # Self-loop + bf = BellmanFord(self.graph) + distances = bf.find_shortest_paths(1) + + self.assertIsNotNone(distances) + self.assertEqual(distances[1], 0) + + def test_simple_path(self): + """Test Bellman-Ford with a simple path.""" + self.graph.add_edge(1, 2, 3) + self.graph.add_edge(2, 3, 4) + + bf = BellmanFord(self.graph) + distances = bf.find_shortest_paths(1) + + self.assertIsNotNone(distances) + self.assertEqual(distances[1], 0) + self.assertEqual(distances[2], 3) + self.assertEqual(distances[3], 7) + + def test_negative_edges(self): + """Test Bellman-Ford with negative edges but no negative cycles.""" + self.graph.add_edge(1, 2, -1) + self.graph.add_edge(2, 3, 4) + self.graph.add_edge(1, 3, 2) + + bf = BellmanFord(self.graph) + distances = bf.find_shortest_paths(1) + + self.assertIsNotNone(distances) + self.assertEqual(distances[1], 0) + self.assertEqual(distances[2], -1) + self.assertEqual(distances[3], 2) # min(2, -1+4) = 2 + + def test_negative_cycle_detection(self): + """Test Bellman-Ford negative cycle detection.""" + # Create a negative cycle: 1 -> 2 -> 3 -> 1 with total weight -1 + self.graph.add_edge(1, 2, 1) + self.graph.add_edge(2, 3, -4) + self.graph.add_edge(3, 1, 2) + + bf = BellmanFord(self.graph) + distances = bf.find_shortest_paths(1) + + self.assertIsNone(distances) # Should detect negative cycle + + def test_unreachable_vertices(self): + """Test Bellman-Ford with unreachable vertices.""" + self.graph.add_edge(1, 2, 1) + self.graph.add_edge(3, 4, 1) # Separate component + + bf = BellmanFord(self.graph) + distances = bf.find_shortest_paths(1) + + self.assertIsNotNone(distances) + self.assertEqual(distances[1], 0) + self.assertEqual(distances[2], 1) + self.assertEqual(distances[3], float("inf")) + self.assertEqual(distances[4], float("inf")) + + def test_get_path(self): + """Test path reconstruction in Bellman-Ford.""" + self.graph.add_edge(1, 2, 1) + self.graph.add_edge(2, 3, 2) + self.graph.add_edge(1, 3, 5) + + bf = BellmanFord(self.graph) + distances = bf.find_shortest_paths(1) + path = bf.get_path(1, 3) + + self.assertIsNotNone(distances) + self.assertEqual(path, [1, 2, 3]) # Should use shorter path + self.assertEqual(distances[3], 3) + + +class TestDijkstra(unittest.TestCase): + """Test cases for Dijkstra's algorithm.""" + + def setUp(self): + """Set up test fixtures before each test method.""" + self.graph = Graph() + + def test_simple_path(self): + """Test Dijkstra with a simple path.""" + self.graph.add_edge(1, 2, 3) + self.graph.add_edge(2, 3, 4) + self.graph.add_edge(1, 3, 10) + + dijkstra = Dijkstra(self.graph) + distances = dijkstra.find_shortest_paths(1) + + self.assertEqual(distances[1], 0) + self.assertEqual(distances[2], 3) + self.assertEqual(distances[3], 7) # 3 + 4 < 10 + + def test_complex_graph(self): + """Test Dijkstra with a more complex graph.""" + edges = [ + (1, 2, 4), + (1, 3, 2), + (2, 3, 1), + (2, 4, 5), + (3, 4, 8), + (3, 5, 10), + (4, 5, 2), + ] + self.graph.add_edges(edges) + + dijkstra = Dijkstra(self.graph) + distances = dijkstra.find_shortest_paths(1) + + self.assertEqual(distances[1], 0) + self.assertEqual(distances[2], 4) # Direct edge 1->2: 4 + self.assertEqual(distances[3], 2) + self.assertEqual(distances[4], 9) # 1->2->4: 4+5 = 9 + self.assertEqual(distances[5], 11) # 1->2->4->5: 4+5+2 = 11 + + def test_unreachable_vertices(self): + """Test Dijkstra with unreachable vertices.""" + self.graph.add_edge(1, 2, 1) + self.graph.add_edge(3, 4, 1) + + dijkstra = Dijkstra(self.graph) + distances = dijkstra.find_shortest_paths(1) + + self.assertEqual(distances[1], 0) + self.assertEqual(distances[2], 1) + self.assertEqual(distances[3], float("inf")) + self.assertEqual(distances[4], float("inf")) + + def test_get_path(self): + """Test path reconstruction in Dijkstra.""" + self.graph.add_edge(1, 2, 1) + self.graph.add_edge(2, 3, 2) + self.graph.add_edge(1, 3, 5) + + dijkstra = Dijkstra(self.graph) + distances = dijkstra.find_shortest_paths(1) + path = dijkstra.get_path(1, 3) + + self.assertEqual(path, [1, 2, 3]) + self.assertEqual(distances[3], 3) + + +class TestJohnsonsAlgorithm(unittest.TestCase): + """Test cases for Johnson's Algorithm.""" + + def setUp(self): + """Set up test fixtures before each test method.""" + self.graph = Graph() + + def test_empty_graph(self): + """Test Johnson's algorithm with empty graph.""" + johnson = JohnsonsAlgorithm(self.graph) + result = johnson.find_all_pairs_shortest_paths() + + self.assertEqual(result, {}) + + def test_single_vertex(self): + """Test Johnson's algorithm with single vertex.""" + # Need at least one edge to create a vertex + self.graph.add_edge(1, 1, 0) + johnson = JohnsonsAlgorithm(self.graph) + result = johnson.find_all_pairs_shortest_paths() + + self.assertIsNotNone(result) + self.assertEqual(result[1][1], 0) + + def test_simple_graph_positive_weights(self): + """Test Johnson's algorithm with positive weights only.""" + edges = [(1, 2, 3), (2, 3, 4), (1, 3, 10)] + self.graph.add_edges(edges) + + johnson = JohnsonsAlgorithm(self.graph) + result = johnson.find_all_pairs_shortest_paths() + + self.assertIsNotNone(result) + + # Check some key distances + self.assertEqual(result[1][1], 0) + self.assertEqual(result[1][2], 3) + self.assertEqual(result[1][3], 7) # 1->2->3: 3+4 = 7 + self.assertEqual(result[2][2], 0) + self.assertEqual(result[2][3], 4) + self.assertEqual(result[3][3], 0) + + # Unreachable paths should be infinity + self.assertEqual(result[2][1], float("inf")) + self.assertEqual(result[3][1], float("inf")) + self.assertEqual(result[3][2], float("inf")) + + def test_graph_with_negative_weights(self): + """Test Johnson's algorithm with negative weights (no negative cycles).""" + edges = [(1, 2, -1), (1, 3, 4), (2, 3, 3), (2, 4, 2), (3, 4, -5), (4, 1, 6)] + self.graph.add_edges(edges) + + johnson = JohnsonsAlgorithm(self.graph) + result = johnson.find_all_pairs_shortest_paths() + + self.assertIsNotNone(result) + + # Test some specific shortest paths + self.assertEqual(result[1][1], 0) + self.assertEqual(result[1][2], -1) + self.assertEqual( + result[1][4], -3 + ) # 1->3->4: 4+(-5) = -1, but 1->2->3->4: -1+3+(-5) = -3 + self.assertEqual(result[3][4], -5) + + def test_negative_cycle_detection(self): + """Test Johnson's algorithm negative cycle detection.""" + # Create a negative cycle + edges = [ + (1, 2, 1), + (2, 3, -4), + (3, 1, 2), # Total cycle weight: 1 + (-4) + 2 = -1 + ] + self.graph.add_edges(edges) + + johnson = JohnsonsAlgorithm(self.graph) + result = johnson.find_all_pairs_shortest_paths() + + self.assertIsNone(result) # Should detect negative cycle + # Note: has_negative_cycle only returns True after find_all_pairs_shortest_paths is called and returns None + + def test_string_vertices(self): + """Test Johnson's algorithm with string vertices.""" + edges = [ + ("A", "B", 1), + ("A", "C", 4), + ("B", "C", -3), + ("B", "D", 2), + ("C", "D", 3), + ("D", "A", -1), + ] + self.graph.add_edges(edges) + + johnson = JohnsonsAlgorithm(self.graph) + result = johnson.find_all_pairs_shortest_paths() + + self.assertIsNotNone(result) + + # Test some specific distances + self.assertEqual(result["A"]["A"], 0) + self.assertEqual(result["A"]["B"], 1) + self.assertEqual(result["A"]["C"], -2) # A->B->C: 1+(-3) = -2 + self.assertEqual(result["B"]["C"], -3) + self.assertEqual(result["D"]["A"], -1) + + def test_get_shortest_path_method(self): + """Test the get_shortest_path convenience method.""" + edges = [(1, 2, 3), (2, 3, 4)] + self.graph.add_edges(edges) + + johnson = JohnsonsAlgorithm(self.graph) + result = johnson.find_all_pairs_shortest_paths() + + self.assertEqual(johnson.get_shortest_path(1, 3), 7) + self.assertEqual(johnson.get_shortest_path(1, 1), 0) + self.assertEqual(johnson.get_shortest_path(2, 1), float("inf")) + + def test_algorithm_suitability_check(self): + """Test the static method for checking algorithm suitability.""" + # Empty graph + empty_graph = Graph() + suitable, reason = JohnsonsAlgorithm.is_suitable_for_johnson(empty_graph) + self.assertFalse(suitable) + self.assertIn("empty", reason.lower()) + + # Graph with negative cycle + neg_cycle_graph = Graph() + neg_cycle_graph.add_edges([(1, 2, 1), (2, 3, -4), (3, 1, 2)]) + suitable, reason = JohnsonsAlgorithm.is_suitable_for_johnson(neg_cycle_graph) + self.assertFalse(suitable) + self.assertIn("negative cycle", reason.lower()) + + # Good graph for Johnson's + good_graph = Graph() + good_graph.add_edges([(1, 2, -1), (2, 3, 4), (3, 4, 2)]) + suitable, reason = JohnsonsAlgorithm.is_suitable_for_johnson(good_graph) + self.assertTrue(suitable) + + +class TestIntegration(unittest.TestCase): + """Integration tests comparing Johnson's with individual algorithms.""" + + def test_johnson_vs_dijkstra_positive_weights(self): + """Compare Johnson's with Dijkstra on positive weight graph.""" + graph = Graph() + edges = [(1, 2, 3), (2, 3, 4), (1, 3, 10), (3, 4, 1)] + graph.add_edges(edges) + + # Johnson's algorithm + johnson = JohnsonsAlgorithm(graph) + johnson_result = johnson.find_all_pairs_shortest_paths() + + # Dijkstra from vertex 1 + dijkstra = Dijkstra(graph) + dijkstra_distances = dijkstra.find_shortest_paths(1) + + # Compare results for vertex 1 + for vertex in graph.get_vertices(): + self.assertEqual( + johnson_result[1][vertex], + dijkstra_distances[vertex], + f"Mismatch for vertex {vertex}", + ) + + def test_performance_comparison(self): + """Basic performance test to ensure algorithms complete.""" + # Create a moderately sized graph + graph = Graph() + + # Create a grid-like graph + for i in range(1, 6): + for j in range(1, 6): + if i < 5: + graph.add_edge(i * 5 + j, (i + 1) * 5 + j, 1) + if j < 5: + graph.add_edge(i * 5 + j, i * 5 + (j + 1), 1) + + # Add some negative edges + graph.add_edge(6, 11, -2) + graph.add_edge(12, 17, -1) + + johnson = JohnsonsAlgorithm(graph) + result = johnson.find_all_pairs_shortest_paths() + + self.assertIsNotNone(result) + self.assertEqual(len(result), graph.get_vertex_count()) + + +def run_tests(): + """Run all tests and display results.""" + # Create test suite + test_classes = [ + TestGraph, + TestBellmanFord, + TestDijkstra, + TestJohnsonsAlgorithm, + TestIntegration, + ] + + loader = unittest.TestLoader() + suite = unittest.TestSuite() + + for test_class in test_classes: + tests = loader.loadTestsFromTestCase(test_class) + suite.addTests(tests) + + # Run tests + runner = unittest.TextTestRunner(verbosity=2) + result = runner.run(suite) + + # Print summary + print(f"\n{'=' * 60}") + print("TEST SUMMARY") + print(f"{'=' * 60}") + print(f"Tests run: {result.testsRun}") + print(f"Failures: {len(result.failures)}") + print(f"Errors: {len(result.errors)}") + + if result.failures: + print(f"\nFAILURES:") + for test, traceback in result.failures: + print(f"- {test}: {traceback.split('AssertionError:')[-1].strip()}") + + if result.errors: + print(f"\nERRORS:") + for test, traceback in result.errors: + print(f"- {test}: {traceback.split('Exception:')[-1].strip()}") + + success_rate = ( + (result.testsRun - len(result.failures) - len(result.errors)) / result.testsRun + ) * 100 + print(f"\nSuccess rate: {success_rate:.1f}%") + + return result.wasSuccessful() + + +if __name__ == "__main__": + success = run_tests() + exit(0 if success else 1)