Skip to content

Commit 654eebc

Browse files
[Storage][DataMovement] Added support for disabling the transfer checkpointer (Azure#46286)
1 parent 26f9eec commit 654eebc

17 files changed

+400
-166
lines changed

sdk/storage/Azure.Storage.DataMovement/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
## 12.0.0-beta.6 (Unreleased)
44

55
### Features Added
6+
- Added support to disable checkpointing via `TransferCheckpointStoreOptions.Disabled`.
67

78
### Breaking Changes
9+
- Removed the constructor for `TransferCheckpointStoreOptions` and replaced with a static builder method `Local`.
10+
- Changed `TransferCheckpointStoreOptions.CheckpointerPath` to internal.
811

912
### Bugs Fixed
1013

sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,9 @@ public StorageResourceWriteToOffsetOptions() { }
224224
}
225225
public partial class TransferCheckpointStoreOptions
226226
{
227-
public TransferCheckpointStoreOptions(string localCheckpointerPath) { }
228-
public string CheckpointerPath { get { throw null; } }
227+
internal TransferCheckpointStoreOptions() { }
228+
public static Azure.Storage.DataMovement.TransferCheckpointStoreOptions Disabled() { throw null; }
229+
public static Azure.Storage.DataMovement.TransferCheckpointStoreOptions Local(string localCheckpointerPath) { throw null; }
229230
}
230231
public partial class TransferItemCompletedEventArgs : Azure.Storage.DataMovement.DataTransferEventArgs
231232
{

sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,9 @@ public StorageResourceWriteToOffsetOptions() { }
224224
}
225225
public partial class TransferCheckpointStoreOptions
226226
{
227-
public TransferCheckpointStoreOptions(string localCheckpointerPath) { }
228-
public string CheckpointerPath { get { throw null; } }
227+
internal TransferCheckpointStoreOptions() { }
228+
public static Azure.Storage.DataMovement.TransferCheckpointStoreOptions Disabled() { throw null; }
229+
public static Azure.Storage.DataMovement.TransferCheckpointStoreOptions Local(string localCheckpointerPath) { throw null; }
229230
}
230231
public partial class TransferItemCompletedEventArgs : Azure.Storage.DataMovement.DataTransferEventArgs
231232
{

sdk/storage/Azure.Storage.DataMovement/src/CheckpointerExtensions.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,15 @@ namespace Azure.Storage.DataMovement
1111
{
1212
internal static partial class CheckpointerExtensions
1313
{
14-
internal static TransferCheckpointer GetCheckpointer(this TransferCheckpointStoreOptions options)
14+
internal static TransferCheckpointer BuildCheckpointer(TransferCheckpointStoreOptions options)
1515
{
16-
if (!string.IsNullOrEmpty(options?.CheckpointerPath))
16+
if (options?.Enabled == false)
1717
{
18-
return new LocalTransferCheckpointer(options.CheckpointerPath);
18+
return new DisabledTransferCheckpointer();
1919
}
2020
else
2121
{
22-
// Default TransferCheckpointer
23-
return new LocalTransferCheckpointer(default);
22+
return new LocalTransferCheckpointer(options?.CheckpointerPath);
2423
}
2524
}
2625

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System.Collections.Generic;
5+
using System.IO;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
9+
namespace Azure.Storage.DataMovement
10+
{
11+
internal class DisabledTransferCheckpointer : TransferCheckpointer
12+
{
13+
public DisabledTransferCheckpointer() { }
14+
15+
public override Task AddNewJobAsync(
16+
string transferId,
17+
StorageResource source,
18+
StorageResource destination,
19+
CancellationToken cancellationToken = default)
20+
{
21+
return Task.CompletedTask;
22+
}
23+
24+
public override Task AddNewJobPartAsync(
25+
string transferId,
26+
int partNumber,
27+
Stream headerStream,
28+
CancellationToken cancellationToken = default)
29+
{
30+
return Task.CompletedTask;
31+
}
32+
33+
public override Task<int> CurrentJobPartCountAsync(string transferId, CancellationToken cancellationToken = default)
34+
{
35+
return Task.FromResult(0);
36+
}
37+
38+
public override Task<List<string>> GetStoredTransfersAsync(CancellationToken cancellationToken = default)
39+
{
40+
return Task.FromResult(new List<string>());
41+
}
42+
43+
public override Task<Stream> ReadJobPartPlanFileAsync(
44+
string transferId,
45+
int partNumber,
46+
int offset,
47+
int length,
48+
CancellationToken cancellationToken = default)
49+
{
50+
throw Errors.CheckpointerDisabled("ReadJobPartPlanFileAsync");
51+
}
52+
53+
public override Task<Stream> ReadJobPlanFileAsync(
54+
string transferId,
55+
int offset,
56+
int length,
57+
CancellationToken cancellationToken = default)
58+
{
59+
throw Errors.CheckpointerDisabled("ReadJobPlanFileAsync");
60+
}
61+
62+
public override Task SetJobPartTransferStatusAsync(
63+
string transferId,
64+
int partNumber,
65+
DataTransferStatus status,
66+
CancellationToken cancellationToken = default)
67+
{
68+
return Task.CompletedTask;
69+
}
70+
71+
public override Task SetJobTransferStatusAsync(
72+
string transferId,
73+
DataTransferStatus status,
74+
CancellationToken cancellationToken = default)
75+
{
76+
return Task.CompletedTask;
77+
}
78+
79+
public override Task<bool> TryRemoveStoredTransferAsync(string transferId, CancellationToken cancellationToken = default)
80+
{
81+
return Task.FromResult(false);
82+
}
83+
84+
public override Task WriteToJobPlanFileAsync(
85+
string transferId,
86+
int fileOffset,
87+
byte[] buffer,
88+
int bufferOffset,
89+
int length,
90+
CancellationToken cancellationToken = default)
91+
{
92+
return Task.CompletedTask;
93+
}
94+
}
95+
}

sdk/storage/Azure.Storage.DataMovement/src/JobPlanExtensions.cs

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -2,66 +2,12 @@
22
// Licensed under the MIT License.
33

44
using System;
5-
using System.IO;
6-
using System.Threading;
7-
using System.Threading.Tasks;
85
using Azure.Storage.DataMovement.JobPlan;
96

107
namespace Azure.Storage.DataMovement
118
{
129
internal static partial class JobPlanExtensions
1310
{
14-
internal static async Task<string> GetHeaderLongValue(
15-
this TransferCheckpointer checkpointer,
16-
string transferId,
17-
int startIndex,
18-
int streamReadLength,
19-
int valueLength,
20-
CancellationToken cancellationToken)
21-
{
22-
string value;
23-
using (Stream stream = await checkpointer.ReadJobPartPlanFileAsync(
24-
transferId: transferId,
25-
partNumber: 0,
26-
offset: startIndex,
27-
length: streamReadLength,
28-
cancellationToken: cancellationToken).ConfigureAwait(false))
29-
{
30-
BinaryReader reader = new BinaryReader(stream);
31-
32-
// Read Path Length
33-
byte[] pathLengthBuffer = reader.ReadBytes(DataMovementConstants.LongSizeInBytes);
34-
long pathLength = pathLengthBuffer.ToLong();
35-
36-
// Read Path
37-
byte[] pathBuffer = reader.ReadBytes(valueLength);
38-
value = pathBuffer.ToString(pathLength);
39-
}
40-
return value;
41-
}
42-
43-
internal static async Task<byte> GetByteValue(
44-
this TransferCheckpointer checkpointer,
45-
string transferId,
46-
int startIndex,
47-
CancellationToken cancellationToken)
48-
{
49-
byte value;
50-
using (Stream stream = await checkpointer.ReadJobPartPlanFileAsync(
51-
transferId: transferId,
52-
partNumber: 0,
53-
offset: startIndex,
54-
length: DataMovementConstants.OneByte,
55-
cancellationToken: cancellationToken).ConfigureAwait(false))
56-
{
57-
BinaryReader reader = new BinaryReader(stream);
58-
59-
// Read Byte
60-
value = reader.ReadByte();
61-
}
62-
return value;
63-
}
64-
6511
internal static JobPlanStatus ToJobPlanStatus(this DataTransferStatus transferStatus)
6612
{
6713
if (transferStatus == default)

sdk/storage/Azure.Storage.DataMovement/src/Shared/Errors.DataMovement.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,5 +128,8 @@ public static ArgumentException NoResourceProviderFound(bool isSource, string pr
128128

129129
public static ArgumentException UnexpectedPropertyType(string propertyName, params string[] expectedTypes)
130130
=> new ArgumentException($"Unexpected property type encountered for storage resource property {propertyName}: {string.Join(",", (string[])expectedTypes)}");
131+
132+
public static InvalidOperationException CheckpointerDisabled(string method)
133+
=> new InvalidOperationException($"Unable to perform {method}. The transfer checkpointer is disabled.");
131134
}
132135
}

sdk/storage/Azure.Storage.DataMovement/src/TransferCheckpointStoreOptions.cs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,24 @@ namespace Azure.Storage.DataMovement
1010
public class TransferCheckpointStoreOptions
1111
{
1212
/// <summary>
13-
/// The local folder where the checkpoint information will be stored.
13+
/// Whether checkpointing should be enabled or not.
14+
/// </summary>
15+
internal bool Enabled { get; private set; }
16+
17+
/// <summary>
18+
/// The folder where the checkpoint information will be stored.
1419
/// </summary>
15-
public string CheckpointerPath { get; private set; }
20+
internal string CheckpointerPath { get; private set; }
21+
22+
/// <summary>
23+
/// Sets the checkpoint options to disable transfer checkpointing.
24+
/// <para>NOTE: All pause/resume functionality will be disabled.</para>
25+
/// </summary>
26+
/// <returns></returns>
27+
public static TransferCheckpointStoreOptions Disabled()
28+
{
29+
return new TransferCheckpointStoreOptions(false, default);
30+
}
1631

1732
/// <summary>
1833
/// Sets the checkpointer options to use a Local Checkpointer where
@@ -21,14 +36,15 @@ public class TransferCheckpointStoreOptions
2136
/// <param name="localCheckpointerPath">
2237
/// The local folder where the checkpoint information will be stored.
2338
/// </param>
24-
public TransferCheckpointStoreOptions(string localCheckpointerPath)
39+
public static TransferCheckpointStoreOptions Local(string localCheckpointerPath)
2540
{
26-
CheckpointerPath = localCheckpointerPath;
41+
return new TransferCheckpointStoreOptions(true, localCheckpointerPath);
2742
}
2843

29-
internal TransferCheckpointStoreOptions(TransferCheckpointStoreOptions options)
44+
internal TransferCheckpointStoreOptions(bool enabled, string localCheckpointerPath)
3045
{
31-
CheckpointerPath = options.CheckpointerPath;
46+
Enabled = enabled;
47+
CheckpointerPath = localCheckpointerPath;
3248
}
3349
}
3450
}

sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,7 @@ public TransferManager(TransferManagerOptions options = default)
6464
new(ArrayPool<byte>.Shared,
6565
options?.ErrorHandling ?? DataTransferErrorMode.StopOnAnyFailure,
6666
new ClientDiagnostics(options?.ClientOptions ?? ClientOptions.Default)),
67-
(options?.CheckpointerOptions != default ? new TransferCheckpointStoreOptions(options.CheckpointerOptions) : default)
68-
?.GetCheckpointer() ?? CreateDefaultCheckpointer(),
67+
CheckpointerExtensions.BuildCheckpointer(options?.CheckpointerOptions),
6968
options?.ResumeProviders != null ? new List<StorageResourceProvider>(options.ResumeProviders) : new(),
7069
default)
7170
{}
@@ -209,6 +208,11 @@ public virtual async Task<List<DataTransfer>> ResumeAllTransfersAsync(
209208
CancellationToken cancellationToken = default)
210209
{
211210
cancellationToken = LinkCancellation(cancellationToken);
211+
if (_checkpointer is DisabledTransferCheckpointer)
212+
{
213+
throw Errors.CheckpointerDisabled("ResumeAllTransfersAsync");
214+
}
215+
212216
List<DataTransfer> transfers = new();
213217
await foreach (DataTransferProperties properties in GetResumableTransfersAsync().ConfigureAwait(false))
214218
{
@@ -235,6 +239,10 @@ public virtual async Task<DataTransfer> ResumeTransferAsync(
235239
cancellationToken = LinkCancellation(cancellationToken);
236240
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
237241
Argument.AssertNotNullOrWhiteSpace(transferId, nameof(transferId));
242+
if (_checkpointer is DisabledTransferCheckpointer)
243+
{
244+
throw Errors.CheckpointerDisabled("ResumeTransferAsync");
245+
}
238246

239247
if (!await _checkpointer.IsResumableAsync(transferId, cancellationToken).ConfigureAwait(false))
240248
{
@@ -392,22 +400,6 @@ private async Task<DataTransfer> BuildAndAddTransferJobAsync(
392400
}
393401
#endregion
394402

395-
/// <summary>
396-
/// Returns a default checkpointer if not specified by the user already.
397-
///
398-
/// By default a local folder will be used to store the job transfer files.
399-
/// </summary>
400-
/// <returns>
401-
/// A <see cref="LocalTransferCheckpointer"/> using the folder
402-
/// where the application is stored with and making a new folder called
403-
/// .azstoragedml to store all the job plan files.
404-
/// </returns>
405-
private static LocalTransferCheckpointer CreateDefaultCheckpointer()
406-
{
407-
// Return checkpointer
408-
return new LocalTransferCheckpointer(default);
409-
}
410-
411403
private async Task SetDataTransfers()
412404
{
413405
_dataTransfers.Clear();

0 commit comments

Comments
 (0)