Skip to content

Commit c105d6f

Browse files
[Storage] Optimize single-shot Block Blob upload (#34676)
1 parent 581fac8 commit c105d6f

File tree

12 files changed

+391
-60
lines changed

12 files changed

+391
-60
lines changed

sdk/storage/azure-storage-blob/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Python 3.12.
1515
using async OAuth credentials.
1616
- Fixed an typing issue which incorrectly typed the `readinto` API. The correct input type is `IO[bytes]`.
1717
- Fixed a typo in the initialization of `completion_time` for the `CopyProperties` model.
18+
- Fixed a couple of issues with `upload_blob` when using Iterators/Generators as the data input.
1819

1920
### Other Changes
2021
- Passing `prefix` to the following `ContainerClient` APIs now raises a `ValueError`:

sdk/storage/azure-storage-blob/assets.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
"AssetsRepo": "Azure/azure-sdk-assets",
33
"AssetsRepoPrefixPath": "python",
44
"TagPrefix": "python/storage/azure-storage-blob",
5-
"Tag": "python/storage/azure-storage-blob_1b66da54e8"
5+
"Tag": "python/storage/azure-storage-blob_20d52f0450"
66
}

sdk/storage/azure-storage-blob/azure/storage/blob/_blob_client.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,6 @@ def _upload_blob_options( # pylint:disable=too-many-statements
449449

450450
if blob_type == BlobType.BlockBlob:
451451
kwargs['client'] = self._client.block_blob
452-
kwargs['data'] = data
453452
elif blob_type == BlobType.PageBlob:
454453
if self.encryption_version == '2.0' and (self.require_encryption or self.key_encryption_key is not None):
455454
raise ValueError("Encryption version 2.0 does not currently support page blobs.")
@@ -615,7 +614,7 @@ def upload_blob_from_url(self, source_url, **kwargs):
615614

616615
@distributed_trace
617616
def upload_blob(
618-
self, data: Union[bytes, str, Iterable[AnyStr], IO[AnyStr]],
617+
self, data: Union[bytes, str, Iterable[AnyStr], IO[bytes]],
619618
blob_type: Union[str, BlobType] = BlobType.BlockBlob,
620619
length: Optional[int] = None,
621620
metadata: Optional[Dict[str, str]] = None,

sdk/storage/azure-storage-blob/azure/storage/blob/_encryption.py

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
dumps,
1616
loads,
1717
)
18-
from typing import Any, BinaryIO, Callable, Dict, Optional, Tuple, TYPE_CHECKING
18+
from typing import Any, Callable, Dict, IO, Optional, Tuple, TYPE_CHECKING
1919
from typing import OrderedDict as TypedOrderedDict
2020
from typing_extensions import Protocol
2121

@@ -220,11 +220,11 @@ class GCMBlobEncryptionStream:
220220
"""
221221
def __init__(
222222
self, content_encryption_key: bytes,
223-
data_stream: BinaryIO,
223+
data_stream: IO[bytes],
224224
) -> None:
225225
"""
226226
:param bytes content_encryption_key: The encryption key to use.
227-
:param BinaryIO data_stream: The data stream to read data from.
227+
:param IO[bytes] data_stream: The data stream to read data from.
228228
"""
229229
self.content_encryption_key = content_encryption_key
230230
self.data_stream = data_stream
@@ -261,28 +261,30 @@ def read(self, size: int = -1) -> bytes:
261261
# No more data to read
262262
break
263263

264-
self.current = self._encrypt_region(data)
264+
self.current = encrypt_data_v2(data, self.nonce_counter, self.content_encryption_key)
265+
# IMPORTANT: Must increment the nonce each time.
266+
self.nonce_counter += 1
265267

266268
return result.getvalue()
267269

268-
def _encrypt_region(self, data: bytes) -> bytes:
269-
"""
270-
Encrypt the given region of data using AES-GCM. The result
271-
includes the data in the form: nonce + ciphertext + tag.
272270

273-
:param bytes data: The data to encrypt.
274-
:return: The encrypted bytes.
275-
:rtype: bytes
276-
"""
277-
# Each region MUST use a different nonce
278-
nonce = self.nonce_counter.to_bytes(_GCM_NONCE_LENGTH, 'big')
279-
self.nonce_counter += 1
271+
def encrypt_data_v2(data: bytes, nonce: int, key: bytes) -> bytes:
272+
"""
273+
Encrypts the given data using the given nonce and key using AES-GCM.
274+
The result includes the data in the form: nonce + ciphertext + tag.
280275
281-
aesgcm = AESGCM(self.content_encryption_key)
276+
:param bytes data: The raw data to encrypt.
277+
:param int nonce: The nonce to use for encryption.
278+
:param bytes key: The encryption key to use for encryption.
279+
:return: The encrypted bytes in the form: nonce + ciphertext + tag.
280+
:rtype: bytes
281+
"""
282+
nonce_bytes = nonce.to_bytes(_GCM_NONCE_LENGTH, 'big')
283+
aesgcm = AESGCM(key)
282284

283-
# Returns ciphertext + tag
284-
ciphertext_with_tag = aesgcm.encrypt(nonce, data, None)
285-
return nonce + ciphertext_with_tag
285+
# Returns ciphertext + tag
286+
ciphertext_with_tag = aesgcm.encrypt(nonce_bytes, data, None)
287+
return nonce_bytes + ciphertext_with_tag
286288

287289

288290
def is_encryption_v2(encryption_data: Optional[_EncryptionData]) -> bool:

sdk/storage/azure-storage-blob/azure/storage/blob/_upload_helpers.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ def _any_conditions(modified_access_conditions=None, **kwargs): # pylint: disab
6464

6565
def upload_block_blob( # pylint: disable=too-many-locals, too-many-statements
6666
client=None,
67-
data=None,
6867
stream=None,
6968
length=None,
7069
overwrite=None,
@@ -92,12 +91,10 @@ def upload_block_blob( # pylint: disable=too-many-locals, too-many-statements
9291

9392
# Do single put if the size is smaller than or equal config.max_single_put_size
9493
if adjusted_count is not None and (adjusted_count <= blob_settings.max_single_put_size):
95-
try:
96-
data = data.read(length)
97-
if not isinstance(data, bytes):
98-
raise TypeError('Blob data should be of type bytes.')
99-
except AttributeError:
100-
pass
94+
data = stream.read(length)
95+
if not isinstance(data, bytes):
96+
raise TypeError('Blob data should be of type bytes.')
97+
10198
if encryption_options.get('key'):
10299
encryption_data, data = encrypt_blob(data, encryption_options['key'], encryption_options['version'])
103100
headers['x-ms-meta-encryptiondata'] = encryption_data
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# -------------------------------------------------------------------------
2+
# Copyright (c) Microsoft Corporation. All rights reserved.
3+
# Licensed under the MIT License. See License.txt in the project root for
4+
# license information.
5+
# --------------------------------------------------------------------------
6+
7+
import inspect
8+
import sys
9+
from io import BytesIO
10+
from typing import IO
11+
12+
from .._encryption import _GCM_REGION_DATA_LENGTH, encrypt_data_v2
13+
14+
15+
class GCMBlobEncryptionStream:
16+
"""
17+
An async stream that performs AES-GCM encryption on the given data as
18+
it's streamed. Data is read and encrypted in regions. The stream
19+
will use the same encryption key and will generate a guaranteed unique
20+
nonce for each encryption region.
21+
"""
22+
def __init__(
23+
self, content_encryption_key: bytes,
24+
data_stream: IO[bytes],
25+
) -> None:
26+
"""
27+
:param bytes content_encryption_key: The encryption key to use.
28+
:param IO[bytes] data_stream: The data stream to read data from.
29+
"""
30+
self.content_encryption_key = content_encryption_key
31+
self.data_stream = data_stream
32+
33+
self.offset = 0
34+
self.current = b''
35+
self.nonce_counter = 0
36+
37+
async def read(self, size: int = -1) -> bytes:
38+
"""
39+
Read data from the stream. Specify -1 to read all available data.
40+
41+
:param int size: The amount of data to read. Defaults to -1 for all data.
42+
:return: The bytes read.
43+
:rtype: bytes
44+
"""
45+
result = BytesIO()
46+
remaining = sys.maxsize if size == -1 else size
47+
48+
while remaining > 0:
49+
# Start by reading from current
50+
if len(self.current) > 0:
51+
read = min(remaining, len(self.current))
52+
result.write(self.current[:read])
53+
54+
self.current = self.current[read:]
55+
self.offset += read
56+
remaining -= read
57+
58+
if remaining > 0:
59+
# Read one region of data and encrypt it
60+
data = self.data_stream.read(_GCM_REGION_DATA_LENGTH)
61+
if inspect.isawaitable(data):
62+
data = await data
63+
64+
if len(data) == 0:
65+
# No more data to read
66+
break
67+
68+
self.current = encrypt_data_v2(data, self.nonce_counter, self.content_encryption_key)
69+
# IMPORTANT: Must increment the nonce each time.
70+
self.nonce_counter += 1
71+
72+
return result.getvalue()

sdk/storage/azure-storage-blob/azure/storage/blob/aio/_upload_helpers.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
AppendPositionAccessConditions,
2424
ModifiedAccessConditions,
2525
)
26+
from ._encryption_async import GCMBlobEncryptionStream
2627
from .._encryption import (
27-
GCMBlobEncryptionStream,
2828
encrypt_blob,
2929
get_adjusted_upload_size,
3030
get_blob_encryptor_and_padder,
@@ -40,7 +40,6 @@
4040

4141
async def upload_block_blob( # pylint: disable=too-many-locals, too-many-statements
4242
client=None,
43-
data=None,
4443
stream=None,
4544
length=None,
4645
overwrite=None,
@@ -68,17 +67,16 @@ async def upload_block_blob( # pylint: disable=too-many-locals, too-many-statem
6867

6968
# Do single put if the size is smaller than config.max_single_put_size
7069
if adjusted_count is not None and (adjusted_count <= blob_settings.max_single_put_size):
71-
try:
72-
data = data.read(length)
73-
if inspect.isawaitable(data):
74-
data = await data
75-
if not isinstance(data, bytes):
76-
raise TypeError('Blob data should be of type bytes.')
77-
except AttributeError:
78-
pass
70+
data = stream.read(length)
71+
if inspect.isawaitable(data):
72+
data = await data
73+
if not isinstance(data, bytes):
74+
raise TypeError('Blob data should be of type bytes.')
75+
7976
if encryption_options.get('key'):
8077
encryption_data, data = encrypt_blob(data, encryption_options['key'], encryption_options['version'])
8178
headers['x-ms-meta-encryptiondata'] = encryption_data
79+
8280
response = await client.upload(
8381
body=data,
8482
content_length=adjusted_count,

sdk/storage/azure-storage-blob/tests/test_blob_encryption_v2.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,88 @@ def test_put_blob_multi_region_chunked_size_greater_region(self, **kwargs):
580580
# Assert
581581
assert content == data
582582

583+
@pytest.mark.live_test_only
584+
@BlobPreparer()
585+
def test_put_blob_other_data_types(self, **kwargs):
586+
storage_account_name = kwargs.pop("storage_account_name")
587+
storage_account_key = kwargs.pop("storage_account_key")
588+
589+
self._setup(storage_account_name, storage_account_key)
590+
kek = KeyWrapper('key1')
591+
bsc = BlobServiceClient(
592+
self.account_url(storage_account_name, "blob"),
593+
credential=storage_account_key,
594+
require_encryption=True,
595+
encryption_version='2.0',
596+
key_encryption_key=kek)
597+
598+
blob = bsc.get_blob_client(self.container_name, self._get_blob_reference())
599+
600+
content = b'Hello World Encrypted!'
601+
length = len(content)
602+
byte_io = BytesIO(content)
603+
604+
def generator():
605+
yield b'Hello '
606+
yield b'World '
607+
yield b'Encrypted!'
608+
609+
def text_generator():
610+
yield 'Hello '
611+
yield 'World '
612+
yield 'Encrypted!'
613+
614+
data_list = [byte_io, generator(), text_generator()]
615+
616+
# Act
617+
for data in data_list:
618+
blob.upload_blob(data, length=length, overwrite=True)
619+
result = blob.download_blob().readall()
620+
621+
# Assert
622+
assert content == result
623+
624+
@pytest.mark.live_test_only
625+
@BlobPreparer()
626+
def test_put_blob_other_data_types_chunked(self, **kwargs):
627+
storage_account_name = kwargs.pop("storage_account_name")
628+
storage_account_key = kwargs.pop("storage_account_key")
629+
630+
self._setup(storage_account_name, storage_account_key)
631+
kek = KeyWrapper('key1')
632+
bsc = BlobServiceClient(
633+
self.account_url(storage_account_name, "blob"),
634+
credential=storage_account_key,
635+
max_single_put_size=1024,
636+
max_block_size=1024,
637+
require_encryption=True,
638+
encryption_version='2.0',
639+
key_encryption_key=kek)
640+
641+
blob = bsc.get_blob_client(self.container_name, self._get_blob_reference())
642+
643+
content = b'abcde' * 1030 # 5 KiB + 30
644+
byte_io = BytesIO(content)
645+
646+
def generator():
647+
for i in range(0, len(content), 500):
648+
yield content[i: i + 500]
649+
650+
def text_generator():
651+
s_content = str(content, encoding='utf-8')
652+
for i in range(0, len(s_content), 500):
653+
yield s_content[i: i + 500]
654+
655+
data_list = [byte_io, generator(), text_generator()]
656+
657+
# Act
658+
for data in data_list:
659+
blob.upload_blob(data, overwrite=True)
660+
result = blob.download_blob().readall()
661+
662+
# Assert
663+
assert content == result
664+
583665
@pytest.mark.live_test_only
584666
@BlobPreparer()
585667
def test_get_blob_range_single_region(self, **kwargs):

0 commit comments

Comments
 (0)