77from dataclasses import dataclass
88from enum import Enum
99from pathlib import Path
10- from typing import Dict , List , Sequence
10+ from typing import Dict , Iterable , List , Sequence
1111
1212import typer
1313
1919app = typer .Typer (
2020 help = (
2121 "Aggregate DoclingDocument JSON exports produced by the CVAT delivery "
22- "pipeline into HuggingFace-ready parquet datasets, grouped by subset name ."
22+ "pipeline into a single HuggingFace-ready parquet dataset with subset tags ."
2323 ),
2424 no_args_is_help = True ,
2525 add_completion = False ,
@@ -42,14 +42,14 @@ class ConfigEntry:
4242
4343
4444@dataclass (frozen = True )
45- class SubsetBuildStats :
46- name : str
47- submissions : int
45+ class CombinedBuildStats :
4846 record_count : int
47+ submission_count : int
48+ subsets : List [str ]
4949
5050 def as_config (self , dataset_dir_name : str , split : str ) -> ConfigEntry :
51- pattern = f"{ self . name } / { dataset_dir_name } /{ split } /shard_*.parquet"
52- return ConfigEntry (name = self . name , split = split , path_pattern = pattern )
51+ pattern = f"{ dataset_dir_name } /{ split } /shard_*.parquet"
52+ return ConfigEntry (name = "default" , split = split , path_pattern = pattern )
5353
5454
5555FEATURE_FIELD_RENDER : Dict [FieldType , tuple [str , str ]] = {
@@ -109,23 +109,37 @@ def link_file(source: Path, destination: Path) -> None:
109109
110110def populate_staging_dir (
111111 staging_dir : Path ,
112- source_dirs : Sequence [Path ],
113- ) -> int :
112+ subset_sources : Dict [str , List [Path ]],
113+ ) -> tuple [int , Dict [str , str ]]:
114+ """
115+ Populate staging directory with files from all subsets.
116+
117+ Returns:
118+ Tuple of (total_file_count, mapping from staged_filename to subset_name)
119+ """
114120 file_index = 0
121+ file_to_subset : Dict [str , str ] = {}
115122
116- for dir_idx , source_dir in enumerate (source_dirs ):
117- json_files = sorted (p for p in source_dir .glob ("*.json" ) if p .is_file ())
118- if not json_files :
119- _LOGGER .warning ("No JSON files found under %s" , source_dir )
120- continue
121-
122- for json_file in json_files :
123- link_name = f"{ dir_idx :03d} _{ file_index :06d} _{ json_file .name } "
124- destination = staging_dir / link_name
125- link_file (json_file , destination )
126- file_index += 1
123+ for subset_name in sorted (subset_sources .keys ()):
124+ for dir_idx , source_dir in enumerate (subset_sources [subset_name ]):
125+ json_files = sorted (p for p in source_dir .glob ("*.json" ) if p .is_file ())
126+ if not json_files :
127+ _LOGGER .warning ("No JSON files found under %s" , source_dir )
128+ continue
129+
130+ for json_file in json_files :
131+ # Prefix with subset name so we can extract it later
132+ link_name = (
133+ f"{ subset_name } _{ dir_idx :03d} _{ file_index :06d} _{ json_file .name } "
134+ )
135+ destination = staging_dir / link_name
136+ link_file (json_file , destination )
137+ # Map the staged filename (without extension) to subset name
138+ staged_stem = destination .stem
139+ file_to_subset [staged_stem ] = subset_name
140+ file_index += 1
127141
128- return file_index
142+ return file_index , file_to_subset
129143
130144
131145def read_num_rows (dataset_root : Path ) -> int :
@@ -144,18 +158,38 @@ def read_num_rows(dataset_root: Path) -> int:
144158 return 0
145159
146160
147- def build_subset_dataset (
148- subset_name : str ,
149- subset_dirs : Sequence [Path ],
150- staging_root : Path ,
161+ def iter_records_with_tags (
162+ builder : FileDatasetBuilder , file_to_subset : Dict [str , str ]
163+ ) -> Iterable [DatasetRecord ]:
164+ """
165+ Iterate over records from FileDatasetBuilder and add subset tags.
166+
167+ The builder creates records with doc_id from filename.stem, so we match
168+ the staged filename stem to find the subset name.
169+ """
170+ for record in builder .iterate ():
171+ # FileDatasetBuilder sets doc_id to filename.stem
172+ # Match it to our file_to_subset mapping
173+ subset_name = file_to_subset .get (record .doc_id )
174+ if subset_name :
175+ record .tags .append (f"subset:{ subset_name } " )
176+ yield record
177+
178+
179+ def build_combined_dataset (
180+ subset_sources : Dict [str , List [Path ]],
181+ staging_dir : Path ,
151182 output_root : Path ,
152183 dataset_dir_name : str ,
153184 split : str ,
154185 chunk_size : int ,
155186 export_kind : DeliveryExportKind ,
156187 force : bool ,
157- ) -> SubsetBuildStats | None :
158- target_root = output_root / subset_name / dataset_dir_name
188+ ) -> CombinedBuildStats | None :
189+ """
190+ Build a single combined dataset from all subsets with subset tags.
191+ """
192+ target_root = output_root / dataset_dir_name
159193 if target_root .exists ():
160194 if not force :
161195 raise RuntimeError (
@@ -166,85 +200,117 @@ def build_subset_dataset(
166200 )
167201 shutil .rmtree (target_root )
168202
169- ensure_clean_dir (staging_root / subset_name )
170- staging_dir = staging_root / subset_name
171- processed = populate_staging_dir (staging_dir , subset_dirs )
203+ ensure_clean_dir (staging_dir )
204+ processed , file_to_subset = populate_staging_dir (staging_dir , subset_sources )
172205 if processed == 0 :
173- _LOGGER .warning ("Subset %s had no JSON payloads . Skipping." , subset_name )
206+ _LOGGER .warning ("No JSON payloads found across all subsets . Skipping." )
174207 shutil .rmtree (staging_dir )
175208 return None
176209
177210 builder = FileDatasetBuilder (
178- name = f"{ subset_name } -{ export_kind .value } " ,
211+ name = f"combined -{ export_kind .value } " ,
179212 dataset_source = staging_dir ,
180213 target = target_root ,
181214 split = split ,
182215 file_extensions = ["json" ],
183216 )
184- builder .save_to_disk (chunk_size = chunk_size )
217+
218+ # Custom save logic that adds tags
219+ from docling .utils .utils import chunkify
220+
221+ from docling_eval .utils .utils import save_shard_to_disk , write_datasets_info
222+
223+ test_dir = target_root / split
224+ test_dir .mkdir (parents = True , exist_ok = True )
225+
226+ count = 0
227+ chunk_count = 0
228+
229+ for record_chunk in chunkify (
230+ iter_records_with_tags (builder , file_to_subset ), chunk_size
231+ ):
232+ record_list = [r .as_record_dict () for r in record_chunk ]
233+ save_shard_to_disk (
234+ items = record_list ,
235+ dataset_path = test_dir ,
236+ schema = DatasetRecord .pyarrow_schema (),
237+ shard_id = chunk_count ,
238+ )
239+ count += len (record_list )
240+ chunk_count += 1
241+
242+ write_datasets_info (
243+ name = builder .name ,
244+ output_dir = target_root ,
245+ num_train_rows = 0 ,
246+ num_test_rows = count ,
247+ features = DatasetRecord .features (),
248+ )
249+
185250 shutil .rmtree (staging_dir )
186251
187252 record_count = read_num_rows (target_root )
188- submission_count = len (subset_dirs )
253+ submission_count = sum (len (dirs ) for dirs in subset_sources .values ())
254+ subset_names = sorted (subset_sources .keys ())
189255
190- return SubsetBuildStats (
191- name = subset_name ,
192- submissions = submission_count ,
256+ return CombinedBuildStats (
193257 record_count = record_count if record_count else processed ,
258+ submission_count = submission_count ,
259+ subsets = subset_names ,
194260 )
195261
196262
197- def render_configs_block (configs : Sequence [ ConfigEntry ] ) -> List [str ]:
263+ def render_configs_block (config : ConfigEntry ) -> List [str ]:
198264 lines : List [str ] = ["configs:" ]
199- for entry in configs :
200- lines .append (f"- config_name: { entry .name } " )
201- lines .append (" data_files:" )
202- lines .append (f" - split: { entry .split } " )
203- lines .append (f" path: { entry .path_pattern } " )
265+ lines .append (f"- config_name: { config .name } " )
266+ lines .append (" data_files:" )
267+ lines .append (f" - split: { config .split } " )
268+ lines .append (f" path: { config .path_pattern } " )
204269 return lines
205270
206271
207- def render_dataset_info_block (configs : Sequence [ ConfigEntry ] ) -> List [str ]:
272+ def render_dataset_info_block (config : ConfigEntry ) -> List [str ]:
208273 lines : List [str ] = ["dataset_info:" ]
209274 feature_rows = iter_dataset_features ()
210- for entry in configs :
211- lines .append (f"- config_name: { entry .name } " )
212- lines .append (" features:" )
213- for feature_name , attr , value in feature_rows :
214- lines .append (f" - name: { feature_name } " )
215- lines .append (f" { attr } : { value } " )
216- lines .append ("" )
275+ lines .append (f"- config_name: { config .name } " )
276+ lines .append (" features:" )
277+ for feature_name , attr , value in feature_rows :
278+ lines .append (f" - name: { feature_name } " )
279+ lines .append (f" { attr } : { value } " )
280+ lines .append ("" )
217281 return lines
218282
219283
220284def write_readme (
221285 output_root : Path ,
222- configs : Sequence [ ConfigEntry ] ,
223- stats : Sequence [ SubsetBuildStats ] ,
286+ config : ConfigEntry ,
287+ stats : CombinedBuildStats ,
224288 license_name : str ,
225289 export_kind : DeliveryExportKind ,
226290) -> None :
227291 lines : List [str ] = ["---" ]
228- lines .extend (render_configs_block (configs ))
229- lines .extend (render_dataset_info_block (configs ))
292+ lines .extend (render_configs_block (config ))
293+ lines .extend (render_dataset_info_block (config ))
230294 lines .append (f"license: { license_name } " )
231295 lines .append ("---" )
232296 lines .append ("" )
233297 lines .append ("# CVAT Delivery Aggregation" )
234298 lines .append (
235299 "This repository consolidates DoclingDocument exports produced by the "
236- "CVAT delivery pipeline into HuggingFace-ready parquet datasets ."
300+ "CVAT delivery pipeline into a single HuggingFace-ready parquet dataset ."
237301 )
238302 lines .append ("" )
239303 lines .append (f"- Source payloads: `{ export_kind .folder_name ()} `" )
240304 lines .append ("- Builder: `FileDatasetBuilder` with JSON inputs" )
305+ lines .append (
306+ "- Subset information: Each record includes a `tags` field with "
307+ "`subset:<name>` entries indicating the source subset"
308+ )
241309 lines .append ("" )
242- lines .append ("## Subsets" )
243- for subset in stats :
244- lines .append (
245- f"- `{ subset .name } `: { subset .record_count } documents from "
246- f"{ subset .submissions } submissions"
247- )
310+ lines .append ("## Dataset Statistics" )
311+ lines .append (f"- Total records: { stats .record_count } " )
312+ lines .append (f"- Total submissions: { stats .submission_count } " )
313+ lines .append (f"- Subsets included: { ', ' .join (f'`{ s } `' for s in stats .subsets )} " )
248314 readme_path = output_root / "README.md"
249315 with readme_path .open ("w" , encoding = "utf-8" ) as handle :
250316 handle .write ("\n " .join (lines ).rstrip () + "\n " )
@@ -270,7 +336,7 @@ def main(
270336 dataset_dir_name : str = typer .Option (
271337 "gt_dataset" ,
272338 "--dataset-dir-name" ,
273- help = "Name of the dataset directory created inside each subset folder." ,
339+ help = "Name of the dataset directory created in the output folder." ,
274340 ),
275341 chunk_size : int = typer .Option (
276342 80 ,
@@ -285,7 +351,7 @@ def main(
285351 force : bool = typer .Option (
286352 False ,
287353 "--force/--no-force" ,
288- help = "Overwrite any existing subset dataset directories and staging data." ,
354+ help = "Overwrite any existing dataset directory and staging data." ,
289355 ),
290356) -> None :
291357 logging .basicConfig (level = logging .INFO , format = "%(levelname)s: %(message)s" )
@@ -308,33 +374,32 @@ def main(
308374 )
309375 raise typer .Exit (code = 1 )
310376
311- built_stats : List [SubsetBuildStats ] = []
312- for subset_name in sorted (subset_sources .keys ()):
313- stats = build_subset_dataset (
314- subset_name = subset_name ,
315- subset_dirs = sorted (subset_sources [subset_name ]),
316- staging_root = staging_root ,
317- output_root = output_dir ,
318- dataset_dir_name = dataset_dir_name ,
319- split = split ,
320- chunk_size = chunk_size ,
321- export_kind = export_kind ,
322- force = force ,
323- )
324- if stats :
325- built_stats .append (stats )
377+ # Sort subset sources for deterministic ordering
378+ sorted_subset_sources = {k : sorted (v ) for k , v in sorted (subset_sources .items ())}
379+
380+ staging_dir = staging_root / "combined"
381+ stats = build_combined_dataset (
382+ subset_sources = sorted_subset_sources ,
383+ staging_dir = staging_dir ,
384+ output_root = output_dir ,
385+ dataset_dir_name = dataset_dir_name ,
386+ split = split ,
387+ chunk_size = chunk_size ,
388+ export_kind = export_kind ,
389+ force = force ,
390+ )
326391
327392 shutil .rmtree (staging_root , ignore_errors = True )
328393
329- if not built_stats :
394+ if not stats :
330395 typer .echo ("No datasets were produced." , err = True )
331396 raise typer .Exit (code = 1 )
332397
333- configs = [ stat .as_config (dataset_dir_name , split ) for stat in built_stats ]
398+ config = stats .as_config (dataset_dir_name , split )
334399 write_readme (
335400 output_root = output_dir ,
336- configs = configs ,
337- stats = built_stats ,
401+ config = config ,
402+ stats = stats ,
338403 license_name = license_name ,
339404 export_kind = export_kind ,
340405 )
0 commit comments