diff --git a/knowledge_base/pipeline_with_ai_parse_document/.gitignore b/knowledge_base/pipeline_with_ai_parse_document/.gitignore new file mode 100644 index 0000000..0707725 --- /dev/null +++ b/knowledge_base/pipeline_with_ai_parse_document/.gitignore @@ -0,0 +1,28 @@ +# Databricks +.databricks/ + +# Python +build/ +dist/ +__pycache__/ +*.egg-info +.venv/ +*.py[cod] + +# Local configuration (keep your settings private) +databricks.local.yml + +# IDE +.idea/ +.vscode/ +.DS_Store + +# Scratch/temporary files +scratch/** +!scratch/README.md + +# Test documents (don't commit large PDFs) +*.pdf +*.png +*.jpg +*.jpeg diff --git a/knowledge_base/pipeline_with_ai_parse_document/README.md b/knowledge_base/pipeline_with_ai_parse_document/README.md new file mode 100644 index 0000000..d49df05 --- /dev/null +++ b/knowledge_base/pipeline_with_ai_parse_document/README.md @@ -0,0 +1,184 @@ +# AI Document Processing Pipeline with Lakeflow + +A Databricks Asset Bundle demonstrating **incremental document processing** using `ai_parse_document`, `ai_query`, and Lakeflow Declarative Pipelines. + +## Overview + +This example shows how to build an incremental streaming pipeline that: +1. **Parses** PDFs and images using [`ai_parse_document`](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_parse_document) +2. **Extracts** clean text with incremental processing +3. **Analyzes** content using [`ai_query`](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_query) with LLMs + +All stages use [Lakeflow Declarative Pipelines](https://docs.databricks.com/aws/en/dlt/concepts) streaming tables for efficient incremental processing. + +## Architecture + +``` +Source Documents (UC Volume) + ↓ + ai_parse_document → parsed_documents_raw (variant) + ↓ + text extraction → parsed_documents_text (string) + ↓ + ai_query → parsed_documents_structured (json) +``` + +### Key Features + +- **Incremental processing**: Only new files are processed using Lakeflow streaming tables +- **Error handling**: Gracefully handles parsing failures +- **Visual debugging**: Interactive notebook for inspecting results + +## Prerequisites + +- Databricks workspace with Unity Catalog +- Databricks CLI v0.218.0+ +- Unity Catalog volumes for: + - Source documents (PDFs/images) + - Parsed output images +- AI functions (`ai_parse_document`, `ai_query`) + +## Quick Start + +1. **Install and authenticate** + ```bash + databricks auth login --host https://your-workspace.cloud.databricks.com + ``` + +2. **Configure** `databricks.yml` with your workspace settings + +3. **Validate** the bundle configuration + ```bash + databricks bundle validate + ``` + +4. **Deploy** + ```bash + databricks bundle deploy + ``` + +5. **Upload documents** to your source volume + +6. 
**Run pipeline** from the Databricks UI (Lakeflow Pipelines)
+
+## Configuration
+
+Edit `databricks.yml`:
+
+```yaml
+variables:
+  catalog: main                    # Your catalog
+  schema: default                  # Your schema
+  source_volume_path: /Volumes/main/default/source_documents   # Source PDFs
+  output_volume_path: /Volumes/main/default/parsed_output      # Parsed images
+```
+
+## Pipeline Stages
+
+### Stage 1: Document Parsing
+**File**: `src/transformations/ai_parse_document_variant.sql`
+
+Uses `ai_parse_document` to extract text, tables, and metadata from PDFs/images:
+- Reads files from the source volume using `READ_FILES`
+- Stores variant output with bounding boxes
+- Incremental: processes only new files
+
+### Stage 2: Text Extraction
+**File**: `src/transformations/ai_parse_document_text.sql`
+
+Extracts clean concatenated text using `transform()`:
+- Handles both the legacy v1.0 (page-based) and newer element-based output formats
+- Uses `transform()` for incremental append (no aggregations)
+- Includes error handling for failed parses
+
+### Stage 3: AI Query Extraction
+**File**: `src/transformations/ai_query_extraction.sql`
+
+Applies an LLM to extract structured insights:
+- Uses `ai_query` with Claude Sonnet 4
+- Customizable prompt for domain-specific extraction
+- Outputs structured JSON
+
+## Visual Debugger
+
+The included notebook visualizes parsing results with interactive bounding boxes.
+
+**Open**: `src/explorations/ai_parse_document -- debug output.py`
+
+**Configure widgets**:
+- `input_file`: `/Volumes/main/default/source_documents/sample.pdf`
+- `image_output_path`: `/Volumes/main/default/parsed_output/`
+- `page_selection`: `all` (or `1-3`, `1,5,10`)
+
+**Features**:
+- Color-coded bounding boxes by element type
+- Hover tooltips showing extracted content
+- Automatic image scaling
+- Page selection support
+
+## Customization
+
+**Change source file patterns**:
+```sql
+-- In ai_parse_document_variant.sql
+FROM STREAM READ_FILES(
+  '/Volumes/main/default/source_documents/*.{pdf,jpg,jpeg,png}',
+  format => 'binaryFile'
+)
+```
+
+**Customize AI extraction**:
+```sql
+-- In ai_query_extraction.sql
+ai_query(
+  'databricks-claude-sonnet-4',
+  concat('Extract key dates and amounts from: ', text),
+  returnType => 'STRING',
+  modelParameters => named_struct('max_tokens', 2000, 'temperature', 0.1)
+)
+```
+
+## Project Structure
+
+```
+.
+├── databricks.yml                  # Bundle configuration
+├── resources/
+│   └── ai_parse_document_pipeline.pipeline.yml
+├── src/
+│   ├── transformations/
+│   │   ├── ai_parse_document_variant.sql
+│   │   ├── ai_parse_document_text.sql
+│   │   └── ai_query_extraction.sql
+│   └── explorations/
+│       └── ai_parse_document -- debug output.py
+└── README.md
+```
+
+## Key Concepts
+
+### Incremental Processing with Lakeflow
+Streaming tables process only new data:
+```sql
+CREATE OR REFRESH STREAMING TABLE table_name AS (
+  SELECT * FROM STREAM(source_table)
+)
+```
+
+### Avoiding Complete Mode
+Using `transform()` instead of aggregations enables incremental append:
+```sql
+-- Good: Incremental append
+transform(array, element -> element.content)
+
+-- Avoid: Forces complete mode
+collect_list(...) GROUP BY ...
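+
+-- In this pipeline, ai_parse_document_text.sql applies the incremental
+-- pattern above (simplified sketch):
+-- concat_ws('\n\n', transform(elements, element -> element:content))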
+``` + +## Resources + +- [Databricks Asset Bundles](https://docs.databricks.com/dev-tools/bundles/) +- [Lakeflow Declarative Pipelines](https://docs.databricks.com/aws/en/dlt/concepts) +- [`ai_parse_document` Function](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_parse_document) +- [`ai_query` Function](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_query) + diff --git a/knowledge_base/pipeline_with_ai_parse_document/databricks.yml b/knowledge_base/pipeline_with_ai_parse_document/databricks.yml new file mode 100644 index 0000000..93d87ab --- /dev/null +++ b/knowledge_base/pipeline_with_ai_parse_document/databricks.yml @@ -0,0 +1,40 @@ +# This is a Databricks asset bundle definition for ai_parse_document_pipeline. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. +bundle: + name: ai_parse_document_pipeline + +variables: + catalog: + description: The catalog name for the pipeline + default: main + schema: + description: The schema name for the pipeline + default: default + source_volume_path: + description: Source volume path for PDF files + default: /Volumes/main/default/source_documents + output_volume_path: + description: Output volume path for processed images + default: /Volumes/main/default/parsed_output + +include: + - resources/*.yml + +targets: + dev: + # The default target uses 'mode: development' to create a development copy. + # - Deployed resources get prefixed with '[dev my_user_name]' + # - Any job schedules and triggers are paused by default. + # See also https://docs.databricks.com/dev-tools/bundles/deployment-modes.html. + mode: development + default: true + workspace: + host: https://your-workspace.cloud.databricks.com + + prod: + mode: production + workspace: + host: https://your-workspace.cloud.databricks.com + permissions: + - group_name: users + level: CAN_VIEW diff --git a/knowledge_base/pipeline_with_ai_parse_document/resources/ai_parse_document_pipeline.pipeline.yml b/knowledge_base/pipeline_with_ai_parse_document/resources/ai_parse_document_pipeline.pipeline.yml new file mode 100644 index 0000000..5a7d3bd --- /dev/null +++ b/knowledge_base/pipeline_with_ai_parse_document/resources/ai_parse_document_pipeline.pipeline.yml @@ -0,0 +1,16 @@ +resources: + pipelines: + ai_parse_document_pipeline: + name: ai_parse_document_pipeline + libraries: + - file: + path: ../src/transformations/ai_parse_document_variant.sql + - file: + path: ../src/transformations/ai_parse_document_text.sql + - file: + path: ../src/transformations/ai_query_extraction.sql + catalog: ${var.catalog} + channel: CURRENT + photon: true + schema: ${var.schema} + serverless: true diff --git a/knowledge_base/pipeline_with_ai_parse_document/src/explorations/ai_parse_document -- debug output.py b/knowledge_base/pipeline_with_ai_parse_document/src/explorations/ai_parse_document -- debug output.py new file mode 100644 index 0000000..2f39afa --- /dev/null +++ b/knowledge_base/pipeline_with_ai_parse_document/src/explorations/ai_parse_document -- debug output.py @@ -0,0 +1,782 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # 🔍 AI Parse Document Debug Interface +# MAGIC +# MAGIC Version 1.3 +# MAGIC +# MAGIC Last update: Oct 6, 2025 +# MAGIC +# MAGIC Changelog: +# MAGIC - Simplified widget parameters: `input_file` and `image_output_path` now accept full volume paths +# MAGIC - Removed separate `catalog`, `schema`, `volume` widgets +# MAGIC - `input_file` supports wildcards for processing multiple files (e.g., 
`/Volumes/catalog/schema/volume/input/*`) +# MAGIC +# MAGIC ## Overview +# MAGIC This notebook provides a **visual debugging interface** for analyzing the output of Databricks' `ai_parse_document` function. It renders parsed documents with interactive bounding box overlays, allowing you to inspect what content was extracted from each region of your documents. +# MAGIC +# MAGIC ## Features +# MAGIC - 📊 **Visual Bounding Boxes**: Color-coded overlays showing the exact regions where text/elements were detected +# MAGIC - 🎯 **Interactive Tooltips**: Hover over any bounding box to see the parsed content from that region +# MAGIC - 📐 **Automatic Scaling**: Large documents are automatically scaled to fit within 1024px width for optimal viewing +# MAGIC - 🎨 **Element Type Visualization**: Different colors for different element types (text, headers, tables, figures, etc.) +# MAGIC +# MAGIC ## Required Parameters +# MAGIC +# MAGIC This interface requires widget parameters to be configured before running: +# MAGIC +# MAGIC ### 1. `input_file` +# MAGIC - **Description**: Full Unity Catalog volume path to the document(s) you want to parse and visualize +# MAGIC - **Examples**: +# MAGIC - Single file: `/Volumes/catalog/schema/volume/input/document.pdf` +# MAGIC - All files in directory: `/Volumes/catalog/schema/volume/input/*` +# MAGIC - Pattern matching: `/Volumes/catalog/schema/volume/input/*.pdf` +# MAGIC - **Requirements**: Read access to the volume containing your PDF/image files +# MAGIC +# MAGIC ### 2. `image_output_path` +# MAGIC - **Description**: Full Unity Catalog volume path where `ai_parse_document` will store the extracted page images +# MAGIC - **Example**: `/Volumes/catalog/schema/volume/output/` +# MAGIC - **Requirements**: Write access required for storing intermediate image outputs +# MAGIC - **Note**: As documented in the [official Databricks documentation](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_parse_document), this path is used by the parsing function to store page images that are referenced in the output +# MAGIC +# MAGIC ### 3. `page_selection` +# MAGIC - **Description**: Specifies which pages to display in the visualization +# MAGIC - **Supported formats**: +# MAGIC - `"all"` or leave empty: Display all pages +# MAGIC - `"3"`: Display only page 3 (1-indexed) +# MAGIC - `"1-5"`: Display pages 1 through 5 (inclusive, 1-indexed) +# MAGIC - `"1,3,5"`: Display specific pages (1-indexed) +# MAGIC - `"1-3,7,10-12"`: Mixed ranges and individual pages +# MAGIC +# MAGIC ## Usage Instructions +# MAGIC +# MAGIC 1. **Clone this notebook** to your workspace: +# MAGIC - Select **"File -> Clone"** button in the top toolbar +# MAGIC - Choose your desired location in your workspace +# MAGIC - This ensures you have a personal copy you can modify and run +# MAGIC +# MAGIC 2. **Prepare your Unity Catalog volumes**: +# MAGIC - Create or identify a volume for your PDF/image files +# MAGIC - Create or identify a volume for output images +# MAGIC - Upload your PDF files to the input location +# MAGIC +# MAGIC 3. **Configure the widget parameters** at the top of this notebook: +# MAGIC - Set `input_file` to the full volume path (file or directory with wildcard) +# MAGIC - Set `image_output_path` to the full volume path for outputs +# MAGIC - Set `page_selection` to control which pages to visualize +# MAGIC +# MAGIC 4. **Run all code cells** which will generate visual debugging results. 
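+# MAGIC
+# MAGIC For example, with a 12-page document, `page_selection = "1-3,7,10-12"` renders pages 1, 2, 3, 7, 10, 11, and 12. Out-of-range or malformed entries are skipped with a printed warning; if nothing valid remains, all pages are shown.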
+# MAGIC +# MAGIC ## What You'll See +# MAGIC +# MAGIC - **Document Summary**: Overview of pages, element counts, and document metadata +# MAGIC - **Color Legend**: Visual guide showing which colors represent which element types +# MAGIC - **Annotated Images**: Each page with overlaid bounding boxes +# MAGIC - Hover over any box to see the extracted content +# MAGIC - Yellow highlight indicates the currently hovered element +# MAGIC - **Parsed Elements List**: Complete list of all extracted elements with their content + +# COMMAND ---------- + +# Exec Parameters + +dbutils.widgets.text("input_file", "/Volumes/main/default/source_documents/sample.pdf") +dbutils.widgets.text("image_output_path", "/Volumes/main/default/parsed_output/") +dbutils.widgets.text("page_selection", "all") + +input_file = dbutils.widgets.get("input_file") +image_output_path = dbutils.widgets.get("image_output_path") +page_selection = dbutils.widgets.get("page_selection") + +# COMMAND ---------- + +# DBTITLE 1,Configuration Parameters +# Path configuration - use widget values as-is + +source_files = input_file + +# Parse page selection string and return list of page indices to display. +# +# Supported formats: +# - "all" or None: Display all pages +# - "3": Display specific page (1-indexed) +# - "1-5": Display page range (inclusive, 1-indexed) +# - "1,3,5": Display list of specific pages (1-indexed) +# - "1-3,7,10-12": Mixed ranges and individual pages +page_selection = f"{page_selection}" + +# COMMAND ---------- + +# DBTITLE 1,Run Document Parse Code (may take some time) +# SQL statement with ai_parse_document() +# Note: input_file can be a single file path or a directory path with wildcard +sql = f''' +with parsed_documents AS ( + SELECT + path, + ai_parse_document(content + , + map( + 'version', '2.0', + 'imageOutputPath', '{image_output_path}', + 'descriptionElementTypes', '*' + ) + ) as parsed + FROM + read_files('{source_files}', format => 'binaryFile') +) +select * from parsed_documents +''' + +parsed_results = [row.parsed for row in spark.sql(sql).collect()] + +# COMMAND ---------- + +import json +from typing import Dict, List, Any, Optional, Tuple, Set, Union +from IPython.display import HTML, display +import base64 +import os +from PIL import Image +import io + +class DocumentRenderer: + def __init__(self): + # Color mapping for different element types + self.element_colors = { + 'section_header': '#FF6B6B', + 'text': '#4ECDC4', + 'figure': '#45B7D1', + 'caption': '#96CEB4', + 'page_footer': '#FFEAA7', + 'page_header': '#DDA0DD', + 'table': '#98D8C8', + 'list': '#F7DC6F', + 'default': '#BDC3C7' + } + + def _parse_page_selection(self, page_selection: Union[str, None], total_pages: int) -> Set[int]: + """Parse page selection string and return set of page indices (0-based). 
+ + Args: + page_selection: Selection string or None + total_pages: Total number of pages available + + Returns: + Set of 0-based page indices to display + """ + # Handle None or "all" - return all pages + if page_selection is None or page_selection.lower() == "all": + return set(range(total_pages)) + + selected_pages = set() + + # Clean the input + page_selection = page_selection.strip() + + # Split by commas for multiple selections + parts = page_selection.split(',') + + for part in parts: + part = part.strip() + + # Check if it's a range (contains hyphen) + if '-' in part: + try: + # Split range and convert to integers + range_parts = part.split('-') + if len(range_parts) == 2: + start = int(range_parts[0].strip()) + end = int(range_parts[1].strip()) + + # Convert from 1-indexed to 0-indexed + start_idx = start - 1 + end_idx = end - 1 + + # Add all pages in range (inclusive) + for i in range(start_idx, end_idx + 1): + if 0 <= i < total_pages: + selected_pages.add(i) + except ValueError: + print(f"Warning: Invalid range '{part}' in page selection") + else: + # Single page number + try: + page_num = int(part.strip()) + # Convert from 1-indexed to 0-indexed + page_idx = page_num - 1 + if 0 <= page_idx < total_pages: + selected_pages.add(page_idx) + else: + print(f"Warning: Page {page_num} is out of range (1-{total_pages})") + except ValueError: + print(f"Warning: Invalid page number '{part}' in page selection") + + # If no valid pages were selected, default to all pages + if not selected_pages: + print(f"Warning: No valid pages in selection '{page_selection}'. Showing all pages.") + return set(range(total_pages)) + + return selected_pages + + def _get_element_color(self, element_type: str) -> str: + """Get color for element type.""" + return self.element_colors.get(element_type.lower(), self.element_colors['default']) + + def _get_image_dimensions(self, image_path: str) -> Optional[Tuple[int, int]]: + """Get dimensions of an image file.""" + try: + if os.path.exists(image_path): + with Image.open(image_path) as img: + return img.size # Returns (width, height) + return None + except Exception as e: + print(f"Error getting image dimensions for {image_path}: {e}") + return None + + def _load_image_as_base64(self, image_path: str) -> Optional[str]: + """Load image from file path and convert to base64.""" + try: + if os.path.exists(image_path): + with open(image_path, 'rb') as img_file: + img_data = img_file.read() + img_base64 = base64.b64encode(img_data).decode('utf-8') + ext = os.path.splitext(image_path)[1].lower() + if ext in ['.jpg', '.jpeg']: + return f"data:image/jpeg;base64,{img_base64}" + elif ext in ['.png']: + return f"data:image/png;base64,{img_base64}" + else: + return f"data:image/jpeg;base64,{img_base64}" + return None + except Exception as e: + print(f"Error loading image {image_path}: {e}") + return None + + def _render_element_content(self, element: Dict, for_tooltip: bool = False) -> str: + """Render element content with appropriate formatting for both tooltip and element list display. 
+
+        Args:
+            element: The element dictionary containing content/description
+            for_tooltip: Whether this is for tooltip display (affects styling and truncation)
+        """
+        element_type = element.get('type', 'unknown')
+        content = element.get('content', '')
+        description = element.get('description', '')
+
+        display_content = ""
+
+        if content:
+            if element_type == 'table':
+                # Render the HTML table with styling
+                table_html = content
+
+                # Apply different styling based on context
+                if for_tooltip:
+                    # Compact styling for tooltips with light theme
+                    # Use full width available for tooltip tables
+                    table_style = 'style="width: 100%; border-collapse: collapse; margin: 5px 0; font-size: 10px;"'
+                    th_style = 'style="border: 1px solid #ddd; padding: 4px; background: #f8f9fa; color: #333; font-weight: bold; text-align: left; font-size: 10px;"'
+                    td_style = 'style="border: 1px solid #ddd; padding: 4px; color: #333; font-size: 10px;"'
+                    thead_style = 'style="background: #e9ecef;"'
+                else:
+                    # Full styling for element list
+                    table_style = 'style="width: 100%; border-collapse: collapse; margin: 10px 0; font-size: 13px;"'
+                    th_style = 'style="border: 1px solid #ddd; padding: 8px; background: #f5f5f5; font-weight: bold; text-align: left;"'
+                    td_style = 'style="border: 1px solid #ddd; padding: 8px;"'
+                    thead_style = 'style="background: #f0f0f0;"'
+
+                # Inject the inline styles into the parser's bare table tags;
+                # replace '<thead>' before '<th>' so the shorter prefix cannot corrupt it
+                if '<thead>' in table_html:
+                    table_html = table_html.replace('<thead>', f'<thead {thead_style}>')
+                if '<table>' in table_html:
+                    table_html = table_html.replace('<table>', f'<table {table_style}>')
+                if '<th>' in table_html:
+                    table_html = table_html.replace('<th>', f'<th {th_style}>')
+                if '<td>' in table_html:
+                    table_html = table_html.replace('<td>', f'<td {td_style}>')
+
+                if for_tooltip:
+                    display_content = table_html
+                else:
+                    display_content = f'<div style="overflow-x: auto;">{table_html}</div>'
+            else:
+                # Regular content handling
+                if for_tooltip and len(content) > 500:
+                    # Truncate for tooltip display and escape HTML for safety
+                    display_content = self._escape_for_html_attribute(content[:500] + "...")
+                else:
+                    display_content = self._escape_for_html_attribute(content) if for_tooltip else content
+        elif description:
+            desc_content = description
+            if for_tooltip and len(desc_content) > 500:
+                desc_content = desc_content[:500] + "..."
+
+            if for_tooltip:
+                display_content = self._escape_for_html_attribute(f"Description: {desc_content}")
+            else:
+                display_content = f"Description: {desc_content}"
+        else:
+            display_content = "No content available" if for_tooltip else "No content"
+
+        return display_content
+
+    def _escape_for_html_attribute(self, text: str) -> str:
+        """Escape text for safe use in HTML attributes."""
+        return (text.replace('&', '&amp;')
+                    .replace('<', '&lt;')
+                    .replace('>', '&gt;')
+                    .replace('"', '&quot;')
+                    .replace("'", '&#39;')
+                    .replace('\n', '<br>'))
+
+    def _calculate_tooltip_width(self, element: Dict, image_width: int) -> int:
+        """Calculate dynamic tooltip width based on table content."""
+        element_type = element.get('type', 'unknown')
+        content = element.get('content', '')
+
+        if element_type == 'table' and content:
+            # Count columns by looking for <th> or <td> tags in the first row
+            import re
+
+            # Find first row (either in thead or tbody)
+            first_row_match = re.search(r'<tr[^>]*>(.*?)</tr>', content, re.DOTALL | re.IGNORECASE)
+            if first_row_match:
+                first_row = first_row_match.group(1)
+                # Count th or td tags
+                th_count = len(re.findall(r'<th[^>]*>', first_row, re.IGNORECASE))
+                td_count = len(re.findall(r'<td[^>]*>', first_row, re.IGNORECASE))
+                column_count = max(th_count, td_count)
+
+                if column_count > 0:
+                    # Base width + additional width per column
+                    base_width = 300
+                    width_per_column = 80
+                    calculated_width = base_width + (column_count * width_per_column)
+
+                    # Cap at 4/5ths of the image width
+                    max_width = int(image_width * 0.8)
+                    return min(calculated_width, max_width)
+
+        # Default width for non-tables or when calculation fails
+        return 400
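+
+    # Note: ai_parse_document reports bbox coordinates in original-image pixels.
+    # The renderer below multiplies them by scale_factor (e.g. 1024 / original_width
+    # for pages wider than 1024px) so the overlays line up with the scaled image.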
+
+    def _create_annotated_image(self, page: Dict, elements: List[Dict]) -> str:
+        """Create annotated image with SCALING to fit within 1024px width."""
+        image_uri = page.get('image_uri', '')
+        page_id = page.get('id', 0)
+
+        if not image_uri:
+            return '<div style="color: #c00; padding: 10px;"><p>No image URI found for this page</p></div>'
+
+        # Load image
+        img_data_uri = self._load_image_as_base64(image_uri)
+        if not img_data_uri:
+            return f"""
+            <div style="color: #c00; padding: 10px;">
+                Could not load image: {image_uri}<br>
+                Make sure the file exists and is accessible.
+            </div>
+            """
+
+        # Get original image dimensions
+        original_dimensions = self._get_image_dimensions(image_uri)
+        if not original_dimensions:
+            # Fallback: display without explicit scaling
+            original_width, original_height = 1024, 768  # Default fallback
+        else:
+            original_width, original_height = original_dimensions
+
+        # Calculate scaling factor to fit within 1024px width
+        max_display_width = 1024
+        scale_factor = 1.0
+        display_width = original_width
+        display_height = original_height
+
+        if original_width > max_display_width:
+            scale_factor = max_display_width / original_width
+            display_width = max_display_width
+            display_height = int(original_height * scale_factor)
+
+        # Filter elements for this page and collect their bounding boxes
+        page_elements = []
+
+        for elem in elements:
+            elem_bboxes = []
+            for bbox in elem.get('bbox', []):
+                if bbox.get('page_id', 0) == page_id:
+                    coord = bbox.get('coord', [])
+                    if len(coord) >= 4:
+                        elem_bboxes.append(bbox)
+
+            if elem_bboxes:
+                page_elements.append({
+                    'element': elem,
+                    'bboxes': elem_bboxes
+                })
+
+        if not page_elements:
+            return f'<div style="color: #888; padding: 10px;"><p>No elements found for page {page_id}</p></div>'
+
+        header_info = f"""
+        <div style="font-family: sans-serif; font-size: 13px; margin: 10px 0;">
+            <strong>Page {page_id + 1}:</strong> {len(page_elements)} elements<br>
+            Original size: {original_width}×{original_height}px |
+            Display size: {display_width}×{display_height}px |
+            Scale factor: {scale_factor:.3f}<br>
+        </div>
+        """
+
+        # Generate unique container ID for this page
+        container_id = f"page_container_{page_id}_{id(self)}"
+
+        # Create bounding box overlays using SCALED coordinates with hover functionality
+        overlays = []
+
+        for idx, item in enumerate(page_elements):
+            element = item['element']
+            element_id = element.get('id', 'N/A')
+            element_type = element.get('type', 'unknown')
+            color = self._get_element_color(element_type)
+
+            # Use the shared content renderer for tooltip
+            tooltip_content = self._render_element_content(element, for_tooltip=True)
+
+            # Calculate dynamic tooltip width
+            tooltip_width = self._calculate_tooltip_width(element, display_width)
+
+            # Tables render as raw HTML; other content was escaped by the renderer
+
+            for bbox_idx, bbox in enumerate(item['bboxes']):
+                coord = bbox.get('coord', [])
+                if len(coord) >= 4:
+                    x1, y1, x2, y2 = coord
+
+                    # Apply scaling to coordinates
+                    scaled_x1 = x1 * scale_factor
+                    scaled_y1 = y1 * scale_factor
+                    scaled_x2 = x2 * scale_factor
+                    scaled_y2 = y2 * scale_factor
+
+                    width = scaled_x2 - scaled_x1
+                    height = scaled_y2 - scaled_y1
+
+                    # Skip invalid boxes
+                    if width <= 0 or height <= 0:
+                        continue
+
+                    # Position label above box when possible
+                    label_top = -18 if scaled_y1 >= 18 else 2
+
+                    # Unique ID for this bounding box
+                    box_id = f"bbox_{page_id}_{idx}_{bbox_idx}"
+
+                    # Tooltip offset from the left edge of the box
+                    tooltip_left = 10
+
+                    overlay = f"""
+                    <div class="bbox" id="{box_id}"
+                         style="position: absolute; left: {scaled_x1:.0f}px; top: {scaled_y1:.0f}px;
+                                width: {width:.0f}px; height: {height:.0f}px;
+                                border: 2px solid {color};">
+                        <span style="position: absolute; top: {label_top}px; left: 0; background: {color};
+                                     color: #fff; font-size: 10px; font-family: sans-serif;
+                                     padding: 1px 3px; white-space: nowrap;">
+                            {element_type.upper()[:6]}#{element_id}
+                        </span>
+                        <div class="bbox-tooltip" style="left: {tooltip_left}px; width: {tooltip_width}px;">
+                            <div style="font-weight: bold; margin-bottom: 4px; color: {color};">
+                                {element_type.upper()} #{element_id}
+                            </div>
+                            <div>
+                                {tooltip_content}
+                            </div>
+                        </div>
+                    </div>
+                    """
+                    overlays.append(overlay)
+
+        # Pure CSS hover functionality (works in Databricks)
+        styles = f"""
+        <style>
+            #{container_id} .bbox:hover {{
+                background: rgba(255, 255, 0, 0.25);  /* yellow highlight on hover */
+                cursor: pointer;
+            }}
+            #{container_id} .bbox-tooltip {{
+                display: none;
+                position: absolute;
+                top: 100%;
+                z-index: 1000;
+                background: #fff;
+                border: 1px solid #ccc;
+                box-shadow: 0 2px 8px rgba(0, 0, 0, 0.25);
+                padding: 8px;
+                font-size: 11px;
+                font-family: sans-serif;
+                text-align: left;
+            }}
+            #{container_id} .bbox:hover .bbox-tooltip {{
+                display: block;
+            }}
+        </style>
+        """
+
+        return f"""
+        {header_info}
+        {styles}
+        <div id="{container_id}" style="position: relative; display: inline-block;">
+            <img src="{img_data_uri}" width="{display_width}" height="{display_height}"
+                 alt="Page {page_id + 1}">
+            {''.join(overlays)}
+        </div>
+        """
+
+    def _create_page_elements_list(self, page_id: int, elements: List[Dict]) -> str:
+        """Create a detailed list of elements for a specific page."""
+        # Filter elements for this page
+        page_elements = []
+
+        for elem in elements:
+            elem_bboxes = []
+            for bbox in elem.get('bbox', []):
+                if bbox.get('page_id', 0) == page_id:
+                    elem_bboxes.append(bbox)
+
+            if elem_bboxes:
+                page_elements.append(elem)
+
+        if not page_elements:
+            return f'<div style="color: #888; padding: 10px;"><p>No elements found for page {page_id + 1}</p></div>'
+
+        html_parts = []
+
+        for element in page_elements:
+            element_id = element.get('id', 'N/A')
+            element_type = element.get('type', 'unknown')
+            color = self._get_element_color(element_type)
+
+            # Get bounding box info for this page only
+            bbox_info = "No bbox"
+            bbox_list = element.get('bbox', [])
+            if bbox_list:
+                bbox_details = []
+                for bbox in bbox_list:
+                    if bbox.get('page_id', 0) == page_id:
+                        coord = bbox.get('coord', [])
+                        if len(coord) >= 4:
+                            bbox_details.append(f"[{coord[0]:.0f}, {coord[1]:.0f}, {coord[2]:.0f}, {coord[3]:.0f}]")
+                bbox_info = "; ".join(bbox_details) if bbox_details else "Invalid bbox"
+
+            # Use the shared content renderer for element list display
+            display_content = self._render_element_content(element, for_tooltip=False)
+
+            element_html = f"""
+            <div style="border-left: 4px solid {color}; background: #fafafa;
+                        margin: 8px 0; padding: 8px 12px; font-family: sans-serif;">
+                <div style="display: flex; justify-content: space-between; align-items: center;">
+                    <span style="font-weight: bold; color: {color};">
+                        {element_type.upper().replace('_', ' ')} (ID: {element_id})
+                    </span>
+                    <span style="font-size: 11px; color: #888;">
+                        {bbox_info}
+                    </span>
+                </div>
+                <div style="margin-top: 6px; font-size: 13px;">
+                    {display_content}
+                </div>
+            </div>
+            """
+            html_parts.append(element_html)
+
+        return f"""
+        <div style="margin: 10px 0;">
+            <h3 style="font-family: sans-serif;">📋 Page {page_id + 1} Elements ({len(page_elements)} items)</h3>
+            {''.join(html_parts)}
+        </div>
+        """
+
+    def _create_summary(self, document: Dict, metadata: Dict, selected_pages: Set[int], total_pages: int) -> str:
+        """Create a summary with page selection info."""
+        elements = document.get('elements', [])
+
+        # Count elements only on selected pages
+        selected_elements = []
+        for elem in elements:
+            for bbox in elem.get('bbox', []):
+                if bbox.get('page_id', 0) in selected_pages:
+                    selected_elements.append(elem)
+                    break
+
+        # Count by type (for selected pages)
+        type_counts = {}
+        for elem in selected_elements:
+            elem_type = elem.get('type', 'unknown')
+            type_counts[elem_type] = type_counts.get(elem_type, 0) + 1
+
+        type_list = ', '.join([f"{t}: {c}" for t, c in type_counts.items()])
+
+        # Create page selection info
+        if len(selected_pages) == total_pages:
+            page_info = f"All {total_pages} pages"
+        else:
+            # Convert to 1-indexed for display
+            page_nums = sorted([p + 1 for p in selected_pages])
+            if len(page_nums) <= 10:
+                page_info = f"Pages {', '.join(map(str, page_nums))} ({len(selected_pages)} of {total_pages})"
+            else:
+                page_info = f"{len(selected_pages)} of {total_pages} pages selected"
+
+        return f"""
+        <div style="background: #f5f7fa; border: 1px solid #ddd; border-radius: 6px;
+                    padding: 12px; margin: 10px 0; font-family: sans-serif; font-size: 13px;">
+            <h3 style="margin-top: 0;">📄 Document Summary</h3>
+            <p><strong>Displaying:</strong> {page_info}</p>
+            <p><strong>Elements on selected pages:</strong> {len(selected_elements)}</p>
+            <p><strong>Element Types:</strong> {type_list if type_list else 'None'}</p>
+            <p><strong>Document ID:</strong> {str(metadata.get('id', 'N/A'))[:12]}...</p>
+        </div>
+        """
+
+    def render_document(self, parsed_result: Any, page_selection: Union[str, None] = None) -> None:
+        """Main render function with page selection support.
+
+        Args:
+            parsed_result: The parsed document result
+            page_selection: Page selection string. Supported formats:
+                - "all" or None: Display all pages
+                - "3": Display only page 3 (1-indexed)
+                - "1-5": Display pages 1 through 5 (inclusive)
+                - "1,3,5": Display specific pages
+                - "1-3,7,10-12": Mixed format
+        """
+        try:
+            # Convert to dict
+            if hasattr(parsed_result, 'toPython'):
+                parsed_dict = parsed_result.toPython()
+            elif hasattr(parsed_result, 'toJson'):
+                parsed_dict = json.loads(parsed_result.toJson())
+            elif isinstance(parsed_result, dict):
+                parsed_dict = parsed_result
+            else:
+                display(HTML(f'<div style="color: #c00;"><p>❌ Could not convert result. Type: {type(parsed_result)}</p></div>'))
+                return
+
+            # Extract components
+            document = parsed_dict.get('document', {})
+            pages = document.get('pages', [])
+            elements = document.get('elements', [])
+            metadata = parsed_dict.get('metadata', {})
+
+            if not elements:
+                display(HTML('<div style="color: #c00;"><p>❌ No elements found in document</p></div>'))
+                return
+
+            # Parse page selection
+            selected_pages = self._parse_page_selection(page_selection, len(pages))
+
+            # Display title
+            display(HTML('<h2 style="font-family: sans-serif;">🔍 AI Parse Document Results</h2>'))
+
+            # Display summary with page selection info
+            summary_html = self._create_summary(document, metadata, selected_pages, len(pages))
+            display(HTML(summary_html))
+
+            # Display color legend
+            legend_items = []
+            for elem_type, color in self.element_colors.items():
+                if elem_type != 'default':
+                    legend_items.append(f"""
+                        <span style="display: inline-block; margin-right: 12px; font-size: 12px;">
+                            <span style="display: inline-block; width: 12px; height: 12px;
+                                         background: {color}; margin-right: 4px;"></span>
+                            {elem_type.replace('_', ' ').title()}
+                        </span>
+                    """)
+
+            display(HTML(f"""
+                <div style="margin: 10px 0; font-family: sans-serif;">
+                    <strong>🎨 Element Colors:</strong><br>
+                    {''.join(legend_items)}
+                </div>
+            """))
+
+            # Display annotated images with their corresponding elements (filtered by selection)
+            if pages:
+                display(HTML('<h2 style="font-family: sans-serif;">🖼️ Annotated Images & Elements</h2>'))
+
+                # Sort selected pages for display
+                sorted_selected = sorted(selected_pages)
+
+                for page_idx in sorted_selected:
+                    if page_idx < len(pages):
+                        page = pages[page_idx]
+
+                        # Display the annotated image
+                        annotated_html = self._create_annotated_image(page, elements)
+                        display(HTML(f'<div style="margin: 20px 0;">{annotated_html}</div>'))
+
+                        # Display elements for this page immediately after the image
+                        page_id = page.get('id', page_idx)
+                        page_elements_html = self._create_page_elements_list(page_id, elements)
+                        display(HTML(page_elements_html))
+
+        except Exception as e:
+            display(HTML(f'<div style="color: #c00;"><p>❌ Error: {str(e)}</p></div>'))
+            import traceback
+            display(HTML(f'<pre style="background: #f8f8f8; padding: 10px;">{traceback.format_exc()}</pre>'))
")) + + +# Simple usage functions +def render_ai_parse_output(parsed_result, page_selection=None): + """Simple function to render ai_parse_document output with page selection. + + Args: + parsed_result: The parsed document result + page_selection: Optional page selection string. Examples: + - None or "all": Display all pages + - "3": Display only page 3 + - "1-5": Display pages 1 through 5 + - "1,3,5": Display specific pages + - "1-3,7,10-12": Mixed format + """ + renderer = DocumentRenderer() + renderer.render_document(parsed_result, page_selection) + +# COMMAND ---------- + +# DBTITLE 1,Debug Visualization Results +for parsed_result in parsed_results: + render_ai_parse_output(parsed_result, page_selection) \ No newline at end of file diff --git a/knowledge_base/pipeline_with_ai_parse_document/src/transformations/ai_parse_document_text.sql b/knowledge_base/pipeline_with_ai_parse_document/src/transformations/ai_parse_document_text.sql new file mode 100644 index 0000000..299b120 --- /dev/null +++ b/knowledge_base/pipeline_with_ai_parse_document/src/transformations/ai_parse_document_text.sql @@ -0,0 +1,35 @@ +CREATE OR REFRESH STREAMING TABLE parsed_documents_text +COMMENT 'Extracted text content from parsed documents with error handling' +AS ( + WITH error_documents AS ( + SELECT + path, + NULL AS text, + try_cast(parsed:error_status AS STRING) AS error_status, + parsed_at + FROM STREAM(parsed_documents_raw) + WHERE try_cast(parsed:error_status AS STRING) IS NOT NULL + ), + success_documents AS ( + SELECT + path, + concat_ws( + '\n\n', + transform( + CASE + WHEN try_cast(parsed:metadata:version AS STRING) = '1.0' + THEN try_cast(parsed:document:pages AS ARRAY) + ELSE try_cast(parsed:document:elements AS ARRAY) + END, + element -> try_cast(element:content AS STRING) + ) + ) AS text, + NULL AS error_status, + parsed_at + FROM STREAM(parsed_documents_raw) + WHERE try_cast(parsed:error_status AS STRING) IS NULL + ) + SELECT * FROM success_documents + UNION ALL + SELECT * FROM error_documents +); \ No newline at end of file diff --git a/knowledge_base/pipeline_with_ai_parse_document/src/transformations/ai_parse_document_variant.sql b/knowledge_base/pipeline_with_ai_parse_document/src/transformations/ai_parse_document_variant.sql new file mode 100644 index 0000000..312eb48 --- /dev/null +++ b/knowledge_base/pipeline_with_ai_parse_document/src/transformations/ai_parse_document_variant.sql @@ -0,0 +1,31 @@ +CREATE STREAMING TABLE parsed_documents_raw +TBLPROPERTIES('delta.feature.variantType-preview' = 'supported') +COMMENT 'Raw parsed documents from ai_parse_document with variant output' +AS ( + WITH files AS ( + SELECT + path, + content + FROM STREAM READ_FILES( + '/Volumes/main/default/source_documents/*.{pdf,jpg,jpeg,png}', + format => 'binaryFile' + ) + ), + repartitioned_files AS ( + SELECT * + FROM files + DISTRIBUTE BY crc32(path) % 8 -- adjust partition count as needed + ) + SELECT + path, + ai_parse_document( + content, + map( + 'version', '2.0', + 'imageOutputPath', '/Volumes/main/default/parsed_output/', + 'descriptionElementTypes', '*' + ) + ) AS parsed, + current_timestamp() AS parsed_at + FROM repartitioned_files +) \ No newline at end of file diff --git a/knowledge_base/pipeline_with_ai_parse_document/src/transformations/ai_query_extraction.sql b/knowledge_base/pipeline_with_ai_parse_document/src/transformations/ai_query_extraction.sql new file mode 100644 index 0000000..d33f330 --- /dev/null +++ 
b/knowledge_base/pipeline_with_ai_parse_document/src/transformations/ai_query_extraction.sql @@ -0,0 +1,27 @@ +CREATE OR REFRESH STREAMING TABLE parsed_documents_structured +COMMENT 'Structured data extracted from document text using AI' +AS ( + SELECT + path, + ai_query( + 'databricks-claude-sonnet-4', + concat( + 'Extract key information from this document and return as JSON. ', + 'Include: document_type, key_entities (names, organizations, locations), ', + 'dates, amounts, and a brief summary (max 100 words). ', + 'Document text: ', + text + ), + returnType => 'STRING', + modelParameters => named_struct( + 'max_tokens', 2000, + 'temperature', 0.1 + ) + ) AS extracted_json, + parsed_at, + current_timestamp() AS extraction_timestamp + FROM STREAM(parsed_documents_text) + WHERE text IS NOT NULL + AND error_status IS NULL + AND length(text) > 100 +);