Skip to content

Commit 0ab1e0e

Browse files
[Storage] Add progress tracking callback for blob upload (Azure#23965)
1 parent f83f148 commit 0ab1e0e

28 files changed

+2703
-14
lines changed

sdk/storage/azure-storage-blob/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## 12.12.0 (Unreleased)
44

55
### Features Added
6+
- Added an optional callback `progress_hook` to `upload_blob()` that can be used to track progress of a large upload.
67

78
### Bugs Fixed:
89
- Fixed a bug in `BlobClient.from_blob_url()` such that users will receive a more helpful error

sdk/storage/azure-storage-blob/azure/storage/blob/_blob_client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -697,6 +697,10 @@ def upload_blob( # pylint: disable=too-many-locals
697697
698698
:keyword str encoding:
699699
Defaults to UTF-8.
700+
:keyword Callable[[int, Optional[int]], None] progress_hook:
701+
A callback to track the progress of a long running upload. The signature is
702+
function(current: int, total: Optional[int]) where current is the number of bytes transfered
703+
so far, and total is the size of the blob or None if the size is unknown.
700704
:keyword int timeout:
701705
The timeout parameter is expressed in seconds. This method may make
702706
multiple calls to the Azure service and the timeout will apply to

sdk/storage/azure-storage-blob/azure/storage/blob/_shared/uploads.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ def upload_data_chunks(
5353
stream=None,
5454
validate_content=None,
5555
encryption_options=None,
56+
progress_hook=None,
5657
**kwargs):
5758

5859
if encryption_options:
@@ -75,6 +76,7 @@ def upload_data_chunks(
7576
stream=stream,
7677
parallel=parallel,
7778
validate_content=validate_content,
79+
progress_hook=progress_hook,
7880
**kwargs)
7981
if parallel:
8082
with futures.ThreadPoolExecutor(max_concurrency) as executor:
@@ -98,6 +100,7 @@ def upload_substream_blocks(
98100
chunk_size=None,
99101
max_concurrency=None,
100102
stream=None,
103+
progress_hook=None,
101104
**kwargs):
102105
parallel = max_concurrency > 1
103106
if parallel and 'modified_access_conditions' in kwargs:
@@ -109,6 +112,7 @@ def upload_substream_blocks(
109112
chunk_size=chunk_size,
110113
stream=stream,
111114
parallel=parallel,
115+
progress_hook=progress_hook,
112116
**kwargs)
113117

114118
if parallel:
@@ -128,7 +132,16 @@ def upload_substream_blocks(
128132

129133
class _ChunkUploader(object): # pylint: disable=too-many-instance-attributes
130134

131-
def __init__(self, service, total_size, chunk_size, stream, parallel, encryptor=None, padder=None, **kwargs):
135+
def __init__(
136+
self, service,
137+
total_size,
138+
chunk_size,
139+
stream,
140+
parallel,
141+
encryptor=None,
142+
padder=None,
143+
progress_hook=None,
144+
**kwargs):
132145
self.service = service
133146
self.total_size = total_size
134147
self.chunk_size = chunk_size
@@ -142,6 +155,7 @@ def __init__(self, service, total_size, chunk_size, stream, parallel, encryptor=
142155
# Progress feedback
143156
self.progress_total = 0
144157
self.progress_lock = Lock() if parallel else None
158+
self.progress_hook = progress_hook
145159

146160
# Encryption
147161
self.encryptor = encryptor
@@ -199,6 +213,9 @@ def _update_progress(self, length):
199213
else:
200214
self.progress_total += length
201215

216+
if self.progress_hook:
217+
self.progress_hook(self.progress_total, self.total_size)
218+
202219
def _upload_chunk(self, chunk_offset, chunk_data):
203220
raise NotImplementedError("Must be implemented by child class.")
204221

sdk/storage/azure-storage-blob/azure/storage/blob/_shared/uploads_async.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ async def upload_data_chunks(
5353
max_concurrency=None,
5454
stream=None,
5555
encryption_options=None,
56+
progress_hook=None,
5657
**kwargs):
5758

5859
if encryption_options:
@@ -74,6 +75,7 @@ async def upload_data_chunks(
7475
chunk_size=chunk_size,
7576
stream=stream,
7677
parallel=parallel,
78+
progress_hook=progress_hook,
7779
**kwargs)
7880

7981
if parallel:
@@ -100,6 +102,7 @@ async def upload_substream_blocks(
100102
chunk_size=None,
101103
max_concurrency=None,
102104
stream=None,
105+
progress_hook=None,
103106
**kwargs):
104107
parallel = max_concurrency > 1
105108
if parallel and 'modified_access_conditions' in kwargs:
@@ -111,6 +114,7 @@ async def upload_substream_blocks(
111114
chunk_size=chunk_size,
112115
stream=stream,
113116
parallel=parallel,
117+
progress_hook=progress_hook,
114118
**kwargs)
115119

116120
if parallel:
@@ -131,7 +135,16 @@ async def upload_substream_blocks(
131135

132136
class _ChunkUploader(object): # pylint: disable=too-many-instance-attributes
133137

134-
def __init__(self, service, total_size, chunk_size, stream, parallel, encryptor=None, padder=None, **kwargs):
138+
def __init__(
139+
self, service,
140+
total_size,
141+
chunk_size,
142+
stream,
143+
parallel,
144+
encryptor=None,
145+
padder=None,
146+
progress_hook=None,
147+
**kwargs):
135148
self.service = service
136149
self.total_size = total_size
137150
self.chunk_size = chunk_size
@@ -145,6 +158,7 @@ def __init__(self, service, total_size, chunk_size, stream, parallel, encryptor=
145158
# Progress feedback
146159
self.progress_total = 0
147160
self.progress_lock = Lock() if parallel else None
161+
self.progress_hook = progress_hook
148162

149163
# Encryption
150164
self.encryptor = encryptor
@@ -202,6 +216,9 @@ async def _update_progress(self, length):
202216
else:
203217
self.progress_total += length
204218

219+
if self.progress_hook:
220+
self.progress_hook(self.progress_total, self.total_size)
221+
205222
async def _upload_chunk(self, chunk_offset, chunk_data):
206223
raise NotImplementedError("Must be implemented by child class.")
207224

sdk/storage/azure-storage-blob/azure/storage/blob/_upload_helpers.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ def upload_block_blob( # pylint: disable=too-many-locals
8484
immutability_policy_expiry = None if immutability_policy is None else immutability_policy.expiry_time
8585
immutability_policy_mode = None if immutability_policy is None else immutability_policy.policy_mode
8686
legal_hold = kwargs.pop('legal_hold', None)
87+
progress_hook = kwargs.pop('progress_hook', None)
8788

8889
# Do single put if the size is smaller than or equal config.max_single_put_size
8990
if adjusted_count is not None and (adjusted_count <= blob_settings.max_single_put_size):
@@ -96,7 +97,8 @@ def upload_block_blob( # pylint: disable=too-many-locals
9697
if encryption_options.get('key'):
9798
encryption_data, data = encrypt_blob(data, encryption_options['key'])
9899
headers['x-ms-meta-encryptiondata'] = encryption_data
99-
return client.upload(
100+
101+
response = client.upload(
100102
body=data,
101103
content_length=adjusted_count,
102104
blob_http_headers=blob_headers,
@@ -112,6 +114,11 @@ def upload_block_blob( # pylint: disable=too-many-locals
112114
legal_hold=legal_hold,
113115
**kwargs)
114116

117+
if progress_hook:
118+
progress_hook(adjusted_count, adjusted_count)
119+
120+
return response
121+
115122
use_original_upload_path = blob_settings.use_byte_buffer or \
116123
validate_content or encryption_options.get('required') or \
117124
blob_settings.max_block_size < blob_settings.min_large_block_upload_threshold or \
@@ -133,6 +140,7 @@ def upload_block_blob( # pylint: disable=too-many-locals
133140
stream=stream,
134141
validate_content=validate_content,
135142
encryption_options=encryption_options,
143+
progress_hook=progress_hook,
136144
headers=headers,
137145
**kwargs
138146
)
@@ -145,6 +153,7 @@ def upload_block_blob( # pylint: disable=too-many-locals
145153
max_concurrency=max_concurrency,
146154
stream=stream,
147155
validate_content=validate_content,
156+
progress_hook=progress_hook,
148157
headers=headers,
149158
**kwargs
150159
)
@@ -200,6 +209,7 @@ def upload_page_blob(
200209
if encryption_options and encryption_options.get('data'):
201210
headers['x-ms-meta-encryptiondata'] = encryption_options['data']
202211
blob_tags_string = kwargs.pop('blob_tags_string', None)
212+
progress_hook = kwargs.pop('progress_hook', None)
203213

204214
response = client.create(
205215
content_length=0,
@@ -223,6 +233,7 @@ def upload_page_blob(
223233
max_concurrency=max_concurrency,
224234
validate_content=validate_content,
225235
encryption_options=encryption_options,
236+
progress_hook=progress_hook,
226237
headers=headers,
227238
**kwargs)
228239

@@ -254,6 +265,7 @@ def upload_append_blob( # pylint: disable=unused-argument
254265
max_size=kwargs.pop('maxsize_condition', None),
255266
append_position=None)
256267
blob_tags_string = kwargs.pop('blob_tags_string', None)
268+
progress_hook = kwargs.pop('progress_hook', None)
257269

258270
try:
259271
if overwrite:
@@ -272,6 +284,7 @@ def upload_append_blob( # pylint: disable=unused-argument
272284
max_concurrency=max_concurrency,
273285
validate_content=validate_content,
274286
append_position_access_conditions=append_conditions,
287+
progress_hook=progress_hook,
275288
headers=headers,
276289
**kwargs)
277290
except HttpResponseError as error:
@@ -300,6 +313,7 @@ def upload_append_blob( # pylint: disable=unused-argument
300313
max_concurrency=max_concurrency,
301314
validate_content=validate_content,
302315
append_position_access_conditions=append_conditions,
316+
progress_hook=progress_hook,
303317
headers=headers,
304318
**kwargs)
305319
except HttpResponseError as error:

sdk/storage/azure-storage-blob/azure/storage/blob/aio/_blob_client_async.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,10 @@ async def upload_blob(
366366
367367
:keyword str encoding:
368368
Defaults to UTF-8.
369+
:keyword Callable[[int, Optional[int]], None] progress_hook:
370+
A callback to track the progress of a long running upload. The signature is
371+
function(current: int, total: Optional[int]) where current is the number of bytes transfered
372+
so far, and total is the size of the blob or None if the size is unknown.
369373
:keyword int timeout:
370374
The timeout parameter is expressed in seconds. This method may make
371375
multiple calls to the Azure service and the timeout will apply to

sdk/storage/azure-storage-blob/azure/storage/blob/aio/_upload_helpers.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ async def upload_block_blob( # pylint: disable=too-many-locals
5959
immutability_policy_expiry = None if immutability_policy is None else immutability_policy.expiry_time
6060
immutability_policy_mode = None if immutability_policy is None else immutability_policy.policy_mode
6161
legal_hold = kwargs.pop('legal_hold', None)
62+
progress_hook = kwargs.pop('progress_hook', None)
6263

6364
# Do single put if the size is smaller than config.max_single_put_size
6465
if adjusted_count is not None and (adjusted_count <= blob_settings.max_single_put_size):
@@ -71,7 +72,7 @@ async def upload_block_blob( # pylint: disable=too-many-locals
7172
if encryption_options.get('key'):
7273
encryption_data, data = encrypt_blob(data, encryption_options['key'])
7374
headers['x-ms-meta-encryptiondata'] = encryption_data
74-
return await client.upload(
75+
response = await client.upload(
7576
body=data,
7677
content_length=adjusted_count,
7778
blob_http_headers=blob_headers,
@@ -87,6 +88,11 @@ async def upload_block_blob( # pylint: disable=too-many-locals
8788
legal_hold=legal_hold,
8889
**kwargs)
8990

91+
if progress_hook:
92+
progress_hook(adjusted_count, adjusted_count)
93+
94+
return response
95+
9096
use_original_upload_path = blob_settings.use_byte_buffer or \
9197
validate_content or encryption_options.get('required') or \
9298
blob_settings.max_block_size < blob_settings.min_large_block_upload_threshold or \
@@ -108,6 +114,7 @@ async def upload_block_blob( # pylint: disable=too-many-locals
108114
stream=stream,
109115
validate_content=validate_content,
110116
encryption_options=encryption_options,
117+
progress_hook=progress_hook,
111118
headers=headers,
112119
**kwargs
113120
)
@@ -120,6 +127,7 @@ async def upload_block_blob( # pylint: disable=too-many-locals
120127
max_concurrency=max_concurrency,
121128
stream=stream,
122129
validate_content=validate_content,
130+
progress_hook=progress_hook,
123131
headers=headers,
124132
**kwargs
125133
)
@@ -175,6 +183,7 @@ async def upload_page_blob(
175183
if encryption_options and encryption_options.get('data'):
176184
headers['x-ms-meta-encryptiondata'] = encryption_options['data']
177185
blob_tags_string = kwargs.pop('blob_tags_string', None)
186+
progress_hook = kwargs.pop('progress_hook', None)
178187

179188
response = await client.create(
180189
content_length=0,
@@ -198,6 +207,7 @@ async def upload_page_blob(
198207
max_concurrency=max_concurrency,
199208
validate_content=validate_content,
200209
encryption_options=encryption_options,
210+
progress_hook=progress_hook,
201211
headers=headers,
202212
**kwargs)
203213

@@ -229,6 +239,7 @@ async def upload_append_blob( # pylint: disable=unused-argument
229239
max_size=kwargs.pop('maxsize_condition', None),
230240
append_position=None)
231241
blob_tags_string = kwargs.pop('blob_tags_string', None)
242+
progress_hook = kwargs.pop('progress_hook', None)
232243

233244
try:
234245
if overwrite:
@@ -247,6 +258,7 @@ async def upload_append_blob( # pylint: disable=unused-argument
247258
max_concurrency=max_concurrency,
248259
validate_content=validate_content,
249260
append_position_access_conditions=append_conditions,
261+
progress_hook=progress_hook,
250262
headers=headers,
251263
**kwargs)
252264
except HttpResponseError as error:
@@ -275,6 +287,7 @@ async def upload_append_blob( # pylint: disable=unused-argument
275287
max_concurrency=max_concurrency,
276288
validate_content=validate_content,
277289
append_position_access_conditions=append_conditions,
290+
progress_hook=progress_hook,
278291
headers=headers,
279292
**kwargs)
280293
except HttpResponseError as error:

0 commit comments

Comments
 (0)