@@ -384,6 +384,129 @@ deploymentSpec:
384384 \ for i in range(num_splits)]\n filled_splits = list(filter(None, all_splits))\n \
385385 \ return filled_splits\n\n "
386386 image: quay.io/aipcc/docling/cuda-ubi9
387+ exec-docling-chunk:
388+ container:
389+ args:
390+ - --executor_input
391+ - '{{$}}'
392+ - --function_to_execute
393+ - docling_chunk
394+ command:
395+ - sh
396+ - -c
397+ - " \n if ! [ -x \" $(command -v pip)\" ]; then\n python3 -m ensurepip ||\
398+ \ python3 -m ensurepip --user || apt-get install python3-pip\n fi\n\n PIP_DISABLE_PIP_VERSION_CHECK=1\
399+ \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.6'\
400+ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\" 3.9\" ' && \" \
401+ $0\" \" $@\"\n "
402+ - sh
403+ - -ec
404+ - 'program_path=$(mktemp -d)
405+
406+
407+ printf "%s" "$0" > "$program_path/ephemeral_component.py"
408+
409+ _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
410+
411+ '
412+ - " \n import kfp\n from kfp import dsl\n from kfp.dsl import *\n from typing import\
413+ \ *\n\n def docling_chunk(\n input_path: dsl.Input[dsl.Artifact],\n \
414+ \ output_path: dsl.Output[dsl.Artifact],\n max_tokens: int = 512,\n \
415+ \ merge_peers: bool = True,\n ):\n \"\"\"\n Chunk Docling documents\
416+ \ using HybridChunker. Takes converted docling JSON files as input\n \
417+ \ and produces chunked JSONL files with semantic chunks suitable for RAG.\n \
418+ \n Output format is JSONL (one JSON object per line) for easy inspection\
419+ \ and streaming.\n\n Args:\n input_path: Path to the input directory\
420+ \ containing Docling JSON files\n output_path: Path to the output\
421+ \ directory for the chunked JSONL files\n max_tokens: Maximum number\
422+ \ of tokens per chunk\n merge_peers: Whether to merge smaller chunks\
423+ \ at the same level\n \"\"\"\n import json # pylint: disable=import-outside-toplevel\n \
424+ \ from datetime import datetime, timezone # pylint: disable=import-outside-toplevel\n \
425+ \ from pathlib import Path # pylint: disable=import-outside-toplevel\n \
426+ \n # HybridChunker = Docling's smart chunking class that combines:\n \
427+ \ # 1. Document structure awareness\n # 2. Token-based splitting\n \
428+ \ from docling.chunking import HybridChunker # pylint: disable=import-outside-toplevel\n \
429+ \ from docling_core.transforms.chunker.tokenizer.huggingface import (\n \
430+ \ HuggingFaceTokenizer,\n ) # pylint: disable=import-outside-toplevel\n \
431+ \ from docling_core.types import DoclingDocument # pylint: disable=import-outside-toplevel\n \
432+ \ from transformers import AutoTokenizer # pylint: disable=import-outside-toplevel\n \
433+ \n # Convert KFP artifact paths to Path objects\n input_path_p = Path(input_path.path)\n \
434+ \ output_path_p = Path(output_path.path)\n output_path_p.mkdir(parents=True,\
435+ \ exist_ok=True)\n\n # Initialize tokenizer for HybridChunker (new API)\n \
436+ \ # Using a lightweight sentence-transformer model for tokenization\n \
437+ \ EMBED_MODEL_ID = \" sentence-transformers/all-MiniLM-L6-v2\"\n try:\n \
438+ \ hf_tokenizer = AutoTokenizer.from_pretrained(\n EMBED_MODEL_ID,\n \
439+ \ resume_download=True,\n timeout=60,\n )\n \
440+ \ print(f\" docling-chunk: loaded tokenizer from {EMBED_MODEL_ID}\" \
441+ , flush=True)\n except Exception as e:\n print(f\" docling-chunk:\
442+ \ ERROR loading tokenizer: {e}\" , flush=True)\n raise RuntimeError(\n \
443+ \ f\" Failed to load tokenizer model {EMBED_MODEL_ID}. \"\n \
444+ \ \" Ensure network access to HuggingFace Hub or pre-download the\
445+ \ model.\"\n ) from e\n\n tokenizer = HuggingFaceTokenizer(\n \
446+ \ tokenizer=hf_tokenizer,\n max_tokens=max_tokens,\n )\n \
447+ \n # Initialize Hybrid chunker with user-specified parameters\n #\
448+ \ tokenizer: The tokenizer wrapper to use for counting tokens (includes\
449+ \ max_tokens)\n # merge_peers: if true, smaller adjacent chunks will\
450+ \ be merged together\n chunker = HybridChunker(\n tokenizer=tokenizer,\n \
451+ \ merge_peers=merge_peers,\n )\n\n # Find all JSON files in\
452+ \ the input directory\n json_files = list(input_path_p.glob(\" *.json\" \
453+ ))\n if not json_files:\n print(f\" docling-chunk: No JSON files\
454+ \ found in {input_path_p}\" , flush=True)\n return\n\n print(\n \
455+ \ f\" docling-chunk: processing {len(json_files)} files with max_tokens={max_tokens}\
456+ \ and merge_peers={merge_peers}\" ,\n flush=True,\n )\n\n #\
457+ \ Track processing results\n processed_count = 0\n skipped_files =\
458+ \ []\n\n # Process each file\n for json_file in json_files:\n \
459+ \ print(f\" docling-chunk: processing {json_file}\" , flush=True)\n\n \
460+ \ # Load and validate the JSON file\n try:\n with\
461+ \ open(json_file, \" r\" , encoding=\" utf-8\" ) as f:\n doc_data\
462+ \ = json.load(f)\n except json.JSONDecodeError as e:\n \
463+ \ print(\n f\" docling-chunk: skipping {json_file.name} -\
464+ \ invalid JSON: {e}\" ,\n flush=True,\n )\n \
465+ \ skipped_files.append((json_file.name, f\" invalid JSON: {e}\" ))\n \
466+ \ continue\n\n # Parse the JSON data into a DoclingDocument\
467+ \ object\n # This validates that the JSON conforms to the DoclingDocument\
468+ \ schema\n try:\n doc = DoclingDocument.model_validate(doc_data)\n \
469+ \ except Exception as e:\n # Catches pydantic.ValidationError\
470+ \ and any other validation issues\n print(\n f\" \
471+ docling-chunk: skipping {json_file.name} - not a valid DoclingDocument:\
472+ \ {e}\" ,\n flush=True,\n )\n skipped_files.append((json_file.name,\
473+ \ f\" validation failed: {e}\" ))\n continue\n\n # Chunk\
474+ \ the document using HybridChunker\n chunks = list(chunker.chunk(dl_doc=doc))\n \
475+ \n # Generate output filename: original_name_chunks.jsonl\n \
476+ \ output_filename = f\" {json_file.stem}_chunks.jsonl\"\n output_file\
477+ \ = output_path_p / output_filename\n\n # Get current timestamp in\
478+ \ ISO format\n timestamp = datetime.now(timezone.utc).isoformat()\n \
479+ \n # Chunking config (for reproducibility)\n chunking_config\
480+ \ = {\n \" max_tokens\" : max_tokens,\n \" merge_peers\" \
481+ : merge_peers,\n \" tokenizer_model\" : EMBED_MODEL_ID,\n \
482+ \ }\n\n # Write chunks as JSONL (one JSON object per line)\n \
483+ \ with open(output_file, \" w\" , encoding=\" utf-8\" ) as f:\n \
484+ \ for idx, chunk in enumerate(chunks):\n # Get contextualized\
485+ \ text for this chunk\n chunk_text = chunker.contextualize(chunk=chunk)\n \
486+ \n # Build the chunk object\n chunk_obj =\
487+ \ {\n \" timestamp\" : timestamp,\n \
488+ \ \" source_document\" : json_file.name,\n \" chunk_index\" \
489+ : idx,\n \" chunking_config\" : chunking_config,\n \
490+ \ \" text\" : chunk_text,\n }\n\n \
491+ \ # Write as a single line of JSON\n f.write(json.dumps(chunk_obj,\
492+ \ ensure_ascii=False) + \"\\ n\" )\n\n print(\n f\" docling-chunk:\
493+ \ saved {len(chunks)} chunks to {output_filename}\" ,\n flush=True,\n \
494+ \ )\n processed_count += 1\n\n # Report summary\n print(\n \
495+ \ f\" docling-chunk: done - processed {processed_count}/{len(json_files)}\
496+ \ files\" ,\n flush=True,\n )\n if skipped_files:\n print(\n \
497+ \ f\" docling-chunk: skipped {len(skipped_files)} invalid files:\" \
498+ ,\n flush=True,\n )\n for filename, reason in skipped_files:\n \
499+ \ print(f\" - {filename}: {reason}\" , flush=True)\n\n "
500+ image: quay.io/aipcc/docling/cuda-ubi9
501+ resources:
502+ cpuLimit: 2.0
503+ cpuRequest: 0.25
504+ memoryLimit: 2.0
505+ memoryRequest: 0.512
506+ resourceCpuLimit: '2'
507+ resourceCpuRequest: 250m
508+ resourceMemoryLimit: 2G
509+ resourceMemoryRequest: 512M
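The executor above is KFP-compiled output, so the authoring-side source is easier to read. As a rough sketch only (the decorator arguments and the task-level resource calls below are assumptions inferred from the compiled spec, not the actual source file), the exec-docling-chunk block corresponds to a Python component along these lines:

from kfp import dsl

# Assumed authoring-side definition: docling, docling-core and transformers are
# presumed to be preinstalled in the base image, since the compiled bootstrap
# command only pip-installs kfp==2.14.6.
@dsl.component(base_image="quay.io/aipcc/docling/cuda-ubi9")
def docling_chunk(
    input_path: dsl.Input[dsl.Artifact],
    output_path: dsl.Output[dsl.Artifact],
    max_tokens: int = 512,
    merge_peers: bool = True,
):
    from pathlib import Path

    from docling.chunking import HybridChunker
    from docling_core.transforms.chunker.tokenizer.huggingface import HuggingFaceTokenizer
    from docling_core.types import DoclingDocument
    from transformers import AutoTokenizer

    # Token-aware wrapper consumed by HybridChunker, mirroring the embedded code.
    tokenizer = HuggingFaceTokenizer(
        tokenizer=AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2"),
        max_tokens=max_tokens,
    )
    chunker = HybridChunker(tokenizer=tokenizer, merge_peers=merge_peers)
    # ... per-file load, DoclingDocument.model_validate, chunker.chunk, and the
    # JSONL writing loop as embedded in the executor string above ...

The requests and limits in the resources block would presumably be applied on the task in the pipeline function, e.g. docling_chunk(...).set_cpu_request('250m').set_cpu_limit('2').set_memory_request('512M').set_memory_limit('2G').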
387510 exec-docling-convert-standard:
388511 container:
389512 args: