Skip to content

Commit 1c4d6f2

Browse files
authored
[Storage] [DataMovement] Fixed issue where transfers added concurrently to the local checkpointer throw (Azure#50076)
* Fixed issue where transfers added concurrently to local checkpointer would throw collision exceptions * updated changelog
1 parent b3e2c21 commit 1c4d6f2

File tree

3 files changed

+24
-9
lines changed

3 files changed

+24
-9
lines changed

sdk/storage/Azure.Storage.DataMovement/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
### Breaking Changes
88

99
### Bugs Fixed
10+
- Fixed issue where transfers added concurrently to the local checkpointer would throw collision exceptions intermittently.
1011

1112
### Other Changes
1213

sdk/storage/Azure.Storage.DataMovement/src/LocalTransferCheckpointer.cs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.Buffers;
6+
using System.Collections.Concurrent;
67
using System.Collections.Generic;
78
using System.IO;
89
using System.IO.MemoryMappedFiles;
@@ -27,15 +28,15 @@ internal class LocalTransferCheckpointer : SerializerTransferCheckpointer
2728
/// <summary>
2829
/// Stores references to the memory mapped files stored by IDs.
2930
/// </summary>
30-
internal readonly Dictionary<string, JobPlanFile> _transferStates;
31+
internal readonly ConcurrentDictionary<string, JobPlanFile> _transferStates;
3132

3233
/// <summary>
3334
/// Initializes a new instance of <see cref="LocalTransferCheckpointer"/> class.
3435
/// </summary>
3536
/// <param name="folderPath">Path to the folder containing the checkpointing information to resume from.</param>
3637
public LocalTransferCheckpointer(string folderPath)
3738
{
38-
_transferStates = new Dictionary<string, JobPlanFile>();
39+
_transferStates = new ConcurrentDictionary<string, JobPlanFile>();
3940
if (string.IsNullOrEmpty(folderPath))
4041
{
4142
_pathToCheckpointer = Path.Combine(Environment.CurrentDirectory, DataMovementConstants.DefaultCheckpointerPath);
@@ -109,7 +110,7 @@ public override async Task AddNewJobAsync(
109110
transferId,
110111
headerStream,
111112
cancellationToken).ConfigureAwait(false);
112-
_transferStates.Add(transferId, jobPlanFile);
113+
AddToTransferStates(transferId, jobPlanFile);
113114
}
114115
}
115116

@@ -302,7 +303,7 @@ public override Task<bool> TryRemoveStoredTransferAsync(string transferId, Cance
302303
}
303304
}
304305

305-
_transferStates.Remove(transferId);
306+
_transferStates.TryRemove(transferId, out _);
306307
return Task.FromResult(result);
307308
}
308309

