-
Notifications
You must be signed in to change notification settings - Fork 110
Faster file-like writes #508
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
ab37ff9
21fb656
df76154
208b3e7
2d277c3
d26d6c3
6686c93
5efa176
96573d5
7023bf4
7a05f94
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,13 +9,14 @@ | |
import logging | ||
import os | ||
import re | ||
import sys | ||
import typing | ||
import warnings | ||
import weakref | ||
from collections import defaultdict | ||
from datetime import datetime, timedelta, timezone | ||
from glob import has_magic | ||
from typing import List, Optional, Tuple | ||
from typing import Optional, Tuple | ||
from uuid import uuid4 | ||
|
||
from azure.core.exceptions import ( | ||
|
@@ -2153,9 +2154,22 @@ def _get_chunks(self, data): | |
length = len(data) | ||
while start < length: | ||
end = min(start + self.blocksize, length) | ||
yield data[start:end] | ||
yield start, end | ||
start = end | ||
|
||
async def _stage_block(self, data, start, end, block_id, semaphore): | ||
if self._sdk_supports_memoryview_for_writes(): | ||
# Use memoryview to avoid making copies of the bytes when we splice for partitioned uploads | ||
data = memoryview(data) | ||
async with semaphore: | ||
async with self.container_client.get_blob_client(blob=self.blob) as bc: | ||
await bc.stage_block( | ||
block_id=block_id, | ||
data=data[start:end], | ||
length=end - start, | ||
) | ||
return block_id | ||
|
||
async def _async_upload_chunk(self, final: bool = False, **kwargs): | ||
""" | ||
Write one part of a multi-block file upload | ||
|
@@ -2169,23 +2183,22 @@ async def _async_upload_chunk(self, final: bool = False, **kwargs): | |
""" | ||
data = self.buffer.getvalue() | ||
length = len(data) | ||
block_id = self._get_block_id(self._block_list) | ||
block_id = self._get_block_id() | ||
commit_kw = {} | ||
if self.mode == "xb": | ||
commit_kw["headers"] = {"If-None-Match": "*"} | ||
if self.mode in {"wb", "xb"}: | ||
try: | ||
for chunk in self._get_chunks(data): | ||
async with self.container_client.get_blob_client( | ||
blob=self.blob | ||
) as bc: | ||
await bc.stage_block( | ||
block_id=block_id, | ||
data=chunk, | ||
length=len(chunk), | ||
) | ||
self._block_list.append(block_id) | ||
block_id = self._get_block_id(self._block_list) | ||
max_concurrency = self.fs.max_concurrency or 1 | ||
semaphore = asyncio.Semaphore(max_concurrency) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FYI: this effectively makes a "pool", whereas fsspec upstream has a run-coroutines-in-chunks ability. The latter is less efficient but easier to reason about. It may be worthwhile upstreaming this at some point. See a similar idea in fsspec/filesystem_spec#1908 . There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ( the chunks version: https://github.com/fsspec/filesystem_spec/blob/master/fsspec/asyn.py#L204 ) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Upstreaming makes sense to me. We originally did not look upstream in fsspec core to see if there was anything to leverage, but I think generalizing this semaphore approach upstream would definitely be useful to enable concurrency in more places and not need to reduplicate the logic. |
||
tasks = [] | ||
for start, end in self._get_chunks(data): | ||
tasks.append( | ||
self._stage_block(data, start, end, block_id, semaphore) | ||
) | ||
block_id = self._get_block_id() | ||
ids = await asyncio.gather(*tasks) | ||
self._block_list.extend(ids) | ||
|
||
if final: | ||
block_list = [BlobBlock(_id) for _id in self._block_list] | ||
|
@@ -2202,7 +2215,7 @@ async def _async_upload_chunk(self, final: bool = False, **kwargs): | |
# which throws an InvalidHeader error from Azure, so instead | ||
# of staging a block, we directly upload the empty blob | ||
# This isn't actually tested, since Azurite behaves differently. | ||
if block_id == self._get_block_id([]) and length == 0 and final: | ||
if len(self._block_list) == 0 and length == 0 and final: | ||
|
||
async with self.container_client.get_blob_client( | ||
blob=self.blob | ||
) as bc: | ||
|
@@ -2241,8 +2254,8 @@ async def _async_upload_chunk(self, final: bool = False, **kwargs): | |
) | ||
|
||
@staticmethod | ||
def _get_block_id(block_list: List[str]) -> str: | ||
return uuid4().hex if block_list else "0" * 32 | ||
def _get_block_id() -> str: | ||
return uuid4().hex | ||
|
||
_upload_chunk = sync_wrapper(_async_upload_chunk) | ||
|
||
|
@@ -2263,3 +2276,12 @@ def __setstate__(self, state): | |
self.__dict__.update(state) | ||
self.loop = self._get_loop() | ||
self.container_client = self._get_container_client() | ||
|
||
def _sdk_supports_memoryview_for_writes(self) -> bool: | ||
# The SDK validates that iterable bytes objects passed to its HTTP request layer | ||
# expose an __iter__() method. However, memoryview objects did not expose an | ||
# __iter__() method till Python 3.10. | ||
# | ||
# We still want to leverage memoryviews when we can to avoid unnecessary copies. So | ||
# we check the Python version to determine if we can use memoryviews for writes. | ||
return sys.version_info >= (3, 10) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,8 @@ | ||
import datetime | ||
import math | ||
import os | ||
import random | ||
import string | ||
import tempfile | ||
from unittest import mock | ||
|
||
|
@@ -2140,3 +2142,36 @@ def test_blobfile_default_blocksize(storage): | |
"data/root/a/file.txt", | ||
) | ||
assert f.blocksize == 50 * 2**20 | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"max_concurrency, blob_size, blocksize", | ||
[ | ||
(None, 200 * 2**20, 5 * 2**20), | ||
(1, 51 * 2**20, 50 * 2**20), | ||
(4, 200 * 2**20, 50 * 2**20), | ||
(4, 49 * 2**20, 50 * 2**20), | ||
(4, 200 * 2**20, 5 * 2**20), | ||
], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These are some good initial cases. I'm not sure how long these tests currently take but it would be interesting to add a case that sets the blocksize to 5MiB and a total blob size of 200 MiB and then have cases that:
I also like setting the block size lower for some of these cases to better stress the system to help catch any concurrency issues that could arise in the future. |
||
) | ||
def test_write_max_concurrency(storage, max_concurrency, blob_size, blocksize): | ||
fs = AzureBlobFileSystem( | ||
account_name=storage.account_name, | ||
connection_string=CONN_STR, | ||
blocksize=blocksize, | ||
max_concurrency=max_concurrency, | ||
) | ||
data = os.urandom(blob_size) | ||
container_name = "".join(random.choices(string.ascii_lowercase, k=10)) | ||
fs.mkdir(container_name) | ||
path = f"{container_name}/blob.txt" | ||
|
||
with fs.open(path, "wb") as f: | ||
f.write(data) | ||
|
||
assert fs.exists(path) | ||
assert fs.size(path) == blob_size | ||
|
||
with fs.open(path, "rb") as f: | ||
assert f.read() == data | ||
fs.rm(container_name, recursive=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's wrap the data in a memoryview after we've entered the semaphore context so we uphold the property that no logic is run until there is availability in the semaphore, as we are effectively treating this method as a worker now.