1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ Unreleased
- The block size is now used for partitioned uploads. Previously, 1 GiB was used for each uploaded block irrespective of the block size
- Updated default block size to be 50 MiB. Set `blocksize` for `AzureBlobFileSystem` or `block_size` when opening `AzureBlobFile` to revert back to 5 MiB default.
- `AzureBlobFile` now inherits the block size from `AzureBlobFileSystem` when fs.open() is called and a block_size is not passed in.
- Added concurrency for `_async_upload_chunk`. Can be set using `max_concurrency` for `AzureBlobFileSystem`.

Collaborator:
For the changelog entry, let's avoid mentioning private methods and instead word it from the perspective of someone consuming adlfs. For example:

Introduce concurrent uploads for large `AzureBlobFileSystem.write()` calls. Maximum concurrency can be set using `max_concurrency` for `AzureBlobFileSystem`.



2024.12.0
36 changes: 25 additions & 11 deletions adlfs/spec.py
@@ -2156,6 +2156,15 @@ def _get_chunks(self, data):
        yield data[start:end]
        start = end

async def _upload(self, chunk, block_id, semaphore):

Collaborator:
Let's rename this method to `_stage_block` so it's more descriptive of the underlying logic.

    async with semaphore:
        async with self.container_client.get_blob_client(blob=self.blob) as bc:
            await bc.stage_block(
                block_id=block_id,
                data=chunk,
                length=len(chunk),
            )

async def _async_upload_chunk(self, final: bool = False, **kwargs):
"""
Write one part of a multi-block file upload
@@ -2175,17 +2184,22 @@ async def _async_upload_chunk(self, final: bool = False, **kwargs):
commit_kw["headers"] = {"If-None-Match": "*"}
if self.mode in {"wb", "xb"}:
try:
            for chunk in self._get_chunks(data):
                async with self.container_client.get_blob_client(
                    blob=self.blob
                ) as bc:
                    await bc.stage_block(
                        block_id=block_id,
                        data=chunk,
                        length=len(chunk),
                    )
                self._block_list.append(block_id)
                block_id = self._get_block_id(self._block_list)
            max_concurrency = self.fs.max_concurrency

Collaborator:
I think we should make sure to check whether `max_concurrency` is None here. It looks like it can end up resolving to None based on the logic in `__init__`. We should be able to handle that by defaulting to 1 when it is not set, as sketched below.
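
A minimal sketch of that guard, assuming it sits right where `max_concurrency` is resolved (illustrative only, not the PR's code):

```python
# Illustrative only: fall back to a concurrency of 1 when the filesystem
# did not resolve max_concurrency (i.e. it is None).
max_concurrency = self.fs.max_concurrency or 1
semaphore = asyncio.Semaphore(max_concurrency)
```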

            semaphore = asyncio.Semaphore(max_concurrency)

Member:
FYI: this effectively makes a "pool", whereas fsspec upstream has a run-coroutines-in-chunks ability. The latter is less efficient but easier to reason about (a rough sketch of it follows below). It may be worthwhile upstreaming this at some point. See a similar idea in fsspec/filesystem_spec#1908.
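
For comparison, a rough sketch of the run-in-chunks alternative mentioned above (hypothetical `batch_size`, not fsspec's actual helper):

```python
# Rough sketch of the "run coroutines in chunks" idea: gather a fixed-size
# batch at a time instead of letting a semaphore pool schedule everything.
batch_size = 4  # hypothetical batch size
for i in range(0, len(tasks), batch_size):
    await asyncio.gather(*tasks[i : i + batch_size])
```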

Collaborator:
Upstreaming makes sense to me. We originally did not look upstream in fsspec core to see if there was anything to leverage, but I think generalizing this semaphore approach upstream would definitely be useful to enable concurrency in more places without needing to duplicate the logic.

            tasks = []
            block_ids = self._block_list or []
            start_idx = len(block_ids)
            chunks = list(self._get_chunks(data))

Collaborator:
I'm still hesitant about exhausting the entire _get_chunks() iterator here, as we'd be pulling the entire buffer in again as chunks in this list instead of lazily iterating through it. Could we instead:

  1. Make sure we are using memoryviews on the data so that the chunks are not necessarily copied on slicing.
  2. Refactor _get_chunks to yield start and end boundaries instead of byte chunks, similar to how s3fs does it. We'd then update our internal _stage_block method to slice from the data buffer once it has acquired its semaphore.

I like this approach because the memoryview will help reduce unnecessary copies in adlfs code, and the semaphore will protect us from loading more chunks than there are tasks available to run concurrently.
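
A rough sketch of that refactor, using hypothetical helper names (`_get_chunk_bounds`) while keeping the rest of the PR's structure:

```python
# Sketch of the suggestion: yield (start, end) offsets lazily and slice a
# memoryview only after the semaphore has been acquired, avoiding extra copies.
def _get_chunk_bounds(self, data):
    start = 0
    while start < len(data):
        end = min(start + self.blocksize, len(data))
        yield start, end
        start = end

async def _stage_block(self, data, start, end, block_id, semaphore):
    async with semaphore:
        chunk = memoryview(data)[start:end]  # no copy of the underlying bytes
        async with self.container_client.get_blob_client(blob=self.blob) as bc:
            await bc.stage_block(block_id=block_id, data=chunk, length=len(chunk))
```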

            for _ in range(len(chunks)):
                block_ids.append(block_id)
                block_id = self._get_block_id(block_ids)

            if chunks:
                self._block_list = block_ids

Collaborator:
Instead of adding the block_id upfront before the coroutines run, could we instead return the block_id from _stage_block() and extend the block_list from the results of asyncio.gather()? I noticed this pattern in s3fs when organizing ETags and think it can help here too.

I mainly suggest this because:

  1. It simplifies the logic to the point that I think we can just reuse the existing for loop from before this change.
  2. It adds the property that any block ids in the block_list have been successfully uploaded. It's hard to tell if a downstream adlfs user will run into this, but if they are catching exceptions as part of write() and still commit the block list, the commit could fail because there will be non-existent blocks in that list.
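
For illustration, the suggested shape might look roughly like this (a sketch, not the PR's code):

```python
# Sketch: _stage_block returns its block_id, so only blocks that were
# successfully staged ever end up in self._block_list.
async def _stage_block(self, chunk, block_id, semaphore):
    async with semaphore:
        async with self.container_client.get_blob_client(blob=self.blob) as bc:
            await bc.stage_block(block_id=block_id, data=chunk, length=len(chunk))
    return block_id

# Back in _async_upload_chunk:
staged = await asyncio.gather(*tasks)
self._block_list.extend(staged)
```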

            for chunk, block_id in zip(chunks, block_ids[start_idx:]):
                tasks.append(self._upload(chunk, block_id, semaphore))

            await asyncio.gather(*tasks)

Collaborator:
One thing that would be interesting to check (maybe tricky to reproduce) is what the error looks like if one of the stage blocks fails, resulting in cancellations of in-flight stage blocks. Supposedly a CancelledError will be raised in in-progress coroutines, so I'm curious how/if it gets propagated correctly (e.g. we do not want cancellation errors to show up and mask the initial underlying error).

Collaborator (author):
It looks like as long as return_exceptions for asyncio.gather() is False (which is the default), the actual error shows up. Otherwise, the error just gets collected as part of the results list, and we eventually get the invalid block list error when it tries to commit the block list.
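
A small standalone illustration of that behavior (plain asyncio, not adlfs code):

```python
import asyncio


async def stage(i):
    # Simulate one failing block upload among several concurrent ones.
    if i == 2:
        raise ValueError("stage_block failed")
    await asyncio.sleep(0.01)
    return i


async def main():
    try:
        # Default return_exceptions=False: the original error propagates.
        await asyncio.gather(*(stage(i) for i in range(4)))
    except ValueError as exc:
        print("propagated:", exc)

    # return_exceptions=True: the error is just an item in the results list,
    # so the first visible failure would be the later commit of the block list.
    results = await asyncio.gather(*(stage(i) for i in range(4)), return_exceptions=True)
    print(results)


asyncio.run(main())
```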


            if final:
                block_list = [BlobBlock(_id) for _id in self._block_list]
28 changes: 28 additions & 0 deletions adlfs/tests/test_spec.py
@@ -2140,3 +2140,31 @@ def test_blobfile_default_blocksize(storage):
"data/root/a/file.txt",
)
assert f.blocksize == 50 * 2**20


@pytest.mark.parametrize(
    "max_concurrency, blob_size",
    [
        (1, 51 * 2**20),
        (4, 200 * 2**20),
        (4, 49 * 2**20),
    ],

Collaborator:
These are some good initial cases. I'm not sure how long these tests currently take, but it would be interesting to add cases that set the blocksize to 5 MiB with a total blob size of 200 MiB, and then:

  1. Use the default max_concurrency (set it to None) to make sure the default settings work out of the box.
  2. Use a concurrency of 4 to test the scenario where concurrency gets saturated and has to wait.

I also like setting the block size lower for some of these cases to better stress the system and help catch any concurrency issues that could arise in the future.

)
def test_max_concurrency(storage, max_concurrency, blob_size):

Collaborator:
Let's rename it to `test_write_max_concurrency` to better indicate that we are only testing concurrency for writes.

    fs = AzureBlobFileSystem(
        account_name=storage.account_name,
        connection_string=CONN_STR,
        max_concurrency=max_concurrency,
    )
    data = os.urandom(blob_size)
    fs.mkdir("large-file-container")

Collaborator:
It would probably be worth using a randomized name for the container so that we are uploading a new blob each time and can guarantee there is no state left over from a previous parameterized case. We should also make sure to delete the container at the end of the test to avoid leaving state behind.

Another thing we should consider eventually doing (though it might require a decent amount of scaffolding) is introducing a pytest fixture that automatically handles the creation and cleanup of containers. It seems like there are a fair number of cases that could leverage this, so it probably makes sense to do as a follow-up PR. A rough sketch of such a fixture is below.
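
A minimal sketch of what such a fixture could look like, reusing the test module's `storage` fixture and `CONN_STR` (the fixture name and cleanup call are illustrative):

```python
import uuid

import pytest


@pytest.fixture
def container(storage):
    # Create a uniquely named container per test and remove it afterwards so
    # no state leaks between parameterized cases.
    fs = AzureBlobFileSystem(
        account_name=storage.account_name,
        connection_string=CONN_STR,
    )
    name = f"test-container-{uuid.uuid4().hex}"
    fs.mkdir(name)
    try:
        yield name
    finally:
        fs.rm(name, recursive=True)
```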

path = "large-file-container/blob.txt"

with fs.open(path, "wb") as f:
f.write(data)

assert fs.exists(path)
assert fs.size(path) == blob_size

with fs.open(path, "rb") as f:
assert f.read() == data