Skip to content

Conversation

@codeflash-ai
Copy link

@codeflash-ai codeflash-ai bot commented Jan 22, 2026

📄 9,058% (90.58x) speedup for find_last_node in src/algorithms/graph.py

⏱️ Runtime : 12.3 milliseconds 134 microseconds (best of 250 runs)

📝 Explanation and details

The optimized code achieves a 9057% speedup (from 12.3ms to 134μs) by replacing a quadratic O(N×M) algorithm with a linear O(N+M) algorithm, where N is the number of nodes and M is the number of edges.

Key optimization:
The original code uses a nested loop structure: for each node, it iterates through all edges to check if that node appears as a source. This results in O(N×M) comparisons.

The optimized version builds a set of source IDs from edges in a single pass (sources = {e["source"] for e in edges}), then performs O(1) membership checks (n["id"] not in sources) for each node. This reduces complexity to O(N+M).

Why this is faster:

  • Set lookup is O(1) vs. linear scan through all edges
  • For the large test cases (500 nodes, 499 edges), the original performs ~250,000 comparisons while the optimized performs ~1,000 operations
  • Test results confirm this: test_large_scale_chain_flow shows 16,642% speedup (4.43ms → 26.5μs) and test_large_complete_graph_with_sink shows 10,155% speedup (1.62ms → 15.8μs)

Edge case handling:
The optimization includes safeguards:

  1. Single-use iterators: Detects if edges is a consumed iterator (iter(edges) is edges) and falls back to original logic to preserve correctness
  2. Unhashable sources: If any source value can't be hashed (e.g., lists, dicts), catches TypeError and falls back to the original nested approach

Performance impact:

  • Small graphs (2-10 nodes): 30-96% faster - modest gains due to set construction overhead
  • Medium graphs (100-300 nodes): 2,834-10,155% faster - substantial wins as quadratic cost dominates
  • Large graphs (500+ nodes): 16,000%+ faster - dramatic improvements where the original becomes prohibitively slow

The optimization is particularly valuable when find_last_node is called repeatedly on non-trivial graphs, as the linear algorithm scales far better than the quadratic baseline.

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 🔘 None Found
🌀 Generated Regression Tests 33 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 100.0%
🌀 Click to see Generated Regression Tests
from __future__ import annotations

import itertools  # used to create generators for iterator-based tests
import time  # used to provide a small sanity check on large-scale runtime

# imports
import pytest  # used for our unit tests
from src.algorithms.graph import find_last_node


def test_basic_single_last_node():
    # Arrange: nodes with ids 'a','b','c'; edges originate from 'a' and 'b'
    nodes = [
        {"id": "a", "value": 1},
        {"id": "b", "value": 2},
        {"id": "c", "value": 3},
    ]
    edges = [
        {"source": "a", "target": "b"},
        {"source": "b", "target": "c"},
    ]

    # Act: find the last node (node with no outgoing edges)
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 2.00μs -> 1.08μs (84.7% faster)


def test_no_edges_returns_first_node_with_no_outgoing():
    # Arrange: edges empty -> any node has no outgoing edges, function should return the first node
    nodes = [{"id": 1}, {"id": 2}]
    edges = []

    # Act
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.12μs -> 792ns (42.0% faster)


def test_empty_nodes_returns_none():
    # Arrange: no nodes at all
    nodes = []
    edges = [{"source": "x"}]

    # Act
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 667ns -> 791ns (15.7% slower)


def test_multiple_candidates_returns_first_candidate():
    # Arrange: two nodes have no outgoing edges (id 'x' and 'y'), ensure the first is returned
    nodes = [{"id": "x"}, {"id": "y"}, {"id": "z"}]
    # Edge only from 'z' so 'x' and 'y' are both candidates; first candidate 'x' should be returned
    edges = [{"source": "z", "target": "x"}]

    # Act
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.33μs -> 875ns (52.5% faster)


def test_duplicate_node_ids_returns_first_instance():
    # Arrange: duplicate node ids appear; duplicates should be treated according to sequence order
    node_a = {"id": 1, "tag": "first"}
    node_b = {"id": 1, "tag": "second"}
    nodes = [node_a, node_b]
    # No edges at all, so the first node instance should be returned
    edges = []

    # Act
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.08μs -> 833ns (30.0% faster)


def test_edge_dict_missing_source_key_raises_keyerror():
    # Arrange: an edge is missing the 'source' key which will cause e["source"] to raise KeyError
    nodes = [{"id": "n1"}]
    edges = [{"target": "n1"}]  # missing 'source'

    # Act & Assert: calling find_last_node should raise KeyError due to edges missing 'source'
    with pytest.raises(KeyError):
        codeflash_output = find_last_node(nodes, edges)
        _ = codeflash_output  # 1.54μs -> 1.21μs (27.6% faster)


def test_node_dict_missing_id_key_raises_keyerror():
    # Arrange: a node is missing the 'id' key which will cause n["id"] to raise KeyError during comparison
    nodes = [{"name": "no-id"}]
    edges = [{"source": "whatever"}]

    # Act & Assert: attempting to access n["id"] inside the generator will raise KeyError
    with pytest.raises(KeyError):
        codeflash_output = find_last_node(nodes, edges)
        _ = codeflash_output  # 1.50μs -> 1.04μs (44.1% faster)


def test_non_string_and_mixed_id_types_handling():
    # Arrange: node ids can be ints and edges use int sources; equality should work across types that match
    nodes = [{"id": 0}, {"id": "1"}, {"id": 2}]
    edges = [{"source": 0}, {"source": 2}]
    # Only node with id "1" (a string) has no outgoing edges
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.83μs -> 1.04μs (75.9% faster)


def test_large_scale_chain_flow_last_node_identified_quickly():
    # Arrange: create a linear chain of nodes 0 -> 1 -> 2 -> ... -> N-1
    # Use N = 500 to respect "avoid loops exceeding 1000 steps" and data structure limits
    N = 500
    nodes = [{"id": i} for i in range(N)]  # nodes 0 .. N-1
    # Create edges from i -> i+1 for i in 0 .. N-2, so node N-1 has no outgoing edges
    edges = [{"source": i, "target": i + 1} for i in range(N - 1)]

    # Act: run and measure a tiny runtime to ensure it completes in reasonable time
    start = time.perf_counter()
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 4.43ms -> 26.5μs (16642% faster)
    elapsed = time.perf_counter() - start


def test_large_scale_many_candidates_returns_first_of_many():
    # Arrange: set up many nodes where the later half have no outgoing edges
    N = 300  # within allowed limits
    nodes = [{"id": i} for i in range(N)]
    # Create edges only for nodes 0..149 so nodes 150..299 have no outgoing edges
    edges = [{"source": i, "target": i + 1} for i in range(150 - 1)]

    # Act: the first node without outgoing edges is id==149 (since edges cover up to 148->149), so 149 has no outgoing edges
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 426μs -> 8.88μs (4700% faster)


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
import pytest
from src.algorithms.graph import find_last_node


def test_single_node_no_edges():
    """Test finding last node when there is only one node with no edges."""
    nodes = [{"id": 1, "name": "node1"}]
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.08μs -> 833ns (30.1% faster)


def test_linear_chain_three_nodes():
    """Test finding the last node in a simple linear chain: 1->2->3."""
    nodes = [
        {"id": 1, "name": "node1"},
        {"id": 2, "name": "node2"},
        {"id": 3, "name": "node3"},
    ]
    edges = [{"source": 1, "target": 2}, {"source": 2, "target": 3}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.88μs -> 1.08μs (73.1% faster)


def test_diamond_graph_single_sink():
    """Test finding last node in a diamond-shaped graph (multiple paths to single sink)."""
    nodes = [
        {"id": 1, "name": "start"},
        {"id": 2, "name": "left"},
        {"id": 3, "name": "right"},
        {"id": 4, "name": "end"},
    ]
    edges = [
        {"source": 1, "target": 2},
        {"source": 1, "target": 3},
        {"source": 2, "target": 4},
        {"source": 3, "target": 4},
    ]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 2.38μs -> 1.21μs (96.6% faster)


def test_two_nodes_with_edge():
    """Test finding last node when there are two nodes with an edge between them."""
    nodes = [{"id": 1, "name": "source_node"}, {"id": 2, "name": "sink_node"}]
    edges = [{"source": 1, "target": 2}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.50μs -> 1.00μs (50.0% faster)


def test_multiple_sinks_returns_first():
    """Test that when multiple nodes have no outgoing edges, the first one is returned."""
    nodes = [
        {"id": 1, "name": "node1"},
        {"id": 2, "name": "node2"},
        {"id": 3, "name": "node3"},
    ]
    edges = [{"source": 1, "target": 2}, {"source": 1, "target": 3}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.62μs -> 1.00μs (62.5% faster)


def test_node_with_extra_attributes():
    """Test that nodes with additional attributes are handled correctly."""
    nodes = [
        {"id": "A", "name": "first", "type": "input", "status": "active"},
        {"id": "B", "name": "last", "type": "output", "status": "active"},
    ]
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.62μs -> 1.00μs (62.5% faster)


def test_empty_nodes_list():
    """Test behavior when nodes list is empty."""
    nodes = []
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 709ns -> 666ns (6.46% faster)


def test_empty_edges_with_multiple_nodes():
    """Test that when there are no edges, the first node is returned as the last node."""
    nodes = [
        {"id": 1, "name": "node1"},
        {"id": 2, "name": "node2"},
        {"id": 3, "name": "node3"},
    ]
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.08μs -> 792ns (36.7% faster)


def test_cyclic_graph():
    """Test finding last node in a cyclic graph (no true sink exists)."""
    nodes = [
        {"id": 1, "name": "node1"},
        {"id": 2, "name": "node2"},
        {"id": 3, "name": "node3"},
    ]
    edges = [
        {"source": 1, "target": 2},
        {"source": 2, "target": 3},
        {"source": 3, "target": 1},  # Creates cycle
    ]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.92μs -> 1.04μs (84.0% faster)


def test_self_loop():
    """Test node with self-loop edge."""
    nodes = [{"id": 1, "name": "loop_node"}, {"id": 2, "name": "final_node"}]
    edges = [{"source": 1, "target": 1}, {"source": 1, "target": 2}]  # Self-loop
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.54μs -> 1.00μs (54.2% faster)


def test_string_ids():
    """Test with string-based IDs instead of integers."""
    nodes = [
        {"id": "start", "name": "beginning"},
        {"id": "middle", "name": "center"},
        {"id": "end", "name": "finish"},
    ]
    edges = [
        {"source": "start", "target": "middle"},
        {"source": "middle", "target": "end"},
    ]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.92μs -> 1.08μs (77.0% faster)


def test_isolated_node_among_chain():
    """Test finding last node when there's an isolated node not in the chain."""
    nodes = [
        {"id": 1, "name": "chain_start"},
        {"id": 2, "name": "chain_end"},
        {"id": 3, "name": "isolated"},
    ]
    edges = [{"source": 1, "target": 2}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.50μs -> 1.00μs (50.0% faster)


def test_edge_with_non_existent_source():
    """Test edge referencing a source node that doesn't exist in the nodes list."""
    nodes = [{"id": 1, "name": "node1"}, {"id": 2, "name": "node2"}]
    edges = [
        {"source": 999, "target": 1},  # Source 999 doesn't exist
        {"source": 1, "target": 2},
    ]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.75μs -> 1.00μs (75.0% faster)


def test_node_id_zero():
    """Test with node ID of 0 (falsy value but valid)."""
    nodes = [{"id": 0, "name": "zero_node"}, {"id": 1, "name": "one_node"}]
    edges = [{"source": 0, "target": 1}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.50μs -> 958ns (56.6% faster)


def test_node_id_none_type():
    """Test with None as a node ID (edge case for falsy values)."""
    nodes = [{"id": None, "name": "none_node"}, {"id": 1, "name": "real_node"}]
    edges = [{"source": None, "target": 1}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.62μs -> 958ns (69.6% faster)


def test_empty_string_node_id():
    """Test with empty string as node ID."""
    nodes = [{"id": "", "name": "empty_id_node"}, {"id": "a", "name": "normal_id_node"}]
    edges = [{"source": "", "target": "a"}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.50μs -> 958ns (56.6% faster)


def test_large_linear_chain():
    """Test finding last node in a large linear chain (500 nodes)."""
    num_nodes = 500
    nodes = [{"id": i, "name": f"node{i}"} for i in range(num_nodes)]
    edges = [{"source": i, "target": i + 1} for i in range(num_nodes - 1)]

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 4.42ms -> 26.8μs (16396% faster)


def test_wide_graph_single_sink():
    """Test finding last node in a wide graph (many sources to single sink)."""
    num_sources = 200
    nodes = [{"id": i, "name": f"source{i}"} for i in range(num_sources)]
    nodes.append({"id": num_sources, "name": "sink"})

    edges = [{"source": i, "target": num_sources} for i in range(num_sources)]

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 745μs -> 11.0μs (6647% faster)


def test_deep_pyramid_graph():
    """Test finding last node in a deep pyramid structure (multiple levels)."""
    # Create a pyramid: level 0 has 1 node, level 1 has 2, level 2 has 4, etc.
    nodes = []
    edges = []
    node_id = 0
    level_positions = {}

    # Build 5 levels
    for level in range(5):
        level_positions[level] = []
        nodes_in_level = 2**level
        for _ in range(nodes_in_level):
            nodes.append({"id": node_id, "name": f"node{node_id}"})
            level_positions[level].append(node_id)
            node_id += 1

        # Connect previous level to current level
        if level > 0:
            prev_level_nodes = level_positions[level - 1]
            curr_level_nodes = level_positions[level]
            for i, prev_id in enumerate(prev_level_nodes):
                edges.append({"source": prev_id, "target": curr_level_nodes[2 * i]})
                edges.append({"source": prev_id, "target": curr_level_nodes[2 * i + 1]})

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 13.3μs -> 2.29μs (482% faster)


def test_large_complete_graph_with_sink():
    """Test performance with a large near-complete graph (all nodes connect to single sink)."""
    num_nodes = 300
    nodes = [{"id": i, "name": f"node{i}"} for i in range(num_nodes)]

    # Create edges: nodes 0 to n-2 all connect to node n-1 (sink)
    edges = [{"source": i, "target": num_nodes - 1} for i in range(num_nodes - 1)]

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.62ms -> 15.8μs (10155% faster)


def test_sparse_graph_with_multiple_isolated_nodes():
    """Test finding last node in a sparse graph with many isolated nodes."""
    # Create 100 isolated nodes
    nodes = [{"id": i, "name": f"isolated{i}"} for i in range(100)]

    # Add 1 connected pair
    nodes.append({"id": 100, "name": "connected_source"})
    nodes.append({"id": 101, "name": "connected_sink"})

    edges = [{"source": 100, "target": 101}]

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.29μs -> 958ns (34.9% faster)


def test_complex_dag_structure():
    """Test finding last node in a large DAG (directed acyclic graph) with ~250 nodes."""
    nodes = []
    edges = []

    # Create 5 layers with increasing width
    layer_starts = []
    node_id = 0

    for layer in range(5):
        layer_starts.append(node_id)
        nodes_in_layer = 10 + layer * 10  # 10, 20, 30, 40, 50 nodes per layer

        for _ in range(nodes_in_layer):
            nodes.append({"id": node_id, "name": f"layer{layer}_node{node_id}"})
            node_id += 1

        # Connect to next layer
        if layer < 4:
            current_layer_start = layer_starts[layer]
            current_layer_end = node_id
            next_layer_start = node_id
            next_layer_size = 10 + (layer + 1) * 10

            for src_id in range(current_layer_start, current_layer_end):
                # Each node connects to 2 nodes in next layer (sparse connections)
                edges.append({"source": src_id, "target": next_layer_start})
                if next_layer_start + 1 < next_layer_start + next_layer_size:
                    edges.append({"source": src_id, "target": next_layer_start + 1})

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 376μs -> 8.71μs (4226% faster)
    # The sink nodes are those in the last layer with no outgoing edges
    last_layer_start = layer_starts[4]


def test_large_nodes_with_many_attributes():
    """Test performance with large nodes having many attributes."""
    num_nodes = 100
    nodes = [
        {
            "id": i,
            "name": f"node{i}",
            "type": "process",
            "status": "active",
            "priority": i % 5,
            "timestamp": 1000 + i,
            "tags": [f"tag{j}" for j in range(5)],
            "metadata": {"key1": f"value{i}", "key2": i * 2},
        }
        for i in range(num_nodes)
    ]

    edges = [{"source": i, "target": i + 1} for i in range(num_nodes - 1)]

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 200μs -> 6.83μs (2834% faster)


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes git checkout codeflash/optimize-find_last_node-mkq2j701 and push.

Codeflash Static Badge

The optimized code achieves a **9057% speedup** (from 12.3ms to 134μs) by replacing a **quadratic O(N×M) algorithm with a linear O(N+M) algorithm**, where N is the number of nodes and M is the number of edges.

**Key optimization:**
The original code uses a nested loop structure: for each node, it iterates through *all* edges to check if that node appears as a source. This results in O(N×M) comparisons.

The optimized version builds a **set of source IDs** from edges in a single pass (`sources = {e["source"] for e in edges}`), then performs O(1) membership checks (`n["id"] not in sources`) for each node. This reduces complexity to O(N+M).

**Why this is faster:**
- **Set lookup is O(1)** vs. linear scan through all edges
- For the large test cases (500 nodes, 499 edges), the original performs ~250,000 comparisons while the optimized performs ~1,000 operations
- Test results confirm this: `test_large_scale_chain_flow` shows **16,642% speedup** (4.43ms → 26.5μs) and `test_large_complete_graph_with_sink` shows **10,155% speedup** (1.62ms → 15.8μs)

**Edge case handling:**
The optimization includes safeguards:
1. **Single-use iterators**: Detects if `edges` is a consumed iterator (`iter(edges) is edges`) and falls back to original logic to preserve correctness
2. **Unhashable sources**: If any source value can't be hashed (e.g., lists, dicts), catches `TypeError` and falls back to the original nested approach

**Performance impact:**
- Small graphs (2-10 nodes): **30-96% faster** - modest gains due to set construction overhead
- Medium graphs (100-300 nodes): **2,834-10,155% faster** - substantial wins as quadratic cost dominates
- Large graphs (500+ nodes): **16,000%+ faster** - dramatic improvements where the original becomes prohibitively slow

The optimization is particularly valuable when `find_last_node` is called repeatedly on non-trivial graphs, as the linear algorithm scales far better than the quadratic baseline.
@codeflash-ai codeflash-ai bot requested a review from KRRT7 January 22, 2026 23:14
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash labels Jan 22, 2026
@KRRT7 KRRT7 closed this Jan 25, 2026
@KRRT7 KRRT7 deleted the codeflash/optimize-find_last_node-mkq2j701 branch January 25, 2026 09:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant