Skip to content

Commit 1bc8916

Browse files
[Storage][DataMovement] Add source/destination checkpoint data to Job Plan file (Azure#39411)
1 parent b8697cc commit 1bc8916

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+838
-680
lines changed

sdk/storage/Azure.Storage.DataMovement.Blobs/src/AppendBlobStorageResource.cs

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -283,27 +283,19 @@ protected override async Task<bool> DeleteIfExistsAsync(CancellationToken cancel
283283
return await BlobClient.DeleteIfExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
284284
}
285285

286-
/// <summary>
287-
/// Gets the source checkpoint data for this resource that will be written to the checkpointer.
288-
/// </summary>
289-
/// <returns>A <see cref="StorageResourceCheckpointData"/> containing the checkpoint information for this resource.</returns>
290-
protected override StorageResourceCheckpointData GetSourceCheckpointData()
286+
/// <summary>
/// Builds the source checkpoint data for this append blob resource.
/// In this diff the removed line used the parameterless
/// <see cref="BlobSourceCheckpointData"/> constructor; the added line passes
/// <see cref="BlobType.Append"/> so the blob type is carried in the checkpoint data.
/// </summary>
/// <returns>A new <see cref="BlobSourceCheckpointData"/> created with <see cref="BlobType.Append"/>.</returns>
public override StorageResourceCheckpointData GetSourceCheckpointData()
291287
{
292-
return new BlobSourceCheckpointData();
288+
return new BlobSourceCheckpointData(BlobType.Append);
293289
}
294290

295-
/// <summary>
296-
/// Gets the destination checkpoint data for this resource that will be written to the checkpointer.
297-
/// </summary>
298-
/// <returns>A <see cref="StorageResourceCheckpointData"/> containing the checkpoint information for this resource.</returns>
299-
protected override StorageResourceCheckpointData GetDestinationCheckpointData()
291+
/// <summary>
/// Builds the destination checkpoint data for this append blob resource from the
/// resource options: HTTP headers, access tier, metadata and tags, with the blob
/// type fixed to <see cref="BlobType.Append"/>.
/// </summary>
/// <returns>A new <see cref="BlobDestinationCheckpointData"/> describing the destination settings.</returns>
public override StorageResourceCheckpointData GetDestinationCheckpointData()
300292
{
301293
return new BlobDestinationCheckpointData(
302294
BlobType.Append,
303-
_options.HttpHeaders,
304-
_options.AccessTier,
305-
_options.Metadata,
306-
_options.Tags,
// The added lines switch to null-conditional access: when _options is null each
// argument evaluates to null instead of throwing a NullReferenceException.
295+
_options?.HttpHeaders,
296+
_options?.AccessTier,
297+
_options?.Metadata,
298+
_options?.Tags,
307299
default); // TODO: Update when we support encryption scopes
308300
}
309301

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
using System.IO;
6+
using Azure.Storage.Blobs.Models;
7+
8+
namespace Azure.Storage.DataMovement.Blobs
9+
{
10+
/// <summary>
11+
/// Base class for Blob source and destination checkpoint data
12+
/// which contains shared fields.
13+
/// </summary>
14+
internal abstract class BlobCheckpointData : StorageResourceCheckpointData
15+
{
16+
/// <summary>
17+
/// Schema version.
18+
/// </summary>
19+
public int Version;
20+
21+
/// <summary>
22+
/// The type of blob.
23+
/// </summary>
24+
public BlobType BlobType;
25+
/// <summary>
/// Initializes the fields shared by the source and destination checkpoint data.
/// </summary>
/// <param name="version">Schema version stored in <see cref="Version"/>.</param>
/// <param name="blobType">Blob type stored in <see cref="BlobType"/>.</param>
26+
public BlobCheckpointData(int version, BlobType blobType)
27+
{
28+
Version = version;
29+
BlobType = blobType;
30+
}
31+
}
32+
}

sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs

Lines changed: 31 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License.
33

4+
using System;
45
using System.IO;
56
using System.Text;
67
using Azure.Core;
@@ -11,18 +12,8 @@
1112

1213
namespace Azure.Storage.DataMovement.Blobs
1314
{
14-
internal class BlobDestinationCheckpointData : StorageResourceCheckpointData
15+
internal class BlobDestinationCheckpointData : BlobCheckpointData
1516
{
16-
/// <summary>
17-
/// Schema version.
18-
/// </summary>
19-
public int Version;
20-
21-
/// <summary>
22-
/// The type of the destination blob.
23-
/// </summary>
24-
public BlobType BlobType;
25-
2617
/// <summary>
2718
/// The content headers for the destination blob.
2819
/// </summary>
@@ -65,29 +56,28 @@ public BlobDestinationCheckpointData(
6556
Metadata metadata,
6657
Tags blobTags,
6758
string cpkScope)
59+
: base(DataMovementBlobConstants.DestinationCheckpointData.SchemaVersion, blobType)
6860
{
69-
Version = DataMovementBlobConstants.DestinationJobPartHeader.SchemaVersion;
70-
BlobType = blobType;
7161
ContentHeaders = contentHeaders;
72-
_contentTypeBytes = ContentHeaders?.ContentType != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentType) : new byte[0];
73-
_contentEncodingBytes = ContentHeaders?.ContentEncoding != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentEncoding) : new byte[0];
74-
_contentLanguageBytes = ContentHeaders?.ContentLanguage != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentLanguage) : new byte[0];
75-
_contentDispositionBytes = ContentHeaders?.ContentDisposition != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentDisposition) : new byte[0];
76-
_cacheControlBytes = ContentHeaders?.CacheControl != default ? Encoding.UTF8.GetBytes(ContentHeaders.CacheControl) : new byte[0];
62+
_contentTypeBytes = ContentHeaders?.ContentType != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentType) : Array.Empty<byte>();
63+
_contentEncodingBytes = ContentHeaders?.ContentEncoding != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentEncoding) : Array.Empty<byte>();
64+
_contentLanguageBytes = ContentHeaders?.ContentLanguage != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentLanguage) : Array.Empty<byte>();
65+
_contentDispositionBytes = ContentHeaders?.ContentDisposition != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentDisposition) : Array.Empty<byte>();
66+
_cacheControlBytes = ContentHeaders?.CacheControl != default ? Encoding.UTF8.GetBytes(ContentHeaders.CacheControl) : Array.Empty<byte>();
7767
AccessTier = accessTier;
7868
Metadata = metadata;
79-
_metadataBytes = Metadata != default ? Encoding.UTF8.GetBytes(Metadata.DictionaryToString()) : new byte[0];
69+
_metadataBytes = Metadata != default ? Encoding.UTF8.GetBytes(Metadata.DictionaryToString()) : Array.Empty<byte>();
8070
Tags = blobTags;
81-
_tagsBytes = Tags != default ? Encoding.UTF8.GetBytes(Tags.DictionaryToString()) : new byte[0];
71+
_tagsBytes = Tags != default ? Encoding.UTF8.GetBytes(Tags.DictionaryToString()) : Array.Empty<byte>();
8272
CpkScope = cpkScope;
83-
_cpkScopeBytes = CpkScope != default ? Encoding.UTF8.GetBytes(CpkScope) : new byte[0];
73+
_cpkScopeBytes = CpkScope != default ? Encoding.UTF8.GetBytes(CpkScope) : Array.Empty<byte>();
8474
}
8575

