diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5e6e2ba2..e3cf86b4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@ Unreleased
 - The block size is now used for partitioned uploads. Previously, 1 GiB was used for each uploaded block irrespective of the block size
 - Updated default block size to be 50 MiB. Set `blocksize` for `AzureBlobFileSystem` or `block_size` when opening `AzureBlobFile` to revert back to 5 MiB default.
 - `AzureBlobFile` now inherits the block size from `AzureBlobFileSystem` when fs.open() is called and a block_size is not passed in.
+- Introduce concurrent uploads for large `AzureBlobFileSystem.write()` calls. Maximum concurrency can be set using `max_concurrency` for `AzureBlobFileSystem`.

 2024.12.0

diff --git a/adlfs/spec.py b/adlfs/spec.py
index 90916e3c..bfc13bda 100644
--- a/adlfs/spec.py
+++ b/adlfs/spec.py
@@ -9,13 +9,14 @@
 import logging
 import os
 import re
+import sys
 import typing
 import warnings
 import weakref
 from collections import defaultdict
 from datetime import datetime, timedelta, timezone
 from glob import has_magic
-from typing import List, Optional, Tuple
+from typing import Optional, Tuple
 from uuid import uuid4

 from azure.core.exceptions import (
@@ -2153,9 +2154,22 @@ def _get_chunks(self, data):
         length = len(data)
         while start < length:
             end = min(start + self.blocksize, length)
-            yield data[start:end]
+            yield start, end
             start = end

+    async def _stage_block(self, data, start, end, block_id, semaphore):
+        async with semaphore:
+            if self._sdk_supports_memoryview_for_writes():
+                # Use memoryview to avoid making copies of the bytes when we splice for partitioned uploads
+                data = memoryview(data)
+            async with self.container_client.get_blob_client(blob=self.blob) as bc:
+                await bc.stage_block(
+                    block_id=block_id,
+                    data=data[start:end],
+                    length=end - start,
+                )
+        return block_id
+
     async def _async_upload_chunk(self, final: bool = False, **kwargs):
         """
         Write one part of a multi-block file upload
@@ -2169,23 +2183,22 @@ async def _async_upload_chunk(self, final: bool = False, **kwargs):
         """
         data = self.buffer.getvalue()
         length = len(data)
-        block_id = self._get_block_id(self._block_list)
+        block_id = self._get_block_id()
         commit_kw = {}
         if self.mode == "xb":
             commit_kw["headers"] = {"If-None-Match": "*"}
         if self.mode in {"wb", "xb"}:
             try:
-                for chunk in self._get_chunks(data):
-                    async with self.container_client.get_blob_client(
-                        blob=self.blob
-                    ) as bc:
-                        await bc.stage_block(
-                            block_id=block_id,
-                            data=chunk,
-                            length=len(chunk),
-                        )
-                    self._block_list.append(block_id)
-                    block_id = self._get_block_id(self._block_list)
+                max_concurrency = self.fs.max_concurrency or 1
+                semaphore = asyncio.Semaphore(max_concurrency)
+                tasks = []
+                for start, end in self._get_chunks(data):
+                    tasks.append(
+                        self._stage_block(data, start, end, block_id, semaphore)
+                    )
+                    block_id = self._get_block_id()
+                ids = await asyncio.gather(*tasks)
+                self._block_list.extend(ids)

                 if final:
                     block_list = [BlobBlock(_id) for _id in self._block_list]
@@ -2202,7 +2215,7 @@ async def _async_upload_chunk(self, final: bool = False, **kwargs):
                 # which is throws an InvalidHeader error from Azure, so instead
                 # of staging a block, we directly upload the empty blob
                 # This isn't actually tested, since Azureite behaves differently.
-                if block_id == self._get_block_id([]) and length == 0 and final:
+                if not self._block_list and length == 0 and final:
                     async with self.container_client.get_blob_client(
                         blob=self.blob
                     ) as bc:
@@ -2241,8 +2254,8 @@ async def _async_upload_chunk(self, final: bool = False, **kwargs):
            )

     @staticmethod
-    def _get_block_id(block_list: List[str]) -> str:
-        return uuid4().hex if block_list else "0" * 32
+    def _get_block_id() -> str:
+        return uuid4().hex

     _upload_chunk = sync_wrapper(_async_upload_chunk)

@@ -2263,3 +2276,12 @@ def __setstate__(self, state):
         self.__dict__.update(state)
         self.loop = self._get_loop()
         self.container_client = self._get_container_client()
+
+    def _sdk_supports_memoryview_for_writes(self) -> bool:
+        # The SDK validates that iterable bytes objects passed to its HTTP request layer
+        # expose an __iter__() method. However, memoryview objects did not expose an
+        # __iter__() method until Python 3.10.
+        #
+        # We still want to leverage memoryviews when we can to avoid unnecessary copies. So
+        # we check the Python version to determine if we can use memoryviews for writes.
+        return sys.version_info >= (3, 10)
diff --git a/adlfs/tests/test_spec.py b/adlfs/tests/test_spec.py
index 91f5d0df..0639c2d0 100644
--- a/adlfs/tests/test_spec.py
+++ b/adlfs/tests/test_spec.py
@@ -1,6 +1,8 @@
 import datetime
 import math
 import os
+import random
+import string
 import tempfile
 from unittest import mock

@@ -2140,3 +2142,36 @@ def test_blobfile_default_blocksize(storage):
         "data/root/a/file.txt",
     )
     assert f.blocksize == 50 * 2**20
+
+
+@pytest.mark.parametrize(
+    "max_concurrency, blob_size, blocksize",
+    [
+        (None, 200 * 2**20, 5 * 2**20),
+        (1, 51 * 2**20, 50 * 2**20),
+        (4, 200 * 2**20, 50 * 2**20),
+        (4, 49 * 2**20, 50 * 2**20),
+        (4, 200 * 2**20, 5 * 2**20),
+    ],
+)
+def test_write_max_concurrency(storage, max_concurrency, blob_size, blocksize):
+    fs = AzureBlobFileSystem(
+        account_name=storage.account_name,
+        connection_string=CONN_STR,
+        blocksize=blocksize,
+        max_concurrency=max_concurrency,
+    )
+    data = os.urandom(blob_size)
+    container_name = "".join(random.choices(string.ascii_lowercase, k=10))
+    fs.mkdir(container_name)
+    path = f"{container_name}/blob.txt"
+
+    with fs.open(path, "wb") as f:
+        f.write(data)
+
+    assert fs.exists(path)
+    assert fs.size(path) == blob_size
+
+    with fs.open(path, "rb") as f:
+        assert f.read() == data
+    fs.rm(container_name, recursive=True)
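For reviewers who want to try the change locally, here is a minimal usage sketch (not part of the diff) of the new `max_concurrency` option together with `blocksize`; the account name and connection string are placeholders, and the constructor arguments mirror those used in `test_write_max_concurrency` above.

```python
from adlfs import AzureBlobFileSystem

# Placeholder credentials -- substitute a real account/connection string.
fs = AzureBlobFileSystem(
    account_name="myaccount",
    connection_string="...",
    blocksize=50 * 2**20,   # 50 MiB blocks (the new default)
    max_concurrency=4,      # stage up to 4 blocks concurrently per write
)

# Writes larger than `blocksize` are split into blocks; with
# max_concurrency > 1 those blocks are staged concurrently.
with fs.open("container/blob.bin", "wb") as f:
    f.write(b"x" * (200 * 2**20))
```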