Skip to content

Commit 0d82cd3

Browse files
Plan memory leak (Azure#47495)
* initial implementation * test * feedback
1 parent 2b34e96 commit 0d82cd3

File tree

3 files changed

+175
-95
lines changed

3 files changed

+175
-95
lines changed

sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPlanFile.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ private JobPlanFile(string id, string filePath)
4343
WriteLock = new SemaphoreSlim(1);
4444
}
4545

46+
private static string ToFullPath(string checkpointerPath, string transferId)
47+
{
48+
string fileName = $"{transferId}{DataMovementConstants.JobPlanFile.FileExtension}";
49+
return Path.Combine(checkpointerPath, fileName);
50+
}
51+
4652
public static async Task<JobPlanFile> CreateJobPlanFileAsync(
4753
string checkpointerPath,
4854
string id,
@@ -53,8 +59,7 @@ public static async Task<JobPlanFile> CreateJobPlanFileAsync(
5359
Argument.AssertNotNullOrEmpty(id, nameof(id));
5460
Argument.AssertNotNull(headerStream, nameof(headerStream));
5561

56-
string fileName = $"{id}{DataMovementConstants.JobPlanFile.FileExtension}";
57-
string filePath = Path.Combine(checkpointerPath, fileName);
62+
string filePath = ToFullPath(checkpointerPath, id);
5863

5964
JobPlanFile jobPlanFile = new(id, filePath);
6065
try
@@ -92,6 +97,11 @@ public static JobPlanFile LoadExistingJobPlanFile(string fullPath)
9297
return new JobPlanFile(transferId, fullPath);
9398
}
9499

100+
public static JobPlanFile LoadExistingJobPlanFile(string checkpointerPath, string transferId)
101+
{
102+
return LoadExistingJobPlanFile(ToFullPath(checkpointerPath, transferId));
103+
}
104+
95105
public void Dispose()
96106
{
97107
WriteLock.Dispose();

sdk/storage/Azure.Storage.DataMovement/src/LocalTransferCheckpointer.cs

Lines changed: 127 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ internal class LocalTransferCheckpointer : SerializerTransferCheckpointer
2727
/// <summary>
2828
/// Stores references to the memory mapped files stored by IDs.
2929
/// </summary>
30-
private Dictionary<string, JobPlanFile> _transferStates;
30+
internal readonly Dictionary<string, JobPlanFile> _transferStates;
3131

3232
/// <summary>
3333
/// Initializes a new instance of <see cref="LocalTransferCheckpointer"/> class.
@@ -52,10 +52,22 @@ public LocalTransferCheckpointer(string folderPath)
5252
else
5353
{
5454
_pathToCheckpointer = folderPath;
55-
InitializeExistingCheckpointer();
5655
}
5756
}
5857

58+
private bool TryGetJobPlanFile(string transferId, out JobPlanFile result)
59+
{
60+
if (!_transferStates.TryGetValue(transferId, out result))
61+
{
62+
RefreshCache(transferId);
63+
if (!_transferStates.TryGetValue(transferId, out result))
64+
{
65+
return false;
66+
}
67+
}
68+
return true;
69+
}
70+
5971
public override async Task AddNewJobAsync(
6072
string transferId,
6173
StorageResource source,
@@ -138,11 +150,11 @@ public override Task<int> CurrentJobPartCountAsync(
138150
CancellationToken cancellationToken = default)
139151
{
140152
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
141-
if (_transferStates.TryGetValue(transferId, out JobPlanFile result))
153+
if (!TryGetJobPlanFile(transferId, out JobPlanFile result))
142154
{
143-
return Task.FromResult(result.JobParts.Count);
155+
throw Errors.MissingTransferIdCheckpointer(transferId);
144156
}
145-
throw Errors.MissingTransferIdCheckpointer(transferId);
157+
return Task.FromResult(result.JobParts.Count);
146158
}
147159

148160
public override async Task<Stream> ReadJobPlanFileAsync(
@@ -155,28 +167,26 @@ public override async Task<Stream> ReadJobPlanFileAsync(
155167
Stream copiedStream = new PooledMemoryStream(ArrayPool<byte>.Shared, bufferSize);
156168

157169
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
158-
if (_transferStates.TryGetValue(transferId, out JobPlanFile jobPlanFile))
170+
if (!TryGetJobPlanFile(transferId, out JobPlanFile jobPlanFile))
159171
{
160-
await jobPlanFile.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
161-
try
162-
{
163-
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPlanFile.FilePath))
164-
using (MemoryMappedViewStream mmfStream = mmf.CreateViewStream(offset, length, MemoryMappedFileAccess.Read))
165-
{
166-
await mmfStream.CopyToAsync(copiedStream, bufferSize, cancellationToken).ConfigureAwait(false);
167-
}
172+
throw Errors.MissingTransferIdCheckpointer(transferId);
173+
}
168174

169-
copiedStream.Position = 0;
170-
return copiedStream;
171-
}
172-
finally
175+
await jobPlanFile.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
176+
try
177+
{
178+
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPlanFile.FilePath))
179+
using (MemoryMappedViewStream mmfStream = mmf.CreateViewStream(offset, length, MemoryMappedFileAccess.Read))
173180
{
174-
jobPlanFile.WriteLock.Release();
181+
await mmfStream.CopyToAsync(copiedStream, bufferSize, cancellationToken).ConfigureAwait(false);
175182
}
183+
184+
copiedStream.Position = 0;
185+
return copiedStream;
176186
}
177-
else
187+
finally
178188
{
179-
throw Errors.MissingTransferIdCheckpointer(transferId);
189+
jobPlanFile.WriteLock.Release();
180190
}
181191
}
182192

