diff --git a/examples/batch/parse/.gitignore b/examples/batch/parse/.gitignore new file mode 100644 index 00000000..ee119593 --- /dev/null +++ b/examples/batch/parse/.gitignore @@ -0,0 +1 @@ +sample_files/ diff --git a/examples/batch/parse/batch_parse_directory.ipynb b/examples/batch/parse/batch_parse_directory.ipynb new file mode 100644 index 00000000..d205dcd7 --- /dev/null +++ b/examples/batch/parse/batch_parse_directory.ipynb @@ -0,0 +1,555 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "cell-0", + "metadata": {}, + "source": [ + "# Batch Parse with LlamaCloud Directories\n", + "\n", + "This notebook demonstrates how to use LlamaCloud's batch processing API to parse multiple files in a directory. The workflow includes:\n", + "\n", + "1. **Creating a Directory** - Set up a directory to organize your files\n", + "2. **Uploading Files** - Upload multiple files to the directory\n", + "3. **Starting a Batch Parse Job** - Kick off batch processing on all files\n", + "4. **Monitoring Progress** - Check the status and view results\n", + "\n", + "This is useful when you need to parse many documents at once, as the batch API handles the orchestration and provides progress tracking." + ] + }, + { + "cell_type": "markdown", + "id": "cell-1", + "metadata": {}, + "source": [ + "## Setup and Installation" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-2", + "metadata": {}, + "outputs": [], + "source": [ + "%pip install llama-cloud python-dotenv requests" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-3", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "from dotenv import load_dotenv\n", + "\n", + "# Load environment variables\n", + "load_dotenv()\n", + "\n", + "# Set your API key\n", + "LLAMA_CLOUD_API_KEY = os.environ.get(\"LLAMA_CLOUD_API_KEY\", \"llx-...\")\n", + "\n", + "# Optional: set PROJECT_ID if you have one; otherwise your default project is used\n", + "PROJECT_ID = os.environ.get(\"LLAMA_CLOUD_PROJECT_ID\", None)\n", + "\n", + "print(\"āœ… API key configured\")" + ] + }, + { + "cell_type": "markdown", + "id": "cell-4", + "metadata": {}, + "source": [ + "## Set Up API Configuration\n", + "\n", + "We'll use the `requests` library to make HTTP calls to the LlamaCloud API." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-5", + "metadata": {}, + "outputs": [], + "source": [ + "import requests\n", + "\n", + "# API configuration\n", + "API_KEY = LLAMA_CLOUD_API_KEY\n", + "BASE_URL = \"https://api.cloud.llamaindex.ai\"\n", + "\n", + "# Set up headers for API requests\n", + "headers = {\n", + " \"Authorization\": f\"Bearer {API_KEY}\",\n", + " \"Content-Type\": \"application/json\",\n", + "}\n", + "\n", + "print(\"āœ… API configuration ready\")\n", + "print(f\" Base URL: {BASE_URL}\")" + ] + }, + { + "cell_type": "markdown", + "id": "cell-6", + "metadata": {}, + "source": [ + "## Step 1: Create a Directory\n", + "\n", + "First, we'll create a directory to organize our files. Directories help you group related files together for batch processing."
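+ ,"\n", + "\n", + "A quick aside before we start making calls: each request in this notebook passes the auth headers explicitly so every cell stands alone. If you prefer, a `requests.Session` can carry the headers for you. A minimal sketch:\n", + "\n", + "```python\n", + "import requests\n", + "\n", + "session = requests.Session()\n", + "session.headers.update({\"Authorization\": f\"Bearer {API_KEY}\"})\n", + "\n", + "# session.get(...) / session.post(...) now send the auth header automatically\n", + "```"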
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-7", + "metadata": {}, + "outputs": [], + "source": [ + "from datetime import datetime\n", + "\n", + "# Create a directory with a timestamp in the name\n", + "timestamp = datetime.now().strftime(\"%Y%m%d-%H%M%S\")\n", + "directory_name = f\"batch-parse-demo-{timestamp}\"\n", + "\n", + "# Create directory via API\n", + "response = requests.post(\n", + " f\"{BASE_URL}/api/v1/beta/directories\",\n", + " headers=headers,\n", + " params={\"project_id\": PROJECT_ID} if PROJECT_ID else {},\n", + " json={\n", + " \"name\": directory_name,\n", + " \"description\": \"Demo directory for batch parse example\",\n", + " },\n", + ")\n", + "response.raise_for_status()\n", + "directory = response.json()\n", + "\n", + "directory_id = directory[\"id\"]\n", + "project_id = directory[\"project_id\"]\n", + "\n", + "print(f\"āœ… Created directory: {directory['name']}\")\n", + "print(f\" Directory ID: {directory_id}\")\n", + "print(f\" Project ID: {project_id}\")" + ] + }, + { + "cell_type": "markdown", + "id": "cell-8", + "metadata": {}, + "source": [ + "## Step 2: Upload Files to the Directory\n", + "\n", + "Now we'll upload some files to our directory. For this demo, we'll download some sample PDFs and upload them.\n", + "\n", + "You can replace these with your own files." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-9", + "metadata": {}, + "outputs": [], + "source": [ + "# Create a directory for sample files\n", + "os.makedirs(\"sample_files\", exist_ok=True)\n", + "\n", + "# Sample documents to download\n", + "sample_docs = {\n", + " \"attention.pdf\": \"https://arxiv.org/pdf/1706.03762.pdf\",\n", + " \"bert.pdf\": \"https://arxiv.org/pdf/1810.04805.pdf\",\n", + "}\n", + "\n", + "# Download sample documents\n", + "for filename, url in sample_docs.items():\n", + " filepath = f\"sample_files/{filename}\"\n", + " if not os.path.exists(filepath):\n", + " print(f\"šŸ“„ Downloading {filename}...\")\n", + " response = requests.get(url)\n", + " if response.status_code == 200:\n", + " with open(filepath, \"wb\") as f:\n", + " f.write(response.content)\n", + " print(f\" āœ… Downloaded {filename}\")\n", + " else:\n", + " print(f\" āŒ Failed to download {filename}\")\n", + " else:\n", + " print(f\"šŸ“ {filename} already exists\")\n", + "\n", + "print(\"\\nāœ… Sample files ready!\")" + ] + }, + { + "cell_type": "markdown", + "id": "cell-10", + "metadata": {}, + "source": [ + "### Upload Files to Directory\n", + "\n", + "Now let's upload the files to our directory using the `upload_file_to_directory` endpoint." 
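+ ,"\n", + "\n", + "Because this is a multipart upload, we send only the `Authorization` header and let `requests` set the `Content-Type`. If your own folder mixes file types, you can guess each MIME type instead of hard-coding `application/pdf`. A sketch using the standard library, with the same endpoint and `upload_file` field as the cell below:\n", + "\n", + "```python\n", + "import mimetypes\n", + "from pathlib import Path\n", + "\n", + "import requests\n", + "\n", + "for path in sorted(Path(\"sample_files\").iterdir()):\n", + "    # Fall back to a generic type when the extension is unknown\n", + "    content_type = mimetypes.guess_type(path.name)[0] or \"application/octet-stream\"\n", + "    with path.open(\"rb\") as f:\n", + "        response = requests.post(\n", + "            f\"{BASE_URL}/api/v1/beta/directories/{directory_id}/files/upload\",\n", + "            headers={\"Authorization\": f\"Bearer {API_KEY}\"},\n", + "            params={\"project_id\": project_id},\n", + "            files={\"upload_file\": (path.name, f, content_type)},\n", + "        )\n", + "    response.raise_for_status()\n", + "```"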
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-11", + "metadata": {}, + "outputs": [], + "source": [ + "uploaded_files = []\n", + "\n", + "for filename in os.listdir(\"sample_files\"):\n", + " if filename.endswith(\".pdf\"):\n", + " filepath = f\"sample_files/{filename}\"\n", + "\n", + " print(f\"šŸ“¤ Uploading {filename}...\")\n", + "\n", + " # Upload file via API\n", + " with open(filepath, \"rb\") as f:\n", + " files = {\"upload_file\": (filename, f, \"application/pdf\")}\n", + " upload_response = requests.post(\n", + " f\"{BASE_URL}/api/v1/beta/directories/{directory_id}/files/upload\",\n", + " headers={\n", + " \"Authorization\": f\"Bearer {API_KEY}\"\n", + " }, # Don't include Content-Type for multipart\n", + " params={\"project_id\": project_id},\n", + " files=files,\n", + " )\n", + " upload_response.raise_for_status()\n", + " directory_file = upload_response.json()\n", + "\n", + " uploaded_files.append(directory_file)\n", + " print(f\" āœ… Uploaded: {directory_file['display_name']}\")\n", + " print(f\" File ID: {directory_file['id']}\")\n", + "\n", + "print(f\"\\nāœ… Uploaded {len(uploaded_files)} files to directory\")" + ] + }, + { + "cell_type": "markdown", + "id": "cell-12", + "metadata": {}, + "source": [ + "## Step 3: Create a Batch Parse Job\n", + "\n", + "Now that we have files in our directory, let's create a batch parse job to process them all at once.\n", + "\n", + "The batch processing API uses the same configuration as LlamaParse." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-13", + "metadata": {}, + "outputs": [], + "source": [ + "# Configure the parse job\n", + "# This configuration will apply to all files in the directory\n", + "job_config = {\n", + " \"job_name\": \"parse_raw_file_job\", # Must match the JobNames enum value\n", + " \"partitions\": {},\n", + " \"parameters\": {\n", + " \"type\": \"parse\",\n", + " \"lang\": \"en\",\n", + " \"fast_mode\": True,\n", + " },\n", + " \"project_id\": project_id,\n", + "}\n", + "\n", + "print(\"āœ… Job configuration created\")\n", + "print(f\" Language: {job_config['parameters']['lang']}\")\n", + "print(f\" Fast mode: {job_config['parameters']['fast_mode']}\")" + ] + }, + { + "cell_type": "markdown", + "id": "cell-14", + "metadata": {}, + "source": [ + "### Submit the Batch Job\n", + "\n", + "Now let's submit the batch job to process all files in the directory." 
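+ ,"\n", + "\n", + "If the server rejects the request (for example, a malformed `job_config`), `raise_for_status()` raises without showing the response body. While experimenting, a small wrapper that prints the server's error message first can save a round trip. This is just a sketch (the `post_json` name is ours, not part of the API); the cell below calls `requests.post` directly:\n", + "\n", + "```python\n", + "import requests\n", + "\n", + "\n", + "def post_json(url, **kwargs):\n", + "    # Reuses the global `headers` defined in the setup cell\n", + "    response = requests.post(url, headers=headers, **kwargs)\n", + "    try:\n", + "        response.raise_for_status()\n", + "    except requests.HTTPError:\n", + "        print(f\"Request failed ({response.status_code}): {response.text}\")\n", + "        raise\n", + "    return response.json()\n", + "```"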
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-15", + "metadata": {}, + "outputs": [], + "source": [ + "print(f\"šŸš€ Submitting batch parse job for directory: {directory_id}\")\n", + "print(f\" Processing {len(uploaded_files)} files...\\n\")\n", + "\n", + "# Submit batch job via API\n", + "batch_response = requests.post(\n", + " f\"{BASE_URL}/api/v1/beta/batch-processing\",\n", + " headers=headers,\n", + " params={\"project_id\": project_id},\n", + " json={\n", + " \"directory_id\": directory_id,\n", + " \"job_config\": job_config,\n", + " \"page_size\": 100, # Number of files to fetch per batch\n", + " \"continue_as_new_threshold\": 10, # Workflow continuation threshold\n", + " },\n", + ")\n", + "batch_response.raise_for_status()\n", + "batch_job = batch_response.json()\n", + "\n", + "batch_job_id = batch_job[\"id\"]\n", + "\n", + "print(\"āœ… Batch job submitted successfully!\")\n", + "print(f\" Batch Job ID: {batch_job_id}\")\n", + "print(f\" Workflow ID: {batch_job['workflow_id']}\")\n", + "print(f\" Status: {batch_job['status']}\")\n", + "print(f\" Total Items: {batch_job['total_items']}\")" + ] + }, + { + "cell_type": "markdown", + "id": "cell-16", + "metadata": {}, + "source": [ + "## Step 4: Monitor Job Progress\n", + "\n", + "Now let's monitor the batch job progress. We'll poll the status endpoint to see how the job is progressing." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-17", + "metadata": {}, + "outputs": [], + "source": [ + "import time\n", + "\n", + "\n", + "def print_job_status(status_response):\n", + " \"\"\"Helper function to print job status in a readable format.\"\"\"\n", + " # The API returns {\"job\": {...}, \"progress_percentage\": ...}\n", + " job = status_response.get(\"job\", {})\n", + " progress_pct = status_response.get(\"progress_percentage\", 0.0)\n", + "\n", + " print(f\"\\n{'='*60}\")\n", + " print(f\"Job Status: {job.get('status', 'N/A')}\")\n", + " print(f\"{'='*60}\")\n", + " print(f\"Total Items: {job.get('total_items', 0)}\")\n", + " print(f\"Completed: {job.get('processed_items', 0)}\")\n", + " print(f\"Failed: {job.get('failed_items', 0)}\")\n", + " print(f\"Skipped: {job.get('skipped_items', 0)}\")\n", + " print(f\"Progress: {progress_pct:.1f}%\")\n", + "\n", + " if job.get(\"completed_at\"):\n", + " print(f\"Completed At: {job['completed_at']}\")\n", + " elif job.get(\"started_at\"):\n", + " print(f\"Started At: {job['started_at']}\")\n", + "\n", + " print(f\"{'='*60}\")\n", + "\n", + "\n", + "# Poll for status updates\n", + "print(\"šŸ”„ Monitoring batch job progress...\")\n", + "print(\n", + " \"Note: It may take a few seconds for the workflow to initialize and count files.\\n\"\n", + ")\n", + "\n", + "max_polls = 60 # Maximum number of status checks (increased for longer jobs)\n", + "poll_interval = 10 # Seconds between checks\n", + "\n", + "for i in range(max_polls):\n", + " status_response = requests.get(\n", + " f\"{BASE_URL}/api/v1/beta/batch-processing/{batch_job_id}\",\n", + " headers=headers,\n", + " params={\"project_id\": project_id},\n", + " )\n", + " status_response.raise_for_status()\n", + " status_data = status_response.json()\n", + "\n", + " print_job_status(status_data)\n", + "\n", + " # Check if job is complete\n", + " job = status_data.get(\"job\", {})\n", + " job_status = job.get(\"status\", \"UNKNOWN\")\n", + " if job_status in [\"completed\", \"failed\", \"cancelled\"]:\n", + " print(f\"\\nāœ… Job finished with status: {job_status}\")\n", + " break\n", + "\n", + 
" if i < max_polls - 1:\n", + " print(f\"\\nā³ Waiting {poll_interval} seconds before next check...\")\n", + " time.sleep(poll_interval)\n", + "else:\n", + " print(f\"\\nāš ļø Reached maximum polling attempts. Job may still be running.\")" + ] + }, + { + "cell_type": "markdown", + "id": "cell-18", + "metadata": {}, + "source": [ + "## Step 5: View Job Items\n", + "\n", + "Let's look at the individual items in the batch job to see which files were processed successfully." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-19", + "metadata": {}, + "outputs": [], + "source": [ + "# Get all items in the batch job\n", + "items_response = requests.get(\n", + " f\"{BASE_URL}/api/v1/beta/batch-processing/{batch_job_id}/items\",\n", + " headers=headers,\n", + " params={\"project_id\": project_id, \"limit\": 100},\n", + ")\n", + "items_response.raise_for_status()\n", + "items_data = items_response.json()\n", + "\n", + "print(f\"\\nšŸ“‹ Batch Job Items ({items_data['total_size']} total)\")\n", + "print(f\"{'='*80}\\n\")\n", + "\n", + "for item in items_data[\"items\"]:\n", + " status_emoji = (\n", + " \"āœ…\"\n", + " if item[\"status\"] == \"completed\"\n", + " else \"āŒ\"\n", + " if item[\"status\"] == \"failed\"\n", + " else \"ā³\"\n", + " )\n", + " print(f\"{status_emoji} {item['item_name']}\")\n", + " print(f\" Status: {item['status']}\")\n", + " print(f\" Item ID: {item['item_id']}\")\n", + "\n", + " if item.get(\"error_message\"):\n", + " print(f\" Error: {item['error_message']}\")\n", + "\n", + " print()" + ] + }, + { + "cell_type": "markdown", + "id": "cell-20", + "metadata": {}, + "source": [ + "## Step 6: Retrieve Processing Results\n", + "\n", + "For each completed file, we can retrieve the processing results to see where the parsed output is stored." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-21", + "metadata": {}, + "outputs": [], + "source": [ + "# Get processing results for a specific item\n", + "if items_data[\"items\"]:\n", + " first_item = items_data[\"items\"][0]\n", + "\n", + " print(f\"\\nšŸ” Processing results for: {first_item['item_name']}\")\n", + " print(f\"{'='*80}\\n\")\n", + "\n", + " results_response = requests.get(\n", + " f\"{BASE_URL}/api/v1/beta/batch-processing/items/{first_item['item_id']}/processing-results\",\n", + " headers=headers,\n", + " params={\"project_id\": project_id},\n", + " )\n", + " results_response.raise_for_status()\n", + " results = results_response.json()\n", + "\n", + " print(f\"Item: {results['item_name']}\")\n", + " print(f\"Total processing runs: {len(results['processing_results'])}\\n\")\n", + "\n", + " for i, result in enumerate(results[\"processing_results\"], 1):\n", + " print(f\"Run {i}:\")\n", + " print(f\" Job Type: {result['job_type']}\")\n", + " print(f\" Processed At: {result['processed_at']}\")\n", + " print(f\" Parameters Hash: {result['parameters_hash']}\")\n", + "\n", + " if result.get(\"output_s3_path\"):\n", + " print(f\" Output S3 Path: {result['output_s3_path']}\")\n", + "\n", + " if result.get(\"output_metadata\"):\n", + " print(f\" Output Metadata: {result['output_metadata']}\")\n", + "\n", + " print()" + ] + }, + { + "cell_type": "markdown", + "id": "cell-22", + "metadata": {}, + "source": [ + "## Optional: List All Batch Jobs\n", + "\n", + "You can also list all batch jobs in your project to see the history of batch processing operations." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-23", + "metadata": {}, + "outputs": [], + "source": [ + "# List all parse jobs in the project\n", + "jobs_response = requests.get(\n", + " f\"{BASE_URL}/api/v1/beta/batch-processing\",\n", + " headers=headers,\n", + " params={\n", + " \"project_id\": project_id,\n", + " \"job_type\": \"parse\", # Filter by job type\n", + " \"limit\": 10,\n", + " },\n", + ")\n", + "jobs_response.raise_for_status()\n", + "jobs_data = jobs_response.json()\n", + "\n", + "print(f\"\\nšŸ“Š Recent Batch Parse Jobs ({jobs_data['total_size']} total)\")\n", + "print(f\"{'='*80}\\n\")\n", + "\n", + "for job in jobs_data[\"items\"]:\n", + " status_emoji = (\n", + " \"āœ…\"\n", + " if job[\"status\"] == \"completed\"\n", + " else \"āŒ\"\n", + " if job[\"status\"] == \"failed\"\n", + " else \"ā³\"\n", + " )\n", + " print(f\"{status_emoji} Job ID: {job['id']}\")\n", + " print(f\" Status: {job['status']}\")\n", + " print(f\" Directory: {job['directory_id']}\")\n", + " print(f\" Total Items: {job['total_items']}\")\n", + " print(f\" Completed: {job['processed_items']}\")\n", + " print(f\" Created: {job['created_at']}\")\n", + " print()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}