Skip to content

Commit c93bc3c

Browse files
authored
[Storage][DataMovement][Testing] Pause/Resume with Disaster Recovery (Azure#47577)
* initial commit * removing transferManager reference * removed extra space
1 parent 3c12e5e commit c93bc3c

File tree

1 file changed

+340
-3
lines changed

1 file changed

+340
-3
lines changed

sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferMockedTests.cs

Lines changed: 340 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,12 @@ private async Task AssertResumeTransfer(
102102
MemoryTransferCheckpointer checkpointer,
103103
StepProcessor<TransferJobInternal> jobsProcessor,
104104
StepProcessor<JobPartInternal> partsProcessor,
105-
StepProcessor<Func<Task>> chunksProcessor)
105+
StepProcessor<Func<Task>> chunksProcessor,
106+
DataTransferState initialJobState = DataTransferState.Paused)
106107
{
107108
await Task.Delay(50);
108-
int pausedJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused];
109-
Assert.That(pausedJobsCount, Is.EqualTo(numJobs));
109+
int initialJobStateCount = GetJobsStateCount(resumedTransfers, checkpointer)[initialJobState];
110+
Assert.That(initialJobStateCount, Is.EqualTo(numJobs));
110111

111112
// process jobs on resume
112113
Assert.That(await jobsProcessor.StepAll(), Is.EqualTo(numJobs), "Error job processing on resume");
@@ -136,6 +137,39 @@ private async Task AssertResumeTransfer(
136137
AssertAllJobsAndPartsCompleted(numJobs, numJobParts, resumedTransfers, checkpointer);
137138
}
138139

