Skip to content

Commit 3f29c3e

Browse files
authored
[Storage] Directory Pause and Resume Tests with directory resume fixes (Azure#35640)
* WIP * Fix to incorrect download failed message tests; Update to iterate through paused job parts for directories * WIP - tests pass for directory pause, except pause then resume * Pause then resume directory upload/download/copy * Correct test name BlockBlobClientTests/SyncUploadFromUriAsync_OverwriteSourceBlobProperties * Disable pause and resume tests to prevent flaking * Playback pause and resume tests * Smaller size to make smaller test recordings * Change uploaded blob to forward slash in the tests and enable childstorageresource tests * Disable pause and resume tests
1 parent f6e2fdd commit 3f29c3e

File tree

89 files changed

+18640
-554
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

89 files changed

+18640
-554
lines changed

sdk/storage/Azure.Storage.Blobs/tests/BlockBlobClientTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2788,7 +2788,7 @@ await TestHelper.AssertExpectedExceptionAsync<RequestFailedException>(
27882788

27892789
[RecordedTest]
27902790
[ServiceVersion(Min = BlobClientOptions.ServiceVersion.V2020_04_08)]
2791-
public async Task SyncUploadFromUriAsync_OverwiteSourceBlobProperties()
2791+
public async Task SyncUploadFromUriAsync_OverwriteSourceBlobProperties()
27922792
{
27932793
// Arrange
27942794
var constants = TestConstants.Create(this);

sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDirectoryStorageResourceContainer.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,19 +83,19 @@ public override StorageResource GetChildStorageResource(string path)
8383
if (_options?.BlobType == BlobType.Append)
8484
{
8585
return new AppendBlobStorageResource(
86-
_blobContainerClient.GetAppendBlobClient(System.IO.Path.Combine(_directoryPrefix, path)),
86+
_blobContainerClient.GetAppendBlobClient(string.Join("/", _directoryPrefix, path)),
8787
_options?.ToAppendBlobStorageResourceOptions());
8888
}
8989
else if (_options?.BlobType == BlobType.Page)
9090
{
9191
return new PageBlobStorageResource(
92-
_blobContainerClient.GetPageBlobClient(System.IO.Path.Combine(_directoryPrefix, path)),
92+
_blobContainerClient.GetPageBlobClient(string.Join("/", _directoryPrefix, path)),
9393
_options?.ToPageBlobStorageResourceOptions());
9494
}
9595
else // BlobType.Block or null
9696
{
9797
return new BlockBlobStorageResource(
98-
_blobContainerClient.GetBlockBlobClient(System.IO.Path.Combine(_directoryPrefix, path)),
98+
_blobContainerClient.GetBlockBlobClient(string.Join("/", _directoryPrefix, path)),
9999
_options?.ToBlockBlobStorageResourceOptions());
100100
}
101101
}

sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,12 @@ internal abstract class JobPartInternal
103103
/// </summary>
104104
internal long? Length;
105105

106+
/// <summary>
107+
/// Defines whether or not this was the final part in the list call. This would determine
108+
/// whether or not we needed to keep listing in the job.
109+
/// </summary>
110+
public bool IsFinalPart { get; internal set; }
111+
106112
/// <summary>
107113
/// If the transfer status of the job changes then the event will get added to this handler.
108114
/// </summary>
@@ -151,6 +157,7 @@ internal JobPartInternal(
151157
StorageResourceCreateMode createMode,
152158
TransferCheckpointer checkpointer,
153159
ArrayPool<byte> arrayPool,
160+
bool isFinalPart,
154161
SyncAsyncEventHandler<TransferStatusEventArgs> jobPartEventHandler,
155162
SyncAsyncEventHandler<TransferStatusEventArgs> statusEventHandler,
156163
SyncAsyncEventHandler<TransferFailedEventArgs> failedEventHandler,
@@ -171,6 +178,7 @@ internal JobPartInternal(
171178
_checkpointer = checkpointer;
172179
_cancellationToken = cancellationToken;
173180
_arrayPool = arrayPool;
181+
IsFinalPart = isFinalPart;
174182
PartTransferStatusEventHandler = jobPartEventHandler;
175183
TransferStatusEventHandler = statusEventHandler;
176184
TransferFailedEventHandler = failedEventHandler;
@@ -208,7 +216,7 @@ public void SetQueueChunkDelegate(QueueChunkDelegate chunkDelegate)
208216
/// when we're looking to stop/pause the job part.
209217
/// </summary>
210218
/// <returns></returns>
211-
public async Task QueueChunkToChannelAsync(Task chunkTask)
219+
public async Task QueueChunkToChannelAsync(Func<Task> chunkTask)
212220
{
213221
// Attach TaskCompletionSource
214222
TaskCompletionSource<bool> chunkCompleted = new TaskCompletionSource<bool>(
@@ -220,7 +228,7 @@ public async Task QueueChunkToChannelAsync(Task chunkTask)
220228
await QueueChunk(
221229
async () =>
222230
{
223-
await chunkTask.ConfigureAwait(false);
231+
await Task.Run(chunkTask).ConfigureAwait(false);
224232
chunkCompleted.SetResult(true);
225233
await CheckAndUpdateCancellationStatusAsync().ConfigureAwait(false);
226234
}).ConfigureAwait(false);
@@ -243,7 +251,7 @@ await QueueChunk(
243251
/// and any chunks is still processing to be cancelled is will be set to <see cref="StorageTransferStatus.CancellationInProgress"/>
244252
/// until the chunks finish then it will be set to <see cref="StorageTransferStatus.CompletedWithFailedTransfers"/>.
245253
/// </summary>
246-
/// <returns></returns>
254+
/// <returns>The task to wait until the cancellation has been triggered.</returns>
247255
internal async Task TriggerCancellationAsync()
248256
{
249257
if (!_cancellationToken.IsCancellationRequested)
@@ -389,9 +397,17 @@ public async virtual Task CleanupAbortedJobPartAsync()
389397
}
390398
}
391399

392-
public async virtual Task AddJobPartToCheckpointer(int chunksTotal)
400+
/// <summary>
401+
/// Serializes the respective job part and adds it to the checkpointer.
402+
/// </summary>
403+
/// <param name="chunksTotal">Number of chunks in the job part.</param>
404+
/// <param name="isFinalPart">Defines if this part is the last job part of the job.</param>
405+
/// <returns></returns>
406+
public async virtual Task AddJobPartToCheckpointerAsync(int chunksTotal, bool isFinalPart)
393407
{
394-
JobPartPlanHeader header = this.ToJobPartPlanHeader(StorageTransferStatus.InProgress);
408+
JobPartPlanHeader header = this.ToJobPartPlanHeader(
409+
jobStatus: StorageTransferStatus.InProgress,
410+
isFinalPart: isFinalPart);
395411
using (Stream stream = new MemoryStream())
396412
{
397413
header.Serialize(stream);

sdk/storage/Azure.Storage.DataMovement/src/JobPlanExtensions.cs

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ public static async Task<StreamToUriJobPart> ToJobPartAsync(
2828
jobPartStatus: jobPartStatus,
2929
sourceResource: sourceResource,
3030
destinationResource: destinationResource,
31-
partPlanFileExists: true).ConfigureAwait(false);
31+
partPlanFileExists: true,
32+
isFinalPart: header.IsFinalPart).ConfigureAwait(false);
3233

3334
// TODO: When enabling resume chunked upload Add each transfer to the CommitChunkHandler
3435
return jobPart;
@@ -44,14 +45,15 @@ public static async Task<ServiceToServiceJobPart> ToJobPartAsync(
4445
JobPartPlanHeader header = JobPartPlanHeader.Deserialize(planFileStream);;
4546

4647
// Apply credentials to the saved transfer job path
47-
StorageTransferStatus jobPartStatus = (StorageTransferStatus) header.AtomicJobStatus;
48+
StorageTransferStatus jobPartStatus = header.AtomicJobStatus;
4849
ServiceToServiceJobPart jobPart = await ServiceToServiceJobPart.CreateJobPartAsync(
4950
job: baseJob,
5051
partNumber: Convert.ToInt32(header.PartNumber),
5152
jobPartStatus: jobPartStatus,
5253
sourceResource: sourceResource,
5354
destinationResource: destinationResource,
54-
partPlanFileExists: true).ConfigureAwait(false);
55+
partPlanFileExists: true,
56+
isFinalPart: header.IsFinalPart).ConfigureAwait(false);
5557

5658
// TODO: When enabling resume chunked upload Add each transfer to the CommitChunkHandler
5759
return jobPart;
@@ -67,14 +69,15 @@ public static async Task<UriToStreamJobPart> ToJobPartAsync(
6769
JobPartPlanHeader header = JobPartPlanHeader.Deserialize(planFileStream);;
6870

6971
// Apply credentials to the saved transfer job path
70-
StorageTransferStatus jobPartStatus = (StorageTransferStatus)header.AtomicJobStatus;
72+
StorageTransferStatus jobPartStatus = header.AtomicJobStatus;
7173
UriToStreamJobPart jobPart = await UriToStreamJobPart.CreateJobPartAsync(
7274
job: baseJob,
7375
partNumber: Convert.ToInt32(header.PartNumber),
7476
jobPartStatus: jobPartStatus,
7577
sourceResource: sourceResource,
7678
destinationResource: destinationResource,
77-
partPlanFileExists: true).ConfigureAwait(false);
79+
partPlanFileExists: true,
80+
isFinalPart: header.IsFinalPart).ConfigureAwait(false);
7881

7982
// TODO: When enabling resume chunked upload Add each transfer to the CommitChunkHandler
8083
return jobPart;
@@ -87,19 +90,22 @@ public static async Task<StreamToUriJobPart> ToJobPartAsync(
8790
StorageResourceContainer destinationResource)
8891
{
8992
// Convert stream to job plan header
90-
JobPartPlanHeader header = JobPartPlanHeader.Deserialize(planFileStream);;
93+
JobPartPlanHeader header = JobPartPlanHeader.Deserialize(planFileStream);
9194

9295
// Apply credentials to the saved transfer job path
9396
string childSourcePath = header.SourcePath;
94-
string childDestinationPath = header.SourcePath;
97+
string childSourceName = childSourcePath.Substring(sourceResource.Path.Length + 1);
98+
string childDestinationPath = header.DestinationPath;
99+
string childDestinationName = childDestinationPath.Substring(destinationResource.Uri.AbsoluteUri.Length + 1);
95100
StorageTransferStatus jobPartStatus = header.AtomicJobStatus;
96101
StreamToUriJobPart jobPart = await StreamToUriJobPart.CreateJobPartAsync(
97102
job: baseJob,
98103
partNumber: Convert.ToInt32(header.PartNumber),
99104
jobPartStatus: jobPartStatus,
100-
sourceResource: sourceResource.GetChildStorageResource(childSourcePath.Substring(sourceResource.Path.Length)),
101-
destinationResource: destinationResource.GetChildStorageResource(childDestinationPath.Substring(destinationResource.Path.Length)),
102-
partPlanFileExists: true).ConfigureAwait(false);
105+
sourceResource: sourceResource.GetChildStorageResource(childSourceName),
106+
destinationResource: destinationResource.GetChildStorageResource(childDestinationName),
107+
partPlanFileExists: true,
108+
isFinalPart: header.IsFinalPart).ConfigureAwait(false);
103109

104110
// TODO: When enabling resume chunked upload Add each transfer to the CommitChunkHandler
105111
return jobPart;
@@ -116,15 +122,16 @@ public static async Task<ServiceToServiceJobPart> ToJobPartAsync(
116122

117123
// Apply credentials to the saved transfer job path
118124
string childSourcePath = header.SourcePath;
119-
string childDestinationPath = header.SourcePath;
125+
string childDestinationPath = header.DestinationPath;
120126
StorageTransferStatus jobPartStatus = header.AtomicJobStatus;
121127
ServiceToServiceJobPart jobPart = await ServiceToServiceJobPart.CreateJobPartAsync(
122128
job: baseJob,
123129
partNumber: Convert.ToInt32(header.PartNumber),
124130
jobPartStatus: jobPartStatus,
125-
sourceResource: sourceResource.GetChildStorageResource(childSourcePath.Substring(sourceResource.Path.Length)),
126-
destinationResource: destinationResource.GetChildStorageResource(childDestinationPath.Substring(destinationResource.Path.Length)),
127-
partPlanFileExists: true).ConfigureAwait(false);
131+
sourceResource: sourceResource.GetChildStorageResource(childSourcePath.Substring(sourceResource.Uri.AbsoluteUri.Length + 1)),
132+
destinationResource: destinationResource.GetChildStorageResource(childDestinationPath.Substring(destinationResource.Uri.AbsoluteUri.Length + 1)),
133+
partPlanFileExists: true,
134+
isFinalPart: header.IsFinalPart).ConfigureAwait(false);
128135

129136
// TODO: When enabling resume chunked upload Add each transfer to the CommitChunkHandler
130137
return jobPart;
@@ -141,15 +148,18 @@ public static async Task<UriToStreamJobPart> ToJobPartAsync(
141148

142149
// Apply credentials to the saved transfer job path
143150
string childSourcePath = header.SourcePath;
144-
string childDestinationPath = header.SourcePath;
151+
string childSourceName = childSourcePath.Substring(sourceResource.Uri.AbsoluteUri.Length + 1);
152+
string childDestinationPath = header.DestinationPath;
153+
string childDestinationName = childDestinationPath.Substring(destinationResource.Path.Length + 1);
145154
StorageTransferStatus jobPartStatus = header.AtomicJobStatus;
146155
UriToStreamJobPart jobPart = await UriToStreamJobPart.CreateJobPartAsync(
147156
job: baseJob,
148157
partNumber: Convert.ToInt32(header.PartNumber),
149158
jobPartStatus: jobPartStatus,
150-
sourceResource: sourceResource.GetChildStorageResource(childSourcePath.Substring(sourceResource.Path.Length)),
151-
destinationResource: destinationResource.GetChildStorageResource(childDestinationPath.Substring(destinationResource.Path.Length)),
152-
partPlanFileExists: true).ConfigureAwait(false);
159+
sourceResource: sourceResource.GetChildStorageResource(childSourceName),
160+
destinationResource: destinationResource.GetChildStorageResource(childDestinationName),
161+
partPlanFileExists: true,
162+
isFinalPart: header.IsFinalPart).ConfigureAwait(false);
153163

154164
// TODO: When enabling resume chunked upload Add each transfer to the CommitChunkHandler
155165
return jobPart;
@@ -158,7 +168,9 @@ public static async Task<UriToStreamJobPart> ToJobPartAsync(
158168
/// <summary>
159169
/// Translate the initial job part header to a job plan format file
160170
/// </summary>
161-
internal static JobPartPlanHeader ToJobPartPlanHeader(this JobPartInternal jobPart, StorageTransferStatus jobStatus)
171+
internal static JobPartPlanHeader ToJobPartPlanHeader(this JobPartInternal jobPart,
172+
StorageTransferStatus jobStatus,
173+
bool isFinalPart)
162174
{
163175
JobPartPlanDestinationBlob dstBlobData = new JobPartPlanDestinationBlob(
164176
blobType: JobPlanBlobType.Detect, // TODO: update when supported
@@ -186,11 +198,15 @@ internal static JobPartPlanHeader ToJobPartPlanHeader(this JobPartInternal jobPa
186198
startTime: DateTimeOffset.UtcNow, // TODO: update to job start time
187199
transferId: jobPart._dataTransfer.Id,
188200
partNumber: (uint)jobPart.PartNumber,
189-
sourcePath: jobPart._sourceResource.Path,
201+
sourcePath: jobPart._sourceResource.CanProduceUri == ProduceUriType.ProducesUri ?
202+
jobPart._sourceResource.Uri.AbsoluteUri :
203+
jobPart._sourceResource.Path,
190204
sourceExtraQuery: "", // TODO: convert options to string
191-
destinationPath: jobPart._destinationResource.Path,
205+
destinationPath: jobPart._destinationResource.CanProduceUri == ProduceUriType.ProducesUri ?
206+
jobPart._destinationResource.Uri.AbsoluteUri :
207+
jobPart._destinationResource.Path,
192208
destinationExtraQuery: "", // TODO: convert options to string
193-
isFinalPart: false, // TODO: change but we might remove this param
209+
isFinalPart: isFinalPart,
194210
forceWrite: jobPart._createMode == StorageResourceCreateMode.Overwrite, // TODO: change to enum value
195211
forceIfReadOnly: false, // TODO: revisit for Azure Files
196212
autoDecompress: false, // TODO: revisit if we want to support this feature

0 commit comments

Comments
 (0)