86-
protected override void Serialize(Stream stream)
76+
public override void Serialize(Stream stream)
8777
{
8878
Argument.AssertNotNull(stream, nameof(stream));
8979

90-
int currentVariableLengthIndex = DataMovementBlobConstants.DestinationJobPartHeader.VariableLengthStartIndex;
80+
int currentVariableLengthIndex = DataMovementBlobConstants.DestinationCheckpointData.VariableLengthStartIndex;
9181
BinaryWriter writer = new BinaryWriter(stream);
9282

9383
// Version
@@ -97,31 +87,31 @@ protected override void Serialize(Stream stream)
9787
writer.Write((byte)BlobType);
9888

9989
// ContentType offset/length
100-
WriteVariableLengthFieldInfo(writer, _contentTypeBytes, ref currentVariableLengthIndex);
90+
WriteVariableLengthFieldInfo(writer, _contentTypeBytes.Length, ref currentVariableLengthIndex);
10191

10292
// ContentEncoding offset/length
103-
WriteVariableLengthFieldInfo(writer, _contentEncodingBytes, ref currentVariableLengthIndex);
93+
WriteVariableLengthFieldInfo(writer, _contentEncodingBytes.Length, ref currentVariableLengthIndex);
10494

10595
// ContentLanguage offset/length
106-
WriteVariableLengthFieldInfo(writer, _contentLanguageBytes, ref currentVariableLengthIndex);
96+
WriteVariableLengthFieldInfo(writer, _contentLanguageBytes.Length, ref currentVariableLengthIndex);
10797

10898
// ContentDisposition offset/length
109-
WriteVariableLengthFieldInfo(writer, _contentDispositionBytes, ref currentVariableLengthIndex);
99+
WriteVariableLengthFieldInfo(writer, _contentDispositionBytes.Length, ref currentVariableLengthIndex);
110100

111101
// CacheControl offset/length
112-
WriteVariableLengthFieldInfo(writer, _cacheControlBytes, ref currentVariableLengthIndex);
102+
WriteVariableLengthFieldInfo(writer, _cacheControlBytes.Length, ref currentVariableLengthIndex);
113103

114104
// AccessTier
115105
writer.Write((byte)AccessTier.ToJobPlanAccessTier());
116106

117107
// Metadata offset/length
118-
WriteVariableLengthFieldInfo(writer, _metadataBytes, ref currentVariableLengthIndex);
108+
WriteVariableLengthFieldInfo(writer, _metadataBytes.Length, ref currentVariableLengthIndex);
119109

120110
// Tags offset/length
121-
WriteVariableLengthFieldInfo(writer, _tagsBytes, ref currentVariableLengthIndex);
111+
WriteVariableLengthFieldInfo(writer, _tagsBytes.Length, ref currentVariableLengthIndex);
122112

123113
// CpkScope offset/length
124-
WriteVariableLengthFieldInfo(writer, _cpkScopeBytes, ref currentVariableLengthIndex);
114+
WriteVariableLengthFieldInfo(writer, _cpkScopeBytes.Length, ref currentVariableLengthIndex);
125115

126116
writer.Write(_contentTypeBytes);
127117
writer.Write(_contentEncodingBytes);
@@ -141,7 +131,10 @@ internal static BlobDestinationCheckpointData Deserialize(Stream stream)
141131

142132
// Version
143133
int version = reader.ReadInt32();
144-
CheckSchemaVersion(version);
134+
if (version != DataMovementBlobConstants.DestinationCheckpointData.SchemaVersion)
135+
{
136+
throw Errors.UnsupportedJobSchemaVersionHeader(version.ToString());
137+
}
145138

146139
// BlobType
147140
BlobType blobType = (BlobType)reader.ReadByte();
@@ -168,7 +161,11 @@ internal static BlobDestinationCheckpointData Deserialize(Stream stream)
168161

169162
// AccessTier
170163
JobPlanAccessTier jobPlanAccessTier = (JobPlanAccessTier)reader.ReadByte();
171-
AccessTier accessTier = new AccessTier(jobPlanAccessTier.ToString());
164+
AccessTier? accessTier = default;
165+
if (!jobPlanAccessTier.Equals(JobPlanAccessTier.None))
166+
{
167+
accessTier = new AccessTier(jobPlanAccessTier.ToString());
168+
}
172169

173170
// Metadata offset/length
174171
int metadataOffset = reader.ReadInt32();
@@ -267,7 +264,7 @@ internal static BlobDestinationCheckpointData Deserialize(Stream stream)
267264
private int CalculateLength()
268265
{
269266
// Length is fixed size fields plus length of each variable length field
270-
int length = DataMovementBlobConstants.DestinationJobPartHeader.VariableLengthStartIndex;
267+
int length = DataMovementBlobConstants.DestinationCheckpointData.VariableLengthStartIndex;
271268
length += _contentTypeBytes.Length;
272269
length += _contentEncodingBytes.Length;
273270
length += _contentLanguageBytes.Length;
@@ -278,13 +275,5 @@ private int CalculateLength()
278275
length += _cpkScopeBytes.Length;
279276
return length;
280277
}
281-
282-
private static void CheckSchemaVersion(int version)
283-
{
284-
if (version != DataMovementBlobConstants.DestinationJobPartHeader.SchemaVersion)
285-
{
286-
throw Errors.UnsupportedJobSchemaVersionHeader(version.ToString());
287-
}
288-
}
289278
}
290279
}

sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobSourceCheckpointData.cs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,48 @@
22
// Licensed under the MIT License.
33

44
using System.IO;
5+
using Azure.Core;
6+
using Azure.Storage.Blobs.Models;
57

68
namespace Azure.Storage.DataMovement.Blobs
79
{
8-
internal class BlobSourceCheckpointData : StorageResourceCheckpointData
10+
/// <summary>
/// Checkpoint data for a blob source. It carries only the fields inherited from
/// <see cref="BlobCheckpointData"/> (schema version and blob type) and can write
/// them to, and read them back from, a stream.
/// </summary>
internal class BlobSourceCheckpointData : BlobCheckpointData
911
{
10-
public override int Length => 0;
/// <summary>
/// Creates source checkpoint data for the given blob type, using the source
/// checkpoint schema version constant as the version.
/// </summary>
/// <param name="blobType">The type of the source blob.</param>
12+
public BlobSourceCheckpointData(BlobType blobType)
13+
: base(DataMovementBlobConstants.SourceCheckpointData.SchemaVersion, blobType)
14+
{
15+
}
16+
/// <summary>
/// Length of the checkpoint data, taken from the DataSize constant.
/// NOTE(review): presumably DataSize equals the bytes written by Serialize
/// (one int plus one byte) - confirm against DataMovementBlobConstants.
/// </summary>
17+
public override int Length => DataMovementBlobConstants.SourceCheckpointData.DataSize;
1118

12-
protected override void Serialize(Stream stream)
/// <summary>
/// Writes the schema version followed by the blob type to <paramref name="stream"/>.
/// </summary>
/// <param name="stream">Destination stream; asserted to be non-null.</param>
19+
public override void Serialize(Stream stream)
1320
{
21+
Argument.AssertNotNull(stream, nameof(stream));
// NOTE(review): the writer is not disposed here; presumably so the caller's
// stream is left open - confirm.
22+
BinaryWriter writer = new BinaryWriter(stream);
23+
24+
// Version (written as an int)
25+
writer.Write(Version);
26+
27+
// BlobType (narrowed to a single byte)
28+
writer.Write((byte)BlobType);
29+
}
30+
/// <summary>
/// Reads source checkpoint data from <paramref name="stream"/> in the same order
/// that Serialize writes it: an int schema version, then a one-byte blob type.
/// Throws the error produced by Errors.UnsupportedJobSchemaVersionHeader when the
/// version read does not equal the source checkpoint schema version constant.
/// </summary>
/// <param name="stream">Source stream; asserted to be non-null.</param>
/// <returns>A new <see cref="BlobSourceCheckpointData"/> with the blob type that was read.</returns>
31+
internal static BlobSourceCheckpointData Deserialize(Stream stream)
32+
{
33+
Argument.AssertNotNull(stream, nameof(stream));
34+
BinaryReader reader = new BinaryReader(stream);
35+
36+
// Version - only the current schema version is accepted
37+
int version = reader.ReadInt32();
38+
if (version != DataMovementBlobConstants.SourceCheckpointData.SchemaVersion)
39+
{
40+
throw Errors.UnsupportedJobSchemaVersionHeader(version.ToString());
41+
}
42+
43+
// BlobType - stored as one byte, cast back to the enum
44+
BlobType blobType = (BlobType)reader.ReadByte();
45+
46+
return new BlobSourceCheckpointData(blobType);
1447
}
1548
}
1649
}

sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobStorageResourceContainer.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,23 @@ protected override async IAsyncEnumerable<StorageResource> GetStorageResourcesAs
129129
}
130130
}
131131

132+
/// <summary>
/// Builds the source checkpoint data for this container resource. The blob type is
/// always <see cref="BlobType.Block"/>, which the inline comment marks as a
/// placeholder because the source blob type is not significant for a container.
/// </summary>
/// <returns>A new <see cref="BlobSourceCheckpointData"/> created with <see cref="BlobType.Block"/>.</returns>
public override StorageResourceCheckpointData GetSourceCheckpointData()
133+
{
134+
// Source blob type does not matter for container
135+
return new BlobSourceCheckpointData(BlobType.Block);
136+
}
137+
138+
/// <summary>
/// Builds the destination checkpoint data for this container resource from the
/// container options. Every option is read with null-conditional access, so a null
/// _options or a null BlobOptions yields null arguments rather than an exception.
/// </summary>
/// <returns>A new <see cref="BlobDestinationCheckpointData"/> describing the destination settings.</returns>
public override StorageResourceCheckpointData GetDestinationCheckpointData()
139+
{
140+
return new BlobDestinationCheckpointData(
// Falls back to Block when no blob type is available from the options.
141+
_options?.BlobType ?? BlobType.Block,
142+
_options?.BlobOptions?.HttpHeaders,
143+
_options?.BlobOptions?.AccessTier,
144+
_options?.BlobOptions?.Metadata,
145+
_options?.BlobOptions?.Tags,
146+
default); // TODO: Update when we support encryption scopes
147+
}
148+
132149
private string ApplyOptionalPrefix(string path)
133150
=> IsDirectory
134151
? string.Join("/", DirectoryPrefix, path)

0 commit comments

Comments
 (0)