Skip to content

Commit 02569a9

Browse files
[Storage] Use try/finally for locks in LocalTransferCheckpointer (Azure#46350)
1 parent 78dadc9 commit 02569a9

File tree

1 file changed

+58
-47
lines changed

1 file changed

+58
-47
lines changed

sdk/storage/Azure.Storage.DataMovement/src/LocalTransferCheckpointer.cs

Lines changed: 58 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -153,19 +153,22 @@ public override async Task<Stream> ReadJobPlanFileAsync(
153153
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
154154
if (_transferStates.TryGetValue(transferId, out JobPlanFile jobPlanFile))
155155
{
156-
// Lock MMF
157156
await jobPlanFile.WriteLock.WaitAsync().ConfigureAwait(false);
157+
try
158+
{
159+
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPlanFile.FilePath))
160+
using (MemoryMappedViewStream mmfStream = mmf.CreateViewStream(offset, length, MemoryMappedFileAccess.Read))
161+
{
162+
await mmfStream.CopyToAsync(copiedStream).ConfigureAwait(false);
163+
}
158164

159-
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPlanFile.FilePath))
160-
using (MemoryMappedViewStream mmfStream = mmf.CreateViewStream(offset, length, MemoryMappedFileAccess.Read))
165+
copiedStream.Position = 0;
166+
return copiedStream;
167+
}
168+
finally
161169
{
162-
await mmfStream.CopyToAsync(copiedStream).ConfigureAwait(false);
170+
jobPlanFile.WriteLock.Release();
163171
}
164-
165-
// Release MMF
166-
jobPlanFile.WriteLock.Release();
167-
copiedStream.Position = 0;
168-
return copiedStream;
169172
}
170173
else
171174
{
@@ -187,20 +190,22 @@ public override async Task<Stream> ReadJobPartPlanFileAsync(
187190
int maxArraySize = length > 0 ? length : DataMovementConstants.DefaultArrayPoolArraySize;
188191
Stream copiedStream = new PooledMemoryStream(ArrayPool<byte>.Shared, maxArraySize);
189192

190-
// MMF lock
191193
await jobPartPlanFile.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
192-
193-
// Open up MemoryMappedFile
194-
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPartPlanFile.FilePath))
195-
using (MemoryMappedViewStream mmfStream = mmf.CreateViewStream(offset, length, MemoryMappedFileAccess.Read))
194+
try
196195
{
197-
await mmfStream.CopyToAsync(copiedStream).ConfigureAwait(false);
196+
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPartPlanFile.FilePath))
197+
using (MemoryMappedViewStream mmfStream = mmf.CreateViewStream(offset, length, MemoryMappedFileAccess.Read))
198+
{
199+
await mmfStream.CopyToAsync(copiedStream).ConfigureAwait(false);
200+
}
201+
202+
copiedStream.Position = 0;
203+
return copiedStream;
204+
}
205+
finally
206+
{
207+
jobPartPlanFile.WriteLock.Release();
198208
}
199-
200-
// MMF release
201-
jobPartPlanFile.WriteLock.Release();
202-
copiedStream.Position = 0;
203-
return copiedStream;
204209
}
205210
else
206211
{
@@ -224,18 +229,20 @@ public override async Task WriteToJobPlanFileAsync(
224229
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
225230
if (_transferStates.TryGetValue(transferId, out JobPlanFile jobPlanFile))
226231
{
227-
// Lock MMF
228232
await jobPlanFile.WriteLock.WaitAsync().ConfigureAwait(false);
229-
230-
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPlanFile.FilePath, FileMode.Open))
231-
using (MemoryMappedViewAccessor accessor = mmf.CreateViewAccessor(fileOffset, length, MemoryMappedFileAccess.Write))
233+
try
232234
{
233-
accessor.WriteArray(0, buffer, bufferOffset, length);
234-
accessor.Flush();
235+
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPlanFile.FilePath, FileMode.Open))
236+
using (MemoryMappedViewAccessor accessor = mmf.CreateViewAccessor(fileOffset, length, MemoryMappedFileAccess.Write))
237+
{
238+
accessor.WriteArray(0, buffer, bufferOffset, length);
239+
accessor.Flush();
240+
}
241+
}
242+
finally
243+
{
244+
jobPlanFile.WriteLock.Release();
235245
}
236-
237-
// Release MMF
238-
jobPlanFile.WriteLock.Release();
239246
}
240247
else
241248
{
@@ -306,18 +313,20 @@ public override async Task SetJobTransferStatusAsync(
306313

307314
if (_transferStates.TryGetValue(transferId, out JobPlanFile jobPlanFile))
308315
{
309-
// Lock MMF
310316
await jobPlanFile.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
311-
312-
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPlanFile.FilePath, FileMode.Open))
313-
using (MemoryMappedViewAccessor accessor = mmf.CreateViewAccessor(offset, length))
317+
try
314318
{
315-
accessor.Write(0, (int)status.ToJobPlanStatus());
316-
accessor.Flush();
319+
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPlanFile.FilePath, FileMode.Open))
320+
using (MemoryMappedViewAccessor accessor = mmf.CreateViewAccessor(offset, length))
321+
{
322+
accessor.Write(0, (int)status.ToJobPlanStatus());
323+
accessor.Flush();
324+
}
325+
}
326+
finally
327+
{
328+
jobPlanFile.WriteLock.Release();
317329
}
318-
319-
// Release MMF
320-
jobPlanFile.WriteLock.Release();
321330
}
322331
else
323332
{
@@ -340,18 +349,20 @@ public override async Task SetJobPartTransferStatusAsync(
340349
{
341350
if (jobPlanFile.JobParts.TryGetValue(partNumber, out JobPartPlanFile file))
342351
{
343-
// Lock MMF
344352
await file.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
345-
346-
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(file.FilePath, FileMode.Open))
347-
using (MemoryMappedViewAccessor accessor = mmf.CreateViewAccessor(offset, length))
353+
try
348354
{
349-
accessor.Write(0, (int)status.ToJobPlanStatus());
350-
accessor.Flush();
355+
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(file.FilePath, FileMode.Open))
356+
using (MemoryMappedViewAccessor accessor = mmf.CreateViewAccessor(offset, length))
357+
{
358+
accessor.Write(0, (int)status.ToJobPlanStatus());
359+
accessor.Flush();
360+
}
361+
}
362+
finally
363+
{
364+
file.WriteLock.Release();
351365
}
352-
353-
// Release MMF
354-
file.WriteLock.Release();
355366
}
356367
else
357368
{

0 commit comments

Comments
 (0)