docling_eval/campaign_tools/cvat_deliveries_to_hf.py (235 changes: 228 additions & 7 deletions)
@@ -2,17 +2,22 @@

import json
import logging
import mimetypes
import shutil
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from io import BytesIO
from pathlib import Path
-from typing import Dict, Iterable, List, Sequence
+from typing import Dict, Iterable, List, Sequence, Set

import typer
from docling_core.types.io import DocumentStream

from docling_eval.datamodels.cvat_types import AnnotationOverview
from docling_eval.datamodels.dataset_record import DatasetRecord, FieldType
from docling_eval.dataset_builders.file_dataset_builder import FileDatasetBuilder
from docling_eval.utils.utils import get_binhash

_LOGGER = logging.getLogger(__name__)

@@ -55,6 +60,103 @@ def as_config(self, dataset_dir_name: str, split: str) -> ConfigEntry:
return ConfigEntry(name="default", split=split, path_pattern=pattern)


@dataclass(frozen=True)
class DocumentAsset:
path: Path
mime_type: str
doc_hash: str | None


def _find_overview_path(assets_root: Path) -> Path | None:
for candidate_name in ("cvat_annotation_overview.json", "cvat_overview.json"):
candidate_path = assets_root / candidate_name
if candidate_path.exists():
return candidate_path
return None


def _load_subset_assets(
subset_name: str,
datasets_root: Path,
assets_dirname: str,
*,
strict: bool,
) -> Dict[str, DocumentAsset]:
assets_root = datasets_root / subset_name / assets_dirname
if not assets_root.exists():
message = (
f"Assets root {assets_root} for subset {subset_name} does not exist; "
"provide a correct --datasets-root or omit it."
)
if strict:
raise FileNotFoundError(message)
_LOGGER.warning(message)
return {}

overview_path = _find_overview_path(assets_root)
if overview_path is None:
message = (
"No cvat_annotation_overview.json or cvat_overview.json found "
f"for subset {subset_name} under {assets_root}"
)
if strict:
raise FileNotFoundError(message)
_LOGGER.warning(message)
return {}

overview = AnnotationOverview.load_from_json(overview_path)
assets: Dict[str, DocumentAsset] = {}
for annotated_doc in overview.doc_annotations:
asset_path = annotated_doc.bin_file
if not asset_path.is_absolute():
asset_path = (assets_root / asset_path).resolve()

if not asset_path.exists():
message = (
f"Binary file {asset_path} referenced in {overview_path} "
f"is missing for subset {subset_name}"
)
if strict:
raise FileNotFoundError(message)
_LOGGER.warning(message)
continue

assets[annotated_doc.doc_name] = DocumentAsset(
path=asset_path,
mime_type=annotated_doc.mime_type,
doc_hash=annotated_doc.doc_hash if annotated_doc.doc_hash else None,
)
return assets


def _build_assets_index(
subset_names: Sequence[str],
datasets_root: Path | None,
assets_dirname: str,
*,
strict: bool,
) -> Dict[str, Dict[str, DocumentAsset]]:
if datasets_root is None:
return {}

assets_index: Dict[str, Dict[str, DocumentAsset]] = {}
for subset_name in subset_names:
subset_assets = _load_subset_assets(
subset_name,
datasets_root,
assets_dirname,
strict=strict,
)
if subset_assets:
assets_index[subset_name] = subset_assets
elif strict:
raise ValueError(
f"No assets discovered for subset {subset_name} under {datasets_root}"
)

return assets_index
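
A minimal sketch of how the asset index built above might be consumed, assuming a hypothetical datasets root and subset/document names (these names are not taken from this PR):

```python
from pathlib import Path

# Assumed layout: /data/base_datasets/<subset>/cvat_dataset_preannotated/
# containing cvat_annotation_overview.json (or cvat_overview.json).
assets_index = _build_assets_index(
    subset_names=["invoices", "reports"],        # hypothetical subset names
    datasets_root=Path("/data/base_datasets"),   # hypothetical path
    assets_dirname="cvat_dataset_preannotated",
    strict=True,                                 # fail fast on missing overviews/binaries
)

# The index maps subset -> doc_name -> DocumentAsset(path, mime_type, doc_hash).
asset = assets_index["invoices"]["invoice_0001"]
print(asset.path, asset.mime_type, asset.doc_hash)
```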


FEATURE_FIELD_RENDER: Dict[FieldType, tuple[str, str]] = {
FieldType.STRING: ("dtype", "string"),
FieldType.BINARY: ("dtype", "binary"),
@@ -81,6 +183,7 @@ def discover_subset_sources(
deliveries_root: Path,
export_kind: DeliveryExportKind,
custom_dirname: str | None = None,
allowed_submissions: Set[str] | None = None,
) -> Dict[str, List[Path]]:
"""
Discover subset source directories containing JSON exports.
@@ -89,6 +192,7 @@
deliveries_root: Root directory containing submission folders
export_kind: Type of export to discover (ground truth or predictions)
custom_dirname: Optional custom directory name to use instead of the default
allowed_submissions: Optional set of submission directory names to include

Returns:
Dictionary mapping subset names to lists of source directories
@@ -98,16 +202,30 @@
custom_dirname if custom_dirname is not None else export_kind.folder_name()
)

discovered_submissions: Set[str] = set()

for submission_dir in sorted(
-        p for p in deliveries_root.glob(SUBMISSION_DIR_GLOB) if p.is_dir()
+        p
+        for p in deliveries_root.glob(SUBMISSION_DIR_GLOB)
+        if p.is_dir() and (allowed_submissions is None or p.name in allowed_submissions)
):
discovered_submissions.add(submission_dir.name)
for subset_dir in sorted(p for p in submission_dir.iterdir() if p.is_dir()):
candidate = subset_dir / dirname
if candidate.is_dir():
subset_dirs[subset_dir.name].append(candidate)
else:
_LOGGER.debug("Skipping %s (no %s)", candidate, dirname)

if allowed_submissions is not None:
missing = allowed_submissions.difference(discovered_submissions)
if missing:
_LOGGER.warning(
"Requested submissions not found under %s: %s",
deliveries_root,
", ".join(sorted(missing)),
)

return subset_dirs
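
For illustration, a hedged sketch of how discovery is expected to behave; the deliveries layout, submission names, and the DeliveryExportKind member used below are assumptions, not taken from this PR:

```python
from pathlib import Path

# Assumed layout:
#   /data/deliveries/submission-01/<subset>/<export dir>/*.json
#   /data/deliveries/submission-02/<subset>/<export dir>/*.json
export_kind = list(DeliveryExportKind)[0]  # any member; names are not shown in this diff

subset_sources = discover_subset_sources(
    deliveries_root=Path("/data/deliveries"),
    export_kind=export_kind,
    allowed_submissions={"submission-01"},  # only this delivery is scanned
)
# Expected shape: {"<subset>": [Path(".../submission-01/<subset>/<export dir>")], ...}
```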


@@ -173,20 +291,65 @@ def read_num_rows(dataset_root: Path) -> int:


def iter_records_with_tags(
-    builder: FileDatasetBuilder, file_to_subset: Dict[str, str]
+    builder: FileDatasetBuilder,
+    file_to_subset: Dict[str, str],
+    subset_assets: Dict[str, Dict[str, DocumentAsset]] | None = None,
+    *,
+    assets_required: bool = False,
) -> Iterable[DatasetRecord]:
"""
Iterate over records from FileDatasetBuilder and add subset tags.

The builder creates records with doc_id from filename.stem (original filename),
-    so we match it to our file_to_subset mapping to find the subset name.
+    so we match it to our file_to_subset mapping to find the subset name. When
+    subset_assets is provided, the original binary and mime_type are pulled
+    from the CVAT overview instead of the JSON payload.
"""
subset_assets = subset_assets or {}
for record in builder.iterate():
# FileDatasetBuilder sets doc_id to filename.stem (original filename)
# Match it to our file_to_subset mapping
subset_name = file_to_subset.get(record.doc_id)
if subset_name:
record.tags.append(f"subset:{subset_name}")

assets_for_subset = subset_assets.get(subset_name)
if assets_required and assets_for_subset is None:
raise ValueError(
f"No assets loaded for subset {subset_name}; "
"ensure --datasets-root points to the base datasets."
)

if assets_for_subset:
asset = assets_for_subset.get(record.doc_id)
if asset is None and assets_required:
raise ValueError(
f"Document {record.doc_id} missing from overview for subset {subset_name}"
)
if asset:
try:
file_bytes = asset.path.read_bytes()
except OSError as exc:
if assets_required:
raise
_LOGGER.warning(
"Unable to read binary for %s (%s): %s",
record.doc_id,
asset.path,
exc,
)
else:
record.doc_path = asset.path
record.doc_hash = asset.doc_hash or get_binhash(file_bytes)
resolved_mime = (
asset.mime_type or mimetypes.guess_type(asset.path.name)[0]
)
if resolved_mime:
record.mime_type = resolved_mime
record.original = DocumentStream(
name=asset.path.name,
stream=BytesIO(file_bytes),
)
yield record


@@ -199,9 +362,14 @@ def build_combined_dataset(
chunk_size: int,
export_kind: DeliveryExportKind,
force: bool,
subset_assets: Dict[str, Dict[str, DocumentAsset]] | None = None,
*,
assets_required: bool = False,
) -> CombinedBuildStats | None:
"""
-    Build a single combined dataset from all subsets with subset tags.
+    Build a single combined dataset from all subsets with subset tags. When
+    subset_assets is provided, originals are populated from the referenced
+    binaries described in the CVAT overviews.
"""
target_root = output_root / dataset_dir_name
if target_root.exists():
@@ -241,7 +409,13 @@
chunk_count = 0

for record_chunk in chunkify(
-        iter_records_with_tags(builder, file_to_subset), chunk_size
+        iter_records_with_tags(
+            builder=builder,
+            file_to_subset=file_to_subset,
+            subset_assets=subset_assets,
+            assets_required=assets_required,
+        ),
+        chunk_size,
):
record_list = [r.as_record_dict() for r in record_chunk]
save_shard_to_disk(
@@ -391,10 +565,34 @@ def main(
"(default: predictions_json). Only used when --export-kind is predictions."
),
),
datasets_root: Path | None = typer.Option(
None,
"--datasets-root",
help=(
"Root containing <subset>/<assets-dirname>/cvat_annotation_overview.json "
"or cvat_overview.json. When provided, originals are pulled from the "
"referenced PDF/image binaries instead of the JSON payload."
),
),
assets_dirname: str = typer.Option(
"cvat_dataset_preannotated",
"--assets-dirname",
help="Name of the directory under each subset that holds the CVAT assets tree.",
),
include_submissions: List[str] | None = typer.Option(
None,
"--include-submission",
"-s",
help=(
"Restrict processing to these submission directories "
"(by folder name, e.g., submission-01). Can be provided multiple times."
),
),
) -> None:
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
deliveries_root = deliveries_root.expanduser().resolve()
output_dir = output_dir.expanduser().resolve()
datasets_root = datasets_root.expanduser().resolve() if datasets_root else None
staging_root = output_dir / "_staging"

if not deliveries_root.exists():
@@ -413,7 +611,10 @@
custom_dirname if custom_dirname is not None else export_kind.folder_name()
)
subset_sources = discover_subset_sources(
-        deliveries_root, export_kind, custom_dirname
+        deliveries_root,
+        export_kind,
+        custom_dirname,
+        allowed_submissions=set(include_submissions) if include_submissions else None,
)
if not subset_sources:
typer.echo(
@@ -424,6 +625,24 @@

# Sort subset sources for deterministic ordering
sorted_subset_sources = {k: sorted(v) for k, v in sorted(subset_sources.items())}
subset_assets = _build_assets_index(
subset_names=list(sorted_subset_sources.keys()),
datasets_root=datasets_root,
assets_dirname=assets_dirname,
strict=datasets_root is not None,
)
if datasets_root is not None:
missing_subsets = [
subset
for subset in sorted_subset_sources.keys()
if subset not in subset_assets
]
if missing_subsets:
raise RuntimeError(
"Missing assets for subsets: "
+ ", ".join(missing_subsets)
+ ". Ensure datasets-root contains matching subsets."
)

staging_dir = staging_root / "combined"
stats = build_combined_dataset(
@@ -435,6 +654,8 @@
chunk_size=chunk_size,
export_kind=export_kind,
force=force,
subset_assets=subset_assets,
assets_required=datasets_root is not None,
)

shutil.rmtree(staging_root, ignore_errors=True)
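
Putting the new options together, a hypothetical invocation of the updated tool might look like the sketch below. The module entry point and the pre-existing deliveries/output arguments are assumptions and are elided; only the flags added in this diff are shown.

```python
import subprocess

# Hypothetical invocation; assumes the script is runnable as a module and that the
# existing deliveries-root / output-dir arguments (not shown here) are supplied as before.
subprocess.run(
    [
        "python", "-m", "docling_eval.campaign_tools.cvat_deliveries_to_hf",
        # ... existing deliveries-root / output-dir arguments go here ...
        "--datasets-root", "/data/base_datasets",
        "--assets-dirname", "cvat_dataset_preannotated",
        "--include-submission", "submission-01",
        "--include-submission", "submission-02",
    ],
    check=True,
)
```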