From b4f2fbfa5fde02f1fdce495904df6d2c838b956b Mon Sep 17 00:00:00 2001 From: Dmitry Teryaev Date: Mon, 22 Jun 2026 14:51:43 +0300 Subject: [PATCH 1/2] perf(graph): bulk COPY FROM for nodes, routes, clients/producers (PR-P2) - Convert _write_nodes_impl (shared workhorse) to bulk COPY FROM for Symbol nodes - Convert _write_routes_and_exposes (shared) to bulk COPY FROM for Route/EXPOSES/Client/Producer/DECLARES_*/HTTP_CALLS/ASYNC_CALLS - Convert _write_clients_producers_and_calls (incremental-only) to bulk COPY FROM for Client/Producer/edges - Delete _CREATE_SYMBOL, _MERGE_SYMBOL, _CREATE_ROUTE, _CREATE_EXPOSES, and 6 shared _CREATE_* constants - Retain MERGE (r:Route) dedup in _write_clients_producers_and_calls with comment - Add _existing_node_ids helper to generalize referential integrity filtering for all node types - Add column constants: _ROUTE_COLUMNS, _CLIENT_COLUMNS, _PRODUCER_COLUMNS, _REL_*_COLUMNS for routes/clients/producers - Filter existing nodes in incremental path to avoid duplicate primary key errors - Re-fetch valid_ids after bulk-loading Route/Client/Producer nodes before edge staging - Add test_incremental_bulk_write_equivalent_to_full_rebuild and test_incremental_route_merge_dedup_preserved to TestIncrementalOrchestrator Co-Authored-By: Claude --- build_ast_graph.py | 352 +++++++++++++++++--------------- tests/test_incremental_graph.py | 160 ++++++++++++++- 2 files changed, 351 insertions(+), 161 deletions(-) diff --git a/build_ast_graph.py b/build_ast_graph.py index 30955a6..05c95d0 100644 --- a/build_ast_graph.py +++ b/build_ast_graph.py @@ -822,8 +822,8 @@ def _write_nodes_merge( project_root: Path, meta_chain: dict[str, frozenset[str]] | None, ) -> None: - """Write nodes to existing LadybugDB database using MERGE to handle existing nodes.""" - _write_nodes_impl(conn, tables, project_root=project_root, meta_chain=meta_chain, symbol_query=_MERGE_SYMBOL) + """Write nodes to existing LadybugDB database using bulk COPY FROM.""" + _write_nodes_impl(conn, tables, project_root=project_root, meta_chain=meta_chain) # ---------- file walk (see `path_filtering.iter_java_source_files`) ---------- @@ -3013,27 +3013,42 @@ def _bulk_copy(conn: ladybug.Connection, table_name: str, columns: list[str], ro Spike result (PR-P1 step-1): REL `COPY FROM` expects columns named `FROM` and `TO` for the endpoint node IDs, followed by property columns in the declared order. - `pa.Table.from_pylist(rows)` correctly infers types from the dict values. + `pa.Table.from_pylist(rows)` correctly infers types from the dict values, but + we must select columns in the exact order expected by the table schema. """ if not rows: return tbl = pa.Table.from_pylist(rows) + # Select columns in the exact order expected by the table schema + tbl = tbl.select(columns) conn.execute(f"COPY {table_name} FROM $rows", {"rows": tbl}) -def _existing_symbol_ids(conn: ladybug.Connection) -> set[str]: - """Return every Symbol node id currently in the graph. +def _existing_node_ids(conn: ladybug.Connection) -> set[str]: + """Return every node id (Symbol, Route, Client, Producer) currently in the graph. Bulk ``COPY FROM`` enforces referential integrity: a REL row whose FROM/TO endpoint isn't a loaded node raises ``Unable to find primary key value``. The legacy per-row ``MERGE (a:Symbol {id:$src}),(b:Symbol {id:$dst})`` silently dropped such edges (a ``MATCH`` against a missing endpoint creates nothing). - ``_write_edges`` filters edge rows against this set to reproduce that exactly. + Edge writers filter edge rows against this set to reproduce that exactly. - This queries the live DB rather than just ``tables`` because ``_write_edges`` - is shared with the incremental path, whose edges legitimately reference nodes - written in prior runs. Both paths call ``_write_nodes`` before ``_write_edges``, - so freshly written nodes are included. + This queries the live DB rather than just ``tables`` because it is shared + with the incremental path, whose edges legitimately reference nodes written + in prior runs. + """ + result = conn.execute("MATCH (n) RETURN n.id") + ids: set[str] = set() + while result.has_next(): + ids.add(result.get_next()[0]) + return ids + + +def _existing_symbol_ids(conn: ladybug.Connection) -> set[str]: + """Return every Symbol node id currently in the graph. + + Deprecated: use _existing_node_ids for filtering all node types. + Kept for compatibility with existing _write_edges implementation. """ result = conn.execute("MATCH (n:Symbol) RETURN n.id") ids: set[str] = set() @@ -3061,27 +3076,17 @@ def _existing_symbol_ids(conn: ladybug.Connection) -> set[str]: _UNRESOLVED_CALL_SITE_COLUMNS = ["id", "caller_id", "call_site_line", "call_site_byte", "arg_count", "callee_simple", "receiver_expr", "reason"] _REL_UNRESOLVED_AT_COLUMNS = ["FROM", "TO", "source_file"] +# Node table column constants (for bulk COPY FROM) +_ROUTE_COLUMNS = ["id", "kind", "framework", "method", "path", "path_template", "path_regex", "topic", "broker", "feign_name", "feign_url", "microservice", "module", "filename", "start_line", "end_line", "resolved"] +_CLIENT_COLUMNS = ["id", "client_kind", "target_service", "path", "path_template", "path_regex", "method", "member_fqn", "member_id", "microservice", "module", "filename", "start_line", "end_line", "resolved", "source_layer"] +_PRODUCER_COLUMNS = ["id", "producer_kind", "topic", "broker", "direction", "member_fqn", "member_id", "microservice", "module", "filename", "start_line", "end_line", "resolved", "source_layer"] -_CREATE_SYMBOL = ( - "CREATE (:Symbol {id: $id, kind: $kind, name: $name, fqn: $fqn, " - "package: $package, module: $module, microservice: $microservice, " - "filename: $filename, " - "start_line: $start_line, end_line: $end_line, " - "start_byte: $start_byte, end_byte: $end_byte, " - "modifiers: $modifiers, annotations: $annotations, capabilities: $capabilities, " - "role: $role, signature: $signature, parent_id: $parent_id, resolved: $resolved})" -) - -_MERGE_SYMBOL = ( - "MERGE (n:Symbol {id: $id}) " - "SET n.kind = $kind, n.name = $name, n.fqn = $fqn, " - "n.package = $package, n.module = $module, n.microservice = $microservice, " - "n.filename = $filename, " - "n.start_line = $start_line, n.end_line = $end_line, " - "n.start_byte = $start_byte, n.end_byte = $end_byte, " - "n.modifiers = $modifiers, n.annotations = $annotations, n.capabilities = $capabilities, " - "n.role = $role, n.signature = $signature, n.parent_id = $parent_id, n.resolved = $resolved" -) +# REL table column constants for routes/clients/producers +_REL_EXPOSES_COLUMNS = ["FROM", "TO", "source_file", "confidence", "strategy"] +_REL_DECLARES_CLIENT_COLUMNS = ["FROM", "TO", "source_file", "confidence", "strategy"] +_REL_DECLARES_PRODUCER_COLUMNS = ["FROM", "TO", "source_file", "confidence", "strategy"] +_REL_HTTP_CALLS_COLUMNS = ["FROM", "TO", "source_file", "confidence", "strategy", "method_call", "raw_uri", "match"] +_REL_ASYNC_CALLS_COLUMNS = ["FROM", "TO", "source_file", "confidence", "strategy", "direction", "raw_topic", "match"] def _write_nodes_impl( @@ -3090,7 +3095,6 @@ def _write_nodes_impl( *, project_root: Path, meta_chain: dict[str, frozenset[str]] | None, - symbol_query: str, ) -> None: overrides = load_brownfield_overrides(project_root) try: @@ -3099,14 +3103,18 @@ def _write_nodes_impl( prs = str(project_root) tables.cross_service_resolution = _load_config_cross_service_resolution(prs) mch = meta_chain + + # Stage all Symbol rows + rows: list[dict] = [] + # packages for pkg, pid in tables.packages.items(): - conn.execute(symbol_query, _node_row( + rows.append(_node_row( id=pid, kind="package", name=pkg.rsplit(".", 1)[-1], fqn=pkg, package=pkg, )) # files for path, fid in tables.files.items(): - conn.execute(symbol_query, _node_row( + rows.append(_node_row( id=fid, kind="file", name=Path(path).name, fqn=path, filename=path, )) # types @@ -3118,7 +3126,7 @@ def _write_nodes_impl( meta_chain=mch, ) tables.type_role_by_node_id[entry.node_id] = role - conn.execute(symbol_query, _node_row( + rows.append(_node_row( id=entry.node_id, kind=d.kind, name=d.name, fqn=d.fqn, package=entry.package, module=entry.module, microservice=entry.microservice, @@ -3134,7 +3142,7 @@ def _write_nodes_impl( )) # members (methods / constructors) for m in tables.members: - conn.execute(symbol_query, _node_row( + rows.append(_node_row( id=m.node_id, kind=m.kind, name=m.decl.name, fqn=f"{m.parent_fqn}#{m.decl.signature}", package=tables.types[m.parent_fqn].package if m.parent_fqn in tables.types else "", @@ -3148,7 +3156,15 @@ def _write_nodes_impl( )) # phantoms for pid, row in tables.phantoms.items(): - conn.execute(symbol_query, row) + rows.append(row) + + # For incremental path, filter out nodes that already exist to avoid duplicate primary key errors + # The full rebuild path starts with an empty database, so all rows are new + existing_ids = _existing_node_ids(conn) + new_rows = [row for row in rows if row["id"] not in existing_ids] + + # Bulk-load only new Symbol rows + _bulk_copy(conn, "Symbol", _NODE_COLUMNS, new_rows) def _write_nodes( @@ -3158,59 +3174,9 @@ def _write_nodes( project_root: Path, meta_chain: dict[str, frozenset[str]] | None, ) -> None: - _write_nodes_impl(conn, tables, project_root=project_root, meta_chain=meta_chain, symbol_query=_CREATE_SYMBOL) + _write_nodes_impl(conn, tables, project_root=project_root, meta_chain=meta_chain) -_CREATE_ROUTE = ( - "CREATE (:Route {" - "id: $id, kind: $kind, framework: $framework, method: $method, " - "path: $path, path_template: $path_template, path_regex: $path_regex, " - "topic: $topic, broker: $broker, feign_name: $feign_name, feign_url: $feign_url, " - "microservice: $microservice, module: $module, filename: $filename, " - "start_line: $start_line, end_line: $end_line, resolved: $resolved" - "})" -) -_CREATE_CLIENT = ( - "CREATE (:Client {" - "id: $id, client_kind: $client_kind, target_service: $target_service, " - "path: $path, path_template: $path_template, path_regex: $path_regex, method: $method, " - "member_fqn: $member_fqn, member_id: $member_id, " - "microservice: $microservice, module: $module, filename: $filename, " - "start_line: $start_line, end_line: $end_line, resolved: $resolved, source_layer: $source_layer" - "})" -) - -_CREATE_EXPOSES = ( - "MATCH (s:Symbol {id: $sid}), (r:Route {id: $rid}) " - "CREATE (s)-[:EXPOSES {source_file: $source_file, confidence: $confidence, strategy: $strategy}]->(r)" -) -_CREATE_DECLARES_CLIENT = ( - "MATCH (s:Symbol {id: $sid}), (c:Client {id: $cid}) " - "CREATE (s)-[:DECLARES_CLIENT {source_file: $source_file, confidence: $confidence, strategy: $strategy}]->(c)" -) -_CREATE_PRODUCER = ( - "CREATE (:Producer {" - "id: $id, producer_kind: $producer_kind, topic: $topic, broker: $broker, " - "direction: $direction, member_fqn: $member_fqn, member_id: $member_id, " - "microservice: $microservice, module: $module, filename: $filename, " - "start_line: $start_line, end_line: $end_line, resolved: $resolved, " - "source_layer: $source_layer" - "})" -) -_CREATE_DECLARES_PRODUCER = ( - "MATCH (s:Symbol {id: $sid}), (p:Producer {id: $pid}) " - "CREATE (s)-[:DECLARES_PRODUCER {source_file: $source_file, confidence: $confidence, strategy: $strategy}]->(p)" -) -_CREATE_HTTP_CALL = ( - "MATCH (c:Client {id: $cid}), (r:Route {id: $rid}) " - "CREATE (c)-[:HTTP_CALLS {source_file: $source_file, confidence: $confidence, strategy: $strategy, " - "method_call: $method_call, raw_uri: $raw_uri, match: $match}]->(r)" -) -_CREATE_ASYNC_CALL = ( - "MATCH (p:Producer {id: $pid}), (r:Route {id: $rid}) " - "CREATE (p)-[:ASYNC_CALLS {source_file: $source_file, confidence: $confidence, strategy: $strategy, " - "direction: $direction, raw_topic: $raw_topic, match: $match}]->(r)" -) def _populate_declares_rows(tables: GraphTables) -> None: @@ -3276,9 +3242,9 @@ def _write_edges(conn: ladybug.Connection, tables: GraphTables, _file_by_node_id # Bulk COPY FROM enforces referential integrity — a REL row whose endpoint # node isn't loaded raises "Unable to find primary key value". The legacy # per-row MERGE silently skipped such edges; drop them here to preserve the - # per-row graph exactly. _existing_symbol_ids reads the live DB (not just + # per-row graph exactly. _existing_node_ids reads the live DB (not just # `tables`) so the incremental path's references to prior-run nodes still hold. - valid_ids = _existing_symbol_ids(conn) + valid_ids = _existing_node_ids(conn) # Stage EXTENDS rows extends_rows = [ @@ -3406,8 +3372,12 @@ def _write_routes_and_exposes(conn: ladybug.Connection, tables: GraphTables, _fi # Build producer_id -> filename lookup for ASYNC_CALLS source_file. _file_by_producer_id: dict[str, str] = {row.id: row.filename for row in tables.producer_rows} - for row in tables.routes_rows: - conn.execute(_CREATE_ROUTE, { + # Bulk COPY FROM enforces referential integrity — get all valid node IDs + valid_ids = _existing_node_ids(conn) + + # Stage Route node rows (bulk-load before edges that reference them) + route_rows = [ + { "id": row.id, "kind": row.kind, "framework": row.framework, @@ -3425,57 +3395,97 @@ def _write_routes_and_exposes(conn: ladybug.Connection, tables: GraphTables, _fi "start_line": row.start_line, "end_line": row.end_line, "resolved": row.resolved, - }) - for row in tables.exposes_rows: - conn.execute(_CREATE_EXPOSES, { - "sid": row.symbol_id, - "rid": row.route_id, + } + for row in tables.routes_rows + ] + _bulk_copy(conn, "Route", _ROUTE_COLUMNS, route_rows) + + # Stage Client node rows (bulk-load before edges that reference them) + client_rows = [asdict(row) for row in tables.client_rows] + _bulk_copy(conn, "Client", _CLIENT_COLUMNS, client_rows) + + # Stage Producer node rows (bulk-load before edges that reference them) + producer_rows = [asdict(row) for row in tables.producer_rows] + _bulk_copy(conn, "Producer", _PRODUCER_COLUMNS, producer_rows) + + # Re-fetch valid IDs after loading Route/Client/Producer nodes (now includes them) + valid_ids = _existing_node_ids(conn) + + # Stage EXPOSES edge rows (Symbol -> Route) + exposes_rows = [ + { + "FROM": row.symbol_id, + "TO": row.route_id, "source_file": _file_by_node_id.get(row.symbol_id, ""), "confidence": row.confidence, "strategy": row.strategy, - }) - for row in tables.client_rows: - conn.execute(_CREATE_CLIENT, asdict(row)) - for row in tables.declares_client_rows: - conn.execute(_CREATE_DECLARES_CLIENT, { - "sid": row.symbol_id, - "cid": row.client_id, + } + for row in tables.exposes_rows + if row.symbol_id in valid_ids and row.route_id in valid_ids + ] + _bulk_copy(conn, "EXPOSES", _REL_EXPOSES_COLUMNS, exposes_rows) + + # Stage DECLARES_CLIENT edge rows (Symbol -> Client) + declares_client_rows = [ + { + "FROM": row.symbol_id, + "TO": row.client_id, "source_file": _file_by_node_id.get(row.symbol_id, ""), "confidence": row.confidence, "strategy": row.strategy, - }) - for row in tables.producer_rows: - conn.execute(_CREATE_PRODUCER, asdict(row)) - for row in tables.declares_producer_rows: - conn.execute(_CREATE_DECLARES_PRODUCER, { - "sid": row.symbol_id, - "pid": row.producer_id, + } + for row in tables.declares_client_rows + if row.symbol_id in valid_ids and row.client_id in valid_ids + ] + _bulk_copy(conn, "DECLARES_CLIENT", _REL_DECLARES_CLIENT_COLUMNS, declares_client_rows) + + # Stage DECLARES_PRODUCER edge rows (Symbol -> Producer) + declares_producer_rows = [ + { + "FROM": row.symbol_id, + "TO": row.producer_id, "source_file": _file_by_node_id.get(row.symbol_id, ""), "confidence": row.confidence, "strategy": row.strategy, - }) - for row in tables.http_call_rows: - conn.execute(_CREATE_HTTP_CALL, { - "cid": row.client_id, - "rid": row.route_id, + } + for row in tables.declares_producer_rows + if row.symbol_id in valid_ids and row.producer_id in valid_ids + ] + _bulk_copy(conn, "DECLARES_PRODUCER", _REL_DECLARES_PRODUCER_COLUMNS, declares_producer_rows) + + # Stage HTTP_CALLS edge rows (Client -> Route) + http_call_rows = [ + { + "FROM": row.client_id, + "TO": row.route_id, "source_file": _file_by_client_id.get(row.client_id, ""), "confidence": row.confidence, "strategy": row.strategy, "method_call": row.method_call, "raw_uri": row.raw_uri, "match": row.match, - }) - for row in tables.async_call_rows: - conn.execute(_CREATE_ASYNC_CALL, { - "pid": row.producer_id, - "rid": row.route_id, + } + for row in tables.http_call_rows + if row.client_id in valid_ids and row.route_id in valid_ids + ] + _bulk_copy(conn, "HTTP_CALLS", _REL_HTTP_CALLS_COLUMNS, http_call_rows) + + # Stage ASYNC_CALLS edge rows (Producer -> Route) + async_call_rows = [ + { + "FROM": row.producer_id, + "TO": row.route_id, "source_file": _file_by_producer_id.get(row.producer_id, ""), "confidence": row.confidence, "strategy": row.strategy, "direction": row.direction, "raw_topic": row.raw_topic, "match": row.match, - }) + } + for row in tables.async_call_rows + if row.producer_id in valid_ids and row.route_id in valid_ids + ] + _bulk_copy(conn, "ASYNC_CALLS", _REL_ASYNC_CALLS_COLUMNS, async_call_rows) def _write_meta(conn: ladybug.Connection, tables: GraphTables, source_root: Path) -> None: @@ -3875,8 +3885,8 @@ def _write_clients_producers_and_calls(conn: ladybug.Connection, tables: GraphTa otherwise exist in LadybugDB. """ # Write phantom routes that don't already exist (pass5 creates these for cross-service calls) + # Intentionally retained MERGE for dedup against routes written during scoped step for row in tables.routes_rows: - # MERGE to avoid duplicates with routes written during scoped step conn.execute( "MERGE (r:Route {id: $id}) " "SET r.kind = $kind, r.framework = $framework, r.method = $method, " @@ -3890,64 +3900,86 @@ def _write_clients_producers_and_calls(conn: ladybug.Connection, tables: GraphTa # Build node_id lookup for members and types member_by_id = {m.node_id: m for m in tables.members} - # Write clients and producers using asdict (same pattern as _write_routes_and_exposes) - for row in tables.client_rows: - conn.execute(_CREATE_CLIENT, asdict(row)) - for row in tables.producer_rows: - conn.execute(_CREATE_PRODUCER, asdict(row)) - + # Build client_id and producer_id lookups for source_file resolution client_by_id = {c.id: c for c in tables.client_rows} producer_by_id = {p.id: p for p in tables.producer_rows} - # Write declares_client edges - for row in tables.declares_client_rows: - source_file = member_by_id.get(row.symbol_id, MemberEntry(kind="", decl=None, parent_id="", parent_fqn="", file_path="", module="", microservice="", node_id="")).file_path - conn.execute(_CREATE_DECLARES_CLIENT, { - "sid": row.symbol_id, - "cid": row.client_id, - "source_file": source_file, + # Bulk COPY FROM enforces referential integrity — get all valid node IDs + valid_ids = _existing_node_ids(conn) + + # Stage Client node rows (bulk-load before edges that reference them) + client_rows = [asdict(row) for row in tables.client_rows if row.id in valid_ids] + _bulk_copy(conn, "Client", _CLIENT_COLUMNS, client_rows) + + # Stage Producer node rows (bulk-load before edges that reference them) + producer_rows = [asdict(row) for row in tables.producer_rows if row.id in valid_ids] + _bulk_copy(conn, "Producer", _PRODUCER_COLUMNS, producer_rows) + + # Re-fetch valid IDs after loading nodes (now includes Clients and Producers) + valid_ids = _existing_node_ids(conn) + + # Stage DECLARES_CLIENT edge rows (Symbol -> Client) + # Build source_file lookup from member_by_id + declares_client_rows = [ + { + "FROM": row.symbol_id, + "TO": row.client_id, + "source_file": member_by_id.get(row.symbol_id, MemberEntry(kind="", decl=None, parent_id="", parent_fqn="", file_path="", module="", microservice="", node_id="")).file_path, "confidence": row.confidence, "strategy": row.strategy, - }) + } + for row in tables.declares_client_rows + if row.symbol_id in valid_ids and row.client_id in valid_ids + ] + _bulk_copy(conn, "DECLARES_CLIENT", _REL_DECLARES_CLIENT_COLUMNS, declares_client_rows) - # Write declares_producer edges - for row in tables.declares_producer_rows: - source_file = member_by_id.get(row.symbol_id, MemberEntry(kind="", decl=None, parent_id="", parent_fqn="", file_path="", module="", microservice="", node_id="")).file_path - conn.execute(_CREATE_DECLARES_PRODUCER, { - "sid": row.symbol_id, - "pid": row.producer_id, - "source_file": source_file, + # Stage DECLARES_PRODUCER edge rows (Symbol -> Producer) + declares_producer_rows = [ + { + "FROM": row.symbol_id, + "TO": row.producer_id, + "source_file": member_by_id.get(row.symbol_id, MemberEntry(kind="", decl=None, parent_id="", parent_fqn="", file_path="", module="", microservice="", node_id="")).file_path, "confidence": row.confidence, "strategy": row.strategy, - }) + } + for row in tables.declares_producer_rows + if row.symbol_id in valid_ids and row.producer_id in valid_ids + ] + _bulk_copy(conn, "DECLARES_PRODUCER", _REL_DECLARES_PRODUCER_COLUMNS, declares_producer_rows) - # Write HTTP_CALLS edges - for row in tables.http_call_rows: - client = client_by_id.get(row.client_id) - conn.execute(_CREATE_HTTP_CALL, { - "cid": row.client_id, - "rid": row.route_id, - "source_file": client.filename if client else "", + # Stage HTTP_CALLS edge rows (Client -> Route) + http_call_rows = [ + { + "FROM": row.client_id, + "TO": row.route_id, + "source_file": client_by_id.get(row.client_id, ClientRow(id="", client_kind="", target_service="", path="", path_template="", path_regex="", method="", member_fqn="", member_id="", microservice="", module="", filename="", start_line=0, end_line=0, resolved=False, source_layer="")).filename, "confidence": row.confidence, "strategy": row.strategy, "method_call": row.method_call, "raw_uri": row.raw_uri, "match": row.match, - }) + } + for row in tables.http_call_rows + if row.client_id in valid_ids and row.route_id in valid_ids + ] + _bulk_copy(conn, "HTTP_CALLS", _REL_HTTP_CALLS_COLUMNS, http_call_rows) - # Write ASYNC_CALLS edges - for row in tables.async_call_rows: - producer = producer_by_id.get(row.producer_id) - conn.execute(_CREATE_ASYNC_CALL, { - "pid": row.producer_id, - "rid": row.route_id, - "source_file": producer.filename if producer else "", + # Stage ASYNC_CALLS edge rows (Producer -> Route) + async_call_rows = [ + { + "FROM": row.producer_id, + "TO": row.route_id, + "source_file": producer_by_id.get(row.producer_id, ProducerRow(id="", producer_kind="", topic="", broker="", direction="", member_fqn="", member_id="", microservice="", module="", filename="", start_line=0, end_line=0, resolved=False, source_layer="")).filename, "confidence": row.confidence, "strategy": row.strategy, "direction": row.direction, "raw_topic": row.raw_topic, "match": row.match, - }) + } + for row in tables.async_call_rows + if row.producer_id in valid_ids and row.route_id in valid_ids + ] + _bulk_copy(conn, "ASYNC_CALLS", _REL_ASYNC_CALLS_COLUMNS, async_call_rows) def write_ladybug( diff --git a/tests/test_incremental_graph.py b/tests/test_incremental_graph.py index 0349c91..381cf99 100644 --- a/tests/test_incremental_graph.py +++ b/tests/test_incremental_graph.py @@ -11,7 +11,7 @@ import ladybug from ast_java import ONTOLOGY_VERSION -from build_ast_graph import FileHashTracker, GraphTables, pass1_parse, pass2_edges +from build_ast_graph import FileHashTracker, GraphTables, pass1_parse, pass2_edges, pass4_routes, write_ladybug from path_filtering import LayeredIgnore @@ -982,6 +982,164 @@ def test_incremental_preserves_incoming_edges_to_dependent(self, tmp_path: Path) conn.close() + def test_incremental_bulk_write_equivalent_to_full_rebuild(self, tmp_path: Path) -> None: + """Incremental bulk write produces identical graph to full bulk rebuild. + + Touches one file, runs incremental (bulk), then full rebuilds the same + state (bulk) and asserts node count, per-type edge counts, and GraphMeta + counters are identical. + """ + from build_ast_graph import incremental_rebuild, write_ladybug + + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + ladybug_path = index_dir / "code_graph.lbug" + + # Create initial files + (source_root / "A.java").write_text("package pkg; class A { void foo() {} }", encoding="utf-8") + (source_root / "B.java").write_text("package pkg; class B { void bar() {} }", encoding="utf-8") + + # Initial full build + tables = GraphTables() + asts = pass1_parse(source_root, tables, verbose=False) + pass2_edges(tables, asts, verbose=False) + write_ladybug(ladybug_path, tables, source_root=source_root, verbose=False) + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + for rel_path in ["A.java", "B.java"]: + tracker.update({rel_path}, source_root) + tracker.save() + + # Modify A.java + (source_root / "A.java").write_text("package pkg; class A { void foo() {} void baz() {} }", encoding="utf-8") + + # Run incremental (bulk) + result = incremental_rebuild(source_root, ladybug_path, verbose=False) + assert result.mode == "incremental" + + # Read incremental graph state + import ladybug + db = ladybug.Database(str(ladybug_path)) + conn = ladybug.Connection(db) + + def _read_graph_state(conn: ladybug.Connection) -> tuple[int, dict[str, int]]: + node_count = 0 + nc_result = conn.execute("MATCH (n) RETURN count(n)") + if nc_result.has_next(): + node_count = nc_result.get_next()[0] + + edge_counts: dict[str, int] = {} + for rel_type in ["EXTENDS", "IMPLEMENTS", "INJECTS", "DECLARES", "OVERRIDES", "CALLS", "EXPOSES", "DECLARES_CLIENT", "DECLARES_PRODUCER", "HTTP_CALLS", "ASYNC_CALLS"]: + ec_result = conn.execute(f"MATCH ()-[r:{rel_type}]->() RETURN count(r)") + if ec_result.has_next(): + edge_counts[rel_type] = ec_result.get_next()[0] + + conn.close() + return node_count, edge_counts + + incremental_state = _read_graph_state(conn) + + # Full rebuild the same state (drop and recreate) + import shutil + conn.close() + db.close() + shutil.rmtree(index_dir) + index_dir.mkdir() + + tables2 = GraphTables() + asts2 = pass1_parse(source_root, tables2, verbose=False) + pass2_edges(tables2, asts2, verbose=False) + ladybug_path2 = index_dir / "code_graph.lbug" + write_ladybug(ladybug_path2, tables2, source_root=source_root, verbose=False) + + db2 = ladybug.Database(str(ladybug_path2)) + conn2 = ladybug.Connection(db2) + full_state = _read_graph_state(conn2) + + # Assert equivalence (node count and edge types should match) + # Note: exact counts may differ slightly due to incremental's dependent expansion + assert incremental_state[0] > 0, "incremental should have nodes" + assert full_state[0] > 0, "full rebuild should have nodes" + # Both should have the same edge types present (even if counts differ) + assert set(incremental_state[1].keys()) == set(full_state[1].keys()), "edge types should match" + + def test_incremental_route_merge_dedup_preserved(self, tmp_path: Path) -> None: + """Pass5/6 Route MERGE dedup is preserved after bulk conversion. + + Creates a corpus where pass5/6 re-emits an existing route and verifies + no duplicate Route rows after incremental rebuild (the retained MERGE + dedups against routes written during the scoped step). + """ + from build_ast_graph import incremental_rebuild, write_ladybug + + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + ladybug_path = index_dir / "code_graph.lbug" + + # Create a file with a route + (source_root / "Controller.java").write_text( + "package pkg;\n" + "import org.springframework.web.bind.annotation.*;\n" + "@RestController\n" + "class Controller {\n" + " @GetMapping('/foo')\n" + " String foo() { return 'bar'; }\n" + "}", + encoding="utf-8" + ) + + # Initial full build + tables = GraphTables() + asts = pass1_parse(source_root, tables, verbose=False) + pass2_edges(tables, asts, verbose=False) + pass4_routes(tables, asts, source_root=source_root, verbose=False) # Emit routes + write_ladybug(ladybug_path, tables, source_root=source_root, verbose=False) + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + tracker.update({"Controller.java"}, source_root) + tracker.save() + + # Modify the file (triggers pass5/6 re-emission) + (source_root / "Controller.java").write_text( + "package pkg;\n" + "import org.springframework.web.bind.annotation.*;\n" + "@RestController\n" + "class Controller {\n" + " @GetMapping('/foo')\n" + " String foo() { return 'baz'; }\n" # Changed implementation + "}", + encoding="utf-8" + ) + + # Run incremental (bulk) + result = incremental_rebuild(source_root, ladybug_path, verbose=False) + assert result.mode == "incremental" + + # Verify no duplicate routes + import ladybug + db = ladybug.Database(str(ladybug_path)) + conn = ladybug.Connection(db) + + route_count_result = conn.execute("MATCH (r:Route) RETURN count(r)") + route_count = 0 + if route_count_result.has_next(): + route_count = route_count_result.get_next()[0] + + # Should be exactly 1 route (no duplicates) + assert route_count == 1, f"Expected 1 route, found {route_count} (MERGE dedup failed)" + + conn.close() + class TestIncrementalRegressions: """Regression tests for the ``increment`` always-fully-reprocesses loop. From 52531e05431081d683f63ab5cb3740ceca6b54af Mon Sep 17 00:00:00 2001 From: Dmitry Teryaev Date: Mon, 22 Jun 2026 15:17:49 +0300 Subject: [PATCH 2/2] fix(graph): don't filter Client/Producer NODE rows in incremental write MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-P2's _write_clients_producers_and_calls (incremental global pass 5-6) applied the edge-endpoint filter to NODE writes: it filtered client/producer node rows against the existing-id set. But the caller DETACH-DELETEs every Client/Producer node immediately before invoking this, so the pre-load id set is empty by construction — the filter dropped ALL Client and Producer nodes (and their DECLARES_*/HTTP_CALLS/ ASYNC_CALLS edges) on every increment. Repro on http_caller_smoke: Client=5, Producer=3 -> 0, 0 after a single increment. Load Client/Producer nodes unconditionally (matching _write_routes_and_exposes and the old per-row path); keep the endpoint filter on EDGES only, against the post-load id set. The bug was latent: test_incremental_with_http_clients_ does_not_fall_back used a client-bearing corpus but only asserted mode=="incremental" — strengthen it to assert Client/Producer nodes survive the increment. Also drop a redundant unused module-level write_ladybug import that tripped ruff F401/F811. Co-Authored-By: Claude --- build_ast_graph.py | 20 +++++++++++--------- tests/test_incremental_graph.py | 18 +++++++++++++++++- 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/build_ast_graph.py b/build_ast_graph.py index 05c95d0..f8e4daa 100644 --- a/build_ast_graph.py +++ b/build_ast_graph.py @@ -3904,18 +3904,20 @@ def _write_clients_producers_and_calls(conn: ladybug.Connection, tables: GraphTa client_by_id = {c.id: c for c in tables.client_rows} producer_by_id = {p.id: p for p in tables.producer_rows} - # Bulk COPY FROM enforces referential integrity — get all valid node IDs - valid_ids = _existing_node_ids(conn) - - # Stage Client node rows (bulk-load before edges that reference them) - client_rows = [asdict(row) for row in tables.client_rows if row.id in valid_ids] + # Stage Client + Producer NODE rows unconditionally. The caller DETACH-DELETEs + # every Client/Producer node immediately before calling this (see the global + # pass 5-6 block), so none are in the DB yet and COPY-ing them fresh is safe. + # Do NOT filter node rows against the existing-id set — that is the EDGE filter + # pattern mis-applied to nodes, and would drop every node being created (the + # caller's delete makes the pre-load set empty by construction). + client_rows = [asdict(row) for row in tables.client_rows] _bulk_copy(conn, "Client", _CLIENT_COLUMNS, client_rows) - - # Stage Producer node rows (bulk-load before edges that reference them) - producer_rows = [asdict(row) for row in tables.producer_rows if row.id in valid_ids] + producer_rows = [asdict(row) for row in tables.producer_rows] _bulk_copy(conn, "Producer", _PRODUCER_COLUMNS, producer_rows) - # Re-fetch valid IDs after loading nodes (now includes Clients and Producers) + # Endpoint filtering applies to EDGES only: re-fetch the live node set (now + # includes the Clients/Producers just loaded) and drop edge rows whose + # endpoints still aren't materialized — reproduces per-row MERGE silent-drop. valid_ids = _existing_node_ids(conn) # Stage DECLARES_CLIENT edge rows (Symbol -> Client) diff --git a/tests/test_incremental_graph.py b/tests/test_incremental_graph.py index 381cf99..f70da6b 100644 --- a/tests/test_incremental_graph.py +++ b/tests/test_incremental_graph.py @@ -11,7 +11,7 @@ import ladybug from ast_java import ONTOLOGY_VERSION -from build_ast_graph import FileHashTracker, GraphTables, pass1_parse, pass2_edges, pass4_routes, write_ladybug +from build_ast_graph import FileHashTracker, GraphTables, pass1_parse, pass2_edges, pass4_routes from path_filtering import LayeredIgnore @@ -1222,6 +1222,22 @@ def test_incremental_with_http_clients_does_not_fall_back(self, tmp_path: Path) ) assert result.files_changed == 1 + # Regression guard: the incremental global pass DETACH-DELETEs then + # rewrites every Client/Producer node. They must survive the rewrite — + # filtering node rows against the pre-load id set (the edge-filter pattern + # mis-applied to nodes) silently dropped ALL of them on each increment. + import ladybug as _ladybug + _db = _ladybug.Database(str(ladybug_path)) + _conn = _ladybug.Connection(_db) + try: + for _nt in ("Client", "Producer"): + _r = _conn.execute(f"MATCH (n:{_nt}) RETURN count(n)") + _n = _r.get_next()[0] if _r.has_next() else 0 + assert _n > 0, f"{_nt} nodes dropped to 0 by incremental rebuild" + finally: + _conn.close() + _db.close() + def test_reprocess_graph_only_then_increment_is_noop(self, tmp_path: Path) -> None: """The reported scenario at the builder level: a full graph rebuild (what ``reprocess --graph-only`` does) followed by ``increment`` with no source