Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 147 additions & 87 deletions build_ast_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from pathlib import Path

import ladybug
import pyarrow as pa

from ast_java import (
ONTOLOGY_VERSION,
Expand Down Expand Up @@ -3004,6 +3005,63 @@ def _node_row(**kwargs) -> dict:
return base


def _bulk_copy(conn: ladybug.Connection, table_name: str, columns: list[str], rows: list[dict]) -> None:
"""Bulk-load rows into a node/rel table via in-memory pyarrow COPY FROM.

`columns` fixes column order; for REL tables the first two MUST be the
FROM/TO node primary keys (kuzu requirement). Empty `rows` is a no-op.

Spike result (PR-P1 step-1): REL `COPY FROM` expects columns named `FROM` and `TO`
for the endpoint node IDs, followed by property columns in the declared order.
`pa.Table.from_pylist(rows)` correctly infers types from the dict values.
"""
if not rows:
return
tbl = pa.Table.from_pylist(rows)
conn.execute(f"COPY {table_name} FROM $rows", {"rows": tbl})


def _existing_symbol_ids(conn: ladybug.Connection) -> set[str]:
"""Return every Symbol node id currently in the graph.

Bulk ``COPY FROM`` enforces referential integrity: a REL row whose FROM/TO
endpoint isn't a loaded node raises ``Unable to find primary key value``. The
legacy per-row ``MERGE (a:Symbol {id:$src}),(b:Symbol {id:$dst})`` silently
dropped such edges (a ``MATCH`` against a missing endpoint creates nothing).
``_write_edges`` filters edge rows against this set to reproduce that exactly.

This queries the live DB rather than just ``tables`` because ``_write_edges``
is shared with the incremental path, whose edges legitimately reference nodes
written in prior runs. Both paths call ``_write_nodes`` before ``_write_edges``,
so freshly written nodes are included.
"""
result = conn.execute("MATCH (n:Symbol) RETURN n.id")
ids: set[str] = set()
while result.has_next():
ids.add(result.get_next()[0])
return ids


# Column-order constants for bulk COPY FROM.
# For REL tables, the first two entries are FROM/TO node primary keys (kuzu requirement).
# Order matches the corresponding _SCHEMA_* declarations above.
_NODE_COLUMNS = [
"id", "kind", "name", "fqn", "package", "module", "microservice",
"filename", "start_line", "end_line", "start_byte", "end_byte",
"modifiers", "annotations", "capabilities", "role", "signature", "parent_id", "resolved"
]

_REL_EXTENDS_COLUMNS = ["FROM", "TO", "source_file", "dst_name", "dst_fqn", "resolved"]
_REL_IMPLEMENTS_COLUMNS = ["FROM", "TO", "source_file", "dst_name", "dst_fqn", "resolved"]
_REL_INJECTS_COLUMNS = ["FROM", "TO", "source_file", "dst_name", "dst_fqn", "resolved", "mechanism", "annotation", "field_or_param"]
_REL_DECLARES_COLUMNS = ["FROM", "TO", "source_file"]
_REL_OVERRIDES_COLUMNS = ["FROM", "TO", "source_file"]
_REL_CALLS_COLUMNS = ["FROM", "TO", "source_file", "call_site_line", "call_site_byte", "arg_count", "confidence", "strategy", "source", "resolved", "callee_declaring_role"]

_UNRESOLVED_CALL_SITE_COLUMNS = ["id", "caller_id", "call_site_line", "call_site_byte", "arg_count", "callee_simple", "receiver_expr", "reason"]
_REL_UNRESOLVED_AT_COLUMNS = ["FROM", "TO", "source_file"]


_CREATE_SYMBOL = (
"CREATE (:Symbol {id: $id, kind: $kind, name: $name, fqn: $fqn, "
"package: $package, module: $module, microservice: $microservice, "
Expand Down Expand Up @@ -3103,37 +3161,6 @@ def _write_nodes(
_write_nodes_impl(conn, tables, project_root=project_root, meta_chain=meta_chain, symbol_query=_CREATE_SYMBOL)


_CREATE_EXT = (
"MATCH (a:Symbol {id: $src}), (b:Symbol {id: $dst}) "
"CREATE (a)-[:EXTENDS {source_file: $source_file, dst_name: $dst_name, dst_fqn: $dst_fqn, resolved: $resolved}]->(b)"
)
_CREATE_IMPL = (
"MATCH (a:Symbol {id: $src}), (b:Symbol {id: $dst}) "
"CREATE (a)-[:IMPLEMENTS {source_file: $source_file, dst_name: $dst_name, dst_fqn: $dst_fqn, resolved: $resolved}]->(b)"
)
_CREATE_INJ = (
"MATCH (a:Symbol {id: $src}), (b:Symbol {id: $dst}) "
"CREATE (a)-[:INJECTS {source_file: $source_file, dst_name: $dst_name, dst_fqn: $dst_fqn, resolved: $resolved, "
"mechanism: $mechanism, annotation: $annotation, field_or_param: $field_or_param}]->(b)"
)
_CREATE_DECL = (
"MATCH (a:Symbol {id: $src}), (b:Symbol {id: $dst}) "
"CREATE (a)-[:DECLARES {source_file: $source_file}]->(b)"
)
_CREATE_OVERRIDES = (
"MATCH (a:Symbol {id: $src}), (b:Symbol {id: $dst}) "
"CREATE (a)-[:OVERRIDES {source_file: $source_file}]->(b)"
)
_CREATE_CALL = (
"MATCH (a:Symbol {id: $src}), (b:Symbol {id: $dst}) "
"CREATE (a)-[:CALLS {"
"source_file: $source_file, "
"call_site_line: $line, call_site_byte: $byte, arg_count: $argc, "
"confidence: $conf, strategy: $strat, source: $src_kind, resolved: $resolved, "
"callee_declaring_role: $callee_declaring_role"
"}]->(b)"
)

_CREATE_ROUTE = (
"CREATE (:Route {"
"id: $id, kind: $kind, framework: $framework, method: $method, "
Expand Down Expand Up @@ -3246,93 +3273,126 @@ def _write_edges(conn: ladybug.Connection, tables: GraphTables, _file_by_node_id
if _file_by_node_id is None:
_file_by_node_id = _build_file_by_node_id(tables)

for r in tables.extends_rows:
conn.execute(_CREATE_EXT, {
"src": r.src_id, "dst": r.dst_id,
# Bulk COPY FROM enforces referential integrity — a REL row whose endpoint
# node isn't loaded raises "Unable to find primary key value". The legacy
# per-row MERGE silently skipped such edges; drop them here to preserve the
# per-row graph exactly. _existing_symbol_ids reads the live DB (not just
# `tables`) so the incremental path's references to prior-run nodes still hold.
valid_ids = _existing_symbol_ids(conn)

# Stage EXTENDS rows
extends_rows = [
{
"FROM": r.src_id, "TO": r.dst_id,
"source_file": _file_by_node_id.get(r.src_id, ""),
"dst_name": r.dst_name, "dst_fqn": r.dst_fqn, "resolved": r.resolved,
})
for r in tables.implements_rows:
conn.execute(_CREATE_IMPL, {
"src": r.src_id, "dst": r.dst_id,
}
for r in tables.extends_rows
if r.src_id in valid_ids and r.dst_id in valid_ids
]
_bulk_copy(conn, "EXTENDS", _REL_EXTENDS_COLUMNS, extends_rows)

# Stage IMPLEMENTS rows
implements_rows = [
{
"FROM": r.src_id, "TO": r.dst_id,
"source_file": _file_by_node_id.get(r.src_id, ""),
"dst_name": r.dst_name, "dst_fqn": r.dst_fqn, "resolved": r.resolved,
})
for r in tables.injects_rows:
conn.execute(_CREATE_INJ, {
"src": r.src_id, "dst": r.dst_id,
}
for r in tables.implements_rows
if r.src_id in valid_ids and r.dst_id in valid_ids
]
_bulk_copy(conn, "IMPLEMENTS", _REL_IMPLEMENTS_COLUMNS, implements_rows)

# Stage INJECTS rows
injects_rows = [
{
"FROM": r.src_id, "TO": r.dst_id,
"source_file": _file_by_node_id.get(r.src_id, ""),
"dst_name": r.dst_name, "dst_fqn": r.dst_fqn, "resolved": r.resolved,
"mechanism": r.mechanism, "annotation": r.annotation,
"field_or_param": r.field_or_param,
})
}
for r in tables.injects_rows
if r.src_id in valid_ids and r.dst_id in valid_ids
]
_bulk_copy(conn, "INJECTS", _REL_INJECTS_COLUMNS, injects_rows)

for row in tables.declares_rows:
conn.execute(_CREATE_DECL, {
"src": row.src_id, "dst": row.dst_id,
# Stage DECLARES rows
declares_rows = [
{
"FROM": row.src_id, "TO": row.dst_id,
"source_file": _file_by_node_id.get(row.src_id, ""),
})
}
for row in tables.declares_rows
if row.src_id in valid_ids and row.dst_id in valid_ids
]
_bulk_copy(conn, "DECLARES", _REL_DECLARES_COLUMNS, declares_rows)

for row in tables.overrides_rows:
conn.execute(_CREATE_OVERRIDES, {
"src": row.src_id, "dst": row.dst_id,
# Stage OVERRIDES rows
overrides_rows = [
{
"FROM": row.src_id, "TO": row.dst_id,
"source_file": _file_by_node_id.get(row.src_id, ""),
})
}
for row in tables.overrides_rows
if row.src_id in valid_ids and row.dst_id in valid_ids
]
_bulk_copy(conn, "OVERRIDES", _REL_OVERRIDES_COLUMNS, overrides_rows)

# Stage CALLS rows with dedup and callee_declaring_role materialization
seen_calls: set[tuple[str, str, int, int]] = set()
unique_calls: list[CallsRow] = []
calls_rows: list[dict] = []
member_by_id = {m.node_id: m for m in tables.members}
for row in tables.calls_rows:
if row.src_id not in valid_ids or row.dst_id not in valid_ids:
continue
key = (row.src_id, row.dst_id, row.arg_count, row.call_site_line)
if key not in seen_calls:
seen_calls.add(key)
unique_calls.append(row)

member_by_id = {m.node_id: m for m in tables.members}
for row in unique_calls:
conn.execute(_CREATE_CALL, {
"src": row.src_id, "dst": row.dst_id,
if key in seen_calls:
continue
seen_calls.add(key)
calls_rows.append({
"FROM": row.src_id, "TO": row.dst_id,
"source_file": _file_by_node_id.get(row.src_id, ""),
"line": row.call_site_line,
"byte": row.call_site_byte,
"argc": row.arg_count,
"conf": row.confidence,
"strat": row.strategy,
"src_kind": row.source,
"resolved": row.resolved,
"call_site_line": row.call_site_line, "call_site_byte": row.call_site_byte,
"arg_count": row.arg_count, "confidence": row.confidence, "strategy": row.strategy,
"source": row.source, "resolved": row.resolved,
"callee_declaring_role": _callee_declaring_role_at_write(
tables, row.dst_id, member_by_id=member_by_id,
),
})
_bulk_copy(conn, "CALLS", _REL_CALLS_COLUMNS, calls_rows)

_CREATE_UNRESOLVED = (
"CREATE (:UnresolvedCallSite {"
"id: $id, caller_id: $caller_id, call_site_line: $line, call_site_byte: $byte, "
"arg_count: $argc, callee_simple: $callee, receiver_expr: $recv, reason: $reason"
"})"
)
_CREATE_UNRESOLVED_AT = (
"MATCH (a:Symbol {id: $caller}), (u:UnresolvedCallSite {id: $ucs}) "
"CREATE (a)-[:UNRESOLVED_AT {source_file: $source_file}]->(u)"
)
# Stage UnresolvedCallSite node rows (must load before UNRESOLVED_AT edges)
seen_ucs: set[str] = set()
ucs_rows: list[dict] = []
for row in tables.unresolved_call_site_rows:
if row.id in seen_ucs:
continue
seen_ucs.add(row.id)
conn.execute(_CREATE_UNRESOLVED, {
ucs_rows.append({
"id": row.id,
"caller_id": row.caller_id,
"line": row.call_site_line,
"byte": row.call_site_byte,
"argc": row.arg_count,
"callee": row.callee_simple,
"recv": row.receiver_expr,
"call_site_line": row.call_site_line,
"call_site_byte": row.call_site_byte,
"arg_count": row.arg_count,
"callee_simple": row.callee_simple,
"receiver_expr": row.receiver_expr,
"reason": row.reason,
})
conn.execute(_CREATE_UNRESOLVED_AT, {
"caller": row.caller_id, "ucs": row.id,
"source_file": _file_by_node_id.get(row.caller_id, ""),
})
_bulk_copy(conn, "UnresolvedCallSite", _UNRESOLVED_CALL_SITE_COLUMNS, ucs_rows)

# Stage UNRESOLVED_AT edge rows (one per unique UnresolvedCallSite node)
# Use the same ucs_rows list to ensure 1:1 correspondence
unresolved_at_rows = [
{
"FROM": ucs_row["caller_id"], "TO": ucs_row["id"],
"source_file": _file_by_node_id.get(ucs_row["caller_id"], ""),
}
for ucs_row in ucs_rows
if ucs_row["caller_id"] in valid_ids
]
_bulk_copy(conn, "UNRESOLVED_AT", _REL_UNRESOLVED_AT_COLUMNS, unresolved_at_rows)


def _write_routes_and_exposes(conn: ladybug.Connection, tables: GraphTables, _file_by_node_id: dict[str, str] | None = None) -> None:
Expand Down
Loading
Loading