1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ Unreleased
- The block size is now used for partitioned uploads. Previously, 1 GiB was used for each uploaded block irrespective of the block size
- Updated default block size to be 50 MiB. Set `blocksize` for `AzureBlobFileSystem` or `block_size` when opening `AzureBlobFile` to revert to the previous 5 MiB default.
- `AzureBlobFile` now inherits the block size from `AzureBlobFileSystem` when fs.open() is called and a block_size is not passed in.
- Introduce concurrent uploads for large `AzureBlobFileSystem.write()` calls. Maximum concurrency can be set using `max_concurrency` for `AzureBlobFileSystem`.


2024.12.0
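For context on the new option, here is a minimal usage sketch of `max_concurrency` as described in the changelog entry above. The account name, connection string variable, and container/blob paths are placeholder assumptions, not values from this PR:

```python
import os

from adlfs import AzureBlobFileSystem

# Placeholder account and credentials (assumptions); substitute your own.
fs = AzureBlobFileSystem(
    account_name="myaccount",
    connection_string=os.environ["AZURE_STORAGE_CONNECTION_STRING"],
    blocksize=50 * 2**20,   # 50 MiB blocks, the new default
    max_concurrency=4,      # stage up to 4 blocks concurrently per write
)

# Writes larger than the block size are split into block-sized chunks,
# which are now staged concurrently before the block list is committed.
with fs.open("mycontainer/large-blob.bin", "wb") as f:
    f.write(os.urandom(200 * 2**20))
```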
56 changes: 39 additions & 17 deletions adlfs/spec.py
@@ -9,13 +9,14 @@
import logging
import os
import re
import sys
import typing
import warnings
import weakref
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from glob import has_magic
from typing import List, Optional, Tuple
from typing import Optional, Tuple
from uuid import uuid4

from azure.core.exceptions import (
@@ -2153,9 +2154,22 @@ def _get_chunks(self, data):
length = len(data)
while start < length:
end = min(start + self.blocksize, length)
yield data[start:end]
yield start, end
start = end

async def _stage_block(self, data, start, end, block_id, semaphore):
async with semaphore:
if self._sdk_supports_memoryview_for_writes():
# Use memoryview to avoid making copies of the bytes when we splice for partitioned uploads
data = memoryview(data)
async with self.container_client.get_blob_client(blob=self.blob) as bc:
await bc.stage_block(
block_id=block_id,
data=data[start:end],
length=end - start,
)
return block_id

async def _async_upload_chunk(self, final: bool = False, **kwargs):
"""
Write one part of a multi-block file upload
@@ -2169,23 +2183,22 @@ def _async_upload_chunk(self, final: bool = False, **kwargs):
"""
data = self.buffer.getvalue()
length = len(data)
block_id = self._get_block_id(self._block_list)
block_id = self._get_block_id()
commit_kw = {}
if self.mode == "xb":
commit_kw["headers"] = {"If-None-Match": "*"}
if self.mode in {"wb", "xb"}:
try:
for chunk in self._get_chunks(data):
async with self.container_client.get_blob_client(
blob=self.blob
) as bc:
await bc.stage_block(
block_id=block_id,
data=chunk,
length=len(chunk),
)
self._block_list.append(block_id)
block_id = self._get_block_id(self._block_list)
max_concurrency = self.fs.max_concurrency or 1
semaphore = asyncio.Semaphore(max_concurrency)
Review comment (Member):

FYI: this effectively makes a "pool", whereas fsspec upstream has a run-coroutines-in-chunks ability. The latter is less efficient but easier to reason about. It may be worthwhile upstreaming this at some point. See a similar idea in fsspec/filesystem_spec#1908.

Review comment (Collaborator):

Upstreaming makes sense to me. We originally did not look upstream in fsspec core to see if there was anything to leverage, but I think generalizing this semaphore approach upstream would definitely be useful to enable concurrency in more places and not need to duplicate the logic.

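To make the trade-off in the thread above concrete, a hedged sketch of the two patterns; this is illustrative only, not code from this PR or from fsspec, and the helper names are made up:

```python
import asyncio


async def bounded_gather(coros, limit):
    # Semaphore "pool" (the approach in this PR): every task is scheduled
    # up front, but at most `limit` of them run their guarded body at once.
    sem = asyncio.Semaphore(limit)

    async def run(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(run(c) for c in coros))


async def chunked_gather(coros, limit):
    # Run-in-chunks (the batched idea mentioned for fsspec upstream): simpler
    # to reason about, but a slow coroutine in one batch delays the next batch.
    coros = list(coros)
    results = []
    for i in range(0, len(coros), limit):
        results.extend(await asyncio.gather(*coros[i : i + limit]))
    return results
```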
tasks = []
for start, end in self._get_chunks(data):
tasks.append(
self._stage_block(data, start, end, block_id, semaphore)
)
block_id = self._get_block_id()
ids = await asyncio.gather(*tasks)
self._block_list.extend(ids)

if final:
block_list = [BlobBlock(_id) for _id in self._block_list]
@@ -2202,7 +2215,7 @@ async def _async_upload_chunk(self, final: bool = False, **kwargs):
# which throws an InvalidHeader error from Azure, so instead
# of staging a block, we directly upload the empty blob
# This isn't actually tested, since Azurite behaves differently.
if block_id == self._get_block_id([]) and length == 0 and final:
if not self._block_list and length == 0 and final:
async with self.container_client.get_blob_client(
blob=self.blob
) as bc:
@@ -2241,8 +2254,8 @@ async def _async_upload_chunk(self, final: bool = False, **kwargs):
)

@staticmethod
def _get_block_id(block_list: List[str]) -> str:
return uuid4().hex if block_list else "0" * 32
def _get_block_id() -> str:
return uuid4().hex

_upload_chunk = sync_wrapper(_async_upload_chunk)

@@ -2263,3 +2276,12 @@ def __setstate__(self, state):
self.__dict__.update(state)
self.loop = self._get_loop()
self.container_client = self._get_container_client()

def _sdk_supports_memoryview_for_writes(self) -> bool:
# The SDK validates iterable bytes objects passed to its HTTP request layer
# expose an __iter__() method. However, memoryview objects did not expose an
# __iter__() method till Python 3.10.
#
# We still want to leverage memoryviews when we can to avoid unnecessary copies. So
# we check the Python version to determine if we can use memoryviews for writes.
return sys.version_info >= (3, 10)
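As a quick illustration of what `_sdk_supports_memoryview_for_writes` is guarding, a hedged sketch of the slicing behavior it relies on; the sizes are arbitrary and the 3.10 cutoff simply mirrors the comment above:

```python
import sys

data = bytes(10 * 2**20)  # stand-in for the write buffer

if sys.version_info >= (3, 10):
    # Slicing a memoryview returns another view over the same buffer,
    # so no copy is made; per the comment above, 3.10+ memoryviews also
    # pass the SDK's __iter__() check for bytes-like payloads.
    chunk = memoryview(data)[: 5 * 2**20]
else:
    # On older Pythons, fall back to slicing bytes, which copies the chunk.
    chunk = data[: 5 * 2**20]

print(type(chunk), len(chunk))
```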
35 changes: 35 additions & 0 deletions adlfs/tests/test_spec.py
@@ -1,6 +1,8 @@
import datetime
import math
import os
import random
import string
import tempfile
from unittest import mock

@@ -2140,3 +2142,36 @@ def test_blobfile_default_blocksize(storage):
"data/root/a/file.txt",
)
assert f.blocksize == 50 * 2**20


@pytest.mark.parametrize(
"max_concurrency, blob_size, blocksize",
[
(None, 200 * 2**20, 5 * 2**20),
(1, 51 * 2**20, 50 * 2**20),
(4, 200 * 2**20, 50 * 2**20),
(4, 49 * 2**20, 50 * 2**20),
(4, 200 * 2**20, 5 * 2**20),
],
Review comment (Collaborator):

These are some good initial cases. I'm not sure how long these tests currently take, but it would be interesting to add a case that sets the blocksize to 5 MiB and a total blob size of 200 MiB and then have cases that:

  1. Use the default max_concurrency (set it to None) to make sure the default settings work out of the box
  2. Use a concurrency of 4 to test the scenario where concurrency gets saturated and has to wait

I also like setting the block size lower for some of these cases to better stress the system to help catch any concurrency issues that could arise in the future.

)
def test_write_max_concurrency(storage, max_concurrency, blob_size, blocksize):
fs = AzureBlobFileSystem(
account_name=storage.account_name,
connection_string=CONN_STR,
blocksize=blocksize,
max_concurrency=max_concurrency,
)
data = os.urandom(blob_size)
container_name = "".join(random.choices(string.ascii_lowercase, k=10))
fs.mkdir(container_name)
path = f"{container_name}/blob.txt"

with fs.open(path, "wb") as f:
f.write(data)

assert fs.exists(path)
assert fs.size(path) == blob_size

with fs.open(path, "rb") as f:
assert f.read() == data
fs.rm(container_name, recursive=True)
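As a side note on the parametrized cases above: the number of chunks `_get_chunks` yields for a flushed buffer is just the ceiling of the buffer size over the block size. A small sketch (the helper is hypothetical, not part of the PR's tests):

```python
import math


def expected_chunk_count(buffer_size: int, blocksize: int) -> int:
    # Every chunk is `blocksize` bytes except possibly the last one.
    return math.ceil(buffer_size / blocksize)


# The (4, 200 MiB, 5 MiB) case above yields 40 chunks from a single flushed
# buffer, all funneled through a semaphore of size 4; the 49 MiB case fits
# in one block.
assert expected_chunk_count(200 * 2**20, 5 * 2**20) == 40
assert expected_chunk_count(49 * 2**20, 50 * 2**20) == 1
```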