186 changes: 149 additions & 37 deletions

--- a/docling_eval/campaign_tools/cvat_deliveries_to_hf.py
+++ b/docling_eval/campaign_tools/cvat_deliveries_to_hf.py
@@ -7,6 +7,7 @@
 from collections import defaultdict
 from dataclasses import dataclass
 from enum import Enum
+from fnmatch import fnmatch
 from io import BytesIO
 from pathlib import Path
 from typing import Dict, Iterable, List, Sequence, Set
@@ -55,9 +56,12 @@ class CombinedBuildStats:
     submission_count: int
     subsets: List[str]

-    def as_config(self, dataset_dir_name: str, split: str) -> ConfigEntry:
+    def as_config(
+        self, dataset_dir_name: str, split: str, config_name: str | None = None
+    ) -> ConfigEntry:
+        name = config_name if config_name is not None else "default"
         pattern = f"{dataset_dir_name}/{split}/shard_*.parquet"
-        return ConfigEntry(name="default", split=split, path_pattern=pattern)
+        return ConfigEntry(name=name, split=split, path_pattern=pattern)


 @dataclass(frozen=True)
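# Reviewer note (not part of the diff): as_config previously hard-coded the
# config name "default"; it now takes an optional config_name, so e.g.
# stats.as_config("my-dataset-train", "train", config_name="train") yields
# ConfigEntry(name="train", split="train",
#             path_pattern="my-dataset-train/train/shard_*.parquet").
# The "my-dataset-train" directory name here is hypothetical.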
@@ -67,6 +71,18 @@ class DocumentAsset:
     doc_hash: str | None


+@dataclass(frozen=True)
+class SplitRule:
+    pattern: str
+    split: str
+
+
+@dataclass(frozen=True)
+class BuildResult:
+    config: ConfigEntry
+    stats: CombinedBuildStats
+
+
 def _find_overview_path(assets_root: Path) -> Path | None:
     for candidate_name in ("cvat_annotation_overview.json", "cvat_overview.json"):
         candidate_path = assets_root / candidate_name
@@ -229,6 +245,57 @@ def discover_subset_sources(
     return subset_dirs


+def _parse_subset_split_rules(rules: List[str]) -> List[SplitRule]:
+    parsed_rules: List[SplitRule] = []
+    for rule in rules:
+        if "=" not in rule:
+            raise typer.BadParameter(
+                f"Invalid --subset-split value '{rule}'. Expected format pattern=split."
+            )
+        pattern, split = rule.split("=", maxsplit=1)
+        if not pattern or not split:
+            raise typer.BadParameter(
+                f"Invalid --subset-split value '{rule}'. Both pattern and split are required."
+            )
+        parsed_rules.append(SplitRule(pattern=pattern, split=split))
+    return parsed_rules
+
+
+def _route_subsets_to_splits(
+    subset_sources: Dict[str, List[Path]],
+    rules: List[SplitRule],
+    default_split: str,
+    *,
+    fail_on_unmatched: bool,
+) -> Dict[str, Dict[str, List[Path]]]:
+    split_map: Dict[str, Dict[str, List[Path]]] = defaultdict(dict)
+
+    for subset_name, sources in subset_sources.items():
+        matched_split = None
+        for rule in rules:
+            if fnmatch(subset_name, rule.pattern):
+                matched_split = rule.split
+                break
+
+        if matched_split is None:
+            if fail_on_unmatched:
+                raise RuntimeError(
+                    f"Subset '{subset_name}' did not match any --subset-split rule "
+                    "and --fail-on-unmatched was provided."
+                )
+            matched_split = default_split
+
+        split_map[matched_split][subset_name] = sources
+
+    return split_map
+
+
+def _dataset_dir_for_split(base_name: str, split: str, multi_split: bool) -> str:
+    if not multi_split:
+        return base_name
+    return f"{base_name}-{split}"
+
+
 def ensure_clean_dir(path: Path) -> None:
     if path.exists():
         shutil.rmtree(path)
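# Reviewer sketch (not part of the diff): a minimal, runnable illustration of the
# first-match-wins routing that _route_subsets_to_splits implements above.
# Subset names and rules below are hypothetical.
from fnmatch import fnmatch

rules = [("pdf_val", "validation"), ("pdf_train_*", "train")]  # pattern=split pairs
default_split = "test"

for subset in ("pdf_val", "pdf_train_01", "pdf_train_02", "scans_misc"):
    # First rule whose fnmatch pattern matches wins; otherwise fall back to default.
    split = next((s for p, s in rules if fnmatch(subset, p)), default_split)
    print(f"{subset} -> {split}")
# pdf_val -> validation, pdf_train_01 -> train, pdf_train_02 -> train, scans_misc -> test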
@@ -448,38 +515,40 @@ def build_combined_dataset(
     )


-def render_configs_block(config: ConfigEntry) -> List[str]:
+def render_configs_block(configs: Sequence[ConfigEntry]) -> List[str]:
     lines: List[str] = ["configs:"]
-    lines.append(f"- config_name: {config.name}")
-    lines.append("  data_files:")
-    lines.append(f"  - split: {config.split}")
-    lines.append(f"    path: {config.path_pattern}")
+    for config in configs:
+        lines.append(f"- config_name: {config.name}")
+        lines.append("  data_files:")
+        lines.append(f"  - split: {config.split}")
+        lines.append(f"    path: {config.path_pattern}")
     return lines


-def render_dataset_info_block(config: ConfigEntry) -> List[str]:
+def render_dataset_info_block(configs: Sequence[ConfigEntry]) -> List[str]:
     lines: List[str] = ["dataset_info:"]
-    feature_rows = iter_dataset_features()
-    lines.append(f"- config_name: {config.name}")
-    lines.append("  features:")
-    for feature_name, attr, value in feature_rows:
-        lines.append(f"  - name: {feature_name}")
-        lines.append(f"    {attr}: {value}")
-    lines.append("")
+    for config in configs:
+        feature_rows = iter_dataset_features()
+        lines.append(f"- config_name: {config.name}")
+        lines.append("  features:")
+        for feature_name, attr, value in feature_rows:
+            lines.append(f"  - name: {feature_name}")
+            lines.append(f"    {attr}: {value}")
+        lines.append("")
     return lines


 def write_readme(
     output_root: Path,
-    config: ConfigEntry,
-    stats: CombinedBuildStats,
+    builds: Sequence[BuildResult],
     license_name: str,
     export_kind: DeliveryExportKind,
     custom_dirname: str | None = None,
 ) -> None:
     lines: List[str] = ["---"]
-    lines.extend(render_configs_block(config))
-    lines.extend(render_dataset_info_block(config))
+    configs = [build.config for build in builds]
+    lines.extend(render_configs_block(configs))
+    lines.extend(render_dataset_info_block(configs))
     lines.append(f"license: {license_name}")
     lines.append("---")
     lines.append("")
Expand All @@ -504,9 +573,12 @@ def write_readme(
)
lines.append("")
lines.append("## Dataset Statistics")
lines.append(f"- Total records: {stats.record_count}")
lines.append(f"- Total submissions: {stats.submission_count}")
lines.append(f"- Subsets included: {', '.join(f'`{s}`' for s in stats.subsets)}")
for build in builds:
lines.append(
f"- `{build.config.name}` ({build.config.split}): "
f"{build.stats.record_count} records from {build.stats.submission_count} submissions; "
f"subsets: {', '.join(f'`{s}`' for s in build.stats.subsets)}"
)
readme_path = output_root / "README.md"
with readme_path.open("w", encoding="utf-8") as handle:
handle.write("\n".join(lines).rstrip() + "\n")
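# Reviewer note (not part of the diff): the statistics section now emits one
# bullet per build instead of dataset-wide totals, e.g. (numbers hypothetical):
#   - `train` (train): 1200 records from 3 submissions; subsets: `pdf_train_01`, `pdf_train_02`
#   - `validation` (validation): 150 records from 1 submissions; subsets: `pdf_val`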
@@ -565,6 +637,22 @@ def main(
             "(default: predictions_json). Only used when --export-kind is predictions."
         ),
     ),
+    subset_split: List[str] | None = typer.Option(
+        None,
+        "--subset-split",
+        help=(
+            "Route subsets to splits using pattern=split (fnmatch). "
+            "Example: --subset-split pdf_val=validation --subset-split 'pdf_train_*'=train"
+        ),
+    ),
+    fail_on_unmatched: bool = typer.Option(
+        False,
+        "--fail-on-unmatched/--allow-unmatched",
+        help=(
+            "Error if a subset does not match any --subset-split rule instead of "
+            "falling back to the default --split."
+        ),
+    ),
     datasets_root: Path | None = typer.Option(
         None,
         "--datasets-root",
@@ -594,6 +682,7 @@
     output_dir = output_dir.expanduser().resolve()
     datasets_root = datasets_root.expanduser().resolve() if datasets_root else None
     staging_root = output_dir / "_staging"
+    subset_split_rules = _parse_subset_split_rules(subset_split) if subset_split else []

     if not deliveries_root.exists():
         raise typer.BadParameter(f"{deliveries_root} does not exist.")
@@ -645,30 +734,53 @@
     )

-    staging_dir = staging_root / "combined"
-    stats = build_combined_dataset(
-        subset_sources=sorted_subset_sources,
-        staging_dir=staging_dir,
-        output_root=output_dir,
-        dataset_dir_name=dataset_dir_name,
-        split=split,
-        chunk_size=chunk_size,
-        export_kind=export_kind,
-        force=force,
-        subset_assets=subset_assets,
-        assets_required=datasets_root is not None,
-    )
+    split_map = (
+        _route_subsets_to_splits(
+            subset_sources=sorted_subset_sources,
+            rules=subset_split_rules,
+            default_split=split,
+            fail_on_unmatched=fail_on_unmatched,
+        )
+        if subset_split_rules
+        else {split: sorted_subset_sources}
+    )
+
+    build_results: List[BuildResult] = []
+    multi_split = len(split_map) > 1
+
+    for split_name in sorted(split_map.keys()):
+        staging_dir = staging_root / f"combined-{split_name}"
+        dataset_dir = _dataset_dir_for_split(dataset_dir_name, split_name, multi_split)
+        stats = build_combined_dataset(
+            subset_sources=split_map[split_name],
+            staging_dir=staging_dir,
+            output_root=output_dir,
+            dataset_dir_name=dataset_dir,
+            split=split_name,
+            chunk_size=chunk_size,
+            export_kind=export_kind,
+            force=force,
+            subset_assets=subset_assets,
+            assets_required=datasets_root is not None,
+        )
+        if stats is not None:
+            config_name = split_name if multi_split or subset_split_rules else None
+            config = stats.as_config(
+                dataset_dir_name=dataset_dir,
+                split=split_name,
+                config_name=config_name,
+            )
+            build_results.append(BuildResult(config=config, stats=stats))
+
     shutil.rmtree(staging_root, ignore_errors=True)

-    if not stats:
+    if not build_results:
         typer.echo("No datasets were produced.", err=True)
         raise typer.Exit(code=1)

-    config = stats.as_config(dataset_dir_name, split)
     write_readme(
         output_root=output_dir,
-        config=config,
-        stats=stats,
+        builds=build_results,
         license_name=license_name,
         export_kind=export_kind,
         custom_dirname=custom_dirname,
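# Reviewer note (not part of the diff): resulting layout when rules route subsets
# to two splits (dataset name "my-dataset" is hypothetical):
#
#   output_dir/
#     README.md                                        # one config per split
#     my-dataset-train/train/shard_*.parquet
#     my-dataset-validation/validation/shard_*.parquet
#
# With a single split and no --subset-split rules, the directory stays
# "my-dataset/" and the config keeps the name "default", matching old behavior.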