
Commit 585cb40

Add workflow_with_ai_parse_document example
This example demonstrates incremental document processing using:

- ai_parse_document for extracting structured data from PDFs/images
- ai_query for LLM-based content analysis
- Databricks Workflows with Structured Streaming and serverless compute

Key features:

- Python notebooks with Structured Streaming for incremental processing
- Serverless compute for cost efficiency
- Parameterized workflow with catalog, schema, and table names
- Checkpointed streaming to process only new data
- Visual debugging notebook with interactive bounding boxes
1 parent 7f2cd8e commit 585cb40

File tree: 8 files changed, +1295 −0 lines changed
.gitignore (28 additions, 0 deletions)

# Databricks
.databricks/

# Python
build/
dist/
__pycache__/
*.egg-info
.venv/
*.py[cod]

# Local configuration (keep your settings private)
databricks.local.yml

# IDE
.idea/
.vscode/
.DS_Store

# Scratch/temporary files
scratch/**
!scratch/README.md

# Test documents (don't commit large PDFs)
*.pdf
*.png
*.jpg
*.jpeg

README.md (152 additions, 0 deletions)

# AI Document Processing Workflow with Structured Streaming

A Databricks Asset Bundle demonstrating **incremental document processing** using `ai_parse_document`, `ai_query`, and Databricks Workflows with Structured Streaming.

## Overview

This example shows how to build an incremental workflow that:

1. **Parses** PDFs and images using [`ai_parse_document`](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_parse_document)
2. **Extracts** clean text with incremental processing
3. **Analyzes** content using [`ai_query`](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_query) with LLMs

All stages run as Python notebook tasks in a Databricks Workflow using Structured Streaming with serverless compute.

## Architecture

```
Source Documents (UC Volume)
        ↓
Task 1: ai_parse_document → parsed_documents_raw (variant)
        ↓
Task 2: text extraction → parsed_documents_text (string)
        ↓
Task 3: ai_query → parsed_documents_structured (json)
```

### Key Features

- **Incremental processing**: Only new files are processed, using Structured Streaming checkpoints
- **Serverless compute**: Runs on serverless compute for cost efficiency
- **Task dependencies**: Sequential execution with automatic dependency management
- **Parameterized**: Catalog, schema, volumes, and table names configurable via variables
- **Error handling**: Gracefully handles parsing failures
- **Visual debugging**: Interactive notebook for inspecting results

## Prerequisites

- Databricks workspace with Unity Catalog
- Databricks CLI v0.218.0+
- Unity Catalog volumes (a creation sketch follows this list) for:
  - Source documents (PDFs/images)
  - Parsed output images
  - Streaming checkpoints
- AI functions (`ai_parse_document`, `ai_query`)

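If these volumes do not exist yet, one way to create them is sketched below, assuming the default `main.default` catalog/schema and the volume names from `databricks.yml` (the default `checkpoint_base_path` is a `/tmp` path, so it needs no volume):

```python
# Minimal sketch: create the Unity Catalog volumes the bundle defaults point at.
# Catalog, schema, and volume names are the databricks.yml defaults, not requirements.
for volume in ("source_documents", "parsed_output"):
    spark.sql(f"CREATE VOLUME IF NOT EXISTS main.default.{volume}")
```

Run this from a notebook, where `spark` is already defined, or adapt it to a SQL editor.
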
## Quick Start

1. **Install and authenticate**

   ```bash
   databricks auth login --host https://your-workspace.cloud.databricks.com
   ```

2. **Configure** `databricks.yml` with your workspace settings

3. **Validate** the bundle configuration

   ```bash
   databricks bundle validate
   ```

4. **Deploy**

   ```bash
   databricks bundle deploy
   ```

5. **Upload documents** to your source volume

6. **Run the workflow** from the Databricks UI (Workflows)

## Configuration

Edit `databricks.yml`:

```yaml
variables:
  catalog: main                                               # Your catalog
  schema: default                                             # Your schema
  source_volume_path: /Volumes/main/default/source_documents  # Source PDFs
  output_volume_path: /Volumes/main/default/parsed_output     # Parsed images
  checkpoint_base_path: /tmp/checkpoints/ai_parse_workflow    # Checkpoints
  raw_table_name: parsed_documents_raw                        # Table names
  text_table_name: parsed_documents_text
  structured_table_name: parsed_documents_structured
```

## Workflow Tasks

### Task 1: Document Parsing

**File**: `src/transformations/01_parse_documents.py`

Uses `ai_parse_document` to extract text, tables, and metadata from PDFs/images:

- Reads files from the source volume using Structured Streaming
- Stores the variant output with bounding boxes
- Incremental: checkpointed streaming prevents reprocessing

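A minimal sketch of this step, assuming the bundle's default paths and table names and Auto Loader as the file source (the actual notebook, which also takes an `output_volume_path` parameter for page images, may differ):

```python
# Minimal sketch of Task 1: stream new documents and parse them with ai_parse_document.
# Paths and table names are the databricks.yml defaults; `spark` is provided by the notebook.
from pyspark.sql import functions as F

source_path = "/Volumes/main/default/source_documents"
checkpoint = "/tmp/checkpoints/ai_parse_workflow/01_parse_documents"

raw_files = (
    spark.readStream.format("cloudFiles")        # Auto Loader discovers only new files
    .option("cloudFiles.format", "binaryFile")   # read PDFs/images as binary content
    .load(source_path)
)

parsed = raw_files.select(
    "path",
    "modificationTime",
    F.expr("ai_parse_document(content)").alias("parsed"),  # variant result with elements and bounding boxes
)

(
    parsed.writeStream
    .option("checkpointLocation", checkpoint)    # checkpoint makes reruns incremental
    .trigger(availableNow=True)                  # process the backlog, then stop
    .toTable("main.default.parsed_documents_raw")
)
```
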
### Task 2: Text Extraction

**File**: `src/transformations/02_extract_text.py`

Extracts clean concatenated text using `transform()`:

- Reads from previous task's table via streaming
- Handles both parser v1.0 and v2.0 formats
- Uses `transform()` for efficient text extraction
- Includes error handling for failed parses

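As a rough illustration of the approach, not the notebook's actual code, here is a sketch assuming a v2.0-style result where the variant exposes an elements array whose entries carry a `content` field; the field paths are assumptions and would need adjusting to the real parser output:

```python
# Minimal sketch of Task 2: flatten the variant parse result into one text column.
# The `document.elements` and `content` paths are assumptions about the parser output.
text_df = spark.readStream.table("main.default.parsed_documents_raw").selectExpr(
    "path",
    """
    array_join(
      transform(
        cast(parsed:document.elements AS ARRAY<VARIANT>),   -- pull the element array out of the variant
        e -> variant_get(e, '$.content', 'STRING')          -- keep each element's text
      ),
      '\\n\\n'
    ) AS text
    """,
)

(
    text_df.writeStream
    .option("checkpointLocation", "/tmp/checkpoints/ai_parse_workflow/02_extract_text")
    .trigger(availableNow=True)
    .toTable("main.default.parsed_documents_text")
)
```
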
### Task 3: AI Query Extraction

**File**: `src/transformations/03_extract_structured_data.py`

Applies an LLM to extract structured insights:

- Reads from the text table via streaming
- Uses `ai_query` with Claude Sonnet 4
- Customizable prompt for domain-specific extraction
- Outputs structured JSON

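A minimal sketch of the idea, assuming the `databricks-claude-sonnet-4` serving endpoint name and a placeholder prompt (the notebook's actual prompt and output schema will differ):

```python
# Minimal sketch of Task 3: run an LLM over the extracted text with ai_query.
# The endpoint name and prompt are placeholders; adjust them to your workspace and use case.
structured = spark.readStream.table("main.default.parsed_documents_text").selectExpr(
    "path",
    """
    ai_query(
      'databricks-claude-sonnet-4',
      concat(
        'Return a JSON object with the document title, date, and a one-sentence summary.\\n\\n',
        text
      )
    ) AS extracted_json
    """,
)

(
    structured.writeStream
    .option("checkpointLocation", "/tmp/checkpoints/ai_parse_workflow/03_extract_structured_data")
    .trigger(availableNow=True)
    .toTable("main.default.parsed_documents_structured")
)
```
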
## Visual Debugger

The included notebook visualizes parsing results with interactive bounding boxes.

**Open**: `src/explorations/ai_parse_document -- debug output.py`

**Configure widgets**:

- `input_file`: `/Volumes/main/default/source_docs/sample.pdf`
- `image_output_path`: `/Volumes/main/default/parsed_out/`
- `page_selection`: `all` (or `1-3`, `1,5,10`)

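Widgets like these are typically declared and read with `dbutils.widgets`; a sketch using the example values above (the notebook's own defaults may differ):

```python
# Minimal sketch: declare the debug notebook's widgets and read their values.
dbutils.widgets.text("input_file", "/Volumes/main/default/source_docs/sample.pdf")
dbutils.widgets.text("image_output_path", "/Volumes/main/default/parsed_out/")
dbutils.widgets.text("page_selection", "all")  # also accepts ranges such as "1-3" or "1,5,10"

input_file = dbutils.widgets.get("input_file")
image_output_path = dbutils.widgets.get("image_output_path")
page_selection = dbutils.widgets.get("page_selection")
```
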
**Features**:

- Color-coded bounding boxes by element type
- Hover tooltips showing extracted content
- Automatic image scaling
- Page selection support

## Project Structure

```
.
├── databricks.yml                  # Bundle configuration
├── resources/
│   └── ai_parse_document_workflow.job.yml
├── src/
│   ├── transformations/
│   │   ├── 01_parse_documents.py
│   │   ├── 02_extract_text.py
│   │   └── 03_extract_structured_data.py
│   └── explorations/
│       └── ai_parse_document -- debug output.py
└── README.md
```

## Resources

- [Databricks Asset Bundles](https://docs.databricks.com/dev-tools/bundles/)
- [Databricks Workflows](https://docs.databricks.com/workflows/)
- [Structured Streaming](https://docs.databricks.com/structured-streaming/)
- [`ai_parse_document` function](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_parse_document)
- [`ai_query` function](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_query)

databricks.yml (52 additions, 0 deletions)

# This is a Databricks asset bundle definition for ai_parse_document_workflow.
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
bundle:
  name: ai_parse_document_workflow

variables:
  catalog:
    description: The catalog name for the workflow
    default: main
  schema:
    description: The schema name for the workflow
    default: default
  source_volume_path:
    description: Source volume path for PDF files
    default: /Volumes/main/default/source_documents
  output_volume_path:
    description: Output volume path for processed images
    default: /Volumes/main/default/parsed_output
  checkpoint_base_path:
    description: Base path for Structured Streaming checkpoints
    default: /tmp/checkpoints/ai_parse_workflow
  raw_table_name:
    description: Table name for raw parsed documents
    default: parsed_documents_raw
  text_table_name:
    description: Table name for extracted text
    default: parsed_documents_text
  structured_table_name:
    description: Table name for structured data
    default: parsed_documents_structured

include:
  - resources/*.yml

targets:
  dev:
    # The default target uses 'mode: development' to create a development copy.
    # - Deployed resources get prefixed with '[dev my_user_name]'
    # - Any job schedules and triggers are paused by default.
    # See also https://docs.databricks.com/dev-tools/bundles/deployment-modes.html.
    mode: development
    default: true
    workspace:
      host: https://your-workspace.cloud.databricks.com

  prod:
    mode: production
    workspace:
      host: https://your-workspace.cloud.databricks.com
    permissions:
      - group_name: users
        level: CAN_VIEW

resources/ai_parse_document_workflow.job.yml (55 additions, 0 deletions)

resources:
  jobs:
    ai_parse_document_workflow:
      name: ai_parse_document_workflow

      environments:
        - environment_key: serverless_env
          spec:
            client: "3"

      tasks:
        - task_key: parse_documents
          environment_key: serverless_env
          notebook_task:
            notebook_path: ../src/transformations/01_parse_documents.py
            base_parameters:
              catalog: ${var.catalog}
              schema: ${var.schema}
              source_volume_path: ${var.source_volume_path}
              output_volume_path: ${var.output_volume_path}
              checkpoint_location: ${var.checkpoint_base_path}/01_parse_documents
              table_name: ${var.raw_table_name}

        - task_key: extract_text
          depends_on:
            - task_key: parse_documents
          environment_key: serverless_env
          notebook_task:
            notebook_path: ../src/transformations/02_extract_text.py
            base_parameters:
              catalog: ${var.catalog}
              schema: ${var.schema}
              checkpoint_location: ${var.checkpoint_base_path}/02_extract_text
              source_table_name: ${var.raw_table_name}
              table_name: ${var.text_table_name}

        - task_key: extract_structured_data
          depends_on:
            - task_key: extract_text
          environment_key: serverless_env
          notebook_task:
            notebook_path: ../src/transformations/03_extract_structured_data.py
            base_parameters:
              catalog: ${var.catalog}
              schema: ${var.schema}
              checkpoint_location: ${var.checkpoint_base_path}/03_extract_structured_data
              source_table_name: ${var.text_table_name}
              table_name: ${var.structured_table_name}

      max_concurrent_runs: 1

      # Optional: Add a schedule
      # schedule:
      #   quartz_cron_expression: "0 0 * * * ?"
      #   timezone_id: "UTC"
