4242logger = logging .getLogger (__name__ )
4343
4444WAITING_TIME_IF_NO_TASKS = 10 # seconds
45- MAX_NB_REGULAR_FILES_PER_COMMIT = 75
46- MAX_NB_LFS_FILES_PER_COMMIT = 150
45+ MAX_NB_FILES_FETCH_UPLOAD_MODE = 100
4746COMMIT_SIZE_SCALE : List [int ] = [20 , 50 , 75 , 100 , 125 , 200 , 250 , 400 , 600 , 1000 ]
4847
4948
@@ -404,19 +403,19 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
404403 ):
405404 status .nb_workers_commit += 1
406405 logger .debug ("Job: commit (more than 5 minutes since last commit attempt)" )
407- return (WorkerJob .COMMIT , _get_items_to_commit (status .queue_commit ))
406+ return (WorkerJob .COMMIT , _get_n (status .queue_commit , status . target_chunk () ))
408407
409408 # 2. Commit if at least 150 files are ready to commit
410409 elif status .nb_workers_commit == 0 and status .queue_commit .qsize () >= 150 :
411410 status .nb_workers_commit += 1
412411 logger .debug ("Job: commit (>100 files ready)" )
413- return (WorkerJob .COMMIT , _get_items_to_commit (status .queue_commit ))
412+ return (WorkerJob .COMMIT , _get_n (status .queue_commit , status . target_chunk () ))
414413
415- # 3. Get upload mode if at least 10 files
416- elif status .queue_get_upload_mode .qsize () >= 10 :
414+ # 3. Get upload mode if at least 100 files
415+ elif status .queue_get_upload_mode .qsize () >= MAX_NB_FILES_FETCH_UPLOAD_MODE :
417416 status .nb_workers_get_upload_mode += 1
418- logger .debug ("Job: get upload mode (>10 files ready)" )
419- return (WorkerJob .GET_UPLOAD_MODE , _get_n (status .queue_get_upload_mode , status . target_chunk () ))
417+ logger .debug (f "Job: get upload mode (>{ MAX_NB_FILES_FETCH_UPLOAD_MODE } files ready)" )
418+ return (WorkerJob .GET_UPLOAD_MODE , _get_n (status .queue_get_upload_mode , MAX_NB_FILES_FETCH_UPLOAD_MODE ))
420419
421420 # 4. Preupload LFS file if at least 1 file and no worker is preuploading LFS
422421 elif status .queue_preupload_lfs .qsize () > 0 and status .nb_workers_preupload_lfs == 0 :
@@ -434,7 +433,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
434433 elif status .queue_get_upload_mode .qsize () > 0 and status .nb_workers_get_upload_mode == 0 :
435434 status .nb_workers_get_upload_mode += 1
436435 logger .debug ("Job: get upload mode (no other worker getting upload mode)" )
437- return (WorkerJob .GET_UPLOAD_MODE , _get_n (status .queue_get_upload_mode , status . target_chunk () ))
436+ return (WorkerJob .GET_UPLOAD_MODE , _get_n (status .queue_get_upload_mode , MAX_NB_FILES_FETCH_UPLOAD_MODE ))
438437
439438 # 7. Preupload LFS file if at least 1 file
440439 # Skip if hf_transfer is enabled and there is already a worker preuploading LFS
@@ -455,7 +454,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
455454 elif status .queue_get_upload_mode .qsize () > 0 :
456455 status .nb_workers_get_upload_mode += 1
457456 logger .debug ("Job: get upload mode" )
458- return (WorkerJob .GET_UPLOAD_MODE , _get_n (status .queue_get_upload_mode , status . target_chunk () ))
457+ return (WorkerJob .GET_UPLOAD_MODE , _get_n (status .queue_get_upload_mode , MAX_NB_FILES_FETCH_UPLOAD_MODE ))
459458
460459 # 10. Commit if at least 1 file and 1 min since last commit attempt
461460 elif (
@@ -466,7 +465,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
466465 ):
467466 status .nb_workers_commit += 1
468467 logger .debug ("Job: commit (1 min since last commit attempt)" )
469- return (WorkerJob .COMMIT , _get_items_to_commit (status .queue_commit ))
468+ return (WorkerJob .COMMIT , _get_n (status .queue_commit , status . target_chunk () ))
470469
471470 # 11. Commit if at least 1 file all other queues are empty and all workers are waiting
472471 # e.g. when it's the last commit
@@ -482,7 +481,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
482481 ):
483482 status .nb_workers_commit += 1
484483 logger .debug ("Job: commit" )
485- return (WorkerJob .COMMIT , _get_items_to_commit (status .queue_commit ))
484+ return (WorkerJob .COMMIT , _get_n (status .queue_commit , status . target_chunk () ))
486485
487486 # 12. If all queues are empty, exit
488487 elif all (metadata .is_committed or metadata .should_ignore for _ , metadata in status .items ):
@@ -528,6 +527,7 @@ def _get_upload_mode(items: List[JOB_ITEM_T], api: "HfApi", repo_id: str, repo_t
528527 paths , metadata = item
529528 metadata .upload_mode = addition ._upload_mode
530529 metadata .should_ignore = addition ._should_ignore
530+ metadata .remote_oid = addition ._remote_oid
531531 metadata .save (paths )
532532
533533
@@ -580,6 +580,9 @@ def _build_hacky_operation(item: JOB_ITEM_T) -> HackyCommitOperationAdd:
580580 if metadata .sha256 is None :
581581 raise ValueError ("sha256 must have been computed by now!" )
582582 operation .upload_info = UploadInfo (sha256 = bytes .fromhex (metadata .sha256 ), size = metadata .size , sample = sample )
583+ operation ._upload_mode = metadata .upload_mode # type: ignore[assignment]
584+ operation ._should_ignore = metadata .should_ignore
585+ operation ._remote_oid = metadata .remote_oid
583586 return operation
584587
585588
@@ -596,30 +599,6 @@ def _get_n(queue: "queue.Queue[JOB_ITEM_T]", n: int) -> List[JOB_ITEM_T]:
596599 return [queue .get () for _ in range (min (queue .qsize (), n ))]
597600
598601
599- def _get_items_to_commit (queue : "queue.Queue[JOB_ITEM_T]" ) -> List [JOB_ITEM_T ]:
600- """Special case for commit job: the number of items to commit depends on the type of files."""
601- # Can take at most 50 regular files and/or 100 LFS files in a single commit
602- items : List [JOB_ITEM_T ] = []
603- nb_lfs , nb_regular = 0 , 0
604- while True :
605- # If empty queue => commit everything
606- if queue .qsize () == 0 :
607- return items
608-
609- # If we have enough items => commit them
610- if nb_lfs >= MAX_NB_LFS_FILES_PER_COMMIT or nb_regular >= MAX_NB_REGULAR_FILES_PER_COMMIT :
611- return items
612-
613- # Else, get a new item and increase counter
614- item = queue .get ()
615- items .append (item )
616- _ , metadata = item
617- if metadata .upload_mode == "lfs" :
618- nb_lfs += 1
619- else :
620- nb_regular += 1
621-
622-
623602def _print_overwrite (report : str ) -> None :
624603 """Print a report, overwriting the previous lines.
625604
0 commit comments