Skip to content

Commit 2bf298a

Browse files
[Storage][DataMovement] Containerize single item transfers to simplify internal code (Azure#49107)
1 parent 4cea261 commit 2bf298a

15 files changed

+170
-449
lines changed

sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlobContainerServiceToServiceJobTests.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ await checkpointer.AddNewJobAsync(
115115
sourceMock.Object,
116116
destinationMock.Object,
117117
ServiceToServiceJobPart.CreateJobPartAsync,
118-
ServiceToServiceJobPart.CreateJobPartAsync,
119118
new TransferOptions(),
120119
checkpointer,
121120
TransferErrorMode.StopOnAnyFailure,
@@ -171,7 +170,6 @@ await checkpointer.AddNewJobAsync(
171170
sourceMock.Object,
172171
destinationMock.Object,
173172
ServiceToServiceJobPart.CreateJobPartAsync,
174-
ServiceToServiceJobPart.CreateJobPartAsync,
175173
new TransferOptions(),
176174
checkpointer,
177175
TransferErrorMode.StopOnAnyFailure,

sdk/storage/Azure.Storage.DataMovement/src/JobBuilder.cs

Lines changed: 11 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
using System.Threading.Tasks;
55
using System.Threading;
6-
using System.IO;
76
using System.Buffers;
87
using Azure.Core.Pipeline;
98
using System;
@@ -51,26 +50,20 @@ internal JobBuilder(
5150
bool resumeJob,
5251
CancellationToken cancellationToken)
5352
{
54-
JobBuilder.ValidateTransferOptions(transferOptions);
53+
ValidateTransferOptions(transferOptions);
5554

5655
TransferOperation transferOperation = new(id: transferId);
5756
TransferJobInternal transferJobInternal;
5857

59-
// Single transfer
58+
// For single item transfers, wrap in single item container
6059
if (sourceResource is StorageResourceItem sourceItem &&
6160
destinationResource is StorageResourceItem destationItem)
6261
{
63-
transferJobInternal = await BuildSingleTransferJob(
64-
sourceItem,
65-
destationItem,
66-
transferOptions,
67-
checkpointer,
68-
transferOperation,
69-
resumeJob,
70-
cancellationToken).ConfigureAwait(false);
62+
sourceResource = new SingleItemStorageResourceContainer(sourceItem);
63+
destinationResource = new SingleItemStorageResourceContainer(destationItem);
7164
}
72-
// Container transfer
73-
else if (sourceResource is StorageResourceContainer sourceContainer &&
65+
66+
if (sourceResource is StorageResourceContainer sourceContainer &&
7467
destinationResource is StorageResourceContainer destinationContainer)
7568
{
7669
transferJobInternal = await BuildContainerTransferJob(
@@ -91,69 +84,6 @@ internal JobBuilder(
9184
return (transferOperation, transferJobInternal);
9285
}
9386

94-
private async Task<TransferJobInternal> BuildSingleTransferJob(
95-
StorageResourceItem sourceResource,
96-
StorageResourceItem destinationResource,
97-
TransferOptions transferOptions,
98-
ITransferCheckpointer checkpointer,
99-
TransferOperation transferOperation,
100-
bool resumeJob,
101-
CancellationToken cancellationToken)
102-
{
103-
TransferJobInternal.CreateJobPartSingleAsync single;
104-
TransferJobInternal.CreateJobPartMultiAsync multi;
105-
Func<TransferJobInternal, JobPartPlanHeader, StorageResourceItem, StorageResourceItem, JobPartInternal> rehydrate;
106-
if (sourceResource.IsLocalResource() && !destinationResource.IsLocalResource())
107-
{
108-
single = StreamToUriJobPart.CreateJobPartAsync;
109-
multi = StreamToUriJobPart.CreateJobPartAsync;
110-
rehydrate = DataMovementExtensions.ToStreamToUriJobPartAsync;
111-
}
112-
else if (!sourceResource.IsLocalResource() && destinationResource.IsLocalResource())
113-
{
114-
single = UriToStreamJobPart.CreateJobPartAsync;
115-
multi = UriToStreamJobPart.CreateJobPartAsync;
116-
rehydrate = DataMovementExtensions.ToUriToStreamJobPartAsync;
117-
}
118-
else if (!sourceResource.IsLocalResource() && !destinationResource.IsLocalResource())
119-
{
120-
single = ServiceToServiceJobPart.CreateJobPartAsync;
121-
multi = ServiceToServiceJobPart.CreateJobPartAsync;
122-
rehydrate = DataMovementExtensions.ToServiceToServiceJobPartAsync;
123-
}
124-
else
125-
{
126-
throw Errors.InvalidSourceDestinationParams();
127-
}
128-
129-
TransferJobInternal job = new(
130-
transferOperation: transferOperation,
131-
sourceResource: sourceResource,
132-
destinationResource: destinationResource,
133-
single,
134-
multi,
135-
transferOptions: transferOptions,
136-
checkpointer: checkpointer,
137-
errorHandling: _errorHandling,
138-
arrayPool: _arrayPool,
139-
clientDiagnostics: ClientDiagnostics);
140-
141-
int jobPartCount = await checkpointer.GetCurrentJobPartCountAsync(
142-
transferId: transferOperation.Id,
143-
cancellationToken: cancellationToken).ConfigureAwait(false);
144-
if (resumeJob && jobPartCount > 0)
145-
{
146-
JobPartPlanHeader part = await checkpointer.GetJobPartAsync(transferOperation.Id, partNumber: 0).ConfigureAwait(false);
147-
job.AppendJobPart(
148-
rehydrate(
149-
job,
150-
part,
151-
sourceResource,
152-
destinationResource));
153-
}
154-
return job;
155-
}
156-
15787
private async Task<TransferJobInternal> BuildContainerTransferJob(
15888
StorageResourceContainer sourceResource,
15989
StorageResourceContainer destinationResource,
@@ -163,25 +93,21 @@ private async Task<TransferJobInternal> BuildContainerTransferJob(
16393
bool resumeJob,
16494
CancellationToken cancellationToken)
16595
{
166-
TransferJobInternal.CreateJobPartSingleAsync single;
167-
TransferJobInternal.CreateJobPartMultiAsync multi;
96+
TransferJobInternal.CreateJobPartAsync createPart;
16897
Func<TransferJobInternal, JobPartPlanHeader, StorageResourceContainer, StorageResourceContainer, JobPartInternal> rehydrate;
16998
if (sourceResource.IsLocalResource() && !destinationResource.IsLocalResource())
17099
{
171-
single = StreamToUriJobPart.CreateJobPartAsync;
172-
multi = StreamToUriJobPart.CreateJobPartAsync;
100+
createPart = StreamToUriJobPart.CreateJobPartAsync;
173101
rehydrate = DataMovementExtensions.ToStreamToUriJobPartAsync;
174102
}
175103
else if (!sourceResource.IsLocalResource() && destinationResource.IsLocalResource())
176104
{
177-
single = UriToStreamJobPart.CreateJobPartAsync;
178-
multi = UriToStreamJobPart.CreateJobPartAsync;
105+
createPart = UriToStreamJobPart.CreateJobPartAsync;
179106
rehydrate = DataMovementExtensions.ToUriToStreamJobPartAsync;
180107
}
181108
else if (!sourceResource.IsLocalResource() && !destinationResource.IsLocalResource())
182109
{
183-
single = ServiceToServiceJobPart.CreateJobPartAsync;
184-
multi = ServiceToServiceJobPart.CreateJobPartAsync;
110+
createPart = ServiceToServiceJobPart.CreateJobPartAsync;
185111
rehydrate = DataMovementExtensions.ToServiceToServiceJobPartAsync;
186112
}
187113
else
@@ -193,8 +119,7 @@ private async Task<TransferJobInternal> BuildContainerTransferJob(
193119
transferOperation: transferOperation,
194120
sourceResource: sourceResource,
195121
destinationResource: destinationResource,
196-
single,
197-
multi,
122+
createPart,
198123
transferOptions: transferOptions,
199124
checkpointer: checkpointer,
200125
errorHandling: _errorHandling,

sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -22,31 +22,6 @@ internal class ServiceToServiceJobPart : JobPartInternal, IAsyncDisposable
2222
/// </summary>
2323
private CommitChunkHandler _commitBlockHandler;
2424

25-
/// <summary>
26-
/// Creating job part based on a single transfer job
27-
/// </summary>
28-
private ServiceToServiceJobPart(TransferJobInternal job, int partNumber)
29-
: base(transferOperation: job._transferOperation,
30-
partNumber: partNumber,
31-
sourceResource: job._sourceResource,
32-
destinationResource: job._destinationResource,
33-
transferChunkSize: job._maximumTransferChunkSize,
34-
initialTransferSize: job._initialTransferSize,
35-
errorHandling: job._errorMode,
36-
createMode: job._creationPreference,
37-
checkpointer: job._checkpointer,
38-
progressTracker: job._progressTracker,
39-
arrayPool: job.UploadArrayPool,
40-
jobPartEventHandler: job.GetJobPartStatusEventHandler(),
41-
statusEventHandler: job.TransferStatusEventHandler,
42-
failedEventHandler: job.TransferFailedEventHandler,
43-
skippedEventHandler: job.TransferSkippedEventHandler,
44-
singleTransferEventHandler: job.TransferItemCompletedEventHandler,
45-
clientDiagnostics: job.ClientDiagnostics,
46-
cancellationToken: job._cancellationToken)
47-
{
48-
}
49-
5025
/// <summary>
5126
/// Creating transfer job based on a storage resource created from listing.
5227
/// </summary>
@@ -119,19 +94,6 @@ public async ValueTask DisposeAsync()
11994
await DisposeHandlersAsync().ConfigureAwait(false);
12095
}
12196

122-
/// <summary>
123-
/// Called when creating a job part from a single transfer.
124-
/// </summary>
125-
public static async Task<JobPartInternal> CreateJobPartAsync(
126-
TransferJobInternal job,
127-
int partNumber)
128-
{
129-
// Create Job Part file as we're initializing the job part
130-
ServiceToServiceJobPart part = new ServiceToServiceJobPart(job, partNumber);
131-
await part.AddJobPartToCheckpointerAsync().ConfigureAwait(false);
132-
return part;
133-
}
134-
13597
/// <summary>
13698
/// Called when creating a job part from a container transfer.
13799
/// </summary>

sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs

Lines changed: 26 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -22,103 +22,22 @@ internal static StorageResourceItemProperties ToStorageResourceProperties(this F
2222
};
2323
}
2424

25-
public static StreamToUriJobPart ToStreamToUriJobPartAsync(
26-
this TransferJobInternal baseJob,
27-
JobPartPlanHeader header,
28-
StorageResourceItem sourceResource,
29-
StorageResourceItem destinationResource)
30-
{
31-
// Override header values if options were specified by user.
32-
long initialTransferSize = baseJob._initialTransferSize ?? header.InitialTransferSize;
33-
long transferChunkSize = baseJob._maximumTransferChunkSize ?? header.ChunkSize;
34-
StorageResourceCreationMode createPreference =
35-
baseJob._creationPreference != StorageResourceCreationMode.Default ?
36-
baseJob._creationPreference : header.CreatePreference;
37-
38-
StreamToUriJobPart jobPart = StreamToUriJobPart.CreateJobPartFromCheckpoint(
39-
job: baseJob,
40-
partNumber: Convert.ToInt32(header.PartNumber),
41-
sourceResource: sourceResource,
42-
destinationResource: destinationResource,
43-
jobPartStatus: header.JobPartStatus,
44-
initialTransferSize: initialTransferSize,
45-
transferChunkSize: transferChunkSize,
46-
createPreference: createPreference);
47-
48-
jobPart.VerifyJobPartPlanHeader(header);
49-
50-
// TODO: When enabling resume chunked upload Add each transfer to the CommitChunkHandler
51-
return jobPart;
52-
}
53-
54-
public static ServiceToServiceJobPart ToServiceToServiceJobPartAsync(
55-
this TransferJobInternal baseJob,
56-
JobPartPlanHeader header,
57-
StorageResourceItem sourceResource,
58-
StorageResourceItem destinationResource)
59-
{
60-
// Override header values if options were specified by user.
61-
long initialTransferSize = baseJob._initialTransferSize ?? header.InitialTransferSize;
62-
long transferChunkSize = baseJob._maximumTransferChunkSize ?? header.ChunkSize;
63-
StorageResourceCreationMode createPreference =
64-
baseJob._creationPreference != StorageResourceCreationMode.Default ?
65-
baseJob._creationPreference : header.CreatePreference;
66-
67-
ServiceToServiceJobPart jobPart = ServiceToServiceJobPart.CreateJobPartFromCheckpoint(
68-
job: baseJob,
69-
partNumber: Convert.ToInt32(header.PartNumber),
70-
sourceResource: sourceResource,
71-
destinationResource: destinationResource,
72-
jobPartStatus: header.JobPartStatus,
73-
initialTransferSize: initialTransferSize,
74-
transferChunkSize: transferChunkSize,
75-
createPreference: createPreference);
76-
77-
jobPart.VerifyJobPartPlanHeader(header);
78-
79-
// TODO: When enabling resume chunked upload Add each transfer to the CommitChunkHandler
80-
return jobPart;
81-
}
82-
83-
public static UriToStreamJobPart ToUriToStreamJobPartAsync(
84-
this TransferJobInternal baseJob,
85-
JobPartPlanHeader header,
86-
StorageResourceItem sourceResource,
87-
StorageResourceItem destinationResource)
88-
{
89-
// Override header values if options were specified by user.
90-
long initialTransferSize = baseJob._initialTransferSize ?? header.InitialTransferSize;
91-
long transferChunkSize = baseJob._maximumTransferChunkSize ?? header.ChunkSize;
92-
StorageResourceCreationMode createPreference =
93-
baseJob._creationPreference != StorageResourceCreationMode.Default ?
94-
baseJob._creationPreference : header.CreatePreference;
95-
96-
UriToStreamJobPart jobPart = UriToStreamJobPart.CreateJobPartFromCheckpoint(
97-
job: baseJob,
98-
partNumber: Convert.ToInt32(header.PartNumber),
99-
sourceResource: sourceResource,
100-
destinationResource: destinationResource,
101-
jobPartStatus: header.JobPartStatus,
102-
initialTransferSize: initialTransferSize,
103-
transferChunkSize: transferChunkSize,
104-
createPreference: createPreference);
105-
106-
jobPart.VerifyJobPartPlanHeader(header);
107-
108-
// TODO: When enabling resume chunked upload Add each transfer to the CommitChunkHandler
109-
return jobPart;
110-
}
111-
11225
public static StreamToUriJobPart ToStreamToUriJobPartAsync(
11326
this TransferJobInternal baseJob,
11427
JobPartPlanHeader header,
11528
StorageResourceContainer sourceResource,
11629
StorageResourceContainer destinationResource)
11730
{
31+
// If saved path equals the cotnainer Uri, its a single item trasfer.
32+
// Set the resource name to the full Uri.
11833
string childSourcePath = header.SourcePath;
119-
string childSourceName = childSourcePath.Substring(sourceResource.Uri.AbsoluteUri.Length + 1);
34+
string childSourceName = header.SourcePath == sourceResource.Uri.AbsoluteUri.ToString() ?
35+
childSourcePath :
36+
childSourcePath.Substring(sourceResource.Uri.AbsoluteUri.Length + 1);
12037
string childDestinationPath = header.DestinationPath;
121-
string childDestinationName = childDestinationPath.Substring(destinationResource.Uri.AbsoluteUri.Length + 1);
38+
string childDestinationName = header.DestinationPath == destinationResource.Uri.AbsoluteUri.ToString() ?
39+
childDestinationPath :
40+
childDestinationPath.Substring(destinationResource.Uri.AbsoluteUri.Length + 1);
12241
// Override header values if options were specified by user.
12342
long initialTransferSize = baseJob._initialTransferSize ?? header.InitialTransferSize;
12443
long transferChunkSize = baseJob._maximumTransferChunkSize ?? header.ChunkSize;
@@ -148,8 +67,16 @@ public static ServiceToServiceJobPart ToServiceToServiceJobPartAsync(
14867
StorageResourceContainer sourceResource,
14968
StorageResourceContainer destinationResource)
15069
{
70+
// If saved path equals the cotnainer Uri, its a single item trasfer, so the resource name
71+
// does not matter. Just set it to the path.
15172
string childSourcePath = header.SourcePath;
73+
string childSourceName = header.SourcePath == sourceResource.Uri.AbsoluteUri.ToString() ?
74+
childSourcePath :
75+
childSourcePath.Substring(sourceResource.Uri.AbsoluteUri.Length + 1);
15276
string childDestinationPath = header.DestinationPath;
77+
string childDestinationName = header.DestinationPath == destinationResource.Uri.AbsoluteUri.ToString() ?
78+
childDestinationPath :
79+
childDestinationPath.Substring(destinationResource.Uri.AbsoluteUri.Length + 1);
15380
// Override header values if options were specified by user.
15481
long initialTransferSize = baseJob._initialTransferSize ?? header.InitialTransferSize;
15582
long transferChunkSize = baseJob._maximumTransferChunkSize ?? header.ChunkSize;
@@ -160,8 +87,8 @@ public static ServiceToServiceJobPart ToServiceToServiceJobPartAsync(
16087
ServiceToServiceJobPart jobPart = ServiceToServiceJobPart.CreateJobPartFromCheckpoint(
16188
job: baseJob,
16289
partNumber: Convert.ToInt32(header.PartNumber),
163-
sourceResource: sourceResource.GetStorageResourceReference(childSourcePath.Substring(sourceResource.Uri.AbsoluteUri.Length + 1), header.SourceTypeId),
164-
destinationResource: destinationResource.GetStorageResourceReference(childDestinationPath.Substring(destinationResource.Uri.AbsoluteUri.Length + 1), header.DestinationTypeId),
90+
sourceResource: sourceResource.GetStorageResourceReference(childSourceName, header.SourceTypeId),
91+
destinationResource: destinationResource.GetStorageResourceReference(childDestinationName, header.DestinationTypeId),
16592
jobPartStatus: header.JobPartStatus,
16693
initialTransferSize: initialTransferSize,
16794
transferChunkSize: transferChunkSize,
@@ -179,11 +106,16 @@ public static UriToStreamJobPart ToUriToStreamJobPartAsync(
179106
StorageResourceContainer sourceResource,
180107
StorageResourceContainer destinationResource)
181108
{
182-
// Apply credentials to the saved transfer job path
109+
// If saved path equals the cotnainer Uri, its a single item trasfer, so the resource name
110+
// does not matter. Just set it to the path.
183111
string childSourcePath = header.SourcePath;
184-
string childSourceName = childSourcePath.Substring(sourceResource.Uri.AbsoluteUri.Length + 1);
112+
string childSourceName = header.SourcePath == sourceResource.Uri.AbsoluteUri.ToString() ?
113+
childSourcePath :
114+
childSourcePath.Substring(sourceResource.Uri.AbsoluteUri.Length + 1);
185115
string childDestinationPath = header.DestinationPath;
186-
string childDestinationName = childDestinationPath.Substring(destinationResource.Uri.AbsoluteUri.Length + 1);
116+
string childDestinationName = header.DestinationPath == destinationResource.Uri.AbsoluteUri.ToString() ?
117+
childDestinationPath :
118+
childDestinationPath.Substring(destinationResource.Uri.AbsoluteUri.Length + 1);
187119
// Override header values if options were specified by user.
188120
long initialTransferSize = baseJob._initialTransferSize ?? header.InitialTransferSize;
189121
long transferChunkSize = baseJob._maximumTransferChunkSize ?? header.ChunkSize;

0 commit comments

Comments
 (0)