@codeflash-ai codeflash-ai bot commented Jan 20, 2026

📄 6,873% (68.73x) speedup for find_last_node in src/algorithms/graph.py

⏱️ Runtime : 23.6 milliseconds → 339 microseconds (best of 250 runs)

📝 Explanation and details

The optimized code achieves a 6873% speedup by replacing an O(n×m) nested loop with an O(m+n) set-based lookup, where n is the number of nodes and m is the number of edges.

Key Optimization

Original approach: For each node, the code iterates through ALL edges to check if that node is a source:

all(e["source"] != n["id"] for e in edges)

This creates n×m comparisons in the worst case.

Optimized approach: Build a set of all source IDs once, then use O(1) membership tests:

sources = {e["source"] for e in edges}
# Then: n["id"] not in sources

This reduces complexity from O(n×m) to O(m+n).
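A minimal sketch of the two approaches side by side. The full function bodies are not shown here, so the names `find_last_node_original` and `find_last_node_optimized` and the `next(..., None)` wrapper are assumptions, reconstructed from the two snippets above and from the behavior the regression tests describe (return the first node in list order that is not a source, or `None` if there is none).

```python
def find_last_node_original(nodes, edges):
    # Assumed shape of the original: for every node, scan all edges.
    return next(
        (n for n in nodes if all(e["source"] != n["id"] for e in edges)),
        None,
    )


def find_last_node_optimized(nodes, edges):
    # Build the set of source IDs once: O(m).
    sources = {e["source"] for e in edges}
    # One average-case O(1) membership test per node: O(n).
    return next((n for n in nodes if n["id"] not in sources), None)


nodes = [{"id": "1"}, {"id": "2"}, {"id": "3"}]
edges = [{"source": "1", "target": "2"}, {"source": "2", "target": "3"}]

# Both versions return the same dict object from the nodes list.
assert find_last_node_original(nodes, edges) is find_last_node_optimized(nodes, edges)
```

This plain version does not yet handle the empty-edges or unhashable-source cases discussed under "Edge Cases Preserved".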

Performance Impact by Scale

The speedup grows dramatically with input size:

  • Small graphs (3-4 nodes): 20-65% faster
  • Medium graphs (50-100 nodes): 800-2,250% faster
  • Large graphs (500+ nodes): 8,500-13,000% faster

This is because the original code does O(n×m) work, which is effectively quadratic when the edge count grows with the node count, while the set-based version stays linear in n + m.
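To make the scaling concrete, the sketch below counts how many `source`/`id` comparisons the nested scan performs on a linear chain, the same shape the large-scale regression tests use. `count_original_comparisons` is a hypothetical helper written for this illustration, not part of the PR, and it assumes the original stops scanning a node's edges at the first match, as `all()` does.

```python
def count_original_comparisons(n):
    """Count comparisons made by the nested scan on a chain 0 -> 1 -> ... -> n-1."""
    nodes = [{"id": str(i)} for i in range(n)]
    edges = [{"source": str(i), "target": str(i + 1)} for i in range(n - 1)]
    comparisons = 0
    for node in nodes:
        for edge in edges:
            comparisons += 1
            if edge["source"] == node["id"]:
                break  # all() short-circuits on the first matching edge
    return comparisons


for n in (3, 50, 500, 700):
    set_ops = (n - 1) + n  # one set insertion per edge, one lookup per node
    print(n, count_original_comparisons(n), "comparisons vs", set_ops, "set operations")
```

On a chain of n nodes the count works out to (n − 1)(n + 2) / 2, so it grows roughly with n², while the set-based version needs about 2n operations.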

Edge Cases Preserved

The optimization maintains original behavior through careful handling:

  1. Empty edges: When edges = [], the set sources is empty. The code returns the first node without accessing n["id"], matching the original's lazy evaluation via all() on an empty sequence.

  2. Unhashable sources: A try-except catches TypeError if edge sources aren't hashable (rare but possible), falling back to the original logic.

  3. Missing keys: Both versions raise KeyError when nodes lack 'id' keys or edges lack 'source' keys, but only when those keys are actually accessed.
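The three cases above could be handled along the following lines. This is a sketch under stated assumptions, not the code from the PR: the function body, the `next(..., None)` form, and the exact placement of the `try`/`except` are all guesses based on the description.

```python
def find_last_node(nodes, edges):
    try:
        # KeyError from a missing "source" key propagates, as in the original.
        sources = {e["source"] for e in edges}
    except TypeError:
        # Unhashable source (for example a list): fall back to the linear scan.
        return next(
            (n for n in nodes if all(e["source"] != n["id"] for e in edges)),
            None,
        )
    if not sources:
        # No edges: every node qualifies, so return the first one
        # without ever reading n["id"].
        return next(iter(nodes), None)
    # KeyError from a missing "id" key is raised here, only when accessed.
    return next((n for n in nodes if n["id"] not in sources), None)
```

The sketch is not claimed to match the original in every corner. For example, with an empty `nodes` list and a malformed edge it raises `KeyError` while building the set, whereas a nested scan would never touch the edges; and an unhashable node `id` checked against hashable sources would raise `TypeError` at the membership test.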

When This Optimization Matters Most

Based on test results and typical graph algorithm usage, this optimization is particularly valuable when:

  • The function is called repeatedly in a loop or hot path
  • Processing graphs with >50 nodes/edges
  • Working with data flow diagrams, DAGs, or workflow systems where finding terminal nodes is common

Correctness verification report:

| Test | Status |
|---|---|
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | ✅ 41 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 100.0% |

🌀 Click to see Generated Regression Tests
from __future__ import annotations

# imports
import pytest  # used for our unit tests
from src.algorithms.graph import find_last_node

if __name__ == "__main__":
    nodes = [{"id": "1"}, {"id": "2"}, {"id": "3"}, {"id": "4"}]
    edges = [{"source": "1", "target": "2"}, {"source": "2", "target": "3"}]


def test_basic_chain_returns_terminal_node():
    # Simple linear flow: 1 -> 2 -> 3. The last node (id "3") is not a source in any edge.
    nodes = [{"id": "1"}, {"id": "2"}, {"id": "3"}]
    edges = [{"source": "1", "target": "2"}, {"source": "2", "target": "3"}]
    # Expect the actual dict object for node "3" from the nodes list to be returned.
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 2.00μs -> 1.21μs (65.4% faster)


def test_no_edges_returns_first_node():
    # With no edges, every node is not a source; the implementation returns the first node.
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = []  # empty edge set
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.00μs -> 833ns (20.0% faster)


def test_all_nodes_are_sources_returns_none():
    # If every node appears as a source in edges, there is no "last node" -> None expected.
    nodes = [{"id": "x"}, {"id": "y"}]
    edges = [{"source": "x"}, {"source": "y"}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.62μs -> 1.33μs (21.8% faster)


def test_missing_node_id_raises_keyerror():
    # Nodes missing the 'id' key will cause a KeyError inside the function when accessing n["id"].
    nodes = [{"identifier": "no_id_here"}]  # incorrect key name on purpose
    edges = [{"source": "whatever"}]
    with pytest.raises(KeyError):
        find_last_node(nodes, edges)  # 1.62μs -> 1.54μs (5.45% faster)


def test_missing_edge_source_raises_keyerror():
    # Edges missing the 'source' key will cause a KeyError when the function tries to access e["source"].
    nodes = [{"id": "1"}]
    edges = [{"src": "1"}]  # incorrect key name
    with pytest.raises(KeyError):
        find_last_node(nodes, edges)  # 1.46μs -> 1.08μs (34.7% faster)


def test_non_string_ids_type_sensitive():
    # The function uses == comparison: type differences matter.
    # Node id is integer 1 but edge source is string "1" so they are not equal.
    nodes = [{"id": 1}, {"id": 2}]
    edges = [{"source": "1"}]  # string vs integer
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.42μs -> 1.21μs (17.3% faster)


def test_edges_with_unrelated_sources_are_ignored():
    # If edges reference sources not present among nodes, they shouldn't prevent valid nodes from being returned.
    nodes = [{"id": "a"}, {"id": "b"}]
    edges = [{"source": "z"}]  # 'z' not present among node ids
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.29μs -> 1.08μs (19.2% faster)


def test_duplicate_node_ids_behavior():
    # Duplicate node ids: the function inspects nodes in order and returns the first node whose id is not a source.
    # Case 1: duplicates present and not referenced by edges -> first duplicate returned.
    nodes = [{"id": "dup"}, {"id": "dup"}, {"id": "other"}]
    edges = [{"source": "other"}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.25μs -> 1.04μs (20.0% faster)

    # Case 2: duplicates referenced by edges -> both duplicates are considered sources and skipped
    edges_all_dup = [{"source": "dup"}]
    codeflash_output = find_last_node(nodes, edges_all_dup)
    result2 = codeflash_output  # 1.12μs -> 667ns (68.7% faster)


def test_order_dependency_multiple_non_sources_returns_first():
    # When multiple nodes qualify as "not a source", the function returns the first such node in 'nodes' order.
    nodes = [{"id": "alpha"}, {"id": "beta"}, {"id": "gamma"}]
    # Only 'gamma' appears as a source -> alpha and beta are valid; expect 'alpha' returned.
    edges = [{"source": "gamma"}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.25μs -> 1.12μs (11.1% faster)


def test_large_scale_many_nodes_and_edges_performance_and_correctness():
    # Create a sizable list of nodes and edges under the 1000-elements guideline.
    # We pick 700 nodes to remain well under the stated limit.
    n = 700
    nodes = [{"id": str(i)} for i in range(n)]  # ids "0".."n-1"
    # Create edges that use every node as a source except the final node (n-1).
    # This forms a chain of sources 0..n-2; node n-1 will be the only id not used as a source.
    edges = [{"source": str(i), "target": str(i + 1)} for i in range(n - 1)]
    # The expected last node is the node with id str(n-1) (the last entry in nodes).
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 9.57ms -> 72.1μs (13174% faster)


def test_all_nodes_sources_with_large_set_returns_none():
    # Another large-scale variant: make edges include all node ids as sources -> expect None.
    n = 500  # keep below the threshold
    nodes = [{"id": f"id_{i}"} for i in range(n)]
    edges = [{"source": f"id_{i}"} for i in range(n)]  # every node appears as a source
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 4.73ms -> 54.7μs (8540% faster)


def test_handling_of_non_sequential_and_missing_nodes():
    # Nodes list is non-sequential and edges reference a subset; ensure function finds the first node not used as a source.
    nodes = [{"id": "n1"}, {"id": "n2"}, {"id": "n3"}, {"id": "n4"}]
    # edges reference n2 and n4 as sources; n1 and n3 are not sources. Expect n1 returned (first non-source).
    edges = [{"source": "n2"}, {"source": "n4"}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.38μs -> 1.12μs (22.2% faster)


def test_function_returns_none_for_empty_nodes_list():
    # No nodes -> nothing to return => None
    nodes = []
    edges = [{"source": "anything"}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 667ns -> 1.00μs (33.3% slower)


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
import pytest
from src.algorithms.graph import find_last_node


def test_find_last_node_simple_linear_flow():
    """Test basic functionality with a simple linear flow: 1->2->3"""
    nodes = [{"id": "1"}, {"id": "2"}, {"id": "3"}]
    edges = [{"source": "1", "target": "2"}, {"source": "2", "target": "3"}]

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.92μs -> 1.21μs (58.7% faster)


def test_find_last_node_single_node_no_edges():
    """Test with a single node and no edges - should return that node as last"""
    nodes = [{"id": "1"}]
    edges = []

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.08μs -> 834ns (29.9% faster)


def test_find_last_node_two_nodes_one_edge():
    """Test with two nodes connected by one edge"""
    nodes = [{"id": "a"}, {"id": "b"}]
    edges = [{"source": "a", "target": "b"}]

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.50μs -> 1.17μs (28.5% faster)


def test_find_last_node_multiple_branches_converging():
    """Test with multiple branches converging to a final node"""
    nodes = [{"id": "1"}, {"id": "2"}, {"id": "3"}, {"id": "4"}]
    edges = [
        {"source": "1", "target": "3"},
        {"source": "2", "target": "3"},
        {"source": "3", "target": "4"},
    ]

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 2.17μs -> 1.33μs (62.6% faster)


def test_find_last_node_returns_first_valid_last_node():
    """Test that function returns the first node in list order that has no outgoing edges"""
    nodes = [{"id": "x"}, {"id": "y"}, {"id": "z"}]
    edges = [{"source": "y", "target": "x"}]

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.21μs -> 1.08μs (11.5% faster)


def test_find_last_node_with_node_attributes():
    """Test that function correctly identifies last node even with extra attributes"""
    nodes = [
        {"id": "1", "name": "start", "type": "input"},
        {"id": "2", "name": "middle", "type": "process"},
        {"id": "3", "name": "end", "type": "output"},
    ]
    edges = [{"source": "1", "target": "2"}, {"source": "2", "target": "3"}]

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.83μs -> 1.17μs (57.1% faster)


def test_find_last_node_empty_nodes_list():
    """Test with empty nodes list - should return None"""
    nodes = []
    edges = []

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 667ns -> 875ns (23.8% slower)


def test_find_last_node_empty_edges_list():
    """Test with nodes but no edges - first node should be last node"""
    nodes = [{"id": "1"}, {"id": "2"}, {"id": "3"}]
    edges = []

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.04μs -> 834ns (24.9% faster)


def test_find_last_node_all_nodes_have_outgoing_edges_cyclic():
    """Test with cyclic graph where all nodes have outgoing edges - should return None"""
    nodes = [{"id": "1"}, {"id": "2"}, {"id": "3"}]
    edges = [
        {"source": "1", "target": "2"},
        {"source": "2", "target": "3"},
        {"source": "3", "target": "1"},  # Creates cycle
    ]

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.88μs -> 1.29μs (45.1% faster)


def test_find_last_node_self_loop():
    """Test with a node that has a self-loop (source and target are same)"""
    nodes = [{"id": "1"}, {"id": "2"}]
    edges = [{"source": "1", "target": "1"}]  # Self-loop

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.54μs -> 1.17μs (32.0% faster)


def test_find_last_node_unreachable_node():
    """Test with unreachable node that has no incoming or outgoing edges"""
    nodes = [{"id": "1"}, {"id": "2"}, {"id": "unreachable"}]
    edges = [{"source": "1", "target": "2"}]

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.50μs -> 1.12μs (33.3% faster)


def test_find_last_node_node_id_with_special_characters():
    """Test with node IDs containing special characters"""
    nodes = [{"id": "node-1"}, {"id": "node_2"}, {"id": "node.3"}]
    edges = [
        {"source": "node-1", "target": "node_2"},
        {"source": "node_2", "target": "node.3"},
    ]

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 2.12μs -> 1.33μs (59.4% faster)


def test_find_last_node_numeric_string_ids():
    """Test with numeric string IDs to ensure string comparison works"""
    nodes = [{"id": "10"}, {"id": "2"}, {"id": "100"}]
    edges = [{"source": "10", "target": "2"}, {"source": "2", "target": "100"}]

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.88μs -> 1.33μs (40.7% faster)


def test_find_last_node_duplicate_edges():
    """Test with duplicate edges in the flow"""
    nodes = [{"id": "1"}, {"id": "2"}, {"id": "3"}]
    edges = [
        {"source": "1", "target": "2"},
        {"source": "1", "target": "2"},  # Duplicate edge
        {"source": "2", "target": "3"},
    ]

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.92μs -> 1.29μs (48.5% faster)


def test_find_last_node_edge_with_extra_attributes():
    """Test that extra attributes in edges don't affect the result"""
    nodes = [{"id": "1"}, {"id": "2"}]
    edges = [{"source": "1", "target": "2", "label": "flow", "weight": 5}]

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.46μs -> 1.12μs (29.6% faster)


def test_find_last_node_edge_without_source_key():
    """Test behavior when an edge dict lacks a 'source' key - should raise KeyError"""
    nodes = [{"id": "1"}, {"id": "2"}]
    edges = [{"target": "2"}]  # Missing 'source' key

    with pytest.raises(KeyError):
        find_last_node(nodes, edges)  # 1.62μs -> 1.12μs (44.4% faster)


def test_find_last_node_multiple_last_nodes_returns_first():
    """Test when multiple nodes have no outgoing edges - returns first in order"""
    nodes = [{"id": "1"}, {"id": "2"}, {"id": "3"}]
    edges = [{"source": "1", "target": "2"}]  # Both 2 and 3 have no outgoing edges

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.54μs -> 1.17μs (32.1% faster)


def test_find_last_node_very_long_id_string():
    """Test with very long ID strings"""
    long_id_1 = "a" * 1000
    long_id_2 = "b" * 1000
    nodes = [{"id": long_id_1}, {"id": long_id_2}]
    edges = [{"source": long_id_1, "target": long_id_2}]

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.54μs -> 1.21μs (27.6% faster)


def test_find_last_node_large_linear_chain():
    """Test with a large linear chain of 500 nodes"""
    num_nodes = 500
    nodes = [{"id": str(i)} for i in range(num_nodes)]
    edges = [{"source": str(i), "target": str(i + 1)} for i in range(num_nodes - 1)]

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 4.87ms -> 52.5μs (9172% faster)


def test_find_last_node_large_graph_with_many_edges():
    """Test with 100 nodes and many edges forming a DAG"""
    num_nodes = 100
    nodes = [{"id": str(i)} for i in range(num_nodes)]
    edges = []

    # Create edges: each node i connects to nodes i+1 and i+2 (if they exist)
    for i in range(num_nodes - 1):
        edges.append({"source": str(i), "target": str(i + 1)})
        if i + 2 < num_nodes:
            edges.append({"source": str(i), "target": str(i + 2)})

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 405μs -> 17.2μs (2256% faster)


def test_find_last_node_large_fully_connected_layer():
    """Test with 50 nodes where all feed into a final node"""
    num_nodes = 50
    nodes = [{"id": str(i)} for i in range(num_nodes)]
    edges = []

    # All nodes except last one connect to the final node
    for i in range(num_nodes - 1):
        edges.append({"source": str(i), "target": str(num_nodes - 1)})

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 61.6μs -> 6.79μs (807% faster)


def test_find_last_node_many_independent_branches():
    """Test with many independent branching paths that converge"""
    # Create 10 branches, each with 10 nodes, all converging to a final node
    nodes = []
    edges = []
    final_node_id = "final"

    node_id_counter = 0
    for branch in range(10):
        for step in range(10):
            node_id = f"b{branch}_n{step}"
            nodes.append({"id": node_id})

            if step > 0:
                prev_node = f"b{branch}_n{step - 1}"
                edges.append({"source": prev_node, "target": node_id})

            if step == 9:
                # Last node in branch connects to final node
                edges.append({"source": node_id, "target": final_node_id})

    nodes.append({"id": final_node_id})

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 222μs -> 12.8μs (1645% faster)


def test_find_last_node_wide_graph():
    """Test with a very wide graph (many nodes at same level)"""
    # Create 200 nodes where 100 feed into 100 others
    num_sources = 100
    num_sinks = 100
    nodes = []
    edges = []

    for i in range(num_sources):
        nodes.append({"id": f"source_{i}"})

    for i in range(num_sinks):
        nodes.append({"id": f"sink_{i}"})

    # Each source connects to corresponding sink
    for i in range(min(num_sources, num_sinks)):
        edges.append({"source": f"source_{i}", "target": f"sink_{i}"})

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 220μs -> 15.1μs (1360% faster)


def test_find_last_node_diamond_pattern_scaled():
    """Test with multiple diamond patterns scaled up"""
    # Create 5 diamond patterns chained together
    nodes = []
    edges = []

    for diamond in range(5):
        top = f"d{diamond}_top"
        left = f"d{diamond}_left"
        right = f"d{diamond}_right"
        bottom = f"d{diamond}_bottom"

        nodes.extend([{"id": top}, {"id": left}, {"id": right}, {"id": bottom}])

        edges.append({"source": top, "target": left})
        edges.append({"source": top, "target": right})
        edges.append({"source": left, "target": bottom})
        edges.append({"source": right, "target": bottom})

        # Chain diamonds together
        if diamond > 0:
            prev_bottom = f"d{diamond - 1}_bottom"
            edges.append({"source": prev_bottom, "target": top})

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 16.1μs -> 3.96μs (306% faster)


def test_find_last_node_performance_with_many_unreachable_edges():
    """Test performance with 300 nodes and 459 edges forming a sparse DAG"""
    num_nodes = 300
    nodes = [{"id": str(i)} for i in range(num_nodes)]
    edges = []

    # Create edges sparsely: each node connects to 1-3 later nodes (deterministic, not random)
    for i in range(num_nodes - 1):
        # Each node i connects forward to create DAG property
        edges.append({"source": str(i), "target": str(i + 1)})
        if i % 3 == 0 and i + 2 < num_nodes:
            edges.append({"source": str(i), "target": str(i + 2)})
        if i % 5 == 0 and i + 3 < num_nodes:
            edges.append({"source": str(i), "target": str(i + 3)})

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 2.68ms -> 45.0μs (5854% faster)


def test_find_last_node_tree_structure_balanced():
    """Test with a balanced binary tree structure (15 nodes)"""
    # Level 0: 1 node, Level 1: 2 nodes, Level 2: 4 nodes, Level 3: 8 nodes.
    nodes = []
    edges = []
    node_counter = 0

    # Create 4 levels of binary tree
    for level in range(4):
        level_size = 2**level
        for i in range(level_size):
            node_id = f"l{level}_n{i}"
            nodes.append({"id": node_id})

            # Connect to children in next level
            if level < 3:
                left_child = f"l{level + 1}_n{i * 2}"
                right_child = f"l{level + 1}_n{i * 2 + 1}"
                edges.append({"source": node_id, "target": left_child})
                edges.append({"source": node_id, "target": right_child})

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 5.38μs -> 2.29μs (135% faster)


def test_find_last_node_all_to_one_convergence():
    """Test with 200 nodes all connecting to a single final node"""
    num_nodes = 200
    nodes = [{"id": str(i)} for i in range(num_nodes)] + [{"id": "final"}]
    edges = [{"source": str(i), "target": "final"} for i in range(num_nodes)]

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 801μs -> 22.4μs (3474% faster)


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes git checkout codeflash/optimize-find_last_node-mkm6kvu3 and push.

@codeflash-ai codeflash-ai bot requested a review from KRRT7 January 20, 2026 05:56
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash labels Jan 20, 2026
@KRRT7 KRRT7 closed this Jan 25, 2026
@KRRT7 KRRT7 deleted the codeflash/optimize-find_last_node-mkm6kvu3 branch January 25, 2026 09:03