140+
/// <summary>
/// Simulates a process crash ("disaster") for pause/resume testing: drops every
/// live reference to the transfer machinery, forces garbage collection so the old
/// instances cannot survive in memory, then rebuilds a fresh pipeline wired to the
/// same <paramref name="checkpointer"/> so state can only come from checkpoint data.
/// </summary>
/// <param name="checkpointer">The checkpointer that survives the disaster; the rebuilt manager resumes from it.</param>
/// <param name="transferManager">Caller's manager reference; nulled out so the instance becomes unreachable.</param>
/// <param name="jobsProcessor">Caller's job processor reference; nulled out.</param>
/// <param name="partsProcessor">Caller's part processor reference; nulled out.</param>
/// <param name="chunksProcessor">Caller's chunk processor reference; nulled out.</param>
/// <returns>A freshly constructed manager plus its job/part/chunk step processors.</returns>
private (TransferManager TransferManager, StepProcessor<TransferJobInternal> JobProcessor, StepProcessor<JobPartInternal> PartProcessor, StepProcessor<Func<Task>> ChunkProcessor) SimulateDisaster(
    MemoryTransferCheckpointer checkpointer,
    ref TransferManager transferManager,
    ref StepProcessor<TransferJobInternal> jobsProcessor,
    ref StepProcessor<JobPartInternal> partsProcessor,
    ref StepProcessor<Func<Task>> chunksProcessor)
{
    // Sever the caller's references so nothing keeps the old pipeline alive.
    transferManager = null;
    jobsProcessor = null;
    partsProcessor = null;
    chunksProcessor = null;

    // Collect, drain finalizers, collect again — the second pass reclaims
    // anything resurrected or freed by a finalizer in the first pass.
    GC.Collect();
    GC.WaitForPendingFinalizers();
    GC.Collect();

    // Stand the whole pipeline back up against the surviving checkpointer.
    var (freshJobs, freshParts, freshChunks) = StepProcessors();
    JobBuilder freshJobBuilder = new(ArrayPool<byte>.Shared, default, new ClientDiagnostics(ClientOptions.Default));
    List<StorageResourceProvider> freshProviders = new() { new MockStorageResourceProvider(checkpointer) };
    TransferManager freshManager = new(
        freshJobs,
        freshParts,
        freshChunks,
        freshJobBuilder,
        checkpointer,
        freshProviders);

    return (freshManager, freshJobs, freshParts, freshChunks);
}
172+
139173
[Test]
140174
[Combinatorial]
141175
public async Task PauseResumeDuringJobProcessing_ItemTransfer(
@@ -1361,6 +1395,309 @@ public async Task MultiplePausesAndResumes_ContainerTransfer(
13611395
await AssertResumeTransfer(numJobs, numJobParts, numChunks, chunksPerPart, resumedTransfers3, checkpointer, jobsProcessor, partsProcessor, chunksProcessor);
13621396
}
13631397

1398+
[Test]
[Combinatorial]
public async Task MultipleDisasterResumes_ItemTransfer(
    [Values(2, 6)] int items,
    [Values(333, 500, 1024)] int itemSize,
    [Values(333, 1024)] int chunkSize)
{
    int chunksPerPart = (int)Math.Ceiling((float)itemSize / chunkSize);
    // TODO: below should be only `items * chunksPerPart` but can't in some cases due to
    // a bug in how work items are processed on multipart uploads.
    int numChunks = Math.Max(chunksPerPart - 1, 1) * items;

    Uri srcUri = new("file:///foo/bar");
    Uri dstUri = new("https://example.com/fizz/buzz");

    var (jobs0, parts0, chunks0) = StepProcessors();
    JobBuilder jobBuilder = new(ArrayPool<byte>.Shared, default, new ClientDiagnostics(ClientOptions.Default));
    MemoryTransferCheckpointer checkpointer = new();

    // Build one strict source/destination mock pair per item transfer.
    var resources = Enumerable.Range(0, items).Select(_ =>
    {
        Mock<StorageResourceItem> srcResource = new(MockBehavior.Strict);
        Mock<StorageResourceItem> dstResource = new(MockBehavior.Strict);

        (srcResource, dstResource).BasicSetup(srcUri, dstUri, itemSize);

        return (Source: srcResource, Destination: dstResource);
    }).ToList();

    List<StorageResourceProvider> resumeProviders = new() { new MockStorageResourceProvider(checkpointer) };

    TransferManager manager0 = new(
        jobs0,
        parts0,
        chunks0,
        jobBuilder,
        checkpointer,
        resumeProviders);

    List<DataTransfer> transfers = new();

    // Queue one transfer per resource pair and verify checkpointing as we go.
    foreach ((Mock<StorageResourceItem> srcResource, Mock<StorageResourceItem> dstResource) in resources)
    {
        DataTransfer transfer = await manager0.StartTransferAsync(
            srcResource.Object,
            dstResource.Object,
            new()
            {
                InitialTransferSize = chunkSize,
                MaximumTransferChunkSize = chunkSize,
            });
        transfers.Add(transfer);

        // Each queued job should have a plan entry in Queued state with no parts yet.
        Assert.That(checkpointer.Jobs.ContainsKey(transfer.Id), Is.True, "Error during Job plan file creation.");
        Assert.That(checkpointer.Jobs[transfer.Id].Status.State, Is.EqualTo(DataTransferState.Queued), "Error during Job plan file creation.");
        Assert.That(checkpointer.Jobs[transfer.Id].Parts.Count, Is.EqualTo(0), "Job Part files should not exist before job processing");
    }
    Assert.That(checkpointer.Jobs.Count, Is.EqualTo(transfers.Count), "Error during Job plan file creation.");
    Assert.That(jobs0.ItemsInQueue, Is.EqualTo(items), "Error during initial Job queueing.");

    // Disaster #1: crash before any job processing has happened.
    await ((IAsyncDisposable)manager0).DisposeAsync();
    (TransferManager manager1,
        StepProcessor<TransferJobInternal> jobs1,
        StepProcessor<JobPartInternal> parts1,
        StepProcessor<Func<Task>> chunks1) = SimulateDisaster(
            checkpointer,
            ref manager0,
            ref jobs0,
            ref parts0,
            ref chunks0);

    // Resume after disaster #1.
    List<DataTransfer> resumed1 = await manager1.ResumeAllTransfersAsync(new()
    {
        InitialTransferSize = chunkSize,
        MaximumTransferChunkSize = chunkSize,
    });

    await Task.Delay(50);
    // Jobs never ran, so every resumed job should still be Queued.
    int queuedJobsCount = GetJobsStateCount(resumed1, checkpointer)[DataTransferState.Queued];
    Assert.That(queuedJobsCount, Is.EqualTo(items));

    // Step jobs on resume #1.
    Assert.That(await jobs1.StepAll(), Is.EqualTo(items), "Error during job processing");

    await Task.Delay(50);
    AssertJobProcessingSuccessful(items, resumed1, checkpointer);

    // Disaster #2: crash after job processing, before part processing.
    await ((IAsyncDisposable)manager1).DisposeAsync();
    (TransferManager manager2,
        StepProcessor<TransferJobInternal> jobs2,
        StepProcessor<JobPartInternal> parts2,
        StepProcessor<Func<Task>> chunks2) = SimulateDisaster(
            checkpointer,
            ref manager1,
            ref jobs1,
            ref parts1,
            ref chunks1);

    // Resume after disaster #2.
    List<DataTransfer> resumed2 = await manager2.ResumeAllTransfersAsync();

    await Task.Delay(50);
    // Jobs had already begun, so they come back InProgress this time.
    int inProgressJobsCount = GetJobsStateCount(resumed2, checkpointer)[DataTransferState.InProgress];
    Assert.That(inProgressJobsCount, Is.EqualTo(items));

    // Step jobs on resume #2.
    Assert.That(await jobs2.StepAll(), Is.EqualTo(items), "Error during job processing");

    await Task.Delay(50);
    AssertJobProcessingSuccessful(items, resumed2, checkpointer);

    // Step job parts on resume #2.
    Assert.That(await parts2.StepAll(), Is.EqualTo(items), "Error job during part processing");

    await Task.Delay(50);
    AssertPartProcessingSuccessful(items, resumed2, checkpointer);

    // Disaster #3: crash after part processing, before chunk processing.
    await ((IAsyncDisposable)manager2).DisposeAsync();
    (TransferManager manager3,
        StepProcessor<TransferJobInternal> jobs3,
        StepProcessor<JobPartInternal> parts3,
        StepProcessor<Func<Task>> chunks3) = SimulateDisaster(
            checkpointer,
            ref manager2,
            ref jobs2,
            ref parts2,
            ref chunks2);

    // Resume after disaster #3.
    List<DataTransfer> resumed3 = await manager3.ResumeAllTransfersAsync();

    // Drive resume #3 to completion; jobs resume in InProgress state.
    await AssertResumeTransfer(
        items,
        items,
        numChunks,
        chunksPerPart,
        resumed3,
        checkpointer,
        jobs3,
        parts3,
        chunks3,
        DataTransferState.InProgress);
}
1548+
1549+
[Test]
[Combinatorial]
public async Task MultipleDisasterResumes_ContainerTransfer(
    [Values(2, 6)] int numJobs,
    [Values(333, 500, 1024)] int itemSize,
    [Values(333, 1024)] int chunkSize)
{
    // Container i holds 2*i items, so part counts vary per job.
    static int GetItemCountFromContainerIndex(int i) => i * 2;

    int numJobParts = Enumerable.Range(1, numJobs).Select(GetItemCountFromContainerIndex).Sum();
    int chunksPerPart = (int)Math.Ceiling((float)itemSize / chunkSize);
    // TODO: below should be only `items * chunksPerPart` but can't in some cases due to
    // a bug in how work items are processed on multipart uploads.
    int numChunks = Math.Max(chunksPerPart - 1, 1) * numJobParts;

    Uri srcUri = new("file:///foo/bar");
    Uri dstUri = new("https://example.com/fizz/buzz");

    var (jobs0, parts0, chunks0) = StepProcessors();
    JobBuilder jobBuilder = new(ArrayPool<byte>.Shared, default, new ClientDiagnostics(ClientOptions.Default));
    MemoryTransferCheckpointer checkpointer = new();

    // Build one strict source/destination container mock pair per job.
    var resources = Enumerable.Range(1, numJobs).Select(i =>
    {
        Mock<StorageResourceContainer> srcResource = new(MockBehavior.Strict);
        Mock<StorageResourceContainer> dstResource = new(MockBehavior.Strict);
        (srcResource, dstResource).BasicSetup(srcUri, dstUri, GetItemCountFromContainerIndex(i), itemSize);
        return (Source: srcResource, Destination: dstResource);
    }).ToList();

    List<StorageResourceProvider> resumeProviders = new() { new MockStorageResourceProvider(checkpointer) };

    TransferManager manager0 = new(
        jobs0,
        parts0,
        chunks0,
        jobBuilder,
        checkpointer,
        resumeProviders);

    List<DataTransfer> transfers = new();

    // Queue one transfer per container pair and verify checkpointing as we go.
    foreach ((Mock<StorageResourceContainer> srcResource, Mock<StorageResourceContainer> dstResource) in resources)
    {
        DataTransfer transfer = await manager0.StartTransferAsync(
            srcResource.Object,
            dstResource.Object,
            new()
            {
                InitialTransferSize = chunkSize,
                MaximumTransferChunkSize = chunkSize,
            });
        transfers.Add(transfer);

        // Each queued job should have a plan entry in Queued state with no parts yet.
        Assert.That(checkpointer.Jobs.ContainsKey(transfer.Id), Is.True, "Error during Job plan file creation.");
        Assert.That(checkpointer.Jobs[transfer.Id].Status.State, Is.EqualTo(DataTransferState.Queued), "Error during Job plan file creation.");
        Assert.That(checkpointer.Jobs[transfer.Id].Parts.Count, Is.EqualTo(0), "Job Part files should not exist before job processing");
    }
    Assert.That(checkpointer.Jobs.Count, Is.EqualTo(transfers.Count), "Error during Job plan file creation.");
    Assert.That(jobs0.ItemsInQueue, Is.EqualTo(numJobs), "Error during initial Job queueing.");

    // Disaster #1: crash before any job processing has happened.
    await ((IAsyncDisposable)manager0).DisposeAsync();
    (TransferManager manager1,
        StepProcessor<TransferJobInternal> jobs1,
        StepProcessor<JobPartInternal> parts1,
        StepProcessor<Func<Task>> chunks1) = SimulateDisaster(
            checkpointer,
            ref manager0,
            ref jobs0,
            ref parts0,
            ref chunks0);

    // Resume after disaster #1.
    List<DataTransfer> resumed1 = await manager1.ResumeAllTransfersAsync(new()
    {
        InitialTransferSize = chunkSize,
        MaximumTransferChunkSize = chunkSize,
    });

    await Task.Delay(50);
    // Jobs never ran, so every resumed job should still be Queued.
    int queuedJobsCount = GetJobsStateCount(resumed1, checkpointer)[DataTransferState.Queued];
    Assert.That(queuedJobsCount, Is.EqualTo(numJobs));

    // Step jobs on resume #1.
    Assert.That(await jobs1.StepAll(), Is.EqualTo(numJobs), "Error during job processing");

    await Task.Delay(50);
    AssertJobProcessingSuccessful(numJobs, resumed1, checkpointer);

    // Disaster #2: crash after job processing, before part processing.
    await ((IAsyncDisposable)manager1).DisposeAsync();
    (TransferManager manager2,
        StepProcessor<TransferJobInternal> jobs2,
        StepProcessor<JobPartInternal> parts2,
        StepProcessor<Func<Task>> chunks2) = SimulateDisaster(
            checkpointer,
            ref manager1,
            ref jobs1,
            ref parts1,
            ref chunks1);

    // Resume after disaster #2.
    List<DataTransfer> resumed2 = await manager2.ResumeAllTransfersAsync();

    await Task.Delay(50);
    // Jobs had already begun, so they come back InProgress this time.
    int inProgressJobsCount = GetJobsStateCount(resumed2, checkpointer)[DataTransferState.InProgress];
    Assert.That(inProgressJobsCount, Is.EqualTo(numJobs));

    // Step jobs on resume #2.
    Assert.That(await jobs2.StepAll(), Is.EqualTo(numJobs), "Error during job processing");

    await Task.Delay(50);
    AssertJobProcessingSuccessful(numJobs, resumed2, checkpointer);

    // Step job parts on resume #2.
    Assert.That(await parts2.StepAll(), Is.EqualTo(numJobParts), "Error job during part processing");

    await Task.Delay(50);
    AssertPartProcessingSuccessful(numJobParts, resumed2, checkpointer);

    // Disaster #3: crash after part processing, before chunk processing.
    await ((IAsyncDisposable)manager2).DisposeAsync();
    (TransferManager manager3,
        StepProcessor<TransferJobInternal> jobs3,
        StepProcessor<JobPartInternal> parts3,
        StepProcessor<Func<Task>> chunks3) = SimulateDisaster(
            checkpointer,
            ref manager2,
            ref jobs2,
            ref parts2,
            ref chunks2);

    // Resume after disaster #3.
    List<DataTransfer> resumed3 = await manager3.ResumeAllTransfersAsync();

    // Drive resume #3 to completion; jobs resume in InProgress state.
    await AssertResumeTransfer(
        numJobs,
        numJobParts,
        numChunks,
        chunksPerPart,
        resumed3,
        checkpointer,
        jobs3,
        parts3,
        chunks3,
        DataTransferState.InProgress);
}
1700+
13641701
public enum PauseLocation
13651702
{
13661703
PauseProcessHalfway,

0 commit comments

Comments
 (0)