diff --git a/CHANGELOG.md b/CHANGELOG.md
index 38febc6e..5e6e2ba2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,9 @@ Unreleased

 - Fix issue where ``AzureBlobFile`` did not respect ``location_mode`` parameter
   from parent ``AzureBlobFileSystem`` when using SAS credentials and connecting
   to new SDK clients.
+- The block size is now used for partitioned uploads. Previously, 1 GiB was used for each uploaded block, irrespective of the configured block size.
+- Updated the default block size to 50 MiB. Set ``blocksize`` on ``AzureBlobFileSystem`` or ``block_size`` when opening an ``AzureBlobFile`` to revert to the previous 5 MiB default.
+- ``AzureBlobFile`` now inherits the block size from ``AzureBlobFileSystem`` when ``fs.open()`` is called and no ``block_size`` is passed in.

 2024.12.0
diff --git a/adlfs/spec.py b/adlfs/spec.py
index 4c443db1..90916e3c 100644
--- a/adlfs/spec.py
+++ b/adlfs/spec.py
@@ -69,7 +69,7 @@
     "is_current_version",
 ]
 _ROOT_PATH = "/"
-_DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024
+_DEFAULT_BLOCK_SIZE = 50 * 2**20
 _SOCKET_TIMEOUT_DEFAULT = object()


@@ -177,8 +177,7 @@ class AzureBlobFileSystem(AsyncFileSystem):
         The credentials with which to authenticate. Optional if the account URL already has a SAS token.
         Can include an instance of TokenCredential class from azure.identity.aio.
     blocksize: int
-        The block size to use for download/upload operations. Defaults to hardcoded value of
-        ``BlockBlobService.MAX_BLOCK_SIZE``
+        The block size to use for download/upload operations. Defaults to 50 MiB.
     client_id: str
         Client ID to use when authenticating using an AD Service Principal client/secret.
     client_secret: str
@@ -1863,7 +1862,8 @@ def _open(
             What mode to open the file in - defaults to "rb"
         block_size: int
-            Size per block for multi-part downloads.
+            Size per block for multi-part uploads and downloads. This overrides the
+            filesystem ``blocksize``; if not provided, the filesystem ``blocksize`` is used.
         autocommit: bool
             Whether or not to write to the destination directly

@@ -1879,6 +1879,8 @@ def _open(
             is versioning aware and blob versioning is enabled on the releveant container.
""" logger.debug(f"_open: {path}") + if block_size is None: + block_size = self.blocksize if not self.version_aware and version_id: raise ValueError( "version_id cannot be specified if the filesystem " @@ -1901,7 +1903,7 @@ def _open( class AzureBlobFile(AbstractBufferedFile): """File-like operations on Azure Blobs""" - DEFAULT_BLOCK_SIZE = 5 * 2**20 + DEFAULT_BLOCK_SIZE = _DEFAULT_BLOCK_SIZE def __init__( self, @@ -2146,11 +2148,11 @@ async def _async_initiate_upload(self, **kwargs): _initiate_upload = sync_wrapper(_async_initiate_upload) - def _get_chunks(self, data, chunk_size=1024**3): # Keeping the chunk size as 1 GB + def _get_chunks(self, data): start = 0 length = len(data) while start < length: - end = min(start + chunk_size, length) + end = min(start + self.blocksize, length) yield data[start:end] start = end diff --git a/adlfs/tests/test_spec.py b/adlfs/tests/test_spec.py index 7b6dd548..91f5d0df 100644 --- a/adlfs/tests/test_spec.py +++ b/adlfs/tests/test_spec.py @@ -1,4 +1,5 @@ import datetime +import math import os import tempfile from unittest import mock @@ -2045,3 +2046,97 @@ def test_open_file_x(storage: azure.storage.blob.BlobServiceClient, tmpdir): with fs.open("data/afile", "xb") as f: pass assert fs.cat_file("data/afile") == b"data" + + +def test_uses_block_size_for_partitioned_uploads(storage, mocker): + from azure.storage.blob.aio import BlobClient + + blocksize = 5 * 2**20 + fs = AzureBlobFileSystem( + account_name=storage.account_name, + connection_string=CONN_STR, + blocksize=blocksize, + ) + + content = b"1" * (blocksize * 4 + 1) + with fs.open("data/root/a/file.txt", "wb") as f: + mock_stage_block = mocker.patch.object(BlobClient, "stage_block") + mock_commit_block_list = mocker.patch.object(BlobClient, "commit_block_list") + f.write(content) + expected_blocks = math.ceil(len(content) / blocksize) + actual_blocks = mock_stage_block.call_count + assert actual_blocks == expected_blocks + block_lengths = [ + call.kwargs["length"] for call in mock_stage_block.call_args_list + ] + assert sum(block_lengths) == len(content) + + assert len(mock_commit_block_list.call_args.kwargs["block_list"]) == expected_blocks + + +@pytest.mark.parametrize( + "filesystem_blocksize, file_blocksize, expected_blocksize, expected_filesystem_blocksize", + [ + (None, None, 50 * 2**20, 50 * 2**20), + (7 * 2**20, None, 7 * 2**20, 7 * 2**20), + (None, 5 * 2**20, 5 * 2**20, 50 * 2**20), + (40 * 2**20, 7 * 2**20, 7 * 2**20, 40 * 2**20), + ], +) +def test_block_size( + storage, + filesystem_blocksize, + file_blocksize, + expected_blocksize, + expected_filesystem_blocksize, +): + filesystem_kwargs = { + "account_name": storage.account_name, + "connection_string": CONN_STR, + } + if filesystem_blocksize is not None: + filesystem_kwargs["blocksize"] = filesystem_blocksize + + file_kwargs = { + "path": "data/root/a/file.txt", + "mode": "wb", + } + if file_blocksize is not None: + file_kwargs["block_size"] = file_blocksize + + fs = AzureBlobFileSystem(**filesystem_kwargs) + with fs.open(**file_kwargs) as f: + assert f.blocksize == expected_blocksize + assert fs.blocksize == expected_filesystem_blocksize + + +@pytest.mark.parametrize( + "file_blocksize, expected_blocksize", + [ + (None, 50 * 2**20), + (8 * 2**20, 8 * 2**20), + ], +) +def test_blocksize_from_blobfile(storage, file_blocksize, expected_blocksize): + fs = AzureBlobFileSystem( + account_name=storage.account_name, connection_string=CONN_STR + ) + f = AzureBlobFile( + fs, + "data/root/a/file.txt", + block_size=file_blocksize, + ) + assert 
+
+
+def test_blobfile_default_blocksize(storage):
+    fs = AzureBlobFileSystem(
+        account_name=storage.account_name,
+        connection_string=CONN_STR,
+        blocksize=20 * 2**20,
+    )
+    f = AzureBlobFile(
+        fs,
+        "data/root/a/file.txt",
+    )
+    assert f.blocksize == 50 * 2**20
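
A rough usage sketch of the behaviour these changes describe (the account name, container, and paths are placeholders, and a reachable storage account is assumed):

    import adlfs

    # The filesystem-level blocksize (default now 50 MiB) is inherited by files
    # opened through fs.open() when no explicit block_size is given.
    fs = adlfs.AzureBlobFileSystem(account_name="myaccount", blocksize=16 * 2**20)

    with fs.open("container/data.bin", "wb") as f:
        assert f.blocksize == 16 * 2**20  # inherited from the filesystem

    # An explicit block_size passed to open() takes precedence, and is also the
    # size of each block staged during a partitioned upload.
    with fs.open("container/data.bin", "wb", block_size=8 * 2**20) as f:
        assert f.blocksize == 8 * 2**20
        f.write(b"0" * (20 * 2**20))  # staged in 8 MiB blocks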