
adi2355/multi-source-rag-pipeline


Overview

A multi-source Retrieval-Augmented Generation pipeline that ingests AI/ML knowledge from Instagram video transcripts, ArXiv research papers, and GitHub repositories, then organizes it into a searchable knowledge system with concept-level understanding. The pipeline spans seven layers: collection, processing, storage, knowledge extraction, vector embedding, hybrid retrieval, and LLM-powered answer generation.

The architecture treats each content source as a first-class data stream with its own ingestion, processing, and normalization path, converging into a SQLite store for structured content, FTS5 keyword indexes, and a concept knowledge graph, while 768-dimensional sentence-transformer embeddings are served through Databricks Vector Search. Retrieval combines Databricks Vector Search with FTS5 keyword matching through an adaptive weighting system that classifies each query and adjusts strategy in real time. Generation routes LLM calls through a Databricks AI Gateway for cost governance and policy guardrails, and every retrieval and prompt version is tracked in MLflow against a precision, recall, NDCG, MRR, answer latency, and hallucination-risk evaluation harness.


Technology Stack

| Category | Technologies |
| --- | --- |
| Languages | Python · SQL · JavaScript |
| AI / LLM | Claude · Mistral · Whisper |
| ML / Embeddings | Sentence Transformers · PyTorch · NumPy · scikit-learn |
| Platform / LLMOps | Databricks Vector Search · Databricks AI Gateway · MLflow |
| Data & Storage | SQLite · FTS5 · NetworkX |
| Web & API | Flask · Swagger · Plotly |

Engineering Principles

1. Source-Agnostic Convergence

Every content source -- Instagram video transcripts, ArXiv research papers, GitHub repositories -- enters through its own specialized ingestion path but converges into a single unified schema (ai_content) with normalized metadata. Downstream systems (embedding, search, knowledge extraction) operate on this unified representation without knowledge of the original source.

Goal: Add a new content source by implementing one collector and one normalizer, with zero changes to retrieval, embedding, or knowledge graph logic.
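A minimal sketch of what source-agnostic convergence can look like in Python. The `AiContentRow` fields and normalizer signatures below are illustrative assumptions, not the repository's actual schema; they show how each source maps onto one unified record so downstream consumers stay source-blind:

```python
from dataclasses import dataclass, field

# Hypothetical mirror of the unified ai_content record; field names
# are assumptions for illustration, not the real schema.
@dataclass
class AiContentRow:
    source: str          # "instagram" | "arxiv" | "github"
    source_id: str       # platform-native identifier
    title: str
    body: str            # transcript, paper text, or README
    metadata: dict = field(default_factory=dict)

def normalize_arxiv(paper: dict) -> AiContentRow:
    """Map an arXiv API result onto the unified schema."""
    return AiContentRow(
        source="arxiv",
        source_id=paper["entry_id"],
        title=paper["title"],
        body=paper["extracted_text"],
        metadata={"authors": paper.get("authors", []),
                  "published": paper.get("published")},
    )

def normalize_github(repo: dict) -> AiContentRow:
    """Map a GitHub REST API repository payload onto the unified schema."""
    return AiContentRow(
        source="github",
        source_id=repo["full_name"],
        title=repo["full_name"],
        body=repo.get("readme", ""),
        metadata={"language": repo.get("language"),
                  "stars": repo.get("stargazers_count", 0)},
    )
```

Adding a new source then means writing one collector plus one `normalize_*` function; embedding, search, and graph code never change.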

2. Hybrid Retrieval Over Single-Strategy Search

Pure vector search misses exact-match terminology. Pure keyword search misses semantic similarity. The retrieval layer combines Databricks Vector Search (serving 768-dim sentence-transformer embeddings) with SQLite FTS5 keyword matching through an adaptive weighting system that classifies each query (code, factual, conceptual) and adjusts the vector-to-keyword balance in real time. A feedback loop learns optimal weights from user search interactions.

Goal: Every query type -- exact code snippets, broad conceptual questions, specific factual lookups -- returns relevant results without manual tuning.
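The fusion step can be sketched as weighted late fusion over normalized score sets. This is a simplified stand-in, assuming each retriever returns a `doc_id -> score` mapping; the real pipeline's scoring details may differ:

```python
def minmax(scores):
    """Normalize scores to [0, 1] so vector and keyword scales are comparable."""
    lo, hi = min(scores.values()), max(scores.values())
    span = (hi - lo) or 1.0
    return {doc: (s - lo) / span for doc, s in scores.items()}

def fuse(vector_scores, keyword_scores, w_vec=0.6, w_kw=0.4):
    """Weighted late fusion of two result sets (doc_id -> score)."""
    v = minmax(vector_scores) if vector_scores else {}
    k = minmax(keyword_scores) if keyword_scores else {}
    docs = set(v) | set(k)
    fused = {d: w_vec * v.get(d, 0.0) + w_kw * k.get(d, 0.0) for d in docs}
    return sorted(fused.items(), key=lambda x: x[1], reverse=True)
```

Normalizing first matters: raw cosine similarities and BM25-style keyword scores live on different scales, so unnormalized fusion would let one retriever dominate regardless of weights.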

3. Knowledge-First Architecture

Raw documents are not just stored and embedded -- they are distilled into a structured concept graph using LLM-powered extraction. Concepts, their categories (algorithm, model, technique, framework), and weighted relationships form a NetworkX graph that supports centrality analysis, community detection, and concept-aware retrieval.

Goal: Answer questions about relationships between ideas, not just questions about individual documents.

4. Measurable Retrieval Quality

The system includes an MLflow-tracked evaluation harness that records precision, recall, F1, NDCG, MRR, answer latency, and hallucination-risk scores across prompt and retrieval versions. Test queries are generated programmatically from knowledge graph concepts, ensuring evaluation coverage tracks the actual knowledge base, and every run is logged as an MLflow experiment so regressions are caught before they ship.

Goal: Every retrieval change is measured against a reproducible, versioned benchmark -- not validated by subjective impression.

5. Governed and Cost-Efficient LLM Integration

All LLM calls -- synthesis, concept extraction, summarization -- are routed through a Databricks AI Gateway that enforces policy guardrails, centralizes cost governance across models, and captures per-request telemetry for downstream MLflow tracking. On top of the gateway, summarization uses the Claude Message Batches API (up to 100 items per batch, asynchronous polling), achieving approximately 50% cost reduction compared to sequential API calls. OCR uses Mistral AI for PDF text extraction with automatic chunking for large documents and PyPDF2 fallback for robustness.

Goal: Process large document collections at scale with predictable spend and auditable policy enforcement, not proportional cost scaling.
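The batching described above can be sketched as a pure helper that packs summarization requests into payloads of at most 100 items, the Message Batches API limit cited here. The request shape mirrors the Batches API; the model name and prompt are illustrative assumptions:

```python
def build_batches(docs, model="claude-3-5-haiku-latest", max_per_batch=100):
    """Pack (doc_id, text) pairs into Message Batches payloads of at most
    max_per_batch requests each. Model and prompt are illustrative."""
    requests = [
        {
            "custom_id": f"doc-{doc_id}",
            "params": {
                "model": model,
                "max_tokens": 1024,
                "messages": [{"role": "user",
                              "content": f"Summarize:\n\n{text}"}],
            },
        }
        for doc_id, text in docs
    ]
    # Slice into consecutive groups no larger than the API's batch limit.
    return [requests[i:i + max_per_batch]
            for i in range(0, len(requests), max_per_batch)]
```

Each resulting group would be submitted as one batch and polled asynchronously until it completes, with `custom_id` used to join results back to source rows.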


Pipeline Architecture

The system operates as a seven-layer pipeline. Data flows from collection through processing, storage, knowledge extraction, and embedding before reaching the retrieval and generation layers.

┌─────────────────────────────────────────────────────────────────────────┐
│                           Data Collection                               │
│   Instagram (instaloader)  ·  ArXiv (arxiv API)  ·  GitHub (REST API)  │
└────────────────────────────────┬────────────────────────────────────────┘
                                 │
┌────────────────────────────────▼────────────────────────────────────────┐
│                          Data Processing                                │
│   Whisper Transcription  ·  Mistral OCR  ·  Claude Batch Summaries     │
└────────────────────────────────┬────────────────────────────────────────┘
                                 │
┌────────────────────────────────▼────────────────────────────────────────┐
│                        SQLite Unified Store                             │
│   ai_content  ·  source-specific tables  ·  FTS5 virtual tables        │
└───────────┬────────────────────┬────────────────────┬───────────────────┘
            │                    │                    │
┌───────────▼──────┐  ┌─────────▼──────────┐  ┌─────▼─────────────────┐
│  Knowledge       │  │  Databricks         │  │  Hybrid Retrieval     │
│  Extraction      │  │  Vector Search      │  │  Vector + FTS5 fusion │
│  Concepts +      │  │  768-dim, overlap   │  │  Adaptive weighting   │
│  Graph           │  │  chunking           │  │  Feedback learning    │
└───────────┬──────┘  └─────────┬──────────┘  └─────┬─────────────────┘
            │                   │                    │
┌───────────▼───────────────────▼────────────────────▼────────────────────┐
│              LLM Response Generation  ·  Databricks AI Gateway          │
│   Context selection  ·  Source citations  ·  Policy guardrails          │
│   Cost governance  ·  MLflow-tracked prompt and retrieval versions      │
└─────────────────────────────────────────────────────────────────────────┘

Multi-Source Ingestion

Instagram Video Pipeline

The scraper uses instaloader with proxy rotation, account credential cycling, and rate-limit detection with configurable cooldown periods. Account state is tracked persistently in JSON files. Audio is extracted and transcribed locally using OpenAI Whisper, producing timestamped transcript segments.

ArXiv Research Papers

Papers are collected via the arxiv API with configurable search queries and date ranges. PDF text is extracted using the Mistral AI OCR API with automatic chunking for large documents. A PyPDF2 fallback ensures extraction succeeds when the OCR API is unavailable. Papers enter a download-only mode for batch collection, followed by a separate processing phase.
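The OCR-with-fallback pattern can be sketched as below. Both extractors are injected callables (`path -> text`) so the Mistral client and the PyPDF2 reader stay swappable; the function name and return shape are assumptions for illustration:

```python
def extract_pdf_text(pdf_path, ocr_extract, fallback_extract):
    """Try the primary OCR extractor; fall back to the secondary one if
    the primary raises or returns empty text."""
    try:
        text = ocr_extract(pdf_path)
        if text and text.strip():
            return text, "mistral_ocr"
    except Exception:
        pass  # network error, quota exhaustion, or malformed response
    return fallback_extract(pdf_path), "pypdf2_fallback"
```

Returning which path succeeded alongside the text makes the degradation observable, so fallback activations can be counted rather than silently absorbed.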

GitHub Repositories

Public repositories are collected via the GitHub REST API. Repository metadata, README content, file structure, and primary language information are normalized into the unified content schema.

Guarantee: Each source operates independently -- an Instagram rate-limit event does not block ArXiv paper processing or GitHub collection.


Knowledge Graph Engine

The concept extraction pipeline uses Claude to identify concepts from processed content, classify them by category (algorithm, model, technique, framework, concept, tool, dataset, metric), and extract weighted relationships with confidence scores. The resulting graph is built and analyzed using NetworkX.

Graph capabilities:

  • PageRank centrality analysis for identifying foundational concepts
  • Community detection for discovering concept clusters
  • Subgraph extraction around specific topics
  • Interactive Plotly visualization and static Matplotlib rendering
  • GEXF and JSON export for external analysis tools

Guarantee: The knowledge graph is a queryable, structured representation of the knowledge base -- not a visualization artifact.
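The graph capabilities above map directly onto NetworkX calls. The toy graph below is illustrative data, not extracted concepts:

```python
import networkx as nx
from networkx.algorithms.community import greedy_modularity_communities

# Toy weighted concept graph; nodes and edge weights are illustrative.
G = nx.Graph()
G.add_weighted_edges_from([
    ("transformer", "attention", 0.9),
    ("transformer", "bert", 0.8),
    ("attention", "self-attention", 0.7),
    ("sqlite", "fts5", 0.9),
    ("sqlite", "database", 0.6),
])

centrality = nx.pagerank(G, weight="weight")        # foundational concepts
communities = greedy_modularity_communities(G)      # concept clusters
neighborhood = nx.ego_graph(G, "transformer", radius=1)  # topic subgraph
```

The same objects feed visualization and export: the graph can be handed to Plotly for interactive rendering or written out with `nx.write_gexf` for external tools.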


Hybrid Retrieval System

Embedding Layer

Text is chunked with configurable size (default 1000 characters) and overlap (200 characters), using intelligent boundary detection that respects paragraph breaks, newlines, sentence endings, and word boundaries. Embeddings are generated using multi-qa-mpnet-base-dot-v1 (768 dimensions) from sentence-transformers and indexed in a Databricks Vector Search endpoint that serves approximate-nearest-neighbor queries for the retrieval layer. A TF-IDF hash-based fallback activates when the embedding model or the vector endpoint is unreachable, so retrieval degrades gracefully instead of failing.
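The chunking described above can be sketched as follows. The defaults mirror the stated configuration (1000-character chunks, 200-character overlap); the boundary-detection heuristic is a simplified assumption, not the pipeline's exact logic:

```python
def chunk_text(text, size=1000, overlap=200):
    """Split text into overlapping chunks, preferring to break at a
    paragraph, newline, sentence end, or word boundary near the limit."""
    chunks, start = [], 0
    while start < len(text):
        end = min(start + size, len(text))
        if end < len(text):
            window = text[start:end]
            # Prefer the latest "natural" boundary inside the window.
            for sep in ("\n\n", "\n", ". ", " "):
                cut = window.rfind(sep)
                if cut > size // 2:  # avoid degenerate tiny chunks
                    end = start + cut + len(sep)
                    break
        chunks.append(text[start:end])
        if end >= len(text):
            break
        start = max(end - overlap, start + 1)  # step back to create overlap
    return chunks
```

The overlap means a sentence falling on a chunk boundary still appears whole in at least one chunk, which keeps its embedding semantically coherent.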

Adaptive Search Weighting

The hybrid search layer classifies each query and applies dynamic weights:

| Query Type | Vector Weight | Keyword Weight | Trigger |
| --- | --- | --- | --- |
| Code queries | 0.50 | 0.50 | Code-like tokens detected |
| Factual queries | 0.60 | 0.40 | Specific entity or fact pattern |
| Conceptual queries | 0.80 | 0.20 | Abstract or relationship question |
| Short queries (1--2 words) | -0.10 adjustment | +0.10 adjustment | Token count <= 2 |
| Exact-match (quoted) | -0.20 adjustment | +0.20 adjustment | Quoted phrase detected |
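The table translates into a small classify-then-adjust function. The trigger regexes below are illustrative stand-ins for the pipeline's real detectors; the weight values follow the table:

```python
import re

BASE_WEIGHTS = {"code": (0.50, 0.50), "factual": (0.60, 0.40),
                "conceptual": (0.80, 0.20)}

def classify(query):
    """Heuristic query classifier; patterns here are illustrative."""
    if re.search(r"[_(){};]|def |import |->", query):
        return "code"
    if re.search(r"\b(why|how|relate|difference|concept)\b", query, re.I):
        return "conceptual"
    return "factual"

def search_weights(query):
    """Base weights per detected type, then signal adjustments."""
    vec, kw = BASE_WEIGHTS[classify(query)]
    if len(query.split()) <= 2:  # short query: lean on keywords
        vec, kw = vec - 0.10, kw + 0.10
    if '"' in query:             # quoted exact-match phrase
        vec, kw = vec - 0.20, kw + 0.20
    return round(vec, 2), round(kw, 2)
```

Adjustments stack: a two-word quoted query starts from its type's base weights and shifts 0.30 toward keyword matching in total.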

Weights are further refined by a feedback learning loop (search_query_log, search_feedback, weight_patterns tables) that tracks which weight configurations produce the best user-rated results.

Guarantee: Search quality improves over time without manual retuning, driven by observed user interactions.
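The keyword side of the hybrid layer can be exercised directly with Python's built-in sqlite3 module, assuming it was compiled with FTS5 (as most standard builds are). The table name and rows below are illustrative, not the real schema:

```python
import sqlite3

# Minimal FTS5 keyword-search sketch over an illustrative content table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE VIRTUAL TABLE content_fts USING fts5(title, body)")
conn.executemany(
    "INSERT INTO content_fts VALUES (?, ?)",
    [("Attention Is All You Need", "transformer self-attention architecture"),
     ("SQLite FTS5 guide", "full-text search virtual tables with bm25 ranking")],
)
# bm25() returns a relevance rank for the MATCH query (lower is better).
rows = conn.execute(
    "SELECT title, bm25(content_fts) FROM content_fts "
    "WHERE content_fts MATCH ? ORDER BY bm25(content_fts)",
    ("transformer",),
).fetchall()
```

These BM25-ranked keyword hits are what get fused with the vector results under the adaptive weights above.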


Hardest Problems Solved

1. Adaptive Retrieval Without Manual Tuning

Problem: A fixed vector-to-keyword weight ratio works well for some query types and poorly for others. Code queries need strong keyword matching; conceptual queries need strong semantic matching. Manual tuning does not scale.

Solution: The hybrid search system classifies each incoming query, applies a base weight configuration for the detected query type, then adjusts further based on query-specific signals (length, quoted phrases, code tokens). A feedback loop records user interactions and learns which weight patterns produce the best results for observed query distributions, progressively refining the default weights.

2. Structured Knowledge from Unstructured Text

Problem: Video transcripts and research papers contain latent concept relationships invisible to keyword and vector search. "Attention mechanism" and "transformer architecture" are deeply related, but a document about one may never mention the other by name.

Solution: The concept extraction pipeline uses Claude to identify concepts, classify them into a controlled taxonomy, and extract explicit relationships with confidence scores and relationship types. The resulting NetworkX graph makes latent relationships queryable -- enabling graph-based retrieval that surfaces documents connected through concept chains, not just direct textual similarity.

3. Governed LLM Calls and Versioned Retrieval Quality

Problem: A RAG pipeline that calls model APIs directly has no central place to enforce policy, track cost across providers, or tie a retrieval regression back to the prompt and index version that caused it. Ad-hoc logging and per-call auth keys do not scale once multiple models, prompt versions, and retrieval strategies are in flight.

Solution: Every outbound LLM call -- answer synthesis, concept extraction, batch summarization -- is routed through a Databricks AI Gateway that centralizes credentials, enforces policy guardrails, and emits per-request telemetry. Each evaluation run (prompt version, retrieval configuration, weighting profile) is logged as an MLflow experiment with precision, recall, NDCG, MRR, answer latency, and hallucination-risk metrics, so every change lands against a versioned benchmark and regressions surface before they ship.


System Domains

| Domain | Responsibility | Key Modules |
| --- | --- | --- |
| Ingestion | Source-specific collection, rate-limit handling, credential management | downloader.py, arxiv_collector.py, github_collector.py |
| Processing | Transcription, OCR, summarization, text normalization | transcriber.py, mistral_ocr.py, summarizer.py |
| Storage | Schema management, migrations, unified content table, FTS indexes | create_db.sql, db_migration.py, init_db.py |
| Knowledge | Concept extraction, graph construction, centrality analysis | concept_extractor.py, knowledge_graph.py |
| Embedding | Text chunking, vector generation, Databricks Vector Search indexing | chunking.py, embeddings.py, generate_embeddings.py |
| Retrieval | Databricks Vector Search, FTS5 keyword search, hybrid fusion, adaptive weighting | vector_search.py, hybrid_search.py, context_builder.py |
| Generation | LLM context assembly, response generation, source citation, Databricks AI Gateway routing | llm_integration.py, context_builder.py |
| Evaluation | Retrieval metrics, answer quality, test generation, MLflow experiment tracking | evaluation/*.py |
| Web | Flask interface, REST API, Swagger documentation | app.py, api/*.py |

Deep Dive: Technical Documentation

| Document | Focus Area |
| --- | --- |
| RAG Pipeline | End-to-end RAG usage, CLI commands, query API |
| Knowledge Graph | Concept extraction, graph analysis, visualization |
| Vector and Hybrid Search | Embedding generation, search strategies, adaptive weighting |
| ArXiv Collector | Paper collection, OCR pipeline, batch processing |
| Application Guide | Installation, configuration, CLI usage, web interface |

Architectural Patterns

| Pattern | Implementation |
| --- | --- |
| Source-Agnostic Schema | Unified ai_content table with source-specific metadata in dedicated tables; downstream consumers are source-blind |
| Adaptive Weighting | Query classification, base weights, signal adjustments, feedback-refined weights via weight_patterns |
| Concept Knowledge Graph | LLM extraction into typed nodes and weighted edges, NetworkX analysis, queryable graph structure |
| Managed Vector Retrieval | 768-dim sentence-transformer embeddings served via Databricks Vector Search for approximate-nearest-neighbor queries |
| Gateway-Mediated LLM Calls | All model traffic routed through Databricks AI Gateway for credentials, policy guardrails, and unified cost governance |
| Batch LLM Processing | Claude Message Batches API with async polling, UUID tracking, 50% cost reduction over sequential calls |
| Graceful Degradation | Mistral OCR with PyPDF2 fallback; sentence-transformers with TF-IDF hash fallback; partial progress preservation |
| MLflow-Tracked Evaluation | Prompt and retrieval versions logged as experiments; precision, recall, NDCG, MRR, answer latency, hallucination-risk tracked across runs |

Evaluation Framework

The evaluation suite generates test queries programmatically from knowledge graph concepts, ensuring coverage evolves with the knowledge base. Every run is logged as an MLflow experiment -- tagged with the prompt version, retrieval configuration, and weighting profile -- so retrieval and prompt changes are measured against a versioned benchmark rather than a subjective impression. Metrics computed across search strategies:

| Metric | Purpose |
| --- | --- |
| Precision@k | Fraction of retrieved results that are relevant |
| Recall@k | Fraction of relevant results that are retrieved |
| F1@k | Harmonic mean of precision and recall |
| NDCG | Normalized discounted cumulative gain -- measures ranking quality |
| MRR | Mean reciprocal rank -- measures position of first relevant result |
| Answer Latency | End-to-end p50/p95 latency from query receipt to final token, tracked per retrieval and prompt version |
| Hallucination Risk | Claim-level groundedness score over synthesized answers, flagging spans without retrieved-context support |

Results are viewable through an interactive evaluation dashboard and the MLflow UI, with runs comparable side-by-side across prompt and retrieval versions.
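The ranking metrics in the table have compact reference implementations. The sketch below assumes binary relevance judgments (a set of relevant doc ids per query), which matches the standard definitions of these metrics:

```python
import math

def precision_at_k(retrieved, relevant, k):
    """Fraction of the top-k retrieved results that are relevant."""
    return sum(1 for d in retrieved[:k] if d in relevant) / k

def recall_at_k(retrieved, relevant, k):
    """Fraction of all relevant results found in the top k."""
    return (sum(1 for d in retrieved[:k] if d in relevant) / len(relevant)
            if relevant else 0.0)

def mrr(retrieved, relevant):
    """Reciprocal rank of the first relevant result (0 if none)."""
    for i, d in enumerate(retrieved, start=1):
        if d in relevant:
            return 1.0 / i
    return 0.0

def ndcg_at_k(retrieved, relevant, k):
    """NDCG with binary gains: DCG of the ranking over the ideal DCG."""
    dcg = sum(1.0 / math.log2(i + 1)
              for i, d in enumerate(retrieved[:k], start=1) if d in relevant)
    ideal = sum(1.0 / math.log2(i + 1)
                for i in range(1, min(len(relevant), k) + 1))
    return dcg / ideal if ideal else 0.0
```

Averaging each metric over the programmatically generated test queries yields the per-run numbers that MLflow logs and compares across versions.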


Folder Structure


src/
├── run.py                          --- CLI entry point
├── app.py                          --- Flask web interface
├── downloader.py                   --- Instagram scraper, proxy rotation, rate limiting
├── transcriber.py                  --- Whisper audio transcription
├── summarizer.py                   --- Claude batch summarization
├── arxiv_collector.py              --- ArXiv paper collection + Mistral OCR
├── github_collector.py             --- GitHub repository collection
├── mistral_ocr.py                  --- Mistral AI OCR wrapper
│
├── embeddings.py                   --- Sentence-transformers embedding generation, VS upsert fan-out
├── generate_embeddings.py          --- Batch embedding orchestration
├── vector_search.py                --- Vector similarity (managed VS + local cosine dispatch)
├── hybrid_search.py                --- Hybrid retrieval (native VS hybrid + local FTS5 fusion)
├── chunking.py                     --- Text chunking with overlap
├── context_builder.py              --- RAG context selection and formatting
├── llm_integration.py              --- Claude response generation
├── llm_client.py                   --- Unified LLM call surface (Gateway-first, SDK fallback)
├── ai_gateway_client.py            --- Mosaic AI Gateway client (external-model routes)
├── databricks_vector_client.py     --- Mosaic AI Vector Search client (Direct Access Index)
├── governance_metrics.py           --- Fallback-activation counter for observability
│
├── concept_extractor.py            --- LLM-powered concept extraction
├── knowledge_graph.py              --- Graph construction, analysis, visualization
├── concept_schema.sql              --- Knowledge graph schema
│
├── create_db.sql                   --- Database schema
├── db_migration.py                 --- Schema migrations
├── init_db.py                      --- Database initialization
│
├── api/
│   ├── api.py                      --- REST API endpoints
│   ├── api_knowledge.py            --- Knowledge graph API
│   └── swagger.py                  --- OpenAPI specification
│
├── evaluation/
│   ├── retrieval_metrics.py        --- Precision, recall, NDCG, MRR
│   ├── answer_evaluator.py         --- Answer quality evaluation
│   ├── test_queries.py             --- Programmatic test generation
│   ├── test_runner.py              --- Evaluation orchestration (MLflow-wrapped)
│   ├── mlflow_eval.py              --- MLflow experiment + run tracking
│   └── dashboard.py                --- Interactive results dashboard
│
├── templates/                      --- Flask HTML templates
├── data/
│   ├── audio/                      --- Transcribed audio files
│   ├── transcripts/                --- JSON transcript output
│   ├── papers/                     --- ArXiv paper text
│   ├── visualizations/             --- Knowledge graph renders
│   └── summaries_cache/            --- Cached Claude summaries
│
└── requirements.txt                --- Python dependencies