@@ -188,38 +198,33 @@ public override async Task<Stream> ReadJobPartPlanFileAsync(
188198
CancellationToken cancellationToken = default)
189199
{
190200
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
191-
if (_transferStates.TryGetValue(transferId, out JobPlanFile jobPlanFile))
201+
if (!TryGetJobPlanFile(transferId, out JobPlanFile jobPlanFile))
192202
{
193-
if (jobPlanFile.JobParts.TryGetValue(partNumber, out JobPartPlanFile jobPartPlanFile))
194-
{
195-
int bufferSize = length > 0 ? length : DataMovementConstants.DefaultStreamCopyBufferSize;
196-
Stream copiedStream = new PooledMemoryStream(ArrayPool<byte>.Shared, bufferSize);
203+
throw Errors.MissingTransferIdCheckpointer(transferId);
204+
}
205+
if (!jobPlanFile.JobParts.TryGetValue(partNumber, out JobPartPlanFile jobPartPlanFile))
206+
{
207+
throw Errors.MissingPartNumberCheckpointer(transferId, partNumber);
208+
}
197209

198-
await jobPartPlanFile.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
199-
try
200-
{
201-
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPartPlanFile.FilePath))
202-
using (MemoryMappedViewStream mmfStream = mmf.CreateViewStream(offset, length, MemoryMappedFileAccess.Read))
203-
{
204-
await mmfStream.CopyToAsync(copiedStream, bufferSize, cancellationToken).ConfigureAwait(false);
205-
}
206-
207-
copiedStream.Position = 0;
208-
return copiedStream;
209-
}
210-
finally
211-
{
212-
jobPartPlanFile.WriteLock.Release();
213-
}
214-
}
215-
else
210+
int bufferSize = length > 0 ? length : DataMovementConstants.DefaultStreamCopyBufferSize;
211+
Stream copiedStream = new PooledMemoryStream(ArrayPool<byte>.Shared, bufferSize);
212+
213+
await jobPartPlanFile.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
214+
try
215+
{
216+
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPartPlanFile.FilePath))
217+
using (MemoryMappedViewStream mmfStream = mmf.CreateViewStream(offset, length, MemoryMappedFileAccess.Read))
216218
{
217-
throw Errors.MissingPartNumberCheckpointer(transferId, partNumber);
219+
await mmfStream.CopyToAsync(copiedStream, bufferSize, cancellationToken).ConfigureAwait(false);
218220
}
221+
222+
copiedStream.Position = 0;
223+
return copiedStream;
219224
}
220-
else
225+
finally
221226
{
222-
throw Errors.MissingTransferIdCheckpointer(transferId);
227+
jobPartPlanFile.WriteLock.Release();
223228
}
224229
}
225230

