3333from .constants import DEFAULT_REVISION , REPO_TYPES
3434from .utils import DEFAULT_IGNORE_PATTERNS , filter_repo_objects , tqdm
3535from .utils ._cache_manager import _format_size
36+ from .utils ._runtime import is_xet_available
3637from .utils .sha import sha_fileobj
3738
3839
# Cap on how many files are sent per "get upload mode" request to the Hub.
MAX_NB_FILES_FETCH_UPLOAD_MODE = 100
# Progressive commit sizes: commits start small and grow as uploads succeed.
COMMIT_SIZE_SCALE: List[int] = [20, 50, 75, 100, 125, 200, 250, 400, 600, 1000]

UPLOAD_BATCH_SIZE_XET = 256  # Max 256 files per upload batch for XET-enabled repos
UPLOAD_BATCH_SIZE_LFS = 1  # Otherwise, batches of 1 for regular LFS upload

4852
4953def upload_large_folder_internal (
5054 api : "HfApi" ,
@@ -93,6 +97,17 @@ def upload_large_folder_internal(
9397 repo_url = api .create_repo (repo_id = repo_id , repo_type = repo_type , private = private , exist_ok = True )
9498 logger .info (f"Repo created: { repo_url } " )
9599 repo_id = repo_url .repo_id
100+ # 2.1 Check if xet is enabled to set batch file upload size
101+ is_xet_enabled = (
102+ is_xet_available ()
103+ and api .repo_info (
104+ repo_id = repo_id ,
105+ repo_type = repo_type ,
106+ revision = revision ,
107+ expand = "xetEnabled" ,
108+ ).xet_enabled
109+ )
110+ upload_batch_size = UPLOAD_BATCH_SIZE_XET if is_xet_enabled else UPLOAD_BATCH_SIZE_LFS
96111
97112 # 3. List files to upload
98113 filtered_paths_list = filter_repo_objects (
@@ -110,7 +125,7 @@ def upload_large_folder_internal(
110125 ]
111126
112127 # 4. Start workers
113- status = LargeUploadStatus (items )
128+ status = LargeUploadStatus (items , upload_batch_size )
114129 threads = [
115130 threading .Thread (
116131 target = _worker_job ,
@@ -168,7 +183,7 @@ class WorkerJob(enum.Enum):
168183class LargeUploadStatus :
169184 """Contains information, queues and tasks for a large upload process."""
170185
171- def __init__ (self , items : List [JOB_ITEM_T ]):
186+ def __init__ (self , items : List [JOB_ITEM_T ], upload_batch_size : int = 1 ):
172187 self .items = items
173188 self .queue_sha256 : "queue.Queue[JOB_ITEM_T]" = queue .Queue ()
174189 self .queue_get_upload_mode : "queue.Queue[JOB_ITEM_T]" = queue .Queue ()
@@ -179,6 +194,7 @@ def __init__(self, items: List[JOB_ITEM_T]):
179194 self .nb_workers_sha256 : int = 0
180195 self .nb_workers_get_upload_mode : int = 0
181196 self .nb_workers_preupload_lfs : int = 0
197+ self .upload_batch_size : int = upload_batch_size
182198 self .nb_workers_commit : int = 0
183199 self .nb_workers_waiting : int = 0
184200 self .last_commit_attempt : Optional [float ] = None
@@ -353,16 +369,17 @@ def _worker_job(
353369 status .nb_workers_get_upload_mode -= 1
354370
355371 elif job == WorkerJob .PREUPLOAD_LFS :
356- item = items [0 ] # single item
357372 try :
358- _preupload_lfs (item , api = api , repo_id = repo_id , repo_type = repo_type , revision = revision )
359- status .queue_commit .put (item )
373+ _preupload_lfs (items , api = api , repo_id = repo_id , repo_type = repo_type , revision = revision )
374+ for item in items :
375+ status .queue_commit .put (item )
360376 except KeyboardInterrupt :
361377 raise
362378 except Exception as e :
363379 logger .error (f"Failed to preupload LFS: { e } " )
364380 traceback .format_exc ()
365- status .queue_preupload_lfs .put (item )
381+ for item in items :
382+ status .queue_preupload_lfs .put (item )
366383
367384 with status .lock :
368385 status .nb_workers_preupload_lfs -= 1
@@ -417,11 +434,11 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
417434 logger .debug (f"Job: get upload mode (>{ MAX_NB_FILES_FETCH_UPLOAD_MODE } files ready)" )
418435 return (WorkerJob .GET_UPLOAD_MODE , _get_n (status .queue_get_upload_mode , MAX_NB_FILES_FETCH_UPLOAD_MODE ))
419436
420- # 4. Preupload LFS file if at least 1 file and no worker is preuploading LFS
421- elif status .queue_preupload_lfs .qsize () > 0 and status .nb_workers_preupload_lfs == 0 :
437+ # 4. Preupload LFS file if at least `status.upload_batch_size` files and no worker is preuploading LFS
438+ elif status .queue_preupload_lfs .qsize () >= status . upload_batch_size and status .nb_workers_preupload_lfs == 0 :
422439 status .nb_workers_preupload_lfs += 1
423440 logger .debug ("Job: preupload LFS (no other worker preuploading LFS)" )
424- return (WorkerJob .PREUPLOAD_LFS , _get_one (status .queue_preupload_lfs ))
441+ return (WorkerJob .PREUPLOAD_LFS , _get_n (status .queue_preupload_lfs , status . upload_batch_size ))
425442
426443 # 5. Compute sha256 if at least 1 file and no worker is computing sha256
427444 elif status .queue_sha256 .qsize () > 0 and status .nb_workers_sha256 == 0 :
@@ -435,14 +452,14 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
435452 logger .debug ("Job: get upload mode (no other worker getting upload mode)" )
436453 return (WorkerJob .GET_UPLOAD_MODE , _get_n (status .queue_get_upload_mode , MAX_NB_FILES_FETCH_UPLOAD_MODE ))
437454
438- # 7. Preupload LFS file if at least 1 file
455+ # 7. Preupload LFS file if at least `status.upload_batch_size` files
439456 # Skip if hf_transfer is enabled and there is already a worker preuploading LFS
440- elif status .queue_preupload_lfs .qsize () > 0 and (
457+ elif status .queue_preupload_lfs .qsize () >= status . upload_batch_size and (
441458 status .nb_workers_preupload_lfs == 0 or not constants .HF_HUB_ENABLE_HF_TRANSFER
442459 ):
443460 status .nb_workers_preupload_lfs += 1
444461 logger .debug ("Job: preupload LFS" )
445- return (WorkerJob .PREUPLOAD_LFS , _get_one (status .queue_preupload_lfs ))
462+ return (WorkerJob .PREUPLOAD_LFS , _get_n (status .queue_preupload_lfs , status . upload_batch_size ))
446463
447464 # 8. Compute sha256 if at least 1 file
448465 elif status .queue_sha256 .qsize () > 0 :
@@ -456,7 +473,13 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
456473 logger .debug ("Job: get upload mode" )
457474 return (WorkerJob .GET_UPLOAD_MODE , _get_n (status .queue_get_upload_mode , MAX_NB_FILES_FETCH_UPLOAD_MODE ))
458475
459- # 10. Commit if at least 1 file and 1 min since last commit attempt
476+ # 10. Preupload LFS file if at least 1 file
477+ elif status .queue_preupload_lfs .qsize () > 0 :
478+ status .nb_workers_preupload_lfs += 1
479+ logger .debug ("Job: preupload LFS" )
480+ return (WorkerJob .PREUPLOAD_LFS , _get_n (status .queue_preupload_lfs , status .upload_batch_size ))
481+
482+ # 11. Commit if at least 1 file and 1 min since last commit attempt
460483 elif (
461484 status .nb_workers_commit == 0
462485 and status .queue_commit .qsize () > 0
@@ -467,7 +490,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
467490 logger .debug ("Job: commit (1 min since last commit attempt)" )
468491 return (WorkerJob .COMMIT , _get_n (status .queue_commit , status .target_chunk ()))
469492
470- # 11 . Commit if at least 1 file all other queues are empty and all workers are waiting
493+ # 12 . Commit if at least 1 file all other queues are empty and all workers are waiting
471494 # e.g. when it's the last commit
472495 elif (
473496 status .nb_workers_commit == 0
@@ -483,12 +506,12 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
483506 logger .debug ("Job: commit" )
484507 return (WorkerJob .COMMIT , _get_n (status .queue_commit , status .target_chunk ()))
485508
486- # 12 . If all queues are empty, exit
509+ # 13 . If all queues are empty, exit
487510 elif all (metadata .is_committed or metadata .should_ignore for _ , metadata in status .items ):
488511 logger .info ("All files have been processed! Exiting worker." )
489512 return None
490513
491- # 13 . If no task is available, wait
514+ # 14 . If no task is available, wait
492515 else :
493516 status .nb_workers_waiting += 1
494517 logger .debug (f"No task available, waiting... ({ WAITING_TIME_IF_NO_TASKS } s)" )
@@ -531,19 +554,19 @@ def _get_upload_mode(items: List[JOB_ITEM_T], api: "HfApi", repo_id: str, repo_t
531554 metadata .save (paths )
532555
533556
def _preupload_lfs(items: List[JOB_ITEM_T], api: "HfApi", repo_id: str, repo_type: str, revision: str) -> None:
    """Preupload a batch of LFS files and persist their metadata.

    Args:
        items: `(paths, metadata)` pairs for the files to preupload. Batched so
            that xet-enabled repos can send multiple files per call.
        api: Hub client used to perform the preupload.
        repo_id: target repository id.
        repo_type: target repository type.
        revision: target revision.

    Side effects: marks each item's metadata as uploaded and saves it to disk.
    Raises whatever `api.preupload_lfs_files` raises; in that case no metadata
    is marked as uploaded, so the caller can re-queue the whole batch.
    """
    additions = [_build_hacky_operation(item) for item in items]
    api.preupload_lfs_files(
        repo_id=repo_id,
        repo_type=repo_type,
        revision=revision,
        additions=additions,
    )
    # Only flag items once the entire batch has been uploaded successfully.
    for paths, metadata in items:
        metadata.is_uploaded = True
        metadata.save(paths)

547570
548571
549572def _commit (items : List [JOB_ITEM_T ], api : "HfApi" , repo_id : str , repo_type : str , revision : str ) -> None :
0 commit comments