diff --git a/src/huggingface_hub/_commit_api.py b/src/huggingface_hub/_commit_api.py index 9e8fa86e6c..7ed64b0e5e 100644 --- a/src/huggingface_hub/_commit_api.py +++ b/src/huggingface_hub/_commit_api.py @@ -33,6 +33,7 @@ validate_hf_hub_args, ) from .utils import tqdm as hf_tqdm +from .utils._runtime import is_xet_available if TYPE_CHECKING: @@ -353,7 +354,7 @@ def _warn_on_overwriting_operations(operations: List[CommitOperation]) -> None: @validate_hf_hub_args -def _upload_lfs_files( +def _upload_files( *, additions: List[CommitOperationAdd], repo_type: str, @@ -362,6 +363,86 @@ def _upload_lfs_files( endpoint: Optional[str] = None, num_threads: int = 5, revision: Optional[str] = None, + create_pr: Optional[bool] = None, +): + """ + Negotiates per-file transfer (LFS vs Xet) and uploads in batches. + """ + xet_additions: List[CommitOperationAdd] = [] + lfs_actions: List[Dict] = [] + lfs_oid2addop: Dict[str, CommitOperationAdd] = {} + + for chunk in chunk_iterable(additions, chunk_size=UPLOAD_BATCH_MAX_NUM_FILES): + chunk_list = [op for op in chunk] + + transfers: List[str] = ["basic", "multipart"] + has_buffered_io_data = any(isinstance(op.path_or_fileobj, io.BufferedIOBase) for op in chunk_list) + if is_xet_available(): + if not has_buffered_io_data: + transfers.append("xet") + else: + logger.warning( + "Uploading files as a binary IO buffer is not supported by Xet Storage. " + "Falling back to HTTP upload." + ) + + actions_chunk, errors_chunk, chosen_transfer = post_lfs_batch_info( + upload_infos=[op.upload_info for op in chunk_list], + repo_id=repo_id, + repo_type=repo_type, + revision=revision, + endpoint=endpoint, + headers=headers, + token=None, # already passed in 'headers' + transfers=transfers, + ) + if errors_chunk: + message = "\n".join( + [ + f"Encountered error for file with OID {err.get('oid')}: `{err.get('error', {}).get('message')}" + for err in errors_chunk + ] + ) + raise ValueError(f"LFS batch API returned errors:\n{message}") + + # If server returns a transfer we didn't offer (e.g "xet" while uploading from BytesIO), + # fall back to LFS for this chunk. + if chosen_transfer == "xet" and ("xet" in transfers): + xet_additions.extend(chunk_list) + else: + lfs_actions.extend(actions_chunk) + for op in chunk_list: + lfs_oid2addop[op.upload_info.sha256.hex()] = op + + if len(lfs_actions) > 0: + _upload_lfs_files( + actions=lfs_actions, + oid2addop=lfs_oid2addop, + headers=headers, + endpoint=endpoint, + num_threads=num_threads, + ) + + if len(xet_additions) > 0: + _upload_xet_files( + additions=xet_additions, + repo_type=repo_type, + repo_id=repo_id, + headers=headers, + endpoint=endpoint, + revision=revision, + create_pr=create_pr, + ) + + +@validate_hf_hub_args +def _upload_lfs_files( + *, + actions: List[Dict], + oid2addop: Dict[str, CommitOperationAdd], + headers: Dict[str, str], + endpoint: Optional[str] = None, + num_threads: int = 5, ): """ Uploads the content of `additions` to the Hub using the large file storage protocol. @@ -370,9 +451,21 @@ def _upload_lfs_files( - LFS Batch API: https://github.com/git-lfs/git-lfs/blob/main/docs/api/batch.md Args: - additions (`List` of `CommitOperationAdd`): - The files to be uploaded - repo_type (`str`): + actions (`List[Dict]`): + LFS batch actions returned by the server. + oid2addop (`Dict[str, CommitOperationAdd]`): + A dictionary mapping the OID of the file to the corresponding `CommitOperationAdd` object. + headers (`Dict[str, str]`): + Headers to use for the request, including authorization headers and user agent. + endpoint (`str`, *optional*): + The endpoint to use for the request. Defaults to `constants.ENDPOINT`. + num_threads (`int`, *optional*): + The number of concurrent threads to use when uploading. Defaults to 5. + + Raises: + [`EnvironmentError`](https://docs.python.org/3/library/exceptions.html#EnvironmentError) + If an upload failed for any reason + [`ValueError`](https://docs.python.org/3/library/exceptions.html#ValueError) Type of the repo to upload to: `"model"`, `"dataset"` or `"space"`. repo_id (`str`): A namespace (user or an organization) and a repo name separated @@ -392,50 +485,17 @@ def _upload_lfs_files( [`HTTPError`](https://requests.readthedocs.io/en/latest/api/#requests.HTTPError) If the LFS batch endpoint returned an HTTP error. """ - # Step 1: retrieve upload instructions from the LFS batch endpoint. - # Upload instructions are retrieved by chunk of 256 files to avoid reaching - # the payload limit. - batch_actions: List[Dict] = [] - for chunk in chunk_iterable(additions, chunk_size=UPLOAD_BATCH_MAX_NUM_FILES): - batch_actions_chunk, batch_errors_chunk = post_lfs_batch_info( - upload_infos=[op.upload_info for op in chunk], - repo_id=repo_id, - repo_type=repo_type, - revision=revision, - endpoint=endpoint, - headers=headers, - token=None, # already passed in 'headers' - ) - - # If at least 1 error, we do not retrieve information for other chunks - if batch_errors_chunk: - message = "\n".join( - [ - f"Encountered error for file with OID {err.get('oid')}: `{err.get('error', {}).get('message')}" - for err in batch_errors_chunk - ] - ) - raise ValueError(f"LFS batch endpoint returned errors:\n{message}") - - batch_actions += batch_actions_chunk - oid2addop = {add_op.upload_info.sha256.hex(): add_op for add_op in additions} - - # Step 2: ignore files that have already been uploaded + # Filter out files already present upstream filtered_actions = [] - for action in batch_actions: + for action in actions: if action.get("actions") is None: logger.debug( - f"Content of file {oid2addop[action['oid']].path_in_repo} is already" - " present upstream - skipping upload." + f"Content of file {oid2addop[action['oid']].path_in_repo} is already present upstream - skipping upload." ) else: filtered_actions.append(action) - if len(filtered_actions) == 0: - logger.debug("No LFS files to upload.") - return - - # Step 3: upload files concurrently according to these instructions + # Upload according to server-provided actions def _wrapped_lfs_upload(batch_action) -> None: try: operation = oid2addop[batch_action["oid"]] @@ -576,30 +636,30 @@ def token_refresher() -> Tuple[str, int]: progress, progress_callback = None, None try: - for i, chunk in enumerate(chunk_iterable(additions, chunk_size=UPLOAD_BATCH_MAX_NUM_FILES)): - _chunk = [op for op in chunk] - - bytes_ops = [op for op in _chunk if isinstance(op.path_or_fileobj, bytes)] - paths_ops = [op for op in _chunk if isinstance(op.path_or_fileobj, (str, Path))] - - if len(paths_ops) > 0: - upload_files( - [str(op.path_or_fileobj) for op in paths_ops], - xet_endpoint, - access_token_info, - token_refresher, - progress_callback, - repo_type, - ) - if len(bytes_ops) > 0: - upload_bytes( - [op.path_or_fileobj for op in bytes_ops], - xet_endpoint, - access_token_info, - token_refresher, - progress_callback, - repo_type, - ) + all_bytes_ops = [op for op in additions if isinstance(op.path_or_fileobj, bytes)] + all_paths_ops = [op for op in additions if isinstance(op.path_or_fileobj, (str, Path))] + + if len(all_paths_ops) > 0: + all_paths = [str(op.path_or_fileobj) for op in all_paths_ops] + upload_files( + all_paths, + xet_endpoint, + access_token_info, + token_refresher, + progress_callback, + repo_type, + ) + + if len(all_bytes_ops) > 0: + all_bytes = [op.path_or_fileobj for op in all_bytes_ops] + upload_bytes( + all_bytes, + xet_endpoint, + access_token_info, + token_refresher, + progress_callback, + repo_type, + ) finally: if progress is not None: diff --git a/src/huggingface_hub/hf_api.py b/src/huggingface_hub/hf_api.py index e2827a6f19..3b1e9732c9 100644 --- a/src/huggingface_hub/hf_api.py +++ b/src/huggingface_hub/hf_api.py @@ -15,7 +15,6 @@ from __future__ import annotations import inspect -import io import json import re import struct @@ -46,7 +45,7 @@ Union, overload, ) -from urllib.parse import quote, unquote +from urllib.parse import quote import requests from requests.exceptions import HTTPError @@ -62,8 +61,7 @@ _fetch_files_to_copy, _fetch_upload_modes, _prepare_commit_payload, - _upload_lfs_files, - _upload_xet_files, + _upload_files, _warn_on_overwriting_operations, ) from ._inference_endpoints import InferenceEndpoint, InferenceEndpointType @@ -132,13 +130,8 @@ validate_hf_hub_args, ) from .utils import tqdm as hf_tqdm -from .utils._auth import ( - _get_token_from_environment, - _get_token_from_file, - _get_token_from_google_colab, -) +from .utils._auth import _get_token_from_environment, _get_token_from_file, _get_token_from_google_colab from .utils._deprecation import _deprecate_arguments, _deprecate_method -from .utils._runtime import is_xet_available from .utils._typing import CallableT from .utils.endpoint_helpers import _is_emission_within_threshold @@ -4502,6 +4495,10 @@ def preupload_lfs_files( f"Skipped upload for {len(new_lfs_additions) - len(new_lfs_additions_to_upload)} LFS file(s) " "(ignored by gitignore file)." ) + # If no LFS files remain to upload, keep previous behavior and log explicitly + if len(new_lfs_additions_to_upload) == 0: + logger.debug("No LFS files to upload.") + return # Prepare upload parameters upload_kwargs = { "additions": new_lfs_additions_to_upload, @@ -4514,32 +4511,7 @@ def preupload_lfs_files( # PR (i.e. `revision`). "revision": revision if not create_pr else None, } - # Upload files using Xet protocol if all of the following are true: - # - xet is enabled for the repo, - # - the files are provided as str or paths objects, - # - the library is installed. - # Otherwise, default back to LFS. - xet_enabled = self.repo_info( - repo_id=repo_id, - repo_type=repo_type, - revision=unquote(revision) if revision is not None else revision, - expand="xetEnabled", - token=token, - ).xet_enabled - has_buffered_io_data = any( - isinstance(addition.path_or_fileobj, io.BufferedIOBase) for addition in new_lfs_additions_to_upload - ) - if xet_enabled and not has_buffered_io_data and is_xet_available(): - logger.debug("Uploading files using Xet Storage..") - _upload_xet_files(**upload_kwargs, create_pr=create_pr) # type: ignore [arg-type] - else: - if xet_enabled and is_xet_available(): - if has_buffered_io_data: - logger.warning( - "Uploading files as a binary IO buffer is not supported by Xet Storage. " - "Falling back to HTTP upload." - ) - _upload_lfs_files(**upload_kwargs, num_threads=num_threads) # type: ignore [arg-type] + _upload_files(**upload_kwargs, num_threads=num_threads, create_pr=create_pr) # type: ignore [arg-type] for addition in new_lfs_additions_to_upload: addition._is_uploaded = True if free_memory: diff --git a/src/huggingface_hub/lfs.py b/src/huggingface_hub/lfs.py index c2d4f36829..1697e2b479 100644 --- a/src/huggingface_hub/lfs.py +++ b/src/huggingface_hub/lfs.py @@ -108,7 +108,8 @@ def post_lfs_batch_info( revision: Optional[str] = None, endpoint: Optional[str] = None, headers: Optional[Dict[str, str]] = None, -) -> Tuple[List[dict], List[dict]]: + transfers: Optional[List[str]] = None, +) -> Tuple[List[dict], List[dict], Optional[str]]: """ Requests the LFS batch endpoint to retrieve upload instructions @@ -127,11 +128,14 @@ def post_lfs_batch_info( The git revision to upload to. headers (`dict`, *optional*): Additional headers to include in the request + transfers (`list`, *optional*): + List of transfer methods to use. Defaults to ["basic", "multipart"]. Returns: - `LfsBatchInfo`: 2-tuple: + `LfsBatchInfo`: 3-tuple: - First element is the list of upload instructions from the server - - Second element is an list of errors, if any + - Second element is a list of errors, if any + - Third element is the chosen transfer adapter if provided by the server (e.g. "basic", "multipart", "xet") Raises: [`ValueError`](https://docs.python.org/3/library/exceptions.html#ValueError) @@ -146,7 +150,7 @@ def post_lfs_batch_info( batch_url = f"{endpoint}/{url_prefix}{repo_id}.git/info/lfs/objects/batch" payload: Dict = { "operation": "upload", - "transfers": ["basic", "multipart"], + "transfers": transfers if transfers is not None else ["basic", "multipart"], "objects": [ { "oid": upload.sha256.hex(), @@ -172,9 +176,13 @@ def post_lfs_batch_info( if not isinstance(objects, list): raise ValueError("Malformed response from server") + chosen_transfer = batch_info.get("transfer") + chosen_transfer = chosen_transfer if isinstance(chosen_transfer, str) else None + return ( [_validate_batch_actions(obj) for obj in objects if "error" not in obj], [_validate_batch_error(obj) for obj in objects if "error" in obj], + chosen_transfer, ) diff --git a/tests/test_xet_upload.py b/tests/test_xet_upload.py index f66a0fd850..2c418fb2ba 100644 --- a/tests/test_xet_upload.py +++ b/tests/test_xet_upload.py @@ -21,7 +21,7 @@ import pytest from huggingface_hub import HfApi, RepoUrl -from huggingface_hub._commit_api import _upload_lfs_files, _upload_xet_files +from huggingface_hub._commit_api import CommitOperationAdd, _upload_files, _upload_lfs_files, _upload_xet_files from huggingface_hub.file_download import ( _get_metadata_or_catch_error, get_hf_file_metadata, @@ -39,8 +39,8 @@ def assert_upload_mode(mode: str): if mode not in ("xet", "lfs"): raise ValueError("Mode must be either 'xet' or 'lfs'") - with patch("huggingface_hub.hf_api._upload_xet_files", wraps=_upload_xet_files) as mock_xet: - with patch("huggingface_hub.hf_api._upload_lfs_files", wraps=_upload_lfs_files) as mock_lfs: + with patch("huggingface_hub._commit_api._upload_xet_files", wraps=_upload_xet_files) as mock_xet: + with patch("huggingface_hub._commit_api._upload_lfs_files", wraps=_upload_lfs_files) as mock_lfs: yield assert mock_xet.called == (mode == "xet"), ( f"Expected {'XET' if mode == 'xet' else 'LFS'} upload to be used" @@ -65,35 +65,42 @@ def repo_url(api, repo_type: str = "model"): api.delete_repo(repo_id=repo_url.repo_id, repo_type=repo_type) +@pytest.fixture +def xet_setup(request, tmp_path): + instance = getattr(request, "instance", None) + if instance is None: + yield + return + instance.folder_path = tmp_path + # Create a regular text file + text_file = instance.folder_path / "text_file.txt" + instance.text_content = "This is a regular text file" + text_file.write_text(instance.text_content) + + # Create a binary file + instance.bin_file = instance.folder_path / "binary_file.bin" + instance.bin_content = b"0" * (1 * 1024 * 1024) + instance.bin_file.write_bytes(instance.bin_content) + + # Create nested directory structure + nested_dir = instance.folder_path / "nested" + nested_dir.mkdir() + + # Create a nested text file + nested_text_file = nested_dir / "nested_text.txt" + instance.nested_text_content = "This is a nested text file" + nested_text_file.write_text(instance.nested_text_content) + + # Create a nested binary file + nested_bin_file = nested_dir / "nested_binary.safetensors" + instance.nested_bin_content = b"1" * (1 * 1024 * 1024) + nested_bin_file.write_bytes(instance.nested_bin_content) + yield + + @requires("hf_xet") +@pytest.mark.usefixtures("xet_setup") class TestXetUpload: - @pytest.fixture(autouse=True) - def setup(self, tmp_path): - self.folder_path = tmp_path - # Create a regular text file - text_file = self.folder_path / "text_file.txt" - self.text_content = "This is a regular text file" - text_file.write_text(self.text_content) - - # Create a binary file - self.bin_file = self.folder_path / "binary_file.bin" - self.bin_content = b"0" * (1 * 1024 * 1024) - self.bin_file.write_bytes(self.bin_content) - - # Create nested directory structure - nested_dir = self.folder_path / "nested" - nested_dir.mkdir() - - # Create a nested text file - nested_text_file = nested_dir / "nested_text.txt" - self.nested_text_content = "This is a nested text file" - nested_text_file.write_text(self.nested_text_content) - - # Create a nested binary file - nested_bin_file = nested_dir / "nested_binary.safetensors" - self.nested_bin_content = b"1" * (1 * 1024 * 1024) - nested_bin_file.write_bytes(self.nested_bin_content) - def test_upload_file(self, api, tmp_path, repo_url): filename_in_repo = "binary_file.bin" repo_id = repo_url.repo_id @@ -153,7 +160,7 @@ def test_upload_file_with_byte_array(self, api, tmp_path, repo_url): def test_fallback_to_lfs_when_xet_not_available(self, api, repo_url): repo_id = repo_url.repo_id - with patch("huggingface_hub.hf_api.is_xet_available", return_value=False): + with patch("huggingface_hub._commit_api.is_xet_available", return_value=False): with assert_upload_mode("lfs"): api.upload_file( path_or_fileobj=self.bin_file, @@ -161,28 +168,65 @@ def test_fallback_to_lfs_when_xet_not_available(self, api, repo_url): repo_id=repo_id, ) - def test_upload_based_on_xet_enabled_setting(self, api, repo_url): - repo_id = repo_url.repo_id - - # Test when xet is enabled -> use Xet upload - with patch("huggingface_hub.hf_api.HfApi.repo_info") as mock_repo_info: - mock_repo_info.return_value.xet_enabled = True - with assert_upload_mode("xet"): - api.upload_file( - path_or_fileobj=self.bin_file, - path_in_repo="xet_enabled.bin", - repo_id=repo_id, - ) - - # Test when xet is disabled -> use LFS upload - with patch("huggingface_hub.hf_api.HfApi.repo_info") as mock_repo_info: - mock_repo_info.return_value.xet_enabled = False - with assert_upload_mode("lfs"): - api.upload_file( - path_or_fileobj=self.bin_file, - path_in_repo="xet_disabled.bin", - repo_id=repo_id, - ) + def test_transfers_to_xet_when_server_returns_xet(self): + addition = CommitOperationAdd(path_in_repo="xet.bin", path_or_fileobj=self.bin_file) + + def fake_batch( + upload_infos, token, repo_type, repo_id, revision=None, endpoint=None, headers=None, transfers=None + ): + action = { + "oid": upload_infos[0].sha256.hex(), + "size": upload_infos[0].size, + "actions": {"upload": {"href": "https://example.invalid", "header": {}}}, + } + return ([action], [], "xet") + + with patch("huggingface_hub._commit_api.post_lfs_batch_info", side_effect=fake_batch) as mock_batch: + with patch("huggingface_hub._commit_api._upload_lfs_files") as mock_lfs: + with patch("huggingface_hub._commit_api._upload_xet_files") as mock_xet: + _upload_files( + additions=[addition], + repo_type="model", + repo_id="dummy/user-repo", + headers={}, + endpoint="https://hub-ci.huggingface.co", + revision="main", + create_pr=False, + ) + assert mock_batch.call_count == 1 + mock_xet.assert_called_once() + mock_lfs.assert_not_called() + + def test_transfers_bytesio_renegotiates_to_lfs_when_server_returns_xet(self): + addition = CommitOperationAdd(path_in_repo="bytesio.bin", path_or_fileobj=BytesIO(self.bin_content)) + + def fake_batch( + upload_infos, token, repo_type, repo_id, revision=None, endpoint=None, headers=None, transfers=None + ): + action = { + "oid": upload_infos[0].sha256.hex(), + "size": upload_infos[0].size, + "actions": {"upload": {"href": "https://example.invalid", "header": {}}}, + } + return ([action], [], "xet") + + with patch("huggingface_hub._commit_api.post_lfs_batch_info", side_effect=fake_batch) as mock_batch: + with patch("huggingface_hub._commit_api._upload_lfs_files") as mock_lfs: + with patch("huggingface_hub._commit_api._upload_xet_files") as mock_xet: + _upload_files( + additions=[addition], + repo_type="model", + repo_id="dummy/user-repo", + headers={}, + endpoint="https://hub-ci.huggingface.co", + revision="main", + create_pr=False, + ) + + # Ensure we retried negotiation and routed to LFS, not XET + assert mock_batch.call_count == 1 + mock_xet.assert_not_called() + mock_lfs.assert_called_once() def test_upload_folder(self, api, repo_url): repo_id = repo_url.repo_id @@ -314,7 +358,8 @@ def spy_upload_files(*args, **kwargs): @requires("hf_xet") -class TestXetE2E(TestXetUpload): +@pytest.mark.usefixtures("xet_setup") +class TestXetE2E: def test_hf_xet_with_token_refresher(self, api, tmp_path, repo_url): """ Test the hf_xet.download_files function with a token refresher.