Skip to content

Commit 2397c0b

Browse files
authored
Update blob-storage-databus-cleanup-function sample to NServiceBus 10 (#7520)
1 parent 49d77af commit 2397c0b

20 files changed

+573
-0
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
Microsoft Visual Studio Solution File, Format Version 12.00
2+
# Visual Studio Version 17
3+
VisualStudioVersion = 17.9.34526.213
4+
MinimumVisualStudioVersion = 15.0.26730.12
5+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SenderAndReceiver", "SenderAndReceiver\SenderAndReceiver.csproj", "{B656A5F2-29C7-43EE-86D9-FEC497962600}"
6+
EndProject
7+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DataBusBlobCleanupFunctions", "DataBusBlobCleanupFunctions\DataBusBlobCleanupFunctions.csproj", "{4E54C4B8-6A71-4ADE-B7D6-E83082BD50DC}"
8+
EndProject
9+
Global
10+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
11+
Debug|Any CPU = Debug|Any CPU
12+
Release|Any CPU = Release|Any CPU
13+
EndGlobalSection
14+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
15+
{B656A5F2-29C7-43EE-86D9-FEC497962600}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
16+
{B656A5F2-29C7-43EE-86D9-FEC497962600}.Debug|Any CPU.Build.0 = Debug|Any CPU
17+
{B656A5F2-29C7-43EE-86D9-FEC497962600}.Release|Any CPU.ActiveCfg = Release|Any CPU
18+
{B656A5F2-29C7-43EE-86D9-FEC497962600}.Release|Any CPU.Build.0 = Release|Any CPU
19+
{4E54C4B8-6A71-4ADE-B7D6-E83082BD50DC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
20+
{4E54C4B8-6A71-4ADE-B7D6-E83082BD50DC}.Debug|Any CPU.Build.0 = Debug|Any CPU
21+
{4E54C4B8-6A71-4ADE-B7D6-E83082BD50DC}.Release|Any CPU.ActiveCfg = Release|Any CPU
22+
{4E54C4B8-6A71-4ADE-B7D6-E83082BD50DC}.Release|Any CPU.Build.0 = Release|Any CPU
23+
EndGlobalSection
24+
GlobalSection(SolutionProperties) = preSolution
25+
HideSolutionNode = FALSE
26+
EndGlobalSection
27+
GlobalSection(ExtensibilityGlobals) = postSolution
28+
SolutionGuid = {F5306B81-00A3-4AEA-8E05-AB387F2542AC}
29+
EndGlobalSection
30+
EndGlobal
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net10.0</TargetFramework>
5+
<AzureFunctionsVersion>v4</AzureFunctionsVersion>
6+
<LangVersion>preview</LangVersion>
7+
<OutputType>Exe</OutputType>
8+
<ImplicitUsings>enable</ImplicitUsings>
9+
<Nullable>enable</Nullable>
10+
</PropertyGroup>
11+
12+
<ItemGroup>
13+
<FrameworkReference Include="Microsoft.AspNetCore.App" />
14+
</ItemGroup>
15+
16+
<ItemGroup>
17+
<PackageReference Include="Microsoft.ApplicationInsights.WorkerService" Version="2.*" />
18+
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="2.*" />
19+
<PackageReference Include="Microsoft.Azure.Functions.Worker.ApplicationInsights" Version="2.*" />
20+
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask" Version="1.*" />
21+
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.*" />
22+
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore" Version="2.*" />
23+
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Storage.Blobs" Version="6.*" />
24+
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="2.*" />
25+
</ItemGroup>
26+
27+
<ItemGroup>
28+
<None Update="host.json" CopyToOutputDirectory="PreserveNewest" />
29+
<None Update="local.settings.json" CopyToOutputDirectory="PreserveNewest" CopyToPublishDirectory="Never" />
30+
</ItemGroup>
31+
32+
</Project>
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
2+
Microsoft Visual Studio Solution File, Format Version 12.00
3+
# Visual Studio Version 17
4+
VisualStudioVersion = 17.5.002.0
5+
MinimumVisualStudioVersion = 10.0.40219.1
6+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DataBusBlobCleanupFunctions", "DataBusBlobCleanupFunctions.csproj", "{C34EAECB-5CF4-42F5-9428-3C0C3CCEDFD4}"
7+
EndProject
8+
Global
9+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
10+
Debug|Any CPU = Debug|Any CPU
11+
Release|Any CPU = Release|Any CPU
12+
EndGlobalSection
13+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
14+
{C34EAECB-5CF4-42F5-9428-3C0C3CCEDFD4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
15+
{C34EAECB-5CF4-42F5-9428-3C0C3CCEDFD4}.Debug|Any CPU.Build.0 = Debug|Any CPU
16+
{C34EAECB-5CF4-42F5-9428-3C0C3CCEDFD4}.Release|Any CPU.ActiveCfg = Release|Any CPU
17+
{C34EAECB-5CF4-42F5-9428-3C0C3CCEDFD4}.Release|Any CPU.Build.0 = Release|Any CPU
18+
EndGlobalSection
19+
GlobalSection(SolutionProperties) = preSolution
20+
HideSolutionNode = FALSE
21+
EndGlobalSection
22+
GlobalSection(ExtensibilityGlobals) = postSolution
23+
SolutionGuid = {65FDA38E-030A-45D0-99D6-3606931E9935}
24+
EndGlobalSection
25+
EndGlobal
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
using Microsoft.Azure.Functions.Worker;
2+
using Microsoft.DurableTask;
3+
using Microsoft.DurableTask.Client;
4+
using Microsoft.Extensions.Logging;
5+
6+
public class DataBusBlobCreated(ILogger<DataBusBlobCreated> logger)
7+
{
8+
#region DataBusBlobCreatedFunction
9+
10+
[Function(nameof(DataBusBlobCreated))]
11+
public async Task Run([BlobTrigger("databus/{name}", Connection = "DataBusStorageAccount")] Stream blob, string name, Uri uri, IDictionary<string, string> metadata, [DurableClient] DurableTaskClient durableTaskClient, CancellationToken cancellationToken)
12+
{
13+
logger.LogInformation("Blob created at {uri}", uri);
14+
15+
var instanceId = name;
16+
var existingInstance = await durableTaskClient.GetInstanceAsync(instanceId, cancellationToken);
17+
18+
if (existingInstance != null)
19+
{
20+
logger.LogInformation("{DataBusCleanupOrchestratorName} has already been started for blob {uri}.", DataBusCleanupOrchestratorName, uri);
21+
return;
22+
}
23+
24+
var validUntilUtc = DataBusBlobTimeoutCalculator.GetValidUntil(metadata);
25+
26+
if (validUntilUtc == DateTime.MaxValue)
27+
{
28+
logger.LogError("Could not parse the 'ValidUntil' value for blob {uri}. Cleanup will not happen on this blob. You may consider manually removing this entry if non-expiry is incorrect.", uri);
29+
return;
30+
}
31+
32+
await durableTaskClient.ScheduleNewOrchestrationInstanceAsync(DataBusCleanupOrchestratorName, new DataBusBlobData
33+
{
34+
Name = name,
35+
ValidUntilUtc = DataBusBlobTimeoutCalculator.ToWireFormattedString(validUntilUtc)
36+
},
37+
new StartOrchestrationOptions()
38+
{
39+
InstanceId = instanceId
40+
}, cancellationToken);
41+
}
42+
43+
#endregion
44+
45+
static readonly string DataBusCleanupOrchestratorName = nameof(DataBusCleanupOrchestrator);
46+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
public class DataBusBlobData
2+
{
3+
public required string Name { get; set; }
4+
5+
public required string ValidUntilUtc { get; set; }
6+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+

2+
using System.Globalization;
3+
4+
public static class DataBusBlobTimeoutCalculator
5+
{
6+
#region GetValidUntil
7+
8+
public static DateTime GetValidUntil(IDictionary<string, string> blobMetadata)
9+
{
10+
if (blobMetadata.TryGetValue("ValidUntilUtc", out var validUntilUtcString))
11+
{
12+
return ToUtcDateTime(validUntilUtcString);
13+
}
14+
15+
return DateTime.MaxValue;
16+
}
17+
18+
#endregion
19+
20+
/// <summary>
21+
/// Converts the <see cref="DateTime" /> to a <see cref="string" /> suitable for transport over the wire.
22+
/// </summary>
23+
public static string ToWireFormattedString(DateTime dateTime)
24+
{
25+
return dateTime.ToUniversalTime().ToString(format, CultureInfo.InvariantCulture);
26+
}
27+
28+
/// <summary>
29+
/// Converts a wire-formatted <see cref="string" /> from <see cref="ToWireFormattedString" /> to a UTC
30+
/// <see cref="DateTime" />.
31+
/// </summary>
32+
public static DateTime ToUtcDateTime(string wireFormattedString)
33+
{
34+
if (string.IsNullOrWhiteSpace(wireFormattedString))
35+
{
36+
throw new ArgumentNullException(nameof(wireFormattedString));
37+
}
38+
39+
if (wireFormattedString.Length != format.Length)
40+
{
41+
throw new FormatException(errorMessage);
42+
}
43+
44+
var year = 0;
45+
var month = 0;
46+
var day = 0;
47+
var hour = 0;
48+
var minute = 0;
49+
var second = 0;
50+
var microSecond = 0;
51+
52+
for (var i = 0; i < format.Length; i++)
53+
{
54+
var digit = wireFormattedString[i];
55+
56+
switch (format[i])
57+
{
58+
case 'y':
59+
Guard(digit);
60+
year = year * 10 + (digit - '0');
61+
break;
62+
63+
case 'M':
64+
Guard(digit);
65+
month = month * 10 + (digit - '0');
66+
break;
67+
68+
case 'd':
69+
Guard(digit);
70+
day = day * 10 + (digit - '0');
71+
break;
72+
73+
case 'H':
74+
Guard(digit);
75+
hour = hour * 10 + (digit - '0');
76+
break;
77+
78+
case 'm':
79+
Guard(digit);
80+
minute = minute * 10 + (digit - '0');
81+
break;
82+
83+
case 's':
84+
Guard(digit);
85+
second = second * 10 + (digit - '0');
86+
break;
87+
88+
case 'f':
89+
Guard(digit);
90+
microSecond = microSecond * 10 + (digit - '0');
91+
break;
92+
}
93+
}
94+
95+
return AddMicroseconds(new DateTime(year, month, day, hour, minute, second, DateTimeKind.Utc), microSecond);
96+
97+
void Guard(char digit)
98+
{
99+
if (digit >= '0' && digit <= '9')
100+
{
101+
return;
102+
}
103+
104+
throw new FormatException(errorMessage);
105+
}
106+
107+
DateTime AddMicroseconds(DateTime self, int microseconds)
108+
{
109+
return self.AddTicks(microseconds * ticksPerMicrosecond);
110+
}
111+
}
112+
113+
const string format = "yyyy-MM-dd HH:mm:ss:ffffff Z";
114+
const string errorMessage = "String was not recognized as a valid DateTime.";
115+
const int ticksPerMicrosecond = 10;
116+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#nullable disable
2+
3+
using Microsoft.Azure.Functions.Worker;
4+
using Microsoft.DurableTask;
5+
using Microsoft.Extensions.Logging;
6+
7+
public class DataBusCleanupOrchestrator(ILogger<DataBusCleanupOrchestrator> logger)
8+
{
9+
#region DataBusCleanupOrchestratorFunction
10+
11+
[Function(nameof(DataBusCleanupOrchestrator))]
12+
public async Task RunOrchestrator([OrchestrationTrigger] TaskOrchestrationContext context)
13+
{
14+
var blobData = context.GetInput<DataBusBlobData>();
15+
16+
logger.LogInformation("Orchestrating deletion for blob at {name} with ValidUntilUtc of {validUntilUtc}", blobData.Name, blobData.ValidUntilUtc);
17+
18+
var validUntilUtc = DataBusBlobTimeoutCalculator.ToUtcDateTime(blobData.ValidUntilUtc);
19+
20+
DateTime timeoutUntil;
21+
22+
//Timeouts currently have a 7 day limit, use 6 day loops until the wait is less than 6 days
23+
do
24+
{
25+
timeoutUntil = validUntilUtc > context.CurrentUtcDateTime.AddDays(6) ? context.CurrentUtcDateTime.AddDays(6) : validUntilUtc;
26+
27+
logger.LogInformation("Waiting until {timeoutUntil}/{validUntilUtc} for blob at {blobData.Name}. Currently {context.CurrentUtcDateTime}.", timeoutUntil, validUntilUtc, blobData.Name, context.CurrentUtcDateTime);
28+
29+
await context.CreateTimer(DataBusBlobTimeoutCalculator.ToUtcDateTime(blobData.ValidUntilUtc), CancellationToken.None);
30+
} while (validUntilUtc > timeoutUntil);
31+
32+
await context.CallActivityAsync("DeleteBlob", blobData);
33+
}
34+
35+
#endregion
36+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
using System.Net;
2+
using Azure.Storage.Blobs;
3+
using Azure.Storage.Blobs.Models;
4+
using Microsoft.AspNetCore.Http;
5+
using Microsoft.AspNetCore.Mvc;
6+
using Microsoft.Azure.Functions.Worker;
7+
using Microsoft.DurableTask;
8+
using Microsoft.DurableTask.Client;
9+
using Microsoft.Extensions.Logging;
10+
11+
public class DataBusOrchestrateExistingBlobs(BlobContainerClient blobContainerClient, ILogger<DataBusOrchestrateExistingBlobs> logger)
12+
{
13+
#region DataBusOrchestrateExistingBlobsFunction
14+
15+
[Function(nameof(DataBusOrchestrateExistingBlobs))]
16+
public async Task<IActionResult> Run([HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequest req, [DurableClient] DurableTaskClient durableTaskClient, CancellationToken cancellationToken)
17+
{
18+
var counter = 0;
19+
20+
try
21+
{
22+
var segment = blobContainerClient.GetBlobsAsync(traits: BlobTraits.Metadata, cancellationToken: cancellationToken).AsPages();
23+
24+
await foreach (var blobPage in segment)
25+
{
26+
foreach (var blobItem in blobPage.Values)
27+
{
28+
var instanceId = blobItem.Name;
29+
30+
var existingInstance = await durableTaskClient.GetInstanceAsync(instanceId, cancellationToken);
31+
32+
if (existingInstance != null)
33+
{
34+
logger.LogInformation("{name} has already been started for blob {blobItemName}.", nameof(DataBusCleanupOrchestrator), blobItem.Name);
35+
continue;
36+
}
37+
38+
var validUntilUtc = DataBusBlobTimeoutCalculator.GetValidUntil(blobItem.Metadata);
39+
40+
if (validUntilUtc == DateTime.MaxValue)
41+
{
42+
logger.LogError("Could not parse the 'ValidUntilUtc' value for blob {name}. Cleanup will not happen on this blob. You may consider manually removing this entry if non-expiry is incorrect.", blobItem.Name);
43+
continue;
44+
}
45+
46+
await durableTaskClient.ScheduleNewOrchestrationInstanceAsync(nameof(DataBusCleanupOrchestrator), new DataBusBlobData
47+
{
48+
Name = blobItem.Name,
49+
ValidUntilUtc = DataBusBlobTimeoutCalculator.ToWireFormattedString(validUntilUtc)
50+
},
51+
new StartOrchestrationOptions()
52+
{
53+
InstanceId = instanceId
54+
}, cancellationToken);
55+
56+
counter++;
57+
}
58+
}
59+
}
60+
catch (Exception exception)
61+
{
62+
var result = new ObjectResult(exception.Message)
63+
{
64+
StatusCode = (int)HttpStatusCode.InternalServerError
65+
};
66+
67+
return result;
68+
}
69+
70+
var message = "DataBusOrchestrateExistingBlobs has completed." + (counter > 0 ? $" {counter} blob{(counter > 1 ? "s" : string.Empty)} will be tracked for clean-up." : string.Empty);
71+
72+
return new OkObjectResult(message);
73+
}
74+
75+
#endregion
76+
}

0 commit comments

Comments
 (0)