3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,9 @@ Unreleased
- Fix issue where ``AzureBlobFile`` did not respect ``location_mode`` parameter
from parent ``AzureBlobFileSystem`` when using SAS credentials and connecting to
new SDK clients.
- The block size is now used for partitioned uploads. Previously, each uploaded block was 1 GiB regardless of the configured block size.
- Updated the default block size to 50 MiB. Set ``blocksize`` on ``AzureBlobFileSystem`` or ``block_size`` when opening an ``AzureBlobFile`` to revert to the previous 5 MiB default.
- ``AzureBlobFile`` now inherits the block size from its parent ``AzureBlobFileSystem`` when ``fs.open()`` is called without an explicit ``block_size``.


2024.12.0
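To make the new defaults concrete, here is a minimal usage sketch of reverting to the previous 5 MiB block size. The account name, connection string, and blob path are hypothetical placeholders, not values from this PR.

```python
from adlfs import AzureBlobFileSystem

# Option 1: set the block size once, for every file opened through this filesystem.
fs = AzureBlobFileSystem(
    account_name="myaccount",        # hypothetical account
    connection_string="...",         # supply a real connection string
    blocksize=5 * 2**20,             # 5 MiB, the pre-change default
)

# Option 2: override the block size for a single file. This takes precedence
# over fs.blocksize, which keeps its configured value.
with fs.open("mycontainer/data.bin", "wb", block_size=5 * 2**20) as f:
    f.write(b"0" * (12 * 2**20))     # 12 MiB is staged as ceil(12 / 5) = 3 blocks
```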
16 changes: 9 additions & 7 deletions adlfs/spec.py
@@ -69,7 +69,7 @@
"is_current_version",
]
_ROOT_PATH = "/"
_DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024
_DEFAULT_BLOCK_SIZE = 50 * 2**20
Collaborator:
We should also make sure to update the changelog in this PR. I'm thinking we can just use the three bullet points from this comment I made: #509 (comment) and make the wording a bit more succinct.


_SOCKET_TIMEOUT_DEFAULT = object()

@@ -177,8 +177,7 @@ class AzureBlobFileSystem(AsyncFileSystem):
The credentials with which to authenticate. Optional if the account URL already has a SAS token.
Can include an instance of TokenCredential class from azure.identity.aio.
blocksize: int
The block size to use for download/upload operations. Defaults to hardcoded value of
``BlockBlobService.MAX_BLOCK_SIZE``
The block size to use for download/upload operations. Defaults to 50 MiB.
client_id: str
Client ID to use when authenticating using an AD Service Principal client/secret.
client_secret: str
Expand Down Expand Up @@ -1863,7 +1862,8 @@ def _open(
What mode to open the file in - defaults to "rb"

block_size: int
Size per block for multi-part downloads.
Size per block for multi-part uploads and downloads. Overrides the filesystem ``blocksize``;
if not provided, defaults to the blocksize set on the filesystem.

autocommit: bool
Whether or not to write to the destination directly
@@ -1879,6 +1879,8 @@
is versioning aware and blob versioning is enabled on the relevant container.
"""
logger.debug(f"_open: {path}")
if block_size is None:
Collaborator:
We should make sure to update the block_size docstring for this _open() method to:

  1. Include "uploads" in the wording. It currently reads as if it only refers to downloads.
  2. Update the wording to say that the parameter is an override and when not provided defaults to the blocksize on the file system.

block_size = self.blocksize
if not self.version_aware and version_id:
raise ValueError(
"version_id cannot be specified if the filesystem "
@@ -1901,7 +1903,7 @@
class AzureBlobFile(AbstractBufferedFile):
"""File-like operations on Azure Blobs"""

DEFAULT_BLOCK_SIZE = 5 * 2**20
DEFAULT_BLOCK_SIZE = _DEFAULT_BLOCK_SIZE

def __init__(
self,
@@ -2146,11 +2148,11 @@ async def _async_initiate_upload(self, **kwargs):

_initiate_upload = sync_wrapper(_async_initiate_upload)

def _get_chunks(self, data, chunk_size=1024**3): # Keeping the chunk size as 1 GB
def _get_chunks(self, data):
start = 0
length = len(data)
while start < length:
end = min(start + chunk_size, length)
end = min(start + self.blocksize, length)
yield data[start:end]
start = end

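As a worked illustration of the chunking change above, the sketch below mirrors the slicing logic as a standalone function (it is not the class method itself) and shows how a payload splits along blocksize boundaries.

```python
import math

def get_chunks(data: bytes, blocksize: int):
    # Mirrors AzureBlobFile._get_chunks after this change: yield consecutive
    # slices of at most `blocksize` bytes until the payload is exhausted.
    start = 0
    while start < len(data):
        end = min(start + blocksize, len(data))
        yield data[start:end]
        start = end

blocksize = 5 * 2**20                                    # 5 MiB blocks
data = b"x" * (12 * 2**20)                               # 12 MiB payload
chunks = list(get_chunks(data, blocksize))
assert len(chunks) == math.ceil(len(data) / blocksize)   # 3 chunks: 5, 5, 2 MiB
assert sum(len(c) for c in chunks) == len(data)
```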
95 changes: 95 additions & 0 deletions adlfs/tests/test_spec.py
@@ -1,4 +1,5 @@
import datetime
import math
import os
import tempfile
from unittest import mock
@@ -2045,3 +2046,97 @@ def test_open_file_x(storage: azure.storage.blob.BlobServiceClient, tmpdir):
with fs.open("data/afile", "xb") as f:
pass
assert fs.cat_file("data/afile") == b"data"


def test_uses_block_size_for_partitioned_uploads(storage, mocker):
from azure.storage.blob.aio import BlobClient

blocksize = 5 * 2**20
fs = AzureBlobFileSystem(
account_name=storage.account_name,
connection_string=CONN_STR,
blocksize=blocksize,
)

content = b"1" * (blocksize * 4 + 1)
with fs.open("data/root/a/file.txt", "wb") as f:
mock_stage_block = mocker.patch.object(BlobClient, "stage_block")
mock_commit_block_list = mocker.patch.object(BlobClient, "commit_block_list")
f.write(content)
expected_blocks = math.ceil(len(content) / blocksize)
actual_blocks = mock_stage_block.call_count
assert actual_blocks == expected_blocks
block_lengths = [
call.kwargs["length"] for call in mock_stage_block.call_args_list
]
assert sum(block_lengths) == len(content)

assert len(mock_commit_block_list.call_args.kwargs["block_list"]) == expected_blocks


@pytest.mark.parametrize(
"filesystem_blocksize, file_blocksize, expected_blocksize, expected_filesystem_blocksize",
[
(None, None, 50 * 2**20, 50 * 2**20),
(7 * 2**20, None, 7 * 2**20, 7 * 2**20),
(None, 5 * 2**20, 5 * 2**20, 50 * 2**20),
(40 * 2**20, 7 * 2**20, 7 * 2**20, 40 * 2**20),
],
)
def test_block_size(
storage,
filesystem_blocksize,
file_blocksize,
expected_blocksize,
expected_filesystem_blocksize,
):
filesystem_kwargs = {
"account_name": storage.account_name,
"connection_string": CONN_STR,
}
if filesystem_blocksize is not None:
filesystem_kwargs["blocksize"] = filesystem_blocksize

file_kwargs = {
"path": "data/root/a/file.txt",
"mode": "wb",
}
if file_blocksize is not None:
file_kwargs["block_size"] = file_blocksize

fs = AzureBlobFileSystem(**filesystem_kwargs)
with fs.open(**file_kwargs) as f:
assert f.blocksize == expected_blocksize
assert fs.blocksize == expected_filesystem_blocksize


@pytest.mark.parametrize(
"file_blocksize, expected_blocksize",
[
(None, 50 * 2**20),
(8 * 2**20, 8 * 2**20),
],
)
def test_blocksize_from_blobfile(storage, file_blocksize, expected_blocksize):
fs = AzureBlobFileSystem(
account_name=storage.account_name, connection_string=CONN_STR
)
f = AzureBlobFile(
fs,
"data/root/a/file.txt",
block_size=file_blocksize,
)
assert f.blocksize == expected_blocksize


def test_blobfile_default_blocksize(storage):
fs = AzureBlobFileSystem(
account_name=storage.account_name,
connection_string=CONN_STR,
blocksize=20 * 2**20,
)
f = AzureBlobFile(
fs,
"data/root/a/file.txt",
)
assert f.blocksize == 50 * 2**20