Skip to content

Commit 2442a62

Browse files
[Storage][DataMovement] Ensure single call downloads are queued to chunk channel (Azure#48557)
1 parent 6187fa7 commit 2442a62

File tree

1 file changed

+50
-47
lines changed

1 file changed

+50
-47
lines changed

sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs

Lines changed: 50 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -204,13 +204,14 @@ public override async Task ProcessPartToChunkAsync()
204204
return;
205205
}
206206
await OnTransferStateChangedAsync(TransferState.InProgress).ConfigureAwait(false);
207+
207208
if (!_sourceResource.Length.HasValue)
208209
{
209-
await UnknownDownloadInternal().ConfigureAwait(false);
210+
await UnknownLengthDownloadAsync().ConfigureAwait(false);
210211
}
211212
else
212213
{
213-
await LengthKnownDownloadInternal().ConfigureAwait(false);
214+
await KnownLengthDownloadAsync().ConfigureAwait(false);
214215
}
215216
}
216217
catch (Exception ex)
@@ -220,36 +221,34 @@ public override async Task ProcessPartToChunkAsync()
220221
}
221222
}
222223

223-
internal async Task UnknownDownloadInternal()
224+
internal async Task UnknownLengthDownloadAsync()
224225
{
225-
Task<StorageResourceReadStreamResult> initialTask = _sourceResource.ReadStreamAsync(
226-
position: 0,
227-
length: _initialTransferSize,
228-
_cancellationToken);
229-
230226
try
231227
{
232228
StorageResourceReadStreamResult initialResult = default;
233229
try
234230
{
235-
initialResult = await initialTask.ConfigureAwait(false);
231+
initialResult = await _sourceResource.ReadStreamAsync(
232+
position: 0,
233+
length: _initialTransferSize,
234+
_cancellationToken).ConfigureAwait(false);
236235
}
237236
catch
238237
{
239238
// Range not accepted, we need to attempt to use a default range
239+
// This can happen if the source is empty.
240240
initialResult = await _sourceResource.ReadStreamAsync(
241241
cancellationToken: _cancellationToken)
242242
.ConfigureAwait(false);
243243
}
244-
// If the initial request returned no content (i.e., a 304),
245-
// we'll pass that back to the user immediately
244+
246245
long? initialLength = initialResult?.ContentLength;
247246

248247
// There needs to be at least 1 chunk to create the blob even if the
249248
// length is 0 bytes.
250249
if (initialResult == default || (initialLength ?? 0) == 0)
251250
{
252-
await CreateZeroLengthDownload().ConfigureAwait(false);
251+
await QueueChunkToChannelAsync(ZeroLengthDownloadAsync).ConfigureAwait(false);
253252
return;
254253
}
255254

@@ -285,41 +284,20 @@ internal async Task UnknownDownloadInternal()
285284
}
286285
}
287286

288-
internal async Task LengthKnownDownloadInternal()
287+
internal async Task KnownLengthDownloadAsync()
289288
{
290289
long totalLength = _sourceResource.Length.Value;
291290
if (totalLength == 0)
292291
{
293-
await CreateZeroLengthDownload().ConfigureAwait(false);
292+
await QueueChunkToChannelAsync(ZeroLengthDownloadAsync).ConfigureAwait(false);
294293
}
295294
// Download with a single GET
296295
else if (_initialTransferSize >= totalLength)
297296
{
298-
// To prevent requesting a range that is invalid when
299-
// we already know the length we can just make one get blob request.
300-
StorageResourceReadStreamResult result = await _sourceResource.
301-
ReadStreamAsync(length: totalLength, cancellationToken: _cancellationToken)
297+
await QueueChunkToChannelAsync(
298+
async () =>
299+
await DownloadSingleAsync(totalLength).ConfigureAwait(false))
302300
.ConfigureAwait(false);
303-
304-
long downloadLength = result.ContentLength.Value;
305-
// This should not occur but add a check just in case
306-
if (downloadLength != totalLength)
307-
{
308-
throw Errors.SingleDownloadLengthMismatch(totalLength, downloadLength);
309-
}
310-
311-
bool successfulCopy = await CopyToStreamInternal(
312-
offset: 0,
313-
sourceLength: downloadLength,
314-
source: result.Content,
315-
expectedLength: totalLength,
316-
initial: true).ConfigureAwait(false);
317-
if (successfulCopy)
318-
{
319-
await ReportBytesWrittenAsync(downloadLength).ConfigureAwait(false);
320-
// Queue the work to end the download
321-
await QueueCompleteFileDownload().ConfigureAwait(false);
322-
}
323301
}
324302
// Download in chunks
325303
else
@@ -328,7 +306,6 @@ internal async Task LengthKnownDownloadInternal()
328306
}
329307
}
330308

