3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,9 @@ Unreleased
- Fix issue where ``AzureBlobFile`` did not respect ``location_mode`` parameter
from parent ``AzureBlobFileSystem`` when using SAS credentials and connecting to
new SDK clients.
- The block size is now used for uploads. Previously, it was always 1 GiB irrespective of the block size
Collaborator:
Let's slightly tweak this phrasing to:

- The block size is now used for partitioned uploads. Previously, 1 GiB was used for each uploaded block irrespective of the block size

Mainly, the block size technically was used in uploads previously, but it was really only used to decide when the in-memory buffer was flushed.
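To illustrate what that means in practice, here is a minimal sketch of a blocksize-driven buffered writer, assuming an fsspec `AbstractBufferedFile`-style write path; the class and method names are illustrative, not the actual adlfs internals:

```python
import io

# Illustrative sketch only -- not the actual adlfs implementation.
class BufferedBlobWriter:
    def __init__(self, blocksize=50 * 2**20):
        self.blocksize = blocksize
        self.buffer = io.BytesIO()

    def write(self, data: bytes):
        self.buffer.write(data)
        # Previously, the block size only mattered here: it decided when the
        # in-memory buffer was flushed.
        if self.buffer.tell() >= self.blocksize:
            self.flush()

    def flush(self):
        payload = self.buffer.getvalue()
        # With this PR, the flushed data is also partitioned into blocks of at
        # most `blocksize` bytes; before, a hard-coded 1 GiB chunk size was used.
        for start in range(0, len(payload), self.blocksize):
            self._stage_block(payload[start:start + self.blocksize])
        self.buffer = io.BytesIO()

    def _stage_block(self, chunk: bytes):
        ...  # stand-in for the real upload call
```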

- Updated default block size to be 50 MiB
Collaborator:
Let's also add a sentence to this point saying how to revert to the previous default. Something like:

Set `blocksize` for `AzureBlobFileSystem` or `block_size` when opening an `AzureBlobFile` to revert to the 5 MiB default.

Mainly, this will help anyone trying to go back to the previous default find the information more easily than by digging through this PR.
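For reference, reverting to the previous default would look roughly like this (the account name and path are placeholders; real usage also needs credentials):

```python
from adlfs import AzureBlobFileSystem

# Restore the previous 5 MiB default for every file opened by the filesystem...
fs = AzureBlobFileSystem(account_name="myaccount", blocksize=5 * 2**20)

# ...or per file, by passing block_size when opening an AzureBlobFile.
with fs.open("container/path/to/file.bin", "wb", block_size=5 * 2**20) as f:
    f.write(b"some data")
```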

- `AzureBlobFile` now inherits the block size from `AzureBlobFileSystem` when fs.open() is called and a block_size is not passed in.


2024.12.0
16 changes: 9 additions & 7 deletions adlfs/spec.py
@@ -69,7 +69,7 @@
"is_current_version",
]
_ROOT_PATH = "/"
_DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024
_DEFAULT_BLOCK_SIZE = 50 * 2**20
Collaborator:
We should also make sure to update the changelog in this PR. I'm thinking we can just use the three bullet points from this comment I made: #509 (comment) and make the wording a bit more succinct.


_SOCKET_TIMEOUT_DEFAULT = object()

@@ -177,8 +177,7 @@ class AzureBlobFileSystem(AsyncFileSystem):
The credentials with which to authenticate. Optional if the account URL already has a SAS token.
Can include an instance of TokenCredential class from azure.identity.aio.
blocksize: int
The block size to use for download/upload operations. Defaults to hardcoded value of
``BlockBlobService.MAX_BLOCK_SIZE``
The block size to use for download/upload operations. Defaults to 50 MiB
client_id: str
Client ID to use when authenticating using an AD Service Principal client/secret.
client_secret: str
@@ -1863,7 +1862,8 @@ def _open(
What mode to open the file in - defaults to "rb"

block_size: int
Size per block for multi-part downloads.
Size per block for multi-part uploads and downloads. This overrides the filesystem blocksize; if not provided, it defaults to the filesystem blocksize.

autocommit: bool
Whether or not to write to the destination directly
@@ -1879,6 +1879,8 @@
is versioning aware and blob versioning is enabled on the relevant container.
"""
logger.debug(f"_open: {path}")
if block_size is None:
Collaborator:
We should make sure to update the block_size docstring for this _open() method to:

  1. Include "uploads" in the wording. It looks like it currently only refers to downloads
  2. Update the wording to say that the parameter is an override and when not provided defaults to the blocksize on the file system.

block_size = self.blocksize
if not self.version_aware and version_id:
raise ValueError(
"version_id cannot be specified if the filesystem "
@@ -1901,7 +1903,7 @@
class AzureBlobFile(AbstractBufferedFile):
"""File-like operations on Azure Blobs"""

DEFAULT_BLOCK_SIZE = 5 * 2**20
DEFAULT_BLOCK_SIZE = _DEFAULT_BLOCK_SIZE

def __init__(
self,
@@ -2146,11 +2148,11 @@ async def _async_initiate_upload(self, **kwargs):

_initiate_upload = sync_wrapper(_async_initiate_upload)

def _get_chunks(self, data, chunk_size=1024**3): # Keeping the chunk size as 1 GB
def _get_chunks(self, data):
start = 0
length = len(data)
while start < length:
end = min(start + chunk_size, length)
end = min(start + self.blocksize, length)
yield data[start:end]
start = end
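As a quick worked illustration of the new chunking arithmetic (the payload size is hypothetical, just to show the effect of the change):

```python
import math

blocksize = 50 * 2**20     # new default block size: 50 MiB
payload_len = 201 * 2**20  # hypothetical 201 MiB buffer being flushed

# _get_chunks now yields ceil(201 / 50) = 5 blocks of at most 50 MiB each;
# with the old hard-coded 1 GiB chunk size, the same payload was staged as
# a single block.
assert math.ceil(payload_len / blocksize) == 5
```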

80 changes: 80 additions & 0 deletions adlfs/tests/test_spec.py
@@ -1,4 +1,5 @@
import datetime
import math
import os
import tempfile
from unittest import mock
@@ -2045,3 +2046,82 @@ def test_open_file_x(storage: azure.storage.blob.BlobServiceClient, tmpdir):
with fs.open("data/afile", "xb") as f:
pass
assert fs.cat_file("data/afile") == b"data"


def test_number_of_blocks(storage, mocker):
Collaborator:
Let's update this test name to `test_uses_block_size_for_partitioned_uploads` in order to be a bit more self-descriptive of what we are trying to test.

from azure.storage.blob.aio import BlobClient

blocksize = 5 * 2**20
fs = AzureBlobFileSystem(
account_name=storage.account_name,
connection_string=CONN_STR,
blocksize=blocksize,
)

content = b"1" * (blocksize * 4 + 1)
with fs.open("data/root/a/file.txt", "wb") as f:
mock_stage_block = mocker.patch.object(BlobClient, "stage_block")
mock_commit_block_list = mocker.patch.object(BlobClient, "commit_block_list")
f.write(content)
expected_blocks = math.ceil(len(content) / blocksize)
actual_blocks = mock_stage_block.call_count
assert actual_blocks == expected_blocks
block_lengths = [
call.kwargs["length"] for call in mock_stage_block.call_args_list
]
assert sum(block_lengths) == len(content)

assert len(mock_commit_block_list.call_args.kwargs["block_list"]) == expected_blocks


@pytest.mark.parametrize(
"filesystem_blocksize, file_blocksize, expected_blocksize",
Collaborator:
It's also probably worth adding an `expected_filesystem_blocksize` parameter to make sure the blocksize is as expected at the filesystem level as well in this test case.

[
(None, None, 50 * 2**20),
(50 * 2**20, None, 50 * 2**20),
Collaborator:
For this case, it might make sense to use a different value, like the `7 * 2 ** 20` value, to disambiguate from the default value.

(None, 5 * 2**20, 5 * 2**20),
(50 * 2**20, 7 * 2**20, 7 * 2**20),
Collaborator:
Same thing here: instead of using `50 * 2 ** 20`, let's use a non-default value like `40 * 2 ** 20` to disambiguate from any defaults.

],
)
def test_block_size(storage, filesystem_blocksize, file_blocksize, expected_blocksize):
fs = AzureBlobFileSystem(
account_name=storage.account_name,
connection_string=CONN_STR,
blocksize=filesystem_blocksize,
)

with fs.open("data/root/a/file.txt", "wb", block_size=file_blocksize) as f:
assert f.blocksize == expected_blocksize


@pytest.mark.parametrize(
"file_blocksize, expected_blocksize",
[
(None, 50 * 2**20),
(8 * 2**20, 8 * 2**20),
],
)
def test_blocksize_from_blobfile(storage, file_blocksize, expected_blocksize):
fs = AzureBlobFileSystem(
account_name=storage.account_name, connection_string=CONN_STR
)
f = AzureBlobFile(
fs,
"data/root/a/file.txt",
block_size=file_blocksize,
)
assert f.blocksize == expected_blocksize
assert fs.blocksize == 50 * 2**20
Collaborator:
We can probably remove this assertion assuming we add the expected_filesystem_blocksize parameter in the other parameterized test case.



def test_override_blocksize(storage):
Collaborator:
Ah, so for this test case I was thinking we'd:

  1. Set the blocksize on the AzureBlobFileSystem
  2. Assert that the AzureBlobFile, without setting block_size, does not inherit the blocksize from 1. So after instantiation, we would just assert it is 50 * 2 ** 20, which should be its default.
  3. Update the test name to better reflect that we are testing that this code path does not inherit from the file system (a rough sketch follows after the existing test below).

fs = AzureBlobFileSystem(
account_name=storage.account_name, connection_string=CONN_STR
)
f = AzureBlobFile(
fs,
"data/root/a/file.txt",
)
assert f.blocksize == 50 * 2**20
f.blocksize = 2 * 2**20
assert f.blocksize == 2 * 2**20
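

A rough sketch of the revised test that last comment describes; the test name and the 8 MiB filesystem blocksize are placeholder choices, not something settled in this PR:

```python
def test_blobfile_does_not_inherit_filesystem_blocksize(storage):
    # 1. Set a non-default blocksize on the AzureBlobFileSystem.
    fs = AzureBlobFileSystem(
        account_name=storage.account_name,
        connection_string=CONN_STR,
        blocksize=8 * 2**20,
    )
    # 2. Instantiating AzureBlobFile directly (rather than via fs.open()) should
    #    not inherit that blocksize; it should fall back to the 50 MiB default.
    f = AzureBlobFile(fs, "data/root/a/file.txt")
    assert f.blocksize == 50 * 2**20
```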