Skip to content

Commit 70d8a85

Browse files
[Storage][DataMovement] Add additional debug logging to transfers (Azure#47452)
1 parent 0bfe2d3 commit 70d8a85

10 files changed

+159
-29
lines changed

sdk/storage/Azure.Storage.DataMovement/src/Azure.Storage.DataMovement.csproj

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,14 @@
2020
<ItemGroup>
2121
<PackageReference Include="System.Threading.Channels" />
2222
<PackageReference Include="Azure.Core" />
23-
</ItemGroup>
24-
<ItemGroup>
2523
<PackageReference Include="Azure.Storage.Common" />
2624
</ItemGroup>
2725
<ItemGroup>
28-
<Compile Include="$(AzureStorageSharedSources)Constants.cs" Link="Shared\%(RecursiveDir)\%(Filename)%(Extension)" />
29-
</ItemGroup>
30-
<ItemGroup>
26+
<Compile Include="$(AzureCoreSharedSources)AzureEventSource.cs" LinkBase="Shared\Core" />
3127
<Compile Include="$(AzureCoreSharedSources)CancellationHelper.cs" LinkBase="Shared\Core" />
3228
</ItemGroup>
3329
<ItemGroup>
30+
<Compile Include="$(AzureStorageSharedSources)Constants.cs" Link="Shared\%(RecursiveDir)\%(Filename)%(Extension)" />
3431
<Compile Include="$(AzureStorageSharedSources)AggregatingProgressIncrementer.cs" LinkBase="Shared\Storage" />
3532
<Compile Include="$(AzureStorageSharedSources)Errors.cs" LinkBase="Shared\Storage" />
3633
<Compile Include="$(AzureStorageSharedSources)Errors.Clients.cs" LinkBase="Shared\Storage" />
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System.Diagnostics.Tracing;
5+
using Azure.Core.Diagnostics;
6+
7+
namespace Azure.Storage.DataMovement
8+
{
9+
internal class DataMovementEventSource : AzureEventSource
10+
{
11+
private const string EventSourceName = "Azure-Storage-DataMovement";
12+
13+
private const int TransferQueuedEvent = 1;
14+
private const int TransferCompletedEvent = 2;
15+
private const int JobPartStatusEvent = 3;
16+
private const int EnumerationCompleteEvent = 4;
17+
private const int ResumeTransferEvent = 5;
18+
private const int ResumeEnumerationCompleteEvent = 6;
19+
20+
private DataMovementEventSource() : base(EventSourceName) { }
21+
22+
public static DataMovementEventSource Singleton { get; } = new DataMovementEventSource();
23+
24+
[Event(TransferQueuedEvent, Level = EventLevel.Informational, Message = "Transfer [{0}] Transfer queued: {1} -> {2}")]
25+
public void TransferQueued(string transferId, string source, string destination)
26+
{
27+
WriteEvent(TransferQueuedEvent, transferId, source, destination);
28+
}
29+
30+
[NonEvent]
31+
public void TransferQueued(string transferId, StorageResource source, StorageResource destination)
32+
{
33+
if (IsEnabled(EventLevel.Informational, EventKeywords.None))
34+
{
35+
TransferQueued(transferId, source.Uri.AbsoluteUri, destination.Uri.AbsoluteUri);
36+
}
37+
}
38+
39+
[Event(TransferCompletedEvent, Level = EventLevel.Informational, Message = "Transfer [{0}] Transfer completed: HasFailed={1}, HasKsipped={2}")]
40+
public void TrasnferCompleted(string trasnferId, bool hasFailed, bool hasSkipped)
41+
{
42+
WriteEvent(TransferCompletedEvent, trasnferId, hasFailed, hasSkipped);
43+
}
44+
45+
[NonEvent]
46+
public void TrasnferCompleted(string trasnferId, DataTransferStatus status)
47+
{
48+
if (IsEnabled(EventLevel.Informational, EventKeywords.None))
49+
{
50+
TrasnferCompleted(trasnferId, status.HasFailedItems, status.HasSkippedItems);
51+
}
52+
}
53+
54+
[Event(JobPartStatusEvent, Level = EventLevel.Verbose, Message = "Transfer [{0}:{1}] Status={2}, Failed={3}, Skipped={4}")]
55+
public void JobPartStatus(string transferId, int jobPart, string jobPartStatus, bool failed, bool skipped)
56+
{
57+
WriteEvent(JobPartStatusEvent, transferId, jobPart, jobPartStatus, failed, skipped);
58+
}
59+
60+
[NonEvent]
61+
public void JobPartStatus(string transferId, int jobPart, DataTransferStatus status)
62+
{
63+
if (IsEnabled(EventLevel.Verbose, EventKeywords.None))
64+
{
65+
JobPartStatus(transferId, jobPart, status.State.ToString(), status.HasFailedItems, status.HasSkippedItems);
66+
}
67+
}
68+
69+
[Event(EnumerationCompleteEvent, Level = EventLevel.Informational, Message = "Transfer [{0}] Enumeration complete: JobPartCount={1}")]
70+
public void EnumerationComplete(string transferId, int jobPartCount)
71+
{
72+
WriteEvent(EnumerationCompleteEvent, transferId, jobPartCount);
73+
}
74+
75+
[Event(ResumeTransferEvent, Level = EventLevel.Informational, Message = "Resume transfer [{0]} Transfer queued: {1} -> {2}")]
76+
public void ResumeTransfer(string transferId, string source, string destination)
77+
{
78+
WriteEvent(ResumeTransferEvent, transferId, source, destination);
79+
}
80+
81+
[NonEvent]
82+
public void ResumeTransfer(string transferId, StorageResource source, StorageResource destination)
83+
{
84+
if (IsEnabled(EventLevel.Informational, EventKeywords.None))
85+
{
86+
ResumeTransfer(transferId, source.Uri.AbsoluteUri, destination.Uri.AbsoluteUri);
87+
}
88+
}
89+
90+
[Event(ResumeEnumerationCompleteEvent, Level = EventLevel.Informational, Message = "Resume transfer [{0}] Resumed saved job parts: JobPartCount={1}")]
91+
public void ResumeEnumerationComplete(string transferId, int jobPartCount)
92+
{
93+
WriteEvent(ResumeEnumerationCompleteEvent, transferId, jobPartCount);
94+
}
95+
}
96+
}

sdk/storage/Azure.Storage.DataMovement/src/DataTransferInternalState.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public bool SetTransferState(DataTransferState state)
9494
if (DataTransferState.Completed == _status.State ||
9595
DataTransferState.Paused == _status.State)
9696
{
97+
DataMovementEventSource.Singleton.TrasnferCompleted(Id, _status);
9798
// If the _completionSource has been cancelled or the exception
9899
// has been set, we don't need to check if TrySetResult returns false
99100
// because it's acceptable to cancel or have an error occur before then.

sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,9 @@ internal abstract class JobPartInternal
103103
internal ClientDiagnostics ClientDiagnostics { get; }
104104

105105
/// <summary>
106-
/// If the transfer status of the job changes then the event will get added to this handler.
106+
/// If the transfer status of the job part changes then the event will get added to this handler.
107107
/// </summary>
108-
public SyncAsyncEventHandler<TransferStatusEventArgs> PartTransferStatusEventHandler { get; internal set; }
108+
public SyncAsyncEventHandler<JobPartStatusEventArgs> PartTransferStatusEventHandler { get; internal set; }
109109

110110
/// <summary>
111111
/// If the transfer status of the job changes then the event will get added to this handler.
@@ -155,7 +155,7 @@ internal JobPartInternal(
155155
ITransferCheckpointer checkpointer,
156156
TransferProgressTracker progressTracker,
157157
ArrayPool<byte> arrayPool,
158-
SyncAsyncEventHandler<TransferStatusEventArgs> jobPartEventHandler,
158+
SyncAsyncEventHandler<JobPartStatusEventArgs> jobPartEventHandler,
159159
SyncAsyncEventHandler<TransferStatusEventArgs> statusEventHandler,
160160
SyncAsyncEventHandler<TransferItemFailedEventArgs> failedEventHandler,
161161
SyncAsyncEventHandler<TransferItemSkippedEventArgs> skippedEventHandler,
@@ -293,8 +293,9 @@ internal async Task OnTransferStateChangedAsync(DataTransferState transferState)
293293
await SetCheckpointerStatusAsync().ConfigureAwait(false);
294294

295295
await PartTransferStatusEventHandler.RaiseAsync(
296-
new TransferStatusEventArgs(
296+
new JobPartStatusEventArgs(
297297
_dataTransfer.Id,
298+
PartNumber,
298299
JobPartStatus.DeepCopy(),
299300
false,
300301
_cancellationToken),
@@ -359,8 +360,9 @@ await TransferSkippedEventHandler.RaiseAsync(
359360
if (JobPartStatus.SetSkippedItem())
360361
{
361362
await PartTransferStatusEventHandler.RaiseAsync(
362-
new TransferStatusEventArgs(
363+
new JobPartStatusEventArgs(
363364
_dataTransfer.Id,
365+
PartNumber,
364366
JobPartStatus.DeepCopy(),
365367
false,
366368
_cancellationToken),
@@ -414,8 +416,9 @@ await TransferFailedEventHandler.RaiseAsync(
414416
if (JobPartStatus.SetFailedItem())
415417
{
416418
await PartTransferStatusEventHandler.RaiseAsync(
417-
new TransferStatusEventArgs(
419+
new JobPartStatusEventArgs(
418420
_dataTransfer.Id,
421+
PartNumber,
419422
JobPartStatus.DeepCopy(),
420423
false,
421424
_cancellationToken),
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System.Threading;
5+
6+
namespace Azure.Storage.DataMovement
7+
{
8+
internal class JobPartStatusEventArgs : DataTransferEventArgs
9+
{
10+
public int PartNumber { get; }
11+
12+
public DataTransferStatus TransferStatus { get; }
13+
14+
public JobPartStatusEventArgs(
15+
string transferId,
16+
int partNumber,
17+
DataTransferStatus transferStatus,
18+
bool isRunningSynchronously,
19+
CancellationToken cancellationToken)
20+
: base(transferId, isRunningSynchronously, cancellationToken)
21+
{
22+
PartNumber = partNumber;
23+
TransferStatus = transferStatus;
24+
}
25+
}
26+
}

sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ private ServiceToServiceJobPart(TransferJobInternal job, int partNumber)
3737
checkpointer: job._checkpointer,
3838
progressTracker: job._progressTracker,
3939
arrayPool: job.UploadArrayPool,
40-
jobPartEventHandler: job.GetJobPartStatus(),
40+
jobPartEventHandler: job.GetJobPartStatusEventHandler(),
4141
statusEventHandler: job.TransferStatusEventHandler,
4242
failedEventHandler: job.TransferFailedEventHandler,
4343
skippedEventHandler: job.TransferSkippedEventHandler,
@@ -67,7 +67,7 @@ private ServiceToServiceJobPart(
6767
checkpointer: job._checkpointer,
6868
progressTracker: job._progressTracker,
6969
arrayPool: job.UploadArrayPool,
70-
jobPartEventHandler: job.GetJobPartStatus(),
70+
jobPartEventHandler: job.GetJobPartStatusEventHandler(),
7171
statusEventHandler: job.TransferStatusEventHandler,
7272
failedEventHandler: job.TransferFailedEventHandler,
7373
skippedEventHandler: job.TransferSkippedEventHandler,
@@ -102,7 +102,7 @@ private ServiceToServiceJobPart(
102102
checkpointer: job._checkpointer,
103103
progressTracker: job._progressTracker,
104104
arrayPool: job.UploadArrayPool,
105-
jobPartEventHandler: job.GetJobPartStatus(),
105+
jobPartEventHandler: job.GetJobPartStatusEventHandler(),
106106
statusEventHandler: job.TransferStatusEventHandler,
107107
failedEventHandler: job.TransferFailedEventHandler,
108108
skippedEventHandler: job.TransferSkippedEventHandler,

sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ private StreamToUriJobPart(TransferJobInternal job, int partNumber)
3737
checkpointer: job._checkpointer,
3838
progressTracker: job._progressTracker,
3939
arrayPool: job.UploadArrayPool,
40-
jobPartEventHandler: job.GetJobPartStatus(),
40+
jobPartEventHandler: job.GetJobPartStatusEventHandler(),
4141
statusEventHandler: job.TransferStatusEventHandler,
4242
failedEventHandler: job.TransferFailedEventHandler,
4343
skippedEventHandler: job.TransferSkippedEventHandler,
@@ -67,7 +67,7 @@ private StreamToUriJobPart(
6767
checkpointer: job._checkpointer,
6868
progressTracker: job._progressTracker,
6969
arrayPool: job.UploadArrayPool,
70-
jobPartEventHandler: job.GetJobPartStatus(),
70+
jobPartEventHandler: job.GetJobPartStatusEventHandler(),
7171
statusEventHandler: job.TransferStatusEventHandler,
7272
failedEventHandler: job.TransferFailedEventHandler,
7373
skippedEventHandler: job.TransferSkippedEventHandler,
@@ -102,7 +102,7 @@ private StreamToUriJobPart(
102102
checkpointer: job._checkpointer,
103103
progressTracker: job._progressTracker,
104104
arrayPool: job.UploadArrayPool,
105-
jobPartEventHandler: job.GetJobPartStatus(),
105+
jobPartEventHandler: job.GetJobPartStatusEventHandler(),
106106
statusEventHandler: job.TransferStatusEventHandler,
107107
failedEventHandler: job.TransferFailedEventHandler,
108108
skippedEventHandler: job.TransferSkippedEventHandler,

sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,10 @@ internal delegate Task<JobPartInternal> CreateJobPartMultiAsync(
9797
internal StorageResourceCreationPreference _creationPreference;
9898

9999
/// <summary>
100-
/// Considering if there's more than one job part, the transfer status will need to be set to
101-
/// completed all job parts have been set to completed.
100+
/// Event handler for tracking status changes in job parts.
102101
/// </summary>
103-
public event SyncAsyncEventHandler<TransferStatusEventArgs> JobPartStatusEvents;
104-
public SyncAsyncEventHandler<TransferStatusEventArgs> GetJobPartStatus() => JobPartStatusEvents;
102+
public event SyncAsyncEventHandler<JobPartStatusEventArgs> JobPartStatusEvents;
103+
public SyncAsyncEventHandler<JobPartStatusEventArgs> GetJobPartStatusEventHandler() => JobPartStatusEvents;
105104

106105
/// <summary>
107106
/// If the transfer status of the job changes then the event will get added to this handler.
@@ -184,7 +183,7 @@ private TransferJobInternal(
184183
_errorMode = errorHandling;
185184
_creationPreference = creationPreference;
186185

187-
JobPartStatusEvents += JobPartEventAsync;
186+
JobPartStatusEvents += JobPartStatusEventAsync;
188187
TransferStatusEventHandler = statusEventHandler;
189188
TransferFailedEventHandler = failedEventHandler;
190189
TransferSkippedEventHandler = skippedEventHandler;
@@ -271,7 +270,7 @@ public void DisposeHandlers()
271270
{
272271
if (JobPartStatusEvents != default)
273272
{
274-
JobPartStatusEvents -= JobPartEventAsync;
273+
JobPartStatusEvents -= JobPartStatusEventAsync;
275274
}
276275
}
277276

@@ -336,6 +335,7 @@ public virtual async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync(
336335
yield return part;
337336
}
338337
}
338+
DataMovementEventSource.Singleton.ResumeEnumerationComplete(_dataTransfer.Id, _jobParts.Count);
339339

340340
bool isEnumerationComplete;
341341
try
@@ -465,6 +465,7 @@ private async IAsyncEnumerable<JobPartInternal> EnumerateAndCreateJobPartsAsync(
465465
}
466466
}
467467
}
468+
DataMovementEventSource.Singleton.EnumerationComplete(_dataTransfer.Id, _jobParts.Count);
468469
}
469470

470471
/// <summary>
@@ -544,11 +545,13 @@ await TransferFailedEventHandler.RaiseAsync(
544545
/// In order to properly propagate the transfer status events of each job part up
545546
/// until all job parts have completed.
546547
/// </summary>
547-
public async Task JobPartEventAsync(TransferStatusEventArgs args)
548+
public async Task JobPartStatusEventAsync(JobPartStatusEventArgs args)
548549
{
549550
DataTransferStatus jobPartStatus = args.TransferStatus;
550551
DataTransferState jobState = _dataTransfer._state.GetTransferStatus().State;
551552

553+
DataMovementEventSource.Singleton.JobPartStatus(_dataTransfer.Id, args.PartNumber, jobPartStatus);
554+
552555
// Keep track of paused, failed, and skipped which we will use to determine final job status
553556
// Since this is each Job Part coming in, the state of skipped or failed is mutually exclusive.
554557
if (jobPartStatus.State == DataTransferState.Paused)

sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -310,14 +310,17 @@ bool TryGetStorageResourceProvider(DataTransferProperties properties, bool getSo
310310
throw Errors.NoResourceProviderFound(false, dataTransferProperties.DestinationProviderId);
311311
}
312312

313+
StorageResource source = await sourceProvider.FromSourceAsync(dataTransferProperties, cancellationToken).ConfigureAwait(false);
314+
StorageResource destination = await destinationProvider.FromDestinationAsync(dataTransferProperties, cancellationToken).ConfigureAwait(false);
313315
DataTransfer dataTransfer = await BuildAndAddTransferJobAsync(
314-
await sourceProvider.FromSourceAsync(dataTransferProperties, cancellationToken).ConfigureAwait(false),
315-
await destinationProvider.FromDestinationAsync(dataTransferProperties, cancellationToken).ConfigureAwait(false),
316+
source,
317+
destination,
316318
transferOptions,
317319
dataTransferProperties.TransferId,
318320
true,
319321
cancellationToken).ConfigureAwait(false);
320322

323+
DataMovementEventSource.Singleton.ResumeTransfer(dataTransfer.Id, source, destination);
321324
return dataTransfer;
322325
}
323326

@@ -389,6 +392,7 @@ await _checkpointer.AddNewJobAsync(
389392
false,
390393
cancellationToken).ConfigureAwait(false);
391394

395+
DataMovementEventSource.Singleton.TransferQueued(transferId, sourceResource, destinationResource);
392396
return dataTransfer;
393397
}
394398
catch (Exception ex)

sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ private UriToStreamJobPart(
3939
checkpointer: job._checkpointer,
4040
progressTracker: job._progressTracker,
4141
arrayPool: job.UploadArrayPool,
42-
jobPartEventHandler: job.GetJobPartStatus(),
42+
jobPartEventHandler: job.GetJobPartStatusEventHandler(),
4343
statusEventHandler: job.TransferStatusEventHandler,
4444
failedEventHandler: job.TransferFailedEventHandler,
4545
skippedEventHandler: job.TransferSkippedEventHandler,
@@ -68,7 +68,7 @@ private UriToStreamJobPart(
6868
checkpointer: job._checkpointer,
6969
progressTracker: job._progressTracker,
7070
arrayPool: job.UploadArrayPool,
71-
jobPartEventHandler: job.GetJobPartStatus(),
71+
jobPartEventHandler: job.GetJobPartStatusEventHandler(),
7272
statusEventHandler: job.TransferStatusEventHandler,
7373
failedEventHandler: job.TransferFailedEventHandler,
7474
skippedEventHandler: job.TransferSkippedEventHandler,
@@ -103,7 +103,7 @@ private UriToStreamJobPart(
103103
checkpointer: job._checkpointer,
104104
progressTracker: job._progressTracker,
105105
arrayPool: job.UploadArrayPool,
106-
jobPartEventHandler: job.GetJobPartStatus(),
106+
jobPartEventHandler: job.GetJobPartStatusEventHandler(),
107107
statusEventHandler: job.TransferStatusEventHandler,
108108
failedEventHandler: job.TransferFailedEventHandler,
109109
skippedEventHandler: job.TransferSkippedEventHandler,

0 commit comments

Comments
 (0)