331-
#region PartitionedDownloader
332309
private async Task QueueChunksToChannel(long initialLength, long totalLength)
333310
{
334311
// Create Download Chunk event handler to manage when the ranges finish downloading
@@ -352,7 +329,7 @@ private async Task QueueChunksToChannel(long initialLength, long totalLength)
352329
// return before it's completed downloading)
353330
await QueueChunkToChannelAsync(
354331
async () =>
355-
await DownloadStreamingInternal(range: httpRange).ConfigureAwait(false))
332+
await DownloadChunkAsync(range: httpRange).ConfigureAwait(false))
356333
.ConfigureAwait(false);
357334
chunkCount++;
358335
}
@@ -366,7 +343,7 @@ await DownloadStreamingInternal(range: httpRange).ConfigureAwait(false))
366343
}
367344
}
368345

369-
internal async Task CompleteFileDownload()
346+
internal async Task CompleteFileDownloadAsync()
370347
{
371348
try
372349
{
@@ -389,7 +366,35 @@ await _destinationResource.CompleteTransferAsync(
389366
}
390367
}
391368

392-
internal async Task DownloadStreamingInternal(HttpRange range)
369+
private async Task DownloadSingleAsync(long totalLength)
370+
{
371+
// To prevent requesting a range that is invalid when
372+
// we already know the length we can just make one get blob request.
373+
StorageResourceReadStreamResult result = await _sourceResource
374+
.ReadStreamAsync(length: totalLength, cancellationToken: _cancellationToken)
375+
.ConfigureAwait(false);
376+
377+
long downloadLength = result.ContentLength.Value;
378+
// This should not occur but add a check just in case
379+
if (downloadLength != totalLength)
380+
{
381+
throw Errors.SingleDownloadLengthMismatch(totalLength, downloadLength);
382+
}
383+
384+
bool successfulCopy = await CopyToStreamInternal(
385+
offset: 0,
386+
sourceLength: downloadLength,
387+
source: result.Content,
388+
expectedLength: totalLength,
389+
initial: true).ConfigureAwait(false);
390+
if (successfulCopy)
391+
{
392+
await ReportBytesWrittenAsync(downloadLength).ConfigureAwait(false);
393+
await CompleteFileDownloadAsync().ConfigureAwait(false);
394+
}
395+
}
396+
397+
private async Task DownloadChunkAsync(HttpRange range)
393398
{
394399
try
395400
{
@@ -478,7 +483,7 @@ private static DownloadChunkHandler.Behaviors GetDownloadChunkHandlerBehaviors(U
478483

479484
private Task QueueCompleteFileDownload()
480485
{
481-
return QueueChunkToChannelAsync(CompleteFileDownload);
486+
return QueueChunkToChannelAsync(CompleteFileDownloadAsync);
482487
}
483488

484489
private static IEnumerable<HttpRange> GetRanges(long initialLength, long totalLength, long rangeSize)
@@ -488,7 +493,6 @@ private static IEnumerable<HttpRange> GetRanges(long initialLength, long totalLe
488493
yield return new HttpRange(offset, Math.Min(totalLength - offset, rangeSize));
489494
}
490495
}
491-
#endregion PartitionedDownloader
492496

493497
public override async Task InvokeSkippedArgAsync()
494498
{
@@ -509,7 +513,7 @@ public override async Task DisposeHandlersAsync()
509513
}
510514
}
511515

512-
private async Task CreateZeroLengthDownload()
516+
private async Task ZeroLengthDownloadAsync()
513517
{
514518
// We just need to at minimum create the file
515519
bool successfulCreation = await CopyToStreamInternal(
@@ -520,8 +524,7 @@ private async Task CreateZeroLengthDownload()
520524
initial: true).ConfigureAwait(false);
521525
if (successfulCreation)
522526
{
523-
// Queue the work to end the download
524-
await QueueCompleteFileDownload().ConfigureAwait(false);
527+
await CompleteFileDownloadAsync().ConfigureAwait(false);
525528
}
526529
else
527530
{

0 commit comments

Comments
 (0)