@@ -261,7 +266,7 @@ public override Task<bool> TryRemoveStoredTransferAsync(string transferId, Cance
261266

262267
List<string> filesToDelete = new List<string>();
263268

264-
if (_transferStates.TryGetValue(transferId, out JobPlanFile jobPlanFile))
269+
if (TryGetJobPlanFile(transferId, out JobPlanFile jobPlanFile))
265270
{
266271
filesToDelete.Add(jobPlanFile.FilePath);
267272
}
@@ -303,6 +308,7 @@ public override Task<bool> TryRemoveStoredTransferAsync(string transferId, Cance
303308

304309
public override Task<List<string>> GetStoredTransfersAsync(CancellationToken cancellationToken = default)
305310
{
311+
RefreshCache();
306312
return Task.FromResult(_transferStates.Keys.ToList());
307313
}
308314

@@ -316,26 +322,37 @@ public override async Task SetJobTransferStatusAsync(
316322

317323
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
318324

319-
if (_transferStates.TryGetValue(transferId, out JobPlanFile jobPlanFile))
325+
if (!TryGetJobPlanFile(transferId, out JobPlanFile jobPlanFile))
320326
{
321-
await jobPlanFile.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
322-
try
323-
{
324-
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPlanFile.FilePath, FileMode.Open))
325-
using (MemoryMappedViewAccessor accessor = mmf.CreateViewAccessor(offset, length))
326-
{
327-
accessor.Write(0, (int)status.ToJobPlanStatus());
328-
accessor.Flush();
329-
}
330-
}
331-
finally
327+
throw Errors.MissingTransferIdCheckpointer(transferId);
328+
}
329+
330+
// if completed successfully, get rid of all checkpointing info
331+
if (status.HasCompletedSuccessfully)
332+
{
333+
await TryRemoveStoredTransferAsync(transferId, cancellationToken).ConfigureAwait(false);
334+
return;
335+
}
336+
337+
// if paused or other completion state, remove the memory cache but still write state to the plan file for later resume
338+
if (status.State == DataTransferState.Completed || status.State == DataTransferState.Paused)
339+
{
340+
_transferStates.Remove(transferId);
341+
}
342+
343+
await jobPlanFile.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
344+
try
345+
{
346+
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPlanFile.FilePath, FileMode.Open))
347+
using (MemoryMappedViewAccessor accessor = mmf.CreateViewAccessor(offset, length))
332348
{
333-
jobPlanFile.WriteLock.Release();
349+
accessor.Write(0, (int)status.ToJobPlanStatus());
350+
accessor.Flush();
334351
}
335352
}
336-
else
353+
finally
337354
{
338-
throw Errors.MissingTransferIdCheckpointer(transferId);
355+
jobPlanFile.WriteLock.Release();
339356
}
340357
}
341358

