Plan: Aggressive same-type KG cleanup + graph ranking batch pass #1

@hammertoe

Description

Context

We need a cleanup pass for the canonical KG, run once initially but repeatable thereafter, with aggressive deduplication and graph-based ranking.

Current profile snapshot from production-like data:

  • Nodes: 33,840 (kg_nodes)
  • Edges: 81,260 (kg_edges)
  • Duplicate triple instances (source_id,predicate,target_id): 8,110
  • Non-allowlist predicates: 29 rows
  • Non-allowlist node types: 8 rows
  • Discourse edges not Person->Person: 8,973 / 10,805

Decision from product direction:

  • Aggressive cleanup mode
  • Same-type merges only (after normalization)

Goal

Build a deterministic, auditable batch pipeline that:

  1. Cleans schema/type/predicate drift
  2. Aggressively deduplicates nodes within type
  3. Rewrites + collapses duplicate edges with provenance preservation
  4. Computes node and edge ranking scores (PageRank-style)
  5. Exports clean artifacts and loads into a fresh Postgres instance
  6. Integrates ranking into retrieval ordering

Non-Goals

  • Cross-type merges (explicitly out for this pass)
  • Real-time streaming cleanup (batch/offline only)
  • Human-in-the-loop UI in first iteration (audit files only)
  • Full ML training pipeline (Splink/PyKEEN/etc.) in v1

Deliverables

  • New script: scripts/kg_cleanup_pass.py
  • New module(s):
    • lib/knowledge_graph/cleanup/contracts.py
    • lib/knowledge_graph/cleanup/normalize.py
    • lib/knowledge_graph/cleanup/candidates.py
    • lib/knowledge_graph/cleanup/cluster.py
    • lib/knowledge_graph/cleanup/rewrite.py
    • lib/knowledge_graph/cleanup/rank.py
    • lib/knowledge_graph/cleanup/export_load.py
  • SQL migration for ranking/support fields
  • Tests for all major phases
  • Runbook in docs for dry-run -> export -> load -> verify

Proposed Data Model Changes

kg_nodes

  • pagerank_score DOUBLE PRECISION NULL
  • merge_cluster_id TEXT NULL
  • merged_from_count INTEGER NOT NULL DEFAULT 1

kg_edges

  • support_count INTEGER NOT NULL DEFAULT 1
  • edge_weight DOUBLE PRECISION NULL
  • edge_rank_score DOUBLE PRECISION NULL

Optional audit tables (or artifact files if preferred)

  • kg_node_merge_map(old_node_id, new_node_id, reason, score, run_id)
  • kg_edge_drop_log(edge_id, reason, run_id)

Pipeline Design

Phase 0: Profile + Snapshot

  • Add --mode profile to print and persist baseline metrics JSON
  • Capture counts by type/predicate, drift counts, duplicate-triple ratios
  • Store under artifacts/kg_cleanup/<run_id>/profile_before.json

Phase 1: Contracts + Drift Repair

  • Enforce allowed node types:
    • foaf:Person, schema:Legislation, schema:Organization, schema:Place, skos:Concept
  • Remap known type drift (e.g., foaf:Organization -> schema:Organization, schema:Person -> foaf:Person)
  • Enforce predicate allowlist and remap common drift:
    • e.g. QUESTION -> QUESTIONS, DISAGREE_WITH -> DISAGREES_WITH, AGREE_WITH -> AGREES_WITH, AIMS_TO -> AIMS_TO_REDUCE (only where mapping is explicitly approved)
  • Record all remaps in audit output

Phase 2: Robust Normalization

  • Normalize labels for matching keys (without rewriting canonical labels immediately):
    • Unicode NFKD fold
    • lowercase
    • collapse whitespace
    • punctuation normalization and removal for comparison keys
  • Keep both:
    • display_label (current label)
    • norm_key (matching key)
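
The normalization steps above can be sketched as a single pure function; the exact punctuation policy is an assumption to be pinned down in tests:

```python
import re
import unicodedata

def norm_key(label: str) -> str:
    """Build the matching key only; display_label is never rewritten here.
    Steps: NFKD fold -> strip combining marks -> lowercase ->
    punctuation to spaces -> collapse whitespace."""
    s = unicodedata.normalize("NFKD", label)
    s = "".join(c for c in s if not unicodedata.combining(c))
    s = s.lower()
    s = re.sub(r"[^\w\s]", " ", s)       # punctuation removal for comparison
    return re.sub(r"\s+", " ", s).strip()  # collapse whitespace
```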

Phase 3: Aggressive Candidate Generation (same-type only)

  • Blocking by type-specific keys:
    • Person: stripped honorific name key + surname blocks + initials
    • Legislation: bill-number-like tokens, year-stripped title keys
    • Organization/Place/Concept: normalized token blocks + prefix/suffix blocking
  • Candidate expansion with:
    • rapidfuzz similarity
    • embedding cosine (if both embeddings exist)
    • neighborhood overlap (Jaccard over typed neighbors)
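
As one possible shape for the Person blocking step, a sketch that emits surname and initials block keys and pairs up nodes sharing any key (the honorific list and key prefixes are placeholders):

```python
from collections import defaultdict
from itertools import combinations

HONORIFICS = {"mr", "mrs", "ms", "dr", "hon", "sir", "rt"}  # illustrative

def person_block_keys(norm_key: str) -> set[str]:
    """Surname block + initials block after stripping honorifics
    from the normalized key."""
    tokens = [t for t in norm_key.split() if t not in HONORIFICS]
    if not tokens:
        return set()
    keys = {f"surname:{tokens[-1]}"}
    keys.add("initials:" + "".join(t[0] for t in tokens))
    return keys

def candidate_pairs(nodes: dict[str, str]) -> set[tuple[str, str]]:
    """Pair up same-type nodes sharing any block key.
    nodes maps node_id -> norm_key."""
    blocks = defaultdict(set)
    for node_id, key in nodes.items():
        for b in person_block_keys(key):
            blocks[b].add(node_id)
    pairs = set()
    for members in blocks.values():
        pairs.update(combinations(sorted(members), 2))
    return pairs
```

Candidates from blocking would then be scored with rapidfuzz similarity, embedding cosine, and neighborhood Jaccard before clustering.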

Phase 4: Merge Scoring + Clustering

  • Merge score (initial default):
    • 0.45 * label_sim + 0.30 * embedding_sim + 0.15 * neighbor_jaccard + 0.10 * alias_overlap
  • Hard guards:
    • never cross type
    • block generic labels from auto-merge without structural support (government, member, bill, etc.)
  • Cluster with union-find over candidate pairs above threshold
  • Canonical node choice:
    1. prefer speaker_* for Person
    2. higher degree
    3. richer alias/evidence footprint
    4. deterministic tie-break (id lexical)
  • Emit node_merge_map
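
The clustering step can be sketched as union-find over above-threshold pairs, with lexical tie-breaks so output is deterministic (threshold value and input shape are assumptions; same-type and generic-label guards are assumed to run upstream):

```python
class UnionFind:
    """Union-find where the lexically smaller root always wins,
    so cluster ids are stable across runs."""
    def __init__(self):
        self.parent = {}

    def find(self, x):
        self.parent.setdefault(x, x)
        while self.parent[x] != x:
            self.parent[x] = self.parent[self.parent[x]]  # path halving
            x = self.parent[x]
        return x

    def union(self, a, b):
        ra, rb = self.find(a), self.find(b)
        if ra != rb:
            lo, hi = sorted((ra, rb))  # deterministic tie-break (id lexical)
            self.parent[hi] = lo

def cluster_candidates(scored_pairs, threshold=0.85):
    """scored_pairs: iterable of (id_a, id_b, merge_score).
    Returns {cluster_root_id: set_of_member_ids}."""
    uf = UnionFind()
    for a, b, score in sorted(scored_pairs):  # sorted input for determinism
        if score >= threshold:
            uf.union(a, b)
    clusters = {}
    for node in uf.parent:
        clusters.setdefault(uf.find(node), set()).add(node)
    return clusters
```

The canonical-node choice (speaker_* preference, degree, evidence footprint, lexical id) would then be applied per cluster to emit node_merge_map rows.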

Phase 5: Edge Rewrite + Deduplication

  • Rewrite source_id/target_id via merge map
  • Drop invalid edges where endpoints missing
  • Enforce discourse constraint in cleanup output:
    • RESPONDS_TO, AGREES_WITH, DISAGREES_WITH, QUESTIONS should be Person->Person
    • non-conforming edges are dropped or quarantined to audit log
  • Collapse duplicate edges by key:
    • (source_id, predicate, target_id, youtube_video_id)
  • Aggregate provenance:
    • earliest timestamp/seconds = min
    • utterance_ids = stable union
    • evidence = best/longest anchored evidence
    • confidence = aggregate (max + support-aware blend)
    • support_count = number of merged rows
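
The rewrite-and-collapse step above can be sketched as a group-by over the dedupe key with provenance aggregation; the field names and the support-aware confidence blend are illustrative, not the real schema:

```python
from collections import defaultdict

def collapse_edges(edges, merge_map):
    """edges: list of edge dicts; merge_map: old_node_id -> canonical id.
    Collapses duplicates on (source, predicate, target, video) and
    aggregates provenance per the rules above."""
    groups = defaultdict(list)
    for e in edges:
        src = merge_map.get(e["source_id"], e["source_id"])
        tgt = merge_map.get(e["target_id"], e["target_id"])
        groups[(src, e["predicate"], tgt, e["youtube_video_id"])].append(e)
    out = []
    for (src, pred, tgt, vid), rows in sorted(groups.items()):
        support = len(rows)
        max_conf = max(r["confidence"] for r in rows)
        out.append({
            "source_id": src, "predicate": pred, "target_id": tgt,
            "youtube_video_id": vid,
            "earliest_seconds": min(r["seconds"] for r in rows),
            "utterance_ids": sorted({u for r in rows for u in r["utterance_ids"]}),
            "evidence": max((r["evidence"] for r in rows), key=len),
            # support-aware blend: max confidence, nudged up per extra row
            # (the 0.02 increment is a placeholder constant)
            "confidence": min(1.0, max_conf + 0.02 * (support - 1)),
            "support_count": support,
        })
    return out
```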

Phase 6: Graph Ranking (In Memory)

  • Build weighted directed graph in NetworkX
  • Node score: weighted PageRank
  • Edge score components:
    • support_count (normalized)
    • confidence
    • predicate prior weight (configurable)
    • endpoint PageRank blend
  • Initial formula:
    • edge_weight = 0.50*support_norm + 0.35*confidence + 0.15*predicate_prior
    • edge_rank_score = edge_weight * avg(PR(source), PR(target)) * log1p(support_count)
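
The edge-scoring formulas above can be sketched as a pure function over precomputed PageRank values (which `networkx.pagerank(G, weight="weight")` would supply); the predicate prior is a configurable stand-in here:

```python
import math

def edge_scores(edge, pagerank, max_support, predicate_prior=0.5):
    """Compute (edge_weight, edge_rank_score) for one collapsed edge.
    pagerank: node_id -> PageRank score; max_support normalizes
    support_count into [0, 1]."""
    support_norm = edge["support_count"] / max_support
    edge_weight = (0.50 * support_norm
                   + 0.35 * edge["confidence"]
                   + 0.15 * predicate_prior)
    # endpoint PageRank blend: simple average of source and target PR
    pr_avg = (pagerank[edge["source_id"]] + pagerank[edge["target_id"]]) / 2
    edge_rank_score = edge_weight * pr_avg * math.log1p(edge["support_count"])
    return edge_weight, edge_rank_score
```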

Phase 7: Export + Fresh DB Load

  • Export cleaned artifacts:
    • kg_nodes_clean.csv
    • kg_edges_clean.csv
    • kg_aliases_clean.csv
    • node_merge_map.csv
    • edge_drop_log.csv
    • metrics_after.json
  • Load into fresh DB instance/schema using COPY/batch insert
  • Rebuild indexes and analyze tables

Phase 8: Retrieval Integration

  • Update retrieval ordering in Graph-RAG path to prefer rank score:
    • currently in lib/kg_hybrid_graph_rag.py edge ordering uses confidence/time
    • switch to edge_rank_score DESC NULLS LAST, confidence DESC NULLS LAST, earliest_seconds ASC

CLI Sketch

python scripts/kg_cleanup_pass.py --mode profile
python scripts/kg_cleanup_pass.py --mode dry-run --aggressive --same-type-only
python scripts/kg_cleanup_pass.py --mode export --aggressive --same-type-only --out-dir artifacts/kg_cleanup/<run_id>
python scripts/kg_cleanup_pass.py --mode load --input-dir artifacts/kg_cleanup/<run_id> --target-db-url <fresh_db>
python scripts/kg_cleanup_pass.py --mode verify --target-db-url <fresh_db>
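
The invocations above imply roughly this argparse wiring; flag names and choices are taken from the sketch, everything else is an assumption:

```python
import argparse

def build_parser() -> argparse.ArgumentParser:
    """Hypothetical CLI for scripts/kg_cleanup_pass.py matching the
    sketch above."""
    p = argparse.ArgumentParser(prog="kg_cleanup_pass.py")
    p.add_argument("--mode", required=True,
                   choices=["profile", "dry-run", "export", "load", "verify"])
    p.add_argument("--aggressive", action="store_true")
    p.add_argument("--same-type-only", action="store_true")
    p.add_argument("--out-dir")        # export mode
    p.add_argument("--input-dir")      # load mode
    p.add_argument("--target-db-url")  # load/verify modes
    return p
```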

Testing Plan (TDD)

  1. Red/green tests for each pure module:
    • normalization
    • predicate/type remap
    • candidate generation/blocking
    • merge scoring and guardrails
    • union-find clustering determinism
    • edge rewrite + provenance aggregation
    • ranking determinism on fixture graph
  2. Integration test with synthetic noisy fixture:
    • injected duplicates, drift predicates/types, and discourse violations
    • assert expected merge counts + edge collapse + score fields
  3. Script-level dry-run test:
    • no DB mutation in dry-run
    • artifact files generated

Acceptance Criteria

  • Deterministic output for same input snapshot
  • All output nodes have allowlist type
  • All output predicates in allowlist
  • Discourse predicates are Person->Person only in clean output
  • Duplicate triple instances reduced by >= 80%
  • Merge map and drop log present and non-empty when applicable
  • Ranking fields populated for >= 99% of nodes/edges
  • Graph-RAG retrieval uses rank-aware ordering

Rollout Plan

  1. Implement + unit tests
  2. Run profile on current DB and save baseline
  3. Run dry-run and inspect merge/drop reports
  4. Tune thresholds/guards once
  5. Run export
  6. Load into fresh DB
  7. Validate acceptance criteria + spot-check critical entities
  8. Switch read path to fresh DB
  9. Keep old DB as rollback source until sign-off

Risks + Mitigations

  • Over-merging generic concepts
    • Mitigation: generic-label guardrails + structural threshold floor
  • Wrong predicate remap
    • Mitigation: explicit remap table only; unknowns quarantined not auto-remapped
  • Retrieval regressions
    • Mitigation: A/B compare top-K edge sets before cutover
  • Runtime/memory spikes
    • Mitigation: typed partition processing + bounded candidate generation

Task Checklist

  • Add cleanup module scaffolding and CLI
  • Add migration for ranking/support columns
  • Implement profile mode and baseline metrics
  • Implement contracts + remap layer
  • Implement same-type aggressive dedupe pipeline
  • Implement edge rewrite/collapse with provenance aggregation
  • Implement PageRank + edge rank scoring
  • Implement export/load/verify modes
  • Add unit and integration tests and drive them to green
  • Update Graph-RAG retrieval ordering
  • Write operator runbook and execute first full dry-run
