28 changes: 28 additions & 0 deletions knowledge_base/workflow_vector_search_ingestion_for_rag/.gitignore
@@ -0,0 +1,28 @@
# Databricks
.databricks/

# Python
build/
dist/
__pycache__/
*.egg-info
.venv/
*.py[cod]

# Local configuration (keep your settings private)
databricks.local.yml

# IDE
.idea/
.vscode/
.DS_Store

# Scratch/temporary files
scratch/**
!scratch/README.md

# Test documents (don't commit large PDFs)
*.pdf
*.png
*.jpg
*.jpeg
163 changes: 163 additions & 0 deletions knowledge_base/workflow_vector_search_ingestion_for_rag/README.md
@@ -0,0 +1,163 @@
# AI Document Processing Workflow for RAG with Structured Streaming

A Databricks Asset Bundle demonstrating **incremental document processing** for RAG: parsing with `ai_parse_document`, text chunking, and Vector Search indexing, orchestrated as a Databricks Workflow that uses Structured Streaming.

## Overview

This example shows how to build an incremental workflow that:
1. **Parses** PDFs and images using [`ai_parse_document`](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_parse_document)
2. **Extracts** clean text with incremental processing
3. **Chunks** text into smaller pieces suitable for embedding
4. **Indexes** chunks using Databricks Vector Search for RAG applications

All stages run as Python notebook tasks in a Databricks Workflow using Structured Streaming with serverless compute.

## Architecture

```
Source Documents (UC Volume)
          │
          ▼
Task 1: ai_parse_document      →  parsed_documents_raw (variant)
          │
          ▼
Task 2: text extraction        →  parsed_documents_text (string)
          │
          ▼
Task 3: text chunking          →  parsed_documents_text_chunked (string)
          │
          ▼
Task 4: vector search index    →  Vector Search Index
```

### Key Features

- **Incremental processing**: Only new files are processed, thanks to Structured Streaming checkpoints
- **Serverless compute**: All tasks run on serverless compute for cost efficiency
- **Task dependencies**: Sequential execution with automatic dependency management
- **Parameterized**: Catalog, schema, volumes, and table names are configurable via bundle variables
- **Error handling**: Parsing failures are handled gracefully
- **Token-aware chunking**: Text is split based on the embedding model's tokenizer
- **Change Data Feed**: Enables efficient incremental updates between pipeline stages (see the sketch after this list)
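
A minimal sketch of the Change Data Feed hand-off between stages, assuming a table such as `main.rag.parsed_documents_text` with CDF enabled (table and column names here are illustrative):

```python
# Illustrative sketch only: enable CDF on a stage's output table, then stream
# just the inserts/updates into the next stage instead of re-reading everything.
spark.sql("""
    ALTER TABLE main.rag.parsed_documents_text
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

changes = (
    spark.readStream
    .option("readChangeFeed", "true")                            # row-level changes, not full snapshots
    .table("main.rag.parsed_documents_text")
    .filter("_change_type IN ('insert', 'update_postimage')")    # ignore deletes and pre-images
)
```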

## Prerequisites

- Databricks workspace with Unity Catalog
- Databricks CLI v0.218.0+
- Unity Catalog volumes for:
  - Source documents (PDFs/images)
  - Parsed output images
  - Streaming checkpoints and the chunking cache (the cache path is derived from the checkpoint base path)
- Access to AI Functions (`ai_parse_document`)
- An embedding model serving endpoint (e.g., `databricks-gte-large-en`)
- A Vector Search endpoint (created automatically if it does not already exist)

## Quick Start

1. **Install and authenticate**
   ```bash
   databricks auth login --host https://your-workspace.cloud.databricks.com
   ```

2. **Configure** `databricks.yml` with your workspace settings

3. **Validate** the bundle configuration
   ```bash
   databricks bundle validate
   ```

4. **Deploy**
   ```bash
   databricks bundle deploy
   ```

5. **Upload documents** to your source volume

6. **Run the workflow** from the Databricks UI (Workflows) or with `databricks bundle run ai_parse_document_workflow`

## Configuration

Edit the variable defaults in `databricks.yml` (condensed view of the bundle variables):

```yaml
variables:
  catalog: main                                               # Your catalog
  schema: rag                                                 # Your schema
  source_volume_path: /Volumes/main/rag/documents             # Source PDFs/images
  output_volume_path: /Volumes/main/rag/temp/parsed_output    # Parsed page images
  checkpoint_base_path: /Volumes/main/rag/temp/checkpoints/ai_parse_workflow  # Checkpoints (chunking cache is derived from this path)
  raw_table_name: parsed_documents_raw                        # Table names
  text_table_name: parsed_documents_text
  chunk_table_name: parsed_documents_text_chunked
  vector_search_index_name: parsed_documents_vector_index
  embedding_model_endpoint: databricks-gte-large-en           # Embedding model
  vector_search_endpoint_name: vector-search-shared-endpoint  # Vector Search endpoint
```

## Workflow Tasks

### Task 1: Document Parsing
**File**: `src/transformations/01_parse_documents.py`

Uses `ai_parse_document` to extract text, tables, and metadata from PDFs and images (see the sketch below):
- Reads files from volume using Structured Streaming
- Stores variant output with bounding boxes
- Incremental: checkpointed streaming prevents reprocessing
- Supports PDF, JPG, JPEG, PNG files
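
A minimal sketch of this task, assuming parameters arrive as notebook widgets; the structure mirrors the description above and is not the exact notebook code:

```python
# Hypothetical sketch of Task 1: incrementally parse new files with ai_parse_document.
from pyspark.sql import functions as F

# Task parameters are passed via base_parameters in the job and read as widgets.
source_volume_path = dbutils.widgets.get("source_volume_path")
checkpoint_location = dbutils.widgets.get("checkpoint_location")
table_name = dbutils.widgets.get("table_name")

raw_files = (
    spark.readStream.format("cloudFiles")                    # Auto Loader: only new files are picked up
    .option("cloudFiles.format", "binaryFile")
    .load(source_volume_path)
    .filter(F.col("path").rlike(r"(?i)\.(pdf|jpe?g|png)$"))  # supported file types
)

parsed = raw_files.select(
    "path",
    F.expr("ai_parse_document(content)").alias("parsed"),    # variant output incl. bounding boxes
)

(parsed.writeStream
    .option("checkpointLocation", checkpoint_location)       # checkpoint prevents reprocessing
    .trigger(availableNow=True)                               # drain the backlog, then stop
    .toTable(table_name))
```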

### Task 2: Text Extraction
**File**: `src/transformations/02_extract_text.py`

Extracts clean, concatenated text using the `transform()` higher-order function (see the sketch below):
- Reads from previous task's table via streaming
- Handles both parser v1.0 and v2.0 formats
- Uses `transform()` for efficient text extraction
- Includes error handling for failed parses
- Enables Change Data Feed (CDF) on output table
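
A minimal sketch of the extraction step; the variant paths (`$.document.elements[*].content`) and column names are assumptions for illustration, since the exact output shape depends on the `ai_parse_document` version:

```python
# Hypothetical sketch of Task 2: flatten the parsed variant into one text column.
source_table = dbutils.widgets.get("source_table_name")
target_table = dbutils.widgets.get("table_name")
checkpoint_location = dbutils.widgets.get("checkpoint_location")

extracted = (
    spark.readStream.table(source_table)
    .selectExpr(
        "path",
        """array_join(
             transform(
               variant_get(parsed, '$.document.elements', 'ARRAY<VARIANT>'),
               el -> variant_get(el, '$.content', 'STRING')
             ),
             '\\n\\n'
           ) AS text""",
    )
)

query = (extracted.writeStream
    .option("checkpointLocation", checkpoint_location)
    .trigger(availableNow=True)
    .toTable(target_table))
query.awaitTermination()

# Downstream tasks read this table incrementally through Change Data Feed.
spark.sql(f"ALTER TABLE {target_table} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
```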

### Task 3: Text Chunking
**File**: `src/transformations/03_chunk_text.py`

Splits text into smaller, embedding-sized chunks (see the sketch below):
- Reads from text table via streaming using CDF
- Uses LangChain's RecursiveCharacterTextSplitter
- Token-aware chunking based on embedding model
- Supports multiple embedding models (GTE, BGE, OpenAI)
- Configurable chunk size and overlap
- Creates MD5 hash as chunk_id for deduplication
- Enables Change Data Feed (CDF) on output table
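
A minimal sketch of the chunking logic, assuming `thenlper/gte-large` as a stand-in tokenizer for `databricks-gte-large-en` (the real notebook maps the endpoint name to a tokenizer):

```python
# Hypothetical sketch of Task 3: token-aware chunking with LangChain.
import hashlib

from langchain_text_splitters import RecursiveCharacterTextSplitter
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("thenlper/gte-large")  # assumed GTE tokenizer
splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(
    tokenizer,
    chunk_size=1024,     # chunk_size_tokens
    chunk_overlap=256,   # chunk_overlap_tokens
)

def chunk_document(path: str, text: str):
    """Split one document and derive a deterministic MD5 chunk_id per chunk."""
    for i, chunk in enumerate(splitter.split_text(text)):
        chunk_id = hashlib.md5(f"{path}:{i}:{chunk}".encode("utf-8")).hexdigest()
        yield chunk_id, path, chunk
```

In the streaming job this helper would typically be applied inside `foreachBatch` or a pandas UDF so chunk rows land in the chunked table incrementally.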

### Task 4: Vector Search Index
**File**: `src/transformations/04_vector_search_index.py`

Creates and manages the Databricks Vector Search index (see the sketch below):
- Creates or validates Vector Search endpoint
- Creates Delta Sync index from chunked text table
- Automatically generates embeddings using specified model
- Triggers index sync to process new/updated chunks
- Uses triggered pipeline type for on-demand updates
- Robust error handling and status checking
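
A minimal sketch of the index-management step using the `databricks-vectorsearch` client, with the bundle's default names filled in (the real notebook also polls until the endpoint and index are ready):

```python
# Hypothetical sketch of Task 4: create (or reuse) a Delta Sync index, then trigger a sync.
from databricks.vector_search.client import VectorSearchClient

client = VectorSearchClient()

endpoint_name = "vector-search-shared-endpoint"
index_name = "main.rag.parsed_documents_vector_index"

# Create the endpoint only if it does not exist yet.
try:
    client.get_endpoint(name=endpoint_name)
except Exception:
    client.create_endpoint(name=endpoint_name, endpoint_type="STANDARD")

# Delta Sync index: embeddings are generated by the serving endpoint during sync.
try:
    index = client.get_index(endpoint_name=endpoint_name, index_name=index_name)
except Exception:
    index = client.create_delta_sync_index(
        endpoint_name=endpoint_name,
        index_name=index_name,
        source_table_name="main.rag.parsed_documents_text_chunked",
        pipeline_type="TRIGGERED",                 # sync on demand, not continuously
        primary_key="chunk_id",
        embedding_source_column="chunked_text",
        embedding_model_endpoint_name="databricks-gte-large-en",
    )

index.sync()  # process new/updated chunks
```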

## Project Structure

```
.
├── databricks.yml                         # Bundle configuration
├── requirements.txt                       # Python dependencies
├── resources/
│   └── vector_search_ingestion.job.yml    # Workflow definition
├── src/
│   └── transformations/
│       ├── 01_parse_documents.py          # Parse PDFs/images with ai_parse_document
│       ├── 02_extract_text.py             # Extract text from parsed documents
│       ├── 03_chunk_text.py               # Chunk text for embedding
│       └── 04_vector_search_index.py      # Create Vector Search index
└── README.md
```

## Resources

- [Databricks Asset Bundles](https://docs.databricks.com/dev-tools/bundles/)
- [Databricks Workflows](https://docs.databricks.com/workflows/)
- [Structured Streaming](https://docs.databricks.com/structured-streaming/)
- [`ai_parse_document` Function](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_parse_document)
- [Databricks Vector Search](https://docs.databricks.com/generative-ai/vector-search.html)
- [LangChain Text Splitters](https://python.langchain.com/docs/modules/data_connection/document_transformers/)
73 changes: 73 additions & 0 deletions knowledge_base/workflow_vector_search_ingestion_for_rag/databricks.yml
@@ -0,0 +1,73 @@
# This is a Databricks asset bundle definition for ai_parse_document_workflow.
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
bundle:
  name: ai_parse_document_workflow

variables:
  catalog:
    description: The catalog name for the workflow
    default: main
  schema:
    description: The schema name for the workflow
    default: rag
  source_volume_path:
    description: Source volume path for PDF files
    default: /Volumes/main/rag/documents
  output_volume_path:
    description: Output volume path for processed images
    default: /Volumes/main/rag/temp/parsed_output
  checkpoint_base_path:
    description: Base path for Structured Streaming checkpoints
    default: /Volumes/main/rag/temp/checkpoints/ai_parse_workflow
  raw_table_name:
    description: Table name for raw parsed documents
    default: parsed_documents_raw
  text_table_name:
    description: Table name for extracted text
    default: parsed_documents_text
  chunk_table_name:
    description: Table name for chunked text
    default: parsed_documents_text_chunked
  embedding_model_endpoint:
    description: Embedding model endpoint for chunking and vector search
    default: databricks-gte-large-en
  chunk_size_tokens:
    description: Chunk size in tokens
    default: 1024
  chunk_overlap_tokens:
    description: Chunk overlap in tokens
    default: 256
  vector_search_endpoint_name:
    description: Vector Search endpoint name
    default: vector-search-shared-endpoint
  vector_search_index_name:
    description: Vector Search index name
    default: parsed_documents_vector_index
  vector_search_primary_key:
    description: Primary key column for vector search
    default: chunk_id
  vector_search_embedding_source_column:
    description: Text column for embeddings in vector search
    default: chunked_text

include:
  - resources/*.yml

targets:
  dev:
    # The default target uses 'mode: development' to create a development copy.
    # - Deployed resources get prefixed with '[dev my_user_name]'
    # - Any job schedules and triggers are paused by default.
    # See also https://docs.databricks.com/dev-tools/bundles/deployment-modes.html.
    mode: development
    default: true
    workspace:
      host: https://e2-demo-field-eng.cloud.databricks.com/

  prod:
    mode: production
    workspace:
      host: https://e2-demo-field-eng.cloud.databricks.com/
    permissions:
      - group_name: users
        level: CAN_VIEW
4 changes: 4 additions & 0 deletions knowledge_base/workflow_vector_search_ingestion_for_rag/requirements.txt
@@ -0,0 +1,4 @@
databricks-vectorsearch
transformers
langchain-text-splitters
tiktoken
76 changes: 76 additions & 0 deletions knowledge_base/workflow_vector_search_ingestion_for_rag/resources/vector_search_ingestion.job.yml
@@ -0,0 +1,76 @@
resources:
  jobs:
    ai_parse_document_workflow:
      name: ai_parse_document_workflow

      # Optional: Add a schedule
      # schedule:
      #   quartz_cron_expression: "0 0 * * * ?"
      #   timezone_id: "UTC"

      # Job-level parameters shared across all tasks
      parameters:
        - name: catalog
          default: ${var.catalog}
        - name: schema
          default: ${var.schema}

      environments:
        - environment_key: serverless_env
          spec:
            client: "3"
            dependencies:
              - "databricks-vectorsearch"

      tasks:
        - task_key: parse_documents
          environment_key: serverless_env
          notebook_task:
            notebook_path: ../src/transformations/01_parse_documents.py
            base_parameters:
              source_volume_path: ${var.source_volume_path}
              output_volume_path: ${var.output_volume_path}
              checkpoint_location: ${var.checkpoint_base_path}/01_parse_documents
              table_name: ${var.raw_table_name}

        - task_key: extract_text
          depends_on:
            - task_key: parse_documents
          environment_key: serverless_env
          notebook_task:
            notebook_path: ../src/transformations/02_extract_text.py
            base_parameters:
              checkpoint_location: ${var.checkpoint_base_path}/02_extract_text
              source_table_name: ${var.raw_table_name}
              table_name: ${var.text_table_name}

        - task_key: chunk_text
          depends_on:
            - task_key: extract_text
          environment_key: serverless_env
          notebook_task:
            notebook_path: ../src/transformations/03_chunk_text.py
            base_parameters:
              checkpoint_location: ${var.checkpoint_base_path}/03_chunk_text
              chunking_cache_location: ${var.checkpoint_base_path}/03_chunk_text_cache
              source_table_name: ${var.text_table_name}
              table_name: ${var.chunk_table_name}
              embedding_model_endpoint: ${var.embedding_model_endpoint}
              chunk_size_tokens: ${var.chunk_size_tokens}
              chunk_overlap_tokens: ${var.chunk_overlap_tokens}

        - task_key: create_vector_search_index
          depends_on:
            - task_key: chunk_text
          environment_key: serverless_env
          notebook_task:
            notebook_path: ../src/transformations/04_vector_search_index.py
            base_parameters:
              source_table_name: ${var.chunk_table_name}
              endpoint_name: ${var.vector_search_endpoint_name}
              index_name: ${var.vector_search_index_name}
              primary_key: ${var.vector_search_primary_key}
              embedding_source_column: ${var.vector_search_embedding_source_column}
              embedding_model_endpoint: ${var.embedding_model_endpoint}

      max_concurrent_runs: 1