@@ -337,7 +338,8 @@ public override async Task SetJobTransferStatusAsync(
337338
// if paused or other completion state, remove the memory cache but still write state to the plan file for later resume
338339
if (status.State == TransferState.Completed || status.State == TransferState.Paused)
339340
{
340-
_transferStates.Remove(transferId);
341+
// It is fine if TryRemove fails: the entry may never have existed or may already have been removed.
342+
_transferStates.TryRemove(transferId, out _);
341343
}
342344

343345
await jobPlanFile.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
@@ -406,7 +408,7 @@ private void RefreshCache()
406408
JobPlanFile jobPlanFile = JobPlanFile.LoadExistingJobPlanFile(path);
407409
if (!_transferStates.ContainsKey(jobPlanFile.Id))
408410
{
409-
_transferStates.Add(jobPlanFile.Id, jobPlanFile);
411+
AddToTransferStates(jobPlanFile.Id, jobPlanFile);
410412
}
411413
else
412414
{
@@ -433,17 +435,18 @@ private void RefreshCache()
433435
}
434436

435437
/// <summary>
436-
/// Clears cache for a given trandfer ID and repopulates from disk if any.
438+
/// Clears cache for a given transfer ID and repopulates from disk if any.
437439
/// </summary>
438440
private void RefreshCache(string transferId)
439441
{
440-
_transferStates.Remove(transferId);
442+
// It is fine if TryRemove fails: the entry may never have existed or may already have been removed.
443+
_transferStates.TryRemove(transferId, out _);
441444
JobPlanFile jobPlanFile = JobPlanFile.LoadExistingJobPlanFile(_pathToCheckpointer, transferId);
442445
if (!File.Exists(jobPlanFile.FilePath))
443446
{
444447
return;
445448
}
446-
_transferStates.Add(transferId, jobPlanFile);
449+
AddToTransferStates(transferId, jobPlanFile);
447450
foreach (string path in Directory.EnumerateFiles(_pathToCheckpointer)
448451
.Where(p => Path.GetExtension(p) == DataMovementConstants.JobPartPlanFile.FileExtension))
449452
{
@@ -477,5 +480,13 @@ private static JobPlanOperation GetOperationType(StorageResource source, Storage
477480
throw Errors.InvalidSourceDestinationParams();
478481
}
479482
}
483+
484+
/// <summary>
/// Adds a job plan file to <see cref="_transferStates"/> under the given transfer ID.
/// </summary>
/// <param name="transferId">Key under which the job plan file is stored.</param>
/// <param name="jobPlanFile">Job plan file to store.</param>
/// <remarks>
/// Uses TryAdd, so an existing entry is never overwritten. When TryAdd returns false,
/// the failure is surfaced through <see cref="Errors.CollisionJobPlanFile"/>.
/// </remarks>
private void AddToTransferStates(string transferId, JobPlanFile jobPlanFile)
485+
{
486+
if (!_transferStates.TryAdd(transferId, jobPlanFile))
487+
{
488+
throw Errors.CollisionJobPlanFile(transferId);
489+
}
490+
}
480491
}
481492
}

sdk/storage/Azure.Storage.DataMovement/src/Shared/Errors.DataMovement.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ public static ArgumentException MissingTransferIdCheckpointer(string transferId)
5151
/// <summary>
/// Throws an <see cref="ArgumentException"/> reporting that a job part already exists in the checkpointer.
/// </summary>
/// <param name="transferId">Transfer ID included in the exception message.</param>
/// <param name="jobPart">Job part number included in the exception message.</param>
/// <remarks>
/// The expression body throws instead of returning, so the declared return value is never produced.
/// </remarks>
public static ArgumentException CollisionJobPart(string transferId, int jobPart)
5252
=> throw new ArgumentException($"Job Part Collision Checkpointer: The job part {jobPart} for transfer id {transferId}, already exists in the checkpointer.");
5353

54+
/// <summary>
/// Throws an <see cref="ArgumentException"/> reporting that a job with the given transfer ID already exists in the checkpointer.
/// </summary>
/// <param name="transferId">Transfer ID included in the exception message.</param>
/// <remarks>
/// The expression body throws instead of returning, so the declared return value is never produced.
/// </remarks>
public static ArgumentException CollisionJobPlanFile(string transferId)
55+
=> throw new ArgumentException($"Job Plan File collision checkpointer: The job {transferId}, already exists in the checkpointer.");
56+
5457
/// <summary>
/// Throws an <see cref="ArgumentException"/> reporting that the folder path given to the LocalTransferCheckpointer does not exist.
/// </summary>
/// <param name="directoryPath">Folder path included in the exception message.</param>
/// <remarks>
/// The expression body throws instead of returning, so the declared return value is never produced.
/// </remarks>
public static ArgumentException MissingCheckpointerPath(string directoryPath)
5558
=> throw new ArgumentException($"Could not initialize the LocalTransferCheckpointer because the folderPath passed does not exist. Please create the {directoryPath}, folder path first.");
5659

0 commit comments

Comments
 (0)