Skip to content

Commit 4487c9b

Browse files
authored
[Storage] [DataMovement] Added support for TransferManager.GetTransfers (Azure#36014)
* Added support for TransferManager.GetTransfers * WIP * Fixed issue with instantiating empty checkpointer * Fix TransferManager which is populated by saved transfer state from the checkpointer * PR comments * Fixed download transfer tests pause and resume and undid api change of datatransfer object * Undo reneabled pause resume tests
1 parent bb676d8 commit 4487c9b

17 files changed

+836
-122
lines changed

sdk/storage/Azure.Storage.DataMovement/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### Features Added
66
- `TransferManager` new API `PauseAllRunningTransfersAsync`.
7+
- Added support for `TransferManager.GetTransfers`, to retrieve the list of transfers in the `TransferManager`.
78

89
### Breaking Changes
910
- [BREAKING CHANGE] Altered API signatures on `TransferManager` and `DataTransfer` for pausing.

sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public partial class TransferManager : System.IAsyncDisposable
117117
{
118118
protected TransferManager() { }
119119
public TransferManager(Azure.Storage.DataMovement.TransferManagerOptions options = null) { }
120+
public virtual System.Collections.Generic.IAsyncEnumerable<Azure.Storage.DataMovement.DataTransfer> GetTransfersAsync(Azure.Storage.DataMovement.StorageTransferStatus[] filterByStatus = null) { throw null; }
120121
public virtual System.Threading.Tasks.Task PauseTransferIfRunningAsync(Azure.Storage.DataMovement.DataTransfer transfer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
121122
public virtual System.Threading.Tasks.Task PauseTransferIfRunningAsync(string transferId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
122123
public virtual System.Threading.Tasks.Task<Azure.Storage.DataMovement.DataTransfer> StartTransferAsync(Azure.Storage.DataMovement.StorageResource sourceResource, Azure.Storage.DataMovement.StorageResource destinationResource, Azure.Storage.DataMovement.Models.TransferOptions transferOptions = null) { throw null; }

sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public partial class TransferManager : System.IAsyncDisposable
117117
{
118118
protected TransferManager() { }
119119
public TransferManager(Azure.Storage.DataMovement.TransferManagerOptions options = null) { }
120+
public virtual System.Collections.Generic.IAsyncEnumerable<Azure.Storage.DataMovement.DataTransfer> GetTransfersAsync(Azure.Storage.DataMovement.StorageTransferStatus[] filterByStatus = null) { throw null; }
120121
public virtual System.Threading.Tasks.Task PauseTransferIfRunningAsync(Azure.Storage.DataMovement.DataTransfer transfer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
121122
public virtual System.Threading.Tasks.Task PauseTransferIfRunningAsync(string transferId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
122123
public virtual System.Threading.Tasks.Task<Azure.Storage.DataMovement.DataTransfer> StartTransferAsync(Azure.Storage.DataMovement.StorageResource sourceResource, Azure.Storage.DataMovement.StorageResource destinationResource, Azure.Storage.DataMovement.Models.TransferOptions transferOptions = null) { throw null; }

sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.1.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public partial class TransferManager : System.IAsyncDisposable
117117
{
118118
protected TransferManager() { }
119119
public TransferManager(Azure.Storage.DataMovement.TransferManagerOptions options = null) { }
120+
public virtual System.Collections.Generic.IAsyncEnumerable<Azure.Storage.DataMovement.DataTransfer> GetTransfersAsync(Azure.Storage.DataMovement.StorageTransferStatus[] filterByStatus = null) { throw null; }
120121
public virtual System.Threading.Tasks.Task PauseTransferIfRunningAsync(Azure.Storage.DataMovement.DataTransfer transfer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
121122
public virtual System.Threading.Tasks.Task PauseTransferIfRunningAsync(string transferId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
122123
public virtual System.Threading.Tasks.Task<Azure.Storage.DataMovement.DataTransfer> StartTransferAsync(Azure.Storage.DataMovement.StorageResource sourceResource, Azure.Storage.DataMovement.StorageResource destinationResource, Azure.Storage.DataMovement.Models.TransferOptions transferOptions = null) { throw null; }

sdk/storage/Azure.Storage.DataMovement/src/JobPlanExtensions.cs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System;
55
using System.IO;
66
using System.IO.MemoryMappedFiles;
7+
using System.Runtime.InteropServices.ComTypes;
78
using System.Threading.Tasks;
89
using Azure.Storage.DataMovement.Models.JobPlan;
910

@@ -290,6 +291,19 @@ internal static JobPartPlanHeader GetJobPartPlanHeader(this JobPartPlanFileName
290291
/// <param name="header">The header which holds the state of the job when it was stopped/paused.</param>
291292
internal static void VerifyJobPartPlanHeader(this JobPartInternal jobPart, JobPartPlanHeader header)
292293
{
294+
// Check schema version
295+
string schemaVersion = header.Version;
296+
if (!DataMovementConstants.PlanFile.SchemaVersion.Equals(schemaVersion))
297+
{
298+
throw Errors.MismatchSchemaVersionHeader(schemaVersion);
299+
}
300+
301+
// Check transfer id
302+
if (!header.TransferId.Equals(jobPart._dataTransfer.Id))
303+
{
304+
throw Errors.MismatchTransferId(jobPart._dataTransfer.Id, header.TransferId);
305+
}
306+
293307
// Check source path
294308
string passedSourcePath;
295309
if (jobPart._sourceResource.CanProduceUri == ProduceUriType.ProducesUri)
@@ -310,7 +324,7 @@ internal static void VerifyJobPartPlanHeader(this JobPartInternal jobPart, JobPa
310324
throw Errors.MismatchResumeTransferArguments(nameof(header.SourcePath), header.SourcePath, passedSourcePath);
311325
}
312326

313-
// Check destinationPath
327+
// Check destination path
314328
string passedDestinationPath;
315329
if (jobPart._destinationResource.CanProduceUri == ProduceUriType.ProducesUri)
316330
{

sdk/storage/Azure.Storage.DataMovement/src/LocalTransferCheckpointer.cs

Lines changed: 51 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ internal class LocalTransferCheckpointer : TransferCheckpointer
3232
/// <param name="folderPath">Path to the folder containing the checkpointing information to resume from.</param>
3333
public LocalTransferCheckpointer(string folderPath)
3434
{
35+
_transferStates = new Dictionary<string, Dictionary<int, JobPartPlanFile>>();
3536
if (string.IsNullOrEmpty(folderPath))
3637
{
3738
_pathToCheckpointer = Path.Combine(Environment.CurrentDirectory, DataMovementConstants.DefaultCheckpointerPath);
@@ -48,8 +49,8 @@ public LocalTransferCheckpointer(string folderPath)
4849
else
4950
{
5051
_pathToCheckpointer = folderPath;
52+
InitializeExistingCheckpointer();
5153
}
52-
_transferStates = new Dictionary<string, Dictionary<int, JobPartPlanFile>>();
5354
}
5455

5556
/// <inheritdoc/>
@@ -109,7 +110,7 @@ public override async Task AddNewJobPartAsync(
109110
}
110111

111112
/// <inheritdoc/>
112-
public override async Task AddExistingJobAsync(
113+
public override Task AddExistingJobAsync(
113114
string transferId,
114115
CancellationToken cancellationToken = default)
115116
{
@@ -119,9 +120,9 @@ public override async Task AddExistingJobAsync(
119120
{
120121
// Keep track of the correlating job part plan files
121122
List<JobPartPlanFileName> fileNames = new List<JobPartPlanFileName>();
122-
123-
// Search for existing plan files with the correlating transfer id
124123
string searchPattern = string.Concat(transferId, '*');
124+
125+
// Enumerate all the job parts with the transfer id
125126
foreach (string path in Directory.EnumerateFiles(_pathToCheckpointer, searchPattern, SearchOption.TopDirectoryOnly)
126127
.Where(f => Path.HasExtension(string.Concat(
127128
DataMovementConstants.PlanFile.FileExtension,
@@ -140,32 +141,21 @@ public override async Task AddExistingJobAsync(
140141
}
141142

142143
// Verify each existing file and then add it to our transfer states.
143-
// TODO: move verification to transfer manager to prevent from opening job plan file
144-
// more than once.
145144
Dictionary<int, JobPartPlanFile> jobParts = new Dictionary<int, JobPartPlanFile>();
146145
foreach (JobPartPlanFileName partFileName in fileNames)
147146
{
148147
// Grab the header info
149148
JobPartPlanHeader header = partFileName.GetJobPartPlanHeader();
150149

151-
// Verify the job part plan header
152-
CheckInputWithHeader(transferId, header);
153-
154150
// Add to list of job parts
155-
JobPartPlanFile jobFile;
156-
using (Stream stream = new MemoryStream())
157-
{
158-
header.Serialize(stream);
159-
jobFile = await JobPartPlanFile.CreateJobPartPlanFileAsync(
160-
fileName: partFileName,
161-
headerStream: stream).ConfigureAwait(false);
162-
}
151+
JobPartPlanFile jobFile = JobPartPlanFile.CreateExistingPartPlanFile(partFileName);
163152
jobParts.Add(partFileName.JobPartNumber, jobFile);
164153
}
165154

166155
// Add new transfer id to the list of memory mapped files
167156
_transferStates.Add(transferId, jobParts);
168157
}
158+
return Task.CompletedTask;
169159
}
170160

171161
/// <inheritdoc/>
@@ -397,5 +387,49 @@ public override async Task SetJobPartTransferStatusAsync(
397387
throw Errors.MissingTransferIdCheckpointer(transferId);
398388
}
399389
}
390+
391+
/// <summary>
392+
/// Takes the path of the checkpointer reads all the files in the top directory level
393+
/// and populates the _transferStates
394+
/// </summary>
395+
private void InitializeExistingCheckpointer()
396+
{
397+
// Retrieve all valid checkpointer files stored in the checkpointer path.
398+
foreach (string path in Directory.EnumerateFiles(_pathToCheckpointer, "*", SearchOption.TopDirectoryOnly)
399+
.Where(f => Path.HasExtension(string.Concat(
400+
DataMovementConstants.PlanFile.FileExtension,
401+
DataMovementConstants.PlanFile.SchemaVersion))))
402+
{
403+
// Ensure each file has the correct format
404+
if (JobPartPlanFileName.TryParseJobPartPlanFileName(path, out JobPartPlanFileName partPlanFileName))
405+
{
406+
// Check if the transfer Id already exists
407+
if (_transferStates.ContainsKey(partPlanFileName.Id))
408+
{
409+
// If the transfer Id already exists, then add the job part plan file
410+
// with the rest of the job part plan files in the respective
411+
// transfer id.
412+
_transferStates[partPlanFileName.Id].Add(
413+
partPlanFileName.JobPartNumber,
414+
JobPartPlanFile.CreateExistingPartPlanFile(partPlanFileName));
415+
}
416+
else
417+
{
418+
// If the transfer id has not been seen yet, add it and add
419+
// the job part plan file as well.
420+
Dictionary<int, JobPartPlanFile> newTransfer = new Dictionary<int, JobPartPlanFile>
421+
{
422+
{
423+
partPlanFileName.JobPartNumber,
424+
JobPartPlanFile.CreateExistingPartPlanFile(partPlanFileName)
425+
}
426+
};
427+
_transferStates.Add(
428+
partPlanFileName.Id,
429+
newTransfer);
430+
}
431+
}
432+
}
433+
}
400434
}
401435
}

sdk/storage/Azure.Storage.DataMovement/src/Models/JobPlan/JobPartPlanFile.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,15 @@ public static async Task<JobPartPlanFile> CreateJobPartPlanFileAsync(
7272
return result;
7373
}
7474

75+
public static JobPartPlanFile CreateExistingPartPlanFile(
76+
JobPartPlanFileName fileName)
77+
{
78+
return new JobPartPlanFile()
79+
{
80+
FileName = fileName
81+
};
82+
}
83+
7584
public void Dispose()
7685
{
7786
WriteLock.Dispose();

sdk/storage/Azure.Storage.DataMovement/src/Models/JobPlan/JobPartPlanHeader.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@ public static JobPartPlanHeader Deserialize(Stream stream)
611611
byte[] ttlAfterCompletionBuffer = reader.ReadBytes(DataMovementConstants.PlanFile.LongSizeInBytes);
612612
DateTimeOffset ttlAfterCompletion = new DateTimeOffset(ttlAfterCompletionBuffer.ToLong(), new TimeSpan(0, 0, 0));
613613

614-
// FromTo
614+
// JobPlanOperation
615615
byte fromToByte = reader.ReadByte();
616616
JobPlanOperation fromTo = (JobPlanOperation)fromToByte;
617617

sdk/storage/Azure.Storage.DataMovement/src/Shared/DataTransfer.cs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,29 +38,23 @@ public class DataTransfer
3838
internal DataTransferState _state;
3939

4040
/// <summary>
41-
/// Only to be created internally by the transfer manager.
41+
/// For mocking.
4242
/// </summary>
4343
internal DataTransfer()
4444
{
45-
_state = new DataTransferState();
4645
}
4746

4847
/// <summary>
49-
/// For mocking
48+
/// Constructing a DataTransfer object.
5049
/// </summary>
51-
/// <param name="status"></param>
52-
internal DataTransfer(StorageTransferStatus status)
50+
/// <param name="id">The transfer ID of the transfer object.</param>
51+
/// <param name="status">The Transfer Status of the Transfer. See <see cref="StorageTransferStatus"/>.</param>
52+
internal DataTransfer(
53+
string id,
54+
StorageTransferStatus status = StorageTransferStatus.Queued)
5355
{
54-
_state = new DataTransferState(status);
55-
}
56-
57-
/// <summary>
58-
/// Only to be created internally by the transfer manager when someone
59-
/// provides a valid job plan file to resume from.
60-
/// </summary>
61-
internal DataTransfer(string id, long bytesTransferred = 0)
62-
{
63-
_state = new DataTransferState(id, bytesTransferred);
56+
Argument.AssertNotNullOrEmpty(id, nameof(id));
57+
_state = new DataTransferState(id, status);
6458
}
6559

6660
/// <summary>

sdk/storage/Azure.Storage.DataMovement/src/Shared/DataTransferState.cs

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,15 @@ internal class DataTransferState
2929
public StorageTransferStatus Status => _status;
3030

3131
/// <summary>
32-
/// constructor
33-
/// </summary>
34-
public DataTransferState()
35-
: this(StorageTransferStatus.Queued)
36-
{
37-
}
38-
39-
/// <summary>
40-
/// constructor for mocking
32+
/// Constructor to resume current jobs
4133
/// </summary>
42-
public DataTransferState(StorageTransferStatus status)
34+
/// <param name="id">The transfer ID of the transfer object.</param>
35+
/// <param name="status">The Transfer Status of the Transfer. See <see cref="StorageTransferStatus"/>.</param>
36+
public DataTransferState(
37+
string id = default,
38+
StorageTransferStatus status = StorageTransferStatus.Queued)
4339
{
44-
_id = Guid.NewGuid().ToString();
40+
_id = string.IsNullOrEmpty(id) ? Guid.NewGuid().ToString() : id;
4541
_status = status;
4642
_currentTransferredBytes = 0;
4743
CompletionSource = new TaskCompletionSource<StorageTransferStatus>(
@@ -56,20 +52,6 @@ public DataTransferState(StorageTransferStatus status)
5652
CancellationTokenSource = new CancellationTokenSource();
5753
}
5854

59-
/// <summary>
60-
/// Constructor to resume current jobs
61-
/// </summary>
62-
public DataTransferState(string id, long bytesTransferred)
63-
{
64-
_id = id;
65-
_status = StorageTransferStatus.Queued;
66-
_currentTransferredBytes = bytesTransferred;
67-
CompletionSource = new TaskCompletionSource<StorageTransferStatus>(
68-
_status,
69-
TaskCreationOptions.RunContinuationsAsynchronously);
70-
CancellationTokenSource = new CancellationTokenSource();
71-
}
72-
7355
/// <summary>
7456
/// Gets the identifier of the transfer state
7557
/// </summary>

0 commit comments

Comments
 (0)