Skip to content

Commit 748c9c4

Browse files
Checkpointer testing cont. (Azure#47125)
* change checkpointer API Accept header object instead of stream * move serialze closer to checkpoint write * error report bug fix * adapt tests to new interface * exportapi
1 parent b8a7f8a commit 748c9c4

14 files changed

+308
-181
lines changed

sdk/storage/Azure.Storage.DataMovement.Blobs/tests/Azure.Storage.DataMovement.Blobs.Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
<Compile Include="$(AzureStorageDataMovementTestSharedSources)TestEventsRaised.cs" LinkBase="Shared\DataMovement" />
4242
<Compile Include="$(AzureStorageDataMovementTestSharedSources)TestTransferWithTimeout.cs" LinkBase="Shared\DataMovement" />
4343
<Compile Include="$(AzureStorageDataMovementTestSharedSources)DisposingLocalDirectory.cs" LinkBase="Shared\DataMovement" />
44+
<Compile Include="$(AzureStorageDataMovementTestSharedSources)MemoryTransferCheckpointer.cs" LinkBase="Shared\DataMovement" />
4445
<Compile Include="$(AzureStorageDataMovementTestSharedSources)MockQueueInternalTasks.cs" LinkBase="Shared\DataMovement" />
4546
<Compile Include="$(AzureStorageDataMovementTestSharedSources)MockResourceCheckpointData.cs" LinkBase="Shared\DataMovement" />
4647
<Compile Include="$(AzureStorageDataMovementTestSharedSources)StartTransferUploadTestBase.cs" LinkBase="Shared\DataMovement" />

sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,10 +237,10 @@ public partial class TransferItemCompletedEventArgs : Azure.Storage.DataMovement
237237
}
238238
public partial class TransferItemFailedEventArgs : Azure.Storage.DataMovement.DataTransferEventArgs
239239
{
240-
public TransferItemFailedEventArgs(string transferId, Azure.Storage.DataMovement.StorageResourceItem sourceResource, Azure.Storage.DataMovement.StorageResourceItem destinationResource, System.Exception exception, bool isRunningSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(string), default(bool), default(System.Threading.CancellationToken)) { }
241-
public Azure.Storage.DataMovement.StorageResourceItem DestinationResource { get { throw null; } }
240+
public TransferItemFailedEventArgs(string transferId, Azure.Storage.DataMovement.StorageResource sourceResource, Azure.Storage.DataMovement.StorageResource destinationResource, System.Exception exception, bool isRunningSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(string), default(bool), default(System.Threading.CancellationToken)) { }
241+
public Azure.Storage.DataMovement.StorageResource DestinationResource { get { throw null; } }
242242
public System.Exception Exception { get { throw null; } }
243-
public Azure.Storage.DataMovement.StorageResourceItem SourceResource { get { throw null; } }
243+
public Azure.Storage.DataMovement.StorageResource SourceResource { get { throw null; } }
244244
}
245245
public partial class TransferItemSkippedEventArgs : Azure.Storage.DataMovement.DataTransferEventArgs
246246
{

sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,10 +237,10 @@ public partial class TransferItemCompletedEventArgs : Azure.Storage.DataMovement
237237
}
238238
public partial class TransferItemFailedEventArgs : Azure.Storage.DataMovement.DataTransferEventArgs
239239
{
240-
public TransferItemFailedEventArgs(string transferId, Azure.Storage.DataMovement.StorageResourceItem sourceResource, Azure.Storage.DataMovement.StorageResourceItem destinationResource, System.Exception exception, bool isRunningSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(string), default(bool), default(System.Threading.CancellationToken)) { }
241-
public Azure.Storage.DataMovement.StorageResourceItem DestinationResource { get { throw null; } }
240+
public TransferItemFailedEventArgs(string transferId, Azure.Storage.DataMovement.StorageResource sourceResource, Azure.Storage.DataMovement.StorageResource destinationResource, System.Exception exception, bool isRunningSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(string), default(bool), default(System.Threading.CancellationToken)) { }
241+
public Azure.Storage.DataMovement.StorageResource DestinationResource { get { throw null; } }
242242
public System.Exception Exception { get { throw null; } }
243-
public Azure.Storage.DataMovement.StorageResourceItem SourceResource { get { throw null; } }
243+
public Azure.Storage.DataMovement.StorageResource SourceResource { get { throw null; } }
244244
}
245245
public partial class TransferItemSkippedEventArgs : Azure.Storage.DataMovement.DataTransferEventArgs
246246
{

sdk/storage/Azure.Storage.DataMovement/src/DisabledTransferCheckpointer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public Task AddNewJobAsync(string transferId, StorageResource source, StorageRes
1616
return Task.CompletedTask;
1717
}
1818

19-
public Task AddNewJobPartAsync(string transferId, int partNumber, Stream headerStream, CancellationToken cancellationToken = default)
19+
public Task AddNewJobPartAsync(string transferId, int partNumber, JobPartPlanHeader header, CancellationToken cancellationToken = default)
2020
{
2121
return Task.CompletedTask;
2222
}

sdk/storage/Azure.Storage.DataMovement/src/ITransferCheckpointer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ Task AddNewJobAsync(
2020
Task AddNewJobPartAsync(
2121
string transferId,
2222
int partNumber,
23-
Stream headerStream,
23+
JobPartPlanHeader header,
2424
CancellationToken cancellationToken = default);
2525

2626
Task<bool> IsEnumerationCompleteAsync(

sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -473,16 +473,11 @@ public async virtual Task CleanupAbortedJobPartAsync()
473473
/// </summary>
474474
public async virtual Task AddJobPartToCheckpointerAsync()
475475
{
476-
JobPartPlanHeader header = this.ToJobPartPlanHeader();
477-
using (Stream stream = new MemoryStream())
478-
{
479-
header.Serialize(stream);
480-
await _checkpointer.AddNewJobPartAsync(
481-
transferId: _dataTransfer.Id,
482-
partNumber: PartNumber,
483-
headerStream: stream,
484-
cancellationToken: _cancellationToken).ConfigureAwait(false);
485-
}
476+
await _checkpointer.AddNewJobPartAsync(
477+
transferId: _dataTransfer.Id,
478+
partNumber: PartNumber,
479+
header: this.ToJobPartPlanHeader(),
480+
cancellationToken: _cancellationToken).ConfigureAwait(false);
486481
}
487482

488483
internal async virtual Task SetCheckpointerStatusAsync()

sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPartPlanFile.cs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,21 +38,21 @@ public static async Task<JobPartPlanFile> CreateJobPartPlanFileAsync(
3838
string checkpointerPath,
3939
string id,
4040
int jobPart,
41-
Stream headerStream,
41+
JobPartPlanHeader header,
4242
CancellationToken cancellationToken = default)
4343
{
4444
Argument.AssertNotNullOrEmpty(checkpointerPath, nameof(checkpointerPath));
4545
Argument.AssertNotNullOrEmpty(id, nameof(id));
4646
Argument.AssertNotNull(jobPart, nameof(jobPart));
47-
Argument.AssertNotNull(headerStream, nameof(headerStream));
47+
Argument.AssertNotNull(header, nameof(header));
4848

4949
JobPartPlanFileName fileName = new JobPartPlanFileName(checkpointerPath: checkpointerPath, id: id, jobPartNumber: jobPart);
50-
return await CreateJobPartPlanFileAsync(fileName, headerStream, cancellationToken).ConfigureAwait(false);
50+
return await CreateJobPartPlanFileAsync(fileName, header, cancellationToken).ConfigureAwait(false);
5151
}
5252

5353
public static async Task<JobPartPlanFile> CreateJobPartPlanFileAsync(
5454
JobPartPlanFileName fileName,
55-
Stream headerStream,
55+
JobPartPlanHeader header,
5656
CancellationToken cancellationToken = default)
5757
{
5858
JobPartPlanFile result = new JobPartPlanFile()
@@ -63,8 +63,11 @@ public static async Task<JobPartPlanFile> CreateJobPartPlanFileAsync(
6363
try
6464
{
6565
using (FileStream fileStream = File.Create(result.FileName.ToString()))
66+
using (MemoryStream ms = new())
6667
{
67-
await headerStream.CopyToAsync(
68+
header.Serialize(ms);
69+
ms.Position = 0;
70+
await ms.CopyToAsync(
6871
fileStream,
6972
DataMovementConstants.DefaultStreamCopyBufferSize,
7073
cancellationToken).ConfigureAwait(false);

sdk/storage/Azure.Storage.DataMovement/src/LocalTransferCheckpointer.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,14 +104,13 @@ public override async Task AddNewJobAsync(
104104
public override async Task AddNewJobPartAsync(
105105
string transferId,
106106
int partNumber,
107-
Stream headerStream,
107+
JobPartPlanHeader header,
108108
CancellationToken cancellationToken = default)
109109
{
110110
Argument.AssertNotNullOrEmpty(transferId, nameof(transferId));
111111
Argument.AssertNotNull(partNumber, nameof(partNumber));
112-
Argument.AssertNotNull(headerStream, nameof(headerStream));
112+
Argument.AssertNotNull(header, nameof(header));
113113
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
114-
headerStream.Position = 0;
115114

116115
if (!_transferStates.ContainsKey(transferId))
117116
{
@@ -124,7 +123,7 @@ public override async Task AddNewJobPartAsync(
124123
_pathToCheckpointer,
125124
transferId,
126125
partNumber,
127-
headerStream,
126+
header,
128127
cancellationToken).ConfigureAwait(false);
129128

130129
// Add the job part into the current state

sdk/storage/Azure.Storage.DataMovement/src/SerializerTransferCheckpointer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,15 @@ public abstract Task AddNewJobAsync(
4545
/// </summary>
4646
/// <param name="transferId">The transfer ID.</param>
4747
/// <param name="partNumber">The job part number.</param>
48-
/// <param name="headerStream">A <see cref="Stream"/> to the job part plan header.</param>
48+
/// <param name="header">A <see cref="Stream"/> to the job part plan header.</param>
4949
/// <param name="cancellationToken">
5050
/// Optional <see cref="CancellationToken"/> to propagate
5151
/// notifications that the operation should be canceled.
5252
/// </param>
5353
public abstract Task AddNewJobPartAsync(
5454
string transferId,
5555
int partNumber,
56-
Stream headerStream,
56+
JobPartPlanHeader header,
5757
CancellationToken cancellationToken = default);
5858

5959
/// <summary>

sdk/storage/Azure.Storage.DataMovement/src/TransferItemFailedEventArgs.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@ namespace Azure.Storage.DataMovement
1313
public class TransferItemFailedEventArgs : DataTransferEventArgs
1414
{
1515
/// <summary>
16-
/// Gets the <see cref="StorageResourceItem"/> that was the source resource for the transfer.
16+
/// Gets the <see cref="StorageResource"/> that was the source resource for the transfer.
1717
/// </summary>
18-
public StorageResourceItem SourceResource { get; }
18+
public StorageResource SourceResource { get; }
1919

2020
/// <summary>
21-
/// Gets the <see cref="StorageResourceItem"/> that was the destination resource for the transfer.
21+
/// Gets the <see cref="StorageResource"/> that was the destination resource for the transfer.
2222
/// </summary>
23-
public StorageResourceItem DestinationResource { get; }
23+
public StorageResource DestinationResource { get; }
2424

2525
/// <summary>
2626
/// Gets the <see cref="Exception"/> that was thrown during the job.
@@ -53,8 +53,8 @@ public class TransferItemFailedEventArgs : DataTransferEventArgs
5353
/// </exception>
5454
public TransferItemFailedEventArgs(
5555
string transferId,
56-
StorageResourceItem sourceResource,
57-
StorageResourceItem destinationResource,
56+
StorageResource sourceResource,
57+
StorageResource destinationResource,
5858
Exception exception,
5959
bool isRunningSynchronously,
6060
CancellationToken cancellationToken)

0 commit comments

Comments
 (0)