@@ -350,47 +367,39 @@ public override async Task SetJobPartTransferStatusAsync(
350367

351368
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
352369

353-
if (_transferStates.TryGetValue(transferId, out JobPlanFile jobPlanFile))
370+
if (!TryGetJobPlanFile(transferId, out JobPlanFile jobPlanFile))
354371
{
355-
if (jobPlanFile.JobParts.TryGetValue(partNumber, out JobPartPlanFile file))
356-
{
357-
await file.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
358-
try
359-
{
360-
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(file.FilePath, FileMode.Open))
361-
using (MemoryMappedViewAccessor accessor = mmf.CreateViewAccessor(offset, length))
362-
{
363-
accessor.Write(0, (int)status.ToJobPlanStatus());
364-
accessor.Flush();
365-
}
366-
}
367-
finally
368-
{
369-
file.WriteLock.Release();
370-
}
371-
}
372-
else
372+
throw Errors.MissingTransferIdCheckpointer(transferId);
373+
}
374+
if (!jobPlanFile.JobParts.TryGetValue(partNumber, out JobPartPlanFile file))
375+
{
376+
throw Errors.MissingPartNumberCheckpointer(transferId, partNumber);
377+
}
378+
await file.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
379+
try
380+
{
381+
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(file.FilePath, FileMode.Open))
382+
using (MemoryMappedViewAccessor accessor = mmf.CreateViewAccessor(offset, length))
373383
{
374-
throw Errors.MissingPartNumberCheckpointer(transferId, partNumber);
384+
accessor.Write(0, (int)status.ToJobPlanStatus());
385+
accessor.Flush();
375386
}
376387
}
377-
else
388+
finally
378389
{
379-
throw Errors.MissingTransferIdCheckpointer(transferId);
390+
file.WriteLock.Release();
380391
}
381392
}
382393

383394
/// <summary>
384-
/// Takes the path of the checkpointer reads all the files in the top directory level
385-
/// and populates the _transferStates.
395+
/// Clears all cached transfer states and repopulates them by enumerating the checkpointer directory.
386396
/// </summary>
387-
private void InitializeExistingCheckpointer()
397+
private void RefreshCache()
388398
{
389-
// Enumerate the filesystem
390-
IEnumerable<string> checkpointFiles = Directory.EnumerateFiles(_pathToCheckpointer);
399+
_transferStates.Clear();
391400

392401
// First, retrieve all valid job plan files
393-
foreach (string path in checkpointFiles
402+
foreach (string path in Directory.EnumerateFiles(_pathToCheckpointer)
394403
.Where(p => Path.GetExtension(p) == DataMovementConstants.JobPlanFile.FileExtension))
395404
{
396405
// TODO: Should we check for valid schema version inside file now?
@@ -406,7 +415,7 @@ private void InitializeExistingCheckpointer()
406415
}
407416

408417
// Retrieve all valid job part plan files stored in the checkpointer path.
409-
foreach (string path in checkpointFiles
418+
foreach (string path in Directory.EnumerateFiles(_pathToCheckpointer)
410419
.Where(p => Path.GetExtension(p) == DataMovementConstants.JobPartPlanFile.FileExtension))
411420
{
412421
// Ensure each file has the correct format
@@ -423,6 +432,32 @@ private void InitializeExistingCheckpointer()
423432
}
424433
}
425434

435+
/// <summary>
436+
/// Clears the cache entry for a given transfer ID and repopulates it from disk if its job plan file exists.
437+
/// </summary>
438+
private void RefreshCache(string transferId)
439+
{
440+
_transferStates.Remove(transferId);
441+
JobPlanFile jobPlanFile = JobPlanFile.LoadExistingJobPlanFile(_pathToCheckpointer, transferId);
442+
if (!File.Exists(jobPlanFile.FilePath))
443+
{
444+
return;
445+
}
446+
_transferStates.Add(transferId, jobPlanFile);
447+
foreach (string path in Directory.EnumerateFiles(_pathToCheckpointer)
448+
.Where(p => Path.GetExtension(p) == DataMovementConstants.JobPartPlanFile.FileExtension))
449+
{
450+
// Ensure each file has the correct format
451+
if (JobPartPlanFileName.TryParseJobPartPlanFileName(path, out JobPartPlanFileName partPlanFileName) &&
452+
partPlanFileName.Id == transferId)
453+
{
454+
jobPlanFile.JobParts.TryAdd(
455+
partPlanFileName.JobPartNumber,
456+
JobPartPlanFile.CreateExistingPartPlanFile(partPlanFileName));
457+
}
458+
}
459+
}
460+
426461
private static JobPlanOperation GetOperationType(StorageResource source, StorageResource destination)
427462
{
428463
if (source.IsLocalResource() && !destination.IsLocalResource())

0 commit comments

Comments
 (0)